使用CancellationTokenSource實現並發控制

一、CancellationTokenSource 概述

CancellationTokenSource 用於生成 CancellationTokens,以用於監視取消請求。當取消令牌被請求時,任何等待它的操作都會收到該請求。CancellationTokenSource 可以用於應用程序中的任何工作單元,以便在取消請求發出時停止正在進行的工作單元。

二、如何使用CancellationTokenSource

要使用 CancellationTokenSource,需要先實例化它,然後將其 CancellationToken 屬性傳遞給需要實現取消邏輯的方法中。下面是一個使用 CancellationTokenSource 取消一個任務的示例。

public class TaskCancellationExample
{
    public void Run(CancellationToken token)
    {
        Task.Run(() =>
        {
            while (true)
            {
                token.ThrowIfCancellationRequested();
                // do something
            }
        }, token);
    }
}

public class Program
{
    static void Main()
    {
        CancellationTokenSource source = new CancellationTokenSource();
        TaskCancellationExample example = new TaskCancellationExample();
        example.Run(source.Token);
     
        Console.ReadLine();
        source.Cancel();
    }
}

以上代碼中,我們首先實例化了一個 CancellationTokenSource 對象,然後通過調用其 Token 屬性,將取消令牌傳遞給了 Run 方法中的任務。接著在主方法中,我們通過調用 CancellationTokenSource 的 Cancel 方法發出取消請求,從而停止正在進行的任務。

三、使用 CancellationTokenSource 實現限制並發執行的任務

在某些情況下,我們需要限制並發執行的任務數量,這時我們可以使用 CancellationTokenSource 來實現。

示例代碼如下:

public class ConcurrentTaskExample
{
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly int _maxDegreeOfParallelism;
    private readonly BlockingCollection _blockingCollection;
    private readonly List _tasks;
  
    public ConcurrentTaskExample(int maxDegreeOfParallelism)
    {
        _cancellationTokenSource = new CancellationTokenSource();
        _maxDegreeOfParallelism = maxDegreeOfParallelism;
        _blockingCollection = new BlockingCollection(maxDegreeOfParallelism);
        _tasks = new List();
        for (int i = 0; i < maxDegreeOfParallelism; i++)
        {
            _blockingCollection.Add(new ManualResetEventSlim(false));
        }
    }

    public void Execute(IEnumerable actions)
    {
        Task.Run(() =>
        {
            foreach (var action in actions)
            {
                var resetEvent = _blockingCollection.Take(_cancellationTokenSource.Token);
                var task = Task.Run(() =>
                {
                    try
                    {
                        action.Invoke();
                    }
                    finally
                    {
                        resetEvent.Set();
                    }
                });
                _tasks.Add(task);
            }

            Task.WaitAll(_tasks.ToArray());
        });
    }

    public void Cancel()
    {
        _cancellationTokenSource.Cancel();
    }

    public void Wait()
    {
        ManualResetEventSlim.WaitAll(_blockingCollection.ToArray());
    }
}

public class Program
{
    static void Main()
    {
        ConcurrentTaskExample example = new ConcurrentTaskExample(2);
        example.Execute(new List
        {
            () => Thread.Sleep(5000),
            () => { Console.WriteLine("task1 completed"); },
            () => Thread.Sleep(1000),
            () => { Console.WriteLine("task2 completed"); },
            () => Thread.Sleep(2000),
            () => { Console.WriteLine("task3 completed"); },
            () => Thread.Sleep(1000),
            () => { Console.WriteLine("task4 completed"); },
            () => Thread.Sleep(3000),
            () => { Console.WriteLine("task5 completed"); },
        });
        example.Wait();
        example.Cancel();
    }
}

以上代碼中,我們定義了一個 ConcurrentTaskExample 類,它接受一個最大並發執行任務數(maxDegreeOfParallelism),並維護了一個 ManualResetEventSlim 實例的 BlockingCollection(_blockingCollection)和一組 Task(_tasks)。由於我們限制最大並發執行數量,因此當有任務可以執行時,需要從 BlockingCollection 中獲取一個 ManualResetEventSlim 對象,這裡我們調用 _blockingCollection.Take 方法來實現。當一個任務完成執行後,需要將該 ManualResetEventSlim 設置為有信號狀態,以便等待中的任務可以繼續執行。

在 Execute 方法中,我們首先通過 Task.Run 來啟動一個循環,用於一直取出任務並執行,直到所有的任務完成。我們將每個任務封裝在一個 Task.Run 中,從而可以監視其執行狀態。在取出任務時,我們使用 _blockingCollection.Take 方法來等待有可用的 ManualResetEventSlim 對象。然後我們重新封裝一個新的 Task,任務完成後,將 ManualResetEventSlim 設置為有信號狀態。每個任務完成後,我們都將其添加到 _tasks 集合中,以便我們可以在 Execute 方法結束時等待它們執行完成。

在主方法中,我們創建了一個 ConcurrentTaskExample 實例,它使用了 2 個並發執行的任務。在執行 Execute 方法時,我們傳遞了一組 Action,每個 Action 代表一個任務。最後,我們在 Wait 方法中等待所有任務完成,並在完成後通過調用 CancellationTokenSource 的 Cancel 方法來取消任務。由於使用了 CancellationTokenSource,我們可以在任何時候通過調用該對象的 Cancel 方法來取消任務。

原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/286841.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
小藍的頭像小藍
上一篇 2024-12-23 03:48
下一篇 2024-12-23 03:48

發表回復

登錄後才能評論