Python多进程multiprocessing使用示例

简介: 来源:http://outofmemory.cn/code-snippet/2267/Python-duojincheng-multiprocessing-usage-example 来源:http://www.jb51.net/article/67116.htm 来源:http://blog.csdn.net/qdx411324962/article/details/468104

来源:http://outofmemory.cn/code-snippet/2267/Python-duojincheng-multiprocessing-usage-example

来源:http://www.jb51.net/article/67116.htm

来源:http://blog.csdn.net/qdx411324962/article/details/46810421

来源:http://www.lxway.com/4488626156.htm

由于要做把一个多线程改成多进程,看一下相关方面的东西,总结一下,主要是以下几个相关的标准库

  1. subprocess
  2. signal
  3. threading
  4. multiprocessing
单线程
#coding=utf-8
import threading
from time import ctime,sleep

def music(func):
    for i in range(2):
        print "listen music  %s. %s" %(func,ctime())
        sleep(1)

def movie(func):
    for i in range(2):
        print "watch movie! %s" %(func,ctime())
        sleep(5)

if __name__ == '__main__':
    music(u'trouble is a friend')
    movie(u'变形金刚')
    print "all over %s" %ctime()


多线程
python提供了两个模块来实现多线程thread 和threading 。 thread有一些缺点,在threading 得到了弥补,直接使用threading。
#coding=utf-8
import threading
from time import ctime,sleep

def music(func):
    for i in range(2):
        print "listen music  %s. %s" %(func,ctime())
        sleep(1)

def movie(func):
    for i in range(2):
        print "watch movie  %s! %s" %(func,ctime())
        sleep(5)

threads = []
t1 = threading.Thread(target=music,args=(u'trouble is a friend',))
threads.append(t1)
t2 = threading.Thread(target=movie,args=(u'变形金刚',))
threads.append(t2)

if __name__ == '__main__':
    for t in threads:
        t.setDaemon(True)
        t.start()
    print "all over %s" %ctime()
 
setDaemon(True)将线程声明为守护线程,必须在start() 方法调用之前设置,如果不设置为守护线程程序会被无限挂起。
子线程启动后,父线程也继续执行下去,当父线程执行完最后一条语句print "all over %s" %ctime()后,没有等待子线程,直接就退出了,同时子线程也一同结束。 
start()开始线程活动。

调整程序:
if __name__ == '__main__':
    for t in threads:
        t.setDaemon(True)
        t.start()
    
    t.join()
    print "all over %s" %ctime()

对上面的程序加了个join()方法,用于等待线程终止。join()的作用是,在子线程完成运行之前,这个子线程的父线程将一直被阻塞。
注意:  join()方法的位置是在for循环外的,也就是说必须等待for循环里的两个进程都结束后,才去执行主进程。
import threading
import time

def worker(num):
    time.sleep(1)
    print("The num is  %d" % num)
    print t.getName()
    return

for i in range(20):
    t = threading.Thread(target=worker, args=(i,), name="testThread")
    t.start()

Thread方法说明   
t.start() : 激活线程,   
t.getName() : 获取线程的名称   
t.setName() :设置线程的名称   
t.name : 获取或设置线程的名称   
t.is_alive() :判断线程是否为激活状态   
t.isAlive() :判断线程是否为激活状态   
t.setDaemon() 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止   
t.isDaemon() :判断是否为守护线程   
t.ident:获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。   
t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义   
t.run() :线程被cpu调度后自动执行线程对象的run方法

2、线程锁threading.RLock和threading.Lock    
由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程。为了保证数据的准确性,引入了锁的概念。
所以,可能出现如下问题:   
例:假设列表A的所有元素就为0,当一个线程从前向后打印列表的所有元素,另外一个线程则从后向前修改列表的元素为1,
那么输出的时候,列表的元素就会一部分为0,一部分为1,这就导致了数据的不一致。锁的出现解决了这个问题。

import threading
import time
globals_num = 0
lock = threading.RLock()

def func():
    lock.acquire()  # 获得锁
    global globals_num
    globals_num += 1
    time.sleep(1)
    print(globals_num)
    lock.release()  # 释放锁

for i in range(10):
    t = threading.Thread(target=func)
    t.start()
    pass

3、threading.RLock和threading.Lock的区别  
RLock允许在同一线程中被多次acquire。
而Lock却不允许这种情况。如果使用RLock,那么acquire和release必须成对出现,
即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。

import threading  
lock = threading.Lock()    #Lock对象 
lock.acquire()  
lock.acquire()  #产生了死琐。 
lock.release() 
lock.release()  


import threading  
rLock = threading.RLock()  #RLock对象 
rLock.acquire()  
rLock.acquire()    #在同一线程内,程序不会堵塞。 
rLock.release() 
rLock.release()

4、threading.Event    
python线程的事件用于主线程控制其他线程的执行,
事件主要提供了三个方法 set、wait、clear。   
事件处理的机制:全局定义了一个“Flag”,
如果“Flag”值为 False,那么当程序执行event.wait方法时就会阻塞,
如果“Flag”值为True,那么event.wait方法时便不再阻塞。
clear:将“Flag”设置为False 
set:将“Flag”设置为True  
Event.isSet() :判断标识位是否为Ture。

import threading
def do(event):
    print('start')
    event.wait()
    print('execute')

event_obj = threading.Event()
for i in range(10):
    t = threading.Thread(target=do, args=(event_obj,))
    t.start()

event_obj.clear()
# inp = input('input:')
inp = raw_input('input:')
if inp == 'true':
    event_obj.set()
当线程执行的时候,如果flag为False,则线程会阻塞,当flag为True的时候,线程不会阻塞。它提供了本地和远程的并发性。

5、threading.Condition    
一个condition变量总是与某些类型的锁相联系,这个可以使用默认的情况或创建一个,
当几个condition变量必须共享和同一个锁的时候,是很有用的。锁是conditon对象的一部分:没有必要分别跟踪。

