Golang微服务框架Kratos应用分布式任务队列Machinery

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: go machinery是一个基于分布式消息分发的异步任务队列框架,类似python中常用celery框架,主要用于异步任务和定时任务。

Golang微服务框架Kratos应用分布式任务队列Machinery

任务队列(Task Queue) 一般用于跨线程或跨计算机分配工作的一种机制。其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。

任务队列的输入是称为任务(Task)的工作单元。专用的工作进程不断监视任务队列以查找要执行的新工作。

在Golang语言里面,我们有像AsynqMachinery这样的类似于Celery的分布式任务队列。

什么是任务队列

消息队列(Message Queue),一般来说知道的人不少。比如常见的:kafka、Rabbitmq、RocketMQ等。

任务队列(Task Queue),听说过这个概念的人不会太多,清楚它的概念的人怕是更少。

这两个概念是有关系的,他们是怎样的关系呢?任务队列(Task Queue)是消息队列(Message Queue)的超集。任务队列是构建在消息队列之上的。消息队列是任务队列的一部分。

提起分布式任务队列(Distributed Task Queue),就不得不提PythonCelery。故而,下面我们来看Celery的架构图,以此来讲解。其他的任务队列也并不会与之有太大的差异性,基础的原理是一致的。

Celery架构图

Celery 的架构中,由多台 Server 发起异步任务(Async Task),发送任务到 Broker 的队列中,其中的 Celery Beat 进程可负责发起定时任务。当 Task 到达 Broker 后,会将其分发给相应的 Celery Worker 进行处理。当 Task 处理完成后,其结果存储至 Backend

在上述过程中的 BrokerBackendCelery 并没有去实现,而是使用了已有的开源实现,例如 RabbitMQ 作为 Broker 提供消息队列服务,Redis 作为 Backend 提供结果存储服务。Celery 就像是抽象了消息队列架构中 ProducerConsumer 的实现,将消息队列中基本单位“消息”抽象成了任务队列中的“任务”,并将异步、定时任务的发起和结果存储等操作进行了封装,让开发者可以忽略 AMQP、RabbitMQ 等实现细节,为开发带来便利。

综上所述,Celery 作为任务队列是基于消息队列的进一步封装,其实现依赖消息队列。

任务队列的应用场景

我们现在知道了任务队列是什么,也知道了它的工作原理。但是,我们并不知道它可以用来做什么。下面,我们就来看看,它到底用在什么样的场景下。

  1. 分布式任务:可以将任务分发到多个工作者进程或机器上执行,以提高任务处理速度。
  2. 定时任务:可以在指定时间执行任务。例如:每天定时备份数据、日志归档、心跳测试、运维巡检。支持 crontab 定时模式
  3. 后台任务:可以在后台执行耗时任务,例如图像处理、数据分析等,不影响用户界面的响应。
  4. 解耦任务:可以将任务与主程序解耦,以提高代码的可读性和可维护性,解耦应用程序最直接的好处就是可扩展性和并发性能的提高。支持并发执行任务,同时支持自动动态扩展。
  5. 实时处理:可以支持实时处理任务,例如即时通讯、消息队列等。

Machinery是什么?

go machinery是一个基于分布式消息分发的异步任务队列框架,类似python中常用celery框架,主要用于异步任务和定时任务。

Machinery的特性

  • 任务重试机制
  • 延迟任务支持
  • 任务回调机制
  • 任务结果记录
  • 支持Workflow模式:Chain,Group,Chord
  • 多Brokers支持:Redis, AMQP, AWS SQS……
  • 多Backends支持:Redis, Memcache, AMQP, MongoDB……

架构

任务队列,简而言之就是一个放大的生产者消费者模型,用户请求会生成任务,任务生产者不断的向队列中插入任务,同时,队列的处理器程序充当消费者不断的消费任务。

  • Server :业务主体,我们可以使用用server暴露的接口方法进行所有任务编排的操作。如果是简单的使用那么了解它就够了。
  • Broker :数据存储层接口,主要功能是将数据放入任务队列和取出,控制任务并发,延迟也在这层。
  • Worker:数据处理层结构,主要是操作 ServerBrokerBackend 进行任务的获取,执行,处理执行状态及结果等。
  • Task: 数据处理层,这一层包括TaskSignatureGroupChainChord等结构,主要是处理任务编排的逻辑。
  • Backend:数据持久化层接口,主要用于更新获取任务执行结果,状态等。

machinery_framework

Machinery基础工作流程

machinery-work-flow

Machinery 基本的工作流程如下:

  1. 由 Server 生成并发布任务,推送到 Broker 中
  2. Worker 通过 Key 向 Broker 订阅任务,当 Key 相同的任务到达时,Worker 消费任务
  3. Worker 执行任务
  4. Worker 将执行结果(终态:SUCCESS、FAILURE)存储至 Backend 模块

任务编排

Machinery一共提供了三种任务编排方式:

  • Groups:执行一组异步任务,任务之间互不影响。
  • Chord:执行一组同步任务,执行完成后,在调用一个回调函数。
  • Chain:执行一组同步任务,任务有次序之分,上个任务的出参可作为下个任务的入参。

