【Python】浅谈 multiprocessing

简介: 一前言     使用python进行并发处理多台机器/多个实例的时候,我们可以使用threading ,但是由于著名的GIL存在,实际上threading 并未提供真正有效的并发处理,要充分利用到多核CPU,我们需要使用多进程。
一前言 
   使用python进行并发处理多台机器/多个实例的时候,我们可以使用threading ,但是由于著名的 GIL存在,实际上threading 并未提供真正有效的并发处理,要充分利用到多核CPU,我们需要使用多进程。Python提供了非常好用的多进程包-- multiprocessing。multiprocessing 可以利用multiprocessing.Process对象来创建一个进程,该Process对象与Threading对象的用法基本相同,具有相同的方法(官方原话:" The multiprocessing package mostly replicates the API of the threading module.") 比如:start(),run(),join()的方法。multiprocessing包中也有Lock/Event/Semaphore/Condition/Pipe/Queue类用于进程之间的通信。话不多说 show me the code!

二使用
2.1 初识异同
下面的程序显示threading和multiprocessing的在使用方面的异同,相近的函数join(),start(),append() 等,并做同一件事情打印自己的进程pid

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. import os
  4. import threading
  5. import multiprocessing
  6. def printer(msg):
  7.     print(msg, os.getpid())
  8. print('Main begin:', os.getpid())
  9. # threading
  10. record = []
  11. for i in range(5):
  12.     thread = threading.Thread(target=printer, args=('threading',))
  13.     thread.start()
  14.     record.append(thread)
  15. for thread in record:
  16.     thread.join()
  17. # multi-process
  18. record = []
  19. for i in range(5):
  20.     process = multiprocessing.Process(target=printer, args=('multiprocessing',))
  21.     process.start()
  22.     record.append(process)
  23. for process in record:
  24.     process.join()
  25. print('Main end:', os.getpid())
输出结果

点击(此处)折叠或打开

  1. Main begin: 9524
  2. threading 9524
  3. threading 9524
  4. threading 9524
  5. threading 9524
  6. threading 9524
  7. multiprocessing 9539
  8. multiprocessing 9540
  9. multiprocessing 9541
  10. multiprocessing 9542
  11. multiprocessing 9543
  12. Main end: 9524
从例子的结果可以看出多线程threading的进程id和主进程(父进程)pid一样 ,同为9524; 多进程打印的pid每个都不一样,for循环中每创建一个process对象都年开一个进程。其他相关的方法基本类似。

2.2 用法
创建进程的类:
Process([group [, target [, name [, args [, kwargs]]]]]),
target表示调用对象,
args表示调用对象的位置参数元组。
kwargs表示调用对象的字典。
name为进程的别名。
group实质上不使用,为None。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程,并自动调用run方法.
属性:authkey、daemon(要通过start()设置,必须设置在方法start之前)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。

2.3 创建单进程
单线程比较简单,创建一个 Process的实例对象就好,传入参数 target 为已经定义好的方法worker以及worker需要的参数

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. """
  4. author: yangyi@youzan.com
  5. time: 2017/7/2 下午6:45
  6. func:
  7. """
  8. import multiprocessing
  9. import datetime, time
  10. def worker(interval):
  11.     print("process start: {0}".format(datetime.datetime.today()));
  12.     time.sleep(interval)
  13.     print("process end: {0}".format(datetime.datetime.today()));

  14. if __name__ == "__main__":
  15.     p = multiprocessing.Process(target=worker, args=(5,))
  16.     p.start()
  17.     p.join()
  18.     print "end!"
2.4 创建多进程

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. """
  4. author: yangyi@youzan.com
  5. time: 2017/7/2 下午7:50
  6. func:
  7. """
  8. import multiprocessing
  9. def worker(num):
  10.     print "worker %d" %num


  11. if __name__ == "__main__":
  12.     print("The number of CPU is:" + str(multiprocessing.cpu_count()))
  13.     proc = []
  14.     for i in xrange(5):
  15.         p = multiprocessing.Process(target=worker, args=(i,))
  16.         proc.append(p)
  17.     for p in proc:
  18.         p.start()
  19.     for p in proc:
  20.         p.join()
  21.     print "end ..."
