Python自定义进程池(生产者/消费者模型)

简介:

代码说明一切:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#encoding=utf-8
#author: walker
#date: 2014-05-21
#summary: 自定义进程池遍历目录下文件
 
from  multiprocessing  import  Process, Queue, Lock
import  time, os
 
#消费者
class  Consumer(Process):
     def  __init__( self , queue, ioLock):
         super (Consumer,  self ).__init__()
         self .queue  =  queue
         self .ioLock  =  ioLock
         
     def  run( self ):
         while  True :
             task  =  self .queue.get()    #队列中无任务时,会阻塞进程
             if  isinstance (task,  str and  task  = =  'quit' :
                 break ;
             time.sleep( 1 )    #假定任务处理需要1秒钟
             self .ioLock.acquire()
             print str (os.getpid())  +  '  '  +  task)
             self .ioLock.release()
         self .ioLock.acquire()
         print  'Bye-bye'
         self .ioLock.release()
 
#生产者       
def  Producer():
     queue  =  Queue()     #这个队列是进程/线程安全的
     ioLock  =  Lock()
     subNum  =  4     #子进程数量
     workers  =  build_worker_pool(queue, ioLock, subNum)
     start_time  =  time.time()
     
     for  parent, dirnames, filenames  in  os.walk(r 'D:\test' ):
         for  filename  in  filenames:
             queue.put(filename)
             ioLock.acquire()   
             print ( 'qsize:'  +  str (queue.qsize()))
             ioLock.release()
             while  queue.qsize() > subNum  *  10 :   #控制队列中任务数量
                 time.sleep( 1 )
             
     for  worker  in  workers:
         queue.put( 'quit' )
         
     for  worker  in  workers:
         worker.join()
     
     ioLock.acquire()   
     print ( 'Done! Time taken: {}' . format (time.time()  -  start_time))
     ioLock.release()
 
#创建进程池
def  build_worker_pool(queue, ioLock, size):
     workers  =  []
     for  in  range (size):
         worker  =  Consumer(queue, ioLock)
         worker.start()
         workers.append(worker)
     return  workers
     
if  __name__  = =  '__main__' :
     Producer()

ps:

1
2
3
self .ioLock.acquire()  
...
self .ioLock.release()

可用

1
2
with  self .ioLock:
     ...

替代。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 一个子进程生产,一个子进程消费
  
import  os, sys, time
from  multiprocessing  import  Process, Pool, Queue, Manager
 
#生产
def  Produce(q):
     print ( 'Produce %d ...'  %  os.getpid())
     for  in  range ( 1 20 ):
         while  q.full():
             print ( 'sleep %d/%d ...'  %  (i, q.qsize()))
             time.sleep( 1 )
         q.put(i)
         
     q.put( 0 )         #用0通知结束
 
#消费
def  Consume(q):
     print ( 'Consume %d ...'  %  os.getpid())
     while  True :
         num  =  q.get()
         if  0  = =  num:  #收到结束信号
             print ( 'receive 0' )
             break
         print ( 'Consumer '  +  str (num))
         time.sleep( 2 )
         print ( 'Consumer end '  +  str (num))
  
if  __name__  = =  '__main__'
     =  Queue( 10 )              #可用
     =  Manager().Queue( 10 )        #可用
     
     print (os.getpid())
     
     producerProcess  =  Process(target = Produce, args = (q,))      #生产进程
     consumerProcess  =  Process(target = Consume, args = (q,))      #消费进程
     
     producerProcess.start()
     consumerProcess.start()
     
     producerProcess.join()
     consumerProcess.join()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 一个子进程生产,进程池消费
  
import  os, sys, time
from  multiprocessing  import  Process, Pool, Queue, Manager
 
#生产
def  Produce(q, poolSize):
     print ( 'Produce ...' )
     for  in  range ( 1 100 ):
         while  q.full():
             print ( 'sleep %d/%d ...'  %  (i, q.qsize()))
             time.sleep( 1 )
         q.put(i)
         
     for  in  range ( 0 , poolSize):
         q.put( 0 )         #用0通知结束
 
#消费
def  Consume(q):
     print ( 'Consume ...' )
     while  True :
         num  =  q.get()
         if  0  = =  num:  #收到结束信号
             print ( 'receive 0' )
             break
         print ( 'Consumer '  +  str (num))
         time.sleep( 2 )
         print ( 'Consumer end '  +  str (num))
  
