Kratos微服务框架下实现分布式任务队列

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 提起分布式任务队列(Distributed Task Queue),就不得不提Python的Celery。而Asynq和Machinery就是GO当中类似于Celery的分布式任务队列。

Kratos微服务框架下实现分布式任务队列

任务队列(Task Queue)一般用于线程或计算机之间分配工作的一种机制。其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。

提起分布式任务队列(Distributed Task Queue),就不得不提Python的Celery。而Asynq和Machinery就是GO当中类似于Celery的分布式任务队列。

什么是任务队列

消息队列(Message Queue),一般来说知道的人不少。比如常见的:kafka、Rabbitmq、RocketMQ等。

任务队列(Task Queue),听说过这个概念的人不会太多,清楚它的概念的人怕是更少。

这两个概念是有关系的,他们是怎样的关系呢?任务队列(Task Queue)是消息队列(Message Queue)的超集。任务队列是构建在消息队列之上的。消息队列是任务队列的一部分。

下面我们来看Celery的架构图,以此来讲解。其他的任务队列也并不会与之有太大的差异性,至少原理是一致的。

celery_framework.png

在 Celery 的架构中,由多台 Server 发起异步任务(Async Task),发送任务到 Broker 的队列中,其中的 Celery Beat 进程可负责发起定时任务。当 Task 到达 Broker 后,会将其分发给相应的 Celery Worker 进行处理。当 Task 处理完成后,其结果存储至 Backend。

在上述过程中的 Broker 和 Backend,Celery 没有实现,而是使用了现有开源实现,例如 RabbitMQ 作为 Broker 提供消息队列服务,Redis 作为 Backend 提供结果存储服务。Celery 就像是抽象了消息队列架构中 Producer、Consumer 的实现,将消息队列中基本单位“消息”抽象成了任务队列中的“任务”,并将异步、定时任务的发起和结果存储等操作进行了封装,让开发者可以忽略 AMQP、RabbitMQ 等实现细节,为开发带来便利。

综上所述,Celery 作为任务队列是基于消息队列的进一步封装,其实现依赖消息队列。

任务队列的应用场景

  • 即时响应需求:网页的响应时间是用户体验的关键,Amazon 曾指出响应时间每提高 100ms,他们的收入便会增加 1%。对于一些需要长时间执行的任务,大多会采用异步调用的方式来释放用户操作。Celery 的异步调用特性,和前端使用 Ajax 异步加载类似,能够有效缩短响应时间。
  • 周期性任务需求(Periodic Task):对于心跳测试、日志归档、运维巡检这类指定时间周期执行的任务,可以应用任务队列的定时队列,支持 crontab 定时模式,简单方便。
  • 高并发及可扩展性需求:解耦应用程序最直接的好处就是可扩展性和并发性能的提高。支持并发执行任务,同时支持自动动态扩展。

Kratos下实现分布式任务队列

我们将分布式任务队列以transport.Server的形式整合进微服务框架Kratos。

Asynq

Asynq是一个go语言实现的分布式任务队列和异步处理库,基于Redis。类似于Python的Celery。作者Ken Hibino,任职于Google。

特点

  • 保证至少执行一次任务
  • 任务写入Redis后可以持久化
  • 任务失败之后,会自动重试
  • worker崩溃自动恢复
  • 可是实现任务的优先级
  • 任务可以进行编排
  • 任务可以设定执行时间或者最长可执行的时间
  • 支持中间件
  • 可以使用 unique-option 来避免任务重复执行,实现唯一性
  • 支持 Redis Cluster 和 Redis Sentinels 以达成高可用性
  • 作者提供了Web UI & CLI Tool让大家查看任务的执行情况

安装命令行工具

go install github.com/hibiken/asynq/tools/asynq

Docker安装Web UI

docker pull hibiken/asynqmon:latest

docker run -d \
    --name asynq \
    -p 8080:8080 \
    hibiken/asynqmon:latest --redis-addr=host.docker.internal:6379

管理后台:http://localhost:8080

  • 仪表盘

asynq_web_ui_dashboard.png

  • 任务视图

asynq_web_ui_task_view.png

  • 性能

asynq_web_ui_metrics.png

创建Kratos服务

import github.com/tx7do/kratos-transport/transport/asynq

const (
    localRedisAddr = "127.0.0.1:6379"

    testTask1        = "test_task_1"
    testDelayTask    = "test_task_delay"
    testPeriodicTask = "test_periodic_task"
)

ctx := context.Background()

srv := asynq.NewServer(
    asynq.WithAddress(localRedisAddr),
)

if err := srv.Start(ctx); err != nil {
    panic(err)
}

defer srv.Stop(ctx)