输出

点击(此处)折叠或打开

  1. The number of CPU is:4
  2. worker 0
  3. worker 1
  4. worker 2
  5. worker 3
  6. worker 4
  7. main process end ...
2.5 线程池
multiprocessing提供进程池的类--Pool,它可以指定程序最大可以调用的进程数量,当有新的请求提交到pool中时,如果进程池还没有满,那么就会创建一个新的进程用来执行该请求;但如果进程池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
构造方法:
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes  : 使用的工作进程的数量,如果processes是None,默认使用os.cpu_count()返回的数量。
initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
context: 用在制定工作进程启动时的上下文,一般使用multiprocessing.Pool()或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。

实例方法:
  apply(func[, args[, kwds]]):同步进程池
  apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :异步进程池
  close() : 关闭进程池,阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
  terminate() : 结束工作进程,不在处理未完成的任务.
  join() : 等待工作线程的退出,在调用join()前必须调用close()或者 terminate(),因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. """
  4. author: yangyi@youzan.com
  5. time: 2017/7/2 下午7:50
  6. func:
  7. """
  8. from multiprocessing import Pool
  9. import time
  10. def worker(num):
  11.     print "worker %d" %num
  12.     time.sleep(2)
  13.     print "end worker %d" %num

  14. if __name__ == "__main__":
  15.     proc_pool = Pool(2)
  16.     for i in xrange(4):
  17.         proc_pool.apply_async(worker, (i,)) #使用了异步调用,从输出结果可以看出来

  18.     proc_pool.close()
  19.     proc_pool.join()
  20.     print "main process end ..."
输出结果

点击(此处)折叠或打开

  1. worker 0
  2. worker 1
  3. end worker 0
  4. end worker 1
  5. worker 2
  6. worker 3
  7. end worker 2
  8. end worker 3
  9. main process end ..
解释:创建一个进程池pool 对象proc_pool,并设定进程的数量为2,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为2,所以0、1会直接送到进程中执行,当其中的2个任务执行完之后才空出2进程处理对象2和3,所以会出现输出  worker 2 worker 3 出现在end worker 0 end worker 1之后。思考一下如果调用   proc_pool.apply(worker, (i,)) 的输出结果会是什么样的?

2.6 使用queue
multiprocessing提供队列类,可以通过调用multiprocessing.Queue(maxsize) 初始化队列对象,maxsize表示队列里面最多的元素个数。
例子 创建了两个函数入队,出队,出队处理时使用了lock特性,串行化取数据。
  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. """
  4. author: yangyi@youzan.com
  5. time: 2017/7/2 下午9:03
  6. func:
  7. """
  8. import time
  9. from multiprocessing import Process, current_process,Lock,Queue
  10. import datetime
  11. def inputQ(queue):
  12.     time.sleep(1)
  13.     info = "proc_name: " + current_process().name + ' was putted in queue at: ' + str(datetime.datetime.today())
  14.     queue.put(info)
  15. def outputQ(queue,lock):
  16.     info = queue.get()
  17.     lock.acquire()
  18.     print ("proc_name: " + current_process().name + ' gets info :' + info)
  19.     lock.release()
  20. if __name__ == '__main__':
  21.     record1 = [] # store input processes
  22.     record2 = [] # store output processes
  23.     lock = Lock() # To prevent messy print
  24.     queue = Queue(3)
  25.     for i in range(10):
  26.         process = Process(target=inputQ, args=(queue,))
  27.         process.start()
  28.         record1.append(process)
  29.     for i in range(10):
  30.         process = Process(target=outputQ, args=(queue,lock))
  31.         process.start()
  32.         record2.append(process)
  33.     for p in record1:
  34.         p.join()
  35.     queue.close() # No more object will come, close the queue
  36.     for p in record2:
  37.         p.join()
