代码说明一切:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
#encoding=utf-8
#author: walker
#date: 2014-05-21
#summary: 自定义进程池遍历目录下文件
from
multiprocessing
import
Process, Queue, Lock
import
time, os
#消费者
class Consumer(Process):
    """Worker process that handles tasks from a shared queue until told to quit.

    Args:
        queue: Task queue shared with the producer. Each item is a task
            string; the string 'quit' is the shutdown sentinel.
        ioLock: Lock shared by all processes, held around every print so
            that output lines from different processes do not interleave.
    """

    def __init__(self, queue, ioLock):
        super(Consumer, self).__init__()
        self.queue = queue
        self.ioLock = ioLock

    def run(self):
        """Consume tasks one by one and say goodbye once 'quit' arrives."""
        while True:
            # get() blocks this process while the queue is empty.
            task = self.queue.get()
            if isinstance(task, str) and task == 'quit':
                break
            time.sleep(1)  # pretend that handling a task takes one second
            # `with` releases the lock even if print raises.
            with self.ioLock:
                print(str(os.getpid()) + ' ' + task)
        with self.ioLock:
            print('Bye-bye')
#生产者
def Producer(root=r'D:\test', subNum=4):
    """Walk a directory tree and hand every file name to a pool of workers.

    Args:
        root: Directory to traverse; defaults to the path the script
            originally hard-coded.
        subNum: Number of consumer processes to start.
    """
    queue = Queue()  # this queue is process/thread safe
    ioLock = Lock()  # keeps printed lines from different processes apart
    workers = build_worker_pool(queue, ioLock, subNum)
    start_time = time.time()

    for _parent, _dirnames, filenames in os.walk(root):
        for filename in filenames:
            queue.put(filename)
            # `with` releases the lock even if print raises.
            with ioLock:
                print('qsize:' + str(queue.qsize()))
            # Back-pressure: stop producing while too many tasks are pending.
            while queue.qsize() > subNum * 10:
                time.sleep(1)

    # One 'quit' sentinel per worker so that every worker leaves its loop.
    for _ in workers:
        queue.put('quit')
    for worker in workers:
        worker.join()

    with ioLock:
        print('Done! Time taken: {}'.format(time.time() - start_time))
#创建进程池
def build_worker_pool(queue, ioLock, size):
    """Create and start `size` Consumer processes; return them as a list.

    Every worker shares the same task queue and the same print lock.
    """
    def _launch():
        # Build one worker and start it right away.
        proc = Consumer(queue, ioLock)
        proc.start()
        return proc

    return [_launch() for _ in range(size)]
# Script entry point. The guard keeps child processes that re-import this
# module from starting a producer of their own.
if '__main__' == __name__:
    Producer()
|
ps:
1
2
3
|
self
.ioLock.acquire()
...
self
.ioLock.release()
|
可用
1
2
|
with
self
.ioLock:
...
|
替代。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 一个子进程生产,一个子进程消费
import
os, sys, time
from
multiprocessing
import
Process, Pool, Queue, Manager
#生产
def Produce(q):
    """Put the integers 1..19 on `q`, then a single 0 as the end marker.

    While the queue reports itself full, a progress line is printed and the
    producer sleeps for one second before checking again.
    """
    print('Produce %d ...' % os.getpid())
    item = 1
    while item < 20:
        if q.full():
            print('sleep %d/%d ...' % (item, q.qsize()))
            time.sleep(1)
            continue
        q.put(item)
        item += 1
    q.put(0)  # 0 tells the consumer that production is finished
#消费
def Consume(q):
    """Take numbers from `q` and 'process' each one until a 0 is received.

    Processing a number means printing a start line, sleeping two seconds
    and printing an end line.
    """
    print('Consume %d ...' % os.getpid())
    # iter(callable, sentinel) keeps calling q.get() and stops as soon as
    # the value equals 0, the end-of-work signal.
    for num in iter(q.get, 0):
        print('Consumer ' + str(num))
        time.sleep(2)
        print('Consumer end ' + str(num))
    print('receive 0')
if __name__ == '__main__':
    # Bounded queue shared by the two child processes. The original script
    # notes that a plain multiprocessing Queue(10) is usable here as well;
    # only one queue is created so no unused queue is allocated and dropped.
    q = Manager().Queue(10)
    print(os.getpid())
    producerProcess = Process(target=Produce, args=(q,))  # producer process
    consumerProcess = Process(target=Consume, args=(q,))  # consumer process
    producerProcess.start()
    consumerProcess.start()
    producerProcess.join()
    consumerProcess.join()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 一个子进程生产,进程池消费
import
os, sys, time
from
multiprocessing
import
Process, Pool, Queue, Manager
#生产
def Produce(q, poolSize):
    """Put the integers 1..99 on `q`, followed by one 0 per pool worker.

    While the queue reports itself full, a progress line is printed and the
    producer sleeps for one second before checking again.
    """
    print('Produce ...')
    for value in range(1, 100):
        while q.full():
            print('sleep %d/%d ...' % (value, q.qsize()))
            time.sleep(1)
        q.put(value)
    # Every consumer stops on the first 0 it sees, so one 0 per consumer.
    for stop_signal in [0] * poolSize:
        q.put(stop_signal)
#消费
def Consume(q):
    """Take numbers from `q` and 'process' each one until a 0 is received.

    Processing a number means printing a start line, sleeping two seconds
    and printing an end line.
    """
    print('Consume ...')
    # iter(callable, sentinel) keeps calling q.get() and stops as soon as
    # the value equals 0, the end-of-work signal.
    for num in iter(q.get, 0):
        print('Consumer ' + str(num))
        time.sleep(2)
        print('Consumer end ' + str(num))
    print('receive 0')
if __name__ == '__main__':
    # The original script marks a plain multiprocessing Queue(10) as not
    # usable here (the queue is passed to pool tasks as an argument); the
    # Manager queue proxy is what works.
    q = Manager().Queue(10)
    poolSize = 4
    producerProcess = Process(target=Produce, args=(q, poolSize))  # producer
    # Consumer pool; Pool() defaults to os.cpu_count() workers when no size
    # is given.
    consumerPool = Pool(processes=poolSize)
    # Keep the AsyncResult handles: apply_async reports a worker exception
    # only through its result object.
    results = [
        consumerPool.apply_async(func=Consume, args=(q,))
        for _ in range(poolSize)
    ]
    producerProcess.start()
    consumerPool.close()
    producerProcess.join()
    consumerPool.join()
    for result in results:
        result.get()  # re-raises here whatever a consumer task raised
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 主进程生产,进程池消费
import
os, sys, time
from
multiprocessing
import
Process, Pool, Queue, Manager
#消费
def Consume(q):
    """Handle exactly one number taken from `q`.

    Prints a start line, sleeps two seconds to simulate work, then prints a
    completion line.
    """
    print('Consume ...')
    item = q.get()
    started = 'Consume %d ...' % item
    print(started)
    time.sleep(2)
    finished = 'Consumer %d over' % item
    print(finished)
if __name__ == '__main__':
    # The original script marks a plain multiprocessing Queue(10) as not
    # usable here (the queue is passed to pool tasks as an argument); the
    # Manager queue proxy is what works.
    q = Manager().Queue(10)
    pool = Pool(processes=4)
    # Keep the AsyncResult handles: apply_async reports a worker exception
    # only through its result object.
    results = []
    for i in range(1, 100):  # produce in the main process
        while q.full():
            print('sleep %d ...' % q.qsize())
            time.sleep(1)
        q.put(i)
        print(i)
        # One pool task per produced item; each task consumes one item.
        results.append(pool.apply_async(Consume, (q,)))
    pool.close()
    pool.join()
    for result in results:
        result.get()  # re-raises here whatever a consumer task raised
|
*** Updated 2016-01-06 ***
一个好玩的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
#encoding=utf-8
#author: walker
#date: 2016-01-06
#summary: 一个多进程的好玩例子
import
os, sys, time
from
multiprocessing
import
Pool
cur_dir_fullpath
=
os.path.dirname(os.path.abspath(__file__))
g_List
=
[
'a'
]
#修改全局变量g_List
def ModifyDict_1():
    """Append 'b' to the module-level list g_List, in place."""
    global g_List
    # In-place extension: the same list object is mutated, not replaced.
    g_List += ['b']
#修改全局变量g_List
def ModifyDict_2():
    """Append 'c' to the module-level list g_List, in place."""
    global g_List
    # In-place extension: the same list object is mutated, not replaced.
    g_List += ['c']
#处理一个
def ProcOne(num):
    """Print `num` together with g_List as seen by the current process."""
    pieces = ['ProcOne ', str(num), ', g_List:', repr(g_List)]
    print(''.join(pieces))
#处理所有
def ProcAll():
    """Run ProcOne for the numbers 1..19 on a pool of four processes."""
    workers = Pool(processes=4)
    # Alternatives for comparison: call ProcOne(n) directly in this process,
    # or use the blocking workers.apply(ProcOne, (n,)).
    for n in range(1, 20):
        task_args = (n,)
        workers.apply_async(ProcOne, task_args)
    # No more tasks will be submitted; wait for the submitted ones to finish.
    workers.close()
    workers.join()
# Module-level call: modifies the global g_List whenever this module is
# executed or imported.
ModifyDict_1()

if '__main__' == __name__:
    # Second modification of g_List, made only in the process that was
    # started as the script.
    ModifyDict_2()
    print('In main g_List :' + repr(g_List))
    ProcAll()
|
Windows7 下运行的结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
λ python3 demo.py
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b']
ProcOne 2, g_List:['a', 'b']
ProcOne 3, g_List:['a', 'b']
ProcOne 4, g_List:['a', 'b']
ProcOne 5, g_List:['a', 'b']
ProcOne 6, g_List:['a', 'b']
ProcOne 7, g_List:['a', 'b']
ProcOne 8, g_List:['a', 'b']
ProcOne 9, g_List:['a', 'b']
ProcOne 10, g_List:['a', 'b']
ProcOne 11, g_List:['a', 'b']
ProcOne 12, g_List:['a', 'b']
ProcOne 13, g_List:['a', 'b']
ProcOne 14, g_List:['a', 'b']
ProcOne 15, g_List:['a', 'b']
ProcOne 16, g_List:['a', 'b']
ProcOne 17, g_List:['a', 'b']
ProcOne 18, g_List:['a', 'b']
ProcOne 19, g_List:['a', 'b']
|
Ubuntu 14.04下运行的结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b', 'c']
ProcOne 2, g_List:['a', 'b', 'c']
ProcOne 3, g_List:['a', 'b', 'c']
ProcOne 5, g_List:['a', 'b', 'c']
ProcOne 4, g_List:['a', 'b', 'c']
ProcOne 8, g_List:['a', 'b', 'c']
ProcOne 9, g_List:['a', 'b', 'c']
ProcOne 7, g_List:['a', 'b', 'c']
ProcOne 11, g_List:['a', 'b', 'c']
ProcOne 6, g_List:['a', 'b', 'c']
ProcOne 12, g_List:['a', 'b', 'c']
ProcOne 13, g_List:['a', 'b', 'c']
ProcOne 10, g_List:['a', 'b', 'c']
ProcOne 14, g_List:['a', 'b', 'c']
ProcOne 15, g_List:['a', 'b', 'c']
ProcOne 16, g_List:['a', 'b', 'c']
ProcOne 17, g_List:['a', 'b', 'c']
ProcOne 18, g_List:['a', 'b', 'c']
ProcOne 19, g_List:['a', 'b', 'c']
|
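可以看见Windows7下第二次修改(在 `if __name__ == '__main__':` 中调用的 ModifyDict_2)没有在子进程中生效,而Ubuntu下生效了。据uliweb作者limodou讲,原因是:Windows下的子进程是通过重新启动解释器并重新导入主模块(spawn)实现的,`__main__` 保护块里的代码不会在子进程中再次执行;Linux下是通过fork实现的,子进程直接继承父进程在fork时刻的内存状态。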
相关阅读:
0、官方多进程文档。
3、python的threading和multiprocessing模块
4、python下使用ctypes获取threading线程id
*** walker * 2014-05-21 ***
本文转自 walker snapshot 的 51CTO 博客,原文链接:http://blog.51cto.com/walkerqt/1414703 ,如需转载请自行联系原作者。
RQSLT