python实现线程池

简介:

什么是线程池?
     诸如web服务器、数据库服务器、文件服务器和邮件服务器等许多服务器应用都面向处理来自某些远程来源的大量短小的任务。构建服务器应用程序的一个过于简单的模型是:每当一个请求到达就创建一个新的服务对象,然后在新的服务对象中为请求服务。但当有大量请求并发访问时,服务器不断的创建和销毁对象的开销很大。
所以提高服务器效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁,这样就引入了“池”的概念,
“池”的概念使得人们可以定制一定量的资源,然后对这些资源进行反复的使用用,而不是频繁的创建和销毁这些资源。

线程池是预先创建线程的一种技术
     这些线程都是处于睡眠状态,即均为启动,不消耗CPU,而只是占用较小的内存空间。
当请求到来之后,缓冲池给这次请求分配一个空闲线程,把请求传入此线程中运行,进行处理。
当预先创建的线程都处于运行状态,即预制线程不够,线程池可以自由创建一定数量的新线程,用于处理更多的请求。
当系统比较闲的时候,也可以通过移除一部分一直处于停用状态的线程。

线程池的注意事项
     虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。在使用线程池时需注意线程池大小与性能的关系,注意并发风险、死锁、资源不足和线程泄漏等问题。
1、线程池大小。多线程应用并非线程越多越好,需要根据系统运行的软硬件环境以及应用本身的特点决定线程池的大小。

一般来说,如果代码结构合理的话,线程数目与CPU 数量相适合即可。
如果线程运行时可能出现阻塞现象,可相应增加池的大小;如有必要可采用自适应算法来动态调整线程池的大小,以提高CPU 的有效利用率和系统的整体性能。
2、并发错误。多线程应用要特别注意并发错误,要从逻辑上保证程序的正确性,注意避免死锁现象的发生。
3、线程泄漏。这是线程池应用中一个严重的问题,当任务执行完毕而线程没能返回池中就会发生线程泄漏现象。


线程池要点:

1、通过判断等待的任务数量和线程池中的最大值,取最小值来判断开启多少线程来工作

比如:

任务数是3,进程池最大20  ,那么咱们只需要开启3个线程就行了。

任务数是500,进程池是20,那么咱们只开20个线程就可以了。

 取最小值

2、实现线程池正在运行,有一个查看的功能,查看一下现在线程里面活跃的线程是多少等待的是多少?

线程总共是多少,等待中多少,正在运行中多少

作用:

方便查看当前线程池状态

能获取到这个之后就可以当线程一直处于空闲状态

查看状态用:上下文管理来做,非常nice的一点


3、关闭线程


简单线程池实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#!/usr/bin/env python
#-*- coding:utf-8 -*-
__author__  =  'luo_t'
import  Queue
import  threading
import  time
'''
这个简单的例子的想法是通过:
1、利用Queue特性,在Queue里创建多个线程对象
2、那我执行代码的时候,去queue里去拿线程!
如果线程池里有可用的,直接拿。
如果线程池里没有可用,那就等。
3、线程执行完毕,归还给线程池
'''
class  ThreadPool( object ):  #创建线程池类
     def  __init__( self ,max_thread = 20 ): #构造方法,设置默认最大的线程数为20(如果不传值的时候)
         self .queue  =  Queue.Queue(max_thread)  #创建一个队列
         for  in  xrange (max_thread): #循环把线程对象加入到队列中
             self .queue.put(threading.Thread)
             #把线程的类名放进去,执行完这个Queue
     def  get_thread( self ): #定义方法从队列里获取线程
         return  self .queue.get()
     def  add_thread( self ): #定义方法在队列里添加线程
         self .queue.put(threading.Thread)
pool  =  ThreadPool( 10 )
def  func(arg,p):
     print  arg
     time.sleep( 2 )
     p.add_thread()  #当前线程执行完了,我在队列里加一个线程!
for  in  xrange ( 300 ):
     thread  =  pool.get_thread() #线程池10个线程,每一次循环拿走一个!默认queue.get(),如果队列里没有数据就会等待。
     =  thread(target = func,args = (i,pool))
     t.start()
'''
self.queue.put(threading.Thread) 添加的是类不是对象,在内存中如果相同的类只占一份内存空间并且如果这里存储的是对象的话每次都的新增都得在内存中开辟一段内存空间
还有如果是对象的话:下面的这个语句就不能这么调用了!
for i in xrange(300):
     thread = pool.get_thread()
     t = thread(target=func,args=(i,pool))
     t.start()
     通过查看源码可以知道,在thread的构造函数中:self.__args = args  self.__target = target  都是私有字段那么调用就应该这么写
for i in xrange(300):
     ret = pool.get_thread()
     ret._Thread__target = func
     ret._Thread__args = (i,pool)
     ret.start()
'''


