在alpha和bravo执行完成之后,两个进程都已经关闭。
多线程的优点是缩短脚本执行时间,提高执行效率。
多进程存在的问题有:
- 通过进程模块执行的函数无法获取返回值
- 多个今进程同时修改文件可能会出现错误
- 进程数量太多会造成资源不足、死机的情况
进程池
进程池的概念与数据库连接池的概念是类似的,都是为了提高效率,避免线程创建于关闭的消耗
多进程模块multiprocessing中进程池的相关函数:
- Pool:进程池的创建,参数为要创建的进程的个数,返回一个进程池对象
- applu_async:任务加入线程池(异步),参数函数名和函数的参数,无返回值
- close:关闭进程池,无参数、无返回值
- join:等待进程池任务结束,无参数、无返回值
import multiprocessing import os import time def alpha(count): print(count, os.getpid()) time.sleep(5) if __name__ == '__main__': pool = multiprocessing.Pool(5) for i in range(20): pool.apply_async(func=alpha, args=(i,)) time.sleep(20) 复制代码
进程被重复利用了,这里调用了异步,异步就是非同步,导致前后使用的进程号顺序不一致。
进程池结束任务之前,主进程就已经结束了,程序结束,进程池就被关闭了。
pool.close() pool.join() 复制代码
在time.sleep()函数下添加代码,并注释time.sleep()函数。
20个任务全部完成,需要通过close()函数和join()函数,来保证在子线程执行结束之后,再结束主线程,在退出程序。
alpha()函数添加return, 异步是可以获取返回值的。
import multiprocessing import os import time def alpha(count): print(count, os.getpid()) time.sleep(5) return 'result is %s, pid is %s' % (count, os.getpid()) if __name__ == '__main__': pool = multiprocessing.Pool(5) res_list = [] for i in range(20): res = pool.apply_async(func=alpha, args=(i, )) res_list.append(res) for res in res_list: print(res.get()) # time.sleep(20) # pool.close() # pool.join() 复制代码
第一组先执行,执行完成之后打印出结果,同时第二组也开始执行。
进程锁
当一个进程开始执行任务的时候,为了避免进程被其他任务使用,需要通过锁开控制,只有解锁之后才能执行下一个任务
进程锁相关的函数:
acquire:上锁,无参数、无返回值 release:开锁,无参数、无返回值
import multiprocessing import os import time def alpha(count, lock): # 上锁 lock.acquire() print(count, os.getpid()) time.sleep(5) # 解锁 lock.release() return 'result is %s, pid is %s' % (count, os.getpid()) if __name__ == '__main__': pool = multiprocessing.Pool(5) manager = multiprocessing.Manager() lock = manager.Lock() res_list = [] for i in range(20): res = pool.apply_async(func=alpha, args=(i, lock)) # res_list.append(res) pool.close() pool.join() 复制代码
每次只有一个进程在工作,锁不可以滥用,锁没有解开就会造成死锁现象。
三、进程之间的通信
两个进程之间需要相互配合工作,就需要通信的帮助。进程之间通过队列进行通信,队列可以解决进程模块执行的函数无法获取返回值的问题
队列是一种数据结构,队列中数据存储的特点是先入先出或者后入后出
多线程模块multipartprocessing中队列相关函数
- Queue:队列的创建,返回一个队列对象
- put:将信息放入队列,参数为放入队列的信息,无返回值
- get:获取队列中的信息,无参数,返回值为字符串既具体的消息
import json import multiprocessing class Work(): def __init__(self, queue): self.queue = queue def send(self, message): if not isinstance(message, str): message = json.dumps(message) self.queue.put(message) def reveive(self): while True: result = self.queue.get() try: res = json.loads(result) except: res = result print('Message is {}'.format(res)) if __name__ == '__main__': queue = multiprocessing.Queue() work = Work(queue) send = multiprocessing.Process(target=work.send, args=({'name': 'stark'},)) receive = multiprocessing.Process(target=work.reveive) send.start() receive.start() send.join() 复制代码
此时接收到数据之后,程序并不会停止,而是持续运行,需要通过调用函数来终止程序,在脚本末尾增加代码。
receive.terminate() print(send.is_alive()) 复制代码
将批量消息放入队列中,增加send_list()函数。
import json import multiprocessing import time class Work(): def __init__(self, queue): self.queue = queue # 其余代码不变 def send_list(self): for i in range(10): self.queue.put('Mark {}'.format(i)) time.sleep(1) if __name__ == '__main__': queue = multiprocessing.Queue() work = Work(queue) send = multiprocessing.Process(target=work.send, args=({'name': 'stark'},)) receive = multiprocessing.Process(target=work.reveive) # 为send_list函数创建一个进程 send_list = multiprocessing.Process(target=work.send_list) # 启动该进程 send_list.start() send.start() receive.start() # print(send.is_alive()) send.join() 复制代码
要想程序能够正常停止,只需阻塞最长的进程即可。
# send.join() send_list.join() receive.terminate() print(send.is_alive()) 复制代码