Python自定义进程池(生产者/消费者模型)

简介:

代码说明一切:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#encoding=utf-8
#author: walker
#date: 2014-05-21
#summary: 自定义进程池遍历目录下文件
 
from  multiprocessing  import  Process, Queue, Lock
import  time, os
 
#消费者
class  Consumer(Process):
     def  __init__( self , queue, ioLock):
         super (Consumer,  self ).__init__()
         self .queue  =  queue
         self .ioLock  =  ioLock
         
     def  run( self ):
         while  True :
             task  =  self .queue.get()    #队列中无任务时,会阻塞进程
             if  isinstance (task,  str and  task  = =  'quit' :
                 break ;
             time.sleep( 1 )    #假定任务处理需要1秒钟
             self .ioLock.acquire()
             print str (os.getpid())  +  '  '  +  task)
             self .ioLock.release()
         self .ioLock.acquire()
         print  'Bye-bye'
         self .ioLock.release()
 
#生产者       
def  Producer():
     queue  =  Queue()     #这个队列是进程/线程安全的
     ioLock  =  Lock()
     subNum  =  4     #子进程数量
     workers  =  build_worker_pool(queue, ioLock, subNum)
     start_time  =  time.time()
     
     for  parent, dirnames, filenames  in  os.walk(r 'D:\test' ):
         for  filename  in  filenames:
             queue.put(filename)
             ioLock.acquire()   
             print ( 'qsize:'  +  str (queue.qsize()))
             ioLock.release()
             while  queue.qsize() > subNum  *  10 :   #控制队列中任务数量
                 time.sleep( 1 )
             
     for  worker  in  workers:
         queue.put( 'quit' )
         
     for  worker  in  workers:
         worker.join()
     
     ioLock.acquire()   
     print ( 'Done! Time taken: {}' . format (time.time()  -  start_time))
     ioLock.release()
 
#创建进程池
def  build_worker_pool(queue, ioLock, size):
     workers  =  []
     for  in  range (size):
         worker  =  Consumer(queue, ioLock)
         worker.start()
         workers.append(worker)
     return  workers
     
if  __name__  = =  '__main__' :
     Producer()

ps:

1
2
3
self .ioLock.acquire()  
...
self .ioLock.release()

可用

1
2
with  self .ioLock:
     ...

替代。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 一个子进程生产,一个子进程消费
  
import  os, sys, time
from  multiprocessing  import  Process, Pool, Queue, Manager
 
#生产
def  Produce(q):
     print ( 'Produce %d ...'  %  os.getpid())
     for  in  range ( 1 20 ):
         while  q.full():
             print ( 'sleep %d/%d ...'  %  (i, q.qsize()))
             time.sleep( 1 )
         q.put(i)
         
     q.put( 0 )         #用0通知结束
 
#消费
def  Consume(q):
     print ( 'Consume %d ...'  %  os.getpid())
     while  True :
         num  =  q.get()
         if  0  = =  num:  #收到结束信号
             print ( 'receive 0' )
             break
         print ( 'Consumer '  +  str (num))
         time.sleep( 2 )
         print ( 'Consumer end '  +  str (num))
  
if  __name__  = =  '__main__'
     =  Queue( 10 )              #可用
     =  Manager().Queue( 10 )        #可用
     
     print (os.getpid())
     
     producerProcess  =  Process(target = Produce, args = (q,))      #生产进程
     consumerProcess  =  Process(target = Consume, args = (q,))      #消费进程
     
     producerProcess.start()
     consumerProcess.start()
     
     producerProcess.join()
     consumerProcess.join()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 一个子进程生产,进程池消费
  
import  os, sys, time
from  multiprocessing  import  Process, Pool, Queue, Manager
 
#生产
def  Produce(q, poolSize):
     print ( 'Produce ...' )
     for  in  range ( 1 100 ):
         while  q.full():
             print ( 'sleep %d/%d ...'  %  (i, q.qsize()))
             time.sleep( 1 )
         q.put(i)
         
     for  in  range ( 0 , poolSize):
         q.put( 0 )         #用0通知结束
 