复杂线程池需要知道的知识点

知识点①

1
2
3
4
5
6
7
8
9
10
11
12
13
#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
import  Queue
obj  =  object ()  #object也是一个类,我创建了一个对象obj
=  Queue.Queue()
for  in  range ( 10 ):
     print  id (obj) #看萝卜号
     q.put(obj)
'''
这个队列里有10个萝卜(萝卜=obj),但是这10个萝卜只是个投影。
我们在for循环的时候put到队列里,obj有变化吗?是否有新开辟空间?显然没有
'''


知识点②

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import  contextlib
import  threading
import  time
import  random
doing  =  []
def  number(l2):
     while  True :
         print  len (l2)
         time.sleep( 1 )
=  threading.Thread(target = number,args = (doing,))   #开启一个线程,每一秒打印列表,当前工作中的线程数量
t.start()
#添加管理上下文的装饰器
@contextlib .contextmanager
def  show(li,iterm):
     li.append(iterm)
     yield
     '''
     yield冻结这次操作,就出去了,with就会捕捉到,然后就会执行with下的代码块,当with下的代码块
     执行完毕后就会回来继续执行yield下面没有执行的代码块!
     然后就执行完毕了
     如果with代码块中的非常耗时,那么doing的长度是不是一直是1,说明他没执行完呢?我们就可以获取到正在执行的数量,当他with执行完毕后
     执行yield的后续的代码块。把他移除后就为0了!
     '''
     li.remove(iterm)