2.7 使用pipe 
Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。
用法 multiprocessing.Pipe([duplex])
该类返回一组对象实例(conn1, conn2),分别代表发送和接受消息的两端。

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. """
  4. author: yangyi@youzan.com
  5. time: 2017/7/2 下午8:01
  6. func:
  7. """
  8. from multiprocessing import Process, Pipe
  9. def p1(conn, name):
  10.     conn.send('hello ,{name}'.format(name=name))
  11.     print "p1 receive :", conn.recv()
  12.     conn.close()

  13. def p2(conn, name):
  14.     conn.send('hello ,{name}'.format(name=name))
  15.     print "p2 receive :", conn.recv()
  16.     conn.close()

  17. if __name__ == '__main__':
  18.     parent_conn, child_conn = Pipe()
  19.     proc1 = Process(target=p1, args=(child_conn, "parent_conn"))
  20.     proc2 = Process(target=p2, args=(parent_conn, "child_conn"))
  21.     proc1.start()
  22.     proc2.start()
  23.     proc1.join()
  24.     proc2.join()
输出:

点击(此处)折叠或打开

  1. p1 receive : hello ,child_conn
  2. p2 receive : hello ,parent_conn
该例子中 p1 p2 通过pipe 给彼此相互发送信息,p1 发送"parent_conn" 给 p2 ,p2 发送"child_conn" 给p1.
2.8 daemon程序对比结果
  1. import multiprocessing
  2. import datetime, time
  3. def worker(interval):
  4.     print("process start: {0}".format(datetime.datetime.today()));
  5.     time.sleep(interval)
  6.     print("process end: {0}".format(datetime.datetime.today()));
  7. if __name__ == "__main__":
  8.     p = multiprocessing.Process(target=worker, args=(5,))
  9.     p.start()
  10.     print "end!"
输出:

点击(此处)折叠或打开

  1. end!
  2. process start: 2017-07-02 18:47:30.656244
  3. process end: 2017-07-02 18:47:35.657464

设置 daemon  =  True,程序随着主程序结束而不等待子进程。
  1. import multiprocessing
  2. import datetime, time
  3. def worker(interval):
  4.     print("process start: {0}".format(datetime.datetime.today()));
  5.     time.sleep(interval)
  6.     print("process end: {0}".format(datetime.datetime.today()));
  7. if __name__ == "__main__":
  8.     p = multiprocessing.Process(target=worker, args=(5,))
  9.     p.daemon = True
  10.     p.start()
  11.     print "end!"
输出:
end!
因为子进程设置了daemon属性,主进程结束,multiprocessing创建的进程对象就随着结束了。

  1. import multiprocessing
  2. import datetime, time
  3. def worker(interval):
  4.     print("process start: {0}".format(datetime.datetime.today()));
  5.     time.sleep(interval)
  6.     print("process end: {0}".format(datetime.datetime.today()));
  7. if __name__ == "__main__":
  8.     p = multiprocessing.Process(target=worker, args=(5,))
  9.     p.daemon = True  #
  10.     p.start()
  11.     p.join() #进程执行完毕后再关闭
  12.     print "end!"
输出:

点击(此处)折叠或打开

  1. process start: 2017-07-02 18:48:20.953754
  2. process end: 2017-07-02 18:48:25.954736

2.9 Lock()
当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。
实例方法:
acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。
release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。
例子:
多个进程使用同一个std_out ,使用lock机制确保同一个时刻有一个一个进程获取输出。

  1. #!/usr/bin/env python
    # encoding: utf-8
    """
    author: yangyi@youzan.com
    time: 2017/7/2 下午9:28
    func: 
    """
    from multiprocessing import Process, Lock
    def func_with_lock(l, i):
        l.acquire()
        print 'hello world', i
        l.release()


    def func_without_lock(i):
        print 'hello world', i


    if __name__ == '__main__':
        lock = Lock()
        print "func_with_lock :"
        for num in range(10):
            Process(target=func_with_lock, args=(lock, num)).start()


输出:

