一、CancellationTokenSource 概述
CancellationTokenSource 用于生成 CancellationTokens,以用于监视取消请求。当取消令牌被请求时,任何等待它的操作都会收到该请求。CancellationTokenSource 可以用于应用程序中的任何工作单元,以便在取消请求发出时停止正在进行的工作单元。
二、如何使用CancellationTokenSource
要使用 CancellationTokenSource,需要先实例化它,然后将其 CancellationToken 属性传递给需要实现取消逻辑的方法中。下面是一个使用 CancellationTokenSource 取消一个任务的示例。
public class TaskCancellationExample
{
public void Run(CancellationToken token)
{
Task.Run(() =>
{
while (true)
{
token.ThrowIfCancellationRequested();
// do something
}
}, token);
}
}
public class Program
{
static void Main()
{
CancellationTokenSource source = new CancellationTokenSource();
TaskCancellationExample example = new TaskCancellationExample();
example.Run(source.Token);
Console.ReadLine();
source.Cancel();
}
}
以上代码中,我们首先实例化了一个 CancellationTokenSource 对象,然后通过调用其 Token 属性,将取消令牌传递给了 Run 方法中的任务。接着在主方法中,我们通过调用 CancellationTokenSource 的 Cancel 方法发出取消请求,从而停止正在进行的任务。
三、使用 CancellationTokenSource 实现限制并发执行的任务
在某些情况下,我们需要限制并发执行的任务数量,这时我们可以使用 CancellationTokenSource 来实现。
示例代码如下:
public class ConcurrentTaskExample
{
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly int _maxDegreeOfParallelism;
private readonly BlockingCollection _blockingCollection;
private readonly List _tasks;
public ConcurrentTaskExample(int maxDegreeOfParallelism)
{
_cancellationTokenSource = new CancellationTokenSource();
_maxDegreeOfParallelism = maxDegreeOfParallelism;
_blockingCollection = new BlockingCollection(maxDegreeOfParallelism);
_tasks = new List();
for (int i = 0; i < maxDegreeOfParallelism; i++)
{
_blockingCollection.Add(new ManualResetEventSlim(false));
}
}
public void Execute(IEnumerable actions)
{
Task.Run(() =>
{
foreach (var action in actions)
{
var resetEvent = _blockingCollection.Take(_cancellationTokenSource.Token);
var task = Task.Run(() =>
{
try
{
action.Invoke();
}
finally
{
resetEvent.Set();
}
});
_tasks.Add(task);
}
Task.WaitAll(_tasks.ToArray());
});
}
public void Cancel()
{
_cancellationTokenSource.Cancel();
}
public void Wait()
{
ManualResetEventSlim.WaitAll(_blockingCollection.ToArray());
}
}
public class Program
{
static void Main()
{
ConcurrentTaskExample example = new ConcurrentTaskExample(2);
example.Execute(new List
{
() => Thread.Sleep(5000),
() => { Console.WriteLine("task1 completed"); },
() => Thread.Sleep(1000),
() => { Console.WriteLine("task2 completed"); },
() => Thread.Sleep(2000),
() => { Console.WriteLine("task3 completed"); },
() => Thread.Sleep(1000),
() => { Console.WriteLine("task4 completed"); },
() => Thread.Sleep(3000),
() => { Console.WriteLine("task5 completed"); },
});
example.Wait();
example.Cancel();
}
}
以上代码中,我们定义了一个 ConcurrentTaskExample 类,它接受一个最大并发执行任务数(maxDegreeOfParallelism),并维护了一个 ManualResetEventSlim 实例的 BlockingCollection(_blockingCollection)和一组 Task(_tasks)。由于我们限制最大并发执行数量,因此当有任务可以执行时,需要从 BlockingCollection 中获取一个 ManualResetEventSlim 对象,这里我们调用 _blockingCollection.Take 方法来实现。当一个任务完成执行后,需要将该 ManualResetEventSlim 设置为有信号状态,以便等待中的任务可以继续执行。
在 Execute 方法中,我们首先通过 Task.Run 来启动一个循环,用于一直取出任务并执行,直到所有的任务完成。我们将每个任务封装在一个 Task.Run 中,从而可以监视其执行状态。在取出任务时,我们使用 _blockingCollection.Take 方法来等待有可用的 ManualResetEventSlim 对象。然后我们重新封装一个新的 Task,任务完成后,将 ManualResetEventSlim 设置为有信号状态。每个任务完成后,我们都将其添加到 _tasks 集合中,以便我们可以在 Execute 方法结束时等待它们执行完成。
在主方法中,我们创建了一个 ConcurrentTaskExample 实例,它使用了 2 个并发执行的任务。在执行 Execute 方法时,我们传递了一组 Action,每个 Action 代表一个任务。最后,我们在 Wait 方法中等待所有任务完成,并在完成后通过调用 CancellationTokenSource 的 Cancel 方法来取消任务。由于使用了 CancellationTokenSource,我们可以在任何时候通过调用该对象的 Cancel 方法来取消任务。
原创文章,作者:小蓝,如若转载,请注明出处:https://www.506064.com/n/286841.html
微信扫一扫
支付宝扫一扫