python实现线程池

简介:

什么是线程池?
     诸如web服务器、数据库服务器、文件服务器和邮件服务器等许多服务器应用都面向处理来自某些远程来源的大量短小的任务。构建服务器应用程序的一个过于简单的模型是:每当一个请求到达就创建一个新的服务对象,然后在新的服务对象中为请求服务。但当有大量请求并发访问时,服务器不断的创建和销毁对象的开销很大。
所以提高服务器效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁,这样就引入了“池”的概念,
“池”的概念使得人们可以定制一定量的资源,然后对这些资源进行反复的使用用,而不是频繁的创建和销毁这些资源。

线程池是预先创建线程的一种技术
     这些线程都是处于睡眠状态,即均为启动,不消耗CPU,而只是占用较小的内存空间。
当请求到来之后,缓冲池给这次请求分配一个空闲线程,把请求传入此线程中运行,进行处理。
当预先创建的线程都处于运行状态,即预制线程不够,线程池可以自由创建一定数量的新线程,用于处理更多的请求。
当系统比较闲的时候,也可以通过移除一部分一直处于停用状态的线程。

线程池的注意事项
     虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。在使用线程池时需注意线程池大小与性能的关系,注意并发风险、死锁、资源不足和线程泄漏等问题。
1、线程池大小。多线程应用并非线程越多越好,需要根据系统运行的软硬件环境以及应用本身的特点决定线程池的大小。

一般来说,如果代码结构合理的话,线程数目与CPU 数量相适合即可。
如果线程运行时可能出现阻塞现象,可相应增加池的大小;如有必要可采用自适应算法来动态调整线程池的大小,以提高CPU 的有效利用率和系统的整体性能。
2、并发错误。多线程应用要特别注意并发错误,要从逻辑上保证程序的正确性,注意避免死锁现象的发生。
3、线程泄漏。这是线程池应用中一个严重的问题,当任务执行完毕而线程没能返回池中就会发生线程泄漏现象。


线程池要点:

1、通过判断等待的任务数量和线程池中的最大值,取最小值来判断开启多少线程来工作

比如:

任务数是3,进程池最大20  ,那么咱们只需要开启3个线程就行了。

任务数是500,进程池是20,那么咱们只开20个线程就可以了。

 取最小值

2、实现线程池正在运行,有一个查看的功能,查看一下现在线程里面活跃的线程是多少等待的是多少?

线程总共是多少,等待中多少,正在运行中多少

作用:

方便查看当前线程池状态

能获取到这个之后就可以当线程一直处于空闲状态

查看状态用:上下文管理来做,非常nice的一点


3、关闭线程


简单线程池实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#!/usr/bin/env python
#-*- coding:utf-8 -*-
__author__  =  'luo_t'
import  Queue
import  threading
import  time
'''
这个简单的例子的想法是通过:
1、利用Queue特性,在Queue里创建多个线程对象
2、那我执行代码的时候,去queue里去拿线程!
如果线程池里有可用的,直接拿。
如果线程池里没有可用,那就等。
3、线程执行完毕,归还给线程池
'''
class  ThreadPool( object ):  #创建线程池类
     def  __init__( self ,max_thread = 20 ): #构造方法,设置默认最大的线程数为20(如果不传值的时候)
         self .queue  =  Queue.Queue(max_thread)  #创建一个队列
         for  in  xrange (max_thread): #循环把线程对象加入到队列中
             self .queue.put(threading.Thread)
             #把线程的类名放进去,执行完这个Queue
     def  get_thread( self ): #定义方法从队列里获取线程
         return  self .queue.get()
     def  add_thread( self ): #定义方法在队列里添加线程
         self .queue.put(threading.Thread)
pool  =  ThreadPool( 10 )
def  func(arg,p):
     print  arg
     time.sleep( 2 )
     p.add_thread()  #当前线程执行完了,我在队列里加一个线程!
for  in  xrange ( 300 ):
     thread  =  pool.get_thread() #线程池10个线程,每一次循环拿走一个!默认queue.get(),如果队列里没有数据就会等待。
     =  thread(target = func,args = (i,pool))
     t.start()
'''
self.queue.put(threading.Thread) 添加的是类不是对象,在内存中如果相同的类只占一份内存空间并且如果这里存储的是对象的话每次都的新增都得在内存中开辟一段内存空间
还有如果是对象的话:下面的这个语句就不能这么调用了!
for i in xrange(300):
     thread = pool.get_thread()
     t = thread(target=func,args=(i,pool))
     t.start()
     通过查看源码可以知道,在thread的构造函数中:self.__args = args  self.__target = target  都是私有字段那么调用就应该这么写
for i in xrange(300):
     ret = pool.get_thread()
     ret._Thread__target = func
     ret._Thread__args = (i,pool)
     ret.start()
'''


