一前言
使用python进行并发处理多台机器/多个实例的时候,我们可以使用threading ,但是由于著名的 GIL存在,实际上threading 并未提供真正有效的并发处理,要充分利用到多核CPU,我们需要使用多进程。Python提供了非常好用的多进程包-- multiprocessing。multiprocessing 可以利用multiprocessing.Process对象来创建一个进程,该Process对象与Threading对象的用法基本相同,具有相同的方法(官方原话:" The multiprocessing package mostly replicates the API of the threading module.") 比如:start(),run(),join()的方法。multiprocessing包中也有Lock/Event/Semaphore/Condition/Pipe/Queue类用于进程之间的通信。话不多说 show me the code!
二使用
2.1 初识异同
下面的程序显示threading和multiprocessing的在使用方面的异同,相近的函数join(),start(),append() 等,并做同一件事情打印自己的进程pid
输出结果
从例子的结果可以看出多线程threading的进程id和主进程(父进程)pid一样 ,同为9524; 多进程打印的pid每个都不一样,for循环中每创建一个process对象都年开一个进程。其他相关的方法基本类似。
2.2 用法
创建进程的类:
Process([group [, target [, name [, args [, kwargs]]]]]),
target表示调用对象,
args表示调用对象的位置参数元组。
kwargs表示调用对象的字典。
name为进程的别名。
group实质上不使用,为None。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程,并自动调用run方法.
属性:authkey、daemon(要通过start()设置,必须设置在方法start之前)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。
2.3 创建单进程
单线程比较简单,创建一个 Process的实例对象就好,传入参数 target 为已经定义好的方法worker以及worker需要的参数
2.4 创建多进程
输出
2.5 线程池
multiprocessing提供进程池的类--Pool,它可以指定程序最大可以调用的进程数量,当有新的请求提交到pool中时,如果进程池还没有满,那么就会创建一个新的进程用来执行该请求;但如果进程池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
构造方法:
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes : 使用的工作进程的数量,如果processes是None,默认使用os.cpu_count()返回的数量。
initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
context: 用在制定工作进程启动时的上下文,一般使用multiprocessing.Pool()或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。
实例方法:
apply(func[, args[, kwds]]):同步进程池
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :异步进程池
close() : 关闭进程池,阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
terminate() : 结束工作进程,不在处理未完成的任务.
join() : 等待工作线程的退出,在调用join()前必须调用close()或者 terminate(),因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。
输出结果
解释:创建一个进程池pool 对象proc_pool,并设定进程的数量为2,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为2,所以0、1会直接送到进程中执行,当其中的2个任务执行完之后才空出2进程处理对象2和3,所以会出现输出
worker 2 worker 3 出现在end worker 0 end worker 1之后。思考一下如果调用
proc_pool.apply(worker, (i,)) 的输出结果会是什么样的?
2.6 使用queue
multiprocessing提供队列类,可以通过调用multiprocessing.Queue(maxsize) 初始化队列对象,maxsize表示队列里面最多的元素个数。
例子 创建了两个函数入队,出队,出队处理时使用了lock特性,串行化取数据。
2.7 使用pipe
Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。
用法 multiprocessing.Pipe([duplex])
该类返回一组对象实例(conn1, conn2),分别代表发送和接受消息的两端。
输出:
该例子中 p1 p2 通过pipe 给彼此相互发送信息,p1 发送"parent_conn" 给 p2 ,p2 发送"child_conn" 给p1.
2.8 daemon程序对比结果
输出:
设置 daemon = True,程序随着主程序结束而不等待子进程。
输出:
end!
因为子进程设置了daemon属性,主进程结束,multiprocessing创建的进程对象就随着结束了。
输出:
2.9 Lock()
当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。
实例方法:
acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。
release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。
例子:
多个进程使用同一个std_out ,使用lock机制确保同一个时刻有一个一个进程获取输出。
输出:
三 小结
本文参考官方资料以及其他资源,对multiprocesssing 的使用方式做了总结,还有很多知识需要详细阅读官方文档。纸上来得终觉浅,绝知此事要躬行。参考资料
[1] 官方文档
[2] Python标准库10 多进程初步 (multiprocessing包)
使用python进行并发处理多台机器/多个实例的时候,我们可以使用threading ,但是由于著名的 GIL存在,实际上threading 并未提供真正有效的并发处理,要充分利用到多核CPU,我们需要使用多进程。Python提供了非常好用的多进程包-- multiprocessing。multiprocessing 可以利用multiprocessing.Process对象来创建一个进程,该Process对象与Threading对象的用法基本相同,具有相同的方法(官方原话:" The multiprocessing package mostly replicates the API of the threading module.") 比如:start(),run(),join()的方法。multiprocessing包中也有Lock/Event/Semaphore/Condition/Pipe/Queue类用于进程之间的通信。话不多说 show me the code!
二使用
2.1 初识异同
下面的程序显示threading和multiprocessing的在使用方面的异同,相近的函数join(),start(),append() 等,并做同一件事情打印自己的进程pid
- #!/usr/bin/env python
- # encoding: utf-8
- import os
- import threading
- import multiprocessing
- def printer(msg):
- print(msg, os.getpid())
- print('Main begin:', os.getpid())
- # threading
- record = []
- for i in range(5):
- thread = threading.Thread(target=printer, args=('threading',))
- thread.start()
- record.append(thread)
- for thread in record:
- thread.join()
- # multi-process
- record = []
- for i in range(5):
- process = multiprocessing.Process(target=printer, args=('multiprocessing',))
- process.start()
- record.append(process)
- for process in record:
- process.join()
- print('Main end:', os.getpid())
点击(此处)折叠或打开
- Main begin: 9524
- threading 9524
- threading 9524
- threading 9524
- threading 9524
- threading 9524
- multiprocessing 9539
- multiprocessing 9540
- multiprocessing 9541
- multiprocessing 9542
- multiprocessing 9543
- Main end: 9524
2.2 用法
创建进程的类:
Process([group [, target [, name [, args [, kwargs]]]]]),
target表示调用对象,
args表示调用对象的位置参数元组。
kwargs表示调用对象的字典。
name为进程的别名。
group实质上不使用,为None。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程,并自动调用run方法.
属性:authkey、daemon(要通过start()设置,必须设置在方法start之前)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。
2.3 创建单进程
单线程比较简单,创建一个 Process的实例对象就好,传入参数 target 为已经定义好的方法worker以及worker需要的参数
- #!/usr/bin/env python
- # encoding: utf-8
- """
- author: yangyi@youzan.com
- time: 2017/7/2 下午6:45
- func:
- """
- import multiprocessing
- import datetime, time
- def worker(interval):
- print("process start: {0}".format(datetime.datetime.today()));
- time.sleep(interval)
- print("process end: {0}".format(datetime.datetime.today()));
-
- if __name__ == "__main__":
- p = multiprocessing.Process(target=worker, args=(5,))
- p.start()
- p.join()
- print "end!"
- #!/usr/bin/env python
- # encoding: utf-8
- """
- author: yangyi@youzan.com
- time: 2017/7/2 下午7:50
- func:
- """
- import multiprocessing
- def worker(num):
- print "worker %d" %num
-
-
- if __name__ == "__main__":
- print("The number of CPU is:" + str(multiprocessing.cpu_count()))
- proc = []
- for i in xrange(5):
- p = multiprocessing.Process(target=worker, args=(i,))
- proc.append(p)
- for p in proc:
- p.start()
- for p in proc:
- p.join()
- print "end ..."
点击(此处)折叠或打开
- The number of CPU is:4
- worker 0
- worker 1
- worker 2
- worker 3
- worker 4
- main process end ...
multiprocessing提供进程池的类--Pool,它可以指定程序最大可以调用的进程数量,当有新的请求提交到pool中时,如果进程池还没有满,那么就会创建一个新的进程用来执行该请求;但如果进程池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
构造方法:
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes : 使用的工作进程的数量,如果processes是None,默认使用os.cpu_count()返回的数量。
initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
context: 用在制定工作进程启动时的上下文,一般使用multiprocessing.Pool()或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。
实例方法:
apply(func[, args[, kwds]]):同步进程池
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :异步进程池
close() : 关闭进程池,阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
terminate() : 结束工作进程,不在处理未完成的任务.
join() : 等待工作线程的退出,在调用join()前必须调用close()或者 terminate(),因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。
- #!/usr/bin/env python
- # encoding: utf-8
- """
- author: yangyi@youzan.com
- time: 2017/7/2 下午7:50
- func:
- """
- from multiprocessing import Pool
- import time
- def worker(num):
- print "worker %d" %num
- time.sleep(2)
- print "end worker %d" %num
-
- if __name__ == "__main__":
- proc_pool = Pool(2)
- for i in xrange(4):
- proc_pool.apply_async(worker, (i,)) #使用了异步调用,从输出结果可以看出来
-
- proc_pool.close()
- proc_pool.join()
- print "main process end ..."
点击(此处)折叠或打开
- worker 0
- worker 1
- end worker 0
- end worker 1
- worker 2
- worker 3
- end worker 2
- end worker 3
- main process end ..
2.6 使用queue
multiprocessing提供队列类,可以通过调用multiprocessing.Queue(maxsize) 初始化队列对象,maxsize表示队列里面最多的元素个数。
例子 创建了两个函数入队,出队,出队处理时使用了lock特性,串行化取数据。
- #!/usr/bin/env python
- # encoding: utf-8
- """
- author: yangyi@youzan.com
- time: 2017/7/2 下午9:03
- func:
- """
- import time
- from multiprocessing import Process, current_process,Lock,Queue
- import datetime
- def inputQ(queue):
- time.sleep(1)
- info = "proc_name: " + current_process().name + ' was putted in queue at: ' + str(datetime.datetime.today())
- queue.put(info)
- def outputQ(queue,lock):
- info = queue.get()
- lock.acquire()
- print ("proc_name: " + current_process().name + ' gets info :' + info)
- lock.release()
- if __name__ == '__main__':
- record1 = [] # store input processes
- record2 = [] # store output processes
- lock = Lock() # To prevent messy print
- queue = Queue(3)
- for i in range(10):
- process = Process(target=inputQ, args=(queue,))
- process.start()
- record1.append(process)
- for i in range(10):
- process = Process(target=outputQ, args=(queue,lock))
- process.start()
- record2.append(process)
- for p in record1:
- p.join()
- queue.close() # No more object will come, close the queue
- for p in record2:
- p.join()
Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。
用法 multiprocessing.Pipe([duplex])
该类返回一组对象实例(conn1, conn2),分别代表发送和接受消息的两端。
- #!/usr/bin/env python
- # encoding: utf-8
- """
- author: yangyi@youzan.com
- time: 2017/7/2 下午8:01
- func:
- """
- from multiprocessing import Process, Pipe
- def p1(conn, name):
- conn.send('hello ,{name}'.format(name=name))
- print "p1 receive :", conn.recv()
- conn.close()
-
- def p2(conn, name):
- conn.send('hello ,{name}'.format(name=name))
- print "p2 receive :", conn.recv()
- conn.close()
-
- if __name__ == '__main__':
- parent_conn, child_conn = Pipe()
- proc1 = Process(target=p1, args=(child_conn, "parent_conn"))
- proc2 = Process(target=p2, args=(parent_conn, "child_conn"))
- proc1.start()
- proc2.start()
- proc1.join()
- proc2.join()
点击(此处)折叠或打开
- p1 receive : hello ,child_conn
- p2 receive : hello ,parent_conn
2.8 daemon程序对比结果
- import multiprocessing
- import datetime, time
- def worker(interval):
- print("process start: {0}".format(datetime.datetime.today()));
- time.sleep(interval)
- print("process end: {0}".format(datetime.datetime.today()));
- if __name__ == "__main__":
- p = multiprocessing.Process(target=worker, args=(5,))
- p.start()
- print "end!"
点击(此处)折叠或打开
- end!
- process start: 2017-07-02 18:47:30.656244
- process end: 2017-07-02 18:47:35.657464
设置 daemon = True,程序随着主程序结束而不等待子进程。
- import multiprocessing
- import datetime, time
- def worker(interval):
- print("process start: {0}".format(datetime.datetime.today()));
- time.sleep(interval)
- print("process end: {0}".format(datetime.datetime.today()));
- if __name__ == "__main__":
- p = multiprocessing.Process(target=worker, args=(5,))
- p.daemon = True
- p.start()
- print "end!"
end!
因为子进程设置了daemon属性,主进程结束,multiprocessing创建的进程对象就随着结束了。
- import multiprocessing
- import datetime, time
- def worker(interval):
- print("process start: {0}".format(datetime.datetime.today()));
- time.sleep(interval)
- print("process end: {0}".format(datetime.datetime.today()));
- if __name__ == "__main__":
- p = multiprocessing.Process(target=worker, args=(5,))
- p.daemon = True #
- p.start()
- p.join() #进程执行完毕后再关闭
- print "end!"
点击(此处)折叠或打开
- process start: 2017-07-02 18:48:20.953754
- process end: 2017-07-02 18:48:25.954736
-
当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。
实例方法:
acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。
release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。
例子:
多个进程使用同一个std_out ,使用lock机制确保同一个时刻有一个一个进程获取输出。
- #!/usr/bin/env python
# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2017/7/2 下午9:28
func:
"""
from multiprocessing import Process, Lock
def func_with_lock(l, i):
l.acquire()
print 'hello world', i
l.release()
def func_without_lock(i):
print 'hello world', i
if __name__ == '__main__':
lock = Lock()
print "func_with_lock :"
for num in range(10):
Process(target=func_with_lock, args=(lock, num)).start()
点击(此处)折叠或打开
- func_with_lock :
- hello world 0
- hello world 1
- hello world 2
- hello world 3
- hello world 4
- hello world 5
- hello world 6
- hello world 7
- hello world 8
- hello world 9
三 小结
本文参考官方资料以及其他资源,对multiprocesssing 的使用方式做了总结,还有很多知识需要详细阅读官方文档。纸上来得终觉浅,绝知此事要躬行。参考资料
[1] 官方文档
[2] Python标准库10 多进程初步 (multiprocessing包)