Python 网络编程(二)
黏包
socket 黏包问题
什么是粘包:粘包指的是数据和数据之间没有明确的分界线,导致不能正确读取数据
应用程序无法直接操作硬件,应用程序想要发送数据则必须将数据交给操作系统,而操作系统需要需要同时为所有应用程序提供数据传输服务,也就意味着,操作系统不可能立马就能将应用程序的数据发送出去,就需要为应用程序提供一个缓冲区,用于临时存放数据,具体流程如下:
发送方:
当应用程序调用 send 函数时,应用程序会将数据从应用程序拷贝到操作系统缓存,再由操作系统从缓冲区读取数据并发送出去
接收方:
对方计算机收到数据也是操作系统先收到,至于应用程序何时处理这些数据,操作系统并不清楚,所以同样需要将数据先存储到操作系统的缓冲区中,当应用程序调用 recv 时,实际上是从操作系统缓冲区中将数据拷贝到应用程序的过程
上述过程对于 TCP 与 UDP 都是相同的不同之处在于:
UDP:
UDP 在收发数据时是基于数据包的,即一个包一个包的发送,包与包之间有着明确的分界,到达对方操作系统缓冲区后也是一个一个独立的数据包,接收方从操作系统缓冲区中将数据包拷贝到应用程序这种方式存在的问题:
发送方发送的数据长度每个操作系统会有不同的限制,数据超过限制则无法发送
接收方接收数据时如果应用程序的提供的缓存容量小于数据包的长度将造成数据丢失,而缓冲区大小不可能无限大
这意味着 UDP 根本不会粘包,但是会丢数据,不可靠。
TCP:
当我们需要传输较大的数据,或需要保证数据完整性时,最简单的方式就是使用 TCP 协议了,与 UDP 不同的是,TCP 增加了一套校验规则来保证数据的完整性,会将超过 TCP 包最大长度的数据拆分为多个 TCP 包,并在传输数据时为每一个 TCP 数据包指定一个顺序号,接收方在收到 TCP 数据包后按照顺序将数据包进行重组,重组后的数据全都是二进制数据,且每次收到的二进制数据之间没有明显的分界基于这种工作机制 TCP 在三种情况下会发送粘包问题
当单个数据包较小时接收方可能一次性读取了多个包的数据
当整体数据较大时接收方可能一次仅读取了一个包的一部分内容
另外 TCP 协议为了提高效率,增加了一种优化机制,会将数据较小且发送间隔较短的数据合并发送,该机制也会导致发送方将两个数据包粘在一起发送
意味着:TCP 传输数据是可靠的,但是会粘包。
好的,我们先看一段代码
服务器端
# coding=utf-8 import socket server_socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM) ip_port=('',8080) server_socket.bind(ip_port) server_socket.listen(5) conn,deer=server_socket.accept() data1=conn.recv(1024) data2=conn.recv(1024) print(data1) print(data2) conn.close() server_socket.close()
客户端代码
# -*- coding: utf-8 -*- import socket client_socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM) ip_port=('192.168.5.180',8080) client_socket.connect(ip_port) client_socket.send('helloworld'.encode('utf-8')) client_socket.send('sb'.encode('utf-8')) client_socket.close()
直接上我们的运行结果
客户端发送了两个数据包,但是在服务器端接受 data1 的时候,把这两个包的数据全部接受了,这种显现就是黏包。其实如果服务器点代码改成 recv(2) 也会造成粘(黏)包。客户端发了一段数据,服务端只收了一小部分,也产生粘包。
服务器端代码:
import socket import time server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind(('', 9999)) # 0-65535:0-1024给操作系统使用 server_socket.listen(5) conn, addr = server_socket.accept() print('connect by ', addr) res1 = conn.recv(2) # 第一没有接收完整 print('第一次', res1) time.sleep(6) res2 = conn.recv(10) # 第二次会接收旧数据,再收取新的 print('第二次', res2) conn.close() server_socket.close()
客户端代码:
import socket import time client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(('192.168.5.180', 9999)) client.send('mashibin'.encode('utf-8')) time.sleep(5) client.send('xiaobin'.encode('utf-8')) client.close()
黏包成因
所谓粘包问题主要还是因为:
接收方不知道消息之间的界限,不知道一个消息要提取多少字节的数据所造成的。(服务器端出现黏包)
tcp 在发送数据少且间隔时间短的数据时,会将几条和并一起发送。(客户端出现黏包)
黏包的解决办法
目前比较合理的处理方法是:为字节流加上一个报头,告诉发送的字节流总大小,然后接收端来一个死循环接收完所有数据。用 struct 将序列化后的数据长度打包成4个字节(4个字节完全够用)。
使用 struct 模块可以用于将 Python 的值根据格式符,转换为 C 语言的结构(byte 类型),便于数据流传输。
案例:客户端传送一个文件到服务器端(基于 TCP 协议),同时要解决黏包问题。
客户端代码:
import socket import os import struct client = socket.socket() client.connect(("192.168.5.180",9999)) file_path = "new.mp4" f = open(file_path,mode="rb") # 在发送数据前先发送报头 size = os.path.getsize(file_path) # 定制包头 i为4个字节,所以接收方为四个字节,这个大小并不是输入的大小,而是文件的大小 header = struct.pack('!i',size) #使用struct,直接将int转为二进制型数据传输,对方使用struct解包 client.send(header) # 发报头 # 发数据 while True: data = f.read(1024) if not data: break # 发送给服务器 client.send(data) print("上传完成...") client.close()
服务器端代码:
import socket import struct server = socket.socket() server.bind(("",9999)) server.listen() conn,addr = server.accept() f = open("接收到的文件",mode="wb") header_data = conn.recv(4) print(struct.unpack("!i",header_data)) size = struct.unpack("!i",header_data)[0] recv_size = 0 while recv_size < size: data = conn.recv(1024) f.write(data) recv_size += len(data) print("接收完成...") conn.close() server.close()
执行之后的结果:
总结:客户端把数据长度封装成一个固定大小的数据,这时服务端就可以指定读取固定大小的内容,不会读取数据的内容,服务端只要根据数据长度再来接收数据内容就好了,所以客户端连续两次发数据(文件),不会粘包,因为服务<器端每次接收都只接收了本次该接收的数据。
多线程与多进程
现代操作系统比如 Mac OS X,Linux,Windows 等,都是支持“多任务”的操作系统什 么叫“多任务”呢?简单地说,就是操作系统可以同时运行多个任务。打个比方,你一边在用逛淘宝,一边在听音乐,一边在用微信聊天,这就是多任务,至少同时有 3 个任务正在运行。
还有很多任务悄悄地在后台同时运行着,只是桌面上没有显示而已。现在,多核 CPU 已经非常普及了,但是,即使过去的单核 CPU,也可以执行多任务。由 于 CPU 执行代码都是顺序执行的,那么,单核 CPU 是怎么执行多任务的呢?
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wWXcHvpb-1644045923276)(https://s2.loli.net/2022/02/03/TdKynfJmLR2oYCN.png)]
我们知道,在一台计算机中,我们可以同时打开许多软件,比如同时浏览网页、听音乐、打字等等,看似非常正常。但仔细想想,为什么计算机可以做到这么多软件同时运行呢?这就涉及到计算机中的两个重要概念:多进程和多线程了。
进程和线程都是操作系统中的重要概念,既相似,又不同
对于一般的程序,可能会包含若干进程;而每一个进程又可能包含多个同时执行的线程。进程是资源管理的 小单位,而线程则是程序执行的 小单位。
进程和线程的概念
对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动 一个浏览器进程,打开记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开 一个 Word 就启动了一个 Word 进程。
进程:直观地说,进程就是正在执行的程序,为多任务操作系统中执行任务的基本单元,是包含了程序指令和相关资源的集合。在 Windows 下,可以打开任务管理器,在进程标签栏中就可以看到当前计算机中正在运行的进程,操作系统隔离各个进程可以访问的地址空间。如果进程间需要传递信息,则需要使用进程间通信或者其他方式,非常不方便而且消耗 CPU 时间片段。为了能够更好地支持信息共享和减少切换开销,从进程中演化出了线程。
线程:线程是进程的执行单元。对于大多数程序来说,可能只有一个主线程。但是,为了能够提高效率,有些程序会采用多线程,在系统中所有的线程看起来都是同时执行的。例如,现在的多线程网络下载程序中,就使用了这种线程并发的特性,程序将欲下载的文件分成多个部分,然后同时进行下载,从而加快速度。
进程和线程的对比
明确进程和线程的区别,这一点对于使用 Python 编程是非常重要的。一般的,进程是重量级的。具体包括进程映像的结构、执行细节以及进程间切换的方法。在进程中,需要处理的问题包括进程间通信、临界区管理和进程调度等。这些特性使得新生成一个进程的开销比较大。而线程刚好相反,它是轻量级的。线程之间共享许多资源,容易进行通信,生成一个线程的开销较小。但是使用线程会有死锁、数据同步和实现复杂等问题。
并发编程解决方案
启动多个进程,每个进程虽然只有一个线程,但多个进程可以一块执行多个任务
启动一个进程,在一个进程内启动多个线程,这样,多个线程也可以一块执行多个任务
启动多个进程,每个进程再启动多个线程,这样同时执行的任务就更多了,当然这种模型更复杂,实际很少采用。
由于 Python 语言使用了全局解释器锁(Global Interpretor Lock,GIL)和队列模块,其在线程实现的复杂度上相对于其他语言来说要低得多。需要注意的是,由于 GIL 的存在,所以 Python 解释器并不是线程安全的。因为当前线程必须持有这个全局解释器锁,才可以安全地访问 Python 对象。虽然使用 GIL 使得 Python 不能够很好地利用多 CPU 优势,但是现在还没有比较好的办法来代替它,因为去掉 GIL 会带来许多问题。所以,针对 I/O 受限的程序,如网络下载类,可以使用多线程来提高程序性能。而对于 CPU 受限的程序,如科学计算类,使用多线程并不会带来效率的提升。这个时候,建议使用进程或者混合进程和线程的方法来实现。
进程的开发
在前面提到,Python 对于进程和线程处理都有很好的支持。接下来介绍在 Python 语言的标准库中相关的模块:
创建进程
subprocess 模块
subprocess 早在2.4版本引入。用来生成子进程,并可以通过管道连接他们的输入/输出/错误,以及获得他们的返回值。用来替换多个旧模块和函数
运行 python 的时候,我们都是在创建并运行一个进程,(linux 中一个进程可以 fork 一个子进程,并让这个子进程 exec 另外一个程序)。在 python 中,我们通过标准库中的 subprocess 包来 fork 一个子进程,并且运行一个外部的程序。subprocess 包中定义有数个创建子进程的函数,这些函数分别以不同的方式创建子进程,所欲我们可以根据需要来从中选取一个使用。另外 subprocess 还提供了一些管理标准流(standard stream)和管道(pipe)的工具,从而在进程间使用文本通信。
通俗地说就是通过这个模块,你可以在 Python 的代码里执行操作系统级别的命令,比如“ipconfig”、“du -sh”等等。 subprocess 模块替代了一些老的模块和函数,比如:
os.system
os.spawn*
subprocess 过去版本中的call,check_call 和 check_output 已经被 run 方法取代了。run 方法为3.5版本新增。**大多数情况下,推荐使用 run 方法调用子进程,执行操作系统命令。**在更高级的使用场景,你还可以使用 Popen 接口。其实 run() 方法在底层调用的就是 Popen 接口。
run
subprocess 模块首先推荐使用的是它的 run 方法,更高级的用法可以直接使用 Popen 接口。
注意,run() 方法返回的不是我们想要的执行结果或相关信息,而是一个 CompletedProcess 类型对象。
subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None, text=None, env=None, universal_newlines=None)
args:表示要执行的命令。必须是一个字符串,字符串参数列表。
args:表示要执行的命令。必须是一个字符串,字符串参数列表。
stdin、stdout 和 stderr:子进程的标准输入、输出和错误。其值可以是 subprocess.PIPE、 subprocess.DEVNULL、一个已经存在的文件描述符、已经打开的文件对象或者 None。subprocess.PIPE 表示为子进程创建新的管道。subprocess.DEVNULL 表示使用 os.devnull。默认使用的是 None,表示什么都不做。另外,stderr 可以合并到 stdout 里一起输出。
timeout:设置命令超时时间。如果命令执行时间超时,子进程将被杀死,并弹出 TimeoutExpired 异常。
check:如果该参数设置为 True,并且进程退出状态码不是 0,则弹 出 CalledProcessError 异常。
encoding: 如果指定了该参数,则 stdin、stdout 和 stderr 可以接收字符串数据,并以该编码方式编码。否则只接收 bytes 类型的数据。
shell:如果该参数为 True,将通过操作系统的 shell 执行指定的命令。
run 方法调用方式,返回 CompletedProcess 实例,和直接 Popen 差不多,实现是一样的,实际也是调用 Popen,与 Popen 构造函数大致相同,看一个例子
# coding = utf-8 # 文件名: subprocess_cmd.py # 导入模块 subprocess import subprocess # 这里我们使用了这么几个参数args,encoding,shell # dir 在windows命令行中是遍历这个目录下的文件 # Linux可以使用 ls 命令 runcmd = subprocess.run(['dir', 'C:\\'], encoding='utf8', shell=True) # 打印结果 print(runcmd)
看看运行结果
驱动器 C 中的卷没有标签。 卷的序列号是 0AA3-2019 C:\ 的目录 2018/03/16 10:16 <DIR> EFI 2020/03/18 17:33 <DIR> Intel 2020/07/13 11:10 <DIR> LeakHotfix 2020/07/15 16:50 <DIR> Program Files 2020/07/18 17:40 <DIR> Program Files (x86) 2020/06/22 11:51 <DIR> QMDownload 2020/03/18 17:43 <DIR> Users 2020/07/18 16:55 <DIR> Windows 0 个文件 0 字节 8 个目录 90,635,427,840 可用字节 CompletedProcess(args=['dir', 'C:\\'], returncode=0)
这里我们看到不仅执行了命令,并且返回了一个 CompletedProcess 实例,其中returncode: 执行完子进程状态,通常返回状态为0则表明它已经运行完毕,若值为负值 “-N”,表明子进程被终。
定义一个函数,写一个基于windows cmd命令行的实例
# coding=utf-8 # 文件名:subprocess_cmd_1.py # 导入模块subprocess import subprocess # 定义一个函数run_cmd进行subprocess.run操作 def run_cmd(command): # subprocess.run实例化一个变量return_cmd,需要注意一点, # 因为做subprocess.PIPE 有字符需要解码,我这里的encoding使用的是GBK, # 这是Windows默认常用的编码字符,因为存在部分中文字符存在异常,当然可以修改,Linux下的是utf8,注意区分 return_cmd = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='GB18030', shell=True,) # 判断实例化属性是否有存在异常returncode if return_cmd.returncode == 0: print("success:") print(return_cmd) else: print("error:") print(return_cmd) run_cmd(["dir", "C:\\"]) # 序列参数 run_cmd("exit 1") # 字符串参数
执行结果:
success: CompletedProcess(args=['dir', 'C:\\'], returncode=0, stdout=' 驱动器 C 中的卷没有标签。 卷的序列号是 0AA3-2019 C:\\ 的目录 2018/03/16 10:16 <DIR> EFI 2020/03/18 17:33 <DIR> Intel 2020/07/13 11:10 <DIR> LeakHotfix 2020/07/15 16:50 <DIR> Program Files 2020/07/18 17:40 <DIR> Program Files (x86) 2020/06/22 11:51 <DIR> QMDownload 2020/03/18 17:43 <DIR> Users 2020/07/18 16:55 <DIR> Windows 0 个文件 0 字节 8 个目录 89,087,983,616 可用字节 ', stderr='') error: CompletedProcess(args='exit 1', returncode=1, stdout='', stderr='')
我们看到对应的两条命令是没有问题的,成功与错误的信息与returncode的代码显示提示一致,我们的结果也相应地打印处理出来,详解一下参数
args 启动进程的参数,通常是个列表或字符串。
returncode 进程结束状态返回码。0表示成功状态。
stdout 获取子进程的stdout。通常为bytes类型序列,None表示没有捕获值。如果你在调用run()方法时,设置了参数stderr=subprocess.STDOUT,则错误信息会和stdout一起输出,此时stderr的值是None。
stderr 获取子进程的错误信息。通常为bytes类型序列,None表示没有捕获值。
check_returncode() 用于检查返回码。如果返回状态码不为零,弹出 CalledProcessError 异常。
subprocess.DEVNULL用于传递给stdout、stdin和stderr参数。表示使用 os.devnull 作为参数值。
subprocess.PIPE管道,可传递给stdout、stdin和stderr参数。
subprocess.STDOUT特殊值,可传递给stderr参数,表示stdout和stderr合并输出。
这里再详细介绍一下args与shell的参数
args参数可以接收一个类似 ‘ls -la’ 的字符串,也可以传递一个类似 [‘ls’, ‘/b’] 的字符串分割列表
shell参数默认为False,设置为True的时候表示使用操作系统的shell执行命令
一般在命令行里面运行,或者是Linux里面使用时候会默认设置为shell=True
而且在Linux的环境中,args参数为字符串是,shell必须为True
而在Windows的环境中,args参数不论是字符串还是列表,shell建议为True
但是不是所有的操作系统命令都像‘dir’或者‘ipconfig’那样单纯地返回执行结果,还有很多像‘python’这种交互式的命令,如果需要输入点什么,然后它返回执行的结果。使用run()方法怎么向stdin里输入?
错误代码示范
import subprocess ret = subprocess.run("python", stdin = subprocess.PIPE, stdout = subprocess.PIPE, shell = True) ret.stdin = "print('haha')" # 错误的用法 print(ret)
这样是不行的,ret作为一个 CompletedProcess 对象,根本没有stdin属性。那怎么办呢?前面说了,run()方法的stdin参数可以接收一个文件句柄。比如在一个 1.txt 文件中写入 print(‘hello,python’) 。然后参考下面的使用方法
>>> import subprocess >>> fd = open("D:\\1.txt") >>> ret = subprocess.run("python", stdin=fd, stdout=subprocess.PIPE, shell=True) >>> print(ret.stdout.decode('utf-8')) hello,python >>> fd.close() >>>
这样做,虽然可以达到目的,但是很不方便,也不是以代码驱动的方式。这个时候,我们可以使用Popen类。
Popen
Popen 是 subprocess的核心,子进程的创建和管理都靠它处理。
用法和参数与run()方法基本类同,但是注意哦,
它的返回值是一个 Popen 对象,而不是 CompletedProcess 对象。
subprocess模块中定义了一个Popen类,通过它可以来创建进程,并与其进行复杂的交互。查看一下它的构造函数:
构造函数:
class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0,restore_signals=True, start_new_session=False, pass_fds=(), * , encoding=None, errors=None)
常用的参数有:
args:shell命令,可以是字符串或者序列类型(如:list,元组)
bufsize:缓冲区大小。当创建标准流的管道对象时使用,默认-1。 0:不使用缓冲区 1:表示行缓冲,仅当 universal_newlines=True时可用,也就是文本模式 正数:表示缓冲区大小 负数:表示使用系统默认的缓冲区大小。
stdin, stdout, stderr:分别表示程序的标准输入、输出、错误句柄
preexec_fn:只在 Unix 平台下有效,用于指定一个可执行对象(callable object),它将在子进程运行之前被调用
shell:如果该参数为 True,将通过操作系统的 shell 执行指定的命令。 cwd:用于设置子进程的当前目录。
env:用于指定子进程的环境变量。如果 env = None,子进程的环境变量将从父进程中继承。
创建一个子进程,然后执行一个简单的命令:
# coding=utf-8 # 文件名:subprocess_cmd.py # 导入模块subprocess import subprocess # 这里我们使用了这么几个参数args,encoding,shell # dir 在windows命令行中是遍历这个目录下的文件 # Linux可以使用 ls 命令 runcmd = subprocess.Popen(['dir', 'C:\\'], encoding='utf8', shell=True) # 打印结果 print(runcmd)
查看运行结果:
<subprocess.Popen object at 0x00000295D613C2C8> 驱动器 C 中的卷没有标签。 卷的序列号是 0AA3-2019 C:\ 的目录 2018/03/16 10:16 <DIR> EFI 2020/03/18 17:33 <DIR> Intel ......
sub_popen
Popen对象的stdin、stdout和stderr是三个文件句柄,可以像文件那样进行读写操作。而且我们看到其返回值是一个Popen的对象,
>>> import subprocess >>> ret = subprocess.Popen("python", stdout=subprocess.PIPE, stdin=subprocess.PIPE, shell=True) >>> ret.stdin.write(b"import os\n") 10 >>> ret.stdin.write(b"print(os.environ)") 17 >>> ret.stdin.close() >>> out = ret.stdout.read().decode("GBK") >>> ret.stdout.close() >>> print(out) environ({ 'ALLUSERSPROFILE': 'C:\\ProgramData', '......', 'USERPROFILE': 'C:\\Users\\Administrator.SC-202003181819', 'WINDIR': 'C:\\Windows' })
这里看到,通过 s.stdin.write() 可以写入数据,而 s.stdout.read() 则能输出数据。自由度在交互程序,进行交互信息,完成数据的整体提取或写入,这些事Popen类模块提供的比较多的方法
下面是其他的一些比较常用的方法
poll(): 检查进程是否终止,如果终止返回 returncode,否则返回 None。
wait(timeout): 等待子进程终止。
communicate(input,timeout): 和子进程交互,发送和读取数据。
send_signal(singnal): 发送信号到子进程 。
terminate(): 停止子进程,也就是发送SIGTERM信号到子进程。
kill(): 杀死子进程。发送 SIGKILL 信号到子进程。
multiprocessing模块
multiprocessing 是一个用与 threading 模块相似API的支持产生进程的包。 multiprocessing 包同时提供本地和远程并发,使用子进程代替线程,有效避免 Global Interpreter Lock 带来的影响。因此, multiprocessing 模块允许程序员充分利用机器上的多个核心。Unix 和 Windows 上都可以运行。multiprocessing 模块还引入了在threading 模块中没有类似物的API。这方面的一个主要例子是 Pool 对象,它提供了一种方便的方法,可以跨多个输入值并行化函数的执行,跨进程分配输入数据(数据并行)。
multiprocessing模块的功能众多,支持子进程,通信,共享数据,执行不同形式的同步。为些它提供了Process、Queue、Pipe、Lock等组件
注意:使用 multiprocessing模块 if __name__ == '__main__' 部分是必需的,这样避免避免共享状态,避免杀死进程等。总而言之:这是官方规定的编程习惯。所以我们有两个方式创建进程:
第一种:函数包装:使用一个子进程调用某一个函数
# -*- coding: UTF-8 -*- # 文件名:Process_a.py # 导入模块 from multiprocessing import Process import os from time import sleep, time def test1(name): ''' 测试进程 :param name: 进程对象 :return: 无返回值 ''' print("当前进程的ID", os.getpid()) print("父进程的ID", os.getppid()) print("当前进程的名字:", name) # 休息3秒 sleep(3) # 入口 if __name__ == '__main__': start = time() # 创建多个子进程,并且把这些子进程放入列表中 process_list = [] print("主进程的ID", os.getpid()) for i in range(10): # args:表示被调用对象的位置参数元组,这里Process就是属于Process类 p = Process(target=test1, args=('process-%s' % i,)) # 开始进程 p.start() process_list.append(p)
既然要等到子进程结束后再执行父进程的后续部分,那么是不是感觉到这样多进程就没什么用了?其实不然,一般情况下我们的父进程是不会执行任何其它操作的,它会创建多个子进程来进行任务的处理。当这些子进程全部结束完成后,我们再关闭我们的父进程。
注意: join是等待当前的进程结束
第二种:类包装:自定义一个Process进程类,该类中的run函数由一个子进程调用执行继承Process类,重写run方法就可以了
# -*- coding: UTF-8 -*- # 文件名:Process_b.py # 导入模块 from multiprocessing import Process import os from time import sleep, time # 自定义一个进程类 继承Process类 class MyProcess(Process): def __init__(self, name): Process.__init__(self) self.name = name def run(self): ''' 重写run方法 :return: 无返回值 ''' print("当前进程的ID", os.getpid()) print("父进程的ID", os.getppid()) print("当前进程的名字:", self.name) sleep(3) # 入口 if __name__ == '__main__': print("主进程ID", os.getpid()) # 返回当前时间的时间戳 start = time() process_list = [] for i in range(10): # args:表示被调用对象的位置参数元组 p = MyProcess("process-%s" % i) # 开始进程 p.start() process_list.append(p) for p in process_list: # 我们一般都会需要父进程等待子进程结束再执行父进程后面的代码,需要加join,等待所有的子进程结束 p.join() # 计算时间 每个子进程开始至接受结束运行时间,浮点秒数 end = time() - start print(end)
总结:
使用进程优点:
可以使用计算机多核,进行任务的并发执行,提高执行效率
运行不受其他进程影响,创建方便
空间独立,数据安全
使用进程缺点:
进程的创建和删除消耗的系统资源较多
全局变量在多个进程中不能共享
在子进程中修改全局变量对父进程中的全局变量没有影响。因为父进程在创建子进程时对全局变量做了一个备份,父进程中的全局变量与子进程的全局变量完全是不同的两个变量。全局变量在多个进程中不能共享。
进程间的通信
Python 提供了多种实现进程间通信的机制,主要有以下 2 种:
Python multiprocessing 模块下的 Queue 类,提供了多个进程之间实现通信的诸多方法
Pipe,又被称为“管道”,常用于实现 2 个进程之间的通信,这 2 个进程分别位于管道的两端
Queue 实现进程间通信
需要使用 multiprocessing 模块中的 Queue 类。简单的理解 Queue 实现进程间通信的方式,就是使用了操作系统给开辟的一个队列空间,各个进程可以把数据放到该队列中,当然也可以从队列中把自己需要的信息取走。
现在有这样一个需求:我们有两个进程,一个进程负责写(write)一个进程负责读(read)。当写的进程写完某部分以后要把数据交给读的进程进行使用, 这时候我们就需要使用到了multiprocessing模块的Queue(队列):write() 将写完的数据交给队列,再由队列交给read()
import os, time from multiprocessing import Process, Queue class WriterProcess(Process): ''' 自定义写类,继承 Process ''' def __init__(self, name, mq): ''' 初始化 :param name: 写名称 :param mq: 子进程 ''' Process.__init__(self) self.name = name self.mq = mq def run(self): ''' 重写 run 方法 :return: 无返回值 ''' print("进程%s,已经启动,ID 是:%s" % (self.name, os.getpid())) for i in range(4): # write 进程负责把数据写出去 self.mq.put(i) time.sleep(1) print("进程%s,已经结束" % self.name) class ReaderProcess(Process): ''' 自定义写类,继承 Process ''' def __init__(self, name, mq): ''' 初始化 :param name: 写名称 :param mq: 子进程 ''' Process.__init__(self) self.name = name self.mq = mq def run(self): ''' 重写 run 方法 :return: 无返回值 ''' # 阻塞,等待获取 write 的值 while True: value = self.mq.get(True) print(value) print("结束子进程:%s" % self.name) if __name__ == '__main__': ''' 入口''' # 父进程创建队列,并传递给子进程 q = Queue() pw = WriterProcess("write", q) pr = ReaderProcess("read", q) # 进程开始 pw.start() pr.start() # 进程结束 pw.join() # pr 是一个死循环,无法等待其结束,只能强行结束 pr.terminate() print("父进程结束")
注意: 代码里面的while循环、join、terminate等函数
Pipe 实现进程间通信
Pipe 直译过来的意思是“管”或“管道”,该种实现多进程编程的方式,和实际生活中的管(管道)是非常类似的。通常情况下,管道有 2 个口,而 Pipe 也常用来实现 2 个进程之间的通信,这 2 个进程分别位于管道的两端,一端用来发送数据,另一端用来接收数据。
send(obj)
发送一个 obj 给管道的另一端,另一端使用 recv() 方法接收。需要说明的是,该 obj 必须是可序列化的,如果该对象序列化之后超过 32MB,则很可能会引发 ValueError 异常。
recv()
接收另一端通过 send() 方法发送过来的数据
close()
关闭连接
poll([timeout])
返回连接中是否还有数据可以读取
send_bytes(buffer[, offset[, size]])
发送字节数据。如果没有指定 offset、size 参数,则默认发送 buffer 字节串的全部数据;如果指定了 offset 和 size 参数,则只发送 buffer 字节串中从 offset 开始、长度为 size的字节数据。通过该方法发送的数据,应该使用 recv_bytes() 或 recv_bytes_into 方法接收。
recv_bytes([maxlength])
接收通过 send_bytes() 方法发送的数据,maxlength 指定 多接收的字节数。该方法返回接收到的字节数据
recv_bytes_into(buffer[, offset])
功能与 recv_bytes() 方法类似,只是该方法将接收到的数据放在 buffer 中
import os, time import multiprocessing from multiprocessing import Process, Pipe class WriterProcess(Process): ''' 自定义写类,继承 Process ''' def __init__(self, name, pip): ''' 初始化 ''' Process.__init__(self) self.name = name self.pip = pip def run(self): ''' 重写 run 方法 :return: 无返回值 ''' print("进程%s,已经启动,ID 是:%s" % (self.name, multiprocessing.current_process().pid)) for i in range(4): # 子进程通过管道写数据出去 # write 进程负责把数据写出去 self.pip.send(i) time.sleep(1) print("进程%s,已经结束" % self.name) class ReaderProcess(Process): ''' 自定义读类,继承 Process ''' def __init__(self, name, pip): ''' 初始化 ''' Process.__init__(self) self.name = name self.pip = pip def run(self): ''' 重写 run 方法 :return: 无返回值 ''' # 阻塞,等待获取 write 的值 while True: value = self.pip.recv() print(value) print("结束子进程:%s" % self.name) if __name__ == '__main__': ''' 入口''' # 父进程创建两个 pipe,并传递给子进程 p1, p2 = Pipe() pw = WriterProcess("write", p1) pr = ReaderProcess("read", p2) # 进程开始 pw.start() pr.start() # 进程结束 pw.join() # pr 是一个死循环,无法等待其结束,只能强行结束 pr.terminate() print("父进程结束")
进程池Pool
Python 提供了更好的管理多个进程的方式,就是使用进程池。进程池可以提供指定数量的进程给用户使用,即当有新的请求提交到进程池中时,如果池未满,则会创建一个新的进程用来执行该请求;反之,如果池中的进程数已经达到规定 大值,那么该请求就会等待,只要池中有进程空闲下来,该请求就能得到执行。
使用进程池的优点
提高效率,节省开辟进程和开辟内存空间的时间及销毁进程的时间
节省内存空间
Pool中的函数说明:
Pool(12):创建多个进程,表示可以同时执行的进程数量。默认大小是CPU的核心数果。
join():进程池对象调用join,会等待进程池中所有的子进程结束完毕再去结束父进程。
close():如果我们用的是进程池,在调用join()之前必须要先close(),并且在close()之后不能再继续往进程池添加新的进程
pool.apply_async(func,args,kwds) : 异步执行 ;将事件放入到进程池队列 。args以元组的方式传参,kwds 以字典的方式传参。
pool.apply_sync(func,args,kwds):同步执行;将事件放入到进程池队列。
这里解释一下同步与异步:
同步和异步关注的是消息通信机制 (synchronous communication/ asynchronous communication)。
同步,就是调用某个东西时,调用方得等待这个调用返回结果才能继续往后执行。
异步,和同步相反 调用方不会等待得到结果,而是在调用发出后调用者可用继续执行后续操作,被调用者通过状体来通知调用者,或者通过回掉函数来处理这个调用
# -*- coding: UTF-8 -*- # 文件名:Pool_a.py # 导入的模块 import random from multiprocessing.pool import Pool from time import sleep, time import os def run(name): '''自定义run方法''' print("%s子进程开始,进程ID:%d" % (name, os.getpid())) start = time() # 随机休息时间 sleep(random.choice([1, 2, 3, 4])) end = time() # 执行进程的时间 print("%s 子进程结束,进程ID:%d。耗时%.2f" % (name, os.getpid(), end - start)) if __name__ == "__main__": '''入口''' print("父进程开始") # 创建多个进程,表示可以同时执行的进程数量。默认大小是CPU的核心数 p = Pool(4) for i in range(10): # 创建进程,放入进程池统一管理,异步非阻塞式 p.apply_async(run, args=(i,)) # 如果我们用的是进程池,在调用 join() 之前必须要先 close(),并且在 close() 之后不能再继续往进程池添加新的进程 p.close() # 进程池对象调用join,会等待进程吃中所有的子进程结束完毕再去结束父进程 p.join() print("父进程结束。")
注意:因为我们Pool(4)指定了同时 多只能执行4个进程(Pool进程池默认大小是CPU的核心数),但是我们多放入了6个进程进入我们的进程池,所以程序一开始就会只开启4个进程。 而且子进程执行是没有顺序的,先执行哪个子进程操作系统说了算的。而且进程的创建和销毁也是非常消耗资源的,所以如果进行一些本来就不需要多少耗时的任务你会发现多进程甚至比单进程还要慢
线程的开发
Python 的标准库提供了两个模块:thread 和 threading,thread 是低级模块,threading 是高级模块,对_thread 进行了封装。绝大多数情况下,我们只需要使用threading 这个高级模块。
多线程概念
多线程使得系统可以在单独的进程中执行并发任务。虽然进程也可以在独立的内存空间中并发执行,但是其系统开销会比较大。生成一个新进程必须为其分配独立的地址空间,并维护其代码段、堆栈段和数据段等,这种开销是巨大的。另外,进程间的通信实现也不方便。在程序功能日益复杂的时候,需要有更好的系统模型来满足要求,线程由此产生了。 线程是“轻量级”的,一个进程中的线程使用同样的地址空间,且共享许多资源。启动线程的时间远远小于启动进程的时间和空间,而且,线程间的切换也要比进程间的切换快得多。由于使用同样的地址空间,所以线程之间的数据通信比较方便,一个进程下的线程之间可以直接使用彼此的数据。当然,这种方便性也会带来一些问题,特别是同步问题。 多线程对于那些I/O受限的程序特别适用。其实使用多线程的一个重要目的,就是 大化地利用CPU的资源。当某一线程在等待I/O的时候,另外一个线程可以占用CPU资源。如 简单的GUI程序,一般需要有一个任务支持前台界面的交互,还要有一个任务支持后台的处理。这时候,就适合采用线程模型,因为前台UI是在等待用户的输入或者鼠标单击等操作。除此之外,多线程在网络领域和嵌入式领域的应用也比较多。
多线程类似于同时执行多个不同程序,多线程运行有如下应用:
使用线程可以把占据长时间的程序中的任务放到后台去处理。
用户界面可以更加吸引人,比如用户点击了一个按钮去触发某些事件的处理,可以弹出一个进度条来显示处理的进度。
程序的运行速度可能加快。
在一些等待的任务实现上如用户输入、文件读写和网络收发数据等,线程就比较有用了。在这种情况下我们可以释放一些珍贵的资源如内存占用等等。
线程可以分为:
内核线程:由操作系统内核创建和撤销。
用户线程:不需要内核支持而在用户程序中实现的线程。
线程的状态
一个线程在其生命周期内,会在不同的状态之间转换。在任何一个时刻,线程总是处于某种线程状态中。虽然不同的操作系统可以实现不同的线程模型,定义不同的线程状态,但是总的说来,一个线程模型中下面几种状态是通用的。
就绪状态:线程已经获得了除CPU外的其他资源,正在参与调度,等待被执行。当被调度选中后,将立即执行。
运行状态:占用CPU资源,正在系统中运行。
等待(阻塞)状态:暂时不参与调度,等待特定事件发生,如I/O事件。
中止状态:线程已经运行结束,等待系统回收其线程资源。
Python使用全局解释器锁
(GIL)来保证在解释器中只包含一个线程,并在各个线程之间切换。当GIL可用的时候,处于就绪状态的线程在获取GIL后就可以运行了。线程将在指定的间隔时间内运行。当时间到期后,正在执行的线程将重新进入就绪状态并排队。GIL重新可用并且为就绪状态的线程获取。当然,特定的事件也有可能中断正在运行的线程。具体的线程状态转移将在下面进行详细介绍。
现在,Python语言中已经为各种平台提供了多线程处理能力,包括Windows、Linux等系统平台。在具体的库上,提供了两种不同的方式。一种是低级的线程处理模块_thread,仅仅提供一个 小的线程处理功能集,在实际的代码中 好不要直接使用;另外一种是高级的线程处理模块threading,现在大部分应用的线程实现都是基于此模块的。threading模块是基于thread模块的,部分实现思想来自于Java的threads类。 多线程设计的 大问题是如何协调多个线程。因此,在threading模块中,提供了多种数据同步的方法。为了能够更好地实现线程同步,Python 中提供了Queue模块,用来同步线程。在Queue模块中,含有一个同步的FIFO队列类型,特别适合线程之间的数据通信和同步。 由于大部分程序并不需要有多线程处理的能力,所以在Python启动的时候,并不支持多线程。也就说,Python中支持多线程所需要的各种数据结构特别是GIL还没有创建。当Python虚拟机启动的时候,多线程处理并没有打开,而仅支持单线程。这样做的好处是使得系统处理更加高效。只有当程序中使用了如thread.start_new_thread等方法的时候,Python才意识到需要多线程处理的支持。这时,Python虚拟机才会自动创建多线程处理所需要的数据结构和GIL。
创建线程
Python3 通过两个标准库 _thread 和 threading 提供对线程的支持。
_thread 提供了低级别的、原始的线程以及一个简单的锁,它相比于 threading 模块的功能还是比较有限的。
threading 模块除了包含 _thread 模块中的所有方法外,还提供的其他方法:
threading.currentThread(): 返回当前的线程变量。
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
我们的线程创建方法也类似进程两种,第一种函数包装,即方法包装,另一种是类包装,线程的执行统一通过 start()方法。
使用threading模块
使用threading模块来创建线程是很方便的。简单地说,只要将类继承于threading.Thread,然后在init方法中调用 threading.Thread类中的init方法,重写类的run方法就可以了。看一些例子。
单线程执行:
import time def say(): print("这是单线程!") time.sleep(1) if __name__ == '__main__': start = time.time() for i in range(5): say() end = time.time() print(f"使用时间:{end - start}")
多线程执行:
import time import threading def say(): print("这是多线程!") time.sleep(1) if __name__ == '__main__': start = time.time() for i in range(5): # 通过 Threading 的 Thread 方法创建线程,并执行函数 t = threading.Thread(target = say) # 启动线程 t.start() end = time.time() print(f"使用时间:{end - start}")
可以明显看出使用了多线程并发的操作,花费时间要短很多
创建好的线程,需要调用 start() 方法来启动
我们从多线程与单线程对比,可以很明显的发现多线程的等待时间大大缩减了,程序是通过threading模块下的Thread的类,去实例化一个此对象,并调用方法,实现生成多线程,target参数表示线程需要执行的方法,通过对象的start的方法,开启线程。在使用start方法的时候需要注意,此方法一个线程 多只能调用一次看下面的代码:
# 文件名:threading_c.py import threading from time import sleep, ctime def sing(): for i in range(3): print("正在唱歌...%d" % i) sleep(1) def dance(): for i in range(3): print("正在跳舞...%d" % i) sleep(1) if __name__ == '__main__': print('---开始---:%s' % ctime()) t1 = threading.Thread(target = sing) t2 = threading.Thread(target = dance) t1.start() t2.start() sleep(5) # 屏蔽此行代码,试试看,程序是否立马结束 print("---结束---:%s" % ctime())
主线程会等待所有的子线程结束后才结束,运行看效果,当我们执行完所有的线程后,主线程后才会打印结束,,但是如果屏蔽,或者将我们的sleep(5)这行代码再来看尼,会发现主程序立马结束,但是线程还在运行?