查看芹菜文档,我可以看到任务监视器是在一个脚本中启动的(见下面)。在django的实现中(据我所知),情况并非如此,因为我必须在线程中启动任务监视器。 当前,我在第一次运行作业时启动监视器,然后在以后每次运行作业时检查它的状态(参见下面的内容)。这似乎不是一个好方法。 在django项目中为芹菜实例化任务监视器的正确方法是什么? 我似乎错过了一些很明显的东西。
# docs example
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
print('TASK FAILED: %s[%s] %s' % (
task.name, task.uuid, task.info(),))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
# LAUNCHED HERE
my_monitor(app)
# my current implementation
# If the celery_monitor is not instantiated, set it up
app = Celery('scheduler',
broker=rabbit_url, # Rabbit-MQ
backend=redis_url, # Redis
include=tasks
)
celery_monitor = Thread(target=build_monitor, args=[app], name='monitor-global', daemon=True)
# import celery_monitor into another module
global celery_monitor
if not celery_monitor.is_alive():
try:
celery_monitor.start()
logger.debug('Celery Monitor - Thread Started (monitor-retry) ')
except RuntimeError as e: # occurs if thread is dead
# create new instance if thread is dead
logger.debug('Celery Monitor - Error restarting thread (monitor-rety): {}'.format(e))
celery_monitor = Thread(target=build_monitor, args=[app], name='monitor-retry', daemon=True)
celery_monitor.start() # start thread
logger.debug('Celery Monitor - Thread Re-Started (monitor-retry) ')
else:
logger.debug('Celery Monitor - Thread is already alive. Dont do anything.')
问题来源StackOverflow 地址:/questions/59379268/launching-celery-task-monitor-in-django
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。