managers模块可以把多进程分布到多台机器上
依靠网络通信,一个服务进程可以作为调度者,将任务分布到其他多个进程中
代码示例
# -*- coding: utf-8 -*- # @File : task_master.py # @Date : 2018-06-11 # @Author : Peng Shiyu # 思路:通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了 import time from queue import Queue from multiprocessing.managers import BaseManager # 发送任务的队列 task_queue = Queue() # 接收结果的队列 result_queue = Queue() # 从BaseManager继承的QueueManager class QueueManager(BaseManager): pass # 把两个Queue都注册到网络上, callable参数关联了Queue对象 QueueManager.register("get_task_queue", callable=lambda: task_queue) QueueManager.register("get_result_queue", callable=lambda: result_queue) # 绑定端口5002, 设置验证码'abc' manager = QueueManager(address=("localhost", 5002), authkey=b"abc") manager.start() print("启动服务...") # 获得通过网络访问的Queue对象, # 必须通过manager获得的Queue接口添加 task = manager.get_task_queue() result = manager.get_result_queue() print("放入队列") for i in range(10): time.sleep(1) task.put(i) print("取出结果") for i in range(10): print(result.get(timeout=10)) # 关闭 manager.shutdown() """ 启动服务... 放入队列 取出结果 0 1 4 9 16 25 36 49 64 81 """
# -*- coding: utf-8 -*- # @File : task_worker.py # @Date : 2018-06-11 # @Author : Peng Shiyu import time from multiprocessing.managers import BaseManager # 创建类似的QueueManager class QueueManager(BaseManager): pass # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字 QueueManager.register("get_task_queue") QueueManager.register("get_result_queue") # 连接到服务器,也就是运行task_master.py的机器 # 端口和验证码注意保持与task_master.py设置的完全一致 manager = QueueManager(address=("localhost", 5002), authkey=b"abc") # 从网络连接 manager.connect() # 获取Queue的对象 task = manager.get_task_queue() result = manager.get_result_queue() # 从task队列取任务,并把结果写入result队列 for i in range(10): n = task.get(timeout=1) print("获取任务:%s"% n) time.sleep(2) result.put(n*n) if hasattr(manager, "shutdown"): manager.shutdown() """ task_worker-1 获取任务:0 获取任务:2 获取任务:4 获取任务:6 获取任务:8 task_worker-2 获取任务:1 获取任务:3 获取任务:5 获取任务:7 获取任务:9 """