线程-线程池1

本文涉及的产品
应用实时监控服务ARMS - 应用监控,每月50GB免费额度
简介: 今天我们说说线程池,线程池为什么来呢?

带着问题去思考!大家好!

今天我们说说线程池,线程池为什么来呢?

  之前我们讲过线程,它的创建和协作的几种方式。花费极少的时间来完成创建很多异步操作。创建线程是昂贵的操作,所以为每个短暂的异步操作创建线程会产生显著的开销

  那么为了解决这一问题,有一个常用的方式叫做池,线程池可以成功的适用于任何需要大量短暂的开销大的资源情形。事先分配一定的资源,将这些资源放到资源池中。每次需要新的资源,只需要从池中获取一个,不需要创建新的。

  通过System.Threading.ThreadPool类型可以使用线程池。线程池是受.NET通用语言运行时(CLR)管理的。每个CLR都有一个线程池的实例,ThreadPool类型拥有一个QueueUserWorkItem静态方法,该静态方法接受一个委托。方法被调用后,委托会进入内部队列中,如果池中没有任何线程,将创建一个新的工作线程并将队列中第一个委托放入到该工作线程中。

注意:保持线程中的操作都是短暂的。

  停止:停止线程池中放置新操作时,线程池最终会删除一定时间后过期的不再使用的线程。这将释放所有那些不再需要的系统资源。

线程池中的的工作线程都是后台线程。这意味着当所有的前台线程(主线程)完成后,所有的后台线程将停止工作。

在线程池中使用委托

这里我们将说到异步编程模型(Asynchronous Programming Model,简称APM)的方式。

