Hi everyone, I'm Sean (肖恩). A new source-code walkthrough every week.
Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages, and it provides a complete set of tools for operating such a system. Celery is also a message-queue-based tool for real-time data processing and task scheduling.
This is the seventh article in the celery source-code series. The previous six covered:
- celery source analysis: vine and its Promise implementation
- celery source analysis: py-amqp and the AMQP protocol
- celery source analysis: kombu, a messaging library written in Python
- celery source analysis: kombu's enterprise-grade algorithms
- celery source analysis: the celery startup flow
- celery source analysis: tracing celery's startup logs
In this article we study celery's implementation details through its blueprints.
Blueprint design
The official description of a celery blueprint is "A directed acyclic graph of reusable components". There are two blueprints, WorkController (also called the worker) and Consumer, and each blueprint is made up of a number of steps. Based on their dependencies (requires), the steps form the following tree:
```
WorkController(Blueprint)
 |- StateDB
 |- Timer
 |- Hub
 |- Pool
 |- WorkerComponent(Autoscaler)
 |- Beat
 |- Consumer(Blueprint)
     |- Connection
     |- Agent
     |- Events
     |- Mingle
     |- Gossip
     |- Tasks
     |- Control
     |- Heart
     |- Evloop
```
Consumer is itself a step of WorkController, and that step in turn starts the Consumer blueprint, giving a blueprint-within-a-blueprint structure. You can think of a blueprint as the set of steps celery needs at startup: the steps have a dependency order, and steps at the same level make up one blueprint.
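The same step machinery is exposed to users through celery's documented bootsteps extension API. The sketch below adds a hypothetical step to the Worker blueprint; the class name, app name, and print statements are illustrative only:

```python
from celery import Celery, bootsteps

class ExampleWorkerStep(bootsteps.StartStopStep):
    """A hypothetical step hooked into the Worker blueprint."""

    # only start after the Pool step has been created
    requires = {'celery.worker.components:Pool'}

    def start(self, worker):
        print('ExampleWorkerStep started, pool:', worker.pool)

    def stop(self, worker):
        print('ExampleWorkerStep stopped')

app = Celery('myapp', broker='amqp://')   # placeholder broker URL
# register the step on the Worker blueprint
app.steps['worker'].add(ExampleWorkerStep)
```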
The Worker blueprint contains seven steps, {StateDB, Timer, Hub, Pool, Autoscaler, Beat, Consumer}, and in the common case only three of them are started: Hub, Pool and Consumer:
```
[2021-11-24 15:53:12,984: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2021-11-24 15:53:12,988: DEBUG/MainProcess] | Worker: Building graph...
[2021-11-24 15:53:12,988: DEBUG/MainProcess] | Worker: New boot order: {StateDB, Timer, Hub, Pool, Autoscaler, Beat, Consumer}
...
[2021-11-24 15:53:13,062: DEBUG/MainProcess] | Worker: Starting Hub
[2021-11-24 15:53:13,062: DEBUG/MainProcess] ^-- substep ok
[2021-11-24 15:53:13,062: DEBUG/MainProcess] | Worker: Starting Pool
[2021-11-24 15:53:13,410: DEBUG/MainProcess] ^-- substep ok
[2021-11-24 15:53:13,411: DEBUG/MainProcess] | Worker: Starting Consumer
```
The boot order of these seven steps differs from the order in which they are declared in the configuration:
```python
default_steps = {
    'celery.worker.components:Hub',
    'celery.worker.components:Pool',
    'celery.worker.components:Beat',
    'celery.worker.components:Timer',
    'celery.worker.components:StateDB',
    'celery.worker.components:Consumer',
    'celery.worker.autoscale:WorkerComponent',
}
```
The Consumer blueprint contains nine steps, {Connection, Events, Mingle, Tasks, Control, Gossip, Agent, Heart, event loop}; in the common case all of them except Agent are started.
```
[2021-11-24 15:53:13,005: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2021-11-24 15:53:13,005: DEBUG/MainProcess] | Consumer: Building graph...
[2021-11-24 15:53:13,038: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Gossip, Agent, Heart, event loop}
...
```
Blueprint
The Blueprint class has two main methods: `apply`, which creates the steps, and `start`, which starts them:
```python
def apply(self, parent, **kwargs):
    # create the steps
    self._debug('Preparing bootsteps.')
    order = self.order = []
    steps = self.steps = self.claim_steps()

    self._debug('Building graph...')
    for S in self._finalize_steps(steps):
        step = S(parent, **kwargs)
        steps[step.name] = step
        order.append(step)
    self._debug('New boot order: {%s}',
                ', '.join(s.alias for s in self.order))
    for step in order:
        # include() implicitly creates the step object
        step.include(parent)
    return self

def start(self, parent):
    # start the blueprint
    ...
    for i, step in enumerate(s for s in parent.steps if s is not None):
        self._debug('Starting %s', step.alias)
        self.started = i + 1
        step.start(parent)
        logger.debug('^-- substep ok')
```
The Step class and its subclass StartStopStep use the `enabled` attribute to decide whether a step should be included:
```python
enabled = True

def include_if(self, parent):
    return self.enabled

def _should_include(self, parent):
    if self.include_if(parent):
        return True, self.create(parent)
    return False, None

def include(self, parent):
    inc, ret = self._should_include(parent)
    if inc:
        self.obj = ret
        parent.steps.append(self)
    return inc
For example, StateDB enables or disables itself based on the --statedb option, and is disabled by default:
```python
@click.option('-S',
              '--statedb',
              cls=CeleryOption,
              type=click.Path(),
              callback=lambda ctx, _, value: value or ctx.obj.app.conf.worker_state_db,
              help_group="Worker Options",
              help="Path to the state database. The extension '.db' may be "
                   "appended to the filename.")
...

class StateDB(bootsteps.Step):
    """Bootstep that sets up between-restart state database file."""

    def __init__(self, w, **kwargs):
        self.enabled = w.statedb
        ...
```
The Step class also has `requires` and `last` attributes, which the blueprint uses to establish the ordering of all steps:
```python
def _find_last(self):
    # find the step marked as last
    return next((C for C in self.steps.values() if C.last), None)

def _firstpass(self, steps):
    # resolve the dependency relationships
    for step in steps.values():
        step.requires = [symbol_by_name(dep) for dep in step.requires]
    stream = deque(step.requires for step in steps.values())
    # breadth-first traversal of the dependencies
    while stream:
        for node in stream.popleft():
            node = symbol_by_name(node)
            if node.name not in self.steps:
                steps[node.name] = node
            stream.append(node.requires)
```
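To make the ordering concrete, here is a small illustration (not celery code) of how requires-style edges turn an unordered set of steps into a boot order, using the standard library's graphlib; the edges are a simplified subset of the real worker graph:

```python
from graphlib import TopologicalSorter

# simplified requires edges: step -> the steps it depends on
requires = {
    'Hub': {'Timer'},
    'Pool': {'Hub'},
    'Beat': {'Pool'},
    'Consumer': {'Pool'},
    'StateDB': set(),
}

print(list(TopologicalSorter(requires).static_order()))
# one valid boot order, e.g. ['Timer', 'StateDB', 'Hub', 'Pool', 'Beat', 'Consumer']
```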
Consumer is the special step that starts the nested Consumer blueprint:
```python
class Consumer(bootsteps.StartStopStep):
    """Bootstep starting the Consumer blueprint."""

    def create(self, w):
        # consumer_cls is the Consumer blueprint
        c = w.consumer = self.instantiate(
            w.consumer_cls, w.process_task,
            hostname=w.hostname,
            task_events=w.task_events,
            init_callback=w.ready_callback,
            initial_prefetch_count=prefetch_count,
            pool=w.pool,
            timer=w.timer,
            app=w.app,
            controller=w,
            hub=w.hub,
            worker_options=w.options,
            disable_rate_limits=w.disable_rate_limits,
            prefetch_multiplier=w.prefetch_multiplier,
        )
        return c
```
celery splits the startup process into multiple steps, each with its own responsibility, and composes the steps into blueprints. This makes the startup flow flexible to define, decouples the individual features, and keeps the code easier to maintain. Below we continue with some of these steps.
Connection-Step: establishing the AMQP connection
The Connection step's main job is to create the broker connection:
```python
class Connection(bootsteps.StartStopStep):
    """Service managing the consumer broker connection."""

    def __init__(self, c, **kwargs):
        c.connection = None
        super().__init__(c, **kwargs)

    def start(self, c):
        # establish the broker connection
        c.connection = c.connect()
        info('Connected to %s', c.connection.as_uri())
```
Pool-Step: the concurrency model
The Pool step's main job is to start a task pool:
```python
# Initialize bootsteps
self.pool_cls = _concurrency.get_implementation(self.pool_cls)

def create(self, w):
    ...
    # instantiate the concurrency pool
    pool = w.pool = self.instantiate(
        w.pool_cls, w.min_concurrency,
        initargs=(w.app, w.hostname),
        maxtasksperchild=w.max_tasks_per_child,
        max_memory_per_child=w.max_memory_per_child,
        timeout=w.time_limit,
        soft_timeout=w.soft_time_limit,
        putlocks=w.pool_putlocks and threaded,
        lost_worker_timeout=w.worker_lost_wait,
        threads=threaded,
        max_restarts=max_restarts,
        allow_restart=allow_restart,
        forking_enable=True,
        semaphore=semaphore,
        sched_strategy=self.optimization,
        app=w.app,
    )
    ...
    return pool
```
The available concurrency models include a fork-based multiprocessing pool, eventlet/gevent green-thread pools, a thread pool, and so on:
```python
ALIASES = {
    'prefork': 'celery.concurrency.prefork:TaskPool',
    'eventlet': 'celery.concurrency.eventlet:TaskPool',
    'gevent': 'celery.concurrency.gevent:TaskPool',
    'solo': 'celery.concurrency.solo:TaskPool',
    'processes': 'celery.concurrency.prefork:TaskPool',  # XXX compat alias
    'threads': 'celery.concurrency.thread:TaskPool',
}

def get_implementation(cls):
    """Return pool implementation by name."""
    return symbol_by_name(cls, ALIASES)
```
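A quick usage sketch of the resolution above: get_implementation simply turns an alias string into the pool class via symbol_by_name.

```python
from celery.concurrency import get_implementation

# resolve the 'prefork' alias to celery.concurrency.prefork:TaskPool
TaskPool = get_implementation('prefork')
print(TaskPool)
# <class 'celery.concurrency.prefork.TaskPool'>
```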
From the logs in the previous article we know the default is prefork, i.e. the multiprocessing model:
```python
class TaskPool(BasePool):
    """Multiprocessing Pool implementation."""

    # pool implementation provided by billiard
    BlockingPool = BlockingPool
    ...
```
TaskPool is built on top of the billiard library, which we will cover in a later article; for now it is enough to know that all of celery's concurrency models live under the concurrency module.
Evloop-Step: the event loop
The Evloop step is started by the Consumer blueprint:
```python
class Evloop(bootsteps.StartStopStep):
    """Event loop service.

    Note:
        This is always started last.
    """

    # [2021-11-24 20:08:31,037: DEBUG/MainProcess] | Consumer: Starting event loop
    label = 'event loop'
    last = True

    def start(self, c):
        self.patch_all(c)
        c.loop(*c.loop_args())
```
The `loop` here is defined on the consumer and is chosen between the asynchronous loop (asynloop) and the blocking synchronous fallback loop (synloop). Here we look at the synchronous one:
```python
def synloop(obj, connection, consumer, blueprint, hub, qos,
            heartbeat, clock, hbrate=2.0, **kwargs):
    """Fallback blocking event loop for transports that doesn't support AIO."""
    RUN = bootsteps.RUN
    on_task_received = obj.create_task_handler()
    perform_pending_operations = obj.perform_pending_operations
    ...
    consumer.on_message = on_task_received
    consumer.consume()

    obj.on_ready()

    while blueprint.state == RUN and obj.connection:
        ...
        try:
            perform_pending_operations()
            connection.drain_events(timeout=2.0)
        except socket.timeout:
            ...
```
The loop's main jobs are:
- set on_message as the message consumption callback
- block in a while loop listening for events
- consume messages via connection.drain_events (covered in the kombu article)
Because synloop blocks, the step must be marked as last so that it is started at the very end of the blueprint.
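To see what this kind of blocking loop looks like outside of celery, here is a minimal, self-contained sketch using kombu directly; the broker URL and queue name are placeholders:

```python
import socket
from kombu import Connection, Queue, Consumer

def on_message(body, message):
    # plays the role of on_task_received
    print('received:', body)
    message.ack()

with Connection('amqp://guest:guest@localhost//') as conn:   # placeholder broker URL
    queue = Queue('demo')                                     # placeholder queue name
    with Consumer(conn, queues=[queue], callbacks=[on_message]):
        while True:   # stop with Ctrl-C; synloop checks blueprint.state instead
            try:
                # block for up to 2 seconds waiting for a message,
                # just like synloop's connection.drain_events(timeout=2.0)
                conn.drain_events(timeout=2.0)
            except socket.timeout:
                continue  # nothing arrived, loop again
```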
Consumer-Blueprint: task dispatching
Next, let's look at celery's task-processing log:
```
[2021-11-24 21:33:50,535: INFO/MainProcess] Received task: myapp.add[e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2]
[2021-11-24 21:33:50,535: DEBUG/MainProcess] TaskPool: Apply <function _trace_task_ret at 0x7fe6086ac280> (args:('myapp.add', 'e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2', {'lang': 'py', 'task': 'myapp.add', 'id': 'e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2', 'parent_id': None, 'argsrepr': '(16, 16)', 'kwargsrepr': '{}', 'origin': 'gen83110@192.168.5.28', 'reply_to': '63862dbb-9d82-3bdd-b7fb-03580941362a', 'correlation_id': 'e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2', 'hostname': 'celery@192.168.5.28', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [16, 16], 'kwargs': {}}, b'[[16, 16], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
[2021-11-24 21:33:50,536: DEBUG/MainProcess] Task accepted: myapp.add[e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2] pid:83086
[2021-11-24 21:33:50,537: INFO/ForkPoolWorker-8] Task myapp.add[e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2] succeeded in 0.000271957000000711s: 32
```
We can see that the celery worker receives the task in the main process (MainProcess) and then hands it off to a child process (ForkPoolWorker-8) for execution.
The on_message consumption callback used by synloop above is in fact the Consumer blueprint's create_task_handler:
```python
def create_task_handler(self, promise=promise):
    strategies = self.strategies
    on_unknown_message = self.on_unknown_message
    on_unknown_task = self.on_unknown_task
    on_invalid_task = self.on_invalid_task
    callbacks = self.on_task_message
    call_soon = self.call_soon

    def on_task_received(message):
        type_ = message.headers['task']
        ...
        strategy = strategies[type_]
        strategy(
            message, payload,
            promise(call_soon, (message.ack_log_error,)),
            promise(call_soon, (message.reject_log_error,)),
            callbacks,
        )
        ...

    return on_task_received
```
For handling messages and tasks, celery provides a default execution strategy:
```python
# celery/worker/strategy.py:22
def default(task, app, consumer,
            info=logger.info, error=logger.error, task_reserved=task_reserved,
            to_system_tz=timezone.to_system, bytes=bytes,
            proto1_to_proto2=proto1_to_proto2):
    """Default task execution strategy."""
    ...
    # task event related
    # (optimized to avoid calling request.send_event)
    handle = consumer.on_task_request
    ...
    Request = symbol_by_name(task.Request)
    Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)

    revoked_tasks = consumer.controller.state.revoked

    def task_message_handler(message, body, ack, reject, callbacks,
                             to_timestamp=to_timestamp):
        ...
        req = Req(
            message,
            on_ack=ack, on_reject=reject, app=app, hostname=hostname,
            eventer=eventer, task=task, connection_errors=connection_errors,
            body=body, headers=headers, decoded=decoded, utc=utc,
        )
        ...
        info('Received task: %s', req)
        ...
        handle(req)

    return task_message_handler
```
The default strategy does two main things:
- create a Request object
- hand the request object to handle
Executing a Request means calling the pool's execution method:
```python
def execute_using_pool(self, pool, **kwargs):
    """Used by the worker to send this task to the pool."""
    result = pool.apply_async(
        trace_task_ret,
        args=(self._type, task_id, self._request_dict, self._body,
              self._content_type, self._content_encoding),
        accept_callback=self.on_accepted,
        timeout_callback=self.on_timeout,
        callback=self.on_success,
        error_callback=self.on_failure,
        soft_timeout=soft_time_limit or task.soft_time_limit,
        timeout=time_limit or task.time_limit,
        correlation_id=task_id,
    )
    # cannot create weakref to None
    self._apply_result = maybe(ref, result)
    return result
```
This is how a remote task request gets dispatched to the Pool for execution; how the pool actually runs the task will also be covered in a later article.
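The billiard pool's apply_async accepts extra callbacks (accept_callback, timeout_callback), but the basic dispatch pattern is the same as the standard library's multiprocessing pool. A simplified analogue, not celery code:

```python
from multiprocessing import Pool

def add(x, y):
    return x + y

def on_success(result):
    # runs in the parent process, similar to Request.on_success
    print('task succeeded:', result)

def on_failure(exc):
    # runs in the parent process, similar to Request.on_failure
    print('task failed:', exc)

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # the parent hands the work to a child process and registers callbacks
        res = pool.apply_async(add, (16, 16),
                               callback=on_success,
                               error_callback=on_failure)
        res.wait()
```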
Mingle-Step and Gossip-Step: distributed worker coordination
As a distributed task-scheduling framework, celery coordinates multiple workers through the Mingle and Gossip steps. First, the Mingle step's log:
```
[2021-12-12 13:37:56,632: DEBUG/MainProcess] | Consumer: Starting Mingle
[2021-12-12 13:37:56,632: INFO/MainProcess] mingle: searching for neighbors
[2021-12-12 13:37:57,674: INFO/MainProcess] mingle: all alone
...
```
The Mingle step implements synchronization between worker nodes:
```python
def start(self, c):
    self.sync(c)

def sync(self, c):
    info('mingle: searching for neighbors')
    replies = self.send_hello(c)
    if replies:
        info('mingle: sync with %s nodes',
             len([reply for reply, value in replies.items() if value]))
        [self.on_node_reply(c, nodename, reply)
         for nodename, reply in replies.items() if reply]
        info('mingle: sync complete')
    else:
        info('mingle: all alone')
```
After Mingle starts it sends a hello message and then processes the replies from the other nodes. The hello is sent like this:
```python
def send_hello(self, c):
    inspect = c.app.control.inspect(timeout=1.0, connection=c.connection)
    our_revoked = c.controller.state.revoked
    replies = inspect.hello(c.hostname, our_revoked._data) or {}
    replies.pop(c.hostname, None)  # delete my own response
    return replies

...

# celery/app/control.py
def hello(self, from_node, revoked=None):
    return self._request('hello', from_node=from_node, revoked=revoked)
```
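The same control/inspect mechanism is part of celery's public API; for example, the documented ping broadcast works the same way as the hello used by Mingle (assuming a configured app, a reachable broker and at least one running worker):

```python
from celery import Celery

app = Celery('myapp', broker='amqp://')        # placeholder broker URL
replies = app.control.inspect(timeout=1.0).ping()
print(replies)
# e.g. {'celery@host': {'ok': 'pong'}} when a worker is running, otherwise None
```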
The main thing done with each reply is to adjust the current worker's LamportClock:
```python
def on_node_reply(self, c, nodename, reply):
    ...
    c.app.clock.adjust(clock) if clock else c.app.clock.forward()
    ...
```
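app.clock is a kombu LamportClock, which exposes exactly these two operations. A small sketch of how the adjustment works; the remote clock value is made up for illustration:

```python
from kombu.clocks import LamportClock

clock = LamportClock()      # local logical clock, starts at 0
clock.forward()             # a local event: just tick
print(clock.value)          # 1

# a hello reply arrives carrying the neighbour's clock value
remote_clock = 41
clock.adjust(remote_clock)  # value = max(local, remote) + 1
print(clock.value)          # 42
```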
The Gossip step is more involved: unlike Mingle, which runs once, Gossip is an ongoing process. Its log below clearly shows the continuous monitoring:
```
[2021-12-05 15:59:19,088: DEBUG/MainProcess] w2@bogon joined the party
[2021-12-12 13:37:58,096: DEBUG/MainProcess] w2@bogon joined the party
[2021-12-12 14:52:49,259: INFO/MainProcess] missed heartbeat from w2@bogon
[2021-12-12 14:52:49,262: DEBUG/MainProcess] w2@bogon joined the party
...
[2021-12-12 16:10:54,112: DEBUG/MainProcess] w2@bogon left
```
Gossip is an algorithm, also known as the epidemic algorithm. Put simply, in a gossip protocol each node repeatedly broadcasts messages to the nodes it is connected to, until every node in the network has received the message.
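A toy simulation (purely illustrative, not celery code) of how a rumour spreads under gossip until every node has heard it:

```python
import random

def gossip_round(nodes, informed):
    """One round: every informed node tells one randomly chosen peer."""
    newly_informed = set()
    for node in informed:
        peer = random.choice(nodes)
        if peer not in informed:
            newly_informed.add(peer)
    return informed | newly_informed

nodes = [f'worker{i}@host' for i in range(8)]
informed = {nodes[0]}            # the rumour starts at a single node
rounds = 0
while len(informed) < len(nodes):
    informed = gossip_round(nodes, informed)
    rounds += 1

print(f'all {len(nodes)} nodes informed after {rounds} rounds')
```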
celery's gossip handles messages by creating its own Consumer and a timer:
```python
def get_consumers(self, channel):
    # timer that periodically checks worker liveness
    self.register_timer()
    # consumer for worker events
    ev = self.Receiver(channel, routing_key='worker.#',
                       queue_ttl=self.heartbeat_interval)
    return [Consumer(
        channel,
        queues=[ev.queue],
        on_message=partial(self.on_message, ev.event_from_message),
        no_ack=True
    )]
```
The timer tracks the liveness of the other nodes: if a node is not alive it is marked dirty, node-lost handling runs, and the node is removed:
```python
def periodic(self):
    workers = self.state.workers
    dirty = set()
    for worker in workers.values():
        if not worker.alive:
            dirty.add(worker)
            self.on_node_lost(worker)
    for worker in dirty:
        workers.pop(worker.hostname, None)
```
The consumed messages fall into two categories: election messages and everything else.
```python
def on_message(self, prepare, message):
    _type = message.delivery_info['routing_key']
    try:
        # election events
        handler = self.event_handlers[_type]
    except KeyError:
        pass
    else:
        return handler(message.payload)

    # proto2: hostname in header; proto1: in body
    hostname = (message.headers.get('hostname') or
                message.payload['hostname'])
    if hostname != self.hostname:
        ...
        # other worker events
        _, event = prepare(message.payload)
        self.update_state(event)
        ...
    else:
        self.clock.forward()
```
The election handlers deal with election messages and election-ack messages:
```python
self.event_handlers = {
    'worker.elect': self.on_elect,
    'worker.elect.ack': self.on_elect_ack,
}

def on_elect(self, event):
    ...

def on_elect_ack(self, event):
    ...
```
The other events are mainly nodes joining and leaving:
```python
self.state = c.app.events.State(
    on_node_join=self.on_node_join,
    on_node_leave=self.on_node_leave,
    max_tasks_in_memory=1,
)

def on_node_join(self, worker):
    debug('%s joined the party', worker.hostname)
    self._call_handlers(self.on.node_join, worker)

def on_node_leave(self, worker):
    debug('%s left', worker.hostname)
    self._call_handlers(self.on.node_leave, worker)
```
Summary
By walking through celery's two Blueprints, we have seen that the worker startup flow establishes the AMQP connection to the broker, processes tasks with a process/thread/green-thread pool, synchronizes the LamportClock between worker nodes via hello messages, and coordinates workers using the Gossip protocol. In the multiprocessing case, every task is first received by the main process and then handed to a child process in the pool for execution.