Python自定义进程池(生产者/消费者模型)

简介:

代码说明一切:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#encoding=utf-8
#author: walker
#date: 2014-05-21
#summary: 自定义进程池遍历目录下文件
 
from  multiprocessing  import  Process, Queue, Lock
import  time, os
 
#消费者
class  Consumer(Process):
     def  __init__( self , queue, ioLock):
         super (Consumer,  self ).__init__()
         self .queue  =  queue
         self .ioLock  =  ioLock
         
     def  run( self ):
         while  True :
             task  =  self .queue.get()    #队列中无任务时,会阻塞进程
             if  isinstance (task,  str and  task  = =  'quit' :
                 break ;
             time.sleep( 1 )    #假定任务处理需要1秒钟
             self .ioLock.acquire()
             print str (os.getpid())  +  '  '  +  task)
             self .ioLock.release()
         self .ioLock.acquire()
         print  'Bye-bye'
         self .ioLock.release()
 
#生产者       
def  Producer():
     queue  =  Queue()     #这个队列是进程/线程安全的
     ioLock  =  Lock()
     subNum  =  4     #子进程数量
     workers  =  build_worker_pool(queue, ioLock, subNum)
     start_time  =  time.time()
     
     for  parent, dirnames, filenames  in  os.walk(r 'D:\test' ):
         for  filename  in  filenames:
             queue.put(filename)
             ioLock.acquire()   
             print ( 'qsize:'  +  str (queue.qsize()))
             ioLock.release()
             while  queue.qsize() > subNum  *  10 :   #控制队列中任务数量
                 time.sleep( 1 )
             
     for  worker  in  workers:
         queue.put( 'quit' )
         
     for  worker  in  workers:
         worker.join()
     
     ioLock.acquire()   
     print ( 'Done! Time taken: {}' . format (time.time()  -  start_time))
     ioLock.release()
 
#创建进程池
def  build_worker_pool(queue, ioLock, size):
     workers  =  []
     for  in  range (size):
         worker  =  Consumer(queue, ioLock)
         worker.start()
         workers.append(worker)
     return  workers
     
if  __name__  = =  '__main__' :
     Producer()

ps:

1
2
3
self .ioLock.acquire()  
...
self .ioLock.release()

可用

1
2
with  self .ioLock:
     ...

替代。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 一个子进程生产,一个子进程消费
  
import  os, sys, time
from  multiprocessing  import  Process, Pool, Queue, Manager
 
#生产
def  Produce(q):
     print ( 'Produce %d ...'  %  os.getpid())
     for  in  range ( 1 20 ):
         while  q.full():
             print ( 'sleep %d/%d ...'  %  (i, q.qsize()))
             time.sleep( 1 )
         q.put(i)
         
     q.put( 0 )         #用0通知结束
 
#消费
def  Consume(q):
     print ( 'Consume %d ...'  %  os.getpid())
     while  True :
         num  =  q.get()
         if  0  = =  num:  #收到结束信号
             print ( 'receive 0' )
             break
         print ( 'Consumer '  +  str (num))
         time.sleep( 2 )
         print ( 'Consumer end '  +  str (num))
  
if  __name__  = =  '__main__'
     =  Queue( 10 )              #可用
     =  Manager().Queue( 10 )        #可用
     
     print (os.getpid())
     
     producerProcess  =  Process(target = Produce, args = (q,))      #生产进程
     consumerProcess  =  Process(target = Consume, args = (q,))      #消费进程
     
     producerProcess.start()
     consumerProcess.start()
     
     producerProcess.join()
     consumerProcess.join()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 一个子进程生产,进程池消费
  
import  os, sys, time
from  multiprocessing  import  Process, Pool, Queue, Manager
 
#生产
def  Produce(q, poolSize):
     print ( 'Produce ...' )
     for  in  range ( 1 100 ):
         while  q.full():
             print ( 'sleep %d/%d ...'  %  (i, q.qsize()))
             time.sleep( 1 )
         q.put(i)
         
     for  in  range ( 0 , poolSize):
         q.put( 0 )         #用0通知结束
 