privatedelegatestring RunOnThreadPool(outint threadId);         privatestaticvoid Callback(IAsyncResult ar)         {             Console.WriteLine("Starting a callback...");             Console.WriteLine("State passed to a callback:{0}", ar.AsyncState);             Console.WriteLine("Is thread pool thread {0}", Thread.CurrentThread.IsThreadPoolThread);             Console.WriteLine("Thread pool worker thread id:{0}", Thread.CurrentThread.ManagedThreadId);         }         privatestaticstring Test(outint threadId)         {             Console.WriteLine("Starting....");             Console.WriteLine("Is thraed pool thread :{0}", Thread.CurrentThread.IsThreadPoolThread);             Thread.Sleep(TimeSpan.FromSeconds(2));             threadId = Thread.CurrentThread.ManagedThreadId;             returnstring.Format("Thread pool worker thread id was:{0}", threadId);         }         staticvoid Main(string[] args)         {             int threadId = 0;             RunOnThreadPool poolDelegate = Test;             var t = new Thread(() => Test(out threadId));             t.Start();             t.Join();             Console.WriteLine("Thread id:{0}", threadId);             //使用APM方式,进行异步调用,            IAsyncResult r = poolDelegate.BeginInvoke(out threadId, Callback, "a delege asynchronous call");             r.AsyncWaitHandle.WaitOne();             string result = poolDelegate.EndInvoke(out threadId, r);             Console.WriteLine("Thread pool worker thread id:{0}", threadId);             Console.WriteLine(result);             Thread.Sleep(TimeSpan.FromSeconds(2));         }

 

运行结果:

.NET CORE不支持BeginInvoke

image.png

首先开始我们使用以前的方式创建一个线程,然后启动它并等待完成。而TheadPool为开始线程池任务输出的信息,Callback为APM模式运行任务结束后,执行的回调方法。

使用BeginOperationName/EndOperationName方法和.NET中的IAsyncResult对象等方式称为异步编程模型(或APM模式),这样的方法称为异步方法,但在现代编程中,更推荐使用任务并行库(Task Parallel Libary,TPL)来组织异步API

 

向线程池中放入异步操作

 

privatestaticvoid AsyncOperation(object state)         {             Console.WriteLine("Operation state:{0}",state??"(null)");             Console.WriteLine("Worker thread id :{0}", Thread.CurrentThread.ManagedThreadId);             Thread.Sleep(TimeSpan.FromSeconds(2));         }         staticvoid Main(string[] args)         {             constint x = 1;             constint y = 2;             conststring lambdaState = "lambda state 2";             ThreadPool.QueueUserWorkItem(AsyncOperation);             Thread.Sleep(TimeSpan.FromSeconds(1));             ThreadPool.QueueUserWorkItem(AsyncOperation, "async state");             Thread.Sleep(TimeSpan.FromSeconds(1));             ThreadPool.QueueUserWorkItem(state => {                 Console.WriteLine("Operation state:{0}", state);                 Console.WriteLine("Worker thread id :{0}", Thread.CurrentThread.ManagedThreadId);                 Thread.Sleep(TimeSpan.FromSeconds(2));             }, "lambda state");             ThreadPool.QueueUserWorkItem(_ => {                 Console.WriteLine("Operation state:{0},{1}", x+y,lambdaState);                 Console.WriteLine("Worker thread id :{0}", Thread.CurrentThread.ManagedThreadId);                 Thread.Sleep(TimeSpan.FromSeconds(2));             }, "lambda state");         }

定义一个方法,首先接受单个object类型的参数,通过QueueUserWorkItem方法将该方法放到线程池.接着再次放入该方法,但是这次给方法调用传入一个状态对象,该对象将作为状态参数给AsynchronousOperation方法,

闭包机制,无需传递lambda表达式的状态,闭包更灵活,允许我们向异步操作传递一个以上的对象而且这些对象具有静态类型。

线程池与并行度

staticvoid UseThreads(int numberOfOperations)         {             using (var countdown=new CountdownEvent(numberOfOperations))             {                 Console.WriteLine("Scheduling work by creating threads");                 for (int i = 0; i < numberOfOperations; i++)                 {                     var thread = new Thread(()=> {                         Console.Write("{0},", Thread.CurrentThread.ManagedThreadId);                         Thread.Sleep(TimeSpan.FromSeconds(0.1));                         countdown.Signal();//向 CountdownEvent 注册信号,同时减小 CurrentCount 的值。                    });                     thread.Start();                 }                 countdown.Wait();                 Console.WriteLine();             }         }         staticvoid UseThreadPool(int numberOfOperations)         {             using (var countdown=new CountdownEvent(numberOfOperations))             {                 Console.WriteLine("Stating work on a threadpool");                 for (int i = 0; i < numberOfOperations; i++)                 {                     ThreadPool.QueueUserWorkItem(_=> {                         Console.Write("{0},", Thread.CurrentThread.ManagedThreadId);                         Thread.Sleep(TimeSpan.FromSeconds(0.1));                         countdown.Signal();//向 CountdownEvent 注册信号,同时减小 CurrentCount 的值。                    });                 }                 countdown.Wait();                 Console.WriteLine();             }                    }         staticvoid Main(string[] args)         {             constint numberOfOperations = 500;             var sw = new Stopwatch();             sw.Start();             UseThreads(numberOfOperations);             sw.Stop();             Console.WriteLine("Execution time using thraeds:{0}",sw.ElapsedMilliseconds);             sw.Reset();             sw.Start();             UseThreadPool(numberOfOperations);             sw.Stop();             Console.WriteLine("Execution time using thraeds:{0}", sw.ElapsedMilliseconds);         }

当程序启动的时候,创建很多不同的线程,每个线程运行一个操作,该操作打印出线程id并阻塞线程100毫秒,我们创建500个线程,虽然使用了205毫秒,但是线程消耗了大量的操作系统资源。

然后我们执行同样的任务,只不过不为每个操作创建一个线程,将它们放入到线程池当中。线程池在快结束的时候创建更多的线程,但是仍然花费了更多的时间,在我的机器上是接近9秒,我们为操作系统节省了内存和线程数,但是执行时间长了

image.png

image.png

image.png

实现一个取消

staticvoid AsyncOperation1(CancellationToken token)         {             Console.WriteLine("Starting the first task");             for (int i = 0; i < 5; i++)             {                 if (token.IsCancellationRequested)                 {                     Console.WriteLine("The first task has been canceled");                     return;                 }                 Thread.Sleep(TimeSpan.FromSeconds(1));             }             Console.WriteLine("The first task has completed succesfuly");         }         staticvoid AsyncOperation2(CancellationToken token)         {             try            {                 Console.WriteLine("Starting the first task");                 for (int i = 0; i < 5; i++)                 {                     token.ThrowIfCancellationRequested();                     Thread.Sleep(TimeSpan.FromSeconds(1));                 }                 Console.WriteLine("The second task has completed succesfuly");             }             catch (OperationCanceledException)             {                 Console.WriteLine("The second task has been canceld");             }         }         privatestaticvoid AsyncOperation3(CancellationToken token)         {             bool cancellationFlag = false;             token.Register(() => cancellationFlag = true);             Console.WriteLine("Starting the third task");             for (int i = 0; i < 5; i++)             {                 if (cancellationFlag)                 {                     Console.WriteLine("The third task has been canceld");                     return;                 }                 Thread.Sleep(TimeSpan.FromSeconds(1));             }             Console.WriteLine("The third task has been succesfuly");         }         staticvoid Main(string[] args)         {             using (var cts=new CancellationTokenSource())             {                 CancellationToken token = cts.Token;                 ThreadPool.QueueUserWorkItem(_=>AsyncOperation1(token));                 Thread.Sleep(TimeSpan.FromSeconds(2));                 cts.Cancel();             }             using (var cts = new CancellationTokenSource())             {                 CancellationToken token = cts.Token;                 ThreadPool.QueueUserWorkItem(_ => AsyncOperation2(token));                 Thread.Sleep(TimeSpan.FromSeconds(2));                 cts.Cancel();             }             using (var cts = new CancellationTokenSource())             {                 CancellationToken token = cts.Token;                 ThreadPool.QueueUserWorkItem(_ => AsyncOperation3(token));                 Thread.Sleep(TimeSpan.FromSeconds(2));                 cts.Cancel();             }             Thread.Sleep(TimeSpan.FromSeconds(2));         }

image.png

这里我们用了CancellationTokenSource和CancellationToken两个新类,它们在.NET4.0被引入,这里使用了三种方式来实现取消标记功能。

第一种使用轮询来检查CancellationToken.IsCancellationRequested属性。如果该属性为true,则说明操作需要被取消,

第二种是抛出一个OperationCancelledException异常。这允许在操作之外控制取消过程,需要取消操作时,通过操作之外的代码来处理。

最后一种是注册一个回调函数,当操作被取消时,在线程池调用该回调函数,这允许链式传递一个取消逻辑到另一个异步操作中

 

 

在线程池中使用等待事件处理器及超时

static void RunOperations(TimeSpan workOperationTimeOut)         {             using (var evt = new ManualResetEvent(false))             using (var cts=new CancellationTokenSource())             {                 Console.WriteLine("Registering timeout operation....");                 var worker = ThreadPool.RegisterWaitForSingleObject(evt, (state, isTimedOut) => WorkOperationWait(cts, isTimedOut), null, workOperationTimeOut, true);                 Console.WriteLine("Statrting long running operation...");                 ThreadPool.QueueUserWorkItem(_ => WorkOperation(cts.Token, evt));                 Thread.Sleep(workOperationTimeOut.Add(TimeSpan.FromSeconds(2)));                 worker.Unregister(evt);             }         }         private static void WorkOperationWait(CancellationTokenSource cts, bool isTimedOut)         {             if(isTimedOut)             {                 cts.Cancel();                 Console.WriteLine("Worker operation timed out and was canceled");             }             else            {                 Console.WriteLine("Worker operation successed");             }         }         private static void WorkOperation(CancellationToken token,ManualResetEvent evt)         {             for (int i = 0; i < 6; i++)             {                 if(token.IsCancellationRequested)                 {                     return;                 }                 Thread.Sleep(TimeSpan.FromSeconds(1)) ;             }             evt.Set();         }         static void Main(string[] args)         {             RunOperations(TimeSpan.FromSeconds(5));             RunOperations(TimeSpan.FromSeconds(7));         }


image.png

线程池还有一个有用的方法:ThreadPool.RegisterWaitForSingleObject.该方法允许我们将回调函数放入线程池中的队列中。当提供的等待事件处理器收到信号或发生超时时,该回调函数将被调用,这允许我们为线程池中的操作实现超时功能。

首先按顺序向线程池中放入一个耗时长的操作,它运行6秒,一旦完成,会设置一个ManualResetEvent信号类。其他情况。比如取消,则操作会被丢弃、

注册第二个异步操作,当从ManualResetEvent对象接受一个信号后,该异步操作会被调用。如果一个操作顺利完成,会设置该信号量。另一种情况是第一个操作还未完成就已经超时,如果发生该情况,会使用CancellationToken来取消第一个操作。

最后,为操作提供5秒的超时时间是不够的,这是因为操作会花费6秒来完成,只能取消该操作。所以如果提供7秒的超时时间是可行的。该操作会顺利完成。

 

相关文章
|
2月前
|
缓存 监控 安全
Java的线程池和线程安全
Java的线程池和线程安全
|
2月前
|
设计模式 监控 Java
Java多线程基础-11:工厂模式及代码案例之线程池(一)
本文介绍了Java并发框架中的线程池工具,特别是`java.util.concurrent`包中的`Executors`和`ThreadPoolExecutor`类。线程池通过预先创建并管理一组线程,可以提高多线程任务的效率和响应速度,减少线程创建和销毁的开销。
47 2
|
3天前
|
Java
线程池和线程详细教程
线程池和线程详细教程
|
5天前
|
监控 Java 调度
Java面试题:描述Java线程池的概念、用途及常见的线程池类型。介绍一下Java中的线程池有哪些优缺点
Java面试题:描述Java线程池的概念、用途及常见的线程池类型。介绍一下Java中的线程池有哪些优缺点
18 1
|
5天前
|
设计模式 存储 安全
Java面试题:设计一个线程安全的单例类并解释其内存占用情况?使用Java多线程工具类实现一个高效的线程池,并解释其背后的原理。结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
Java面试题:设计一个线程安全的单例类并解释其内存占用情况?使用Java多线程工具类实现一个高效的线程池,并解释其背后的原理。结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
14 1
|
5天前
|
设计模式 安全 Java
Java面试题:请解释Java中的线程池以及为什么要使用线程池?请解释Java中的内存模型以及如何避免内存泄漏?请解释Java中的并发工具包以及如何实现一个简单的线程安全队列?
Java面试题:请解释Java中的线程池以及为什么要使用线程池?请解释Java中的内存模型以及如何避免内存泄漏?请解释Java中的并发工具包以及如何实现一个简单的线程安全队列?
10 1
|
5天前
|
设计模式 缓存 安全
Java面试题:工厂模式与内存泄漏防范?线程安全与volatile关键字的适用性?并发集合与线程池管理问题
Java面试题:工厂模式与内存泄漏防范?线程安全与volatile关键字的适用性?并发集合与线程池管理问题
11 1
|
5天前
|
设计模式 并行计算 安全
Java面试题:如何使用设计模式优化多线程环境下的资源管理?Java内存模型与并发工具类的协同工作,描述ForkJoinPool的工作机制,并解释其在并行计算中的优势。如何根据任务特性调整线程池参数
Java面试题:如何使用设计模式优化多线程环境下的资源管理?Java内存模型与并发工具类的协同工作,描述ForkJoinPool的工作机制,并解释其在并行计算中的优势。如何根据任务特性调整线程池参数
10 0
|
5天前
|
安全 算法 Java
Java面试题:如何诊断和解决Java应用程序中的内存泄漏问题?如何实现一个线程安全的计数器?如何合理配置线程池以应对不同的业务场景?
Java面试题:如何诊断和解决Java应用程序中的内存泄漏问题?如何实现一个线程安全的计数器?如何合理配置线程池以应对不同的业务场景?
7 0
|
5天前
|
算法 安全 Java
Java面试题:解释JVM中的堆内存分代收集策略,并讨论年轻代和老年代的特点,描述Java中的线程池,并解释线程池的优点,解释Java中的`volatile`关键字的作用和使用场景
Java面试题:解释JVM中的堆内存分代收集策略,并讨论年轻代和老年代的特点,描述Java中的线程池,并解释线程池的优点,解释Java中的`volatile`关键字的作用和使用场景
8 0