一、CancellationTokenSource 概述
CancellationTokenSource 用於生成 CancellationTokens,以用於監視取消請求。當取消令牌被請求時,任何等待它的操作都會收到該請求。CancellationTokenSource 可以用於應用程序中的任何工作單元,以便在取消請求發出時停止正在進行的工作單元。
二、如何使用CancellationTokenSource
要使用 CancellationTokenSource,需要先實例化它,然後將其 CancellationToken 屬性傳遞給需要實現取消邏輯的方法中。下面是一個使用 CancellationTokenSource 取消一個任務的示例。
public class TaskCancellationExample
{
public void Run(CancellationToken token)
{
Task.Run(() =>
{
while (true)
{
token.ThrowIfCancellationRequested();
// do something
}
}, token);
}
}
public class Program
{
static void Main()
{
CancellationTokenSource source = new CancellationTokenSource();
TaskCancellationExample example = new TaskCancellationExample();
example.Run(source.Token);
Console.ReadLine();
source.Cancel();
}
}
以上代碼中,我們首先實例化了一個 CancellationTokenSource 對象,然後通過調用其 Token 屬性,將取消令牌傳遞給了 Run 方法中的任務。接著在主方法中,我們通過調用 CancellationTokenSource 的 Cancel 方法發出取消請求,從而停止正在進行的任務。
三、使用 CancellationTokenSource 實現限制並發執行的任務
在某些情況下,我們需要限制並發執行的任務數量,這時我們可以使用 CancellationTokenSource 來實現。
示例代碼如下:
public class ConcurrentTaskExample
{
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly int _maxDegreeOfParallelism;
private readonly BlockingCollection _blockingCollection;
private readonly List _tasks;
public ConcurrentTaskExample(int maxDegreeOfParallelism)
{
_cancellationTokenSource = new CancellationTokenSource();
_maxDegreeOfParallelism = maxDegreeOfParallelism;
_blockingCollection = new BlockingCollection(maxDegreeOfParallelism);
_tasks = new List();
for (int i = 0; i < maxDegreeOfParallelism; i++)
{
_blockingCollection.Add(new ManualResetEventSlim(false));
}
}
public void Execute(IEnumerable actions)
{
Task.Run(() =>
{
foreach (var action in actions)
{
var resetEvent = _blockingCollection.Take(_cancellationTokenSource.Token);
var task = Task.Run(() =>
{
try
{
action.Invoke();
}
finally
{
resetEvent.Set();
}
});
_tasks.Add(task);
}
Task.WaitAll(_tasks.ToArray());
});
}
public void Cancel()
{
_cancellationTokenSource.Cancel();
}
public void Wait()
{
ManualResetEventSlim.WaitAll(_blockingCollection.ToArray());
}
}
public class Program
{
static void Main()
{
ConcurrentTaskExample example = new ConcurrentTaskExample(2);
example.Execute(new List
{
() => Thread.Sleep(5000),
() => { Console.WriteLine("task1 completed"); },
() => Thread.Sleep(1000),
() => { Console.WriteLine("task2 completed"); },
() => Thread.Sleep(2000),
() => { Console.WriteLine("task3 completed"); },
() => Thread.Sleep(1000),
() => { Console.WriteLine("task4 completed"); },
() => Thread.Sleep(3000),
() => { Console.WriteLine("task5 completed"); },
});
example.Wait();
example.Cancel();
}
}
以上代碼中,我們定義了一個 ConcurrentTaskExample 類,它接受一個最大並發執行任務數(maxDegreeOfParallelism),並維護了一個 ManualResetEventSlim 實例的 BlockingCollection(_blockingCollection)和一組 Task(_tasks)。由於我們限制最大並發執行數量,因此當有任務可以執行時,需要從 BlockingCollection 中獲取一個 ManualResetEventSlim 對象,這裡我們調用 _blockingCollection.Take 方法來實現。當一個任務完成執行後,需要將該 ManualResetEventSlim 設置為有信號狀態,以便等待中的任務可以繼續執行。
在 Execute 方法中,我們首先通過 Task.Run 來啟動一個循環,用於一直取出任務並執行,直到所有的任務完成。我們將每個任務封裝在一個 Task.Run 中,從而可以監視其執行狀態。在取出任務時,我們使用 _blockingCollection.Take 方法來等待有可用的 ManualResetEventSlim 對象。然後我們重新封裝一個新的 Task,任務完成後,將 ManualResetEventSlim 設置為有信號狀態。每個任務完成後,我們都將其添加到 _tasks 集合中,以便我們可以在 Execute 方法結束時等待它們執行完成。
在主方法中,我們創建了一個 ConcurrentTaskExample 實例,它使用了 2 個並發執行的任務。在執行 Execute 方法時,我們傳遞了一組 Action,每個 Action 代表一個任務。最後,我們在 Wait 方法中等待所有任務完成,並在完成後通過調用 CancellationTokenSource 的 Cancel 方法來取消任務。由於使用了 CancellationTokenSource,我們可以在任何時候通過調用該對象的 Cancel 方法來取消任務。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/286841.html
微信掃一掃
支付寶掃一掃