def  task(arg):
     with show(doing, 1 ): #通过with管理上下文进行切换
         print  len (doing)
         time.sleep( 10 #等待10秒这里可以使用random模块来操作~
for  in  range ( 20 ):  #开启20个线程执行
     temp  =  threading.Thread(target = task,args = (i,))
     temp.start()
'''
作用:我们要记录正在工作的的列表
比如正在工作的线程我把加入到doing这个列表中,如果工作完成的把它从doing列表中移除。
通过这个机制,就可以获取现在正在执行的线程都有多少
'''



线程池的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from  Queue  import  Queue
import  contextlib
import  threading
WorkerStop  =  object ()
class  ThreadPool:
     workers  =  0
     threadFactory  =  threading.Thread
     currentThread  =  staticmethod (threading.currentThread)
     def  __init__( self , maxthreads = 20 , name = None ):
         self .q  =  Queue( 0 #这里创建一个队列,如果是0的话表示不限制,现在这个队列里放的是任务
         self . max  =  maxthreads  #定义最大线程数
         self .name  =  name
         self .waiters  =  [] #这两个是用来计数的
         self .working  =  [] #这两个是用来技术的
     def  start( self ):
         #self.max 最大线程数
         #q.qisze(),任务个数
         needSize  =  self .q.qsize()
         while  self .workers <  min ( self . max , needSize): #min(10,20)取最小值
             #wokers默认为0  【workers = 0】
             '''
             举例来说:
             while self.workers < min(self.max, needSize):
             这个循环,比如最大线程为20,咱们的任务个数为10,取最小值为10
             每次循环开1个线程,并且workers自增1,那么循环10次后,开了10个线程了workers = 10 ,那么workers就不小于10了
             就不开线程了,我线程开到最大了,你们这10个线程去消耗这10个任务去吧
             并且这里不阻塞,创建完线程就去执行了!
             每一个线程都去执行_worker方法去了
             '''
             self .startAWorker()
     def  startAWorker( self ):
         self .workers  + =  1
         newThread  =  self .threadFactory(target = self ._worker, name = 'shuaige' #创建一个线程并去执行_worker方法
         newThread.start()
     def  callInThread( self , func,  * args,  * * kw):
         self .callInThreadWithCallback( None , func,  * args,  * * kw)
     def  callInThreadWithCallback( self , onResult, func,  * args,  * * kw):
         =  (func, args, kw, onResult)
         self .q.put(o)
     @contextlib.contextmanager
     def  _workerState( self , stateList, workerThread):
         stateList.append(workerThread)
         try :
             yield
         finally :
             stateList.remove(workerThread)
     def  _worker( self ):
         ct  =  self .currentThread()
         =  self .q.get()  #去队列里取任务,如果有任务就O就会有值,每个任务是个元组,有方法,有参数
         while  is  not  WorkerStop:
             with  self ._workerState( self .working, ct):   #上下文切换
                 function, args, kwargs, onResult  =  o
                 del  o
                 try :
                     result  =  function( * args,  * * kwargs)
                     success  =  True
                 except :
                     success  =  False
                     if  onResult  is  None :
                         pass
                     else :
                         pass
                 del  function, args, kwargs
                 if  onResult  is  not  None :
                     try :
                         onResult(success, result)
                     except :
                         #context.call(ctx, log.err)
                         pass
                 del  onResult, result
             with  self ._workerState( self .waiters, ct):  #当线程工作完闲暇的时候,在去取任务执行
                 =  self .q.get()
     def  stop( self ):  #定义关闭线程方法
         while  self .workers:  #循环workers值
             self .q.put(WorkerStop)  #在队列中增加一个信号~
             self .workers  - =  1  #workers值-1 直到所有线程关闭
def  show(arg):
     import  time
     time.sleep( 1 )
     print  arg
pool  =  ThreadPool( 10 )
#创建500个任务,队列里添加了500个任务
#每个任务都是一个元组(方法名,动态参数,动态参数,默认为NoNe)
for  in  range ( 100 ):
     pool.callInThread(show, i)
pool.start()   #队列添加完成之后,开启线程让线程一个一个去队列里去拿
pool.stop()  #当上面的任务都执行完之后,线程中都在等待着在队列里去数据呢!
'''
我们要关闭所有的线程,执行stop方法,首先workers这个值是当前的线程数量,我们给线程发送一个信号“WorkerStop”
在线程的工作里:        while o is not WorkerStop:   如果线程获取到这个值就不执行了,然后这个线程while循环就停止了,等待
python的垃圾回收机制,回收。
然后在self.workers -= 1 ,那么所有的线程收到这个信号之后就会停止!!!
over~
'''


更多请参考:http://www.cnblogs.com/wupeiqi/articles/4839959.html

      http://www.cnblogs.com/luotianshuai/p/5131001.html


      本文转自027ryan  51CTO博客,原文链接:http://blog.51cto.com/ucode/1766332,如需转载请自行联系原作者




相关文章
|
5月前
|
数据采集 存储 JSON
Python爬取知乎评论:多线程与异步爬虫的性能优化
Python爬取知乎评论:多线程与异步爬虫的性能优化
|
5月前
|
人工智能 安全 调度
Python并发编程之线程同步详解
并发编程在Python中至关重要,线程同步确保多线程程序正确运行。本文详解线程同步机制,包括互斥锁、信号量、事件、条件变量和队列,探讨全局解释器锁(GIL)的影响及解决线程同步问题的最佳实践,如避免全局变量、使用线程安全数据结构、精细化锁的使用等。通过示例代码帮助开发者理解并提升多线程程序的性能与可靠性。
214 0
|
2月前
|
Java 调度 数据库
Python threading模块:多线程编程的实战指南
本文深入讲解Python多线程编程,涵盖threading模块的核心用法:线程创建、生命周期、同步机制(锁、信号量、条件变量)、线程通信(队列)、守护线程与线程池应用。结合实战案例,如多线程下载器,帮助开发者提升程序并发性能,适用于I/O密集型任务处理。
333 0
|
4月前
|
数据采集 消息中间件 并行计算
Python多线程与多进程性能对比:从原理到实战的深度解析
在Python编程中,多线程与多进程是提升并发性能的关键手段。本文通过实验数据、代码示例和通俗比喻,深入解析两者在不同任务类型下的性能表现,帮助开发者科学选择并发策略,优化程序效率。
374 1
|
5月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
6月前
|
JSON 算法 Java
打造终端里的下载利器:Python实现可恢复式多线程下载器
在数字时代,大文件下载已成为日常需求。本文教你用Python打造专业级下载器,支持断点续传、多线程加速、速度限制等功能,显著提升终端下载体验。内容涵盖智能续传、多线程分块下载、限速控制及Rich库构建现代终端界面,助你从零构建高效下载工具。
445 1
|
5月前
|
数据采集 存储 Java
多线程Python爬虫:加速大规模学术文献采集
多线程Python爬虫:加速大规模学术文献采集
|
6月前
|
数据采集 网络协议 前端开发
Python多线程爬虫模板:从原理到实战的完整指南
多线程爬虫通过并发请求大幅提升数据采集效率,适用于大规模网页抓取。本文详解其原理与实现,涵盖任务队列、线程池、会话保持、异常处理、反爬对抗等核心技术,并提供可扩展的Python模板代码,助力高效稳定的数据采集实践。
332 0
|
安全 数据处理 开发者
Python中的多线程编程:从入门到精通
本文将深入探讨Python中的多线程编程,包括其基本原理、应用场景、实现方法以及常见问题和解决方案。通过本文的学习,读者将对Python多线程编程有一个全面的认识,能够在实际项目中灵活运用。

推荐镜像

更多