神器 celery 源码解析 - 8

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。

大家好,我是肖恩,源码解析每周见


Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。


本文是是celery源码解析的第篇,在前七篇里分别介绍了:


  1. 神器 celery 源码解析- vine实现Promise功能
  2. 神器 celery 源码解析- py-amqp实现AMQP协议
  3. 神器 celery 源码解析- kombu,一个python实现的消息库
  4. 神器 celery 源码解析- kombu的企业级算法
  5. 神器 celery 源码解析- celery启动流程分析
  6. 神器 celery 源码解析- celery启动日志跟踪
  7. 神器 celery 源码解析- 蓝图分析


本章我们从celery的设计工作流程来了解分布式任务的实现。


基础概念



日常生活中一个大型的任务,我们一般都会进行拆分,变成若干小任务;为了提高处理效率,缩短任务的执行时间,我们还会将没有依赖关系的任务并行推进,最后再把所有分支任务进行汇总。


celery作为一个分布式任务系统,也是这样设计其工作流的。在开始之前我们先看看其中几个基础概念。


高阶函数(Map, Filter and Reduce)



python中提供了3个高阶函数: map, filter和reduce,可以对数据(任务)进行批处理。下面是map函数:


items = [1, 2, 3, 4, 5]
squared = list(map(lambda x: x**2, items))
print(squared)
# output: [1, 4, 9, 16, 25]
复制代码


可以看到使用map函数对目标列表[1, 2, 3, 4, 5]的每一个元素,执行了一次平方操作,并且返回一个新的结果列表[1, 4, 9, 16, 25]。使用map函数,比我们使用for循环实现要简洁。


filter函数和map类似,区别在于对目标列表的每个元素执行一次条件过滤判断,符合要求的数据才会保存到结果列表:


number_list = range(-5, 5)
less_than_zero = list(filter(lambda x: x < 0, number_list))
print(less_than_zero)
# Output: [-5, -4, -3, -2, -1]
复制代码


reduce函数则是对目标列表依次执行函数并且进行累积:


from functools import reduce
# 1*2*3*4=24
product = reduce((lambda x, y: x * y), [1, 2, 3, 4])
# Output: 24
复制代码


大数据中的MapReduce也是这样的思路,把大量的数据分成多个块,分别计算,最后再把结果汇总:


image.png


管道模型(pipeline)


管道模型,大家应该都不陌生。比如linux系统中使用管道符 | ,用下面命令快速查找 nginx的进程:


ps -ef | grep nginx
复制代码


在我之前的requests源码解析文章中,也介绍过hook系统使用管道模型,可以把多个钩子串行化处理:


def dispatch_hook(key, hooks, hook_data, **kwargs):
    """Dispatches a hook dictionary on a given piece of data."""
    ...
    for hook in hooks:
        _hook_data = hook(hook_data, **kwargs)
        if _hook_data is not None:
            hook_data = _hook_data
    ...
    return hook_data
复制代码


dispatch_hook函数中:


  • 每个hook接收hook_data数据并返回一个hook_data数据
  • 前一个hook的返回值会当做下一个hook的参数


柯里化(Currying)


柯里化(Currying)是一种关于函数的高阶技术, 柯里化是一种函数的转换,它是指将一个函数从可调用的 f(a, b, c) 转换为可调用的 f(a)(b)(c)。柯里化不会调用函数。它只是对函数进行转换。下面是js版本示例:


function curry(f) { // curry(f) 执行柯里化转换
  return function(a) {
    return function(b) {
      return f(a, b);
    };
  };
}
// 用法
function sum(a, b) {
  return a + b;
}
let curriedSum = curry(sum);
alert( curriedSum(1)(2) ); // 3
复制代码


  • sum函数的两个参数,通过柯里化的处理方式,变成了2个匿名函数
  • 每次函数的调用会传递1个参数并返回一个新函数,直到执行完成


部分函数(partial)


在python中partial()会被“冻结了”一部分函数参数和/或关键字的部分函数应用所使用,从而得到一个具有简化签名的新对象,和柯里化很类似。下面是示例:


from functools import partial
# A normal function
def f(a, b, c, x):
  return 1000*a + 100*b + 10*c + x
# A partial function that calls f with
# a as 3, b as 1 and c as 4.
g = partial(f, 3, 1, 4)
# Calling g()
print(g(5))
print(g(6))
复制代码


  • 通过partial将f函数的4个参数分成2个部分传入
  • 先传入3,1,4得到一个函数g
  • 使用g可以用来反复用计算x的值


currying和partial都是利用了闭包的特性,将函数的部分参数临时缓存住,从而达到重用函数的目的。


celery工作流(workflow)示例



签名(Signatures)


