一、CancellationTokenSource 概述
CancellationTokenSource 用于生成 CancellationTokens,以用于监视取消请求。当取消令牌被请求时,任何等待它的操作都会收到该请求。CancellationTokenSource 可以用于应用程序中的任何工作单元,以便在取消请求发出时停止正在进行的工作单元。
二、如何使用CancellationTokenSource
要使用 CancellationTokenSource,需要先实例化它,然后将其 CancellationToken 属性传递给需要实现取消逻辑的方法中。下面是一个使用 CancellationTokenSource 取消一个任务的示例。
public class TaskCancellationExample { public void Run(CancellationToken token) { Task.Run(() => { while (true) { token.ThrowIfCancellationRequested(); // do something } }, token); } } public class Program { static void Main() { CancellationTokenSource source = new CancellationTokenSource(); TaskCancellationExample example = new TaskCancellationExample(); example.Run(source.Token); Console.ReadLine(); source.Cancel(); } }
以上代码中,我们首先实例化了一个 CancellationTokenSource 对象,然后通过调用其 Token 属性,将取消令牌传递给了 Run 方法中的任务。接着在主方法中,我们通过调用 CancellationTokenSource 的 Cancel 方法发出取消请求,从而停止正在进行的任务。
三、使用 CancellationTokenSource 实现限制并发执行的任务
在某些情况下,我们需要限制并发执行的任务数量,这时我们可以使用 CancellationTokenSource 来实现。
示例代码如下:
public class ConcurrentTaskExample { private readonly CancellationTokenSource _cancellationTokenSource; private readonly int _maxDegreeOfParallelism; private readonly BlockingCollection _blockingCollection; private readonly List _tasks; public ConcurrentTaskExample(int maxDegreeOfParallelism) { _cancellationTokenSource = new CancellationTokenSource(); _maxDegreeOfParallelism = maxDegreeOfParallelism; _blockingCollection = new BlockingCollection(maxDegreeOfParallelism); _tasks = new List(); for (int i = 0; i < maxDegreeOfParallelism; i++) { _blockingCollection.Add(new ManualResetEventSlim(false)); } } public void Execute(IEnumerable actions) { Task.Run(() => { foreach (var action in actions) { var resetEvent = _blockingCollection.Take(_cancellationTokenSource.Token); var task = Task.Run(() => { try { action.Invoke(); } finally { resetEvent.Set(); } }); _tasks.Add(task); } Task.WaitAll(_tasks.ToArray()); }); } public void Cancel() { _cancellationTokenSource.Cancel(); } public void Wait() { ManualResetEventSlim.WaitAll(_blockingCollection.ToArray()); } } public class Program { static void Main() { ConcurrentTaskExample example = new ConcurrentTaskExample(2); example.Execute(new List { () => Thread.Sleep(5000), () => { Console.WriteLine("task1 completed"); }, () => Thread.Sleep(1000), () => { Console.WriteLine("task2 completed"); }, () => Thread.Sleep(2000), () => { Console.WriteLine("task3 completed"); }, () => Thread.Sleep(1000), () => { Console.WriteLine("task4 completed"); }, () => Thread.Sleep(3000), () => { Console.WriteLine("task5 completed"); }, }); example.Wait(); example.Cancel(); } }
以上代码中,我们定义了一个 ConcurrentTaskExample 类,它接受一个最大并发执行任务数(maxDegreeOfParallelism),并维护了一个 ManualResetEventSlim 实例的 BlockingCollection(_blockingCollection)和一组 Task(_tasks)。由于我们限制最大并发执行数量,因此当有任务可以执行时,需要从 BlockingCollection 中获取一个 ManualResetEventSlim 对象,这里我们调用 _blockingCollection.Take 方法来实现。当一个任务完成执行后,需要将该 ManualResetEventSlim 设置为有信号状态,以便等待中的任务可以继续执行。
在 Execute 方法中,我们首先通过 Task.Run 来启动一个循环,用于一直取出任务并执行,直到所有的任务完成。我们将每个任务封装在一个 Task.Run 中,从而可以监视其执行状态。在取出任务时,我们使用 _blockingCollection.Take 方法来等待有可用的 ManualResetEventSlim 对象。然后我们重新封装一个新的 Task,任务完成后,将 ManualResetEventSlim 设置为有信号状态。每个任务完成后,我们都将其添加到 _tasks 集合中,以便我们可以在 Execute 方法结束时等待它们执行完成。
在主方法中,我们创建了一个 ConcurrentTaskExample 实例,它使用了 2 个并发执行的任务。在执行 Execute 方法时,我们传递了一组 Action,每个 Action 代表一个任务。最后,我们在 Wait 方法中等待所有任务完成,并在完成后通过调用 CancellationTokenSource 的 Cancel 方法来取消任务。由于使用了 CancellationTokenSource,我们可以在任何时候通过调用该对象的 Cancel 方法来取消任务。
原创文章,作者:小蓝,如若转载,请注明出处:https://www.506064.com/n/286841.html