#消费
def  Consume(q):
     print ( 'Consume ...' )
     while  True :
         num  =  q.get()
         if  0  = =  num:  #收到结束信号
             print ( 'receive 0' )
             break
         print ( 'Consumer '  +  str (num))
         time.sleep( 2 )
         print ( 'Consumer end '  +  str (num))
  
if  __name__  = =  '__main__'
     #q = Queue(10)                #不可用
     =  Manager().Queue( 10 )        #可用
     
     poolSize  =  4
     producerProcess  =  Process(target = Produce, args = (q, poolSize))        #生产进程
     consumerPool  =  Pool(processes = poolSize)    #消费进程池,默认子进程个数为os.cpu_count()
     for  in  range ( 0 , poolSize):
         consumerPool.apply_async(func = Consume, args = (q,))
     
     producerProcess.start()
     consumerPool.close()
     
     producerProcess.join()
     consumerPool.join()
    

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 主进程生产,进程池消费
  
import  os, sys, time
from  multiprocessing  import  Process, Pool, Queue, Manager
 
#消费
def  Consume(q):
     print ( 'Consume ...' )
     num  =  q.get()
     print ( 'Consume %d ...'  %  num)
     time.sleep( 2 )
     print ( 'Consumer %d over'  %  num)
      
if  __name__  = =  '__main__' :    
     #q = Queue(10)                #不可用
     =  Manager().Queue( 10 )        #可用
     
     pool  =  Pool(processes  =  4 )
     for  in  range ( 1 100 ):      #生产
         while  q.full():
             print ( 'sleep %d ...'  %  q.qsize())
             time.sleep( 1 )
         q.put(i)
         print (i)
         pool.apply_async(Consume, (q,))
          
     pool.close()
     pool.join()

*** Updated 2016-01-06 ***

一个好玩的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#encoding=utf-8
#author: walker
#date: 2016-01-06
#summary: 一个多进程的好玩例子
 
import  os, sys, time
from  multiprocessing  import  Pool
 
cur_dir_fullpath  =  os.path.dirname(os.path.abspath(__file__))
 
g_List  =  [ 'a' ]
 
#修改全局变量g_List
def  ModifyDict_1():
     global  g_List
     g_List.append( 'b' )
 
#修改全局变量g_List    
def  ModifyDict_2():
     global  g_List
     g_List.append( 'c' )
  
#处理一个
def  ProcOne(num):
     print ( 'ProcOne '  +  str (num)  +  ', g_List:'  +  repr (g_List))
 
#处理所有
def  ProcAll():    
     pool  =  Pool(processes  =  4 )
     for  in  range ( 1 20 ):
         #ProcOne(i)
         #pool.apply(ProcOne, (i,))
         pool.apply_async(ProcOne, (i,))
         
     pool.close()
     pool.join()  
     
ModifyDict_1()   #修改全局变量g_List
     
if  __name__  = =  '__main__' :
     ModifyDict_2()   #修改全局变量g_List
     
     print ( 'In main g_List :'  +  repr (g_List))
     
     ProcAll()

Windows7 下运行的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
λ python3 demo.py
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b']
ProcOne 2, g_List:['a', 'b']
ProcOne 3, g_List:['a', 'b']
ProcOne 4, g_List:['a', 'b']
ProcOne 5, g_List:['a', 'b']
ProcOne 6, g_List:['a', 'b']
ProcOne 7, g_List:['a', 'b']
ProcOne 8, g_List:['a', 'b']
ProcOne 9, g_List:['a', 'b']
ProcOne 10, g_List:['a', 'b']
ProcOne 11, g_List:['a', 'b']
ProcOne 12, g_List:['a', 'b']
ProcOne 13, g_List:['a', 'b']
ProcOne 14, g_List:['a', 'b']
ProcOne 15, g_List:['a', 'b']
ProcOne 16, g_List:['a', 'b']
ProcOne 17, g_List:['a', 'b']
ProcOne 18, g_List:['a', 'b']
ProcOne 19, g_List:['a', 'b']

