python 分布式进程通信

简介: task_master.txttask_worker.txt 点击(此处)折叠或打开 #!/usr/bin/env python3 #-*- coding:utf-8 -*- ...
img_e25d4fb2f8de1caf41a735ec53088516.pngtask_master.txt
img_e25d4fb2f8de1caf41a735ec53088516.pngtask_worker.txt

点击(此处)折叠或打开

  1. #!/usr/bin/env python3
  2. #-*- coding:utf-8 -*-
  3. '''
  4. '''
  5. import time,random,queue
  6. from multiprocessing.managers import BaseManager

  7. task_queue = queue.Queue()
  8. result_queue = queue.Queue()

  9. class QueueManager(BaseManager):
  10.     pass

  11. QueueManager.register('get_task_queue', callable=lambda: task_queue)
  12. QueueManager.register('get_result_queue', callable=lambda: result_queue)

  13. manager = QueueManager(address=('', 5000), authkey=b'talen')
  14. manager.start()
  15. task = manager.get_task_queue()
  16. result = manager.get_result_queue()
  17. for i in range(10):
  18.     n = random.randint(0,9000)
  19.     print('Put task %d' % n)
  20.     task.put(n)
  21. for i in range(10):
  22.     r=result.get(timeout=10)
  23.     print("Result : %s " % r)
  24. manager.shutdown()

  25. print('master exit.')


点击(此处)折叠或打开

  1. #!/usr/bin/env python3
  2. #-*- coding:utf-8 -*-
  3. '''
  4. '''


  5. import time, queue, sys
  6. from multiprocessing.managers import BaseManager
  7. class QueueManger(BaseManager):
  8.     pass
  9. QueueManger.register('get_task_queue')
  10. QueueManger.register('get_result_queue')
  11. server_addr='127.0.0.1'
  12. print('Connect to server %s '% server_addr)
  13. m=QueueManger(address=(server_addr,5000), authkey=b'talen')
  14. m.connect()
  15. task=m.get_task_queue()
  16. result=m.get_result_queue()
  17. for i in range(10):
  18.     try:
  19.         n=task.get(timeout=1)
  20.         print('run task %d * %d ...' %(n,n))
  21.         r='%d * %d = %d' % (n,n,n*n)
  22.         time.sleep(1)
  23.         result.put(r)
  24.     except Queue.Empty:
  25.         print('task queue is empty')
  26. print('worker exit.')
t@localhost untitled$ python3 task_master.py 
Put task 6811
Put task 5164
Put task 8492
Put task 177
Put task 5496
Put task 8724
Put task 6422
Put task 2887
Put task 287
Put task 876
Result : 6811 * 6811 = 46389721 
Result : 5164 * 5164 = 26666896 
Result : 8492 * 8492 = 72114064 
Result : 177 * 177 = 31329 
Result : 5496 * 5496 = 30206016 
Result : 8724 * 8724 = 76108176 
Result : 6422 * 6422 = 41242084 
Result : 2887 * 2887 = 8334769 
Result : 287 * 287 = 82369 
Result : 876 * 876 = 767376 
master exit.
t@localhost untitled$ 
t@localhost untitled$ python3 task_worker.py 
Connect to server 127.0.0.1 
run task 6811 * 6811 ...
run task 5164 * 5164 ...
run task 8492 * 8492 ...
run task 177 * 177 ...
run task 5496 * 5496 ...
run task 8724 * 8724 ...
run task 6422 * 6422 ...
run task 2887 * 2887 ...
run task 287 * 287 ...
run task 876 * 876 ...
worker exit.
t@localhost untitled$ 