Kratos下如何应用Machinery?

我们将分布式任务队列以transport.Server的形式整合进微服务框架Kratos

目前,go里面有两个分布式任务队列可用:

我已经对这两个库进行了支持:

Docker部署依赖组件

在本文中,我们仅使用Redis来做演示。因此,我们使用Docker的方式安装Redis的服务器:

docker pull bitnami/redis:latest

docker run -itd \
    --name redis-test \
    -p 6379:6379 \
    -e ALLOW_EMPTY_PASSWORD=yes \
    bitnami/redis:latest

安装依赖库

我们需要在项目中安装Asynq的依赖库:

go get -u github.com/tx7do/kratos-transport/transport/machinery

创建Kratos服务器

首先,我们要创建Server

package server

import (
    ...
    "github.com/tx7do/kratos-transport/transport/machinery"
)

// NewMachineryServer create a machinery server.
func NewMachineryServer(cfg *conf.Bootstrap, _ log.Logger, svc *service.TaskService) *machinery.Server {
   
    ctx := context.Background()

    srv := machinery.NewServer(
        machinery.WithBrokerAddress(cfg.Server.Machinery.Brokers, 0, machinery.BrokerTypeRedis),
        machinery.WithResultBackendAddress(cfg.Server.Machinery.Brokers, 0, machinery.BackendTypeRedis),
    )

    registerMachineryTasks(ctx, srv, svc)

    return srv
}

注册任务回调

然后,把回调函数注册进服务器:

const (
    testTask1        = "test_task_1"
    testDelayTask    = "test_delay_task"
    testPeriodicTask = "test_periodic_task"

    addTask         = "add"
    multiplyTask    = "multiply"
    sumIntTask      = "sum_ints"
    sumFloatTask    = "sum_floats"
    concatTask      = "concat"
    splitTask       = "split"
    panicTask       = "panic_task"
    longRunningTask = "long_running_task"
)

func registerMachineryTasks(ctx context.Context, srv *machinery.Server, svc *service.TaskService) {
   
    _ = srv.HandleFunc(testTask1, svc.HandleTask)
    _ = srv.HandleFunc(testDelayTask, svc.HandleDelayTask)
    _ = srv.HandleFunc(testPeriodicTask, svc.HandlePeriodicTask)
    _ = srv.HandleFunc(addTask, svc.HandleAdd)
    _ = srv.HandleFunc(multiplyTask, svc.HandleMultiply)
}

Machinery服务器注册到Kratos

接着,调用kratos.Server把Machinery服务器注册到Kratos里去:

func newApp(ll log.Logger, rr registry.Registrar, ks *machinery.Server) *kratos.App {
   
    return kratos.New(
        kratos.ID(Service.GetInstanceId()),
        kratos.Name(Service.Name),
        kratos.Version(Service.Version),
        kratos.Metadata(Service.Metadata),
        kratos.Logger(ll),
        kratos.Server(
            ks,
        ),
        kratos.Registrar(rr),
    )
}

实现任务回调方法

最后,我们就可以在Service里愉快的玩耍了:

package service

type TaskService struct {
   
    log          *log.Helper
}

func NewTaskService(
    logger log.Logger,
) *TaskService {
   
    l := log.NewHelper(log.With(logger, "module", "task/service/logger-service"))
    return &TaskService{
   
        log:          l,
        statusRepo:   statusRepo,
        realtimeRepo: realtimeRepo,
    }
}

func (s *TaskService) HandleTask1() error {
   
    fmt.Println("################ 执行任务Task1 #################")
    return nil
}

func (s *TaskService) HandleDelayTask() error {
   
    fmt.Println("################ 执行延迟任务DelayTask #################")
    return nil
}

func (s *TaskService) HandlePeriodicTask() error {
   
    fmt.Println("################ 执行周期任务PeriodicTask #################")
    return nil
}

func (s *TaskService) HandleAdd(args ...int64) (int64, error) {
   
    sum := int64(0)
    for _, arg := range args {
   
        sum += arg
    }
    fmt.Printf("sum: %d\n", sum)
    return sum, nil
}

func (s *TaskService) HandleMultiply(args ...int64) (int64, error) {
   
    sum := int64(1)
    for _, arg := range args {
   
        sum *= arg
    }
    fmt.Printf("mulptiply: %d\n", sum)
    return sum, nil
}

创建新任务

现在,我们万事俱备,只欠任务了。

普通任务

err = srv.NewTask(sumTask, machinery.WithArgument("int64", 1))

延迟任务

// 延迟5秒执行任务
err = srv.NewTask(testDelayTask, machinery.WithDelayTime(time.Now().UTC().Add(time.Second*5)))

周期性任务

需要注意的是,周期性任务的精度只能到分钟级

// 每分钟执行一次
err = srv.NewPeriodicTask("*/1 * * * ?", testPeriodicTask)

Group(分组任务)工作流

