wsx 发表于 2025-2-7 00:53:27

在 .NET Core 中使用 ActionBlock 实现高效率的多步骤数据处理

目录

[*]一、引言
[*]二、ActionBlock介绍

[*]什么是 ActionBlock?
[*]ActionBlock 的特点
[*]ActionBlock 的使用场景
[*]ActionBlock 的基本用法

[*]三、假设场景
[*]四、解决方案


一、引言

上一篇博客 分享了使用 Channel 来实现针对大量数据的多线程异步处理,感谢大哥们在评论中提出的宝贵的问题和建议!本篇将分享使用 ActionBlock 如何实现,欢迎在评论区留言讨论。
二、ActionBlock介绍

什么是 ActionBlock?

ActionBlock是 .NET 中 TPL Dataflow 库的一部分,用于处理数据流和并行任务。它提供了一种简单而强大的方式来处理并行任务,并且可以轻松地实现生产者-消费者模式。
ActionBlock 的特点


[*]并行处理:ActionBlock可以配置为并行处理多个任务,从而提高处理效率
[*]异步编程:支持异步编程模型,可以避免阻塞线程,提高应用程序的响应速度和吞吐量
[*]数据流控制:可以通过设置最大并行度和其他选项来控制数据流的处理方式
[*]任务调度:可以用于调度和管理并行任务,确保任务按预期执行
ActionBlock 的使用场景


[*]生产者-消费者模式:可以用于实现生产者-消费者模式,其中生产者将数据发送到ActionBlock,消费者从ActionBlock中读取数据并进行处理
[*]数据流处理:适用于需要处理大量数据并且需要并行处理的场景,例如日志处理、数据转换等
[*]任务调度:可以用于调度和管理并行任务,确保任务按预期执行
ActionBlock 的基本用法

使用ActionBlock非常简单,主要步骤如下:

[*]创建 ActionBlock:定义一个 ActionBlock,指定要执行的操作和并行选项
[*]发送数据到 ActionBlock:使用SendAsync方法将数据发送到 ActionBlock
[*]完成 ActionBlock:在所有数据发送完成后,调用Complete方法通知 ActionBlock 不再接收新的数据
[*]等待处理完成:使用Completion属性等待所有数据处理完成
以下是一个简单的示例代码,展示了如何使用 ActionBlock:
using System.Threading.Tasks.Dataflow;var actionBlock = new ActionBlock<int>(async item =>{    // 模拟异步处理    await Task.Delay(100);    Console.WriteLine($"Processed item: {item}");}, new ExecutionDataflowBlockOptions{    MaxDegreeOfParallelism = 4 // 设置最大并行度});// 发送数据到 ActionBlockfor (int i = 0; i < 10; i++){    await actionBlock.SendAsync(i);}// 完成 ActionBlockactionBlock.Complete();// 等待处理完成await actionBlock.Completion;Console.WriteLine("All items processed.");三、假设场景

假设我们有一组数据需要经过两个步骤的处理。每个数据项都需要进行初步处理,然后进行进一步处理。希望步骤2可以在步骤1产生结果数据后立即开始处理,而不是等待步骤1完全处理完毕。
四、解决方案

使用TransformBlock和ActionBlock来实现生产者-消费者模式。生产者负责读取数据并将其发送到TransformBlock中,消费者从TransformBlock中读取数据并进行处理。
以下是一个简单的示例代码,演示如何使用TransformBlock和ActionBlock实现生产者-消费者模式来处理数据:
using System.Threading.Tasks.Dataflow;var cts = new CancellationTokenSource();// 假设有一组数据var dataItems = Enumerable.Range(0, 1000).Select(x => $"data_{x}").ToList();var processor = new DataProcessor(10, cts.Token);await processor.ProcessAsync(dataItems);Console.ReadKey();/// <summary>/// 数据处理器/// </summary>public class DataProcessor(int maxDegreeOfParallelism, CancellationToken cancellationToken){    public async Task ProcessAsync(List<string> dataItems)    {      // 创建一个 TransformBlock 用于步骤1的处理,并将结果发送到步骤2的 ActionBlock      var step1Block = new TransformBlock<string, string>(async dataItem => await Step1(dataItem), new ExecutionDataflowBlockOptions      {            MaxDegreeOfParallelism = maxDegreeOfParallelism,            CancellationToken = cancellationToken      });               // 创建一个 ActionBlock 用于步骤2的处理      var step2Block = new ActionBlock<string>(async dataItem =>      {            await Step2(dataItem);      }, new ExecutionDataflowBlockOptions      {            MaxDegreeOfParallelism = maxDegreeOfParallelism,            CancellationToken = cancellationToken      });      // 将 TransformBlock 链接到 ActionBlock      step1Block.LinkTo(step2Block, new DataflowLinkOptions { PropagateCompletion = true });      // 启动多个步骤1的任务(生产者)      foreach (var dataItem in dataItems)      {            await step1Block.SendAsync(dataItem, cancellationToken);      }      // 完成步骤1的 TransformBlock 的写入      step1Block.Complete();      // 等待步骤1的 TransformBlock 处理完成      await step1Block.Completion;      // 完成步骤2的 ActionBlock 的写入      step2Block.Complete();      // 等待步骤2的 ActionBlock 处理完成      await step2Block.Completion;    }    private async Task<string> Step1(string dataItem)    {      // 模拟步骤1的处理(如初步处理数据)      await Task.Delay(10, cancellationToken);      Console.WriteLine($"Step1 processed data item: {dataItem}");      return dataItem;    }    private async Task Step2(string dataItem)    {      // 模拟步骤2的处理(如进一步处理数据)      await Task.Delay(10, cancellationToken);      Console.WriteLine($"Step2 processed data item: {dataItem}");    }}代码解释:

[*]创建Step1的 TransformBlock:在ProcessAsync方法中,我们首先创建了一个 TransformBlock,用于Step1的处理,TransformBlock 接受一个输入数据项,进行处理后返回一个输出数据项,TransformBlock<string, string>表示输入和输出都是string类型
[*]创建Step2的 ActionBlock:创建一个 ActionBlock 用于Step2的处理,ActionBlock 接受一个输入数据项并进行处理,但不返回输出数据项。ActionBlock<string>表示输入是string类型
[*]链接 TransformBlock 和 ActionBlock:将 TransformBlock 链接到 ActionBlock ,以便将Step1的处理结果发送到Step2进行处理,使用LinkTo方法将两个块连接起来,并设置PropagateCompletion为 true,表示当 TransformBlock 完成时,ActionBlock 也会完成
[*]启动Step1的任务:逐个将数据项发送到 TransformBlock,并等待所有数据处理完成,使用SendAsync方法将数据项发送到 TransformBlock
[*]等待任务完成:使用Complete方法通知 TransformBlock 不再接收新的数据,并使用Completion属性等待所有数据处理完成。然后完成Step2的 ActionBlock 的写入,并等待Step2的 ActionBlock 处理完成
页: [1]
查看完整版本: 在 .NET Core 中使用 ActionBlock 实现高效率的多步骤数据处理