复杂线程池需要知道的知识点

知识点①

1
2
3
4
5
6
7
8
9
10
11
12
13
#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
import  Queue
obj  =  object ()  #object也是一个类,我创建了一个对象obj
=  Queue.Queue()
for  in  range ( 10 ):
     print  id (obj) #看萝卜号
     q.put(obj)
'''
这个队列里有10个萝卜(萝卜=obj),但是这10个萝卜只是个投影。
我们在for循环的时候put到队列里,obj有变化吗?是否有新开辟空间?显然没有
'''


知识点②

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import  contextlib
import  threading
import  time
import  random
doing  =  []
def  number(l2):
     while  True :
         print  len (l2)
         time.sleep( 1 )
=  threading.Thread(target = number,args = (doing,))   #开启一个线程,每一秒打印列表,当前工作中的线程数量
t.start()
#添加管理上下文的装饰器
@contextlib .contextmanager
def  show(li,iterm):
     li.append(iterm)
     yield
     '''
     yield冻结这次操作,就出去了,with就会捕捉到,然后就会执行with下的代码块,当with下的代码块
     执行完毕后就会回来继续执行yield下面没有执行的代码块!
     然后就执行完毕了
     如果with代码块中的非常耗时,那么doing的长度是不是一直是1,说明他没执行完呢?我们就可以获取到正在执行的数量,当他with执行完毕后
     执行yield的后续的代码块。把他移除后就为0了!
     '''
     li.remove(iterm)
