TPL实现Task.WhileAll扩展方法

简介:

文章翻译整理自 Nikola Malovic 两篇博文:

当 Task.WhenAll 遇见 Task.WhenAny

在 TPL (Task Parallel Library) 中,有两种通过非阻塞方式等待 Task 数组任务结束的方式:Task.WhenAll 和 Task.WhenAny 。

它们的工作方式是:

  • WhenAll 当每项任务都完成时为完成。
  • WhenAny 当任意项任务完成时为完成。

现在我们需要一项功能,完成 Task 数组中的所有任务,并且当有任务完成时汇报状态。

我们称这个扩展方法为:Task.WhileAll 。

扩展方法实现

复制代码
 1     public static class TaskExtensions
 2     {
 3         public static async Task<IList<T>> WhileAll<T>(this IList<Task<T>> tasks, IProgress<T> progress)
 4         {
 5             var result = new List<T>(tasks.Count);
 6             var done = new List<Task<T>>(tasks);
 7 
 8             while (done.Count > 0)
 9             {
10                 await Task.WhenAny(tasks);
11 
12                 var spinning = new List<Task<T>>(done.Count - 1);
13                 for (int i = 0; i < done.Count; i++)
14                 {
15                     if (done[i].IsCompleted)
16                     {
17                         result.Add(done[i].Result);
18                         progress.Report(done[i].Result);
19                     }
20                     else
21                     {
22                         spinning.Add(done[i]);
23                     }
24                 }
25 
26                 done = spinning;
27             }
28 
29             return result;
30         }
31     }
复制代码

代码实现很简单:

  • 其是 IList<Task<T>> 的一个 async 扩展方法
  • 方法返回完整的 IList<T> 结果
  • 方法会接受一个 IProgress<T> 类型的参数,用于向订阅者发布 Task 完成信息
  • 在方法体内,我们使用一个循环来检测,直到所有 Task 完成
  • 通过使用 Task.WhenAny 来异步等待 Task 完成

单元测试

复制代码
 1     [TestClass]
 2     public class UnitTest1
 3     {
 4         [TestMethod]
 5         public async Task TestTaskExtensionsWhileAll()
 6         {
 7             var task1 = Task.Run(() => 101);
 8             var task2 = Task.Run(() => 102);
 9             var tasks = new List<Task<int>>() { task1, task2 };
10 
11             List<int> result = new List<int>();
12             var listener = new Progress<int>(
13                 taskResult =>
14                 {
15                     result.Add(taskResult);
16                 });
17 
18             var actual = await tasks.WhileAll(listener);
19             Thread.Sleep(50); // wait a bit for progress reports to complete
20 
21             Assert.AreEqual(2, result.Count);
22             Assert.IsTrue(result.Contains(101));
23             Assert.IsTrue(result.Contains(102));
24 
25             Assert.AreEqual(2, actual.Count);
26             Assert.IsTrue(actual.Contains(101));
27             Assert.IsTrue(actual.Contains(102));
28         }
29     }
复制代码

同样,测试代码也不复杂:

  • 创建两个哑元 Task,并存到数组中
  • 定义进度侦听器 Progress<T>,来监测每个任务运行的结果
  • 通过 await 方式来调用方法
  • 使用 Thread.Sleep 来等待 50ms ,以便 Progress 可以来得及处理结果
  • 检查所有 Task 执行完毕后均已上报 Progress
  • 检查所有 Task 均已执行完毕

我知道每当使用 Thread.Sleep 时绝不是件好事,所以我决定摆脱它。

实现IProgressAsync<T>

问题实际上是因为 IProgress<T> 接口定义的是 void 委托,因此无法使用 await 进行等待。

因此我决定定义一个新的接口,使用同样的 Report 行为,但会返回 Task ,用以实现真正的异步。

1     public interface IProgressAsync<in T>
2     {
3         Task ReportAsync(T value);
4     }

有了异步版本的支持,将使订阅者更容易处理 await 调用。当然也可以使用 async void 来达成,但我认为 async void 总会延伸出更差的设计。所以,我还是选择通过定义 Task 返回值签名的接口来达成这一功能。

如下为接口实现:

复制代码
 1     public class ProgressAsync<T> : IProgressAsync<T>
 2     {
 3         private readonly Func<T, Task> handler;
 4 
 5         public ProgressAsync(Func<T, Task> handler)
 6         {
 7             this.handler = handler;
 8         }
 9 
10         public async Task ReportAsync(T value)
11         {
12             await this.handler.InvokeAsync(value);
13         }
14     }
复制代码

显然也没什么特别的:

  • 使用 Func<T, Task> 来代替 Action<T>,以便可以使用 await
  • ReportAsync 通过使用 await 方式来提供 Task

有了这些之后,我们来更新扩展方法:

