开发者社区> 问答> 正文

Celery使用app.control.purge()时正在运行的任务会发生什么?

目前我有一个用django运行的芹菜批次,如下所示:

Celery.py:

from __future__ import absolute_import, unicode_literals
import os
import celery
from celery import Celery
from celery.schedules import crontab
import django

load_dotenv(os.path.join(os.path.dirname(os.path.dirname(__file__)), '.env'))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'base.settings')
django.setup()
app = Celery('base')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, \*kwargs):
   app.control.purge()
   sender.add_periodic_task(30.0, check_loop.s())
   recursion_function.delay() #need to use recursive because it need to wait for loop to finish(time can't be predict)
   print("setup_periodic_tasks")

@app.task()
def check_loop():
    .....
    start = database start number
    end = database end number
    callling apis in a list from id=start to id=end
    create objects
    update database(start number = end, end number = end + 3)

    ....


@app.task()
def recursion_function(default_retry_delay=10):
   .....
   do some looping
   ....
   #when finished, call itself again
   recursion_function.apply_async(countdown=30)

我的目的是每当芹菜文件被编辑时,它将重新启动所有任务-删除尚未执行的排队任务(我这样做是因为recursion_function将在完成后再次运行,这是检查表的每条记录的工作在我的数据库中,因此我不必担心它会在中途停止)。

check_loop函数将调用具有分页功能的api以返回对象列表,我将通过表中的记录与之进行比较,如果匹配则创建另一个模型的新自定义记录

我的问题是,当我清除所有消息时,当前正在运行的任务会中途停止还是继续运行?因为如果check_loop函数停止在api列表中途循环,则它将再次运行循环,而我将创建新的重复记录,而我不想

例:

check_loop()的销毁任务期间,它在途中创建了对象(在api列表中,从元素id = 2到id = 5),服务器重启->再次运行,现在check_loop()从头开始运行(在元素的api列表上) id = 2到id = 5),然后再次从该列表中创建对象(我不希望100%)

这是怎么运行的?我只需要一个确认

编辑:

https://docs.celeryproject.org/zh-CN/4.4.1/faq.html#how-do-i-purge-all-waiting- 任务

我添加了app.control.purge(),因为当我重新启动时,recursion_function在setup_periodic_tasks中再次被调用,而recursion_function.apply_async(countdown = 30)中的上一个recursion_function也被执行,因此它自身也会倍增

问题来源:stackoverflow

展开
收起
is大龙 2020-03-24 22:28:48 769 0
1 条回答
写回答
取消 提交回答
  • ,除非重新启动工作程序,否则工作程序将继续执行当前正在运行的任务。

    另外,* _ Celery Way_ 总是希望*任务在并发环境中运行,并具有以下注意事项:

    • there are many tasks running concurrently
    • there are many celery workers executing tasks
    • same task may run again
    • multiple instances of the same task may run at the same moment
    • any task may be terminated any time

    即使您确定在您的环境中只有一个工人手动启动/停止并且这些工人都不适用,这些工人也不适用-应该以这种方式创建任务以允许所有这些事情发生。

    一些有用的技术:

    • use database transactions
    • use locking
    • split long-running tasks into faster ones
    • if task has intermediate values to be saved or they are important (i.e. non-reproducible like some api calls) and their processing in next step takes time - consider splitting into several chained tasks 如果一次只需要运行一个任务的实例-使用某种 locking *-在数据库或缓存中创建/更新锁记录,以便其他(相同任务)可以检查并知道此任务是运行并返回或等待上一个完成。

    recursion_function也可以是定期任务。成为周期性任务将确保它在每个间隔都运行,即使先前的间隔由于某种原因而失败(因此也无法像常规非周期性任务一样再次将自身排队)。使用锁定,可以确保一次只运行一个。

    *check_loop()

    首先,建议将结果保存在数据库中的一个事务中,以确保所有或不保存或修改数据库中的所有内容。

    您还可以保存一些标记,该标记指示已保存对象的数量/状态,因此以后的任务可以仅检查此标记,而不是每个对象。

    或者以某种方式在创建每个元素之前检查每个元素是否已经存在于数据库中。

    回答来源:stackoverflow

    2020-03-24 22:28:56
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
App快速回归测试 立即下载
使用TensorFlow搭建智能开发系统自劢生成App UI代码 立即下载
使用TensorFlow搭建智能开发系统自动生成App UI 立即下载