使用CancellationTokenSource实现并发控制

一、CancellationTokenSource 概述

CancellationTokenSource 用于生成 CancellationTokens,以用于监视取消请求。当取消令牌被请求时,任何等待它的操作都会收到该请求。CancellationTokenSource 可以用于应用程序中的任何工作单元,以便在取消请求发出时停止正在进行的工作单元。

二、如何使用CancellationTokenSource

要使用 CancellationTokenSource,需要先实例化它,然后将其 CancellationToken 属性传递给需要实现取消逻辑的方法中。下面是一个使用 CancellationTokenSource 取消一个任务的示例。

public class TaskCancellationExample
{
    public void Run(CancellationToken token)
    {
        Task.Run(() =>
        {
            while (true)
            {
                token.ThrowIfCancellationRequested();
                // do something
            }
        }, token);
    }
}

public class Program
{
    static void Main()
    {
        CancellationTokenSource source = new CancellationTokenSource();
        TaskCancellationExample example = new TaskCancellationExample();
        example.Run(source.Token);
     
        Console.ReadLine();
        source.Cancel();
    }
}

以上代码中,我们首先实例化了一个 CancellationTokenSource 对象,然后通过调用其 Token 属性,将取消令牌传递给了 Run 方法中的任务。接着在主方法中,我们通过调用 CancellationTokenSource 的 Cancel 方法发出取消请求,从而停止正在进行的任务。

三、使用 CancellationTokenSource 实现限制并发执行的任务

在某些情况下,我们需要限制并发执行的任务数量,这时我们可以使用 CancellationTokenSource 来实现。

示例代码如下:

public class ConcurrentTaskExample
{
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly int _maxDegreeOfParallelism;
    private readonly BlockingCollection _blockingCollection;
    private readonly List _tasks;
  
    public ConcurrentTaskExample(int maxDegreeOfParallelism)
    {
        _cancellationTokenSource = new CancellationTokenSource();
        _maxDegreeOfParallelism = maxDegreeOfParallelism;
        _blockingCollection = new BlockingCollection(maxDegreeOfParallelism);
        _tasks = new List();
        for (int i = 0; i < maxDegreeOfParallelism; i++)
        {
            _blockingCollection.Add(new ManualResetEventSlim(false));
        }
    }

    public void Execute(IEnumerable actions)
    {
        Task.Run(() =>
        {
            foreach (var action in actions)
            {
                var resetEvent = _blockingCollection.Take(_cancellationTokenSource.Token);
                var task = Task.Run(() =>
                {
                    try
                    {
                        action.Invoke();
                    }
                    finally
                    {
                        resetEvent.Set();
                    }
                });
                _tasks.Add(task);
            }

            Task.WaitAll(_tasks.ToArray());
        });
    }

    public void Cancel()
    {
        _cancellationTokenSource.Cancel();
    }

    public void Wait()
    {
        ManualResetEventSlim.WaitAll(_blockingCollection.ToArray());
    }
}

public class Program
{
    static void Main()
    {
        ConcurrentTaskExample example = new ConcurrentTaskExample(2);
        example.Execute(new List
        {
            () => Thread.Sleep(5000),
            () => { Console.WriteLine("task1 completed"); },
            () => Thread.Sleep(1000),
            () => { Console.WriteLine("task2 completed"); },
            () => Thread.Sleep(2000),
            () => { Console.WriteLine("task3 completed"); },
            () => Thread.Sleep(1000),
            () => { Console.WriteLine("task4 completed"); },
            () => Thread.Sleep(3000),
            () => { Console.WriteLine("task5 completed"); },
        });
        example.Wait();
        example.Cancel();
    }
}

以上代码中,我们定义了一个 ConcurrentTaskExample 类,它接受一个最大并发执行任务数(maxDegreeOfParallelism),并维护了一个 ManualResetEventSlim 实例的 BlockingCollection(_blockingCollection)和一组 Task(_tasks)。由于我们限制最大并发执行数量,因此当有任务可以执行时,需要从 BlockingCollection 中获取一个 ManualResetEventSlim 对象,这里我们调用 _blockingCollection.Take 方法来实现。当一个任务完成执行后,需要将该 ManualResetEventSlim 设置为有信号状态,以便等待中的任务可以继续执行。

在 Execute 方法中,我们首先通过 Task.Run 来启动一个循环,用于一直取出任务并执行,直到所有的任务完成。我们将每个任务封装在一个 Task.Run 中,从而可以监视其执行状态。在取出任务时,我们使用 _blockingCollection.Take 方法来等待有可用的 ManualResetEventSlim 对象。然后我们重新封装一个新的 Task,任务完成后,将 ManualResetEventSlim 设置为有信号状态。每个任务完成后,我们都将其添加到 _tasks 集合中,以便我们可以在 Execute 方法结束时等待它们执行完成。

在主方法中,我们创建了一个 ConcurrentTaskExample 实例,它使用了 2 个并发执行的任务。在执行 Execute 方法时,我们传递了一组 Action,每个 Action 代表一个任务。最后,我们在 Wait 方法中等待所有任务完成,并在完成后通过调用 CancellationTokenSource 的 Cancel 方法来取消任务。由于使用了 CancellationTokenSource,我们可以在任何时候通过调用该对象的 Cancel 方法来取消任务。

原创文章,作者:小蓝,如若转载,请注明出处:https://www.506064.com/n/286841.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
小蓝小蓝
上一篇 2024-12-23 03:48
下一篇 2024-12-23 03:48

发表回复

登录后才能评论