The output looks like this:
```
ForkPoolWorker-4 : Do Something 3
ForkPoolWorker-2 : Do Something 1
ForkPoolWorker-1 : Do Something 0
ForkPoolWorker-3 : Do Something 2
ForkPoolWorker-5 : Do Something 4
ForkPoolWorker-6 : Do Something 5
ForkPoolWorker-7 : Do Something 6
ForkPoolWorker-8 : Do Something 7
ForkPoolWorker-2 : Do Something 9
ForkPoolWorker-4 : Do Something 8
ForkPoolWorker-1 : Do Something 11
ForkPoolWorker-7 : Do Something 12
ForkPoolWorker-5 : Do Something 13
ForkPoolWorker-6 : Do Something 14
ForkPoolWorker-3 : Do Something 10
ForkPoolWorker-8 : Do Something 15
ForkPoolWorker-6 : Do Something 19
ForkPoolWorker-1 : Do Something 17
ForkPoolWorker-5 : Do Something 18
ForkPoolWorker-7 : Do Something 16
All child processes finished!
```
Notice that the lines above are not printed in loop order. This can be worked around by using the return value of apply_async: it is an AsyncResult object whose get() method returns whatever the function returned in the worker process. The modified code looks like this:
```python
import multiprocessing as mp
import time


def func(msg):
    time.sleep(1)
    return mp.current_process().name + " : " + msg


if __name__ == '__main__':
    pool = mp.Pool()
    results = []
    for i in range(20):
        msg = "Do Something %d" % i
        results.append(pool.apply_async(func, (msg,)))
    pool.close()
    pool.join()
    for result in results:
        print(result.get())
    print("All child processes finished!")
```
The output looks like this:
```
ForkPoolWorker-1 : Do Something 0
ForkPoolWorker-2 : Do Something 1
ForkPoolWorker-3 : Do Something 2
ForkPoolWorker-4 : Do Something 3
ForkPoolWorker-5 : Do Something 4
ForkPoolWorker-6 : Do Something 5
ForkPoolWorker-7 : Do Something 6
ForkPoolWorker-8 : Do Something 7
ForkPoolWorker-1 : Do Something 8
ForkPoolWorker-2 : Do Something 9
ForkPoolWorker-3 : Do Something 10
ForkPoolWorker-4 : Do Something 11
ForkPoolWorker-7 : Do Something 12
ForkPoolWorker-8 : Do Something 13
ForkPoolWorker-5 : Do Something 14
ForkPoolWorker-6 : Do Something 15
ForkPoolWorker-1 : Do Something 16
ForkPoolWorker-2 : Do Something 17
ForkPoolWorker-4 : Do Something 18
ForkPoolWorker-3 : Do Something 19
All child processes finished!
```
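The tasks still *finish* in whatever order the workers get to them; the output is ordered because get() blocks until its particular task is done, and we call it in submission order. pool.map gives the same ordering guarantee more concisely. A minimal sketch (the function and input range are illustrative, not from the text above):

```python
import multiprocessing as mp


def square(n):
    # A tiny unit of work for each worker process
    return n * n


if __name__ == '__main__':
    with mp.Pool() as pool:
        # map blocks until all tasks finish and returns the results
        # in the same order as the input iterable
        results = pool.map(square, range(5))
    print(results)  # [0, 1, 4, 9, 16]
```

This is exactly the pattern the next script uses.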
If this still feels a little abstract, let's consolidate it with a script that uses multiple processes to count the lines and characters of the files in a directory. The code looks like this:
```python
import multiprocessing as mp
import time
import os

result_file = 'result.txt'  # File the statistics are written to


# Get the list of files under a path
def get_files(path):
    file_list = []
    for file in os.listdir(path):
        if file.endswith('py'):
            file_list.append(os.path.join(path, file))
    return file_list


# Count the lines and characters in each file
def get_msg(path):
    with open(path, 'r', encoding='utf-8') as f:
        content = f.readlines()
    lines = len(content)
    char_count = 0
    for i in content:
        char_count += len(i.strip("\n"))
    return lines, char_count, path


# Write the statistics to the result file
def write_result(result_list):
    with open(result_file, 'a', encoding='utf-8') as f:
        for result in result_list:
            f.write(result[2] + " lines: " + str(result[0]) +
                    " chars: " + str(result[1]) + "\n")


if __name__ == '__main__':
    start_time = time.time()
    file_list = get_files(os.getcwd())
    pool = mp.Pool()
    result_list = pool.map(get_msg, file_list)
    pool.close()
    pool.join()
    write_result(result_list)
    print("Done, time taken:", time.time() - start_time)
```
The output looks like this:
```
# Console output
Done, time taken: 0.13662314414978027

# Contents of result.txt
/Users/jay/Project/Python/Book/Chapter 11/11_4.py lines: 33 chars: 621
/Users/jay/Project/Python/Book/Chapter 11/11_1.py lines: 32 chars: 578
/Users/jay/Project/Python/Book/Chapter 11/11_5.py lines: 52 chars: 1148
/Users/jay/Project/Python/Book/Chapter 11/11_13.py lines: 20 chars: 333
/Users/jay/Project/Python/Book/Chapter 11/11_16.py lines: 62 chars: 1320
/Users/jay/Project/Python/Book/Chapter 11/11_12.py lines: 23 chars: 410
/Users/jay/Project/Python/Book/Chapter 11/11_15.py lines: 48 chars: 1087
/Users/jay/Project/Python/Book/Chapter 11/11_8.py lines: 17 chars: 259
/Users/jay/Project/Python/Book/Chapter 11/11_11.py lines: 18 chars: 314
/Users/jay/Project/Python/Book/Chapter 11/11_10.py lines: 46 chars: 919
/Users/jay/Project/Python/Book/Chapter 11/11_14.py lines: 20 chars: 401
/Users/jay/Project/Python/Book/Chapter 11/11_9.py lines: 31 chars: 623
/Users/jay/Project/Python/Book/Chapter 11/11_2.py lines: 32 chars: 565
/Users/jay/Project/Python/Book/Chapter 11/11_6.py lines: 23 chars: 453
/Users/jay/Project/Python/Book/Chapter 11/11_7.py lines: 37 chars: 745
/Users/jay/Project/Python/Book/Chapter 11/11_3.py lines: 29 chars: 518
```
③ Sharing data between processes
Once multiple processes are involved, exchanging data between them is unavoidable. Unlike multithreading, different processes do not share memory. The multiprocessing module provides four ways to share data between processes: Queue, Value and Array, Manager.dict, and Pipe. The following sections introduce each of the four in turn.
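As a quick taste before the detailed walkthrough, here is a minimal sketch of sharing a counter across processes with Value (the 'i' typecode means a C signed int; the function name is illustrative):

```python
import multiprocessing as mp


def add_100(counter):
    for _ in range(100):
        with counter.get_lock():  # A Value carries its own lock
            counter.value += 1    # Safe increment of shared memory


if __name__ == '__main__':
    counter = mp.Value('i', 0)  # 'i' = C signed int, initial value 0
    procs = [mp.Process(target=add_100, args=(counter,)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # 400
```

Without the lock, concurrent `counter.value += 1` updates could be lost, because the read-modify-write is not atomic.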
1. Queue
A process-safe queue. The put method inserts an item into the queue and takes two optional parameters: block and timeout. If block is True (the default) and timeout is a positive number, the method blocks for at most timeout seconds waiting for free space in the queue; if it times out, a queue.Full exception is raised. If block is False and the queue is already full, queue.Full is raised immediately. The get method reads one item from the queue and removes it; its parameters follow the same rules, and the exception raised is queue.Empty. Note that Queue is not limited to inter-process communication; it also works between threads. While we are at it, here is an example comparing the efficiency of a single thread, multiple threads, and multiple processes:
```python
import threading as td
import multiprocessing as mp
import time


def do_something(queue):
    result = 0
    for i in range(100000):
        result += i ** 2
    queue.put(result)


# Single thread
def normal():
    result = 0
    for _ in range(3):
        for i in range(100000):
            result += i ** 2
    print("Single-thread result:", result)


# Multiple threads
def multi_threading():
    q = mp.Queue()
    t1 = td.Thread(target=do_something, args=(q,))
    t2 = td.Thread(target=do_something, args=(q,))
    t3 = td.Thread(target=do_something, args=(q,))
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    print("Multi-thread result:", (q.get() + q.get() + q.get()))


# Multiple processes
def multi_process():
    q = mp.Queue()
    p1 = mp.Process(target=do_something, args=(q,))
    p2 = mp.Process(target=do_something, args=(q,))
    p3 = mp.Process(target=do_something, args=(q,))
    p1.start()
    p2.start()
    p3.start()
    p1.join()
    p2.join()
    p3.join()
    print("Multi-process result:", (q.get() + q.get() + q.get()))


if __name__ == '__main__':
    start_time_1 = time.time()
    normal()
    start_time_2 = time.time()
    print("Single-thread time:", start_time_2 - start_time_1)
    multi_threading()
    start_time_3 = time.time()
    print("Multi-thread time:", start_time_3 - start_time_2)
    multi_process()
    start_time_4 = time.time()
    print("Multi-process time:", start_time_4 - start_time_3)
```
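To make the blocking rules of put and get concrete, here is a minimal sketch using block=False on a size-1 queue (the maxsize and the values are arbitrary; note that for multiprocessing.Queue, the Full and Empty exceptions live in the standard queue module):

```python
import multiprocessing as mp
import queue  # multiprocessing.Queue raises queue.Full / queue.Empty

if __name__ == '__main__':
    q = mp.Queue(maxsize=1)
    q.put('a')                   # There is room: returns immediately
    try:
        q.put('b', block=False)  # Queue is full: raises at once
    except queue.Full:
        print("queue is full")
    print(q.get())               # Blocks until an item is available: 'a'
    try:
        q.get(block=False)       # Queue is now empty: raises at once
    except queue.Empty:
        print("queue is empty")
```

With block=True and a timeout instead, the same calls would wait up to the given number of seconds before raising.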