// add(1, 1)
// add(5, 5)
// (1 + 1) = 2
// (5 + 5) = 10

err = srv.NewGroup(
    machinery.WithTask(addTask, machinery.WithArgument("int64", 1), machinery.WithArgument("int64", 1)),
    machinery.WithTask(addTask, machinery.WithArgument("int64", 5), machinery.WithArgument("int64", 5)),
)

Chord(和弦任务)工作流

// multiply(add(1, 1), add(5, 5))
// (1 + 1) * (5 + 5) = 2 * 10 = 20

err = srv.NewChord(
    machinery.WithTask(addTask, machinery.WithArgument("int64", 1), machinery.WithArgument("int64", 1)),
    machinery.WithTask(addTask, machinery.WithArgument("int64", 5), machinery.WithArgument("int64", 5)),
    machinery.WithTask(multiplyTask),
)

Chain(链式任务)工作流

// multiply(4, add(5, 5, add(1, 1)))
//   4 * (5 + 5 + (1 + 1))   # task1: add(1, 1)        returns 2
// = 4 * (5 + 5 + 2)         # task2: add(5, 5, 2)     returns 12
// = 4 * (12)                # task3: multiply(4, 12)  returns 48
// = 48

err = srv.NewChain(
    machinery.WithTask(addTask, machinery.WithArgument("int64", 1), machinery.WithArgument("int64", 1)),
    machinery.WithTask(addTask, machinery.WithArgument("int64", 5), machinery.WithArgument("int64", 5)),
    machinery.WithTask(multiplyTask, machinery.WithArgument("int64", 4)),
)

示例代码

示例代码可以在单元测试代码中找到:https://github.com/tx7do/kratos-transport/tree/main/transport/machinery/server_test.go

参考资料

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
1月前
|
项目管理 微服务
云效常见问题之将多个微服务应用集成到一次研发流程中发布上线如何解决
云效(CloudEfficiency)是阿里云提供的一套软件研发效能平台,旨在通过工程效能、项目管理、质量保障等工具与服务,帮助企业提高软件研发的效率和质量。本合集是云效使用中可能遇到的一些常见问题及其答案的汇总。
28 0
|
8天前
|
Kubernetes Cloud Native Go
《Go 简易速速上手小册》第10章:微服务与云原生应用(2024 最新版)(下)
《Go 简易速速上手小册》第10章:微服务与云原生应用(2024 最新版)
43 0
|
8天前
|
Cloud Native 算法 Go
《Go 简易速速上手小册》第10章:微服务与云原生应用(2024 最新版)(上)
《Go 简易速速上手小册》第10章:微服务与云原生应用(2024 最新版)
32 0
|
14天前
|
Kubernetes 监控 Cloud Native
构建高效云原生应用:基于Kubernetes的微服务治理实践
【4月更文挑战第13天】 在当今数字化转型的浪潮中,企业纷纷将目光投向了云原生技术以支持其业务敏捷性和可扩展性。本文深入探讨了利用Kubernetes作为容器编排平台,实现微服务架构的有效治理,旨在为开发者和运维团队提供一套优化策略,以确保云原生应用的高性能和稳定性。通过分析微服务设计原则、Kubernetes的核心组件以及实际案例,本文揭示了在多变的业务需求下,如何确保系统的高可用性、弹性和安全性。
17 4
|
19天前
|
SpringCloudAlibaba Java Nacos
SpringCloud Alibaba微服务 -- Nacos使用以及注册中心和配置中心的应用(保姆级)
SpringCloud Alibaba微服务 -- Nacos使用以及注册中心和配置中心的应用(保姆级)
|
6月前
|
存储 编译器 Go
Golang 语言的多种变量声明方式和使用场景
Golang 语言的多种变量声明方式和使用场景
32 0
|
1月前
|
SQL 前端开发 Go
编程笔记 GOLANG基础 001 为什么要学习Go语言
编程笔记 GOLANG基础 001 为什么要学习Go语言
|
6月前
|
存储 JSON Go
Golang 语言 gRPC 服务怎么同时支持 gRPC 和 HTTP 客户端调用?
Golang 语言 gRPC 服务怎么同时支持 gRPC 和 HTTP 客户端调用?
76 0
|
3月前
|
物联网 Go 网络性能优化
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式【1月更文挑战第21天】【1月更文挑战第104篇】
101 1
|
1天前
|
安全 Go 开发者
Golang深入浅出之-Go语言并发编程面试:Goroutine简介与创建
【4月更文挑战第22天】Go语言的Goroutine是其并发模型的核心,是一种轻量级线程,能低成本创建和销毁,支持并发和并行执行。创建Goroutine使用`go`关键字,如`go sayHello("Alice")`。常见问题包括忘记使用`go`关键字、不正确处理通道同步和关闭、以及Goroutine泄漏。解决方法包括确保使用`go`启动函数、在发送完数据后关闭通道、设置Goroutine退出条件。理解并掌握这些能帮助开发者编写高效、安全的并发程序。
10 1