condition变量服从上下文管理协议:with语句块封闭之前可以获取与锁的联系。 
acquire() 和 release() 会调用与锁相关联的相应的方法。   
其他和锁关联的方法必须被调用,wait()方法会释放锁,
当另外一个线程使用 notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回,

Condition类实现了一个conditon变量。这个conditiaon变量允许一个或多个线程等待,直到他们被另一个线程通知。
如果lock参数,被给定一个非空的值,,那么他必须是一个lock或者Rlock对象,它用来做底层锁。否则,会创建一个新的Rlock对象,用来做底层锁。

wait(timeout=None) :等待通知,或者等到设定的超时时间。
当调用这wait()方法时,如果调用它的线程没有得到锁,那么会抛出一个RuntimeError异常。
wati()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前会一直阻塞。
wait()还可以指定一个超时时间。  如果有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all() 则会唤醒所有在等待conditon变量的线程。 

注意: notify()和notify_all()不会释放锁,也就是说,线程被唤醒后不会立刻返回他们的wait() 调用。
除非线程调用notify()和notify_all()之后放弃了锁的所有权。   在典型的设计风格里,利用condition变量用锁去通许访问一些共享状态,线程在获取到它想得到的状态前,会反复调用wait()。修改状态的线程在他们状态改变时调用 notify() or notify_all(),用这种方式,线程会尽可能的获取到想要的一个等待者状态。

例子:生产者-消费者模型,
import threading
import time

def consumer(cond):
    with cond:
        print("consumer before wait")
        cond.wait()
        print("consumer after wait")

def producer(cond):
    with cond:
        print("producer before notifyAll")
        cond.notifyAll()
        print("producer after notifyAll")

condition = threading.Condition()
c1 = threading.Thread(name="c1", target=consumer, args=(condition,))
c2 = threading.Thread(name="c2", target=consumer, args=(condition,))
p = threading.Thread(name="p", target=producer, args=(condition,))

c1.start()
time.sleep(2)
c2.start()
time.sleep(2)
p.start()

在使用并发设计的时候最好尽可能的避免共享数据,尤其是在使用多进程的时候。
如果你真有需要要共享数据, multiprocessing提供了两种方式。   
(1)multiprocessing,Array,Value
数据可以用Value或Array存储在一个共享内存地图里,如下:
from multiprocessing import Array, Value, Process

def func(a, b):
    a.value = 3.333333333333333
    for j in range(len(b)):
        b[j] = -b[j]

if __name__ == "__main__":
    num = Value('d', 0.0)
    arr = Array('i', range(11))

    if 0:
        a = Process(target=func, args=(num, arr))
        a.start()
        a.join()
    else:
        c = Process(target=func, args=(num, arr))
        d = Process(target=func, args=(num, arr))
        c.start()
        d.start()
        c.join()
        d.join()

    print(num.value)
    for i in arr:
        print i,

输出:
3.1415927  
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

创建num和arr时,“d”和“i”参数由Array模块使用的typecodes创建:
“d”表示一个双精度的浮点数,“i”表示一个有符号的整数,这些共享对象将被线程安全的处理。
Array(‘i’, range(10))中的‘i’参数:   
‘c’: ctypes.c_char    
‘u’: ctypes.c_wchar    
‘b’: ctypes.c_byte    
‘B’: ctypes.c_ubyte
‘h’: ctypes.c_short    
‘H’: ctypes.c_ushort   
‘i’: ctypes.c_int     
‘I’: ctypes.c_uint  
‘l’: ctypes.c_long,    
‘L’: ctypes.c_ulong    
‘f’: ctypes.c_float    
‘d’: ctypes.c_double

(2)multiprocessing,Manager   
由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持。

from multiprocessing import Process, Manager
def f(d, l):
    d["name"] = "king"
    d["age"] = 100
    d["Job"] = "python"
    l.reverse()

if __name__ == "__main__":
    with Manager() as man:
        d_temp = man.dict()
        l_temp = man.list(range(10))

        p = Process(target=f, args=(d_temp, l_temp))
        p.start()
        p.join()

        print(d_temp)
        print(l_temp) 

Server process manager比 shared memory 更灵活,因为它可以支持任意的对象类型。
另外,一个单独的manager可以通过进程在网络上不同的计算机之间共享,不过他比shared memory要慢。

2、进程池(Using a pool of workers)    Pool类描述了一个工作进程池,他有几种不同的方法让任务卸载工作进程。   
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,
如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
我们可以用Pool类创建一个进程池,展开提交的任务给进程池。
例:
#apply
from multiprocessing import Pool
import time

def f1(arg):
    time.sleep(0.5)
    print(arg)
    return arg + 100

if __name__ == "__main__":
    pool = Pool(5)
    for i in range(1, 10):
        pool.apply(func=f1, args=(i,))

#apply_async
from multiprocessing import Pool
def f1(i):
    time.sleep(1)
    print(i)
    return i + 100

def f2(arg):
    print(arg)

if __name__ == "__main__":
    pool = Pool(5)
    for i in range(1, 10):
        pool.apply_async(func=f1, args=(i,), callback=f2)
    pool.close()
    pool.join()

一个进程池对象可以控制工作进程池的哪些工作可以被提交,它支持超时和回调的异步结果,有一个类似map的实现。
processes :使用的工作进程的数量,如果processes是None那么使用os.cpu_count()返回的数量。
initializer:如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。  
maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个心的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。  
context: 用在制定工作进程启动时的上下文,一般使用multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context
注意:Pool对象的方法只可以被创建pool的进程所调用。

进程池的方法  
apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,
由于这个原因,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。   
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : 
apply()方法的一个变体,会返回一个结果对象。
如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时,
则用error_callback替换callback。 Callbacks应被立即完成,否则处理结果的线程会被阻塞。   
close() :阻止更多的任务提交到pool,待任务完成后,工作进程会退出。   
terminate() :不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。
join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。
这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。   
map(func, iterable[, chunksize])   
map_async(func, iterable[, chunksize[, callback[, error_callback]]])
imap(func, iterable[, chunksize])
imap_unordered(func, iterable[, chunksize])   
starmap(func, iterable[, chunksize])
starmap_async(func, iterable[, chunksize[, callback[, error_back]]])

