- 进程和线程
在操作系统看来,一个任务就是一个进程,而一个进程内部如果要做多个任务就是有多个线程。一个进程至少有一个线程。
真正的并行执行任务是由多个CUP分别执行任务,实际中是由,操作系统轮流让各个任务交替执行,任务1执行0.01秒,任务2执行0.01秒,之后再依次切换。
Python中支持两种模式:
多进程模式
多线程模式
- 多进程
Linux操作系统下,提供了一个fork()系统调用。调用一次fork(),返回两次,因为操作系统自动把当前的进程(作为父进程)复制了一份(称为子进程),然后子进程返回0,父进程返回子进程的ID。
# multiprocessing.py import os print 'Process (%s) start...' % os.getpid() pid = os.fork() if pid==0: print 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()) else: print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
Process (876) start... I (876) just created a child process (877). I am child process (877) and my parent is 876.
由于windows下没有fork()调用,提供了multiprocessing模块进行跨平台版本的多进程模块。
用Process类代表创建进程对象,传入一个执行函数和函数的参数。之后再用start()方法启动,jion()方法可以等待子进程结束后再继续往下进行,通常用于进程间的同步。
from multiprocessing import Process import os # 子进程要执行的代码 def run_proc(name): print 'Run child process %s (%s)...' % (name, os.getpid()) if __name__=='__main__': print 'Parent process %s.' % os.getpid() p = Process(target=run_proc, args=('test',)) print 'Process will start.' p.start() p.join() print 'Process end.'
Parent process 928. Process will start. Run child process test (929)... Process end.
Pool进程池创建多个子进程
对Pool对象创建多个子进程后,用close()方法结束创建,再用join()方法等待所有子进程执行完毕。在每个子进程中会随机休眠一段时间,其他的子进程在这段休眠时间里就会调用。
from multiprocessing import Pool import os, time, random def long_time_task(name): print 'Run task %s (%s)...' % (name, os.getpid()) start = time.time() time.sleep(random.random() * 3) end = time.time() print 'Task %s runs %0.2f seconds.' % (name, (end - start)) if __name__=='__main__': print 'Parent process %s.' % os.getpid() p = Pool() for i in range(5): p.apply_async(long_time_task, args=(i,)) print 'Waiting for all subprocesses done...' p.close() p.join() print 'All subprocesses done.'
Parent process 669. Waiting for all subprocesses done... Run task 0 (671)... Run task 1 (672)... Run task 2 (673)... Run task 3 (674)... Task 2 runs 0.14 seconds. Run task 4 (673)... Task 1 runs 0.27 seconds. Task 3 runs 0.86 seconds. Task 0 runs 1.41 seconds. Task 4 runs 1.91 seconds. All subprocesses done.
进程间通信
Python的miltiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。
在父进程中创建两个子进程,一个往Queue中写数据,一个从Queue里读数据。
from multiprocessing import Process, Queue import os, time, random # 写数据进程执行的代码: def write(q): for value in ['A', 'B', 'C']: print 'Put %s to queue...' % value q.put(value) time.sleep(random.random()) # 读数据进程执行的代码: def read(q): while True: value = q.get(True) print 'Get %s from queue.' % value if __name__=='__main__': # 父进程创建Queue,并传给各个子进程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 启动子进程pw,写入: pw.start() # 启动子进程pr,读取: pr.start() # 等待pw结束: pw.join() # pr进程里是死循环,无法等待其结束,只能强行终止: pr.terminate()
Put A to queue... Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue.
- 多线程
Python中提供两个模块,thread是低级模块,threading是高级模块,对thread进行了封装。
import time, threading # 新线程执行的代码: def loop(): print 'thread %s is running...' % threading.current_thread().name n = 0 while n < 5: n = n + 1 print 'thread %s >>> %s' % (threading.current_thread().name, n) time.sleep(1) print 'thread %s ended.' % threading.current_thread().name print 'thread %s is running...' % threading.current_thread().name t = threading.Thread(target=loop, name='LoopThread') t.start() t.join() print 'thread %s ended.' % threading.current_thread().name
thread MainThread is running... thread LoopThread is running... thread LoopThread >>> 1 thread LoopThread >>> 2 thread LoopThread >>> 3 thread LoopThread >>> 4 thread LoopThread >>> 5 thread LoopThread ended. thread MainThread ended.
多进程和多线程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改。因此,线程之间共享数据最大的危险在于多个线程同时该变一个变量,把内容给改乱了。
因此得加上一把锁lock
balance = 0 lock = threading.Lock() def run_thread(n): for i in range(100000): # 先要获取锁: lock.acquire() try: # 放心地改吧: change_it(n) finally: # 改完了一定要释放锁: lock.release()
当多个线程同时执行lock.acquire()时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就继续等待直到获得锁为止。
但是,这样实际上就不是并行处理了。
Python的多进程由于存在GIL锁的问题,所以多线程实际上不能有效利用多核。多线程的并发在Python中是无用的。
- ThreadLocal
全局变量local_school就是一个ThreadLoacl对象,每个Thread对它都可以读写student属性,但是互不影响。可以把local_school看成全局变量,但每个属性如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。
import threading # 创建全局ThreadLocal对象: local_school = threading.local() def process_student(): print 'Hello, %s (in %s)' % (local_school.student, threading.current_thread().name) def process_thread(name): # 绑定ThreadLocal的student: local_school.student = name process_student() t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A') t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B') t1.start() t2.start() t1.join() t2.join()
Hello, Alice (in Thread-A) Hello, Bob (in Thread-B)
- 进程vs线程
多进程的优点是稳定性高,一个崩溃,不会影响其他的进程,但是,代价大,在linux下,调用fork还可以,但是windows下进程开销巨大。
多线程模式比多进程快一点,但是也快不了多少,缺点十分明显,由于共享进程的内存,一个线程崩了,就都崩了。
计算密集型和IO密集型:
计算密集型会消耗大量的CPU资源,代码的运行效率就至关重要,Python等脚本语言运行效率低,不适合。
IO密集型涉及到网络、磁盘IO的任务,它们的CUP消耗较少,任务的主要时间在等待IO操作完成,CUP效率无法完全使用,所以适合开发效率高的语言。
现代操作系统对IO操作进行了巨大的改进,支持异步IO。利用异步IO,就可以用单进程模型来执行多任务,这种全新的模型称为事件驱动型。
- 分布式进程
多台电脑协助工作,一台电脑作为调度者,依靠网络通信,将任务分布到其他电脑的进程中。
通过manager模块把Queue通过网络暴露出去,让其他机器的进程可以访问Queue
服务器继承中,负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:
# taskmanager.py import random, time, Queue from multiprocessing.managers import BaseManager # 发送任务的队列: task_queue = Queue.Queue() # 接收结果的队列: result_queue = Queue.Queue() # 从BaseManager继承的QueueManager: class QueueManager(BaseManager): pass # 把两个Queue都注册到网络上, callable参数关联了Queue对象: QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 绑定端口5000, 设置验证码'abc': manager = QueueManager(address=('', 5000), authkey='abc') # 启动Queue: manager.start() # 获得通过网络访问的Queue对象: task = manager.get_task_queue() result = manager.get_result_queue() # 放几个任务进去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 从result队列读取结果: print('Try get results...') for i in range(10): r = result.get(timeout=10) print('Result: %s' % r) # 关闭: manager.shutdown()
在另一台机器上启动任务进程:
# taskworker.py import time, sys, Queue from multiprocessing.managers import BaseManager # 创建类似的QueueManager: class QueueManager(BaseManager): pass # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 连接到服务器,也就是运行taskmanager.py的机器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和验证码注意保持与taskmanager.py设置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey='abc') # 从网络连接: m.connect() # 获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 从task队列取任务,并把结果写入result队列: for i in range(10): try: n = task.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except Queue.Empty: print('task queue is empty.') # 处理结束: print('worker exit.')
服务进程启动如下:
$ python taskmanager.py Put task 3411... Put task 1605... Put task 1398... Put task 4729... Put task 5300... Put task 7471... Put task 68... Put task 4219... Put task 339... Put task 7866... Try get results...
工作进程启动如下:
$ python taskworker.py 127.0.0.1 Connect to server 127.0.0.1... run task 3411 * 3411... run task 1605 * 1605... run task 1398 * 1398... run task 4729 * 4729... run task 5300 * 5300... run task 7471 * 7471... run task 68 * 68... run task 4219 * 4219... run task 339 * 339... run task 7866 * 7866... worker exit.
等到工作进程结束后,服务进程如下:
Result: 3411 * 3411 = 11634921 Result: 1605 * 1605 = 2576025 Result: 1398 * 1398 = 1954404 Result: 4729 * 4729 = 22363441 Result: 5300 * 5300 = 28090000 Result: 7471 * 7471 = 55815841 Result: 68 * 68 = 4624 Result: 4219 * 4219 = 17799961 Result: 339 * 339 = 114921 Result: 7866 * 7866 = 61873956
注意Queue的作用是来传递任务和接受结果的,每个任务的描述量要尽量小。比如发送一个处理日志文件的任务,不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。