单线程 #coding=utf-8 import threading from time import ctime,sleep def music(func): for i in range(2): print "listen music %s. %s" %(func,ctime()) sleep(1) def movie(func): for i in range(2): print "watch movie! %s" %(func,ctime()) sleep(5) if __name__ == '__main__': music(u'trouble is a friend') movie(u'变形金刚') print "all over %s" %ctime() 多线程 python提供了两个模块来实现多线程thread 和threading 。 thread有一些缺点,在threading 得到了弥补,直接使用threading。 #coding=utf-8 import threading from time import ctime,sleep def music(func): for i in range(2): print "listen music %s. %s" %(func,ctime()) sleep(1) def movie(func): for i in range(2): print "watch movie %s! %s" %(func,ctime()) sleep(5) threads = [] t1 = threading.Thread(target=music,args=(u'trouble is a friend',)) threads.append(t1) t2 = threading.Thread(target=movie,args=(u'变形金刚',)) threads.append(t2) if __name__ == '__main__': for t in threads: t.setDaemon(True) t.start() print "all over %s" %ctime() setDaemon(True)将线程声明为守护线程,必须在start() 方法调用之前设置,如果不设置为守护线程程序会被无限挂起。 子线程启动后,父线程也继续执行下去,当父线程执行完最后一条语句print "all over %s" %ctime()后,没有等待子线程,直接就退出了,同时子线程也一同结束。 start()开始线程活动。 调整程序: if __name__ == '__main__': for t in threads: t.setDaemon(True) t.start() t.join() print "all over %s" %ctime() 对上面的程序加了个join()方法,用于等待线程终止。join()的作用是,在子线程完成运行之前,这个子线程的父线程将一直被阻塞。 注意: join()方法的位置是在for循环外的,也就是说必须等待for循环里的两个进程都结束后,才去执行主进程。
import threading import time def worker(num): time.sleep(1) print("The num is %d" % num) print t.getName() return for i in range(20): t = threading.Thread(target=worker, args=(i,), name="testThread") t.start() Thread方法说明 t.start() : 激活线程, t.getName() : 获取线程的名称 t.setName() :设置线程的名称 t.name : 获取或设置线程的名称 t.is_alive() :判断线程是否为激活状态 t.isAlive() :判断线程是否为激活状态 t.setDaemon() 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止 t.isDaemon() :判断是否为守护线程 t.ident:获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。 t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义 t.run() :线程被cpu调度后自动执行线程对象的run方法 2、线程锁threading.RLock和threading.Lock 由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程。为了保证数据的准确性,引入了锁的概念。 所以,可能出现如下问题: 例:假设列表A的所有元素就为0,当一个线程从前向后打印列表的所有元素,另外一个线程则从后向前修改列表的元素为1, 那么输出的时候,列表的元素就会一部分为0,一部分为1,这就导致了数据的不一致。锁的出现解决了这个问题。 import threading import time globals_num = 0 lock = threading.RLock() def func(): lock.acquire() # 获得锁 global globals_num globals_num += 1 time.sleep(1) print(globals_num) lock.release() # 释放锁 for i in range(10): t = threading.Thread(target=func) t.start() pass 3、threading.RLock和threading.Lock的区别 RLock允许在同一线程中被多次acquire。 而Lock却不允许这种情况。如果使用RLock,那么acquire和release必须成对出现, 即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。 import threading lock = threading.Lock() #Lock对象 lock.acquire() lock.acquire() #产生了死琐。 lock.release() lock.release() import threading rLock = threading.RLock() #RLock对象 rLock.acquire() rLock.acquire() #在同一线程内,程序不会堵塞。 rLock.release() rLock.release() 4、threading.Event python线程的事件用于主线程控制其他线程的执行, 事件主要提供了三个方法 set、wait、clear。 事件处理的机制:全局定义了一个“Flag”, 如果“Flag”值为 False,那么当程序执行event.wait方法时就会阻塞, 如果“Flag”值为True,那么event.wait方法时便不再阻塞。 clear:将“Flag”设置为False set:将“Flag”设置为True Event.isSet() :判断标识位是否为Ture。 import threading def do(event): print('start') event.wait() print('execute') event_obj = threading.Event() for i in range(10): t = threading.Thread(target=do, args=(event_obj,)) t.start() event_obj.clear() # inp = input('input:') inp = raw_input('input:') if inp == 'true': event_obj.set() 当线程执行的时候,如果flag为False,则线程会阻塞,当flag为True的时候,线程不会阻塞。它提供了本地和远程的并发性。 5、threading.Condition 一个condition变量总是与某些类型的锁相联系,这个可以使用默认的情况或创建一个, 当几个condition变量必须共享和同一个锁的时候,是很有用的。锁是conditon对象的一部分:没有必要分别跟踪。 condition变量服从上下文管理协议:with语句块封闭之前可以获取与锁的联系。 acquire() 和 release() 会调用与锁相关联的相应的方法。 其他和锁关联的方法必须被调用,wait()方法会释放锁, 当另外一个线程使用 notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回, Condition类实现了一个conditon变量。这个conditiaon变量允许一个或多个线程等待,直到他们被另一个线程通知。 如果lock参数,被给定一个非空的值,,那么他必须是一个lock或者Rlock对象,它用来做底层锁。否则,会创建一个新的Rlock对象,用来做底层锁。 wait(timeout=None) :等待通知,或者等到设定的超时时间。 当调用这wait()方法时,如果调用它的线程没有得到锁,那么会抛出一个RuntimeError异常。 wati()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前会一直阻塞。 wait()还可以指定一个超时时间。 如果有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all() 则会唤醒所有在等待conditon变量的线程。 注意: notify()和notify_all()不会释放锁,也就是说,线程被唤醒后不会立刻返回他们的wait() 调用。 除非线程调用notify()和notify_all()之后放弃了锁的所有权。 在典型的设计风格里,利用condition变量用锁去通许访问一些共享状态,线程在获取到它想得到的状态前,会反复调用wait()。修改状态的线程在他们状态改变时调用 notify() or notify_all(),用这种方式,线程会尽可能的获取到想要的一个等待者状态。 例子:生产者-消费者模型, import threading import time def consumer(cond): with cond: print("consumer before wait") cond.wait() print("consumer after wait") def producer(cond): with cond: print("producer before notifyAll") cond.notifyAll() print("producer after notifyAll") condition = threading.Condition() c1 = threading.Thread(name="c1", target=consumer, args=(condition,)) c2 = threading.Thread(name="c2", target=consumer, args=(condition,)) p = threading.Thread(name="p", target=producer, args=(condition,)) c1.start() time.sleep(2) c2.start() time.sleep(2) p.start() 在使用并发设计的时候最好尽可能的避免共享数据,尤其是在使用多进程的时候。 如果你真有需要要共享数据, multiprocessing提供了两种方式。 (1)multiprocessing,Array,Value 数据可以用Value或Array存储在一个共享内存地图里,如下: from multiprocessing import Array, Value, Process def func(a, b): a.value = 3.333333333333333 for j in range(len(b)): b[j] = -b[j] if __name__ == "__main__": num = Value('d', 0.0) arr = Array('i', range(11)) if 0: a = Process(target=func, args=(num, arr)) a.start() a.join() else: c = Process(target=func, args=(num, arr)) d = Process(target=func, args=(num, arr)) c.start() d.start() c.join() d.join() print(num.value) for i in arr: print i, 输出: 3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9] 创建num和arr时,“d”和“i”参数由Array模块使用的typecodes创建: “d”表示一个双精度的浮点数,“i”表示一个有符号的整数,这些共享对象将被线程安全的处理。 Array(‘i’, range(10))中的‘i’参数: ‘c’: ctypes.c_char ‘u’: ctypes.c_wchar ‘b’: ctypes.c_byte ‘B’: ctypes.c_ubyte ‘h’: ctypes.c_short ‘H’: ctypes.c_ushort ‘i’: ctypes.c_int ‘I’: ctypes.c_uint ‘l’: ctypes.c_long, ‘L’: ctypes.c_ulong ‘f’: ctypes.c_float ‘d’: ctypes.c_double (2)multiprocessing,Manager 由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持。 from multiprocessing import Process, Manager def f(d, l): d["name"] = "king" d["age"] = 100 d["Job"] = "python" l.reverse() if __name__ == "__main__": with Manager() as man: d_temp = man.dict() l_temp = man.list(range(10)) p = Process(target=f, args=(d_temp, l_temp)) p.start() p.join() print(d_temp) print(l_temp) Server process manager比 shared memory 更灵活,因为它可以支持任意的对象类型。 另外,一个单独的manager可以通过进程在网络上不同的计算机之间共享,不过他比shared memory要慢。 2、进程池(Using a pool of workers) Pool类描述了一个工作进程池,他有几种不同的方法让任务卸载工作进程。 进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程, 如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。 我们可以用Pool类创建一个进程池,展开提交的任务给进程池。 例: #apply from multiprocessing import Pool import time def f1(arg): time.sleep(0.5) print(arg) return arg + 100 if __name__ == "__main__": pool = Pool(5) for i in range(1, 10): pool.apply(func=f1, args=(i,)) #apply_async from multiprocessing import Pool def f1(i): time.sleep(1) print(i) return i + 100 def f2(arg): print(arg) if __name__ == "__main__": pool = Pool(5) for i in range(1, 10): pool.apply_async(func=f1, args=(i,), callback=f2) pool.close() pool.join() 一个进程池对象可以控制工作进程池的哪些工作可以被提交,它支持超时和回调的异步结果,有一个类似map的实现。 processes :使用的工作进程的数量,如果processes是None那么使用os.cpu_count()返回的数量。 initializer:如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。 maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个心的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。 context: 用在制定工作进程启动时的上下文,一般使用multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context 注意:Pool对象的方法只可以被创建pool的进程所调用。 进程池的方法 apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞, 由于这个原因,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。 apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一个变体,会返回一个结果对象。 如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时, 则用error_callback替换callback。 Callbacks应被立即完成,否则处理结果的线程会被阻塞。 close() :阻止更多的任务提交到pool,待任务完成后,工作进程会退出。 terminate() :不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。 join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。 这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。 map(func, iterable[, chunksize]) map_async(func, iterable[, chunksize[, callback[, error_callback]]]) imap(func, iterable[, chunksize]) imap_unordered(func, iterable[, chunksize]) starmap(func, iterable[, chunksize]) starmap_async(func, iterable[, chunksize[, callback[, error_back]]]) python 协程 线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。 协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。 协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。 协程的适用场景:当程序中存在大量不需要CPU的操作时(IO), event loop是协程执行的控制点,如果你希望执行协程,就需要用到它们。 event loop提供了如下的特性: 注册、执行、取消延时调用(异步函数) 创建用于通信的client和server协议(工具) 创建和别的程序通信的子进程和协议(工具) 把函数调用送入线程池中 协程示例: #---------python3_start--------------- import asyncio async def cor1(): print("COR1 start") await cor2() print("COR1 end") async def cor2(): print("COR2") loop = asyncio.get_event_loop() loop.run_until_complete(cor1()) loop.close() #---------python3_end--------------- 最后三行是重点。 asyncio.get_event_loop() : asyncio启动默认的event loop run_until_complete() : 这个函数是阻塞执行的,知道所有的异步函数执行完成, close() : 关闭event loop。 1、greenlet import greenlet def fun1(): print("12") gr2.switch() print("56") gr2.switch() def fun2(): print("34") gr1.switch() print("78") gr1 = greenlet.greenlet(fun1) gr2 = greenlet.greenlet(fun2) gr1.switch() 2、gevent gevent属于第三方模块需要下载安装包 pip3 install --upgrade pip3 pip3 install gevent import gevent def fun1(): print("www.baidu.com") # 第一步 gevent.sleep(0) print("end the baidu.com") # 第三步 def fun2(): print("www.zhihu.com") # 第二步 gevent.sleep(0) print("end th zhihu.com") # 第四步 gevent.joinall([ gevent.spawn(fun1), gevent.spawn(fun2), ]) 遇到IO操作自动切换: import gevent import requests def func(url): print("get: %s" % url) gevent.sleep(0) proxies = { "http": "", "https": "", } date = requests.get(url, proxies=proxies) ret = date.text print(url, len(ret)) gevent.joinall([ gevent.spawn(func, 'https://www.baidu.com/'), gevent.spawn(func, 'http://www.sina.com.cn/'), gevent.spawn(func, 'http://www.qq.com/'), ])
import thread import time def print_time(thread_name, delay): count = 0 while count < 5: time.sleep(delay) count += 1 print "%s: %s" % (thread_name, time.ctime(time.time())) if __name__ == "__main__": try: thread.start_new_thread(print_time, ("Thread-1", 2)) thread.start_new_thread(print_time, ("Thread-2", 4)) except BaseException as e: print e print "Error: unable to start thread" while 1: pass
# coding=utf-8 # !/usr/bin/python import threading import time exitFlag = 0 class myThread(threading.Thread): def __init__(self, threadID, name, counter): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter def run(self): print "Starting " + self.name print_time(self.name, self.counter, 5) print "Exiting " + self.name def print_time(threadName, delay, counter): while counter: if exitFlag: thread.exit() time.sleep(delay) print "%s: %s" % (threadName, time.ctime(time.time())) counter -= 1 thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) thread1.start() thread2.start() print "Exiting Main Thread"
# coding=utf-8 # !/usr/bin/python import threading import time class myThread(threading.Thread): def __init__(self, threadID, name, counter): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter def run(self): print "Starting " + self.name # 获得锁,成功获得锁定后返回True # 可选的timeout参数不填时将一直阻塞直到获得锁定 # 否则超时后将返回False threadLock.acquire() print_time(self.name, self.counter, 3) # 释放锁 threadLock.release() def print_time(threadName, delay, counter): while counter: time.sleep(delay) print "%s: %s" % (threadName, time.ctime(time.time())) counter -= 1 threadLock = threading.Lock() threads = [] # 创建新线程 thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) # 开启新线程 thread1.start() thread2.start() # 添加线程到线程列表中 threads.append(thread1) threads.append(thread2) # 等待所有线程完成 for t in threads: t.join() print "Exiting Main Thread"线程优先级队列( Queue)
Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.full 与 maxsize 大小对应
Queue.get([block[, timeout]])获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
Queue.put(item) 写入队列,timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)
Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
Queue.join() 实际上意味着等到队列为空,再执行别的操作
# coding=utf-8 # !/usr/bin/python import Queue import threading import time exitFlag = 0 class myThread(threading.Thread): def __init__(self, threadID, name, q): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.q = q def run(self): print "Starting " + self.name process_data(self.name, self.q) print "Exiting " + self.name def process_data(threadName, q): while not exitFlag: queueLock.acquire() if not workQueue.empty(): data = q.get() queueLock.release() print "%s processing %s" % (threadName, data) else: queueLock.release() time.sleep(1) threadList = ["Thread-1", "Thread-2", "Thread-3"] nameList = ["One", "Two", "Three", "Four", "Five"] queueLock = threading.Lock() workQueue = Queue.Queue(10) threads = [] threadID = 1 # 创建线程 for tName in threadList: thread = myThread(threadID, tName, workQueue) thread.start() threads.append(thread) threadID += 1 # 填充队列 queueLock.acquire() for word in nameList: workQueue.put(word) queueLock.release() # 等待队列清空 while not workQueue.empty(): pass # 通知线程退出 exitFlag = 1 # 等待所有线程完成 for t in threads: t.join() print "Exiting Main Thread"
from multiprocessing import Process, Manager import os manager = Manager() temp_list = [] manager_list = manager.list() manager_value = manager.Value('i', 0) def test_func(cc): temp_list.append(cc) manager_list.append(cc) manager_value.value += cc print 'process id:', os.getpid() if __name__ == '__main__': threads = [] for ll in range(10): t = Process(target=test_func, args=(ll,)) t.daemon = True threads.append(t) for i in range(len(threads)): threads[i].start() for j in range(len(threads)): threads[j].join() print "------------------------" print 'process id:', os.getpid() print temp_list print manager_list print manager_value
from multiprocessing import Process, Manager import time manager = Manager() manager_value = manager.Value('i', 0) def test_func(cc, seconds): while True: manager_value.value += cc print manager_value.value time.sleep(seconds) if __name__ == '__main__': threads = [Process(target=test_func, args=(1, i+1)) for i in range(5)] map(lambda x: x.start(), threads) map(lambda x: x.join(), threads) pass
import multiprocessing def worker(num): """thread worker function""" print 'Worker:', num return if __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) jobs.append(p) p.start()
import multiprocessing import time def worker(): name = multiprocessing.current_process().name print name, 'Starting' time.sleep(2) print name, 'Exiting' def my_service(): name = multiprocessing.current_process().name print name, 'Starting' time.sleep(3) print name, 'Exiting' if __name__ == '__main__': service = multiprocessing.Process(name='my_service', target=my_service) worker_1 = multiprocessing.Process(name='worker 1', target=worker) worker_2 = multiprocessing.Process(target=worker) # default name worker_1.start() worker_2.start() service.start()
import multiprocessing import time import sys def daemon(): name = multiprocessing.current_process().name print 'Starting:', name time.sleep(2) print 'Exiting :', name def non_daemon(): name = multiprocessing.current_process().name print 'Starting:', name print 'Exiting :', name if __name__ == '__main__': d = multiprocessing.Process(name='daemon', target=daemon) d.daemon = True n = multiprocessing.Process(name='non-daemon', target=non_daemon) n.daemon = False d.start() n.start() d.join(1) print 'd.is_alive()', d.is_alive() n.join()
最好使用 poison pill,强制的使用terminate()
注意 terminate之后要join,使其可以更新状态
import multiprocessing import time def slow_worker(): print 'Starting worker' time.sleep(0.1) print 'Finished worker' if __name__ == '__main__': p = multiprocessing.Process(target=slow_worker) print 'BEFORE:', p, p.is_alive() p.start() print 'DURING:', p, p.is_alive() p.terminate() print 'TERMINATED:', p, p.is_alive() p.join() print 'JOINED:', p, p.is_alive()
import multiprocessing import sys import time def exit_error(): sys.exit(1) def exit_ok(): return def return_value(): return 1 def raises(): raise RuntimeError('There was an error!') def terminated(): time.sleep(3) if __name__ == '__main__': jobs = [] for f in [exit_error, exit_ok, return_value, raises, terminated]: print 'Starting process for', f.func_name j = multiprocessing.Process(target=f, name=f.func_name) jobs.append(j) j.start() jobs[-1].terminate() for j in jobs: j.join() print '%15s.exitcode = %s' % (j.name, j.exitcode)
import multiprocessing import logging import sys def worker(): print 'Doing some work' sys.stdout.flush() if __name__ == '__main__': multiprocessing.log_to_stderr() logger = multiprocessing.get_logger() logger.setLevel(logging.INFO) p = multiprocessing.Process(target=worker) p.start() p.join()
import multiprocessing class Worker(multiprocessing.Process): def run(self): print 'In %s' % self.name return if __name__ == '__main__': jobs = [] for i in range(5): p = Worker() jobs.append(p) p.start() for j in jobs: j.join()
import multiprocessing class MyFancyClass(object): def __init__(self, name): self.name = name def do_something(self): proc_name = multiprocessing.current_process().name print 'Doing something fancy in %s for %s!' % \ (proc_name, self.name) def worker(q): obj = q.get() obj.do_something() if __name__ == '__main__': queue = multiprocessing.Queue() p = multiprocessing.Process(target=worker, args=(queue,)) p.start() queue.put(MyFancyClass('Fancy Dan')) # Wait for the worker to finish queue.close() queue.join_thread() p.join() import multiprocessing import time class Consumer(multiprocessing.Process): def __init__(self, task_queue, result_queue): multiprocessing.Process.__init__(self) self.task_queue = task_queue self.result_queue = result_queue def run(self): proc_name = self.name while True: next_task = self.task_queue.get() if next_task is None: # Poison pill means shutdown print '%s: Exiting' % proc_name self.task_queue.task_done() break print '%s: %s' % (proc_name, next_task) answer = next_task() self.task_queue.task_done() self.result_queue.put(answer) return class Task(object): def __init__(self, a, b): self.a = a self.b = b def __call__(self): time.sleep(0.1) # pretend to take some time to do the work return '%s * %s = %s' % (self.a, self.b, self.a * self.b) def __str__(self): return '%s * %s' % (self.a, self.b) if __name__ == '__main__': # Establish communication queues tasks = multiprocessing.JoinableQueue() results = multiprocessing.Queue() # Start consumers num_consumers = multiprocessing.cpu_count() * 2 print 'Creating %d consumers' % num_consumers consumers = [ Consumer(tasks, results) for i in xrange(num_consumers) ] for w in consumers: w.start() # Enqueue jobs num_jobs = 10 for i in xrange(num_jobs): tasks.put(Task(i, i)) # Add a poison pill for each consumer for i in xrange(num_consumers): tasks.put(None) # Wait for all of the tasks to finish tasks.join() # Start printing results while num_jobs: result = results.get() print 'Result:', result num_jobs -= 1
import multiprocessing import time def wait_for_event(e): """Wait for the event to be set before doing anything""" print 'wait_for_event: starting' e.wait() print 'wait_for_event: e.is_set()->', e.is_set() def wait_for_event_timeout(e, t): """Wait t seconds and then timeout""" print 'wait_for_event_timeout: starting' e.wait(t) print 'wait_for_event_timeout: e.is_set()->', e.is_set() if __name__ == '__main__': e = multiprocessing.Event() w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,)) w1.start() w2 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout, args=(e, 2)) w2.start() print 'main: waiting before calling Event.set()' time.sleep(3) e.set() print 'main: event is set'
import multiprocessing import time def func(msg): for i in xrange(3): print msg time.sleep(1) if __name__ == "__main__": p = multiprocessing.Process(target=func, args=("hello", )) p.start() p.join() print "Sub-process done."
import multiprocessing import time def func(msg): for i in xrange(3): print msg time.sleep(1) if __name__ == "__main__": pool = multiprocessing.Pool(processes=4) for i in xrange(10): msg = "hello %d" %(i) pool.apply_async(func, (msg, )) pool.close() pool.join() print "Sub-process(es) done."
#coding: utf-8 import multiprocessing import time def func(msg): print "msg:", msg time.sleep(3) print "end" if __name__ == "__main__": pool = multiprocessing.Pool(processes = 3) for i in xrange(4): msg = "hello %d" %(i) pool.apply(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~" pool.close() pool.join() #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束 print "Sub-process(es) done."
import multiprocessing import time def func(msg): for i in xrange(3): print msg time.sleep(1) return "done " + msg if __name__ == "__main__": pool = multiprocessing.Pool(processes=4) result = [] for i in xrange(10): msg = "hello %d" %(i) result.append(pool.apply_async(func, (msg, ))) pool.close() pool.join() for res in result: print res.get() print "Sub-process(es) done."
import multiprocessing def do_calculation(data): return data*2 def start_process(): print 'Starting', multiprocessing.current_process().name if __name__ == '__main__': inputs = list(range(10)) print 'Inputs :', inputs builtin_output = map(do_calculation, inputs) print 'Build-In :', builtin_output pool_size = multiprocessing.cpu_count()*2 pool = multiprocessing.Pool(processes=pool_size, initializer=start_process,) # 默认情况下,Pool会创建固定数目的工作进程,并向这些工作进程传递作业,直到再没有更多作业为止。 # maxtasksperchild参数为每个进程执行task的最大数目, # 设置maxtasksperchild参数可以告诉池在完成一定数量任务之后重新启动一个工作进程, # 来避免运行时间很长的工作进程消耗太多的系统资源。 # pool = multiprocessing.Pool(processes=pool_size, initializer=start_process, maxtasksperchild=2) print '-' * 20 pool_outputs = pool.map(do_calculation, inputs) pool.close() pool.join() print 'Pool :', pool_outputs
#coding: utf-8 import multiprocessing import os, time, random def Lee(): print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()获取当前的进程的ID start = time.time() time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数 end = time.time() print 'Task Lee, runs %0.2f seconds.' %(end - start) def Marlon(): print "\nRun task Marlon-%s" %(os.getpid()) start = time.time() time.sleep(random.random() * 40) end=time.time() print 'Task Marlon runs %0.2f seconds.' %(end - start) def Allen(): print "\nRun task Allen-%s" %(os.getpid()) start = time.time() time.sleep(random.random() * 30) end = time.time() print 'Task Allen runs %0.2f seconds.' %(end - start) def Frank(): print "\nRun task Frank-%s" %(os.getpid()) start = time.time() time.sleep(random.random() * 20) end = time.time() print 'Task Frank runs %0.2f seconds.' %(end - start) if __name__=='__main__': function_list= [Lee, Marlon, Allen, Frank] print "parent process %s" %(os.getpid()) pool=multiprocessing.Pool(4) for func in function_list: pool.apply_async(func) #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中 print 'Waiting for all subprocesses done...' pool.close() pool.join() #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束 print 'All subprocesses done.'multiprocessing pool map
#coding: utf-8 import multiprocessing def m1(x): print x * x if __name__ == '__main__': pool = multiprocessing.Pool(multiprocessing.cpu_count()) i_list = range(8) pool.map(m1, i_list)
#coding: utf-8 import multiprocessing import logging def create_logger(i): print i class CreateLogger(object): def __init__(self, func): self.func = func if __name__ == '__main__': ilist = range(10) cl = CreateLogger(create_logger) pool = multiprocessing.Pool(multiprocessing.cpu_count()) pool.map(cl.func, ilist) print "hello------------>"
Python 多进程 multiprocessing.Pool类详解
multiprocessing包是Python中的多进程管理包。它与 threading.Thread类似,可以利用multiprocessing.Process对象来创建一个进程。该进程可以允许放在Python程序内部编写的函数中。该Process对象与Thread对象的用法相同,拥有is_alive()、join([timeout])、run()、start()、terminate()等方法。属性有:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类,用来同步进程,其用法也与threading包中的同名类一样。multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。
__init__(self, group=None, target=None, name=None, args=(), kwargs={})
#coding=utf-8 import multiprocessing def do(n) : #获取当前线程的名字 name = multiprocessing.current_process().name print name,'starting' print "worker ", n return if __name__ == '__main__' : numList = [] for i in xrange(5) : p = multiprocessing.Process(target=do, args=(i,)) numList.append(p) p.start() p.join() print "Process end."执行结果:
Process-1 starting worker 0 Process end. Process-2 starting worker 1 Process end. Process-3 starting worker 2 Process end. Process-4 starting worker 3 Process end. Process-5 starting worker 4 Process end.
在Windows上要想使用进程模块,就必须把有关进程的代码写在当前.py文件的if __name__ == ‘__main__’ :语句的下面,才能正常使用Windows下的进程模块。Unix/Linux下则不需要。
下面介绍一下multiprocessing 模块下的Pool类下的几个方法
import time from multiprocessing import Pool def run(fn): #fn: 函数参数是数据列表的一个元素 time.sleep(1) return fn*fn if __name__ == "__main__": testFL = [1,2,3,4,5,6] print 'shunxu:' #顺序执行(也就是串行执行,单进程) s = time.time() for fn in testFL: run(fn) e1 = time.time() print "顺序执行时间:", int(e1 - s) print 'concurrent:' #创建多个进程,并行执行 pool = Pool(5) #创建拥有5个进程数量的进程池 #testFL:要处理的数据列表,run:处理testFL列表中数据的函数 rl =pool.map(run, testFL) pool.close()#关闭进程池,不再接受新的进程 pool.join()#主进程阻塞等待子进程的退出 e2 = time.time() print "并行执行时间:", int(e2-e1) print rl执行结果:
shunxu: 顺序执行时间: 6 concurrent: 并行执行时间: 2 [1, 4, 9, 16, 25, 36]
import time from multiprocessing import Pool def run(fn) : time.sleep(2) print fn if __name__ == "__main__" : startTime = time.time() testFL = [1,2,3,4,5] pool = Pool(10)#可以同时跑10个进程 pool.map(run,testFL) pool.close() pool.join() endTime = time.time() print "time :", endTime - startTime执行结果:
21 3 4 5 time : 2.51999998093再次执行结果如下:
1 34 2 5 time : 2.48600006104
import os import time from multiprocessing import Pool def getFile(path) : #获取目录下的文件list fileList = [] for root, dirs, files in list(os.walk(path)) : for i in files : if i.endswith('.txt') or i.endswith('.10w') : fileList.append(root + "\\" + i) return fileList def operFile(filePath) : #统计每个文件中行数和字符数,并返回 filePath = filePath fp = open(filePath) content = fp.readlines() fp.close() lines = len(content) alphaNum = 0 for i in content : alphaNum += len(i.strip('\n')) return lines,alphaNum,filePath def out(list1, writeFilePath) : #将统计结果写入结果文件中 fileLines = 0 charNum = 0 fp = open(writeFilePath,'a') for i in list1 : fp.write(i[2] + " 行数:"+ str(i[0]) + " 字符数:"+str(i[1]) + "\n") fileLines += i[0] charNum += i[1] fp.close() print fileLines, charNum if __name__ == "__main__": #创建多个进程去统计目录中所有文件的行数和字符数 startTime = time.time() filePath = "C:\\wcx\\a" fileList = getFile(filePath) pool = Pool(5) resultList =pool.map(operFile, fileList) pool.close() pool.join() writeFilePath = "c:\\wcx\\res.txt" print resultList out(resultList, writeFilePath) endTime = time.time() print "used time is ", endTime - startTime
1) 我们总是让subprocess运行外部的程序,而不是运行一个Python脚本内部编写的函数。
2) 进程间只通过管道进行文本交流。以上限制了我们将subprocess包应用到更广泛的多进程任务。
multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。
- 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
- multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
- 多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。
# Similarity and difference of multi thread vs. multi process # Written by Vamei import os import threading import multiprocessing # worker function def worker(sign, lock): lock.acquire() print(sign, os.getpid()) lock.release() # Main print('Main:',os.getpid()) # Multi-thread record = [] lock = threading.Lock() for i in range(5): thread = threading.Thread(target=worker,args=('thread',lock)) thread.start() record.append(thread) for thread in record: thread.join() # Multi-process record = [] lock = multiprocessing.Lock() for i in range(5): process = multiprocessing.Process(target=worker,args=('process',lock)) process.start() record.append(process) for process in record: process.join()
(练习: 使用mutiprocessing包将Python多线程与同步中的多线程程序更改为多进程程序)
正如我们在Linux多线程中介绍的管道PIPE和消息队列message queue,multiprocessing包中有Pipe类和Queue类来分别支持这两种IPC机制。Pipe和Queue可以用来传送常见的对象。
1) Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。
# Multiprocessing with Pipe # Written by Vamei import multiprocessing as mul def proc1(pipe): pipe.send('hello') print('proc1 rec:',pipe.recv()) def proc2(pipe): print('proc2 rec:',pipe.recv()) pipe.send('hello, too') # Build a pipe pipe = mul.Pipe() # Pass an end of the pipe to process 1 p1 = mul.Process(target=proc1, args=(pipe[0],)) # Pass the other end of the pipe to process 2 p2 = mul.Process(target=proc2, args=(pipe[1],)) p1.start() p2.start() p1.join() p2.join()
2) Queue与Pipe相类似,都是先进先出的结构。但Queue允许多个进程放入,多个进程从队列取出对象。Queue使用mutiprocessing.Queue(maxsize)创建,maxsize表示队列中可以存放对象的最大数量。
# Written by Vamei import os import multiprocessing import time #================== # input worker def inputQ(queue): info = str(os.getpid()) + '(put):' + str(time.time()) queue.put(info) # output worker def outputQ(queue,lock): info = queue.get() lock.acquire() print (str(os.getpid()) + '(get):' + info) lock.release() #=================== # Main record1 = [] # store input processes record2 = [] # store output processes lock = multiprocessing.Lock() # To prevent messy print queue = multiprocessing.Queue(3) # input processes for i in range(10): process = multiprocessing.Process(target=inputQ,args=(queue,)) process.start() record1.append(process) # output processes for i in range(10): process = multiprocessing.Process(target=outputQ,args=(queue,lock)) process.start() record2.append(process) for p in record1: p.join() queue.close() # No more object will come, close the queue for p in record2: p.join()一些进程使用put()在Queue中放入字符串,这个字符串中包含PID和时间。另一些进程从Queue中取出,并打印自己的PID以及get()的字符串
进程池 (Process Pool)可以创建多个进程。这些进程就像是随时待命的士兵,准备执行任务(程序)。一个进程池中可以容纳多个待命的士兵。
比如下面的程序:import multiprocessing as mul def f(x): return x**2 pool = mul.Pool(5) rel = pool.map(f,[1,2,3,4,5,6,7,8,9,10]) print(rel)
我们创建了一个容许5个进程的进程池 (Process Pool) 。Pool运行的每个进程都执行f()函数。我们利用map()方法,将f()函数作用到表的每个元素上。这与built-in的map()函数类似,只是这里用5个进程并行处理。如果进程运行结束后,还有需要处理的元素,那么的进程会被用于重新运行f()函数。除了map()方法外,Pool还有下面的常用方法。
apply_async(func,args) 从进程池中取出一个进程执行func,args为func的参数。它将返回一个AsyncResult的对象,你可以对该对象调用get()方法以获得结果。
close() 进程池不再创建新的进程
join() wait进程池中的全部进程。必须对Pool先调用close()方法才能join。
我们在Python多进程初步已经提到,我们应该尽量避免多进程共享资源。多进程共享资源必然会带来进程间相互竞争。而这种竞争又会造成race condition,我们的结果有可能被竞争的不确定性所影响。但如果需要,我们依然可以通过共享内存和Manager对象这么做。
在Linux进程间通信中,我们已经讲述了共享内存(shared memory)的原理,这里给出用Python实现的例子:
# modified from official documentation import multiprocessing def f(n, a): n.value = 3.14 a[0] = 5 num = multiprocessing.Value('d', 0.0) arr = multiprocessing.Array('i', range(10)) p = multiprocessing.Process(target=f, args=(num, arr)) p.start() p.join() print num.value print arr[:]
这里我们实际上只有主进程和Process对象代表的进程。我们在主进程的内存空间中创建共享的内存,也就是Value和Array两个对象。对象Value被设置成为双精度数(d), 并初始化为0.0。而Array则类似于C中的数组,有固定的类型(i, 也就是整数)。在Process进程中,我们修改了Value和Array对象。回到主程序,打印出结果,主程序也看到了两个对象的改变,说明资源确实在两个进程之间共享。
Manager对象类似于服务器与客户之间的通信 (server-client),与我们在Internet上的活动很类似。我们用一个进程作为服务器,建立Manager来真正存放资源。其它的进程可以通过参数传递或者根据地址来访问Manager,建立连接后,操作服务器上的资源。在防火墙允许的情况下,我们完全可以将Manager运用于多计算机,从而模仿了一个真实的网络情境。下面的例子中,我们对Manager的使用类似于shared memory,但可以共享更丰富的对象类型。import multiprocessing def f(x, arr, l): x.value = 3.14 arr[0] = 5 l.append('Hello') server = multiprocessing.Manager() x = server.Value('d', 0.0) arr = server.Array('i', range(10)) l = server.list() proc = multiprocessing.Process(target=f, args=(x, arr, l)) proc.start() proc.join() print(x.value) print(arr) print(l)Manager利用list()方法提供了表的共享方式。实际上你可以利用dict()来共享词典,Lock()来共享threading.Lock(注意,我们共享的是threading.Lock,而不是进程的mutiprocessing.Lock。后者本身已经实现了进程共享)等。 这样Manager就允许我们共享更多样的对象。