if  __name__  = =  '__main__'
     #q = Queue(10)                #不可用
     =  Manager().Queue( 10 )        #可用
     
     poolSize  =  4
     producerProcess  =  Process(target = Produce, args = (q, poolSize))        #生产进程
     consumerPool  =  Pool(processes = poolSize)    #消费进程池,默认子进程个数为os.cpu_count()
     for  in  range ( 0 , poolSize):
         consumerPool.apply_async(func = Consume, args = (q,))
     
     producerProcess.start()
     consumerPool.close()
     
     producerProcess.join()
     consumerPool.join()
    

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 主进程生产,进程池消费
  
import  os, sys, time
from  multiprocessing  import  Process, Pool, Queue, Manager
 
#消费
def  Consume(q):
     print ( 'Consume ...' )
     num  =  q.get()
     print ( 'Consume %d ...'  %  num)
     time.sleep( 2 )
     print ( 'Consumer %d over'  %  num)
      
if  __name__  = =  '__main__' :    
     #q = Queue(10)                #不可用
     =  Manager().Queue( 10 )        #可用
     
     pool  =  Pool(processes  =  4 )
     for  in  range ( 1 100 ):      #生产
         while  q.full():
             print ( 'sleep %d ...'  %  q.qsize())
             time.sleep( 1 )
         q.put(i)
         print (i)
         pool.apply_async(Consume, (q,))
          
     pool.close()
     pool.join()

*** Updated 2016-01-06 ***

一个好玩的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#encoding=utf-8
#author: walker
#date: 2016-01-06
#summary: 一个多进程的好玩例子
 
import  os, sys, time
from  multiprocessing  import  Pool
 
cur_dir_fullpath  =  os.path.dirname(os.path.abspath(__file__))
 
g_List  =  [ 'a' ]
 
#修改全局变量g_List
def  ModifyDict_1():
     global  g_List
     g_List.append( 'b' )
 
#修改全局变量g_List    
def  ModifyDict_2():
     global  g_List
     g_List.append( 'c' )
  
#处理一个
def  ProcOne(num):
     print ( 'ProcOne '  +  str (num)  +  ', g_List:'  +  repr (g_List))
 
#处理所有
def  ProcAll():    
     pool  =  Pool(processes  =  4 )
     for  in  range ( 1 20 ):
         #ProcOne(i)
         #pool.apply(ProcOne, (i,))
         pool.apply_async(ProcOne, (i,))
         
     pool.close()
     pool.join()  
     
ModifyDict_1()   #修改全局变量g_List
     
if  __name__  = =  '__main__' :
     ModifyDict_2()   #修改全局变量g_List
     
     print ( 'In main g_List :'  +  repr (g_List))
     
     ProcAll()

Windows7 下运行的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
λ python3 demo.py
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b']
ProcOne 2, g_List:['a', 'b']
ProcOne 3, g_List:['a', 'b']
ProcOne 4, g_List:['a', 'b']
ProcOne 5, g_List:['a', 'b']
ProcOne 6, g_List:['a', 'b']
ProcOne 7, g_List:['a', 'b']
ProcOne 8, g_List:['a', 'b']
ProcOne 9, g_List:['a', 'b']
ProcOne 10, g_List:['a', 'b']
ProcOne 11, g_List:['a', 'b']
ProcOne 12, g_List:['a', 'b']
ProcOne 13, g_List:['a', 'b']
ProcOne 14, g_List:['a', 'b']
ProcOne 15, g_List:['a', 'b']
ProcOne 16, g_List:['a', 'b']
ProcOne 17, g_List:['a', 'b']
ProcOne 18, g_List:['a', 'b']
ProcOne 19, g_List:['a', 'b']