python 协程    
线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。
协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。   
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),
event loop是协程执行的控制点,如果你希望执行协程,就需要用到它们。
event loop提供了如下的特性:   注册、执行、取消延时调用(异步函数) 创建用于通信的client和server协议(工具) 创建和别的程序通信的子进程和协议(工具) 把函数调用送入线程池中 协程示例:
#---------python3_start---------------
import asyncio
async def cor1():
	print("COR1 start")
	await cor2()
	print("COR1 end")


async def cor2():
	print("COR2")

loop = asyncio.get_event_loop()
loop.run_until_complete(cor1())
loop.close()
#---------python3_end---------------
最后三行是重点。
asyncio.get_event_loop()  : asyncio启动默认的event loop  
run_until_complete()  :  这个函数是阻塞执行的,知道所有的异步函数执行完成,  
close()  :  关闭event loop。

1、greenlet

import greenlet
def fun1():
    print("12")
    gr2.switch()
    print("56")
    gr2.switch()

def fun2():
    print("34")
    gr1.switch()
    print("78")

gr1 = greenlet.greenlet(fun1)
gr2 = greenlet.greenlet(fun2)
gr1.switch()

2、gevent
gevent属于第三方模块需要下载安装包
pip3 install --upgrade pip3 
pip3 install gevent 

import gevent
def fun1():
    print("www.baidu.com")  # 第一步
    gevent.sleep(0)
    print("end the baidu.com")  # 第三步

def fun2():
    print("www.zhihu.com")  # 第二步
    gevent.sleep(0)
    print("end th zhihu.com")  # 第四步

gevent.joinall([
    gevent.spawn(fun1),
    gevent.spawn(fun2),
])

遇到IO操作自动切换:
import gevent
import requests

def func(url):
    print("get: %s" % url)
    gevent.sleep(0)
    proxies = {
        "http": "http://172.17.18.80:8080",
        "https": "http://172.17.18.80:8080",
    }

    date = requests.get(url, proxies=proxies)
    ret = date.text
    print(url, len(ret))

gevent.joinall([
    gevent.spawn(func, 'https://www.baidu.com/'),
    gevent.spawn(func, 'http://www.sina.com.cn/'),
    gevent.spawn(func, 'http://www.qq.com/'),
])

Python中使用线程有两种方式:函数或者用类来包装线程对象。

函数式:调用thread模块中的start_new_thread()函数来产生新线程。
语法如下: thread.start_new_thread ( function, args[, kwargs] )
参数说明:
function : 线程函数。
args : 传递给线程函数的参数,他必须是个tuple类型。
kwargs :可选参数。

import thread
import time


def print_time(thread_name, delay):
        count = 0
        while count < 5:
                time.sleep(delay)
                count += 1
                print "%s: %s" % (thread_name, time.ctime(time.time()))


if __name__ == "__main__":
    try:
            thread.start_new_thread(print_time, ("Thread-1", 2))
            thread.start_new_thread(print_time, ("Thread-2", 4))
    except BaseException as e:
            print e
            print "Error: unable to start thread"
    while 1:
            pass
使用Threading模块创建线程,直接从threading.Thread继承,然后重写__init__方法和run方法:
# coding=utf-8
# !/usr/bin/python
import threading
import time

exitFlag = 0

