#-*- coding:utf-8 -*- import Queue import threading import time import json import sys import signal import random reload( sys ) sys.setdefaultencoding('utf-8') class Enum(set): def __getattr__(self, name): if name in self: return name else: raise AttributeError State = Enum(['NORMAL', 'UPDATE', 'STOP']) engine_do = True def handler(signum, frame): print 'receive signal: %s' % signum global engine_do engine_do = False signal.signal(signal.SIGINT, handler) signal.signal(signal.SIGTERM, handler) class Consumer(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue self.do = True def stop(self): self.do = False print 'change consumer.do to False' def run(self): print 'Create new consumer thread, id: %s' % self.ident while self.do: messages = [] result = [] msg = random.randint(0,100) self.queue.put(msg) print 'Consumer thread will exit.' class Producer(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue self.msgs = Queue.Queue() self.state = State.NORMAL self.do = True def stop(self): self.do = False self.state = State.STOP def run(self): while self.do: if self.state == State.NORMAL: if not self.queue.empty(): data = self.queue.get() print 'Producer get data: %s' % data else: print 'data queue is empty, sleep 5 seconds.' time.sleep(5) elif self.state == State.STOP: while not self.queue.empty(): data = self.queue.get() print 'Producer get data: %s' % data print 'Producer thread will exit.' class Engine(): def __init__(self): # 在获取所有的topic并初始化连接 # 初始化消费Queue中数据的线程 self.queue = Queue.Queue() self.threads_consumer = [] self.threads_producer = [] def run(self): # 启动Consumer线程 for i in xrange(10): consumer = Consumer(self.queue) consumer.start() self.threads_consumer.append(consumer) producer = Producer(self.queue) self.threads_producer.append(producer) producer.start() while True: time.sleep(5) print engine_do if not engine_do: print 'engine will exit...' print 'first stop consumer threads' for consumer in self.threads_consumer: consumer.stop() for consumer in self.threads_consumer: consumer.join() print 'all consumer threads are done.' print 'second stop producer threads...' for producer in self.threads_producer: producer.stop() for producer in self.threads_producer: producer.join() print 'all producer threads are done.' break print 'All threads are not alive, main thread will exit.' return if __name__=='__main__': engine = Engine() engine.run()