参考学习:http://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431929340191970154d52b9d484b88a7b343708fcc60000#0
目录
相关文章
|
3月前
|
监控 编译器 Python
如何利用Python杀进程并保持驻留后台检测
本教程介绍如何使用Python编写进程监控与杀进程脚本,结合psutil库实现后台驻留、定时检测并强制终止指定进程。内容涵盖基础杀进程、多进程处理、自动退出机制、管理员权限启动及图形界面设计,并提供将脚本打包为exe的方法,适用于需持续清理顽固进程的场景。
|
7月前
|
机器学习/深度学习 分布式计算 API
Python 高级编程与实战:深入理解并发编程与分布式系统
在前几篇文章中,我们探讨了 Python 的基础语法、面向对象编程、函数式编程、元编程、性能优化、调试技巧、数据科学、机器学习、Web 开发、API 设计、网络编程和异步IO。本文将深入探讨 Python 在并发编程和分布式系统中的应用,并通过实战项目帮助你掌握这些技术。
|
7月前
|
消息中间件 分布式计算 并行计算
Python 高级编程与实战:构建分布式系统
本文深入探讨了 Python 中的分布式系统,介绍了 ZeroMQ、Celery 和 Dask 等工具的使用方法,并通过实战项目帮助读者掌握这些技术。ZeroMQ 是高性能异步消息库,支持多种通信模式;Celery 是分布式任务队列,支持异步任务执行;Dask 是并行计算库,适用于大规模数据处理。文章结合具体代码示例,帮助读者理解如何使用这些工具构建分布式系统。
|
8月前
|
消息中间件 Linux C++
c++ linux通过实现独立进程之间的通信和传递字符串 demo
的进程间通信机制,适用于父子进程之间的数据传输。希望本文能帮助您更好地理解和应用Linux管道,提升开发效率。 在实际开发中,除了管道,还可以根据具体需求选择消息队列、共享内存、套接字等其他进程间通信方
172 16
|
9月前
|
分布式计算 DataWorks 数据处理
产品测评 | 上手分布式Python计算服务MaxFrame产品最佳实践
MaxFrame是阿里云自研的分布式计算框架,专为大数据处理设计,提供高效便捷的Python开发体验。其主要功能包括Python编程接口、直接利用MaxCompute资源、与MaxCompute Notebook集成及镜像管理功能。本文基于MaxFrame最佳实践,详细介绍了在DataWorks中使用MaxFrame创建数据源、PyODPS节点和MaxFrame会话的过程,并展示了如何通过MaxFrame实现分布式Pandas处理和大语言模型数据处理。测评反馈指出,虽然MaxFrame具备强大的数据处理能力,但在文档细节和新手友好性方面仍有改进空间。
|
9月前
|
数据采集 人工智能 分布式计算
🚀 MaxFrame 产品深度体验评测:Python 分布式计算的未来
在数据驱动的时代,大数据分析和AI模型训练对数据预处理的效率要求极高。传统的Pandas工具在小数据集下表现出色,但面对大规模数据时力不从心。阿里云推出的Python分布式计算框架MaxFrame,以“Pandas风格”为核心设计理念,旨在降低分布式计算门槛,同时支持超大规模数据处理。MaxFrame不仅保留了Pandas的操作习惯,还通过底层优化实现了高效的分布式调度、内存管理和容错机制,并深度集成阿里云大数据生态。本文将通过实践评测,全面解析MaxFrame的能力与价值,展示其在大数据和AI场景中的卓越表现。
203 4
🚀 MaxFrame 产品深度体验评测:Python 分布式计算的未来
|
9月前
|
人工智能 分布式计算 大数据
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
409 8
|
8月前
|
数据采集 Java 数据处理
Python实用技巧:轻松驾驭多线程与多进程,加速任务执行
在Python编程中,多线程和多进程是提升程序效率的关键工具。多线程适用于I/O密集型任务,如文件读写、网络请求;多进程则适合CPU密集型任务,如科学计算、图像处理。本文详细介绍这两种并发编程方式的基本用法及应用场景,并通过实例代码展示如何使用threading、multiprocessing模块及线程池、进程池来优化程序性能。结合实际案例,帮助读者掌握并发编程技巧,提高程序执行速度和资源利用率。
376 0
|
9月前
|
SQL 分布式计算 DataWorks
MaxCompute MaxFrame评测 | 分布式Python计算服务MaxFrame(完整操作版)
在当今数字化迅猛发展的时代,数据信息的保存与分析对企业决策至关重要。MaxCompute MaxFrame是阿里云自研的分布式计算框架,支持Python编程接口、兼容Pandas接口并自动进行分布式计算。通过MaxCompute的海量计算资源,企业可以进行大规模数据处理、可视化数据分析及科学计算等任务。本文将详细介绍如何开通MaxCompute和DataWorks服务,并使用MaxFrame进行数据操作。包括创建项目、绑定数据源、编写PyODPS 3节点代码以及执行SQL查询等内容。最后,针对使用过程中遇到的问题提出反馈建议,帮助用户更好地理解和使用MaxFrame。
|
9月前
|
SQL 分布式计算 数据处理
云产品评测|分布式Python计算服务MaxFrame | 在本地环境中使用MaxFrame + 基于MaxFrame实现大语言模型数据处理
本文基于官方文档,介绍了由浅入深的两个部分实操测试,包括在本地环境中使用MaxFrame & 基于MaxFrame实现大语言模型数据处理,对步骤有详细说明。体验下来对MaxCompute的感受是很不错的,值得尝试并使用!
209 1

推荐镜像

更多
下一篇
oss教程