class myThread(threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter

    def run(self):
        print "Starting " + self.name
        print_time(self.name, self.counter, 5)
        print "Exiting " + self.name


def print_time(threadName, delay, counter):
    while counter:
        if exitFlag:
            thread.exit()
        time.sleep(delay)
        print "%s: %s" % (threadName, time.ctime(time.time()))
        counter -= 1

thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

thread1.start()
thread2.start()
print "Exiting Main Thread"
线程同步
如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。
使用Thread对象的Lock和Rlock可以实现简单的线程同步,这两个对象都有acquire方法和release方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release方法之间。
如下:
多线程的优势在于可以同时运行多个任务(至少感觉起来是这样)。但是当线程需要共享数据时,可能存在数据不同步的问题。考虑这样一种情况:一个列表里所有元素都是0,线程"set"从后向前把所有元素改成1,而线程"print"负责从前往后读取列表并打印。那么,可能线程"set"开始改的时候,线程"print"便来打印列表了,输出就成了一半0一半1,这就是数据的不同步。为了避免这种情况,引入了锁的概念。锁有两种状态——锁定和未锁定。每当一个线程比如"set"要访问共享数据时,必须先获得锁定;如果已经有别的线程比如"print"获得锁定了,那么就让线程"set"暂停,也就是同步阻塞;等到线程"print"访问完毕,释放锁以后,再让线程"set"继续。经过这样的处理,打印列表时要么全部输出0,要么全部输出1,不会再出现一半0一半1的尴尬场面。
# coding=utf-8
# !/usr/bin/python
import threading
import time


class myThread(threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter

    def run(self):
        print "Starting " + self.name
        # 获得锁,成功获得锁定后返回True
        # 可选的timeout参数不填时将一直阻塞直到获得锁定
        # 否则超时后将返回False
        threadLock.acquire()
        print_time(self.name, self.counter, 3)
        # 释放锁
        threadLock.release()


def print_time(threadName, delay, counter):
    while counter:
        time.sleep(delay)
        print "%s: %s" % (threadName, time.ctime(time.time()))
        counter -= 1


threadLock = threading.Lock()
threads = []
# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 开启新线程
thread1.start()
thread2.start()
# 添加线程到线程列表中
threads.append(thread1)
threads.append(thread2)
# 等待所有线程完成
for t in threads:
    t.join()
print "Exiting Main Thread"
线程优先级队列( Queue)
Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。
Queue模块中的常用方法:
Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.full 与 maxsize 大小对应
Queue.get([block[, timeout]])获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
Queue.put(item) 写入队列,timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)
Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
Queue.join() 实际上意味着等到队列为空,再执行别的操作
# coding=utf-8
# !/usr/bin/python
import Queue
import threading
import time

exitFlag = 0


class myThread(threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q

    def run(self):
        print "Starting " + self.name
        process_data(self.name, self.q)
        print "Exiting " + self.name


def process_data(threadName, q):
    while not exitFlag:
        queueLock.acquire()
        if not workQueue.empty():
            data = q.get()
            queueLock.release()
            print "%s processing %s" % (threadName, data)
        else:
            queueLock.release()
        time.sleep(1)


threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
threadID = 1

# 创建线程
for tName in threadList:
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)
    threadID += 1

# 填充队列
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()

# 等待队列清空
while not workQueue.empty():
    pass
# 通知线程退出
exitFlag = 1
# 等待所有线程完成
for t in threads:
    t.join()
print "Exiting Main Thread"


共享变量 (参考: http://www.jb51.net/article/65459.htm )
from multiprocessing import Process, Manager
import os

manager = Manager()
temp_list = []
manager_list = manager.list()
manager_value = manager.Value('i', 0)


def test_func(cc):
    temp_list.append(cc)
    manager_list.append(cc)
    manager_value.value += cc
    print 'process id:', os.getpid()


if __name__ == '__main__':
    threads = []

    for ll in range(10):
        t = Process(target=test_func, args=(ll,))
        t.daemon = True
        threads.append(t)

    for i in range(len(threads)):
        threads[i].start()

    for j in range(len(threads)):
        threads[j].join()

    print "------------------------"
    print 'process id:', os.getpid()
    print temp_list
    print manager_list
    print manager_value
from multiprocessing import Process, Manager
import time

manager = Manager()
manager_value = manager.Value('i', 0)

def test_func(cc, seconds):
    while True:
        manager_value.value += cc
        print manager_value.value
        time.sleep(seconds)

if __name__ == '__main__':
    threads = [Process(target=test_func, args=(1, i+1)) for i in range(5)]
    map(lambda x: x.start(), threads)
    map(lambda x: x.join(), threads)
    pass




mutilprocess简介

像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多。

简单的创建进程

import multiprocessing

def worker(num):
    """thread worker function"""
    print 'Worker:', num
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

确定当前的进程,即是给进程命名,方便标识区分,跟踪

import multiprocessing
import time

def worker():
    name = multiprocessing.current_process().name
    print name, 'Starting'
    time.sleep(2)
    print name, 'Exiting'

def my_service():
    name = multiprocessing.current_process().name
    print name, 'Starting'
    time.sleep(3)
    print name, 'Exiting'

if __name__ == '__main__':
    service = multiprocessing.Process(name='my_service',
                                      target=my_service)
    worker_1 = multiprocessing.Process(name='worker 1',
                                       target=worker)
    worker_2 = multiprocessing.Process(target=worker) # default name

    worker_1.start()
    worker_2.start()
    service.start()

守护进程

守护进程就是不阻挡主程序退出,自己干自己的mutilprocess.setDaemon(True)

就这句

等待守护进程退出,要加上join,join可以传入浮点数值,等待n久就不等了

import multiprocessing
import time
import sys

def daemon():
    name = multiprocessing.current_process().name
    print 'Starting:', name
    time.sleep(2)
    print 'Exiting :', name

def non_daemon():
    name = multiprocessing.current_process().name
    print 'Starting:', name
    print 'Exiting :', name

if __name__ == '__main__':
    d = multiprocessing.Process(name='daemon',
                                target=daemon)
    d.daemon = True

    n = multiprocessing.Process(name='non-daemon',
                                target=non_daemon)
    n.daemon = False

    d.start()
    n.start()

    d.join(1)
    print 'd.is_alive()', d.is_alive()
    n.join()

终止进程

最好使用 poison pill,强制的使用terminate()

注意 terminate之后要join,使其可以更新状态

import multiprocessing
import time

def slow_worker():
    print 'Starting worker'
    time.sleep(0.1)
    print 'Finished worker'

if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print 'BEFORE:', p, p.is_alive()

    p.start()
    print 'DURING:', p, p.is_alive()

    p.terminate()
    print 'TERMINATED:', p, p.is_alive()

    p.join()
    print 'JOINED:', p, p.is_alive()

进程的退出状态

  1. == 0 未生成任何错误
  2. 0 进程有一个错误,并以该错误码退出

  3. < 0 进程由一个-1 * exitcode信号结束
import multiprocessing
import sys
import time

def exit_error():
    sys.exit(1)

def exit_ok():
    return

def return_value():
    return 1

def raises():
    raise RuntimeError('There was an error!')

def terminated():
    time.sleep(3)

if __name__ == '__main__':
    jobs = []
    for f in [exit_error, exit_ok, return_value, raises, terminated]:
        print 'Starting process for', f.func_name
        j = multiprocessing.Process(target=f, name=f.func_name)
        jobs.append(j)
        j.start()

    jobs[-1].terminate()

    for j in jobs:
        j.join()
        print '%15s.exitcode = %s' % (j.name, j.exitcode)

日志

方便的调试,可以用logging

import multiprocessing
import logging
import sys

def worker():
    print 'Doing some work'
    sys.stdout.flush()

if __name__ == '__main__':
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

派生进程

利用class来创建进程,定制子类

import multiprocessing

class Worker(multiprocessing.Process):

    def run(self):
        print 'In %s' % self.name
        return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()

python进程间传递消息

这一块我之前结合SocketServer写过一点,见Python多进程

一般的情况是Queue来传递。

import multiprocessing

class MyFancyClass(object):

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print 'Doing something fancy in %s for %s!' % \
            (proc_name, self.name)

def worker(q):
    obj = q.get()
    obj.do_something()

if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()

import multiprocessing
import time

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print '%s: Exiting' % proc_name
                self.task_queue.task_done()
                break
            print '%s: %s' % (proc_name, next_task)
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)

if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print 'Creating %d consumers' % num_consumers
    consumers = [ Consumer(tasks, results)
                  for i in xrange(num_consumers) ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in xrange(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in xrange(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print 'Result:', result
        num_jobs -= 1

进程间信号传递

Event提供一种简单的方法,可以在进程间传递状态信息。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。

import multiprocessing
import time

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print 'wait_for_event: starting'
    e.wait()
    print 'wait_for_event: e.is_set()->', e.is_set()

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print 'wait_for_event_timeout: starting'
    e.wait(t)
    print 'wait_for_event_timeout: e.is_set()->', e.is_set()

if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name='block', 
                                 target=wait_for_event,
                                 args=(e,))
    w1.start()

    w2 = multiprocessing.Process(name='nonblock', 
                                 target=wait_for_event_timeout, 
                                 args=(e, 2))
    w2.start()

    print 'main: waiting before calling Event.set()'
    time.sleep(3)
    e.set()
    print 'main: event is set'


由于Python设计的限制(我说的是咱们常用的CPython)。最多只能用满1个CPU核心。
Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。

1、新建单一进程

如果我们新建少量进程,可以如下:

import multiprocessing
import time
def func(msg):
  for i in xrange(3):
    print msg
    time.sleep(1)
if __name__ == "__main__":
  p = multiprocessing.Process(target=func, args=("hello", ))
  p.start()
  p.join()
  print "Sub-process done."

2、使用进程池(非阻塞)

是的,你没有看错,不是线程池。它可以让你跑满多核CPU,而且使用方法非常简单。

注意要用apply_async,如果落下async,就变成阻塞版本了。

processes=4是最多并发进程数量。

import multiprocessing
import time
def func(msg):
  for i in xrange(3):
    print msg
    time.sleep(1)
if __name__ == "__main__":
  pool = multiprocessing.Pool(processes=4)
  for i in xrange(10):
    msg = "hello %d" %(i)
    pool.apply_async(func, (msg, ))
  pool.close()
  pool.join()
  print "Sub-process(es) done."

函数解释

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞
  • close()    关闭pool,使其不在接受新的任务。
  • terminate()    结束工作进程,不在处理未完成的任务。
  • join()    主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。

使用进程池(阻塞)

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print "Sub-process(es) done."


3、使用Pool,并需要关注结果

更多的时候,我们不仅需要多进程执行,还需要关注每个进程的执行结果,如下:

import multiprocessing
import time
def func(msg):
  for i in xrange(3):
    print msg
    time.sleep(1)
  return "done " + msg
if __name__ == "__main__":
  pool = multiprocessing.Pool(processes=4)
  result = []
  for i in xrange(10):
    msg = "hello %d" %(i)
    result.append(pool.apply_async(func, (msg, )))
  pool.close()
  pool.join()
  for res in result:
    print res.get()
  print "Sub-process(es) done."

import multiprocessing

def do_calculation(data):
    return data*2

def start_process():
    print 'Starting', multiprocessing.current_process().name

if __name__ == '__main__':
    inputs = list(range(10))
    print 'Inputs  :', inputs

    builtin_output = map(do_calculation, inputs)
    print 'Build-In :', builtin_output

    pool_size = multiprocessing.cpu_count()*2
    pool = multiprocessing.Pool(processes=pool_size, initializer=start_process,)
    # 默认情况下,Pool会创建固定数目的工作进程,并向这些工作进程传递作业,直到再没有更多作业为止。
    # maxtasksperchild参数为每个进程执行task的最大数目,
    # 设置maxtasksperchild参数可以告诉池在完成一定数量任务之后重新启动一个工作进程,
    # 来避免运行时间很长的工作进程消耗太多的系统资源。
    # pool = multiprocessing.Pool(processes=pool_size, initializer=start_process, maxtasksperchild=2)
    print '-' * 20
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()
    pool.join()

    print 'Pool  :', pool_outputs

使用多个进程池

#coding: utf-8
import multiprocessing
import os, time, random

def Lee():
    print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()获取当前的进程的ID
    start = time.time()
    time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
    end = time.time()
    print 'Task Lee, runs %0.2f seconds.' %(end - start)

def Marlon():
    print "\nRun task Marlon-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 40)
    end=time.time()
    print 'Task Marlon runs %0.2f seconds.' %(end - start)

def Allen():
    print "\nRun task Allen-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print 'Task Allen runs %0.2f seconds.' %(end - start)

def Frank():
    print "\nRun task Frank-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print 'Task Frank runs %0.2f seconds.' %(end - start)
        
if __name__=='__main__':
    function_list=  [Lee, Marlon, Allen, Frank] 
    print "parent process %s" %(os.getpid())

    pool=multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)     #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

    print 'Waiting for all subprocesses done...'
    pool.close()
    pool.join()    #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    print 'All subprocesses done.'
multiprocessing pool map

#coding: utf-8
import multiprocessing 

def m1(x): 
    print x * x 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(multiprocessing.cpu_count()) 
    i_list = range(8)
    pool.map(m1, i_list)

#coding: utf-8
import multiprocessing
import logging

def create_logger(i):
    print i

class CreateLogger(object):
    def __init__(self, func):
        self.func = func

if __name__ == '__main__':
    ilist = range(10)

    cl = CreateLogger(create_logger)
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    pool.map(cl.func, ilist)

    print "hello------------>"

Python 多进程 multiprocessing.Pool类详解

multiprocessing模块

multiprocessing包是Python中的多进程管理包。它与 threading.Thread类似,可以利用multiprocessing.Process对象来创建一个进程。该进程可以允许放在Python程序内部编写的函数中。该Process对象与Thread对象的用法相同,拥有is_alive()、join([timeout])、run()、start()、terminate()等方法。属性有:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类,用来同步进程,其用法也与threading包中的同名类一样。multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

这个模块表示像线程一样管理进程,这个是multiprocessing的核心,它与threading很相似,对多核CPU的利用率会比threading好的多。

看一下Process类的构造方法:

__init__(self, group=None, target=None, name=None, args=(), kwargs={})

参数说明:
group
:进程所属组。基本不用
target:表示调用对象。
args:表示调用对象的位置参数元组。
name:别名
kwargs:表示调用对象的字典。

创建进程的简单实例:

#coding=utf-8
import multiprocessing

def do(n) :
  #获取当前线程的名字
  name = multiprocessing.current_process().name
  print name,'starting'
  print "worker ", n
  return 

if __name__ == '__main__' :
  numList = []
  for i in xrange(5) :
    p = multiprocessing.Process(target=do, args=(i,))
    numList.append(p)
    p.start()
    p.join()
    print "Process end."
执行结果:
Process-1 starting
worker  0
Process end.
Process-2 starting
worker  1
Process end.
Process-3 starting
worker  2
Process end.
Process-4 starting
worker  3
Process end.
Process-5 starting
worker  4
Process end.

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,并用其start()方法启动,这样创建进程比fork()还要简单。
join()方法表示等待子进程结束以后再继续往下运行,通常用于进程间的同步。

注意:
在Windows上要想使用进程模块,就必须把有关进程的代码写在当前.py文件的if __name__ == ‘__main__’ :语句的下面,才能正常使用Windows下的进程模块。Unix/Linux下则不需要。

   Pool类

在使用Python进行系统管理时,特别是同时操作多个文件目录或者远程控制多台主机,并行操作可以节约大量的时间。如果操作的对象数目不大时,还可以直接使用Process类动态的生成多个进程,十几个还好,但是如果上百个甚至更多,那手动去限制进程数量就显得特别的繁琐,此时进程池就派上用场了。
Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。
下面介绍一下multiprocessing 模块下的Pool类下的几个方法

   apply()

函数原型:

apply(func[, args=()[, kwds={}]])

该函数用于传递不定参数,主进程会被阻塞直到函数执行结束(不建议使用,并且3.x以后不在出现)。

   apply_async()

函数原型:

apply_async(func[, args=()[, kwds={}[, callback=None]]])

与apply用法一样,但它是非阻塞且支持结果返回进行回调。

   map()

函数原型:

map(func, iterable[, chunksize=None])

Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到返回结果。
注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。

   close()

关闭进程池(pool),使其不在接受新的任务。

   terminate()

结束工作进程,不在处理未处理的任务。

   join()

主进程阻塞等待子进程的退出,join方法必须在close或terminate之后使用。

multiprocessing.Pool类的实例:

import time
from multiprocessing import Pool
def run(fn):
  #fn: 函数参数是数据列表的一个元素
  time.sleep(1)
  return fn*fn

if __name__ == "__main__":
  testFL = [1,2,3,4,5,6]  
  print 'shunxu:' #顺序执行(也就是串行执行,单进程)
  s = time.time()
  for fn in testFL:
    run(fn)

  e1 = time.time()
  print "顺序执行时间:", int(e1 - s)

  print 'concurrent:' #创建多个进程,并行执行
  pool = Pool(5)  #创建拥有5个进程数量的进程池
  #testFL:要处理的数据列表,run:处理testFL列表中数据的函数
  rl =pool.map(run, testFL) 
  pool.close()#关闭进程池,不再接受新的进程
  pool.join()#主进程阻塞等待子进程的退出
  e2 = time.time()
  print "并行执行时间:", int(e2-e1)
  print rl
执行结果:
shunxu:
顺序执行时间: 6
concurrent:
并行执行时间: 2
[1, 4, 9, 16, 25, 36]

上例是一个创建多个进程并发处理与顺序执行处理同一数据,所用时间的差别。从结果可以看出,并发执行的时间明显比顺序执行要快很多,但是进程是要耗资源的,所以平时工作中,进程数也不能开太大。
程序中的r1表示全部进程执行结束后全局的返回结果集,run函数有返回值,所以一个进程对应一个返回结果,这个结果存在一个列表中,也就是一个结果堆中,实际上是用了队列的原理,等待所有进程都执行完毕,就返回这个列表(列表的顺序不定)。
对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),让其不再接受新的Process了。

再看一个实例:

import time
from multiprocessing import Pool
def run(fn) :
  time.sleep(2)
  print fn
if __name__ == "__main__" :
  startTime = time.time()
  testFL = [1,2,3,4,5]
  pool = Pool(10)#可以同时跑10个进程
  pool.map(run,testFL)
  pool.close()
  pool.join()   
  endTime = time.time()
  print "time :", endTime - startTime
执行结果:
21

3
4
5
time : 2.51999998093
再次执行结果如下:
1
34

2
5
time : 2.48600006104

结果中为什么还有空行和没有折行的数据呢?其实这跟进程调度有关,当有多个进程并行执行时,每个进程得到的时间片时间不一样,哪个进程接受哪个请求以及执行完成时间都是不定的,所以会出现输出乱序的情况。那为什么又会有没这行和空行的情况呢?因为有可能在执行第一个进程时,刚要打印换行符时,切换到另一个进程,这样就极有可能两个数字打印到同一行,并且再次切换回第一个进程时会打印一个换行符,所以就会出现空行的情况。

   进程实战实例

并行处理某个目录下文件中的字符个数和行数,存入res.txt文件中,
每个文件一行,格式为:filename:lineNumber,charNumber

import os
import time
from multiprocessing import Pool

def getFile(path) :
  #获取目录下的文件list
  fileList = []
  for root, dirs, files in list(os.walk(path)) :
    for i in files :
      if i.endswith('.txt') or i.endswith('.10w') :
        fileList.append(root + "\\" + i)
  return fileList

def operFile(filePath) :
  #统计每个文件中行数和字符数,并返回
  filePath = filePath
  fp = open(filePath)
  content = fp.readlines()
  fp.close()
  lines = len(content)
  alphaNum = 0
  for i in content :
    alphaNum += len(i.strip('\n'))
  return lines,alphaNum,filePath

def out(list1, writeFilePath) :
  #将统计结果写入结果文件中
  fileLines = 0
  charNum = 0
  fp = open(writeFilePath,'a')
  for i in list1 :
    fp.write(i[2] + " 行数:"+ str(i[0]) + " 字符数:"+str(i[1]) + "\n")
    fileLines += i[0]
    charNum += i[1]
  fp.close()
  print fileLines, charNum

if __name__ == "__main__":
  #创建多个进程去统计目录中所有文件的行数和字符数
  startTime = time.time()
  filePath = "C:\\wcx\\a"
  fileList = getFile(filePath)
  pool = Pool(5)  
  resultList =pool.map(operFile, fileList)  
  pool.close()
  pool.join()

  writeFilePath = "c:\\wcx\\res.txt"
  print resultList
  out(resultList, writeFilePath)
  endTime = time.time()
  print "used time is ", endTime - startTime

执行结果:

1
耗时不到1秒,可见多进程并发执行速度是很快的。


我们已经见过了使用subprocess包来创建子进程,但这个包有两个很大的局限性:

1) 我们总是让subprocess运行外部的程序,而不是运行一个Python脚本内部编写的函数。

