我亲自试过,确实有用
""" @author: Zhigang Jiang @date: 2022/1/16 @description: """ import functools import pika import threading import time def ack_message(channel, delivery_tag): print(f'ack_message thread id: {threading.get_ident()}') if channel.is_open: channel.basic_ack(delivery_tag) else: # Channel is already closed, so we can't ACK this message; # log and/or do something that makes sense for your app in this case. pass def do_work(channel, delivery_tag, body): print(f'do_work thread id: {threading.get_ident()}') print(body, "start") for i in range(10): print(i) time.sleep(20) print(body, "end") cb = functools.partial(ack_message, channel, delivery_tag) channel.connection.add_callback_threadsafe(cb) def on_message(channel, method_frame, header_frame, body): print(f'on_message thread id: {threading.get_ident()}') delivery_tag = method_frame.delivery_tag t = threading.Thread(target=do_work, args=(channel, delivery_tag, body)) t.start() credentials = pika.PlainCredentials('username', 'password') parameters = pika.ConnectionParameters('test.webapi.username.com', credentials=credentials, heartbeat=5) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(queue="standard", durable=True) channel.basic_qos(prefetch_count=1) channel.basic_consume('standard', on_message) print(f'main thread id: {threading.get_ident()}') try: channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() connection.close()
长时间的话,家里的网抖动可能出现,我们家有时候就会断网个10几秒,有时候打游戏就会掉线:
pika.exceptions.AMQPHeartbeatTimeout: No activity or too many missed heartbeats in the last xx seconds
这种情况,把他拉起就行了,加一个
感谢每一个认真阅读我文章的人,看着粉丝一路的上涨和关注,礼尚往来总是要有的:
① 2000多本Python电子书(主流和经典的书籍应该都有了)
② Python标准库资料(最全中文版)
③ 项目源码(四五十个有趣且经典的练手项目及源码)
④ Python基础入门、爬虫、web开发、大数据分析方面的视频(适合小白学习)
⑤ Python学习路线图(告别不入流的学习)