Ubuntu 14.04下运行的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b', 'c']
ProcOne 2, g_List:['a', 'b', 'c']
ProcOne 3, g_List:['a', 'b', 'c']
ProcOne 5, g_List:['a', 'b', 'c']
ProcOne 4, g_List:['a', 'b', 'c']
ProcOne 8, g_List:['a', 'b', 'c']
ProcOne 9, g_List:['a', 'b', 'c']
ProcOne 7, g_List:['a', 'b', 'c']
ProcOne 11, g_List:['a', 'b', 'c']
ProcOne 6, g_List:['a', 'b', 'c']
ProcOne 12, g_List:['a', 'b', 'c']
ProcOne 13, g_List:['a', 'b', 'c']
ProcOne 10, g_List:['a', 'b', 'c']
ProcOne 14, g_List:['a', 'b', 'c']
ProcOne 15, g_List:['a', 'b', 'c']
ProcOne 16, g_List:['a', 'b', 'c']
ProcOne 17, g_List:['a', 'b', 'c']
ProcOne 18, g_List:['a', 'b', 'c']
ProcOne 19, g_List:['a', 'b', 'c']

  可以看见Windows7下第二次修改没有成功,而Ubuntu下修改成功了。据uliweb作者limodou讲,原因是Windows下是充重启实现的子进程;Linux下是fork实现的。


相关阅读:

0、官方多进程文档

1、Python 并行任务技巧

2、python中的多进程处理

3、python的threading和multiprocessing模块

4、python下使用ctypes获取threading线程id 


*** walker * 2014-05-21 ***

本文转自walker snapshot博客51CTO博客,原文链接http://blog.51cto.com/walkerqt/1414703如需转载请自行联系原作者


RQSLT

相关文章
|
23天前
|
机器学习/深度学习 数据可视化 TensorFlow
使用Python实现深度学习模型的分布式训练
使用Python实现深度学习模型的分布式训练
165 73
|
26天前
|
机器学习/深度学习 数据采集 供应链
使用Python实现智能食品消费需求分析的深度学习模型
使用Python实现智能食品消费需求分析的深度学习模型
79 21
|
28天前
|
机器学习/深度学习 数据采集 搜索推荐
使用Python实现智能食品消费偏好预测的深度学习模型
使用Python实现智能食品消费偏好预测的深度学习模型
75 23
|
29天前
|
机器学习/深度学习 数据采集 数据挖掘
使用Python实现智能食品消费习惯预测的深度学习模型
使用Python实现智能食品消费习惯预测的深度学习模型
108 19
|
30天前
|
机器学习/深度学习 数据采集 数据挖掘
使用Python实现智能食品消费趋势分析的深度学习模型
使用Python实现智能食品消费趋势分析的深度学习模型
116 18
|
30天前
|
机器学习/深度学习 数据采集 搜索推荐
使用Python实现深度学习模型:智能食品消费行为预测
使用Python实现深度学习模型:智能食品消费行为预测
70 8
|
27天前
|
机器学习/深度学习 数据采集 数据挖掘
使用Python实现智能食品消费模式预测的深度学习模型
使用Python实现智能食品消费模式预测的深度学习模型
56 2
|
Java Apache Scala
如何在 PyFlink 1.10 中自定义 Python UDF?
本篇从架构到 UDF 接口定义,再到具体的实例,向大家介绍了在 Apache Flink 1.10 发布之后,如何利用 PyFlink 进行业务开发
如何在 PyFlink 1.10 中自定义 Python UDF?
|
1月前
|
人工智能 数据可视化 数据挖掘
探索Python编程:从基础到高级
在这篇文章中,我们将一起深入探索Python编程的世界。无论你是初学者还是有经验的程序员,都可以从中获得新的知识和技能。我们将从Python的基础语法开始,然后逐步过渡到更复杂的主题,如面向对象编程、异常处理和模块使用。最后,我们将通过一些实际的代码示例,来展示如何应用这些知识解决实际问题。让我们一起开启Python编程的旅程吧!
|
1月前
|
存储 数据采集 人工智能
Python编程入门:从零基础到实战应用
本文是一篇面向初学者的Python编程教程,旨在帮助读者从零开始学习Python编程语言。文章首先介绍了Python的基本概念和特点,然后通过一个简单的例子展示了如何编写Python代码。接下来,文章详细介绍了Python的数据类型、变量、运算符、控制结构、函数等基本语法知识。最后,文章通过一个实战项目——制作一个简单的计算器程序,帮助读者巩固所学知识并提高编程技能。