2) 进程间只通过管道进行文本交流。以上限制了我们将subprocess包应用到更广泛的多进程任务。

   (这样的比较实际是不公平的,因为subprocessing本身就是设计成为一个shell,而不是一个多进程管理包)

threading和multiprocessing

(请尽量先阅读Python多线程与同步)

multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

但在使用这些共享API的时候,我们要注意以下几点:

  • 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
  • multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
  • 多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。

Process.PID中保存有PID,如果进程还没有start(),则PID为None。

我们可以从下面的程序中看到Thread对象和Process对象在使用上的相似性与结果上的不同。各个线程和进程都做一件事:打印PID。但问题是,所有的任务在打印的时候都会向同一个标准输出(stdout)输出。这样输出的字符会混合在一起,无法阅读。使用Lock同步,在一个任务输出完成之后,再允许另一个任务输出,可以避免多个任务同时向终端输出。

# Similarity and difference of multi thread vs. multi process
# Written by Vamei

import os
import threading
import multiprocessing

# worker function
def worker(sign, lock):
    lock.acquire()
    print(sign, os.getpid())
    lock.release()

# Main
print('Main:',os.getpid())

# Multi-thread
record = []
lock  = threading.Lock()
for i in range(5):
    thread = threading.Thread(target=worker,args=('thread',lock))
    thread.start()
    record.append(thread)

