<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont

本文涉及的产品
转发路由器TR,750小时连接 100GB跨地域
简介: #-*- coding:utf-8 -*-import Queueimport threadingimport timeimport jsonimport sysimport signalimport randomreload( sys )sys.
#-*- coding:utf-8 -*-

import Queue
import threading
import time
import json
import sys
import signal
import random
reload( sys )
sys.setdefaultencoding('utf-8')

class Enum(set):
    def __getattr__(self, name):
        if name in self:
            return name
        else:
            raise AttributeError
State = Enum(['NORMAL', 'UPDATE', 'STOP'])

engine_do = True
def handler(signum, frame):
    print 'receive signal: %s' % signum
    global engine_do
    engine_do = False

signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)

class Consumer(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.do = True

    def stop(self):
        self.do = False
        print 'change consumer.do to False'
    
    def run(self):
        print 'Create new consumer thread, id: %s' % self.ident
        while self.do:
            messages = []
            result = []
            msg = random.randint(0,100)
            self.queue.put(msg)
        print 'Consumer thread will exit.'
        
class Producer(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.msgs = Queue.Queue()
        self.state = State.NORMAL
        self.do = True

    def stop(self):
        self.do = False
        self.state = State.STOP

    def run(self):
        while self.do:
            if self.state == State.NORMAL:
                if not self.queue.empty():
                    data = self.queue.get()
                    print 'Producer get data: %s' % data
                else:
                    print 'data queue is empty, sleep 5 seconds.'
                    time.sleep(5)
            elif self.state == State.STOP:
                while not self.queue.empty():
                    data = self.queue.get()
                    print 'Producer get data: %s' % data
        print 'Producer thread will exit.'

class Engine():
    def __init__(self):
        # 在获取所有的topic并初始化连接
        # 初始化消费Queue中数据的线程
        self.queue = Queue.Queue()
        self.threads_consumer = []
        self.threads_producer = []


    def run(self):
        # 启动Consumer线程
        for i in xrange(10):
            consumer = Consumer(self.queue)
            consumer.start()
            self.threads_consumer.append(consumer)
        producer = Producer(self.queue)
        self.threads_producer.append(producer)
        producer.start()
        while True:
            time.sleep(5)
            print engine_do
            if not engine_do:
                print 'engine will exit...'
                print 'first stop consumer threads'
                for consumer in self.threads_consumer:
                    consumer.stop()
                for consumer in self.threads_consumer:
                    consumer.join()
                print 'all consumer threads are done.'
                print 'second stop producer threads...'
                for producer in self.threads_producer:
                    producer.stop()
                for producer in self.threads_producer:
                    producer.join()
                print 'all producer threads are done.'
                break
        print 'All threads are not alive, main thread will exit.'
        return 

if __name__=='__main__':
    engine = Engine()
    engine.run()

目录
相关文章
|
存储 Web App开发 监控
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
我们以前使用过的对hbase和hdfs进行健康检查,及剩余hdfs容量告警,简单易用 1.针对hadoop2的脚本: #/bin/bashbin=`dirname $0`bin=`cd $bin;pwd`STATE_OK=...
1111 0
|
SQL Web App开发 前端开发
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
在运行一个group by的sql时,抛出以下错误信息: Task with the most failures(4):  -----Task ID:  task_201411191723_723592_m_000004URL:  http://DDS0204.
1073 0
|
Web App开发 前端开发
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
异步通信 对于BS(Browser-Server 浏览器)架构,很多情景下server的处理时间较长。 如果浏览器发送请求后,保持跟server的连接,等待server响应,那么一方面会对用户的体验有负面影响; 另一方面,很有可能会由于超时,提示用户服务请求失败。
833 0
|
Web App开发 前端开发 关系型数据库
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
fuser可用于查询文件、目录、socket端口和文件系统的使用进程 1.查询文件和目录使用者 fuser最基本的用法是查询某个文件或目录被哪个进程使用: # fuser -v .
943 0
|
Web App开发 前端开发
|
Web App开发 前端开发
|
存储 监控 数据库
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
为首次部署MongoDB做好准备:容量计划和监控 作者Mat Keep ,译者孙镜涛如果你已经完成了自己新的MongoDB应用程序的开发,并且现在正准备将它部署进产品中,那么你和你的运营团队需要讨论一些关键的问题: 最佳部署实践是什么? 为了确保应用程序满足它所必须的服务层次我们需要监控哪些关键指标? 如何能够确定添加分片的时机? 有哪些工具可以对数据库进行备份和恢复? 怎样才能安全地访问所有新的实时大数据? 本文介绍了硬件选择、扩展、HA和监控。
2697 0
|
Web App开发 前端开发 算法
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
基于大数据的精准营销与应用场景 2015年08月11日 大数据 大数据营销时代来临营销学领域过去半个多世纪的发展让我们见证了从“以产品为中心”到“以客户为中心”的转变。
997 0
|
Web App开发 前端开发 Java
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
 Connection reset by peer的常见原因: 1)服务器的并发连接数超过了其承载量,服务器会将其中一些连接关闭;    如果知道实际连接服务器的并发客户数没有超过服务器的承载量,看下有没有网络流量异常。
930 0