创建新任务

  • 普通任务
// 最多重试3次,10秒超时,20秒后过期
err = srv.NewTask(testTask1, []byte("test string"),
    asynq.MaxRetry(10),
    asynq.Timeout(10*time.Second),
    asynq.Deadline(time.Now().Add(20*time.Second)))
  • 延迟任务
err = srv.NewTask(testDelayTask, []byte("delay task"), asynq.ProcessIn(3*time.Second))
  • 周期性任务
// 每分钟执行一次
err = srv.NewPeriodicTask("*/1 * * * ?", testPeriodicTask, []byte("periodic task"))

注册任务回调

func handleTask(_ context.Context, task *asynq.Task) error {
    log.Infof("Task Type: [%s], Payload: [%s]", task.Type(), string(task.Payload()))
    return nil
}

func handleDelayTask(_ context.Context, task *asynq.Task) error {
    log.Infof("Delay Task Type: [%s], Payload: [%s]", task.Type(), string(task.Payload()))
    return nil
}

func handlePeriodicTask(_ context.Context, task *asynq.Task) error {
    log.Infof("Periodic Task Type: [%s], Payload: [%s]", task.Type(), string(task.Payload()))
    return nil
}

err := srv.HandleFunc(testTask1, handleTask)
err = srv.HandleFunc(testTaskDelay, handleDelayTask)
err = srv.HandleFunc(testPeriodicTask, handlePeriodicTask)

示例代码

示例代码可以在单元测试代码中找到:https://github.com/tx7do/kratos-transport/tree/main/transport/asynq/server_test.go

Machinery

go machinery框架类似python中常用celery框架,主要用于异步任务和定时任务。

特性

  • 任务重试机制
  • 延迟任务支持
  • 任务回调机制
  • 任务结果记录
  • 支持Workflow模式:Chain,Group,Chord
  • 多Brokers支持:Redis, AMQP, AWS SQS
  • 多Backends支持:Redis, Memcache, AMQP, MongoDB

架构

任务队列,简而言之就是一个放大的生产者消费者模型,用户请求会生成任务,任务生产者不断的向队列中插入任务,同时,队列的处理器程序充当消费者不断的消费任务。

  • Server :业务主体,我们可以使用用server暴露的接口方法进行所有任务编排的操作。如果是简单的使用那么了解它就够了。
  • Broker :数据存储层接口,主要功能是将数据放入任务队列和取出,控制任务并发,延迟也在这层。
  • Backend:数据存储层接口,主要用于更新获取任务执行结果,状态等。
  • Worker:数据处理层结构,主要是操作 Server、Broker、Backend 进行任务的获取,执行,处理执行状态及结果等。
  • Task: 数据处理层,这一层包括Task、Signature、Group、Chain、Chord等结构,主要是处理任务编排的逻辑。

任务编排

Machinery一共提供了三种任务编排方式:

  • Groups : 执行一组异步任务,任务之间互不影响。
  • Chord:执行一组同步任务,执行完成后,在调用一个回调函数。
  • Chain:执行一组同步任务,任务有次序之分,上个任务的出参可作为下个任务的入参。

创建Kratos服务

import github.com/tx7do/kratos-transport/transport/machinery

const (
    localRedisAddr = "127.0.0.1:6379"

    testTask1        = "test_task_1"
    testDelayTask    = "test_delay_task"
    testPeriodicTask = "test_periodic_task"
    sumTask          = "sum_task"
)

ctx := context.Background()

srv := machinery.NewServer(
    machinery.WithRedisAddress([]string{localRedisAddr}, []string{localRedisAddr}),
)

if err := srv.Start(ctx); err != nil {
    panic(err)
}

defer srv.Stop(ctx)

创建新任务

  • 普通任务
var args = map[string]interface{}{}
args["int64"] = 1
err = srv.NewTask(sumTask, args)
  • 延迟任务
// 延迟5秒执行任务
var args = map[string]interface{}{}
err = srv.NewTask(testDelayTask, args, WithDelayTime(time.Now().UTC().Add(time.Second*5)))
  • 周期性任务(需要注意的是,延迟任务的精度只能到秒级)
var args = map[string]interface{}{}
// 每分钟执行一次
err = srv.NewPeriodicTask("*/1 * * * ?", testPeriodicTask, args)

注册任务回调


func handleTask(_ context.Context, task *asynq.Task) error {
    log.Infof("Task Type: [%s], Payload: [%s]", task.Type(), string(task.Payload()))
    return nil
}

func handleDelayTask(_ context.Context, task *asynq.Task) error {
    log.Infof("Task Type: [%s], Payload: [%s]", task.Type(), string(task.Payload()))
    return nil
}