Ubuntu 14.04下运行的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b', 'c']
ProcOne 2, g_List:['a', 'b', 'c']
ProcOne 3, g_List:['a', 'b', 'c']
ProcOne 5, g_List:['a', 'b', 'c']
ProcOne 4, g_List:['a', 'b', 'c']
ProcOne 8, g_List:['a', 'b', 'c']
ProcOne 9, g_List:['a', 'b', 'c']
ProcOne 7, g_List:['a', 'b', 'c']
ProcOne 11, g_List:['a', 'b', 'c']
ProcOne 6, g_List:['a', 'b', 'c']
ProcOne 12, g_List:['a', 'b', 'c']
ProcOne 13, g_List:['a', 'b', 'c']
ProcOne 10, g_List:['a', 'b', 'c']
ProcOne 14, g_List:['a', 'b', 'c']
ProcOne 15, g_List:['a', 'b', 'c']
ProcOne 16, g_List:['a', 'b', 'c']
ProcOne 17, g_List:['a', 'b', 'c']
ProcOne 18, g_List:['a', 'b', 'c']
ProcOne 19, g_List:['a', 'b', 'c']

  可以看见Windows7下第二次修改没有成功,而Ubuntu下修改成功了。据uliweb作者limodou讲,原因是Windows下是充重启实现的子进程;Linux下是fork实现的。


相关阅读:

0、官方多进程文档

1、Python 并行任务技巧

2、python中的多进程处理

3、python的threading和multiprocessing模块

4、python下使用ctypes获取threading线程id 


*** walker * 2014-05-21 ***

本文转自walker snapshot博客51CTO博客,原文链接http://blog.51cto.com/walkerqt/1414703如需转载请自行联系原作者


RQSLT

相关文章
|
6月前
|
机器学习/深度学习 数据采集 并行计算
多步预测系列 | LSTM、CNN、Transformer、TCN、串行、并行模型集合研究(Python代码实现)
多步预测系列 | LSTM、CNN、Transformer、TCN、串行、并行模型集合研究(Python代码实现)
671 2
|
6月前
|
缓存 监控 供应链
唯品会自定义 API 自定义操作深度分析及 Python 实现
唯品会开放平台提供丰富API,支持商品查询、订单管理、促销活动等电商全流程操作。基于OAuth 2.0认证机制,具备安全稳定的特点。通过组合调用基础接口,可实现数据聚合、流程自动化、监控预警及跨平台集成,广泛应用于供应链管理、数据分析和智能采购等领域。结合Python实现方案,可高效完成商品搜索、订单分析、库存监控等功能,提升电商运营效率。
|
6月前
|
缓存 监控 供应链
京东自定义 API 操作深度分析及 Python 实现
京东开放平台提供丰富API接口,支持商品、订单、库存等电商全链路场景。通过自定义API组合调用,可实现店铺管理、数据分析、竞品监控等功能,提升运营效率。本文详解其架构、Python实现与应用策略。
缓存 监控 供应链
147 0
缓存 监控 数据挖掘
130 0
|
6月前
|
算法 安全 新能源
基于DistFlow的含分布式电源配电网优化模型【IEEE39节点】(Python代码实现)
基于DistFlow的含分布式电源配电网优化模型【IEEE39节点】(Python代码实现)
473 0
|
7月前
|
机器学习/深度学习 数据采集 数据挖掘
基于 GARCH -LSTM 模型的混合方法进行时间序列预测研究(Python代码实现)
基于 GARCH -LSTM 模型的混合方法进行时间序列预测研究(Python代码实现)
252 2
|
7月前
|
机器学习/深度学习 算法 调度
【切负荷】计及切负荷和直流潮流(DC-OPF)风-火-储经济调度模型研究【IEEE24节点】(Python代码实现)
【切负荷】计及切负荷和直流潮流(DC-OPF)风-火-储经济调度模型研究【IEEE24节点】(Python代码实现)
314 0
|
8月前
|
监控 编译器 Python
如何利用Python杀进程并保持驻留后台检测
本教程介绍如何使用Python编写进程监控与杀进程脚本,结合psutil库实现后台驻留、定时检测并强制终止指定进程。内容涵盖基础杀进程、多进程处理、自动退出机制、管理员权限启动及图形界面设计,并提供将脚本打包为exe的方法,适用于需持续清理顽固进程的场景。
|
9月前
|
存储 机器学习/深度学习 人工智能
稀疏矩阵存储模型比较与在Python中的实现方法探讨
本文探讨了稀疏矩阵的压缩存储模型及其在Python中的实现方法,涵盖COO、CSR、CSC等常见格式。通过`scipy.sparse`等工具,分析了稀疏矩阵在高效运算中的应用,如矩阵乘法和图结构分析。文章还结合实际场景(推荐系统、自然语言处理等),提供了优化建议及性能评估,并展望了稀疏计算与AI硬件协同的未来趋势。掌握稀疏矩阵技术,可显著提升大规模数据处理效率,为工程实践带来重要价值。
425 58

推荐镜像

更多