复制代码
 1     public static class TaskExtensions
 2     {
 3         public static async Task<IList<T>> WhileAll<T>(this IList<Task<T>> tasks, IProgressAsync<T> progress)
 4         {
 5             var result = new List<T>(tasks.Count);
 6             var remainingTasks = new List<Task<T>>(tasks);
 7 
 8             while (remainingTasks.Count > 0)
 9             {
10                 await Task.WhenAny(tasks);
11                 var stillRemainingTasks = new List<Task<T>>(remainingTasks.Count - 1);
12                 for (int i = 0; i < remainingTasks.Count; i++)
13                 {
14                     if (remainingTasks[i].IsCompleted)
15                     {
16                         result.Add(remainingTasks[i].Result);
17                         await progress.ReportAsync(remainingTasks[i].Result);
18                     }
19                     else
20                     {
21                         stillRemainingTasks.Add(remainingTasks[i]);
22                     }
23                 }
24 
25                 remainingTasks = stillRemainingTasks;
26             }
27 
28             return result;
29         }
30 
31         public static Task InvokeAsync<T>(this Func<T, Task> task, T value)
32         {
33             return Task<Task>.Factory.FromAsync(task.BeginInvoke, task.EndInvoke, value, null);
34         }
35     }
复制代码

所有都就绪后,我们就可以将 Thread.Sleep 从单元测试中移除了。

复制代码
 1     [TestClass]
 2     public class UnitTest1
 3     {
 4         private List<int> result = new List<int>();
 5         private async Task OnProgressAsync(int arg)
 6         {
 7             result.Add(arg);
 8         }     
 9 
10         [TestMethod]
11         public async Task TestTaskExtensionsWhileAll()
12         {
13             var task1 = Task.Run(() => 101);
14             var task2 = Task.Run(() => 102);
15             var tasks = new List<Task<int>>() { task1, task2 };
16 
17             var listener = new ProgressAsync<int>(this.OnProgressAsync);
18             var actual = await tasks.WhileAll(listener);
19 
20             Assert.AreEqual(2, this.result.Count);
21             Assert.IsTrue(this.result.Contains(101));
22             Assert.IsTrue(this.result.Contains(102));
23 
24             Assert.AreEqual(2, actual.Count);
25             Assert.IsTrue(actual.Contains(101));
26             Assert.IsTrue(actual.Contains(102));
27         }
28     }
复制代码
目录
相关文章
|
移动开发 数据可视化 weex
如何在线生成App:将网页封装成APP
随着移动互联网的快速发展,很多企业和个人都希望能够将自己的网页封装成APP,以便更好地适应用户的移动需求。本文将介绍一种在线生成App的方法,帮助读者实现将网页封装成APP的目标
389 0
|
9月前
|
人工智能
精度与通用性不可兼得,北大华为理论证明低精度下scaling law难以实现
北京大学和华为的研究团队在论文《数值精度如何影响大型语言模型的数学推理能力》中指出,数值精度是影响Transformer模型在数学任务中表现的关键因素。研究发现,低数值精度下,模型难以处理算术任务,如迭代加法和整数乘法;而在标准数值精度下,模型表现更佳且所需规模较小。实验结果表明,提高数值精度可显著提升LLM的数学推理能力,为优化模型性能提供了新思路。
207 88
|
9月前
|
人工智能 Java 新能源
我的2024总结 | 降薪优化、焦虑生病、读书写作、逆势成长
本文回顾了作者在2024年的经历与感悟,涵盖了职业发展、健康管理、读书写作等多个方面。面对工作压力和身体不适的双重挑战,作者通过阅读心理学书籍找到了应对焦虑的方法,并坚持写作分享,逐步实现自我救赎与成长。展望2025,作者强调“知行合一”,希望通过持续努力突破困境,迎接新的开始。文中还推荐了几本有助于个人成长的书籍,如《正面管教》和《被讨厌的勇气》,鼓励读者共同进步,健康顺利地度过新的一年。
|
9月前
|
JavaScript 关系型数据库 数据库
探索Wiki:开源知识管理平台及其私有化部署
在信息时代,知识管理至关重要。本文介绍一款GitHub上的开源工具——Wiki,基于Node.js和Vue.js开发,旨在提供高效的知识管理解决方案。它具备简洁界面、权限管理、多语言支持及高度可定制等特点,适合团队协作。通过Docker-compose私有化部署,用户可轻松搭建专属知识库,保障数据安全。访问[GitHub](https://github.com/requarks/wiki)获取更多信息。
443 7
|
人工智能 自然语言处理 算法
|
数据库 对象存储
状态机的原理简析及重要用途
状态机的原理简析及重要用途
303 1
|
数据采集 开发框架 搜索推荐
开题报告-基于SpringBoot的“遇见”婚恋交友平台的设计与实现
开题报告-基于SpringBoot的“遇见”婚恋交友平台的设计与实现
513 0
|
JavaScript C# 数据安全/隐私保护
C# 软件Licence应用实例
我们在使用一些需要购买版权的软件产品时,或者我们做的商业软件需要进行售卖,为了收取费用,一般需要一个软件使用许可证,然后输入这个许可到软件里就能够使用软件。简单的是一串序列码或者一个许可证文件,复杂的是一个定制化插件包。于是有的小伙伴就开始好奇这个许可是怎么实现的,特别是在离线情况下它是怎么给软件授权,同时又能避免被破解的。
567 0
|
机器学习/深度学习 Python
Python 金融量化 道路突破策略(唐奇安道路突破策略&布林带通道及其市场风险)
Python 金融量化 道路突破策略(唐奇安道路突破策略&布林带通道及其市场风险)
1783 0
Python 金融量化 道路突破策略(唐奇安道路突破策略&布林带通道及其市场风险)
|
安全 JavaScript 小程序
云支付官方FAQ
云支付官方小二实时更新的浓缩FAQ,帮助广大服务商快速定位问题。