点击(此处)折叠或打开

  1. func_with_lock :
  2. hello world 0
  3. hello world 1
  4. hello world 2
  5. hello world 3
  6. hello world 4
  7. hello world 5
  8. hello world 6
  9. hello world 7
  10. hello world 8
  11. hello world 9

三 小结
 本文参考官方资料以及其他资源,对multiprocesssing 的使用方式做了总结,还有很多知识需要详细阅读官方文档。纸上来得终觉浅,绝知此事要躬行。参考资料
[1] 官方文档 
[2] Python标准库10 多进程初步 (multiprocessing包)
目录
相关文章
|
Unix Linux Python
114 python高级 - multiprocessing
114 python高级 - multiprocessing
41 0
|
3月前
|
数据采集 并行计算 安全
Python并发编程:多进程(multiprocessing模块)
在处理CPU密集型任务时,Python的全局解释器锁(GIL)可能会成为瓶颈。为了充分利用多核CPU的性能,可以使用Python的multiprocessing模块来实现多进程编程。与多线程不同,多进程可以绕过GIL,使得每个进程在自己的独立内存空间中运行,从而实现真正的并行计算。
|
3月前
|
Unix Linux API
Python multiprocessing模块
Python multiprocessing模块
|
4月前
|
数据处理 调度 Python
Python并发编程实战指南:深入理解线程(threading)与进程(multiprocessing)的奥秘,打造高效并发应用!
【7月更文挑战第8天】Python并发编程探索:使用`threading`模块创建线程处理任务,虽受限于GIL,适合I/O密集型工作。而`multiprocessing`模块通过进程实现多核利用,适用于CPU密集型任务。通过实例展示了线程和进程的创建与同步,强调了根据任务类型选择合适并发模型的重要性。
60 5
|
4月前
|
数据库 数据安全/隐私保护 C++
Python并发编程实战:线程(threading)VS进程(multiprocessing),谁才是并发之王?
【7月更文挑战第10天】Python并发对比:线程轻量级,适合I/O密集型任务,但受GIL限制;进程绕过GIL,擅CPU密集型,但通信成本高。选择取决于应用场景,线程利于数据共享,进程利于多核利用。并发无“王者”,灵活运用方为上策。
59 2
|
4月前
|
Python
在Python中,`multiprocessing`模块提供了一种在多个进程之间共享数据和同步的机制。
在Python中,`multiprocessing`模块提供了一种在多个进程之间共享数据和同步的机制。
|
4月前
|
安全 API Python
`multiprocessing`是Python的一个标准库,用于支持生成进程,并通过管道和队列、信号量、锁和条件变量等同步原语进行进程间通信(IPC)。
`multiprocessing`是Python的一个标准库,用于支持生成进程,并通过管道和队列、信号量、锁和条件变量等同步原语进行进程间通信(IPC)。
|
4月前
|
API 数据库 C++
震惊!Python并发编程大揭秘:线程(threading)VS进程(multiprocessing),你选对了吗?
【7月更文挑战第8天】在Python并发编程中,线程适合I/O密集型任务,如实时订单处理,而进程适合CPU密集型任务,如商品信息同步。线程利用轻量级并发,处理I/O等待时切换成本低;进程通过multiprocessing模块充分利用多核CPU。根据任务类型选择合适工具,能提升效率并优化系统性能。理解和运用线程与进程,是解决并发问题的关键。
39 0
|
6月前
|
数据采集 Java Python
python并发编程:使用多进程multiprocessing模块加速程序的运行
python并发编程:使用多进程multiprocessing模块加速程序的运行
125 1
|
6月前
|
安全 Python
python多进程multiprocessing使用
如果你想在python中使用线程来实现并发以提高效率,大多数情况下你得到的结果是比串行执行的效率还要慢;这主要是python中GIL(全局解释锁)的缘故,通常情况下线程比较适合高IO低CPU的任务,否则创建线程的耗时可能比串行的还要多。GIL是历史问题,和C解释器有关系。 为了解决这个问题,python中提供了多进程的方式来处理需要并发的任务,可以有效的利用多核cpu达到并行的目的。【2月更文挑战第5天】
128 0
下一篇
无影云桌面