Celery Source Code Analysis - 7


Hello everyone, I'm Sean (肖恩). Source code analysis, see you every week.


Celery is a simple, flexible and reliable distributed system for processing large volumes of messages, and it provides the full set of tools needed to operate such a system. Celery is also a message queue tool, usable for real-time data processing as well as task scheduling.


This is the seventh article in the celery source code analysis series; the previous six covered:


  1. Celery source code analysis - vine, an implementation of Promises
  2. Celery source code analysis - py-amqp, an implementation of the AMQP protocol
  3. Celery source code analysis - kombu, a messaging library in Python
  4. Celery source code analysis - kombu's enterprise-grade algorithms
  5. Celery source code analysis - the celery startup flow
  6. Celery source code analysis - tracing celery's startup logs


In this installment we learn celery's implementation details through its blueprints.


Blueprint Design



celery's blueprint is officially described as "A directed acyclic graph of reusable components", that is, a set of reusable components organized as a DAG. There are two blueprints, WorkController (also called the worker) and Consumer, and each blueprint is composed of a number of steps. Based on their dependency relationships (requires), the steps form the tree below:


WorkController(Blueprint)
    |-  StateDB
    |-  Timer
            |- Hub
                |- Pool
                    |- WorkerComponent(Autoscaler)
    |-  Beat
    |-  Consumer(Blueprint)
            |- Connection
                |- Agent
                |- Events
                    |- Mingle
                        |- Gossip
                        |- Tasks
                            |- Control
                    |- Heart
            |- Evloop


Consumer is itself a step of WorkController, and this step in turn starts the Consumer blueprint, a blueprint nested inside a blueprint. The word blueprint can be understood like this: starting celery takes a series of steps, those steps have dependency ordering, and the steps at one level together form a blueprint.
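
This mechanism is also celery's public extension point: you can attach your own step to either blueprint. Below is a minimal sketch based on the documented bootsteps interface (the class name and print messages are made up; the requires path is the real Pool step):

from celery import Celery, bootsteps

class ExampleWorkerStep(bootsteps.StartStopStep):
    """A custom step that only boots after the worker's Pool step is up."""

    requires = {'celery.worker.components:Pool'}

    def start(self, worker):
        print('custom step started')

    def stop(self, worker):
        print('custom step stopped')

app = Celery(broker='amqp://')
# hang the step onto the Worker blueprint;
# use app.steps['consumer'] for the Consumer blueprint
app.steps['worker'].add(ExampleWorkerStep)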


The Worker blueprint contains seven steps, {StateDB, Timer, Hub, Pool, Autoscaler, Beat, Consumer}, but in the usual case only three of them are actually started: Hub, Pool and Consumer:


[2021-11-24 15:53:12,984: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2021-11-24 15:53:12,988: DEBUG/MainProcess] | Worker: Building graph...
[2021-11-24 15:53:12,988: DEBUG/MainProcess] | Worker: New boot order: {StateDB, Timer, Hub, Pool, Autoscaler, Beat, Consumer}
...
[2021-11-24 15:53:13,062: DEBUG/MainProcess] | Worker: Starting Hub
[2021-11-24 15:53:13,062: DEBUG/MainProcess] ^-- substep ok
[2021-11-24 15:53:13,062: DEBUG/MainProcess] | Worker: Starting Pool
[2021-11-24 15:53:13,410: DEBUG/MainProcess] ^-- substep ok
[2021-11-24 15:53:13,411: DEBUG/MainProcess] | Worker: Starting Consumer


The boot order of these seven steps differs from the order in which they are declared in the configuration:


default_steps = {
    'celery.worker.components:Hub',
    'celery.worker.components:Pool',
    'celery.worker.components:Beat',
    'celery.worker.components:Timer',
    'celery.worker.components:StateDB',
    'celery.worker.components:Consumer',
    'celery.worker.autoscale:WorkerComponent',
}


The Consumer blueprint contains nine steps, {Connection, Events, Mingle, Tasks, Control, Gossip, Agent, Heart, event loop}; in the usual case all of them except Agent are started:


[2021-11-24 15:53:13,005: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2021-11-24 15:53:13,005: DEBUG/MainProcess] | Consumer: Building graph...
[2021-11-24 15:53:13,038: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Gossip, Agent, Heart, event loop}
...


Blueprint has two main methods: apply, which creates the steps, and start, which starts them:


def apply(self, parent, **kwargs):
    # create the steps
    self._debug('Preparing bootsteps.')
    order = self.order = []
    steps = self.steps = self.claim_steps()
    self._debug('Building graph...')
    for S in self._finalize_steps(steps):
        step = S(parent, **kwargs)
        steps[step.name] = step
        order.append(step)
    self._debug('New boot order: {%s}',
                ', '.join(s.alias for s in self.order))
    for step in order:
        # include() implicitly creates the step object (via create())
        step.include(parent)
    return self
def start(self, parent):
    # start the blueprint's steps in order
    ...
    for i, step in enumerate(s for s in parent.steps if s is not None):
        self._debug('Starting %s', step.alias)
        self.started = i + 1
        step.start(parent)
        logger.debug('^-- substep ok')


Step and its subclass StartStopStep use the enabled attribute to decide whether a step is activated:


enabled = True
def include_if(self, parent):
    return self.enabled
def _should_include(self, parent):
    if self.include_if(parent):
        return True, self.create(parent)
    return False, None
def include(self, parent):
    inc, ret = self._should_include(parent)
    if inc:
        self.obj = ret
        parent.steps.append(self)
    return inc


For example, StateDB is disabled by default and only enabled when the corresponding command-line option is set:


@click.option('-S',
              '--statedb',
              cls=CeleryOption,
              type=click.Path(),
              callback=lambda ctx, _, value: value or ctx.obj.app.conf.worker_state_db,
              help_group="Worker Options",
              help="Path to the state database. The extension '.db' may be "
                   "appended to the filename.")
    ...
class StateDB(bootsteps.Step):
    """Bootstep that sets up between-restart state database file."""
    def __init__(self, w, **kwargs):
        self.enabled = w.statedb
        ...


The Step class also has two more attributes, requires and last, which the blueprint uses to establish the boot order of all steps:


def _find_last(self):
    # find the step marked last; it boots at the very end
    return next((C for C in self.steps.values() if C.last), None)
def _firstpass(self, steps):
    # resolve each step's declared dependencies
    for step in steps.values():
        step.requires = [symbol_by_name(dep) for dep in step.requires]
    stream = deque(step.requires for step in steps.values())
    # breadth-first traversal of the dependency graph
    while stream:
        for node in stream.popleft():
            node = symbol_by_name(node)
            if node.name not in self.steps:
                steps[node.name] = node
            stream.append(node.requires)
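
_firstpass only collects the complete set of steps; the actual boot order is then a topological sort of the requires graph (done in _finalize_steps via celery's DependencyGraph). The ordering idea itself fits in a few lines; here is a standalone sketch using Kahn's algorithm (illustration only, not celery's code):

from collections import deque

def boot_order(requires):
    """requires maps each step name to the set of steps it depends on."""
    indegree = {step: len(deps) for step, deps in requires.items()}
    dependents = {step: [] for step in requires}
    for step, deps in requires.items():
        for dep in deps:
            dependents[dep].append(step)
    queue = deque(step for step, n in indegree.items() if n == 0)
    order = []
    while queue:
        step = queue.popleft()
        order.append(step)
        for nxt in dependents[step]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    return order

print(boot_order({
    'StateDB': set(), 'Timer': set(), 'Hub': {'Timer'},
    'Pool': {'Hub'}, 'Autoscaler': {'Pool'}, 'Beat': set(), 'Consumer': set(),
}))
# -> ['StateDB', 'Timer', 'Beat', 'Consumer', 'Hub', 'Pool', 'Autoscaler']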


Consumer, the special step, starts the nested Consumer blueprint like this:


class Consumer(bootsteps.StartStopStep):
    """Bootstep starting the Consumer blueprint."""
    def create(self, w):
        # consumer_cls is the Consumer blueprint
        c = w.consumer = self.instantiate(
            w.consumer_cls, w.process_task,
            hostname=w.hostname,
            task_events=w.task_events,
            init_callback=w.ready_callback,
            initial_prefetch_count=prefetch_count,
            pool=w.pool,
            timer=w.timer,
            app=w.app,
            controller=w,
            hub=w.hub,
            worker_options=w.options,
            disable_rate_limits=w.disable_rate_limits,
            prefetch_multiplier=w.prefetch_multiplier,
        )
        return c


celery splits the startup process into multiple steps, each carrying a different responsibility, and composes those steps into blueprints. This approach makes the startup flow flexible to define, keeps the individual concerns decoupled, and makes the code easier to maintain. Let's now study some of the steps in detail.


Connection-Step: establishing the AMQP protocol connection



The Connection step's main job is to create the broker connection:


class Connection(bootsteps.StartStopStep):
    """Service managing the consumer broker connection."""
    def __init__(self, c, **kwargs):
        c.connection = None
        super().__init__(c, **kwargs)
    def start(self, c):
        # establish the broker connection
        c.connection = c.connect()
        info('Connected to %s', c.connection.as_uri())
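
c.connect() ultimately builds a kombu Connection to the broker. A hedged standalone sketch of the equivalent operation (the broker URL is illustrative and assumes a local RabbitMQ):

from kombu import Connection

conn = Connection('amqp://guest:guest@localhost:5672//')
conn.connect()                         # establish the transport connection
print('Connected to', conn.as_uri())   # the same as_uri() seen in the step's log line
conn.release()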


Pool-Step: implementing the concurrency model



The Pool step's main job is to start a task pool:


# Initialize bootsteps
self.pool_cls = _concurrency.get_implementation(self.pool_cls)
def create(self, w):
    ...
    # instantiate the selected concurrency implementation
    pool = w.pool = self.instantiate(
            w.pool_cls, w.min_concurrency,
            initargs=(w.app, w.hostname),
            maxtasksperchild=w.max_tasks_per_child,
            max_memory_per_child=w.max_memory_per_child,
            timeout=w.time_limit,
            soft_timeout=w.soft_time_limit,
            putlocks=w.pool_putlocks and threaded,
            lost_worker_timeout=w.worker_lost_wait,
            threads=threaded,
            max_restarts=max_restarts,
            allow_restart=allow_restart,
            forking_enable=True,
            semaphore=semaphore,
            sched_strategy=self.optimization,
            app=w.app,
        )
        ...
    return pool


The concurrency model has several implementations, such as fork-based multi-processing (prefork), eventlet- and gevent-based coroutines, and threads:


ALIASES = {
    'prefork': 'celery.concurrency.prefork:TaskPool',
    'eventlet': 'celery.concurrency.eventlet:TaskPool',
    'gevent': 'celery.concurrency.gevent:TaskPool',
    'solo': 'celery.concurrency.solo:TaskPool',
    'processes': 'celery.concurrency.prefork:TaskPool',  # XXX compat alias
    'threads': 'celery.concurrency.thread:TaskPool'
}
def get_implementation(cls):
    """Return pool implementation by name."""
    return symbol_by_name(cls, ALIASES)
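
get_implementation accepts either an alias from ALIASES or a full dotted path, for example:

from celery.concurrency import get_implementation

print(get_implementation('prefork'))
# <class 'celery.concurrency.prefork.TaskPool'>
print(get_implementation('celery.concurrency.solo:TaskPool'))
# <class 'celery.concurrency.solo.TaskPool'>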


From the logs in the previous article, we know the default is prefork, i.e. the multi-process model:


class TaskPool(BasePool):
    """Multiprocessing Pool implementation."""
    # the blocking pool implementation provided by billiard
    BlockingPool = BlockingPool
    ...


TaskPool is built mainly on the billiard library, which we will introduce separately later; for now it is enough to know that all of celery's concurrency models live under the concurrency module.


Evloop-Step: implementing the event loop



The Evloop step is started by the Consumer blueprint:


class Evloop(bootsteps.StartStopStep):
    """Event loop service.
    Note:
        This is always started last.
    """
    # [2021-11-24 20:08:31,037: DEBUG/MainProcess] | Consumer: Starting event loop
    label = 'event loop'
    last = True
    def start(self, c):
        self.patch_all(c)
        c.loop(*c.loop_args())


The loop here is defined on the consumer. There are two loops, the asynchronous loop (asynloop) and the synchronous loop (synloop); we look at the synchronous one, the blocking fallback used when the transport does not support asynchronous I/O:


def synloop(obj, connection, consumer, blueprint, hub, qos,
            heartbeat, clock, hbrate=2.0, **kwargs):
    """Fallback blocking event loop for transports that doesn't support AIO."""
    RUN = bootsteps.RUN
    on_task_received = obj.create_task_handler()
    perform_pending_operations = obj.perform_pending_operations
    ...
    consumer.on_message = on_task_received
    consumer.consume()
    obj.on_ready()
    while blueprint.state == RUN and obj.connection:
        ...
        try:
            perform_pending_operations()
            connection.drain_events(timeout=2.0)
        except socket.timeout:
           ...


The loop's main responsibilities are:


  1. Set on_message as the message consumption callback
  2. Block listening in a while loop
  3. Consume messages with connection.drain_events (introduced in the kombu articles)


Because synloop blocks, the step sets last = True to make sure it is started at the very end of the blueprint.
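
To make the loop's shape concrete, here is a self-contained blocking drain loop written against plain kombu (the broker URL and queue name are illustrative; this is the pattern, not celery's code):

import socket

from kombu import Connection, Queue

def handle(body, message):
    # stands in for on_task_received
    print('received:', body)
    message.ack()

with Connection('amqp://guest:guest@localhost:5672//') as conn:
    with conn.Consumer(Queue('demo'), callbacks=[handle]):
        while True:
            try:
                conn.drain_events(timeout=2.0)  # the same call synloop makes
            except socket.timeout:
                pass  # idle, loop again, just like synloop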


Consumer-Blueprint: implementing task scheduling



Let's look at celery's task-processing logs again:


[2021-11-24 21:33:50,535: INFO/MainProcess] Received task: myapp.add[e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2]
[2021-11-24 21:33:50,535: DEBUG/MainProcess] TaskPool: Apply <function _trace_task_ret at 0x7fe6086ac280> (args:('myapp.add', 'e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2', {'lang': 'py', 'task': 'myapp.add', 'id': 'e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2', 'parent_id': None, 'argsrepr': '(16, 16)', 'kwargsrepr': '{}', 'origin': 'gen83110@192.168.5.28', 'reply_to': '63862dbb-9d82-3bdd-b7fb-03580941362a', 'correlation_id': 'e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2', 'hostname': 'celery@192.168.5.28', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [16, 16], 'kwargs': {}}, b'[[16, 16], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
[2021-11-24 21:33:50,536: DEBUG/MainProcess] Task accepted: myapp.add[e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2] pid:83086
[2021-11-24 21:33:50,537: INFO/ForkPoolWorker-8] Task myapp.add[e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2] succeeded in 0.000271957000000711s: 32


As the logs show, the worker receives the task in the main process (MainProcess) and then dispatches it to a child process (ForkPoolWorker-8) for execution.


The on_message consumption callback that synloop registers is actually the Consumer blueprint's create_task_handler:


def create_task_handler(self, promise=promise):
    strategies = self.strategies
    on_unknown_message = self.on_unknown_message
    on_unknown_task = self.on_unknown_task
    on_invalid_task = self.on_invalid_task
    callbacks = self.on_task_message
    call_soon = self.call_soon
    def on_task_received(message):
        type_ = message.headers['task']  
        ...
        strategy = strategies[type_]
        strategy(
            message, payload,
            promise(call_soon, (message.ack_log_error,)),
            promise(call_soon, (message.reject_log_error,)),
            callbacks,
        )
        ...
    return on_task_received
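
The hot path here is a single dictionary lookup: strategies maps every registered task name to its execution strategy, built ahead of time. A toy sketch of the same dispatch-by-name pattern (all names are illustrative):

strategies = {}

def register(name):
    def deco(fn):
        strategies[name] = fn
        return fn
    return deco

@register('myapp.add')
def handle_add(message):
    print('scheduling add with args', message['argsrepr'])

def on_task_received(message):
    # one dict lookup, then delegate, as in create_task_handler
    strategies[message['headers']['task']](message)

on_task_received({'headers': {'task': 'myapp.add'}, 'argsrepr': '(16, 16)'})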


For handling the message and its task, celery provides a default execution strategy:


# celery/worker/strategy.py:22
def default(task, app, consumer,
            info=logger.info, error=logger.error, task_reserved=task_reserved,
            to_system_tz=timezone.to_system, bytes=bytes,
            proto1_to_proto2=proto1_to_proto2):
    """Default task execution strategy."""
    ...
    # task event related
    # (optimized to avoid calling request.send_event)
    handle = consumer.on_task_request
    ...
    Request = symbol_by_name(task.Request)
    Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)
    revoked_tasks = consumer.controller.state.revoked
    def task_message_handler(message, body, ack, reject, callbacks,
                             to_timestamp=to_timestamp):
        ...
        req = Req(
            message,
            on_ack=ack, on_reject=reject, app=app, hostname=hostname,
            eventer=eventer, task=task, connection_errors=connection_errors,
            body=body, headers=headers, decoded=decoded, utc=utc,
        )
        ...
        info('Received task: %s', req)
        ...
        handle(req)
    return task_message_handler


The default strategy mainly does two things:


  • Create the request object, Request
  • Process the request object with handle (i.e. consumer.on_task_request)


Executing a Request means calling the pool's execution method:


def execute_using_pool(self, pool, **kwargs):
    """Used by the worker to send this task to the pool."""
    result = pool.apply_async(
        trace_task_ret,
        args=(self._type, task_id, self._request_dict, self._body,
              self._content_type, self._content_encoding),
        accept_callback=self.on_accepted,
        timeout_callback=self.on_timeout,
        callback=self.on_success,
        error_callback=self.on_failure,
        soft_timeout=soft_time_limit or task.soft_time_limit,
        timeout=time_limit or task.time_limit,
        correlation_id=task_id,
    )
    # cannot create weakref to None
    self._apply_result = maybe(ref, result)
    return result


This way the remote task request is dispatched to the Pool for execution; how the pool runs the task is, again, a topic for a later article.
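
The hand-off pattern itself, a main process submitting work to a pool child with callbacks, can be reproduced with the standard library (billiard is a fork of multiprocessing, and apply_async with callbacks is the pattern execute_using_pool relies on). A minimal sketch:

from multiprocessing import Pool

def add(x, y):
    return x + y

def on_success(result):
    # runs back in the main process, like Request.on_success
    print('task succeeded:', result)

if __name__ == '__main__':
    with Pool(4) as pool:
        res = pool.apply_async(add, args=(16, 16), callback=on_success)
        res.wait()  # prints: task succeeded: 32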


Mingle-Step and Gossip-Step: distributed cooperation between workers



As a distributed task scheduling framework, celery relies on two steps, Mingle and Gossip, for cooperation among multiple workers. First, the Mingle step's logs:


[2021-12-12 13:37:56,632: DEBUG/MainProcess] | Consumer: Starting Mingle
[2021-12-12 13:37:56,632: INFO/MainProcess] mingle: searching for neighbors
[2021-12-12 13:37:57,674: INFO/MainProcess] mingle: all alone
...


The Mingle step implements a one-time synchronization with the other worker nodes:


def start(self, c):
    self.sync(c)
def sync(self, c):
    info('mingle: searching for neighbors')
    replies = self.send_hello(c)
    if replies:
        info('mingle: sync with %s nodes',
             len([reply for reply, value in replies.items() if value]))
        [self.on_node_reply(c, nodename, reply)
         for nodename, reply in replies.items() if reply]
        info('mingle: sync complete')
    else:
        info('mingle: all alone')


As you can see, once Mingle starts it sends a hello message and then processes the replies from the other nodes. The hello is sent like this:


def send_hello(self, c):
    inspect = c.app.control.inspect(timeout=1.0, connection=c.connection)
    our_revoked = c.controller.state.revoked
    replies = inspect.hello(c.hostname, our_revoked._data) or {}
    replies.pop(c.hostname, None)  # delete my own response
    return replies
...
# celery/app/control.py
def hello(self, from_node, revoked=None):
    return self._request('hello', from_node=from_node, revoked=revoked)


The main processing of a reply is to correct the current worker's LamportClock:


def on_node_reply(self, c, nodename, reply):
    ...
    c.app.clock.adjust(clock) if clock else c.app.clock.forward()
    ...
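
The clock here is a Lamport logical clock (the real implementation lives in kombu.clocks.LamportClock); its two operations are small enough to sketch:

class LamportClock:
    """Minimal sketch of a Lamport clock for happened-before ordering."""

    def __init__(self):
        self.value = 0

    def forward(self):
        # a purely local event: tick once
        self.value += 1
        return self.value

    def adjust(self, other):
        # received a remote timestamp: jump past both clocks
        self.value = max(self.value, other) + 1
        return self.value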


The Gossip step is more involved. Unlike Mingle, which is one-shot, Gossip is a continuous process; its logs clearly show the ongoing monitoring:


[2021-12-05 15:59:19,088: DEBUG/MainProcess] w2@bogon joined the party
[2021-12-12 13:37:58,096: DEBUG/MainProcess] w2@bogon joined the party
[2021-12-12 14:52:49,259: INFO/MainProcess] missed heartbeat from w2@bogon
[2021-12-12 14:52:49,262: DEBUG/MainProcess] w2@bogon joined the party
...
[2021-12-12 16:10:54,112: DEBUG/MainProcess] w2@bogon left


Gossip is an algorithm, also known as an epidemic protocol.




Put simply, in a gossip protocol every node repeatedly broadcasts a message to the nodes it is connected to, until every node in the network has received it.
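
A toy simulation of that propagation (illustration only, unrelated to celery's code): each round, every informed node pushes the message to one random peer, and the message quickly saturates the network:

import random

def gossip_rounds(nodes, start):
    informed = {start}
    rounds = 0
    while len(informed) < len(nodes):
        for node in list(informed):
            informed.add(random.choice(nodes))  # push to a random peer
        rounds += 1
    return rounds

workers = ['w1@bogon', 'w2@bogon', 'w3@bogon', 'w4@bogon']
print('all workers informed after', gossip_rounds(workers, 'w1@bogon'), 'rounds')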


celery's gossip processes messages by creating its own Consumer and a timer:


def get_consumers(self, channel):
    # timer that periodically checks the other workers' liveness
    self.register_timer()
    # the consumer for worker events
    ev = self.Receiver(channel, routing_key='worker.#',
                       queue_ttl=self.heartbeat_interval)
    return [Consumer(
        channel,
        queues=[ev.queue],
        on_message=partial(self.on_message, ev.event_from_message),
        no_ack=True
    )]


The timer is responsible for tracking the other nodes' liveness: a node that is no longer alive is marked dirty, handled as a lost node, and then removed:


def periodic(self):
    workers = self.state.workers
    dirty = set()
    for worker in workers.values():
        if not worker.alive:
            dirty.add(worker)
            self.on_node_lost(worker)
    for worker in dirty:
        workers.pop(worker.hostname, None)


The consumed messages fall into two categories: election messages and all other messages.


def on_message(self, prepare, message):
    _type = message.delivery_info['routing_key']
    try:
        # election events
        handler = self.event_handlers[_type]
    except KeyError:
        pass
    else:
        return handler(message.payload)
    # proto2: hostname in header; proto1: in body
    hostname = (message.headers.get('hostname') or
                message.payload['hostname'])
    if hostname != self.hostname:
        ...
        # other events (heartbeats, nodes coming and going, etc.)
        _, event = prepare(message.payload)
        self.update_state(event)
        ...
    else:
        self.clock.forward()


The election handlers process elect messages and elect-ack messages:


self.event_handlers = {
    'worker.elect': self.on_elect,
    'worker.elect.ack': self.on_elect_ack,
}
def on_elect(self, event):
    ...
def on_elect_ack(self, event):
    ...


The other events mainly cover nodes coming online and going offline:


self.state = c.app.events.State(
    on_node_join=self.on_node_join,
    on_node_leave=self.on_node_leave,
    max_tasks_in_memory=1,
)
def on_node_join(self, worker):
    debug('%s joined the party', worker.hostname)
    self._call_handlers(self.on.node_join, worker)
def on_node_leave(self, worker):
    debug('%s left', worker.hostname)
    self._call_handlers(self.on.node_leave, worker)


Summary



By walking through celery's two Blueprints, we have seen what worker startup involves: establishing the AMQP connection to the broker, handling tasks with a process/thread/coroutine pool, synchronizing the LamportClock between worker nodes via hello messages, and coordinating workers with the Gossip protocol. In the multi-process case, each task is first received by the main process and then handed to a child process in the pool for execution.