签名可以把任务使用函数签名的方式进行调用,这样可以让任务的生产者和消费者进行解耦。这样介绍有点泛泛,还是看一看其具体的使用过程, 我们先使用celery -A myapp worker -l DEBUG启动我们的测试worker ,然后新开一个python3的shell:


✗ python3
Python 3.8.5 (v3.8.5:580fbb018f, Jul 20 2020, 12:11:27)
[Clang 6.0 (clang-600.0.57)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from celery import Celery
>>> app = Celery(
...     'myapp',
...     broker='redis://localhost:6379/0',
...     result_backend='redis://localhost:6379/0',
...     # ## add result backend here if needed.
...     backend='rpc'
... )
>>>
复制代码


在新的shell中我们需要创建celery的app,然后使用signature方式创建一个task:


>>> from celery import signature
>>> t = signature('myapp.add', args=(2, 2), countdown=10)
复制代码


这个签名包括了任务的名称是 myapp.add, 2个参数,超时时间设置为10s。使用task的apply_async函数获取到AsyncResult结果:


>>> r = t.apply_async()
>>> r.get()
4
复制代码


观测worker的日志也可以看到myapp.add被远程执行。


本地(进程)是没有add这个任务函数的,可以使用下面方式校验,直接执行t任务会报错:


>>> t()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/yoo/work/yuanmahui/python/.venv/lib/python3.8/site-packages/celery/canvas.py", line 174, in __call__
    return self.type(*args, **kwargs)
  File "/Users/yoo/work/yuanmahui/python/.venv/lib/python3.8/site-packages/kombu/utils/objects.py", line 29, in __get__
    return super().__get__(instance, owner)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/functools.py", line 967, in __get__
    val = self.func(instance)
  File "/Users/yoo/work/yuanmahui/python/.venv/lib/python3.8/site-packages/celery/canvas.py", line 505, in type
    return self._type or self.app.tasks[self['task']]
  File "/Users/yoo/work/yuanmahui/python/.venv/lib/python3.8/site-packages/celery/app/registry.py", line 18, in __missing__
    raise self.NotRegistered(key)
celery.exceptions.NotRegistered: 'myapp.add'
复制代码


上面就是signature的示例。这样我们的任务系统就可以脱离web服务的实现,仅提供任务函数的签名,也就是任务名称和参数要求。在web服务中要执行任务只需要使用这个函数签名进行调用。web服务和celery任务解耦后,让代码维护更容易维护。


signature还提供了快捷方式, 可以从task创建:


>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)
>>> add.s(2, 2)
tasks.add(2, 2)
复制代码


原语(Primitive)


celery提供了一系列的组合任务执行的方法函数,叫做原语(Primitives),主要包括:


  • group 可以并行处理的一组任务
  • chain 将任务组装成一个调用链
  • chord 可以使用Map-Reduce的方式执行任务
  • map 使用map方式执行系列任务
  • starmap 类似map方式,支持*args
  • chunks 将任务拆分成块进行执行


我们重点看一看chain,chord和chunks三种方式的示例。


下面是chain方式:


>>> from celery import chain
>>> # 2 + 2 + 4 + 8
>>> res = chain(add.s(2, 2), add.s(4), add.s(8))()
>>> res.get()
16
复制代码


可以看到3个add任务使用chain关键字串联起来,前面函数的结果是后面函数的参数。第一个add使用了2个参数2+2, 第二/三个add则只有一个参数。chain还可以使用管道方式创建:


>>> res = (add.si(2, 2) | add.si(4, 4) | add.si(8, 8))()
>>> res.get()
16
复制代码


chord(和弦)的示例如下:


>>> from celery import chord
>>> res = chord((add.s(i, i) for i in range(10)), xsum.s())()
>>> res.get()
90
复制代码


  • chord 需要提供2个参数,第一个是任务列表,这种分步任务叫做header-task,第二个是汇总任务叫做body-task
  • 任务提交后会先执行每个header-task,最后再把这些header-task的结果使用body-task汇总


chunks的示例:


>>> items = zip(range(1000), range(1000))  # 1000 items
>>> add.chunks(items, 10)
复制代码


先复习一下zip函数的逻辑:


languages = ['Java', 'Python', 'JavaScript']
versions = [14, 3, 6]
result = zip(languages, versions)
print(list(result))
# Output: [('Java', 14), ('Python', 3), ('JavaScript', 6)]
复制代码


可以看到zip把两个列表合并形成一个二维数组,就像拉链一样。上面的add.chunks就是把1000个元素的二维数组,逐一执行一下add函数,执行过程分成10个块,这样会产生100个任务。使用chunks可以批量生成的系列任务。


回头再看 原语,简单的理解就是把一系列复杂的逻辑操作抽象封装成一个单一的操作(函数),提供一个唯一的关键字。