for thread in record:
    thread.join()

# Multi-process
record = []
lock = multiprocessing.Lock()
for i in range(5):
    process = multiprocessing.Process(target=worker,args=('process',lock))
    process.start()
    record.append(process)

for process in record:
    process.join()

所有Thread的PID都与主程序相同,而每个Process都有一个不同的PID。

(练习: 使用mutiprocessing包将Python多线程与同步中的多线程程序更改为多进程程序)

  Pipe和Queue

正如我们在Linux多线程中介绍的管道PIPE和消息队列message queue,multiprocessing包中有PipeQueue类来分别支持这两种IPC机制。Pipe和Queue可以用来传送常见的对象。

1) Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。

下面的程序展示了Pipe的使用:

# Multiprocessing with Pipe
# Written by Vamei

import multiprocessing as mul

def proc1(pipe):
    pipe.send('hello')
    print('proc1 rec:',pipe.recv())

def proc2(pipe):
    print('proc2 rec:',pipe.recv())
    pipe.send('hello, too')

# Build a pipe
pipe = mul.Pipe()

# Pass an end of the pipe to process 1
p1   = mul.Process(target=proc1, args=(pipe[0],))
# Pass the other end of the pipe to process 2
p2   = mul.Process(target=proc2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()

这里的Pipe是双向的。

Pipe对象建立的时候,返回一个含有两个元素的表,每个元素代表Pipe的一端(Connection对象)。我们对Pipe的某一端调用send()方法来传送对象,在另一端使用recv()来接收。

2) Queue与Pipe相类似,都是先进先出的结构。但Queue允许多个进程放入,多个进程从队列取出对象。Queue使用mutiprocessing.Queue(maxsize)创建,maxsize表示队列中可以存放对象的最大数量。