func handleAdd(args ...int64) (int64, error) {
    sum := int64(0)
    for _, arg := range args {
        sum += arg
    }
    fmt.Printf("sum: %d\n", sum)
    return sum, nil
}

func handlePeriodicTask() error {
    fmt.Println("################ 执行周期任务PeriodicTask #################")
    return nil
}

err = srv.HandleFunc(testTask1, handleTask)
err = srv.HandleFunc(testTaskDelay, handleDelayTask)
err = srv.HandleFunc(testPeriodicTask, handlePeriodicTask)
err = srv.HandleFunc(sumTask, handleAdd)

示例代码

示例代码可以在单元测试代码中找到:https://github.com/tx7do/kratos-transport/tree/main/transport/machinery/server_test.go

参考资料

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
24天前
|
XML JSON API
ServiceStack:不仅仅是一个高性能Web API和微服务框架,更是一站式解决方案——深入解析其多协议支持及简便开发流程,带您体验前所未有的.NET开发效率革命
【10月更文挑战第9天】ServiceStack 是一个高性能的 Web API 和微服务框架,支持 JSON、XML、CSV 等多种数据格式。它简化了 .NET 应用的开发流程,提供了直观的 RESTful 服务构建方式。ServiceStack 支持高并发请求和复杂业务逻辑,安装简单,通过 NuGet 包管理器即可快速集成。示例代码展示了如何创建一个返回当前日期的简单服务,包括定义请求和响应 DTO、实现服务逻辑、配置路由和宿主。ServiceStack 还支持 WebSocket、SignalR 等实时通信协议,具备自动验证、自动过滤器等丰富功能,适合快速搭建高性能、可扩展的服务端应用。
86 3
|
2月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
2月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
5天前
|
机器学习/深度学习 自然语言处理 并行计算
DeepSpeed分布式训练框架深度学习指南
【11月更文挑战第6天】随着深度学习模型规模的日益增大,训练这些模型所需的计算资源和时间成本也随之增加。传统的单机训练方式已难以应对大规模模型的训练需求。
28 3
|
9天前
|
机器学习/深度学习 并行计算 Java
谈谈分布式训练框架DeepSpeed与Megatron
【11月更文挑战第3天】随着深度学习技术的不断发展,大规模模型的训练需求日益增长。为了应对这种需求,分布式训练框架应运而生,其中DeepSpeed和Megatron是两个备受瞩目的框架。本文将深入探讨这两个框架的背景、业务场景、优缺点、主要功能及底层实现逻辑,并提供一个基于Java语言的简单demo例子,帮助读者更好地理解这些技术。
28 2
|
21天前
|
Dubbo Java 应用服务中间件
Dubbo学习圣经:从入门到精通 Dubbo3.0 + SpringCloud Alibaba 微服务基础框架
尼恩团队的15大技术圣经,旨在帮助开发者系统化、体系化地掌握核心技术,提升技术实力,从而在面试和工作中脱颖而出。本文介绍了如何使用Dubbo3.0与Spring Cloud Gateway进行整合,解决传统Dubbo架构缺乏HTTP入口的问题,实现高性能的微服务网关。
|
21天前
|
消息中间件 存储 负载均衡
微服务与分布式系统设计看这篇就够了!
【10月更文挑战第12天】 在现代软件架构中,微服务和分布式系统设计已经成为构建可扩展、灵活和可靠应用程序的主流方法。本文将深入探讨微服务架构的核心概念、设计原则和挑战,并提供一些关于如何在分布式系统中实现微服务的实用指导。
42 2
|
21天前
|
人工智能 文字识别 Java
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
尼恩,一位拥有20年架构经验的老架构师,通过其深厚的架构功力,成功指导了一位9年经验的网易工程师转型为大模型架构师,薪资逆涨50%,年薪近80W。尼恩的指导不仅帮助这位工程师在一年内成为大模型架构师,还让他管理起了10人团队,产品成功应用于多家大中型企业。尼恩因此决定编写《LLM大模型学习圣经》系列,帮助更多人掌握大模型架构,实现职业跃迁。该系列包括《从0到1吃透Transformer技术底座》、《从0到1精通RAG架构》等,旨在系统化、体系化地讲解大模型技术,助力读者实现“offer直提”。此外,尼恩还分享了多个技术圣经,如《NIO圣经》、《Docker圣经》等,帮助读者深入理解核心技术。
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
|
30天前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
42 1
|
2月前
|
数据采集 分布式计算 MaxCompute
MaxCompute 分布式计算框架 MaxFrame 服务正式商业化公告
MaxCompute 分布式计算框架 MaxFrame 服务于北京时间2024年09月27日正式商业化!
74 3