canvas模块实现



了解了celery-workflow的基础知识和示例后,我们继续深入了解它是如何实现的。主要实现都在canvas模块,其类结构大概如下:


+-----------+
                          | Signature |
                          +------^----+
                                 |
      +------------+-------------+-------------+----------+--------------+
      |            |             |             |          |              |
      |            |             |             |          |              |
+-----+-+     +----+---+    +-------+     +----+--+    +--+---+    +-----+----+
| chain |     | chunks |    | group |     | chord |    | xmap |    | xstarmap |
+-------+     +--------+    +-------+     +-------+    +------+    +----------+
复制代码


因为所有的原语都继承自Signature类,我们从Signature开始:


class Signature(dict):
    def __init__(self, task=None, args=None, kwargs=None, options=None,
             type=None, subtask_type=None, immutable=False,
             app=None, **ex):
        self._app = app
        try:
            # task
            task_name = task.name
        except AttributeError:
            # 字符串
            task_name = task
        else:
            self._type = task
        # self['task']
        super().__init__(
            task=task_name, args=tuple(args or ()),
            kwargs=kwargs or {},
            options=dict(options or {}, **ex),
            subtask_type=subtask_type,
            immutable=immutable,
            chord_size=None,
        )
复制代码


Signature继承自dict,主要的key是:task,就是任务的名称;args和kwargs,执行任务的参数;options,前面的选项。重点是signature关联app,这个后面会用来执行任务。


执行任务使用apply_async:


def apply_async(self, args=None, kwargs=None, route_name=None, **options):
    args = args if args else ()
    kwargs = kwargs if kwargs else {}
    # Extra options set to None are dismissed
    options = {k: v for k, v in options.items() if v is not None}
    try:
        _apply = self._apply_async
    except IndexError:  # pragma: no cover
        # no tasks for chain, etc to find type
        return
    # For callbacks: extra args are prepended to the stored args.
    if args or kwargs or options:
        # 合并参数
        args, kwargs, options = self._merge(args, kwargs, options)
    else:
        args, kwargs, options = self.args, self.kwargs, self.options
    # pylint: disable=too-many-function-args
    #   Borks on this, as it's a property
    return _apply(args, kwargs, **options)
复制代码


apply_async执行任务的时候,还可以继续传入参数,传入的参数会和signature的参数进行合并,这就为chain提供了基础。创建signature时候提供一部分参数,执行的时候再提供一部分参数。任务的具体执行,实际上使用的是app的send_task方法,因为每个signature的task_name都是一样的,所以这里使用了partial的方式:


def _apply_async(self):
    try:
        return self.type.apply_async
    except KeyError:
        # 使用app的send_task发送任务
        return _partial(self.app.send_task, self['task'])
复制代码


signature实际上就是task的一种包装,比较特别的是它还可以使用 | 进行按位或运算,模拟管道的执行,比如:


>>> res = (add.si(2, 2) | add.si(4, 4) | add.si(8, 8))()
>>> res.get()
16
复制代码


这个功能的是因为signature使用了元编程,覆盖了__or__函数, 其部分实现如下:


def __or__(self, other):
    # These could be implemented in each individual class,
    # I'm sure, but for now we have this.
    ...
    elif isinstance(other, Signature):
        if isinstance(self, _chain):
            ...
            # chain | task -> chain
            return _chain(seq_concat_item(
                self.unchain_tasks(), other), app=self._app)
        # task | task -> chain
        return _chain(self, other, app=self._app)
    return NotImplemented
复制代码


这个过程大概分成2步:


  • 第一步add.si(2, 2) | add.si(4, 4) 是上是task | task,这时候会返回一个chain
  • 第二步是chain和add.si(8, 8)这样的task进行按位或,会继续返回一个chain


__or__函数中同样提供了group类型,chord类型的实现,原理类似。我们继续查看chain的实现:


class chain(_chain):
    def __new__(cls, *tasks, **kwargs):
        # This forces `chain(X, Y, Z)` to work the same way as `X | Y | Z`
        if not kwargs and tasks:
            if len(tasks) != 1 or is_list(tasks[0]):
                tasks = tasks[0] if len(tasks) == 1 else tasks
                return reduce(operator.or_, tasks, chain())
        return super().__new__(cls, *tasks, **kwargs)
复制代码


从注释可以理解chain(X, Y, Z),本质上就是X | Y | Z,通过reduce函数实现。


然后我们在跟踪一下第一个task的结果如何传递给第二个task主要在run函数和prepare_steps中:


