思绪由Q1引发,后续Q2、Q3基于Q1的发散探究
Q1. Task.Run、Task.Factory.StartNew 的区别?
我们常使用
Task.Run
和Task.Factory.StartNew
创建并启动任务,但是他们的区别在哪里?在哪种场景下使用前后者?
官方推荐使用Task.Run方法启动基于计算的任务, 当需要对长时间运行、基于计算的任务做精细化控制时使用Task.Factory.StartNew。
Task.Factory提供了自定义选项、自定义调度器的能力,这也说明了Task.Run是Task.Factory.StartNew的一个特例,Task.Run 只是提供了一个无参、默认的任务创建和调度方式。
当你在Task.Run传递委托
Task.Run(someAction);
实际上等价于
Task.Factory.StartNew(someAction, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
一个长时间运行的任务,如果使用Task.Run铁定会使用线程池线程,可能构成滥用线程池线程,这个时候最好在独立线程中执行任务。
Q2. 既然说到Task.Run使用线程池线程,线程池线程有哪些特征?为什么有自定义调度器一说?
github: TaskScheduler[1] 251行显示TaskScheduler.Dafult
确实是线程池任务调度器。
线程池[2]线程的特征:
① 池中线程都是后台线程
② 线程可重用,一旦线程池中的线程完成任务,将返回到等待线程队列中, 避免了创建线程的开销
③ 池中预热了工作者线程、IO线程
我启动一个脚手架项目:默认最大工作者线程32767,最大IO线程1000 ; 默认最小工作线程数、最小IO线程数均为8个
github: ThreadPoolTaskScheduler[3] 显示线程池任务调度器是这样调度任务的:
/// <summary>/// Schedules a task to the ThreadPool./// </summary>/// <param name="task">The task to schedule.</param>protected internal override void QueueTask(Task task){ TaskCreationOptions options = task.Options; if ((options & TaskCreationOptions.LongRunning) != 0) { // Run LongRunning tasks on their own dedicated thread. Thread thread = new Thread(s_longRunningThreadWork); thread.IsBackground = true; // Keep this thread from blocking process shutdown thread.Start(task); } else { // Normal handling for non-LongRunning tasks. bool preferLocal = ((options & TaskCreationOptions.PreferFairness) == 0); ThreadPool.UnsafeQueueUserWorkItemInternal(task, preferLocal); }}
请注意8-14行:若上层使用者将LongRunning
任务应用到默认的任务调度器(也即ThreadPoolTaskScheduler),ThreadPoolTaskScheduler会有一个兜底方案:会将任务放在独立线程上执行。
何时不使用线程池线程
有几种应用场景,其中适合创建并管理自己的线程,而非使用线程池线程:
•需要一个前台线程。
•需要具有特定优先级的线程。
•拥有会导致线程长时间阻塞的任务。线程池具有最大线程数,因此大量被阻塞的线程池线程可能会阻止任务启动。
•需将线程放入单线程单元。所有 ThreadPool 线程均位于多线程单元中。
•需具有与线程关联的稳定标识,或需将一个线程专用于一项任务。
Q3. 既然要自定义任务调度器,那我们就来倒腾?
实现TaskScheduler
抽象类,其中的抓手是“调度”,也就是 QueueTask、TryExecuteTask
方法,之后你可以自定义数据结构和算法, 从数据结构中调度出任务执行。
给个例子:
public sealed class CustomTaskScheduler : TaskScheduler, IDisposable { private BlockingCollection<Task> tasksCollection = new BlockingCollection<Task>(); private readonly Thread mainThread = null; public CustomTaskScheduler() { mainThread = new Thread(new ThreadStart(Execute)); if (!mainThread.IsAlive) { mainThread.Start(); } } private void Execute() { foreach (var task in tasksCollection.GetConsumingEnumerable()) { TryExecuteTask(task); } } protected override IEnumerable<Task> GetScheduledTasks() { return tasksCollection.ToArray(); } protected override void QueueTask(Task task) { if (task != null) tasksCollection.Add(task); } protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return false; } private void Dispose(bool disposing) { if (!disposing) return; tasksCollection.CompleteAdding(); tasksCollection.Dispose(); } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } }