线程-线程池1

本文涉及的产品
应用实时监控服务-应用监控,每月50GB免费额度
应用实时监控服务-用户体验监控,每月100OCU免费额度
简介: 今天我们说说线程池,线程池为什么来呢?

带着问题去思考!大家好!

今天我们说说线程池,线程池为什么来呢?

  之前我们讲过线程,它的创建和协作的几种方式。花费极少的时间来完成创建很多异步操作。创建线程是昂贵的操作,所以为每个短暂的异步操作创建线程会产生显著的开销

  那么为了解决这一问题,有一个常用的方式叫做池,线程池可以成功的适用于任何需要大量短暂的开销大的资源情形。事先分配一定的资源,将这些资源放到资源池中。每次需要新的资源,只需要从池中获取一个,不需要创建新的。

  通过System.Threading.ThreadPool类型可以使用线程池。线程池是受.NET通用语言运行时(CLR)管理的。每个CLR都有一个线程池的实例,ThreadPool类型拥有一个QueueUserWorkItem静态方法,该静态方法接受一个委托。方法被调用后,委托会进入内部队列中,如果池中没有任何线程,将创建一个新的工作线程并将队列中第一个委托放入到该工作线程中。

注意:保持线程中的操作都是短暂的。

  停止:停止线程池中放置新操作时,线程池最终会删除一定时间后过期的不再使用的线程。这将释放所有那些不再需要的系统资源。

线程池中的的工作线程都是后台线程。这意味着当所有的前台线程(主线程)完成后,所有的后台线程将停止工作。

在线程池中使用委托

这里我们将说到异步编程模型(Asynchronous Programming Model,简称APM)的方式。

privatedelegatestring RunOnThreadPool(outint threadId);         privatestaticvoid Callback(IAsyncResult ar)         {             Console.WriteLine("Starting a callback...");             Console.WriteLine("State passed to a callback:{0}", ar.AsyncState);             Console.WriteLine("Is thread pool thread {0}", Thread.CurrentThread.IsThreadPoolThread);             Console.WriteLine("Thread pool worker thread id:{0}", Thread.CurrentThread.ManagedThreadId);         }         privatestaticstring Test(outint threadId)         {             Console.WriteLine("Starting....");             Console.WriteLine("Is thraed pool thread :{0}", Thread.CurrentThread.IsThreadPoolThread);             Thread.Sleep(TimeSpan.FromSeconds(2));             threadId = Thread.CurrentThread.ManagedThreadId;             returnstring.Format("Thread pool worker thread id was:{0}", threadId);         }         staticvoid Main(string[] args)         {             int threadId = 0;             RunOnThreadPool poolDelegate = Test;             var t = new Thread(() => Test(out threadId));             t.Start();             t.Join();             Console.WriteLine("Thread id:{0}", threadId);             //使用APM方式,进行异步调用,            IAsyncResult r = poolDelegate.BeginInvoke(out threadId, Callback, "a delege asynchronous call");             r.AsyncWaitHandle.WaitOne();             string result = poolDelegate.EndInvoke(out threadId, r);             Console.WriteLine("Thread pool worker thread id:{0}", threadId);             Console.WriteLine(result);             Thread.Sleep(TimeSpan.FromSeconds(2));         }

 

运行结果:

.NET CORE不支持BeginInvoke

image.png

首先开始我们使用以前的方式创建一个线程,然后启动它并等待完成。而TheadPool为开始线程池任务输出的信息,Callback为APM模式运行任务结束后,执行的回调方法。

使用BeginOperationName/EndOperationName方法和.NET中的IAsyncResult对象等方式称为异步编程模型(或APM模式),这样的方法称为异步方法,但在现代编程中,更推荐使用任务并行库(Task Parallel Libary,TPL)来组织异步API

 

向线程池中放入异步操作

 

privatestaticvoid AsyncOperation(object state)         {             Console.WriteLine("Operation state:{0}",state??"(null)");             Console.WriteLine("Worker thread id :{0}", Thread.CurrentThread.ManagedThreadId);             Thread.Sleep(TimeSpan.FromSeconds(2));         }         staticvoid Main(string[] args)         {             constint x = 1;             constint y = 2;             conststring lambdaState = "lambda state 2";             ThreadPool.QueueUserWorkItem(AsyncOperation);             Thread.Sleep(TimeSpan.FromSeconds(1));             ThreadPool.QueueUserWorkItem(AsyncOperation, "async state");             Thread.Sleep(TimeSpan.FromSeconds(1));             ThreadPool.QueueUserWorkItem(state => {                 Console.WriteLine("Operation state:{0}", state);                 Console.WriteLine("Worker thread id :{0}", Thread.CurrentThread.ManagedThreadId);                 Thread.Sleep(TimeSpan.FromSeconds(2));             }, "lambda state");             ThreadPool.QueueUserWorkItem(_ => {                 Console.WriteLine("Operation state:{0},{1}", x+y,lambdaState);                 Console.WriteLine("Worker thread id :{0}", Thread.CurrentThread.ManagedThreadId);                 Thread.Sleep(TimeSpan.FromSeconds(2));             }, "lambda state");         }

定义一个方法,首先接受单个object类型的参数,通过QueueUserWorkItem方法将该方法放到线程池.接着再次放入该方法,但是这次给方法调用传入一个状态对象,该对象将作为状态参数给AsynchronousOperation方法,

闭包机制,无需传递lambda表达式的状态,闭包更灵活,允许我们向异步操作传递一个以上的对象而且这些对象具有静态类型。

线程池与并行度

staticvoid UseThreads(int numberOfOperations)         {             using (var countdown=new CountdownEvent(numberOfOperations))             {                 Console.WriteLine("Scheduling work by creating threads");                 for (int i = 0; i < numberOfOperations; i++)                 {                     var thread = new Thread(()=> {                         Console.Write("{0},", Thread.CurrentThread.ManagedThreadId);                         Thread.Sleep(TimeSpan.FromSeconds(0.1));                         countdown.Signal();//向 CountdownEvent 注册信号,同时减小 CurrentCount 的值。                    });                     thread.Start();                 }                 countdown.Wait();                 Console.WriteLine();             }         }         staticvoid UseThreadPool(int numberOfOperations)         {             using (var countdown=new CountdownEvent(numberOfOperations))             {                 Console.WriteLine("Stating work on a threadpool");                 for (int i = 0; i < numberOfOperations; i++)                 {                     ThreadPool.QueueUserWorkItem(_=> {                         Console.Write("{0},", Thread.CurrentThread.ManagedThreadId);                         Thread.Sleep(TimeSpan.FromSeconds(0.1));                         countdown.Signal();//向 CountdownEvent 注册信号,同时减小 CurrentCount 的值。                    });                 }                 countdown.Wait();                 Console.WriteLine();             }                    }         staticvoid Main(string[] args)         {             constint numberOfOperations = 500;             var sw = new Stopwatch();             sw.Start();             UseThreads(numberOfOperations);             sw.Stop();             Console.WriteLine("Execution time using thraeds:{0}",sw.ElapsedMilliseconds);             sw.Reset();             sw.Start();             UseThreadPool(numberOfOperations);             sw.Stop();             Console.WriteLine("Execution time using thraeds:{0}", sw.ElapsedMilliseconds);         }

当程序启动的时候,创建很多不同的线程,每个线程运行一个操作,该操作打印出线程id并阻塞线程100毫秒,我们创建500个线程,虽然使用了205毫秒,但是线程消耗了大量的操作系统资源。

然后我们执行同样的任务,只不过不为每个操作创建一个线程,将它们放入到线程池当中。线程池在快结束的时候创建更多的线程,但是仍然花费了更多的时间,在我的机器上是接近9秒,我们为操作系统节省了内存和线程数,但是执行时间长了

image.png

image.png

image.png

实现一个取消

staticvoid AsyncOperation1(CancellationToken token)         {             Console.WriteLine("Starting the first task");             for (int i = 0; i < 5; i++)             {                 if (token.IsCancellationRequested)                 {                     Console.WriteLine("The first task has been canceled");                     return;                 }                 Thread.Sleep(TimeSpan.FromSeconds(1));             }             Console.WriteLine("The first task has completed succesfuly");         }         staticvoid AsyncOperation2(CancellationToken token)         {             try            {                 Console.WriteLine("Starting the first task");                 for (int i = 0; i < 5; i++)                 {                     token.ThrowIfCancellationRequested();                     Thread.Sleep(TimeSpan.FromSeconds(1));                 }                 Console.WriteLine("The second task has completed succesfuly");             }             catch (OperationCanceledException)             {                 Console.WriteLine("The second task has been canceld");             }         }         privatestaticvoid AsyncOperation3(CancellationToken token)         {             bool cancellationFlag = false;             token.Register(() => cancellationFlag = true);             Console.WriteLine("Starting the third task");             for (int i = 0; i < 5; i++)             {                 if (cancellationFlag)                 {                     Console.WriteLine("The third task has been canceld");                     return;                 }                 Thread.Sleep(TimeSpan.FromSeconds(1));             }             Console.WriteLine("The third task has been succesfuly");         }         staticvoid Main(string[] args)         {             using (var cts=new CancellationTokenSource())             {                 CancellationToken token = cts.Token;                 ThreadPool.QueueUserWorkItem(_=>AsyncOperation1(token));                 Thread.Sleep(TimeSpan.FromSeconds(2));                 cts.Cancel();             }             using (var cts = new CancellationTokenSource())             {                 CancellationToken token = cts.Token;                 ThreadPool.QueueUserWorkItem(_ => AsyncOperation2(token));                 Thread.Sleep(TimeSpan.FromSeconds(2));                 cts.Cancel();             }             using (var cts = new CancellationTokenSource())             {                 CancellationToken token = cts.Token;                 ThreadPool.QueueUserWorkItem(_ => AsyncOperation3(token));                 Thread.Sleep(TimeSpan.FromSeconds(2));                 cts.Cancel();             }             Thread.Sleep(TimeSpan.FromSeconds(2));         }

image.png

这里我们用了CancellationTokenSource和CancellationToken两个新类,它们在.NET4.0被引入,这里使用了三种方式来实现取消标记功能。

第一种使用轮询来检查CancellationToken.IsCancellationRequested属性。如果该属性为true,则说明操作需要被取消,

第二种是抛出一个OperationCancelledException异常。这允许在操作之外控制取消过程,需要取消操作时,通过操作之外的代码来处理。

最后一种是注册一个回调函数,当操作被取消时,在线程池调用该回调函数,这允许链式传递一个取消逻辑到另一个异步操作中

 

 

在线程池中使用等待事件处理器及超时

static void RunOperations(TimeSpan workOperationTimeOut)         {             using (var evt = new ManualResetEvent(false))             using (var cts=new CancellationTokenSource())             {                 Console.WriteLine("Registering timeout operation....");                 var worker = ThreadPool.RegisterWaitForSingleObject(evt, (state, isTimedOut) => WorkOperationWait(cts, isTimedOut), null, workOperationTimeOut, true);                 Console.WriteLine("Statrting long running operation...");                 ThreadPool.QueueUserWorkItem(_ => WorkOperation(cts.Token, evt));                 Thread.Sleep(workOperationTimeOut.Add(TimeSpan.FromSeconds(2)));                 worker.Unregister(evt);             }         }         private static void WorkOperationWait(CancellationTokenSource cts, bool isTimedOut)         {             if(isTimedOut)             {                 cts.Cancel();                 Console.WriteLine("Worker operation timed out and was canceled");             }             else            {                 Console.WriteLine("Worker operation successed");             }         }         private static void WorkOperation(CancellationToken token,ManualResetEvent evt)         {             for (int i = 0; i < 6; i++)             {                 if(token.IsCancellationRequested)                 {                     return;                 }                 Thread.Sleep(TimeSpan.FromSeconds(1)) ;             }             evt.Set();         }         static void Main(string[] args)         {             RunOperations(TimeSpan.FromSeconds(5));             RunOperations(TimeSpan.FromSeconds(7));         }


image.png

线程池还有一个有用的方法:ThreadPool.RegisterWaitForSingleObject.该方法允许我们将回调函数放入线程池中的队列中。当提供的等待事件处理器收到信号或发生超时时,该回调函数将被调用,这允许我们为线程池中的操作实现超时功能。

首先按顺序向线程池中放入一个耗时长的操作,它运行6秒,一旦完成,会设置一个ManualResetEvent信号类。其他情况。比如取消,则操作会被丢弃、

注册第二个异步操作,当从ManualResetEvent对象接受一个信号后,该异步操作会被调用。如果一个操作顺利完成,会设置该信号量。另一种情况是第一个操作还未完成就已经超时,如果发生该情况,会使用CancellationToken来取消第一个操作。

最后,为操作提供5秒的超时时间是不够的,这是因为操作会花费6秒来完成,只能取消该操作。所以如果提供7秒的超时时间是可行的。该操作会顺利完成。

 

相关实践学习
通过云拨测对指定服务器进行Ping/DNS监测
本实验将通过云拨测对指定服务器进行Ping/DNS监测,评估网站服务质量和用户体验。
相关文章
|
3月前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
121 1
|
3月前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
28天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
104 38
|
26天前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
52 2
|
28天前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
65 4
|
28天前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
110 2
|
2月前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
143 29
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
1月前
|
Dubbo Java 应用服务中间件
剖析Tomcat线程池与JDK线程池的区别和联系!
剖析Tomcat线程池与JDK线程池的区别和联系!
114 0
剖析Tomcat线程池与JDK线程池的区别和联系!
|
2月前
|
Java
直接拿来用:进程&进程池&线程&线程池
直接拿来用:进程&进程池&线程&线程池
|
1月前
|
设计模式 Java 物联网
【多线程-从零开始-玖】内核态,用户态,线程池的参数、使用方法详解
【多线程-从零开始-玖】内核态,用户态,线程池的参数、使用方法详解
58 0
下一篇
无影云桌面