python生产者消费者模型

简介: #-*- coding:utf-8 -*-import Queueimport threadingimport timeimport jsonimport sysimport signalimport randomreload( sys )sys.
#-*- coding:utf-8 -*-

import Queue
import threading
import time
import json
import sys
import signal
import random
reload( sys )
sys.setdefaultencoding('utf-8')

class Enum(set):
    def __getattr__(self, name):
        if name in self:
            return name
        else:
            raise AttributeError
State = Enum(['NORMAL', 'UPDATE', 'STOP'])

engine_do = True
def handler(signum, frame):
    print 'receive signal: %s' % signum
    global engine_do
    engine_do = False

signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)

class Consumer(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.do = True

    def stop(self):
        self.do = False
        print 'change consumer.do to False'
    
    def run(self):
        print 'Create new consumer thread, id: %s' % self.ident
        while self.do:
            messages = []
            result = []
            msg = random.randint(0,100)
            self.queue.put(msg)
        print 'Consumer thread will exit.'
        
class Producer(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.msgs = Queue.Queue()
        self.state = State.NORMAL
        self.do = True

    def stop(self):
        self.do = False
        self.state = State.STOP

    def run(self):
        while self.do:
            if self.state == State.NORMAL:
                if not self.queue.empty():
                    data = self.queue.get()
                    print 'Producer get data: %s' % data
                else:
                    print 'data queue is empty, sleep 5 seconds.'
                    time.sleep(5)
            elif self.state == State.STOP:
                while not self.queue.empty():
                    data = self.queue.get()
                    print 'Producer get data: %s' % data
        print 'Producer thread will exit.'

class Engine():
    def __init__(self):
        # 在获取所有的topic并初始化连接
        # 初始化消费Queue中数据的线程
        self.queue = Queue.Queue()
        self.threads_consumer = []
        self.threads_producer = []


    def run(self):
        # 启动Consumer线程
        for i in xrange(10):
            consumer = Consumer(self.queue)
            consumer.start()
            self.threads_consumer.append(consumer)
        producer = Producer(self.queue)
        self.threads_producer.append(producer)
        producer.start()
        while True:
            time.sleep(5)
            print engine_do
            if not engine_do:
                print 'engine will exit...'
                print 'first stop consumer threads'
                for consumer in self.threads_consumer:
                    consumer.stop()
                for consumer in self.threads_consumer:
                    consumer.join()
                print 'all consumer threads are done.'
                print 'second stop producer threads...'
                for producer in self.threads_producer:
                    producer.stop()
                for producer in self.threads_producer:
                    producer.join()
                print 'all producer threads are done.'
                break
        print 'All threads are not alive, main thread will exit.'
        return 

if __name__=='__main__':
    engine = Engine()
    engine.run()

目录
相关文章
|
28天前
|
数据采集 数据可视化 数据挖掘
金融波动率的多模型建模研究:GARCH族与HAR模型的Python实现与对比分析
本文探讨了金融资产波动率建模中的三种主流方法:GARCH、GJR-GARCH和HAR模型,基于SPY的实际交易数据进行实证分析。GARCH模型捕捉波动率聚类特征,GJR-GARCH引入杠杆效应,HAR整合多时间尺度波动率信息。通过Python实现模型估计与性能比较,展示了各模型在风险管理、衍生品定价等领域的应用优势。
251 66
金融波动率的多模型建模研究:GARCH族与HAR模型的Python实现与对比分析
|
2月前
|
机器学习/深度学习 数据可视化 TensorFlow
使用Python实现深度学习模型的分布式训练
使用Python实现深度学习模型的分布式训练
195 73
|
2月前
|
机器学习/深度学习 数据采集 供应链
使用Python实现智能食品消费需求分析的深度学习模型
使用Python实现智能食品消费需求分析的深度学习模型
98 21
|
2月前
|
机器学习/深度学习 数据采集 数据挖掘
使用Python实现智能食品消费模式预测的深度学习模型
使用Python实现智能食品消费模式预测的深度学习模型
84 2
|
2月前
|
机器学习/深度学习 数据采集 搜索推荐
使用Python实现智能食品消费偏好预测的深度学习模型
使用Python实现智能食品消费偏好预测的深度学习模型
115 23
|
2月前
|
机器学习/深度学习 数据采集 数据挖掘
使用Python实现智能食品消费习惯预测的深度学习模型
使用Python实现智能食品消费习惯预测的深度学习模型
161 19
|
2月前
|
机器学习/深度学习 数据采集 数据挖掘
使用Python实现智能食品消费趋势分析的深度学习模型
使用Python实现智能食品消费趋势分析的深度学习模型
156 18
|
23天前
|
存储 缓存 Java
Python高性能编程:五种核心优化技术的原理与Python代码
Python在高性能应用场景中常因执行速度不及C、C++等编译型语言而受质疑,但通过合理利用标准库的优化特性,如`__slots__`机制、列表推导式、`@lru_cache`装饰器和生成器等,可以显著提升代码效率。本文详细介绍了这些实用的性能优化技术,帮助开发者在不牺牲代码质量的前提下提高程序性能。实验数据表明,这些优化方法能在内存使用和计算效率方面带来显著改进,适用于大规模数据处理、递归计算等场景。
58 5
Python高性能编程:五种核心优化技术的原理与Python代码
|
2月前
|
Python
[oeasy]python055_python编程_容易出现的问题_函数名的重新赋值_print_int
本文介绍了Python编程中容易出现的问题,特别是函数名、类名和模块名的重新赋值。通过具体示例展示了将内建函数(如`print`、`int`、`max`)或模块名(如`os`)重新赋值为其他类型后,会导致原有功能失效。例如,将`print`赋值为整数后,无法再用其输出内容;将`int`赋值为整数后,无法再进行类型转换。重新赋值后,这些名称失去了原有的功能,可能导致程序错误。总结指出,已有的函数名、类名和模块名不适合覆盖赋新值,否则会失去原有功能。如果需要使用类似的变量名,建议采用其他命名方式以避免冲突。
52 14
|
2月前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
116 2

热门文章

最新文章

推荐镜像

更多