def run(self, args=None, kwargs=None, group_id=None, chord=None,
        task_id=None, link=None, link_error=None, publisher=None,
        producer=None, root_id=None, parent_id=None, app=None, **options):
    ...
    tasks, results_from_prepare = self.prepare_steps(
            args, kwargs, self.tasks, root_id, parent_id, link_error, app,
            task_id, group_id, chord,
        )
    ...
    if results_from_prepare:
        first_task = tasks.pop()
        options = _prepare_chain_from_options(options, tasks, use_link)
        # 执行第一个任务
        result_from_apply = first_task.apply_async(**options)
        if not tasks:
            return result_from_apply
        else:
            # 依次返回下一个任务
            return results_from_prepare[0]
def prepare_steps(self, args, kwargs, tasks,
                      root_id=None, parent_id=None, link_error=None, app=None,
                      last_task_id=None, group_id=None, chord_body=None,
                      clone=True, from_dict=Signature.from_dict,
                      group_index=None):
    steps = deque(tasks)
    prev_task = None
    prev_res = None
    tasks, results = [], []
    i = 0
    while steps:
        ...
        task = steps_pop()
        res = task.freeze(root_id=root_id)
        ...
        # 把任务的结果串起来
        if prev_res and not prev_res.parent:
            prev_res.parent = res
        ...
        tasks.append(task)
        results.append(res)
        ...
    return tasks, results
复制代码


为什么每个task可以串起来呢,是因为freeze函数返回AsyncResult, 而AsyncResult是vine的promise,所以可以串起来执行:


def freeze(self, _id=None, group_id=None, chord=None,
           root_id=None, parent_id=None, group_index=None):
    opts = self.options
    try:
        tid = opts['task_id']
    except KeyError:
        tid = opts['task_id'] = _id or uuid()
    if root_id:
        opts['root_id'] = root_id
    if parent_id:
        opts['parent_id'] = parent_id
    ...
    return self.AsyncResult(tid)        
复制代码


chord和chunks的实现和chain差不多,就不再赘述了。


小结



本文我们了解了一些批量处理任务的方法:map/filter/reduce/pipeline。了解对函数进行partial和curry处理,都是利用闭包机制,提高函数的重用性。celery利用这些机制,提供了一些便捷使用的原语: chain, chord, chunks ... , 帮助应用组合和拆分任务构建任务的工作流,提高效率。同时celery作为一个分布式任务系统,也可以使用任务签名Signature的方式进行任务调用,将任务的生产者和消费者分离,降低系统耦合度,这对大型系统非常有用。


我们通过对celery的了解,相信大家对 分布式系统 有初步的认识,建立了一些 索引 ,需要研究的时候,会知道在那些地方进行深入的学习,我想这就达到了预期目的。


一点题外话: celery的源码解析是9月份开始,当时预计3~4篇,结果没有想到写了8篇,持续4个月。由于能力和精力有限,一些内容没有解析的很细致,不过还是打算暂时完结,本篇是系列的最后一篇。未到位的地方,以后有机会再补充。感谢大家付出的时间!


参考链接




目录
相关文章
|
21天前
|
监控 网络协议 Java
Tomcat源码解析】整体架构组成及核心组件
Tomcat,原名Catalina,是一款优雅轻盈的Web服务器,自4.x版本起扩展了JSP、EL等功能,超越了单纯的Servlet容器范畴。Servlet是Sun公司为Java编程Web应用制定的规范,Tomcat作为Servlet容器,负责构建Request与Response对象,并执行业务逻辑。
Tomcat源码解析】整体架构组成及核心组件
|
6天前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
10天前
|
开发工具
Flutter-AnimatedWidget组件源码解析
Flutter-AnimatedWidget组件源码解析
|
6天前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
|
29天前
|
测试技术 Python
python自动化测试中装饰器@ddt与@data源码深入解析
综上所述,使用 `@ddt`和 `@data`可以大大简化写作测试用例的过程,让我们能专注于测试逻辑的本身,而无需编写重复的测试方法。通过讲解了 `@ddt`和 `@data`源码的关键部分,我们可以更深入地理解其背后的工作原理。
24 1
|
1月前
|
开发者 Python
深入解析Python `httpx`源码,探索现代HTTP客户端的秘密!
深入解析Python `httpx`源码,探索现代HTTP客户端的秘密!
68 1
|
1月前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践
|
1月前
|
存储 NoSQL Redis
redis 6源码解析之 object
redis 6源码解析之 object
55 6
|
3月前
|
XML Java 数据格式
深度解析 Spring 源码:从 BeanDefinition 源码探索 Bean 的本质
深度解析 Spring 源码:从 BeanDefinition 源码探索 Bean 的本质
76 3
|
1月前
|
开发者 Python
深入解析Python `requests`库源码,揭开HTTP请求的神秘面纱!
深入解析Python `requests`库源码,揭开HTTP请求的神秘面纱!
123 1

热门文章

最新文章

推荐镜像

更多