def  task(arg):
     with show(doing, 1 ): #通过with管理上下文进行切换
         print  len (doing)
         time.sleep( 10 #等待10秒这里可以使用random模块来操作~
for  in  range ( 20 ):  #开启20个线程执行
     temp  =  threading.Thread(target = task,args = (i,))
     temp.start()
'''
作用:我们要记录正在工作的的列表
比如正在工作的线程我把加入到doing这个列表中,如果工作完成的把它从doing列表中移除。
通过这个机制,就可以获取现在正在执行的线程都有多少
'''



线程池的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from  Queue  import  Queue
import  contextlib
import  threading
WorkerStop  =  object ()
class  ThreadPool:
     workers  =  0
     threadFactory  =  threading.Thread
     currentThread  =  staticmethod (threading.currentThread)
     def  __init__( self , maxthreads = 20 , name = None ):
         self .q  =  Queue( 0 #这里创建一个队列,如果是0的话表示不限制,现在这个队列里放的是任务
         self . max  =  maxthreads  #定义最大线程数
         self .name  =  name
         self .waiters  =  [] #这两个是用来计数的
         self .working  =  [] #这两个是用来技术的
     def  start( self ):
         #self.max 最大线程数
         #q.qisze(),任务个数
         needSize  =  self .q.qsize()
         while  self .workers <  min ( self . max , needSize): #min(10,20)取最小值
             #wokers默认为0  【workers = 0】
             '''
             举例来说:
             while self.workers < min(self.max, needSize):
             这个循环,比如最大线程为20,咱们的任务个数为10,取最小值为10
             每次循环开1个线程,并且workers自增1,那么循环10次后,开了10个线程了workers = 10 ,那么workers就不小于10了
             就不开线程了,我线程开到最大了,你们这10个线程去消耗这10个任务去吧
             并且这里不阻塞,创建完线程就去执行了!
             每一个线程都去执行_worker方法去了
             '''
             self .startAWorker()
     def  startAWorker( self ):
         self .workers  + =  1
         newThread  =  self .threadFactory(target = self ._worker, name = 'shuaige' #创建一个线程并去执行_worker方法
         newThread.start()
     def  callInThread( self , func,  * args,  * * kw):
         self .callInThreadWithCallback( None , func,  * args,  * * kw)
     def  callInThreadWithCallback( self , onResult, func,  * args,  * * kw):
         =  (func, args, kw, onResult)
         self .q.put(o)
     @contextlib.contextmanager
     def  _workerState( self , stateList, workerThread):
         stateList.append(workerThread)
         try :
             yield
         finally :
             stateList.remove(workerThread)
     def  _worker( self ):
         ct  =  self .currentThread()
         =  self .q.get()  #去队列里取任务,如果有任务就O就会有值,每个任务是个元组,有方法,有参数
         while  is  not  WorkerStop:
             with  self ._workerState( self .working, ct):   #上下文切换
                 function, args, kwargs, onResult  =  o
                 del  o
                 try :
                     result  =  function( * args,  * * kwargs)
                     success  =  True
                 except :
                     success  =  False
                     if  onResult  is  None :
                         pass
                     else :
                         pass
                 del  function, args, kwargs
                 if  onResult  is  not  None :
                     try :
                         onResult(success, result)
                     except :
                         #context.call(ctx, log.err)
                         pass
                 del  onResult, result
             with  self ._workerState( self .waiters, ct):  #当线程工作完闲暇的时候,在去取任务执行
                 =  self .q.get()
     def  stop( self ):  #定义关闭线程方法
         while  self .workers:  #循环workers值
             self .q.put(WorkerStop)  #在队列中增加一个信号~
             self .workers  - =  1  #workers值-1 直到所有线程关闭
def  show(arg):
     import  time
     time.sleep( 1 )
     print  arg
pool  =  ThreadPool( 10 )
#创建500个任务,队列里添加了500个任务
#每个任务都是一个元组(方法名,动态参数,动态参数,默认为NoNe)
for  in  range ( 100 ):
     pool.callInThread(show, i)
pool.start()   #队列添加完成之后,开启线程让线程一个一个去队列里去拿
pool.stop()  #当上面的任务都执行完之后,线程中都在等待着在队列里去数据呢!
'''
我们要关闭所有的线程,执行stop方法,首先workers这个值是当前的线程数量,我们给线程发送一个信号“WorkerStop”
在线程的工作里:        while o is not WorkerStop:   如果线程获取到这个值就不执行了,然后这个线程while循环就停止了,等待
python的垃圾回收机制,回收。
然后在self.workers -= 1 ,那么所有的线程收到这个信号之后就会停止!!!
over~
'''


更多请参考:http://www.cnblogs.com/wupeiqi/articles/4839959.html

      http://www.cnblogs.com/luotianshuai/p/5131001.html


      本文转自027ryan  51CTO博客,原文链接:http://blog.51cto.com/ucode/1766332,如需转载请自行联系原作者




相关文章
聊聊python多线程与多进程
为什么要使用多进程与多线程呢? 因为我们如果按照流程一步步执行任务实在是太慢了,假如一个任务就是10秒,两个任务就是20秒,那100个任务呢?况且cpu这么贵,时间长了就是浪费生命啊!一个任务比喻成一个人,别个做高铁,你做绿皮火车,可想而知!接下来我们先看个例子:
|
22天前
|
Python
|
24天前
|
安全 调度 Python
探索Python中的并发编程:协程与多线程的比较
本文将深入探讨Python中的并发编程技术,重点比较协程与多线程的特点和应用场景。通过对协程和多线程的原理解析,以及在实际项目中的应用案例分析,读者将能够更好地理解两种并发编程模型的异同,并在实践中选择合适的方案来提升Python程序的性能和效率。
|
2天前
|
Python
Python多线程中递归锁如何解决死锁问题的详细阐述
Python多线程中递归锁如何解决死锁问题的详细阐述
|
2天前
|
安全 Python
Python多线程中的死锁与递归锁
Python多线程中的死锁与递归锁
|
9天前
|
分布式计算 安全 Java
Python 多线程
Python 多线程
17 0
|
10天前
|
数据采集 存储 C++
单线程 vs 多进程:Python网络爬虫效率对比
本文探讨了Python网络爬虫中的单线程与多进程应用。单线程爬虫实现简单,但处理速度慢,无法充分利用多核CPU。而多进程爬虫通过并行处理提高效率,更适合现代多核架构。代码示例展示了如何使用代理IP实现单线程和多进程爬虫,显示了多进程在效率上的优势。实际使用时还需考虑代理稳定性和反爬策略。
单线程 vs 多进程:Python网络爬虫效率对比
|
11天前
|
Python
深度解析Python中的多线程编程
深度解析Python中的多线程编程
32 1
|
15天前
|
并行计算 Python
Python并发编程与多线程
Python编程中,多线程和并发编程是优化复杂任务执行的关键。借助标准库中的`threading`模块,可实现多线程,如示例所示,创建线程并执行函数。然而,由于全局解释器锁(GIL),多线程在CPU密集型任务中并不高效。对于I/O密集型任务,多线程仍能提高效率。为充分利用多核,可采用多进程(如`multiprocessing`模块)或异步编程。选择技术时需依据任务类型和性能需求。
|
15天前
|
消息中间件 安全 调度
基于Python的性能优化(线程、协程、进程)
一、多线程 在CPU不密集、IO密集的任务下,多线程可以一定程度的提升运行效率。