Python并发编程
本文比较长,绕的也比较快,需要慢慢跟着敲代码并亲自运行一遍,并发编程本身来说就是编程里面最为抽象的概念,单纯的理论确实很枯燥,但这是基础,基础不牢,地洞山摇,在概念这节里面还需要好好的品味一番。
注意:看本文需要Python基础,以下所有代码均在centos上运行,因为牵扯协程问题,所以推荐python 3.6以上版本,函数作用域、返回值、挂起,偏函数等。没有此基础暂时不建议阅读
一、概念
Python并发的概念非常的抽象,但同时也非常的重要,因为这事关能不能准确的写出高并发的质量性代码。
进程:顾名思义,正在进行的一个过程。
背景:进程起源于操作系统,是操作系统最核心的概念,也是操作系统提供的最古老最重要最抽象的概念之一。
1. 单核并发
即使操作系统只有一个CPU,但是使用进程的概念也能使这一个CPU支持并发的能力,这种并发就称之为伪并发。
将一个CPU变成多个虚拟的CPU的技术就称之为多道技术。而多道技术又分为时间多路复用和空间多路复用(当然,必须硬件支持隔离),所以就有这样一个理论,没有进程的抽象,现代计算机将不复存在。
2. 操作系统
在进程并发上,操作系统起着非常重要的作用,它隐藏了复杂的硬件接口,提供了良好的抽象接口,与此同时,它更是对进程的管理与调度起着不可或缺的作用,因为如果没有操作系统来管理进程,多个进程就会变得杂乱无章,使得计算机资源严重浪费。
3. 多道技术
前边提到了多道技术,不难发现,它就是针对单核的计算机实现并发。
(1) 空间复用
举个例子:
假设现在双击桌面上的IDM快捷方式,然后呈现软件界面。其实后台运行了很多的复杂的IO操作。
- 双击快捷方式
- 快捷方式会告诉操作系统一个资源路径,也就是快捷方式所对应的应用程序的路径
- 操作系统从硬盘读取文件内容并克隆到内存中
- CPU从内存中读取数据,然后执行
而这里的多道技术可以理解为多个程序,使用空间复用就是在内存中同时跑多个程序,够简单了吧。
(2) 时间复用
同样举例说明:
我们知道有一款下载软件叫IDM,下载速度非常的快,它的原理其实就是将一个文件切割成非常多的细小文件下载,假设现在下载一张图片,它会将一张图片分为10块,现在有2个进程同时下载,在遇到阻塞的情况下,就切换到其他小块下载,下载完之后再回到这个小块上试试能不能继续下载,如果不能,就再切换到其他的小块,如此往复循环,直到一个完整的文件下载完毕。
时间上的复用,其实就是复用CPU上的一个时间片,进程在执行的时候,遇到IO就切换,占用CPU时间过长的时候也切,值得注意的是,在切换之前,会保存进程的状态,这样才能保证下次切换回来的时候继续上次停顿的地方。
二、多进程
多进程的概念网上比比皆是,简单来讲,就是正在进行的一个过程或者一个任务,而负责执行任务的是CPU。
需要注意的是,进程和程序之间是有区别的,两者绝对不能混淆。
进程与程序之间的区别:
程序仅仅是一堆代码,而进程是这堆代码的执行过程
看到这里还在迷糊不要紧,举个例子就能很明白了
栗子:
假设M正在织毛衣,组成毛衣的毛线就是一堆堆的代码,而M就是CPU正在执行织毛衣的过程。
那么现在M的丈夫回来了,丈夫说好饿,这个时候就需要考虑哪件事情相对来说重要,M考虑一下,觉得先做饭比较重要,这就是优先级,然后M记录下自己织的毛衣织到哪里了,再去做饭,这种切换就是处理其他优先级高的任务,每个进程拥有各自的程序,就是菜和毛线,当M做完饭后又回来织毛衣,从离开任务的地方继续执行。
这里需要注意一点:同一个程序执行多次,也就是多个进程,比如上边的IDM,我启动两次,就既可以下载苍老师,也可以下载波老师,两个进程之间互不影响。
1. 并发与并行
并行:多个程序同时运行
并发:伪并行,看起来是同时运行,其实质是利用了多道技术
无论是并行还是并发,在用户眼里看起来都是同时运行的,不管是线程还是进程,都只是一个任务,真正干活的CPU,而同一个CPU在同一时刻只能执行一个任务。
2. 进程的创建
我们将可以跑很多应用程序的系统称之为通用系统,那么对于通用系统来说,创建进程有4中形式
- 系统的初始化,在linux中查看进程ps,windows中使用任务管理器。在前台运行的进程负责与用户发生交互,后台进程与用户无关,而有些时候,用户需要去唤醒后台的进程与之发生交互,这种类似于睡眠的后台进程就称之为守护进程。
- 一个进程在开启的时候必须开启子进程工作。比如python的fork方法
- 用户的交互式请求使得操作系统创建一个新的进程。如双击IDM
- 批处理作业的初始化。这种情况只会在大型机中出现
不管是哪一种,创建新进程都是由已经存在的进程调用系统创建进程的接口来实现的。
- linux中的系统调用是
fork
,它会创建一个与本身进程一模一样的副本进程,这个被创建的进程就是子进程,二者具有相同的存储映像、相同的环境字符串和相同的打开文件。比如说,在shell解释器中,每执行一个命令就会创建一个子进程 - windows中调用是
createProcess
,它会有两种作用,既创建进程,还会将程序装进新的进程
以上两种操作系统创建进程并不完全一样
- 相同点:进程在创建之后,两个进程都各自有不同的地址空间,任何一个进程的地址空间的修改都不会影响其他进程
- 不同点:在linux中,子进程的初始地址空间是父进程的一个副本,它的子进程和父进程之间是可以存在只读的共享内存区;windows中,从一开始两个进程的地址空间完全不同
所以说,学习Python推荐在linux上学习,便于后期进程之间的通信,而mac其核心也是linux,所以linux的任何python代码在mac上都是可行的
3. 进程的终止
进程有四种退出方式
- 正常退出:用户行为退出。比如点击IDM界面的X号关掉,在linux中使用exit或者quit
- 报错退出:用户行为报错。比如现在执行命令
python demo.py
,而该路径下并没有demo.py
文件 - 告警退出:系统本身出错。比如执行
pa -aux
,而系统本身是没有该命令 - 杀死退出:其他进程杀死。比如常用的
kil -9
4. 进程的层其结构
- 在linux中,所有的进程都是以init进程为根,组成树形结构。父进程共同组成进程组。
- 在windows中没有进程层次之分,所有进程地位相同。
值得注意的是:
在windows创建进程的时候,父进程会得到一个特别的令牌,这个令牌就是句柄,这个句柄可以控制子进程,这时进程就有了层次的概念,但是windows中的父进程有权把句柄传递给其他子进程,这样一来,进程又没有了层次的概念。
5. 进程的状态
进程有三种状态:就绪、阻塞、运行
举个例子:
现在执行linux命令tail -f web.log | grep '404'
执行程序tail,则会开启一个子进程,而grep又会另外开启一个子进程,这两个进程基于管道|
来进行通讯,也就是将tail出来的内容交给grep处理。在这个过程中,grep等待tail的结果,这种现象就是阻塞,如果tail一直阻塞,则grep将无法执行。
实质上总结出一下两点情况:
- 进程挂起是自身原因,在遇到IO阻塞,就让出CPU给其他进程,这样下来就保证了CPU一直处于工作的状态
- 在遇到CPU占用时间过长或者处理优先级较高的进程
6. 进程并发的实现
之所以在这里添加进程表的概念,是为了更好的理解操作系统与进程之间的关联,了解即可
所谓进程的并发,无非是硬件中断一个正在进行的进程,然后保存当前进程的状态。
操作系统会维护一张表格,我们称之为进程表,每一个进程都会占用一个进程表项,这个进程表项就称之为进程控制块。
这张表大致分四块:进程描述信息、进程控制信息、CPU现场保护结构
(1) 进程描述信息
- 进程名或者进程标识号:每个进程都有自己唯一的进程名或标识号
- 用户名或者用户识别号:每个进程隶属于某个用户,有利于资源共享和保护
- 家族关系:有的系统中的所有进程互成家族关系。比如常见的linux系统就是以init为根的家族树
(2) 进程控制信息
- 进程当前状态:说明进程当前处于什么状态,就是前边提到的就绪、运行、阻塞
- 进程优先级:用于选取进程占有CPU。与优先级有关的PCB表项还有占有CPU时间、进程优先级偏移、占据内存时间等等
- 程序开始地址:用来规定该进程以此地开始进行
- 各种计时信息:给出进程占有和利用资源的情况
- 通信信息:说明该进程在执行过程中与其他进程之间发生的信息交换
- 资源管理信息:占用内存大小以及其管理用数据结构指针
(3) CPU现场保护结构
寄存器值:通用、程序计数器PC、状态PSW、地址包括栈指针
7. 开启子进程
如果你顺利的看到了这里,则已经掌握了进程的理论知识,现在开始最为精彩的代码小节
(1) multiprocessing模块
Python中的多线程是无法利用计算多核的优势,如果需要充分的使用多核资源,在Python中大部分使用多进程。
multiprocessing模块用来开启子进程,并且在子进程中执行指定的任务。
该模块功能诸多:支持子进程、通信、数据共享、执行不同形式的同步,更是提供了 Process、Queue、Pipe、Lock等组件
这里一定要注意:与线程不同,进程没有任何的共享状态,进程修改的数据、改动仅限于该进程之内
(2) Process类
创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]])
由该类实例化得到的对象,表示一个子进程中的任务。
注意:
- 这里的参数必须使用关键字来指定
- args为传给target函数的位置参数,必须以元组的形式传入
参数:
group:值始终为None
target:调用的对象,即子进程需要执行的任务
args:位置参数元组,按照位置传参
kwargs:按照字典传参
name:子进程名称
方法:
假设p = Process()
p.start()
:启动进程
p.run()
:进程启动时运行的方法,用于调用target来指定需要执行的函数
p.terminate()
:强制终止进程p,并不会做任何的清理操作,如果p下还创建了子进程,那么这个子进程并没有父进程处理,这个子进程就称之为僵尸进程。
p.is_alive()
:判断p是否还在运行,如果还在运行则返回True
p.join([timeout]):
主线程等待p终止,也可以理解为回收计算机资源,值得注意的是,主线程从始至终处于等待的状态,而p是处于运行的状态,join只能作用于start的进程,而不能作用于run的进程
属性介绍:
p.daemon
:默认值为False,如果设置为True,则代表p为后台的守护进程,也就是前边提到的后台运行的守护进程等待用户与之发生交互。当p的父进程终止的时候,p也随之终止 ,并且设定为True之后是不能创建自己的新进程的,改设置必须在start之前设置
p.name
:进程的名称
p.pid
:进程的PID,和linux的PID类似
(3) 代码实现
from multiprocessing import Process import time def fun(name): print("{} 正在执行。。。".format(name)) time.sleep(2) print("{} 执行完毕。。。".format(name)) if __name__ == '__main__': p = Process(target=fun, args=('Chancey',)) # 这里传参必须是以元组的形式传参 p.start() print("主线程启动。。。")
上述例子非常简单,不过,这只是开启了一个进程,接下来开启4个进程
from multiprocessing import Process import time def fun(name): print("{} 正在执行。。。".format(name)) time.sleep(2) print("{} 执行完毕。。。".format(name)) if __name__ == '__main__': # 实例化以得到四个对象 p1 = Process(target=fun, args=('Chancey',)) p2 = Process(target=fun, args=('Waller',)) p3 = Process(target=fun, args=('Mary',)) p4 = Process(target=fun, args=('Arry',)) # 调用方法,开启四个进程 p1.start() p2.start() p3.start() p4.start() print("主线程启动。。。")
运行上述代码的时候会发现,四个进程同时进行,同时结束,这是因为设定了sleep的时间。
还有一种启动方式,就是面向对象的三大特征之一的继承,我们通过继承process类并重写父类方法以达到我们的需求
from multiprocessing import Process import time class MyProcess(Process): def __init__(self, name): super().__init__() # 重用父类的init方法 self.name = name def run(self): # 重写父类方法 print("{} 正在执行。。。".format(self.name)) time.sleep(2) print("{} 执行完毕。。。".format(self.name)) if __name__ == '__main__': p = MyProcess('Chancey') p.start() print('主进程')
那么开启多进程就变成了
from multiprocessing import Process import time class MyProcess(Process): def __init__(self, name): super().__init__() # 重用父类的init self.name = name def run(self): # 重写父类方法 print("{} 正在执行。。。".format(self.name)) time.sleep(2) print("{} 执行完毕。。。".format(self.name)) if __name__ == '__main__': p1 = MyProcess('Chancey') p2 = MyProcess('Waller') p3 = MyProcess('Mary') p4 = MyProcess('Arry') p1.start() p2.start() p3.start() p4.start() print('主进程')
(4) 查看进程的信息
pid用来查看父进程的ID
ppid用来查看子进程的ID
from multiprocessing import Process import time import os class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name def run(self): print("{} 正在执行。。。子进程ID:{},父进程ID:{}".format(self.name, os.getpid(), os.getppid())) time.sleep(2) print("{} 执行完毕。。。子进程ID:{},父进程ID:{}".format(self.name, os.getpid(), os.getppid())) if __name__ == '__main__': p1 = MyProcess('Chancey') p2 = MyProcess('Waller') p3 = MyProcess('Mary') p1.start() p2.start() p3.start() print("主进程", os.getppid(), os.getppid())
非常的简单,同一个父进程下边有三个子进程工作
(5) 其他属性
在Python的多进程编程中,还有其他很重要的Process对象属性
join方法
在主进程运行的过程中如果想并发的执行其他任务,就需要开启子进程,这时就有两种情况
- 如果主进程的任务和子进程的任务彼此独立,主进程在完成执行任务之后等待子进程执行完毕,然后统一回收资源;
- 如果主进程在执行到某一个阶段需要子进程执行完毕之后才能继续,届时就需要一种机制来检测子进程是否执行完毕,而这种检测机制正是join的作用。如果子进程没有执行完毕,就需要阻塞等待
简单示例:
from multiprocessing import Process import time import os class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name def run(self): print("{} 正在执行。。。子进程ID:{},父进程ID:{}".format(self.name, os.getpid(), os.getppid())) time.sleep(2) print("{} 执行完毕。。。子进程ID:{},父进程ID:{}".format(self.name, os.getpid(), os.getppid())) if __name__ == '__main__': p1 = MyProcess('Chancey') p2 = MyProcess('Waller') p1.start() p2.start() p1.join() p2.join() print('主进程', '父进程ID:', os.getpid(), '子进程ID:', os.getppid())
运行截图:
在这里就可以很容易的发现,主进程从原来的最先执行变为了最后,这正是因为使用join使得主进程等待子进程执行完毕才回收,那么,这样下来会不会有僵尸进程的存在,有关僵尸进程忘记的,请移步至第7节的(2)Process类。只需要在上边的代码结尾加上print(p.pid())
即可查看
事实证明,这里确实存在了僵尸进程。
多个进程同时进行就是并发,如果多个进程是每一个都等待上一个进程执行完毕之后才执行,这种执行方式就是串行
对上边代码稍微改动一下:
from multiprocessing import Process import time import os class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name def run(self): print("{} 正在执行。。。子进程ID:{},父进程ID:{}".format(self.name, os.getpid(), os.getppid())) time.sleep(2) print("{} 执行完毕。。。子进程ID:{},父进程ID:{}".format(self.name, os.getpid(), os.getppid())) if __name__ == '__main__': p1 = MyProcess('Chancey') p2 = MyProcess('Waller') p1.start() p1.join() p2.start() p2.join() print('主进程', '父进程ID:', os.getpid(), '子进程ID:', os.getppid()) print(p1.pid) print(p2.pid)
贴上运行截图
不难发现,其实每个进程都是分时间段进行的,在同一时间并没同时进行,正是所谓的串行。
is_alive方法
该方法是用于查看进程是否存活,如果存活则返回True
,反之亦然。
同时这里还有个属性name
,用于给进程起名
from multiprocessing import Process import time import os def fun(name): print("{} 正在执行。。。子进程ID:{},父进程ID:{}".format(name, os.getpid(), os.getppid())) time.sleep(2) print("{} 执行完毕。。。子进程ID:{},父进程ID:{}".format(name, os.getpid(), os.getppid())) if __name__ == '__main__': p = Process(target=fun, args=('Chancey',)) print(p.is_alive()) p.start() print(p.is_alive()) print(p.name) print("主进程, 父进程ID:{},子进程ID:{}".format(os.getpid(), os.getppid())) print('='*60) p = Process(target=fun, args=('Waller',), name="Cute_Process") print(p.is_alive()) p.start() print(p.is_alive()) p.terminate() # 强制杀死该进程 print(p.is_alive()) time.sleep(3) print(p.is_alive()) print(p.name) print("主进程, 父进程ID:{},子进程ID:{}".format(os.getpid(), os.getppid()))
上边的代码中p.terminate()
是为了后边验证是否存活的使用
这里可以清晰的看到,在刚刚执行了p.terminate()
之后还是返回了True,但是在sleep了3秒之后,又变成了False,所以说,在杀死一个进程的时候并不会立即回收空间;没有命名的进程默认Process-1
,取名之后为自己定义的那个值。
在函数中我们都知道如果要在一个函数中使用另外一个函数中的变量,就可以这么写
n = 0 def fun1(): global n n = 100 print(n) def fun2(): print(n) if __name__ == '__main__': fun1() fun2()
这样一来,在fun2中输出的n就是fun1中的值,那么进程中同样使用global
关键字试试
from multiprocessing import Process n = 0 def fun1(): global n n = 100 print('子进程中的变量:', n) if __name__ == '__main__': p = Process(target=fun1) p.start() print('主进程中的变量:', n)
非常容易就能发现,其实关键字并没有起作用,这就证明,进程之间的内存空间是隔离的
(6) 守护进程
前边有介绍过什么是守护进程
那么定义守护进程必须在start之前使用daemon定义
写个未使用守护进程的例子:
from multiprocessing import Process import time def fun(name): print('{}正在执行'.format(name)) p = Process(target=time.sleep, args=(3,)) p.start() if __name__ == '__main__': p = Process(target=fun, args=("Chancey", )) p.start() p.join() print('主进程')
运行上边的代码不会出现任何问题,现在将其改为守护进程
只需要在创建实例的时候添加参数daemon = True
即可
动手能力强的人可能已经跑了一遍修改过的代码了,毫无疑问,这段代码有错误,看一下官方文档
这个说明正好对应上了上边修改过代码的报错
没有猜错,它就是说守护进程的子进程不能再次创建子进程,再次修改代码
from multiprocessing import Process import time def fun(name): print('{}正在执行'.format(name)) # p = Process(target=time.sleep, args=(3,)) # p.start() time.sleep(3) if __name__ == '__main__': p = Process(target=fun, args=("Chancey", ), daemon=True) p.start() p.join() print('主进程')
这样一来,主进程总是等待子进程执行完毕才执行,那如果不做join呢,去掉上边代码中的p.join()
,执行发现,主进程并没有等待子进程执行完毕,而是直接退出,这也就使得子进程的任务并没有被执行就被迫退出,这就是守护进程存在的意义。
来个多进程的实例:
from multiprocessing import Process import time def game(name): print('%s 正在玩游戏。。。' % name) time.sleep(3) print('%s 玩完游戏了。。。' % name) def sing(name): print('%s 正在唱歌。。。' % name) time.sleep(3) print('%s 唱完歌了。。。' % name) if __name__ == '__main__': p1 = Process(target=game, args=('Chancey', ), daemon=True) p2 = Process(target=sing, args=('Chancey', )) p1.start() p2.start() print("进程一:", p1.name) print("进程二:", p2.name)
主进程结束后,只会让子进程跟着结束,但是其他的子进程会依旧执行,这就是为什么game没有执行而sing执行的原因,如果在这里join一下呢,答案是肯定的,game和sing都会执行。
(7) 互斥锁
现在有一段代码
from multiprocessing import Process import time def foo(name): print("进程{}输出:1".format(name)) time.sleep(2) print("进程{}输出:2".format(name)) time.sleep(3) print("进程{}输出:3".format(name)) if __name__ == '__main__': for i in range(3): p = Process(target=foo, args=(i, )) p.start()
先不要运行,分析一下逻辑,当运行一个进程,函数中的代码自上而下的执行,所以它应该是这样输出的
现在运行一下代码
并不是想象那样输出,输出步骤完全紊乱,要是用这样的操作数据库的话,后果不堪设想,估计就得跑路了。
OK,为了解决这种出现数据紊乱的情况,就出现了Lock()
互斥锁,它会在运行的时候锁住资源,从而使得其他进程并不会使用该资源。
通俗点讲:现在有一群工人,他们要抢一间房子,当一个工人抢到之后就给房间上一把锁,然后执行任务,这时其他工人在外边等候,当这个抢到房子并完成任务开锁之后,其他工人才能进入执行任务。
这里的工人就是进程,房子就是计算机资源,而门锁就是互斥锁,正是因为有了互斥锁,才保证了共享数据的完整性
语法
mutex = Lock()
:实例化一个互斥锁
mutex.acquire()
:上锁
mutex.release()
:解锁
使用
from multiprocessing import Process, Lock import time def foo(i, mutex): mutex.acquire() print("进程{}输出:1".format(i)) time.sleep(2) print("进程{}输出:2".format(i)) time.sleep(1) print("进程{}输出:3".format(i)) mutex.release() if __name__ == '__main__': mutex = Lock() for i in range(3): p = Process(target=foo, args=(i, mutex)) p.start()
然后这里输出正常了,但是,这里变成了串行, 因为使用时占用时间,影响其他进程等待,所以尽量修改处理块的数据后立即释放锁。
用当下最为火热的抢票过程演示一下互斥锁的使用场景
查询余票是并发的,而购票只能确保一个人成功,所以购票的方法应该使用互斥锁
from multiprocessing import Process, Lock import time def search(name): time.sleep(1) with open('data.txt') as f: count = int(f.read()) print('<%s> 查看到剩余票数【%s】' % (name, count)) def get(name): time.sleep(1) f = open('data.txt') count = f.read() f.close() count = int(count) if count > 0: count -= 1 time.sleep(1) f = open('data.txt', 'w') f.write(str(count)) f.close() print('<%s> 购票成功' % name) def start(name, mutex): search(name) mutex.acquire() get(name) mutex.release() if __name__ == '__main__': mutex = Lock() for i in range(5): p = Process(target=start, args=('顾客%s' % i, mutex)) p.start()
这里的data.txt里只有余票,就是数字2,通过运行,多人并发查询,一人购票
这里想到了join一下里面的get或者search,原理上就会出现所有程序的串行运行,极大的降低了程序的运行效率。
(8) 队列
前边有提到,每个进程的空间都是互相隔离的,如果想要在进程之间进行通信,就需要的手段。通常情况下,需要在内存中开辟一块空间,然后多个进程使用同一空间进行IO操作,这个空间就是管道空间。
按照作用可以分为两种:
- 双向管道:全双工,所有进程均可读写(默认)
- 单向管道:半双工,一个只读,一个只写
管道均由Pipe()
实现
但是如果多个进程同时进入管道的话,数据依旧会乱,那么这个时候,还是需要加锁,而multiprocessing
还提供了Queue,即队列,队列就是管道加锁的体现。
Queue
Queue()
:参数为队列的最大项数,不设置则不限制大小
注意:队列里面存放的消息而不是数据,另外,队列占用的是内存空间,所以参数即使不设置也会受限于内存大小
方法
q.put()
:插入数据至队列
q.get()
:从队列中获取一个数据并删除
q.full()
:判断队列是否已满,已满的话返回True,反之亦然
q.empty()
:判断队列是否为空,空则返回True,反之亦然
q.nowait(obj)
:相当于put(obj, Flase)
q.size()
:返回队列的大致长度,不够准确,甚至在linux平台报NotImplementedError
错
例如:
from multiprocessing import Queue q = Queue() for i in range(5): q.put(i) print('队列是否已满:', q.full()) print('队列大小:', q.qsize()) for i in range(5): q.get() print('取出一个消息') print('队列是否为空:', q.empty())
这里并没有限制队列的大小,所以队列一直没满,而在取出的时候,直到取完最后一个消息的时候才返回True
现在设置一下队列大小,将q = Queue()
改为q = Queue(3)
可以看到,当队列大小达到3的时候,进程阻塞,因为设置了队列的大小使得消息添加不进去,但是取出的方法还没有执行,所以就一直阻塞
(9) 生产者和消费者
上边例子中,put添加消息,get取出消息,实际上就是生产者与消费者的关系,一个负责生产数据,一个负责消费数据。
在并发编程中,如果生产者的处理速度非常快,而消费者处理速度慢,这时生产者就需要等待消费者消费完队列的数据才再次生产。同样的道理,如果消费者的处理速度快于生产者的速度,消费者也是要等到生产者生产出数据才能继续执行任务。
实质上,生产者和消费者模式是通过一个容器来来解决生产者和消费者之间的耦合度问题。他们之间彼此不直接通信。而当生产者或者消费者积累一定消息的时候,彼此无法执行,所以当生产者生产完数据的时候,直接扔进一个地方,然后消费者去那个地方那数据,有一个缓冲的作用,被称之为阻塞队列,阻塞队列平衡了生产者和消费者的处理能力,用于耦合他们。
示例:
from multiprocessing import Process, Queue import time # 生产程序 def production(q, number): for i in range(3): msg = 'URL%s' % i time.sleep(2) print("生产者%s生产了" % number, msg) q.put(msg) # 消费程序 def consumption(q, number): while True: msg = q.get() if msg is None : break time.sleep(2) print('消费者%s爬取了%s' % (number, msg)) if __name__ == '__main__': q = Queue() # 生产者们 p1 = Process(target=production, args=(q, 1)) p2 = Process(target=production, args=(q, 2)) p3 = Process(target=production, args=(q, 3)) # 消费者们 c1 = Process(target=consumption, args=(q, 1)) c2 = Process(target=consumption, args=(q, 2)) p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() q.put(None) q.put(None) print("主线程")
以上生产者充当URL生产器,而消费者则为爬虫,消耗URL来爬取数据,这也正是上次爬虫博文中采用并发的方式