一、进程和线程概述
- 进程和线程
现在很多操作系统,例如:MAC OS X、UNIX、LINUX、Windows等,都是支持多任务的操作系统,而多任务,就和字面意思相同,执行多个任务。例如:边听歌编写代码边百度这样,同时有三个任务正在运行,并且还有很多任务在后台执行,只是在桌面没有显示而已
现在都是多核CPU,自然可以执行多任务,CPU执行代码都是顺序执行的,而以前的单核CPU也可以执行多任务,其中的原理就是:
操作系统轮流让各个任务交替执行,任务A执行0.01秒,然后切换到任务B,也执行0.01秒,然后再切换到任务C,再执行0.01秒,依次执行,表面看,每个任务都是交替执行的,但是,因为CPU的执行速度很快,感觉上来说就像是同时执行一样
虽然说因为CPU速度快,从而使得我们感觉单核CPU也能执行多任务,但是真正的并行执行多任务只能在多核CPU上实现,由于任务数量远远多于CPU的核心数量,所以操作系统也会自动的把很多任务轮流调度到每个核心上执行
对于操作系统来说,一个任务就是一个进程(Process),例如打开一个浏览器,就是启动一个浏览器进程,打开一个记事本,就是启动一个记事本进程。有些进程会同时干多件事情,例如打开一个word,可以同时进行打字、拼写检查、打印等多件事情。
想要在一个进程内部干多件事情,就需要同时运行多个子任务,这些子任务又叫做线程
由于每个进程至少要干一件事,所以一个进程至少会有一个线程,并且可以有多个线程,多个线程可以同时执行,多线程执行方式和多进程是一样的:
由操作系统在多个线程之间快速切换,让每个线程都短暂的交替运行,看起来就像同时执行一样,而真正执行多线程同样也需要多核CPU才能实现
- Python执行多任务
- 之前编写的Python程序都是单任务的进程,也就是只有一个线程,如果想要执行多任务怎么办,有三种解决方案:
- 多进程模式:启动多个进程,每个进程虽然只有一个线程,但是多个进程可以一起执行多个任务
- 多线程模式:启动一个进程,在单个进程内,创建多个线程,多个线程一起执行多个任务
- 多进程+多线程模式:启动多个进程,每个进程内启动多个线程,可以同时执行更多的任务,但是这种模型负载,实际很少采用
同时执行多任务时,任务之间都是有关联的,需要相互通信和协调
。例如:
任务A执行过程中必须要暂停,等待任务B执行完成之后任务A才能继续执行,任务C和任务D不能同时执行等
- 多进程、多线程的程序复杂度要远远高于之前的单进程单线程程序。因为复杂度高、调式困难,所以通常来说如果没有需求,是不会编写多任务程序的,但是有时必须要编写多任务才能实现需求,所以我们需要知道如何编写多任务的程序
小结:
线程是最小的执行单元,而进程由至少一个线程组成,如何调用进程和线程,这完全由操作系统决定,程序并不能决定什么时候执行,执行多长时间等
多进程和多线程的程序涉及到了同步、数据共享的问题,编写起来也更加复杂
- 注意:进程调度是操作系统决定的,千万不要在代码里假定哪个先执行
二、多进程
- fork()
Uinx、Linux操作系统提供了一个fork()系统调用函数,这个函数非常特殊,普通的函数调用,调用一次,返回一次,但是使用fork()函数调用,调用一次,返回两次,这是因为操作系统自动把当前进程复制了一份,也就是父进程复制了一份子进程,然后,分别在父进程和子进程内返回
子进程永远返回0,而父进程返回子进程的ID,这样做的原因是:
一个父进程可以fork出很多的子进程,所以,父进程需要记住每个子进程的ID,子进程只需要调用getppid()函数就可以拿到父进程的ID
Python的os模块封装了常见的系统调用,其中就包括fork,下面来看示例:
首先要知道:
os.getpid():可以获取当前进程的PID
os.getppid():可以获取当前进程的主进程的PID
#下面的代码只有在unix或者linux中才能正常运行,这里使用centos,mac的内核也是unix的一种,所以也可以执行 [root@centos-1 ~]# cat test.py #!/usr/bin/env python3 #-*- coding: utf-8 -*- import os print("父进程ID为:%s" % os.getpid()) pid = os.fork() if pid == 0: print('子进程ID为:%s,父进程ID为:%s' %(os.getpid(),os.getppid())) else: print('父进程ID %s 创建了子进程ID %s' %(os.getpid(),pid)) #执行: [root@centos-1 ~]# python test.py 父进程ID为:973 父进程ID 973 创建了子进程ID 974 子进程ID为:974,父进程ID为:973 #解析: 1、要看懂输出,就需要先认真看一下上面的概述,'使用fork()函数后,调用一次,返回两次,并且分别在父进程和子进程内返回',仔细读过这句话之后,我们可以知道,在'pid = os.fork()'之后,分成了两个进程分别执行之后的代码 2、从代码输出的第二行可以看出,代码先执行的是pid不等于0的语句,同样从上面的概述中可以知道,'子进程永远返回为0,父进程返回子进程的id',只有子进程才会返回0,所以,这里很明显是先执行的父进程,那么pid的值就等于子进程的id,也就是974,而os.getpid()可以得到当前进程的id也就是973,所以最终的输出为:'父进程ID 973 创建了子进程ID 974' 3、通过上面的解析后,最后一行输出就很好理解了,父进程执行完成、返回之后,开始执行子进程,因为子进程永远返回为0,所以pid为0,然后执行if判断为True的语句,os.getpid()得到当前进程id,也就是974,os.getppid()得到当前进程的父进程id,也就得到了973,最终输出:'子进程ID为:974,父进程ID为:973'
- 有了
fork
调用,一个进程在接到新任务时就可以复制一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当收到新的http请求时,就会fork出子进程去处理新的http请求
- multiprocessing——process
由于Windows没有fork调用,所以上面的fork示例并不适用于windows,而如果想在windows上编写多进程的服务器程序,我们可以使用multiprocessing模块,multiprocessing模块是跨平台版本的多进程模块
multiprocessing模块提供了一个Process类来代表一个进程对象,下面来看示例:
在编写脚本之前,要知道:
process模块使用:
#作用: process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建 #使用: Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动) #参数: group:进程所属组,基本上不用,参数没有使用则值始终为None target:表示调用对象,即子进程将要执行的任务 args:表示调用对象的位置参数元组,例如 args=(1,2,'test',) kwargs:表示调用对象的字典,例如 kwargs={'name':'zhangsan','age':22} name:设置子进程的名称 #注意事项: 1、每个参数需要使用关键字方式来指定,例如 name='test'这样的,指定参数为指定的值 2、args指定的是给target参数传入的位置参数,是一个元组,要记得在末尾加上逗号 #方法: 在创建实例p后 p.start():启动进程,并调用该子进程中的p.run() p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 p.join([timeout]):主线程等待实例p终止'(要注意主线程处于等待状态,但是p的子进程是处于运行状态的)',timeout是可选参数,指定超时时间,需要注意的是,p.join()只能终止p.start()开启的进程,而不能终止p.run开启的进程,通常用于进程之间的同步 p.terminate():用来杀死子进程的 p.is_alive():判断子进程是否存活,返回True为存活,False为已经关闭
#编写脚本,脚本在windows、linux、unix都能执行 #-*- coding: utf-8 -*- from multiprocessing import Process import os def run_proc(name): print("运行子进程,名称:%s,ID:%s" %(name,os.getpid())) if __name__ == "__main__": print("父进程ID:%s" % os.getpid()) p = Process(target=run_proc,args=('test',)) print('子进程开始执行') p.start() p.join() print('子进程执行结束') #执行输出 父进程ID:15460 子进程开始执行 运行子进程,名称:test,ID:15204 子进程执行结束 #解析 1、看过process的使用方法之后,这里就比较好理解了,p是创建的子进程实例,这个子进程会执行run_proc函数,并且传入参数'test' 2、p.start()开始执行p子进程,也就是执行run_proc函数,然后p.join()终止子进程,最后输出print
- multiprocessing——Pool
- 当需要启动大量的子进程时,我们可以使用进程池的方式批量创建
新进程
Pool类可以提供指定数量的进程
,当有新的请求提交到Pool池中时,如果池还没有满,那么就会创建一个新的进程来执行请求,如果Pool池满了,请求就会等待,直到池中有进程结束,才会创建新的进程来处理请求- Pool类的方法:
pool = multiprocessing.Pool(processes = 3) #最多允许3个进程
apply()
:此函数用于传递不定参数,主进程会被阻塞直到函数执行结束,不建议使用,3.x之后不再出现,函数参数:
apply(func,args=(),kwds={})
apply_async()
:与上面的apply()方法用法一致,不同的是,此函数是非阻塞的并且支持结果返回后进行回调,函数参数:
apply_async(func[,args=()[,kwds={}[,callback=None]]])
map()
:这个map方法和内置的map函数用法差不多,此方法会使进程阻塞直到结果返回,函数参数:
map(func,iterable,chunksize=None) #虽然iterable是迭代器,但是实际使用时,必须
map_async()
:和上面的map方法用法一致,但是此方法是非阻塞的,函数参数:
map_async(func,iterable,chunksize,callback)
close()
:关闭进程池,不在接收新的任务terminal()
:结束工作进程,不再处理未处理的任务join()
:主进程阻塞,等待子进程的退出,join方法需要再close()或者terminate()之后使用
- 下面来看示例:
#-*- coding: utf-8 -*- from multiprocessing import Pool import os,time,random def time_test(name): print('运行进程名称 %s,ID %s' %(name,os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('执行进程 %s ID %s 所使用的时间是 %0.2f 秒' %(name,os.getpid(),(end - start))) if __name__ == '__main__': print('主进程ID %s'% os.getpid()) p = Pool(processes=4) #这里指定进程最多4个 for i in range(5): p.apply_async(time_test,args=('test',)) print('等待所有进程执行完毕') p.close() p.join() print('所有进程执行完毕') #执行返回结果: 主进程ID 10708 等待所有进程执行完毕 运行进程名称 test,ID 12620 运行进程名称 test,ID 14176 运行进程名称 test,ID 16088 运行进程名称 test,ID 7744 执行进程 test ID 7744 所使用的时间是 0.64 秒 运行进程名称 test,ID 7744 执行进程 test ID 16088 所使用的时间是 0.91 秒 执行进程 test ID 12620 所使用的时间是 1.85 秒 执行进程 test ID 14176 所使用的时间是 2.17 秒 执行进程 test ID 7744 所使用的时间是 2.94 秒 所有进程执行完毕 #解析: 在看过process后,这里也比较好理解,但是可以看到输出信息中,在第四次运行time_test函数后,有一个进程先返回了,然后才执行第五次,这是因为当pool线程池数量满了之后,新进来的请求需要等待前面有进程处理完之后,再处理新的请求,因为设置了进程最多4个,而for循环的调用是5次,所以第5次请求就需要等待有进程执行完毕后才会执行新请求 '可以发现,执行第五次的进程id,是复用了一开始创建的进程'
- 注意:Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前,必须先调用close(),调用完close()之后就不能再添加新的进程了
- subprocess
很多时候,子进程并不是自身,而是一个外部进程,我们创建了子进程后,还需要控制子进程的输入和输出
subprocess模块可以启动一个子进程,然后控制子进程的输入和输出
下面来看关于subprocess的一些方法:
subprocess模块允许我们启动一个新进程,并连接到他们的输入、输出、错误管道,从而获取返回值
- subprocess.run()方法:Python 3.5中新增的函数。执行指定的命令,等待命令执行完成后返回一个包含执行结果的CompletedProcess类的实例,参数:
subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, timeout=None, check=False, universal_newlines=False)
- subprocess.call()方法:执行指定的命令,返回命令执行状态,其功能类似于os.system(cmd),参数:
subprocess.call(args, *, stdin=None, stdout=None, stderr=None, shell=False, timeout=None)
- subprocess.check_call()方法:Python 2.5中新增的函数。 执行指定的命令,如果执行成功则返回状态码,否则抛出异常。其功能与subprocess.run(…, check=True)相同,参数:
subprocess.check_call(args, *, stdin=None, stdout=None, stderr=None, shell=False, timeout=None)
- subprocess.check_output()方法:Python 2.7中新增的的函数。执行指定的命令,如果执行状态码为0则返回命令执行结果,否则抛出异常,参数:
subprocess.check_output(args, *, stdin=None, stderr=None, shell=False, universal_newlines=False, timeout=None)
- subprocess.getoutput()方法:接收字符串格式的命令,执行命令并返回执行结果,其功能类似于os.popen(cmd).read()和commands.getoutput(cmd),参数:
subprocess.getstatusoutput(cmd)
- subprocess.getstatusoutput()方法:执行cmd命令,返回一个元组,元组中有命令执行状态, 命令执行结果输出,其功能类似于commands.getstatusoutput(),参数:
subprocess.getoutput(cmd)
- 参数解析:
1、'args': 要执行的shell命令,默认应该是一个字符串序列,如['df', '-Th']或('df', '-Th'),也可以是一个字符串,如'df -Th',但是想要直接使用linux命令的话,需要把shell参数的值设置为True 2、'shell': 如果shell为True,那么指定的命令将通过shell执行。如果我们需要访问某些shell的特性,如管道符、文件名通配符、环境变量扩展功能,这将是非常有用的。当然,python本身也提供了许多类似shell的特性的实现,如glob、fnmatch、os.walk()、os.path.expandvars()、os.expanduser()和shutil等 3、'check': 如果check参数的值是True,且执行命令的进程以非0状态码退出,则会抛出一个CalledProcessError的异常,且该异常对象会包含 参数、退出状态码、以及stdout和stderr(如果它们有被捕获的话) 4、'stdout, stderr:input':`该参数是传递给Popen.communicate()`,通常该参数的值必须是一个字节序列,如果universal_newlines=True,则其值应该是一个字符串 ---------------------------------------------------------------------- - run()函数默认不会捕获命令执行结果的正常输出和错误输出,如果我们向获取这些内容需要传递subprocess.PIPE,然后可以通过返回的CompletedProcess类实例的stdout和stderr属性或捕获相应的内容 - call()和check_call()函数返回的是命令执行的状态码,而不是CompletedProcess类实例,所以对于它们而言stdout和stderr不适合赋值为subprocess.PIPE - check_output()函数默认就会返回命令执行结果,所以不用设置stdout的值,如果我们希望在结果中捕获错误信息,可以执行stderr=subprocess.STDOUT - `subprocess.PIPE表示为进程创建新的管道,subprocess.DEVNULL表示使用os.devnull(代表当前系统的回收站)` ---------------------------------------------------------------------- 5、'universal_newlines': 该参数影响的是输入与输出的数据格式,比如它的值默认为False,此时stdout和stderr的输出是字节序列;当该参数的值设置为True时,stdout和stderr的输出是字符串
通过使用上面的被封装后的高级函数可以很方便的完成一些常见需求,但是由于subprocess模块底层的进程创建和管理都是由Popen类的处理的,所以当我们无法通过上面的高级函数实现一些不常见的功能时,就可以通过subprocess.Popen类提供的API来完成这些需求
subprocess.Popen类用于在一个新的进程中执行一个子程序,下面来看Popen类的相关信息
- subprocess.Popen的构造函数:
class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False, startup_info=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=())
- 参数说明:
1、'args': 要执行的shell命令,可以是字符串,也可以是命令各个参数组成的序列。当该参数的值是一个字符串时,该命令的解释过程是与平台相关的,因此通常建议将args参数作为一个序列传递 2、'bufsize': 指定缓存策略,0表示不缓冲,1表示行缓冲,其他大于1的数字表示缓冲区大小,负数 表示使用系统默认缓冲策略。 3、'stdin, stdout, stderr': 分别表示程序标准输入、输出、错误句柄 4、'preexec_fn': 用于指定一个将在子进程运行之前被调用的可执行对象,只在Unix平台下有效 5、'close_fds': 如果该参数的值为True,则除了0,1和2之外的所有文件描述符都将会在子进程执行之前被关闭 6、'shell': 该参数用于标识是否使用shell作为要执行的程序,如果shell值为True,则建议将args参数作为一个字符串传递而不要作为一个序列传递 7、'cwd': 如果该参数值不是None,则该函数将会在执行这个子进程之前改变当前工作目录 8、'env': 用于指定子进程的环境变量,如果env=None,那么子进程的环境变量将从父进程中继承。如果env!=None,它的值必须是一个映射对象 9、'universal_newlines': 如果该参数值为True,则该文件对象的stdin,stdout和stderr将会作为文本流被打开,否则他们将会被作为二进制流被打开 10、'startupinfo和creationflags': 这两个参数只在Windows下有效,它们将被传递给底层的CreateProcess()函数,用于设置子进程的一些属性,如主窗口的外观,进程优先级等
1、'Popen.poll()':用于检查子进程(命令)是否已经执行结束,没结束返回None,结束后返回状态码 2、'Popen.wait(timeout=None)':等待子进程结束,并返回状态码;如果在timeout指定的秒数之后进程还没有结束,将会抛出一个TimeoutExpired异常 3、'Popen.communicate(input=None, timeout=None)':该方法可用来与进程进行交互,比如发送数据到stdin,从stdout和stderr读取数据,直到到达文件末尾 ------------------------------------------------------------------------------------- #Popen.communicate()方法注释: - 该方法中的可选参数 input 应该是将被发送给子进程的数据,或者如没有数据发送给子进程,该参数应该是None。input参数的数据类型必须是字节串,如果universal_newlines参数值为True,则input参数的数据类型必须是字符串 - 该方法返回一个元组(stdout_data, stderr_data),这些数据将会是字节串或字符串,如果universal_newlines的值为True - 如果在timeout指定的秒数后该进程还没有结束,将会抛出一个TimeoutExpired异常。捕获这个异常,然后重新尝试通信不会丢失任何输出的数据。但是超时之后子进程并没有被杀死,为了合理的清除相应的内容,一个好的应用应该手动杀死这个子进程来结束通信 - 需要注意的是,这里读取的数据是缓冲在内存中的,所以,如果数据大小非常大或者是无限的,就不应该使用这个方法 ----------------------------------------------------------------------------------------- 4、'Popen.send_signal(signal)':发送指定的信号给这个子进程 5、'Popen.terminate()':停止该子进程 6、'Popen.kill()':杀死该子进程
下面来看示例,执行nslookup www.baidu.com
命令
#-*- coding: utf-8 -*- import subprocess print('$ nslookup www.baidu.com') r = subprocess.call(['nslookup','www.baidu.com']) #执行指定命令,返回状态 print(r) #输出: $ nslookup www.baidu.com 服务器: UnKnown Address: 10.10.11.41 非权威应答: 名称: www.a.shifen.com Addresses: 110.242.68.3 110.242.68.4 Aliases: www.baidu.com 0
如果子进程需要输入,可以通过communicate()
方法:
#-*- coding: utf-8 -*- import subprocess print('$ nslookup') p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = p.communicate(b'www.baidu.com\nexit') #使用\n换行符,可以执行多条命令 print(output.decode('unicode_escape')) #指定字符集,原文中是utf-8,但是windows报错了,所以改成unicode print('Exit code:', p.returncode) #输出: $ nslookup ĬÈÏ·þÎñÆ÷: UnKnown #因为是unicode所以是乱码 Address: 10.10.11.41 > ·þÎñÆ÷: UnKnown Address: 10.10.11.41 Ãû³Æ: www.a.shifen.com Addresses: 110.242.68.4 110.242.68.3 Aliases: www.baidu.com > exit Exit code: 0 #解析: 上面的代码相当于执行了'nslookup',然后手动输入'www.baidu.com'、'exit'