将要执行异步任务脚本 tasks.py:
from celery import Celery from celery import group # host='10.32.21.52', port=6379, db=3 app = Celery('tasks', backend = 'redis://10.32.21.52:6379/14', broker='redis://10.32.21.52:6379/15') @app.task def add(x, y): return x + y @app.task def tsum(ite): return sum(ite) @app.task(trail=True) def A(how_many): return group(B.s(i) for i in range(how_many))() @app.task(trail=True) def B(i): return pow2.delay(i) @app.task(trail=True) def pow2(i): return i ** 2
其中:app中对celery进行配置;详细的配置可以参考文档:http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#configuration
启动任务:
celery -A tasks worker --loglevel=info
启动后有如图所示的输出,显示服务器及配置信息和任务运行日志;
通过脚本调用任务:
from tasks import add from tasks import tsum from tasks import A from celery import chord result = add.delay(4, 4) ~tsum.map([range(10), range(100)]) ~add.starmap([item for item in zip(range(10), range(10))]) # chord(add.s(i, i) for i in xrange(100))(tsum.s()).get() chord((add.s(i, i) for i in xrange(100)), tsum.s())() add.chunks(zip(xrange(100), xrange(100)), 10)() print(result.backend) print result.get() result = A.delay(10)
写的任务太简单了,比单机还慢,只能上抓取任务试试了~
celery通过装饰器把各个任务抽象为消息发送给第三方的插件调度,如:redis;然后把任务分配给各个启动了celery的服务的机器去执行任务,有空研究一下源码再继续分享。