一、CancellationTokenSource 概述
CancellationTokenSource 用於生成 CancellationTokens,以用於監視取消請求。當取消令牌被請求時,任何等待它的操作都會收到該請求。CancellationTokenSource 可以用於應用程序中的任何工作單元,以便在取消請求發出時停止正在進行的工作單元。
二、如何使用CancellationTokenSource
要使用 CancellationTokenSource,需要先實例化它,然後將其 CancellationToken 屬性傳遞給需要實現取消邏輯的方法中。下面是一個使用 CancellationTokenSource 取消一個任務的示例。
public class TaskCancellationExample { public void Run(CancellationToken token) { Task.Run(() => { while (true) { token.ThrowIfCancellationRequested(); // do something } }, token); } } public class Program { static void Main() { CancellationTokenSource source = new CancellationTokenSource(); TaskCancellationExample example = new TaskCancellationExample(); example.Run(source.Token); Console.ReadLine(); source.Cancel(); } }
以上代碼中,我們首先實例化了一個 CancellationTokenSource 對象,然後通過調用其 Token 屬性,將取消令牌傳遞給了 Run 方法中的任務。接着在主方法中,我們通過調用 CancellationTokenSource 的 Cancel 方法發出取消請求,從而停止正在進行的任務。
三、使用 CancellationTokenSource 實現限制並發執行的任務
在某些情況下,我們需要限制並發執行的任務數量,這時我們可以使用 CancellationTokenSource 來實現。
示例代碼如下:
public class ConcurrentTaskExample { private readonly CancellationTokenSource _cancellationTokenSource; private readonly int _maxDegreeOfParallelism; private readonly BlockingCollection _blockingCollection; private readonly List _tasks; public ConcurrentTaskExample(int maxDegreeOfParallelism) { _cancellationTokenSource = new CancellationTokenSource(); _maxDegreeOfParallelism = maxDegreeOfParallelism; _blockingCollection = new BlockingCollection(maxDegreeOfParallelism); _tasks = new List(); for (int i = 0; i < maxDegreeOfParallelism; i++) { _blockingCollection.Add(new ManualResetEventSlim(false)); } } public void Execute(IEnumerable actions) { Task.Run(() => { foreach (var action in actions) { var resetEvent = _blockingCollection.Take(_cancellationTokenSource.Token); var task = Task.Run(() => { try { action.Invoke(); } finally { resetEvent.Set(); } }); _tasks.Add(task); } Task.WaitAll(_tasks.ToArray()); }); } public void Cancel() { _cancellationTokenSource.Cancel(); } public void Wait() { ManualResetEventSlim.WaitAll(_blockingCollection.ToArray()); } } public class Program { static void Main() { ConcurrentTaskExample example = new ConcurrentTaskExample(2); example.Execute(new List { () => Thread.Sleep(5000), () => { Console.WriteLine("task1 completed"); }, () => Thread.Sleep(1000), () => { Console.WriteLine("task2 completed"); }, () => Thread.Sleep(2000), () => { Console.WriteLine("task3 completed"); }, () => Thread.Sleep(1000), () => { Console.WriteLine("task4 completed"); }, () => Thread.Sleep(3000), () => { Console.WriteLine("task5 completed"); }, }); example.Wait(); example.Cancel(); } }
以上代碼中,我們定義了一個 ConcurrentTaskExample 類,它接受一個最大並發執行任務數(maxDegreeOfParallelism),並維護了一個 ManualResetEventSlim 實例的 BlockingCollection(_blockingCollection)和一組 Task(_tasks)。由於我們限制最大並發執行數量,因此當有任務可以執行時,需要從 BlockingCollection 中獲取一個 ManualResetEventSlim 對象,這裡我們調用 _blockingCollection.Take 方法來實現。當一個任務完成執行後,需要將該 ManualResetEventSlim 設置為有信號狀態,以便等待中的任務可以繼續執行。
在 Execute 方法中,我們首先通過 Task.Run 來啟動一個循環,用於一直取出任務並執行,直到所有的任務完成。我們將每個任務封裝在一個 Task.Run 中,從而可以監視其執行狀態。在取出任務時,我們使用 _blockingCollection.Take 方法來等待有可用的 ManualResetEventSlim 對象。然後我們重新封裝一個新的 Task,任務完成後,將 ManualResetEventSlim 設置為有信號狀態。每個任務完成後,我們都將其添加到 _tasks 集合中,以便我們可以在 Execute 方法結束時等待它們執行完成。
在主方法中,我們創建了一個 ConcurrentTaskExample 實例,它使用了 2 個並發執行的任務。在執行 Execute 方法時,我們傳遞了一組 Action,每個 Action 代表一個任務。最後,我們在 Wait 方法中等待所有任務完成,並在完成後通過調用 CancellationTokenSource 的 Cancel 方法來取消任務。由於使用了 CancellationTokenSource,我們可以在任何時候通過調用該對象的 Cancel 方法來取消任務。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/286841.html