下面的程序展示了Queue的使用:

# Written by Vamei
import os
import multiprocessing
import time
#==================
# input worker
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.time())
    queue.put(info)

# output worker
def outputQ(queue,lock):
    info = queue.get()
    lock.acquire()
    print (str(os.getpid()) + '(get):' + info)
    lock.release()
#===================
# Main
record1 = []   # store input processes
record2 = []   # store output processes
lock  = multiprocessing.Lock()    # To prevent messy print
queue = multiprocessing.Queue(3)

# input processes
for i in range(10):
    process = multiprocessing.Process(target=inputQ,args=(queue,))
    process.start()
    record1.append(process)

# output processes
for i in range(10):
    process = multiprocessing.Process(target=outputQ,args=(queue,lock))
    process.start()
    record2.append(process)

for p in record1:
    p.join()

queue.close()  # No more object will come, close the queue

for p in record2:
    p.join()
一些进程使用put()在Queue中放入字符串,这个字符串中包含PID和时间。另一些进程从Queue中取出,并打印自己的PID以及get()的字符串

   进程池

进程池 (Process Pool)可以创建多个进程。这些进程就像是随时待命的士兵,准备执行任务(程序)。一个进程池中可以容纳多个待命的士兵。

比如下面的程序:
import multiprocessing as mul

def f(x):
    return x**2

pool = mul.Pool(5)
rel  = pool.map(f,[1,2,3,4,5,6,7,8,9,10])
print(rel)

