python生产者消费者模型

简介: #-*- coding:utf-8 -*-import Queueimport threadingimport timeimport jsonimport sysimport signalimport randomreload( sys )sys.
#-*- coding:utf-8 -*-

import Queue
import threading
import time
import json
import sys
import signal
import random
reload( sys )
sys.setdefaultencoding('utf-8')

class Enum(set):
    def __getattr__(self, name):
        if name in self:
            return name
        else:
            raise AttributeError
State = Enum(['NORMAL', 'UPDATE', 'STOP'])

engine_do = True
def handler(signum, frame):
    print 'receive signal: %s' % signum
    global engine_do
    engine_do = False

signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)

class Consumer(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.do = True

    def stop(self):
        self.do = False
        print 'change consumer.do to False'
    
    def run(self):
        print 'Create new consumer thread, id: %s' % self.ident
        while self.do:
            messages = []
            result = []
            msg = random.randint(0,100)
            self.queue.put(msg)
        print 'Consumer thread will exit.'
        
class Producer(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.msgs = Queue.Queue()
        self.state = State.NORMAL
        self.do = True

    def stop(self):
        self.do = False
        self.state = State.STOP

    def run(self):
        while self.do:
            if self.state == State.NORMAL:
                if not self.queue.empty():
                    data = self.queue.get()
                    print 'Producer get data: %s' % data
                else:
                    print 'data queue is empty, sleep 5 seconds.'
                    time.sleep(5)
            elif self.state == State.STOP:
                while not self.queue.empty():
                    data = self.queue.get()
                    print 'Producer get data: %s' % data
        print 'Producer thread will exit.'

class Engine():
    def __init__(self):
        # 在获取所有的topic并初始化连接
        # 初始化消费Queue中数据的线程
        self.queue = Queue.Queue()
        self.threads_consumer = []
        self.threads_producer = []


    def run(self):
        # 启动Consumer线程
        for i in xrange(10):
            consumer = Consumer(self.queue)
            consumer.start()
            self.threads_consumer.append(consumer)
        producer = Producer(self.queue)
        self.threads_producer.append(producer)
        producer.start()
        while True:
            time.sleep(5)
            print engine_do
            if not engine_do:
                print 'engine will exit...'
                print 'first stop consumer threads'
                for consumer in self.threads_consumer:
                    consumer.stop()
                for consumer in self.threads_consumer:
                    consumer.join()
                print 'all consumer threads are done.'
                print 'second stop producer threads...'
                for producer in self.threads_producer:
                    producer.stop()
                for producer in self.threads_producer:
                    producer.join()
                print 'all producer threads are done.'
                break
        print 'All threads are not alive, main thread will exit.'
        return 

if __name__=='__main__':
    engine = Engine()
    engine.run()

目录
相关文章
|
4天前
|
机器学习/深度学习 算法框架/工具 数据库
使用Python实现深度学习模型:智能城市噪音监测与控制
使用Python实现深度学习模型:智能城市噪音监测与控制
12 1
|
11天前
|
机器学习/深度学习 TensorFlow 调度
使用Python实现深度学习模型:智能能源消耗预测与管理
使用Python实现深度学习模型:智能能源消耗预测与管理
98 30
|
4天前
|
机器学习/深度学习 数据采集 传感器
使用Python实现深度学习模型:智能极端天气事件预测
使用Python实现深度学习模型:智能极端天气事件预测
27 3
|
5天前
|
机器学习/深度学习 数据采集 消息中间件
使用Python实现智能火山活动监测模型
使用Python实现智能火山活动监测模型
21 1
|
9天前
|
机器学习/深度学习 数据可视化 TensorFlow
使用Python实现深度学习模型:智能天气预测与气候分析
使用Python实现深度学习模型:智能天气预测与气候分析
138 3
|
8天前
|
机器学习/深度学习 数据可视化 TensorFlow
使用Python实现深度学习模型:智能海洋监测与保护
使用Python实现深度学习模型:智能海洋监测与保护
35 1
|
11天前
|
机器学习/深度学习 算法 Python
深度解析机器学习中过拟合与欠拟合现象:理解模型偏差背后的原因及其解决方案,附带Python示例代码助你轻松掌握平衡技巧
【10月更文挑战第10天】机器学习模型旨在从数据中学习规律并预测新数据。训练过程中常遇过拟合和欠拟合问题。过拟合指模型在训练集上表现优异但泛化能力差,欠拟合则指模型未能充分学习数据规律,两者均影响模型效果。解决方法包括正则化、增加训练数据和特征选择等。示例代码展示了如何使用Python和Scikit-learn进行线性回归建模,并观察不同情况下的表现。
109 3
|
11天前
|
机器学习/深度学习 数据挖掘 Serverless
手把手教你全面评估机器学习模型性能:从选择正确评价指标到使用Python与Scikit-learn进行实战演练的详细指南
【10月更文挑战第10天】评估机器学习模型性能是开发流程的关键,涉及准确性、可解释性、运行速度等多方面考量。不同任务(如分类、回归)采用不同评价指标,如准确率、F1分数、MSE等。示例代码展示了使用Scikit-learn库评估逻辑回归模型的过程,包括数据准备、模型训练、性能评估及交叉验证。
29 1
|
2天前
|
机器学习/深度学习 数据采集 API
使用Python实现深度学习模型:智能光污染监测与管理
使用Python实现深度学习模型:智能光污染监测与管理
9 0
|
10天前
|
机器学习/深度学习 数据采集 算法
一个 python + 数据预处理+随机森林模型 (案列)
本文介绍了一个使用Python进行数据预处理和构建随机森林模型的实际案例。首先,作者通过删除不必要的列和特征编码对数据进行了预处理,然后应用随机森林算法进行模型训练,通过GridSearchCV优化参数,最后展示了模型的评估结果。
32 0