- 进程间通信
进程之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信,Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据
下面以Queue为例,在父进程中创建两个子进程,一个往Queue写数据,一个从Queue读数据
#-*- coding: utf-8 -*- from multiprocessing import Process,Queue import os,time,random def write(q): print('写进程为:%s' % os.getpid()) for value in ['A','B','C']: print('将 %s 放入队列' % value) q.put(value) time.sleep(random.random()) def read(q): print('读进程为:%s' % os.getpid()) while True: value = q.get(True) print('在队列中获取 %s' % value) if __name__ == '__main__': q = Queue() pw = Process(target=write,args=(q,)) pr = Process(target=read,args=(q,)) pw.start() pr.start() pw.join() pr.terminate() #杀死子进程 #输出结果: 写进程为:3576 将 A 放入队列 读进程为:880 在队列中获取 A 将 B 放入队列 在队列中获取 B 将 C 放入队列 在队列中获取 C #解析 q是Queue的实例,Queue的方法put()可以将元素放入item队列,get()可以查看队列中的元素 get(True)可以控制是否使用报错,如果使用False的话,在队列没有值的话会报错,反之则不会,默认为True
- 小结:
- 在Linux或Unix下,可以使用
fork()
调用实现多进程multiprocessing
模块可以实现跨平台的多进程- 进程间通信可以通过
multiprocessing
模块的Queue
、Pipes
方法来实现
三、多线程
多任务可以由多进程完成,也可以由进程内的多线程完成,一个进程至少拥有一个线程
由于线程是操作系统直接支持的执行单元,线程的调度也是由操作系统决定,因此,高级语言通常都会内置多线程的支持,Python也不例外,并且Python的多线程是真正的"线程",而不是模拟出的线程
Python标准库提供了两个模块,分别是_thread和threading,_thread是低级模块,threading是高级模块,threading对_thread进行了封装,大部分情况下,通常只需要使用threading这个高级模块
启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行:
首先要知道,任何进程默认都会启动一个线程,我们把这个线程称之为主线程,主线程又可以启动新的线程
而Python中的threading模块有一个current_thread()函数,此函数永远返回当前线程的实例名称
主线程的实例名称叫做MainThread,子线程的名称是在创建时指定的,一般使用LoopThread来命名子线程,名称仅仅只是打印时用来显示,除此之外没有其他意义,如果不指定名称,则Python会自动以Thread-1、Thread-2这样的顺序取名
#-*- coding: utf-8 -*- import time, threading # 新线程执行的代码: def loop(): print('线程 %s 正在运行' % threading.current_thread().name) n = 0 while n < 5: n = n + 1 print('线程 %s >>> %s' % (threading.current_thread().name, n)) time.sleep(1) print('线程 %s 结束' % threading.current_thread().name) print('线程 %s 正在运行' % threading.current_thread().name) t = threading.Thread(target=loop, name='LoopThread') t.start() t.join() print('线程 %s 已经结束' % threading.current_thread().name) #输出: 线程 MainThread 正在运行 线程 LoopThread 正在运行 线程 LoopThread >>> 1 线程 LoopThread >>> 2 线程 LoopThread >>> 3 线程 LoopThread >>> 4 线程 LoopThread >>> 5 线程 LoopThread 结束 线程 MainThread 已经结束 #解析 第一行输出的名称'MainThread',其实就是当前进程创建的主线程名称,然后创建了线程,并且命名为'LoopThread',当线程结束后,最后主线程也结束
Lock互斥锁
- 多线程和多进程最大的不同就是:
多进程中,同一个变量,各自有一份拷贝保存在每个进程中,修改是互不影响的,但是在多线程中,所有的变量都会由所有线程共享,从而使任何一个变量都可以被任何线程修改,因此,线程之间共享数据最大的危险在于多个线程同时修改一个变量,使变量的值出现混乱
- 下面来看案例:
#-*- coding: utf-8 -*- import time, threading # 假定这是你的银行存款: balance = 0 def change_it(n): # 先存后取,结果应该为0: global balance balance = balance + n balance = balance - n def run_thread(n): for i in range(20000000): change_it(n) t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance) #执行结果: 电脑速度过快的话,这里的输出可能还是0,可以把循环增加几百倍,我这里使用虚拟机用centos执行的,结果都是大于0的 #解析: 这里定义了共享变量'balance',初始值为0,并且启动两个线程去执行,从逻辑来看,现存后取,最终结果肯定也是0,但是由于线程的调度是由操作系统决定的,所以当两个线程交替执行并且循环次数足够多使,最终的结果就不一定为0了
出现上面的这种情况,是因为修改balance变量的值需要多条语句,而执行这几条语句时,线程可能会中断,从而导致多个线程把同一个对象的值改乱了
如果我们想要最终结果为0,那么就需要确保在一个线程修改balance变量时,别的线程不能修改。为了确保balance变量值计算正确,我们可以给change_it()方法上一把锁,锁的作用:
当某个线程开始执行change_it()方法时,如果该线程获得了锁,那么其他线程是不能同时执行change_it()方法的,只能等待锁被释放后,获得该锁后才能继续执行,但是由于锁只有一个,所以无论有多少线程,同一时刻最多只能有一个线程持有锁,这样就不会造成修改的冲突
通过threading.Lock()方法可以创建锁,下面来看案例:
看之前需要知道threading.Lock()的方法:
acquire():查询锁状态,如果锁是locked上锁状态,则同步阻塞,如果是unlocked非上锁状态,则将其上锁
release():解锁
#!/usr/bin/env python3 #-*- coding: utf-8 -*- import time, threading # 假定这是你的银行存款: balance = 0 lock = threading.Lock() #创建实例 def change_it(n): # 先存后取,结果应该为0: global balance balance = balance + n balance = balance - n def run_thread(n): #修改run_thread方法 for i in range(2000000): lock.acquire() #先获取锁 try: change_it(n) finally: lock.release() #执行完后要释放锁,否则会造成死锁 t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance) #执行结果: 0 #解析: 每次在执行change_it()函数时,需要先获取锁,然后等待执行完毕之后通过finally释放锁,这样在执行函数时就会只有一个线程执行,从而避免变量被多个线程修改
在多个线程同时执行lock.acquire()时,只有一个线程可以成功获取到锁,然后继续执行代码,其他线程需要等待锁释放之后才能获取到锁
锁的优缺点:
优点:锁确保了某段关键代码只能由一个线程从头到尾完整的执行
缺点:阻止了多线程并发执行,包含锁的某段代码实际上只能单线程执行,效率会下降很多。并且由于可以存在多个锁,在不同的线程持有不同的锁、并且试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行也无法结束,只能由操作系统强制终止
注意:获得锁的线程用完后一定要释放锁,否则其他的线程就会一直进行等待,成为死锁,我们可以使用try...finally来确保锁一定会释放
多线程编程,模型复杂,容易发生冲突,必须用锁加以隔离,同时又要小心死锁的发生
- GIL锁
Python的线程虽然是真正的线程,但是解释器在执行代码时,有一个CIL(Global Interpreter Lock)全局锁,任何Python线程执行前,必须要先获取GIL锁,然后每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行
GIL全局锁实际上把所有线程的执行代码都上了锁,所以多线程在Python中只能交替执行,即使100个线程跑在100核CPU,也只能用到1核
GIL锁是Python解释器设计的历史遗留问题,通常我们用的解释器是官方的CPython,要真正利用多核,需要重写一个不带GIL的解释器,所以在Python中,可以使用多线程,但是无法有效利用多核,如果一定要通过多线程利用多核,那么只能通过C扩展来实现,不过这样就失去了Python简单易用的特点
不过Python虽然不能利用多线程实现多核任务,但是可以通过多进程实现多核任务,多个Python进程都各自拥有独立的GIL锁,互相不影响
四、ThreadLocal
- 在多线程环境下,每个线程都有自己的数据
一个线程使用自己的局部变量比使用全局变量好
,因为局部变量只有线程自己可以看到,不会影响其他线程,而使用全局变量的话就需要加锁,不然就可能使变量的值发生混乱- 使用局部变量也有问题,在函数调用的时候,传递起来很麻烦:
def process_student(name): std = Student(name) #编写了一个调用其他两个函数的函数,并且创建std实例,然后把实例传入其他函数,这里的std就相当于局部变量 do_task_1(std) do_task_2(std) def do_task_1(std): do_subtask_1(std) do_subtask_2(std) def do_task_2(std): do_subtask_2(std) do_subtask_2(std)
- 通过上面一层一层调用虽然可以进行传递,但是代码就会显得很臃肿。
- 我们可以使用一个全局的字典,这个字典存放所有的
Student
对象,然后以这个字典本身获取线程对应的Student
对象,下面来看案例:
global_dict = {} #全局字典 def std_thread(name): std = Student(name) global_dict[threading.current_thread()] = std #写入键值,键值为 '当前线程名称':'std实例' do_task_1() do_task_2() def do_task_1(): std = global_dict[threading.current_thread()] #根据当前线程名称取出std实例 ... def do_task_2(): std = global_dict[threading.current_thread()] #根据当前线程名称取出std实例 ...
上面的方式理论可行,而Python有更简单的方式,那就是使用ThreadLocal,使用ThreadLocal不用查找字典,它会帮我们自动做这件事
下面来看案例:
import threading # 创建全局ThreadLocal对象: local_school = threading.local() def process_student(): # 获取当前线程关联的student: std = local_school.student print('Hello, %s (in %s)' % (std, threading.current_thread().name)) #std是process_thread函数传入的name变量,第二个是线程名称 def process_thread(name): # 绑定ThreadLocal的student: local_school.student = name process_student() t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A') t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B') t1.start() t2.start() t1.join() t2.join() #执行结果: Hello, Alice (in Thread-A) Hello, Bob (in Thread-B)
全局变量local_school就是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。你可以把local_school看成全局变量,但每个属性如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。
可以理解为全局变量local_school是一个字典,不但可以用local_school.student,还可以绑定其他变量,如local_school.teacher等等。
ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
小结
一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。
五、进程 VS 线程
要实现多任务,通常会设置Master-Worker
模式,其中Master负责分配任务,而Worker负责执行任务,因此多任务环境下,通常是一个Master和多个Worker
多进程实现Master-Worker,主进程是Master,其他进程就是Worker
多线程实现Master-Worker,主线程是Master,其他进程就是Worker
在了解过多线程和多进程后,下面来看一下这两种方式的优缺点:
多进程:
优点:多进程稳定性高,因为一个子进程崩溃了是不会影响主进程和其他子进程的。需要注意的是如果是主进程挂了,那么所有的进程就挂了,但是主进程只负责分配任务,挂掉的几率很低,Apache最早就是采用多进程模式
缺点:多进程创建进程的代价大,如果在Unix/Linux系统下,使用fork()创建进程还行,但是如果是在Windows系统,那么创建进程开销会很大。并且操作系统能够同时运行的进程数量也是有限的,在内存和CPU的限制下,如果有几千个进程同时运行,那么操作系统连调度都会成为问题
多线程:
优点:多线程模式通常要比多进程快一点,但是也快不到哪里去。但是在Windows系统下,多线程的效率要比多进程高,微软的IIS服务器默认都是使用多线程模式
缺点:多线程模式的致命缺点就是任何一个线程挂掉都可能会使整个进程崩溃,因为所有线程共享进程的内存,如果一个线程执行的代码除了问题,可能会看到提示“该程序执行了非法操作,即将关闭”,这样的提示往往是某个线程出现了问题,然后操作系统会强制结束整个进程
- 线程切换
- 每次处理一个任务,然后依次做完,这种方式称之为
单任务模型
,或者批处理任务模型
- 处理一个任务一分钟然后切换到下一个任务,以此推类,依次循环往复,只要切换的速度够快,这种方式其实就和单核CPU执行多任务是一样的,这种方式就叫
多任务模型
。
但是切换任务也是有代价的,每次切换之前需要先保存当前执行的线程环境,例如CPU寄存器状态、内存页等,然后把新任务的执行环境准备好,即恢复上次的CPU寄存器状态、内存页等,准备好之后才能开始执行,这个过程虽然很快但是也会耗费时间,如果有几千个任务同时执行,操作系统可能会忙着切换任务,从而没有时间去执行任务,最常见的就是硬盘狂响,点击窗口桌面没有反应,系统处于假死状态
总之,多任务一旦多到一个限度,就会消耗掉系统的所有资源,使效率急剧下降,所有任务都做不好
- 计算密集型 VS IO密集型
- 是否采用多任务的第二个考虑是任务的类型,任务类型分为计算密集型和IO密集型
计算密集型:
计算密集型任务的特点就是需要进行大量的计算,消耗CPU资源,例如计算圆周率、对视频进行高清解码等,主要依靠CPU的运算能力,这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低。
如果想要高效的利用CPU,那么计算密集型任务同时进行的数量应该和CPU的核数相等
计算密集型任务主要消耗CPU,因此,代码运行效率至关重要,Python这样的脚本语言运行效率很低,完全不适合计算密集型任务,对于计算密集型任务应该使用C语言编写
IO密集型:
IO密集型任务涉及到网络、磁盘IO,这类任务的特点是CPU消耗少,任务的大部分时间都在等待IO操作完成,这是因为IO的速度远远低于CPU和内存的速度。
对于IO密集型任务,任务越多,CPU效率越高,但是也有一个限度。常见的大部分任务都是IO密集型任务,例如Web应用
IO密集型任务在执行期间,99%的时间都花在IO上,花在CPU的时间很少,因为就算使用C语言替换Python,也不会提升运行效率,对于IO密集型任务,最合适的就算开发效率高、代码量少的语言,首选脚本语言,例如Pyhon,C语言最差
六、分布式进程
- 在Thread线程和Process进程中,优先选择Process进程,因为进程更加稳定,并且,进程可以分布到多台机器上,而线程最多只能分布到同一台机器的多个CPU上
- Python的multiprocessing模块不但支持多进程,其中的managers子模块还支持把多进程分布到多台机器上。
- 一个服务进程可以作为调度者,将任务分布到其他多个进程中,并且依靠网络通信,由于managers模块封装很好,所以我们不必了解网络通信的细节,就可以编写分布式多进程程序,例如:
当我们有一个通过Queue通信的多进程程序在同一台机器上运行,而现在处理任务的进程任务繁重,需要把发送任务的进程和处理任务的进程分布到两台机器上
可以这么做,原有的Queue可以继续使用,我们通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问这个Queue了
下面来看案例:
因为原文的代码有点问题,这里使用别的案例,案例来源:https://blog.csdn.net/u011318077/article/details/88094583
使用模块
使用multiprocessing和queue模块 - 使用multiprocessing.managers中的BaseManager创建分布式管理器 - 使用Queue创建队列,用于多个进程之间的通信
服务进程test.py
逻辑步骤:
定义两个Queue队列,一个用于发送任务,一个接收结果
把上面创建的两个队列注册在网络上,利用register方法
绑定端口8001,设置验证口令,这个相当于对象的初始化
启动管理器,启动Queue队列,监听信息通道
通过管理实例的方法获访问网络中的Queue对象
添加任务,获取返回的结果
关闭服务
# -*- coding:utf-8 -*- import random, queue from multiprocessing.managers import BaseManager from multiprocessing import freeze_support #1.定义两个Queue队列,tesk_queue用于发送任务,result_queue用于接收结果 task_queue = queue.Queue() result_queue = queue.Queue() #创建QueueManager类,继承BaseManager,用于后面创建管理器 class QueueManager(BaseManager): pass #定义两个函数,返回结果是Queue队列 def return_task_queue(): global task_queue #定义全局变量 return task_queue #返回发送任务的队列 def return_result_queue(): global result_queue return result_queue #返回接收结果的队列 #2.利用register方法把上面创建的两个队列(也就是两个函数)注册在网络上 #callable参数关联了Queue对象,将Queue对象在网络中暴露 #QueueManager.register的第一个参数是注册在网络上队列的名称 def test(): QueueManager.register('get_task_queue', callable=return_task_queue) QueueManager.register('get_result_queue', callable=return_result_queue) #3.绑定端口8001,设置验证口令,这个相当于对象的初始化 #绑定端口并填写验证口令,windows下需要填写IP地址,Linux下默认为本地,地址为空 manager = QueueManager(address=('127.0.0.1', 8001), authkey=b'abc') #口令必须写成类似b'abc'形式,只写'abc'运行错误,其中口令为'abc' #4.启动管理器,启动Queue队列,监听信息通道 manager.start() #5.通过管理实例的方法获访问网络中的Queue对象,即通过网络访问获取任务队列和结果队列,创建了两个Queue实例, task = manager.get_task_queue() #发送 result = manager.get_result_queue() #接收 #6.添加任务,获取返回的结果,将任务放到Queue队列中 for i in range(10): n = random.randint(0, 10) #返回0到10之间的随机数 print("Put task %s ..." % n) task.put(n) # 将n放入到任务队列中 #从结果队列中取出结果 print("Try get results...") for i in range(11): # 注意,这里结果队列中取结果设置为11次,总共只有10个任务和10个结果,多1次主要是确认队列中是不是已经空了 #总共循环10次,上面放入了10个数字作为任务 #加载一个异常捕获 try: r = result.get(timeout=5) # 每次等待5秒,取结果队列中的值 print("Result: %s" % r) except queue.Empty: #如果上面代码出现异常则输出这个 print("result queue is empty.") #7.关闭服务 #一定要关闭,否则会报管道未关闭的错误 manager.shutdown() print("master exit.") if __name__ == '__main__': freeze_support() #Windows下多进程可能出现问题,添加这个可以缓解 print("Start!") # 运行服务进程 test()
- 任务进程test_2.py
逻辑步骤
- 使用QueueManager注册用于获取Queue的方法名称
- 连接到服务器,也就是运行服务进程代码的机器
- 从网络上获取Queue对象,并进行本地化,与服务进程是同一个队列
- 从task队列获取任务,并把结果写入到resul队列
- 任务结束
# coding: utf-8 # 定义具体的任务进程,具体的工作任务是什么 import time, sys, queue from multiprocessing.managers import BaseManager #创建QueueManager类,继承BaseManager,用于后面创建管理器 class QueueManager(BaseManager): pass #1.使用QueueManager注册用于获取Queue的方法名称 #前面服务进程已经将队列名称暴露到网络中,该任务进程注册时只需要提供名称即可,与服务进程中队列名称一致 QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') #2.连接到服务器,也就是运行服务进程代码的机器 server_addr = '127.0.0.1' print("Connet to server %s..." % server_addr) #创建一个管理器实例,端口和验证口令保持与服务进程中完全一致 m = QueueManager(address=(server_addr, 8001), authkey=b'abc') #连接到网络服务器 m.connect() #3.从网络上获取Queue对象,并进行本地化,与服务进程是同一个队列 task = m.get_task_queue() result = m.get_result_queue() #4.从task队列获取任务,并把结果写入到resul队列,这里也循环10次,因为上面的服务队列也是计算10次 for i in range(10): try: #前面服务进程向task队列中放入了n,这里取出n,计算n和n相乘,并将相乘的算式和结果放入到result队列中去 n = task.get(timeout=1) #每次等待1秒后取出任务 print("run task %d * %d..." % (n, n)) r = '%d * %d = %d' % (n, n, n*n) #进行计算 time.sleep(1) result.put(r) #把r的值返回到result队列中 except queue.Empty: print("task queue is empty.") # 任务处理结束 print("worker exit.")
执行时,可以开两个cmd,先执行服务进程,再执行任务进程