我们创建了一个容许5个进程的进程池 (Process Pool) 。Pool运行的每个进程都执行f()函数。我们利用map()方法,将f()函数作用到表的每个元素上。这与built-in的map()函数类似,只是这里用5个进程并行处理。如果进程运行结束后,还有需要处理的元素,那么的进程会被用于重新运行f()函数。除了map()方法外,Pool还有下面的常用方法。

apply_async(func,args)  从进程池中取出一个进程执行func,args为func的参数。它将返回一个AsyncResult的对象,你可以对该对象调用get()方法以获得结果。

close()  进程池不再创建新的进程

join()   wait进程池中的全部进程。必须对Pool先调用close()方法才能join。

练习

有下面一个文件download.txt。

www.sina.com.cn
www.163.com
www.iciba.com
www.cnblogs.com
www.qq.com
www.douban.com

使用包含3个进程的进程池下载文件中网站的首页。(你可以使用subprocess调用wget或者curl等下载工具执行具体的下载任务)

共享资源

我们在Python多进程初步已经提到,我们应该尽量避免多进程共享资源。多进程共享资源必然会带来进程间相互竞争。而这种竞争又会造成race condition,我们的结果有可能被竞争的不确定性所影响。但如果需要,我们依然可以通过共享内存和Manager对象这么做。

共享内存

Linux进程间通信中,我们已经讲述了共享内存(shared memory)的原理,这里给出用Python实现的例子:

# modified from official documentation
import multiprocessing

def f(n, a):
    n.value   = 3.14
    a[0]      = 5

num   = multiprocessing.Value('d', 0.0)
arr   = multiprocessing.Array('i', range(10))

p = multiprocessing.Process(target=f, args=(num, arr))
p.start()
p.join()

print num.value
print arr[:]

这里我们实际上只有主进程和Process对象代表的进程。我们在主进程的内存空间中创建共享的内存,也就是ValueArray两个对象。对象Value被设置成为双精度数(d), 并初始化为0.0。而Array则类似于C中的数组,有固定的类型(i, 也就是整数)。在Process进程中,我们修改了Value和Array对象。回到主程序,打印出结果,主程序也看到了两个对象的改变,说明资源确实在两个进程之间共享。

 

Manager

Manager对象类似于服务器与客户之间的通信 (server-client),与我们在Internet上的活动很类似。我们用一个进程作为服务器,建立Manager来真正存放资源。其它的进程可以通过参数传递或者根据地址来访问Manager,建立连接后,操作服务器上的资源。在防火墙允许的情况下,我们完全可以将Manager运用于多计算机,从而模仿了一个真实的网络情境。下面的例子中,我们对Manager的使用类似于shared memory,但可以共享更丰富的对象类型。
import multiprocessing

def f(x, arr, l):
    x.value = 3.14
    arr[0] = 5
    l.append('Hello')

server = multiprocessing.Manager()
x    = server.Value('d', 0.0)
arr  = server.Array('i', range(10))
l    = server.list()

proc = multiprocessing.Process(target=f, args=(x, arr, l))
proc.start()
proc.join()

print(x.value)
print(arr)
print(l)
Manager利用list()方法提供了表的共享方式。实际上你可以利用dict()来共享词典,Lock()来共享threading.Lock(注意,我们共享的是threading.Lock,而不是进程的mutiprocessing.Lock。后者本身已经实现了进程共享)等。 这样Manager就允许我们共享更多样的对象。



目录
相关文章
|
7天前
|
Python
以下是一些常用的图表类型及其Python代码示例,使用Matplotlib和Seaborn库。
以下是一些常用的图表类型及其Python代码示例,使用Matplotlib和Seaborn库。
|
5天前
|
存储 Python
Python示例:分解一个不多于指定位的正整数
Python示例:分解一个不多于指定位的正整数
14 0
|
19天前
|
机器学习/深度学习 数据采集 算法
数据稀缺条件下的时间序列微分:符号回归(Symbolic Regression)方法介绍与Python示例
有多种方法可以处理时间序列数据中的噪声。本文将介绍一种在我们的研究项目中表现良好的方法,特别适用于时间序列概况中数据点较少的情况。
30 1
数据稀缺条件下的时间序列微分:符号回归(Symbolic Regression)方法介绍与Python示例
|
6天前
|
Python
Python编程的循环结构小示例(二)
Python编程的循环结构小示例(二)
|
10天前
|
负载均衡 Java 调度
探索Python的并发编程:线程与进程的比较与应用
本文旨在深入探讨Python中的并发编程,重点比较线程与进程的异同、适用场景及实现方法。通过分析GIL对线程并发的影响,以及进程间通信的成本,我们将揭示何时选择线程或进程更为合理。同时,文章将提供实用的代码示例,帮助读者更好地理解并运用这些概念,以提升多任务处理的效率和性能。
|
17天前
|
算法 调度 Python
探索操作系统的内核——一个简单的进程调度示例
【9月更文挑战第17天】在这篇文章中,我们将深入探讨操作系统的核心组件之一——进程调度。通过一个简化版的代码示例,我们将了解进程调度的基本概念、目的和实现方式。无论你是初学者还是有一定基础的学习者,这篇文章都将帮助你更好地理解操作系统中进程调度的原理和实践。
|
5天前
|
数据采集 Linux 调度
Python之多线程与多进程
Python之多线程与多进程
11 0
|
6天前
|
存储 Python
Python示例:分解一个不多于指定位的正整数
Python示例:分解一个不多于指定位的正整数
10 0
|
6天前
|
机器学习/深度学习 Python
Python编程的循环结构小示例(一)
Python编程的循环结构小示例(一)
15 0
|
9天前
|
存储 算法 Java
关于python3的一些理解(装饰器、垃圾回收、进程线程协程、全局解释器锁等)
该文章深入探讨了Python3中的多个重要概念,包括装饰器的工作原理、垃圾回收机制、进程与线程的区别及全局解释器锁(GIL)的影响等,并提供了详细的解释与示例代码。
15 0
下一篇
无影云桌面