#消费
def  Consume(q):
     print ( 'Consume ...' )
     while  True :
         num  =  q.get()
         if  0  = =  num:  #收到结束信号
             print ( 'receive 0' )
             break
         print ( 'Consumer '  +  str (num))
         time.sleep( 2 )
         print ( 'Consumer end '  +  str (num))
  
if  __name__  = =  '__main__'
     #q = Queue(10)                #不可用
     =  Manager().Queue( 10 )        #可用
     
     poolSize  =  4
     producerProcess  =  Process(target = Produce, args = (q, poolSize))        #生产进程
     consumerPool  =  Pool(processes = poolSize)    #消费进程池,默认子进程个数为os.cpu_count()
     for  in  range ( 0 , poolSize):
         consumerPool.apply_async(func = Consume, args = (q,))
     
     producerProcess.start()
     consumerPool.close()
     
     producerProcess.join()
     consumerPool.join()
    

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 主进程生产,进程池消费
  
import  os, sys, time
from  multiprocessing  import  Process, Pool, Queue, Manager
 
#消费
def  Consume(q):
     print ( 'Consume ...' )
     num  =  q.get()
     print ( 'Consume %d ...'  %  num)
     time.sleep( 2 )
     print ( 'Consumer %d over'  %  num)
      
if  __name__  = =  '__main__' :    
     #q = Queue(10)                #不可用
     =  Manager().Queue( 10 )        #可用
     
     pool  =  Pool(processes  =  4 )
     for  in  range ( 1 100 ):      #生产
         while  q.full():
             print ( 'sleep %d ...'  %  q.qsize())
             time.sleep( 1 )
         q.put(i)
         print (i)
         pool.apply_async(Consume, (q,))
          
     pool.close()
     pool.join()

*** Updated 2016-01-06 ***

一个好玩的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#encoding=utf-8
#author: walker
#date: 2016-01-06
#summary: 一个多进程的好玩例子
 
import  os, sys, time
from  multiprocessing  import  Pool
 
cur_dir_fullpath  =  os.path.dirname(os.path.abspath(__file__))
 
g_List  =  [ 'a' ]
 
#修改全局变量g_List
def  ModifyDict_1():
     global  g_List
     g_List.append( 'b' )
 
#修改全局变量g_List    
def  ModifyDict_2():
     global  g_List
     g_List.append( 'c' )
  
#处理一个
def  ProcOne(num):
     print ( 'ProcOne '  +  str (num)  +  ', g_List:'  +  repr (g_List))
 
#处理所有
def  ProcAll():    
     pool  =  Pool(processes  =  4 )
     for  in  range ( 1 20 ):
         #ProcOne(i)
         #pool.apply(ProcOne, (i,))
         pool.apply_async(ProcOne, (i,))
         
     pool.close()
     pool.join()  
     
ModifyDict_1()   #修改全局变量g_List
     
if  __name__  = =  '__main__' :
     ModifyDict_2()   #修改全局变量g_List
     
     print ( 'In main g_List :'  +  repr (g_List))
     
     ProcAll()

Windows7 下运行的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
λ python3 demo.py
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b']
ProcOne 2, g_List:['a', 'b']
ProcOne 3, g_List:['a', 'b']
ProcOne 4, g_List:['a', 'b']
ProcOne 5, g_List:['a', 'b']
ProcOne 6, g_List:['a', 'b']
ProcOne 7, g_List:['a', 'b']
ProcOne 8, g_List:['a', 'b']
ProcOne 9, g_List:['a', 'b']
ProcOne 10, g_List:['a', 'b']
ProcOne 11, g_List:['a', 'b']
ProcOne 12, g_List:['a', 'b']
ProcOne 13, g_List:['a', 'b']
ProcOne 14, g_List:['a', 'b']
ProcOne 15, g_List:['a', 'b']
ProcOne 16, g_List:['a', 'b']
ProcOne 17, g_List:['a', 'b']
ProcOne 18, g_List:['a', 'b']
ProcOne 19, g_List:['a', 'b']

Ubuntu 14.04下运行的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b', 'c']
ProcOne 2, g_List:['a', 'b', 'c']
ProcOne 3, g_List:['a', 'b', 'c']
ProcOne 5, g_List:['a', 'b', 'c']
ProcOne 4, g_List:['a', 'b', 'c']
ProcOne 8, g_List:['a', 'b', 'c']
ProcOne 9, g_List:['a', 'b', 'c']
ProcOne 7, g_List:['a', 'b', 'c']
ProcOne 11, g_List:['a', 'b', 'c']
ProcOne 6, g_List:['a', 'b', 'c']
ProcOne 12, g_List:['a', 'b', 'c']
ProcOne 13, g_List:['a', 'b', 'c']
ProcOne 10, g_List:['a', 'b', 'c']
ProcOne 14, g_List:['a', 'b', 'c']
ProcOne 15, g_List:['a', 'b', 'c']
ProcOne 16, g_List:['a', 'b', 'c']
ProcOne 17, g_List:['a', 'b', 'c']
ProcOne 18, g_List:['a', 'b', 'c']
ProcOne 19, g_List:['a', 'b', 'c']

  可以看见Windows7下第二次修改没有成功,而Ubuntu下修改成功了。据uliweb作者limodou讲,原因是Windows下是充重启实现的子进程;Linux下是fork实现的。


相关阅读:

0、官方多进程文档

1、Python 并行任务技巧

2、python中的多进程处理

3、python的threading和multiprocessing模块

4、python下使用ctypes获取threading线程id 


*** walker * 2014-05-21 ***

本文转自walker snapshot博客51CTO博客,原文链接http://blog.51cto.com/walkerqt/1414703如需转载请自行联系原作者


RQSLT

相关文章
|
1天前
|
Java 数据库连接 数据处理
Python从入门到精通:3.1.2多线程与多进程编程
Python从入门到精通:3.1.2多线程与多进程编程
|
1天前
|
人工智能 自然语言处理 Python
使用Python实现自然语言处理模型
使用Python实现自然语言处理模型
8 1
|
2天前
|
机器学习/深度学习 算法 搜索推荐
Python用机器学习算法进行因果推断与增量、增益模型Uplift Modeling智能营销模型
Python用机器学习算法进行因果推断与增量、增益模型Uplift Modeling智能营销模型
28 12
|
2天前
|
机器学习/深度学习 算法 vr&ar
PYTHON用时变马尔可夫区制转换(MARKOV REGIME SWITCHING)自回归模型分析经济时间序列
PYTHON用时变马尔可夫区制转换(MARKOV REGIME SWITCHING)自回归模型分析经济时间序列
13 4
|
2天前
|
API vr&ar Python
Python 用ARIMA、GARCH模型预测分析股票市场收益率时间序列(上)
Python 用ARIMA、GARCH模型预测分析股票市场收益率时间序列
30 5
|
2天前
|
人工智能 Python
【AI大模型应用开发】【LangChain系列】实战案例1:用LangChain写Python代码并执行来生成答案
【AI大模型应用开发】【LangChain系列】实战案例1:用LangChain写Python代码并执行来生成答案
6 0
Python
18 0
|
5天前
|
数据可视化 Python
Python模型评估与选择:面试必备知识点
【4月更文挑战第17天】本文深入探讨了Python模型评估与选择在面试中的关键点,包括性能度量、过拟合与欠拟合识别、模型比较与选择、模型融合和偏差-方差权衡。强调了避免混淆评估指标、忽视模型验证和盲目追求高复杂度模型的常见错误,并提供相关代码示例,如交叉验证、网格搜索和超参数调优。通过理解这些概念和技巧,可在面试中展示出色的数据科学能力。
31 12
|
7天前
|
机器学习/深度学习 数据可视化 Linux
python用ARIMA模型预测CO2浓度时间序列实现
python用ARIMA模型预测CO2浓度时间序列实现
20 0
|
8天前
|
Python 数据可视化 索引
PYTHON用GARCH、离散随机波动率模型DSV模拟估计股票收益时间序列与蒙特卡洛可视化
PYTHON用GARCH、离散随机波动率模型DSV模拟估计股票收益时间序列与蒙特卡洛可视化
24 0
PYTHON用GARCH、离散随机波动率模型DSV模拟估计股票收益时间序列与蒙特卡洛可视化