celery--调用异步任务的三种方法和task参数

简介: celery--调用异步任务的三种方法和task参数

调用异步任务的三种方法


第一种


调用异步任务有三种方法,前面我们使用的是task.delay(),这是apply_async方法的别名,但接受的参数较为简单


第二种


我们常用的是task.apply_async(args=[arg1,args],kwargs={key:value}):可以接受复杂的参数

这种可以接收的参数有:

  • task_id:为任务分配唯一id,默认是uuid
  • countdown:设置该任务等待一段时间在执行,单位为秒
  • eta:定义任务的开始时间,eta=time.time()+5,单位为秒,是UTC时间,设置成国内时间也没有用
  • expires:设置任务过期时间,任务在过期时间后还没有执行则被丢弃,单位为秒
  • retry:如果任务失败后,是否重试,默认为True
  • shadow:重新指定任务的名字,覆盖其在日志中使用的任务名称
  • retry_policy:{} 重试策略,max_retries:最大重试次数,默认为3次。interval_start:重试等待的时间间隔,默认为0。interval_step:每次重试让重试间隔增加的秒数,默认为0.2秒。interval_max:重试间隔最大的秒数,既通过interval_step增大到多少秒之后,就不在增加了,默认为0.2秒。
  • routing_key:自定义路由键
  • queue:指定发送到哪个队列
  • exchange:指定发送到哪个交换机
  • priority:任务队列的优先级,0到255之间,对于rabbitmq来说0是最高优先级
  • headers:为任务添加额外的消息

还是使用前面的例子,使用task1和task2两个任务,demo.py调用

在demo.py里更改调用方式

from apps.task1 import add
from apps.task2 import subs
if __name__ == '__main__':
    add.delay(3,5)
    subs.apply_async(args=[55,22],
                     task_id='aaaaa2222',
                     countdown=5,
                     shadow = 'zouzou'
                     )

执行结果


第三种


app.send_task(task1.add,args=[1,2])

不建议用,因为不会校验是否存在这个方法,直接就发送成功里,celery执行就会报错


task参数


task常用参数

  • name:可以显示指定任务的名字,默认是本函数的名字,也就是上面的 shadow
  • bind:一个bool值,设置是否绑定一个task的实例,如果绑定,task实例会作为参数传递到任务方法中(第一个参数为self),可以访问task实例的所有属性。
  • base:定义任务的基类,可以以此来定义回调函数,默认是Task类,我们也可以定义自己的Task类
  • default_retry_delay:设置该任务重试的延迟时间,当任务执行失败后,会自动重试,单位是秒,默认为3分钟
task不常用参数

  • serializer:指定本任务的序列化的方法
  • autoretry_for:设置在特定异常时重试任务,默认False不重试。
  • retry_backoff:默认Flase,设置重试时的延迟时间间隔策略
  • retry_backoff_max:设置最大延迟重试时间,默认10分钟,如果失败则不在重试
  • retry_jitter:默认为True,既引入抖动,避免重试任务集中执行
Task的一般属性

  • Task.name:任务名称
  • Task.request:当前任务的信息
  • Task.max_retries:设置重试的最大次数
  • Task.rhrows:预期错误类的可选元组
  • Task.rate_limit:设置任务类型的速度限制
  • Task.time_limit:此任务的硬限时,单位为秒
  • Task.serializer:标识要使用的默认序列化方法的字符串

修改task1.py的内容如下

from  apps import app
import celery
celery.Task   # Task的属性在这里面
class BaseTask(celery.Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('执行task失败')
    def on_success(self, retval, task_id, args, kwargs):
        print(f'执行task成功,task id为:{task_id}')
@app.task(name='wahaha',bind=True,base=BaseTask)
def add(self,x,y):
    print(self.request.id)  # task_id
    return x+y

启动celery worker

执行demo.py

from apps.task1 import add
from apps.task2 import subs
if __name__ == '__main__':
    add.delay(3,5)
    subs.apply_async(args=[55,33])


相关文章
|
2月前
|
Java 数据库
详解Task 和 ValueTask 的使用区别
详解Task 和 ValueTask 的使用区别
52 0
|
缓存 NoSQL Redis
celery--实现异步任务
celery--实现异步任务
|
Java 调度
多线程执行顺序以及run方法的调用
多线程执行顺序以及run方法的调用
多线程执行顺序以及run方法的调用
|
存储 Oracle Java
一个线程调用两次 start()方法会出现什么情况?
一个线程调用两次 start()方法会出现什么情况?
205 0
一个线程调用两次 start()方法会出现什么情况?
c#编程:Task不包含Task.Run
c#编程:Task不包含Task.Run
338 0
|
Java C# vr&ar
C# Task用法
原文:C# Task用法 1、Task的优势   ThreadPool相比Thread来说具备了很多优势,但是ThreadPool却又存在一些使用上的不方便。比如:   ◆ ThreadPool不支持线程的取消、完成、失败通知等交互性操作;   ◆ ThreadPool不支持线程执行的先后次序;   以往,如果开发者要实现上述功能,需要完成很多额外的工作,现在,FCL中提供了一个功能更强大的概念:Task。
1654 0
C#.NET使用Task,await,async,异步执行控件耗时事件(event),不阻塞UI线程和不跨线程执行UI更新,以及其他方式比较
原文:C#.NET使用Task,await,async,异步执行控件耗时事件(event),不阻塞UI线程和不跨线程执行UI更新,以及其他方式比较 使用Task,await,async,异步执行事件(event),不阻塞UI线程和不跨线程执行UI更新   使用Task,await,async 的异步模式 去执行事件(event) 解决不阻塞UI线程和不夸跨线程执行UI更新报错的最佳实践,附加几种其他方式比较 由于是Winform代码和其他原因,本文章只做代码截图演示,不做界面UI展示,当然所有代码都会在截图展示。
4913 0
Thread类的sleep()方法和对象的wait()方法都可以让线程暂停执行,它们有什么区别?
sleep()方法(休眠)是线程类(Thread)的静态方法,调用此方法会让当前线程暂停执行指定的时间,将执行机会(CPU)让给其他线程,但是对象的锁依然保持,因此休眠时间结束后会自动恢复(线程回到就绪状态,请参考第66题中的线程状态转换图)。
1165 0
|
C#
C#跟着阿笨玩一起玩异步Task实战(一)
一、课程介绍 本次分享课程属于《C#高级编程实战技能开发宝典课程系列》中的第一部分,阿笨后续会计划将实际项目中的一些比较实用的关于C#高级编程的技巧分享出来给大家进行学习,不断的收集、整理和完善此系列课程! 本高级系列课程适合人群如下: 1、有一定的NET开发基础并对多线程技术有一定了解和认识。
1324 0
难道调用ThreadPool.QueueUserWorkItem()的时候,真是必须调用Thread.Sleep(N)吗?
开门见山,下面的例子中通过调用ThreadPool.QueueUserWorkItem(WaitCallback callBack, object state)的方式实现异步调用: 1: class Program 2: { 3: static void Main(...
1136 0