进程是操作系统进行资源分配和调度的基本单位,进程之间是通过轮流占用cpu来执行的。
一. 创建进程的类Process
multiprocessing模块提供了一个创建进程的类Process,创建进程有一下两种方法
- 创建一个Process类的实例,并制定目标任务函数;
- 自定义一个类并继承Process类,重写其
__init__
方法和run
方法。
1.1 对比单进程和多进程耗时
第一种方法:
from multiprocessing import Process
import os
import time
# 子进程要执行的代码
def task_process(delay):
num = 0
for i in range(delay*100000000):
num+=i
print(f"进程pid为 {os.getpid()},执行完成")
if __name__=='__main__':
print( '父进程pid为 %s.' % os.getpid())
t0 = time.time()
task_process(3)
task_process(3)
t1 = time.time()
print(f"顺序执行耗时 {t1-t0} ")
p0 = Process(target=task_process, args=(3,))
p1 = Process(target=task_process, args=(3,))
t2 = time.time()
p0.start();p1.start()
p0.join();p1.join()
t3 = time.time()
print(f"多进程并发执行耗时 {t3-t2}")
第二种方法: 自定义一个类并继承Process类
from multiprocessing import Process
import os
import time
class MyProcess(Process):
def __init__(self, delay):
super().__init__()
self.delay = delay
# 子进程要执行的代码
def run(self):
num = 0
#for i in range(self.delay * 100000000):
for i in range(self.delay * 100000):
num += i
print(f"进程pid为 {os.getpid()},执行完成")
if __name__ == "__main__":
print("父进程pid为 %s." % os.getpid())
p0 = MyProcess(3)
p1 = MyProcess(3)
t0 = time.time()
print(p0.authkey)
p0.start()
p1.start()
p0.join()
p1.join()
t1 = time.time()
print(f"多进程并发执行耗时 {t1-t0}")
源码:
class BaseProcess(object):
'''
Process objects represent activity that is run in a separate process
The class is analogous to `threading.Thread`
'''
def _Popen(self):
raise NotImplementedError
def __init__(self, group=None, target=None, name=None, args=(), kwargs={
},
*, daemon=None):
assert group is None, 'group argument must be None for now'
count = next(_process_counter)
self._identity = _current_process._identity + (count,)
self._config = _current_process._config.copy()
self._parent_pid = os.getpid()
self._popen = None
self._closed = False
self._target = target
self._args = tuple(args)
self._kwargs = dict(kwargs)
self._name = name or type(self).__name__ + '-' + \
':'.join(str(i) for i in self._identity)
if daemon is not None:
self.daemon = daemon
_dangling.add(self)
参数说明:
- target: 表示调用对象,一般为函数,也可以为类;
- args: 表示调用对象的位置参数元组;
- kwargs: 表示调用对象的字典;
- name: 进程的别名;
- group: 参数不使用,可忽略
类提供的常用方法:
- is_alive(): 返回进程是否是激活的;
- join(): 可传入超时时间,阻塞进程,知道进程执行完成或超时或进程被终止;
- run(): 代表进程执行的任务函数,可被重写;
- start(): 激活进程;
- terminate(): 终止进程
属性:
- authkey: 字节码,进程的准秘钥;
- daemon: 父进程终止后自动终止,且不能产生新的进程,必须在start()之前设置;
- exitcode: 退出码,进程在运行时为None;
- name: 获取进程名称;
- pid: 进程id
1.2 daemon属性对比
from multiprocessing import Process
import os
import time
# 子进程要执行的代码
def task_process(delay):
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行开始。")
print(f"sleep {delay}s")
time.sleep(delay)
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行结束。")
if __name__=='__main__':
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行开始。")
p0 = Process(target=task_process, args=(3,))
p0.start()
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行结束。")
设置daemon属性
from multiprocessing import Process
import os
import time
# 子进程要执行的代码
def task_process(delay):
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行开始。")
print(f"sleep {delay}s")
time.sleep(delay)
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行结束。")
if __name__=='__main__':
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行开始。")
p0 = Process(target=task_process, args=(3,))
#设置 daemon属性为True, 只要主程序运行结束,程序即退出
p0.daemon = True
p0.start()
p0.join()
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行结束。")
二. 进程并发控制之Semaphore
用来控制对共享资源的访问数量,可以控制同一时刻并发的进程数。
多进程同步控制:
import multiprocessing
import time
def worker(s, i):
s.acquire()
print(time.strftime('%H:%M:%S'),multiprocessing.current_process().name + " 获得锁运行")
time.sleep(i)
print(time.strftime('%H:%M:%S'),multiprocessing.current_process().name + " 释放锁结束")
s.release()
if __name__ == "__main__":
# 同一时刻只有两个进程在执行操作
s = multiprocessing.Semaphore(2)
for i in range(6):
p = multiprocessing.Process(target = worker, args=(s, 2))
p.start()
三. 进程同步之lock
如果某一时间只能有一个进程访问某个共享资源,这种情形就需要使用锁
多个进程输出信息,不加锁:
import multiprocessing
import time
def task1():
n = 5
while n > 1:
print(f"{time.strftime('%H:%M:%S')} task1 输出信息")
time.sleep(1)
n -= 1
def task2():
n = 5
while n > 1:
print(f"{time.strftime('%H:%M:%S')} task2 输出信息")
time.sleep(1)
n -= 1
def task3():
n = 5
while n > 1:
print(f"{time.strftime('%H:%M:%S')} task3 输出信息")
time.sleep(1)
n -= 1
if __name__ == "__main__":
p1 = multiprocessing.Process(target=task1)
p2 = multiprocessing.Process(target=task2)
p3 = multiprocessing.Process(target=task3)
p1.start()
p2.start()
p3.start()
上面代码同一时刻有2个进程在打印信息,实际应用中,可能会造成混乱。
现在修改一下程序,同一时刻仅有一个进程输出信息,加锁:
import multiprocessing
import time
def task1(lock):
# 使用上下文管理器with来写
with lock:
n = 5
while n > 1:
print(f"{time.strftime('%H:%M:%S')} task1 输出信息")
time.sleep(1)
n -= 1
def task2(lock):
lock.acquire()
n = 5
while n > 1:
print(f"{time.strftime('%H:%M:%S')} task2 输出信息")
time.sleep(1)
n -= 1
lock.release()
def task3(lock):
lock.acquire()
n = 5
while n > 1:
print(f"{time.strftime('%H:%M:%S')} task3 输出信息")
time.sleep(1)
n -= 1
lock.release()
if __name__ == "__main__":
lock = multiprocessing.Lock()
p1 = multiprocessing.Process(target=task1, args=(lock,))
p2 = multiprocessing.Process(target=task2, args=(lock,))
p3 = multiprocessing.Process(target=task3, args=(lock,))
p1.start()
p2.start()
p3.start()
四. 进程同步至Event
Event用来实现进程之间的同步通信
下面代码定义了2个进程函数,一个用于等待事件发生,另一个用于等待事件发生并设置超时时间,主进程调用事件的set()方法唤醒等待事件的进程,唤醒后用clear()方法清除事件的状态并重新等待,以此达到进程的同步控制。
import multiprocessing
import time
def wait_for_event(e):
e.wait()
time.sleep(1)
e.clear()
print(f'{time.strftime("%H:%M%S")} 进程A 等')
e.wait()
print(f'{time.strftime("%H:%M%S")} 进程A 一起走')
def wait_for_timeout(e, t):
e.wait()
time.sleep(1)
e.clear()
print(f'{time.strftime("%H:%M%S")} 进程B 最多等{t}秒')
e.wait()
print(f'{time.strftime("%H:%M%S")} 进程B 继续走')
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(target=wait_for_event, args=(e,))
w2 = multiprocessing.Process(target=wait_for_timeout, args=(e, 3))
w1.start()
w2.start()
print(f'{time.strftime("%H:%M%S")} 主进程 需要5秒')
e.set()
time.sleep(8)
print(f'{time.strftime("%H:%M%S")} 主进程 赶上')
e.set()
w1.join()
w2.join()
print(f'{time.strftime("%H:%M%S")} 主进程 退出')
五. 进程优先队列Queue
使用多进程实现生产者-消费者模式
from multiprocessing import Process,Queue
import time
def ProducerA(q):
count = 1
while True:
q.put(f"冷饮 {count}")
print(f"{time.strftime('%H:%M:%S')} A 放入:[冷饮 {count}]")
count +=1
time.sleep(1)
def ConsumerB(q):
while True:
print(f"{time.strftime('%H:%M:%S')} B 取出 [{q.get()}]")
time.sleep(5)
if __name__ == '__main__':
q = Queue(maxsize=5)
p = Process(target=ProducerA,args=(q,))
c = Process(target=ConsumerB,args=(q,))
c.start()
p.start()
c.join()
p.join()
上面代码定义了生产者和消费者函数,队列最大容量为5,生产者A生产的速度较快,当队列满时,等待消费者B取出后才能继续放入。
六. 多进程之进程池 pool
pool可以提供指定数量的进程供用户调用,池没满可以接受新的请求到池中,池满该请求就会等待,知道池中有进程结束才会创建新的进程。
#coding: utf-8
import multiprocessing
import time
def task(name):
print(f"{time.strftime('%H:%M:%S')}: {name} 开始执行")
time.sleep(3)
if __name__ == "__main__":
# 同一时刻有3个进程在执行
pool = multiprocessing.Pool(processes = 3)
for i in range(10):
#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
pool.apply_async(func = task, args=(i,))
pool.close()
pool.join()
print("hello")
七. 多进程之数据交换pipe
import multiprocessing
import time
def task1(pipe):
for i in range(5):
str = f"task1-{i}"
print(f"{time.strftime('%H:%M:%S')} task1 发送:{str}")
pipe.send(str)
time.sleep(2)
for i in range(5):
print(f"{time.strftime('%H:%M:%S')} task1 接收: { pipe.recv() }")
def task2(pipe):
for i in range(5):
print(f"{time.strftime('%H:%M:%S')} task2 接收: { pipe.recv() }")
time.sleep(1)
for i in range(5):
str = f"task2-{i}"
print(f"{time.strftime('%H:%M:%S')} task2 发送:{str}")
pipe.send(str)
if __name__ == "__main__":
pipe = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=task1, args=(pipe[0],))
p2 = multiprocessing.Process(target=task2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()
程序定义了2个子进程函数: task1先发送5条消息,再接收消息;task2先接收消息,再发送消息。