Golang微服务框架Kratos应用分布式任务队列Machinery

简介: go machinery是一个基于分布式消息分发的异步任务队列框架,类似python中常用celery框架,主要用于异步任务和定时任务。

Golang微服务框架Kratos应用分布式任务队列Machinery

任务队列(Task Queue) 一般用于跨线程或跨计算机分配工作的一种机制。其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。

任务队列的输入是称为任务(Task)的工作单元。专用的工作进程不断监视任务队列以查找要执行的新工作。

在Golang语言里面,我们有像AsynqMachinery这样的类似于Celery的分布式任务队列。

什么是任务队列

消息队列(Message Queue),一般来说知道的人不少。比如常见的:kafka、Rabbitmq、RocketMQ等。

任务队列(Task Queue),听说过这个概念的人不会太多,清楚它的概念的人怕是更少。

这两个概念是有关系的,他们是怎样的关系呢?任务队列(Task Queue)是消息队列(Message Queue)的超集。任务队列是构建在消息队列之上的。消息队列是任务队列的一部分。

提起分布式任务队列(Distributed Task Queue),就不得不提PythonCelery。故而,下面我们来看Celery的架构图,以此来讲解。其他的任务队列也并不会与之有太大的差异性,基础的原理是一致的。

Celery架构图

Celery 的架构中,由多台 Server 发起异步任务(Async Task),发送任务到 Broker 的队列中,其中的 Celery Beat 进程可负责发起定时任务。当 Task 到达 Broker 后,会将其分发给相应的 Celery Worker 进行处理。当 Task 处理完成后,其结果存储至 Backend

在上述过程中的 BrokerBackendCelery 并没有去实现,而是使用了已有的开源实现,例如 RabbitMQ 作为 Broker 提供消息队列服务,Redis 作为 Backend 提供结果存储服务。Celery 就像是抽象了消息队列架构中 ProducerConsumer 的实现,将消息队列中基本单位“消息”抽象成了任务队列中的“任务”,并将异步、定时任务的发起和结果存储等操作进行了封装,让开发者可以忽略 AMQP、RabbitMQ 等实现细节,为开发带来便利。

综上所述,Celery 作为任务队列是基于消息队列的进一步封装,其实现依赖消息队列。

任务队列的应用场景

我们现在知道了任务队列是什么,也知道了它的工作原理。但是,我们并不知道它可以用来做什么。下面,我们就来看看,它到底用在什么样的场景下。

  1. 分布式任务:可以将任务分发到多个工作者进程或机器上执行,以提高任务处理速度。
  2. 定时任务:可以在指定时间执行任务。例如:每天定时备份数据、日志归档、心跳测试、运维巡检。支持 crontab 定时模式
  3. 后台任务:可以在后台执行耗时任务,例如图像处理、数据分析等,不影响用户界面的响应。
  4. 解耦任务:可以将任务与主程序解耦,以提高代码的可读性和可维护性,解耦应用程序最直接的好处就是可扩展性和并发性能的提高。支持并发执行任务,同时支持自动动态扩展。
  5. 实时处理:可以支持实时处理任务,例如即时通讯、消息队列等。

Machinery是什么?

go machinery是一个基于分布式消息分发的异步任务队列框架,类似python中常用celery框架,主要用于异步任务和定时任务。

Machinery的特性

  • 任务重试机制
  • 延迟任务支持
  • 任务回调机制
  • 任务结果记录
  • 支持Workflow模式:Chain,Group,Chord
  • 多Brokers支持:Redis, AMQP, AWS SQS……
  • 多Backends支持:Redis, Memcache, AMQP, MongoDB……

架构

任务队列,简而言之就是一个放大的生产者消费者模型,用户请求会生成任务,任务生产者不断的向队列中插入任务,同时,队列的处理器程序充当消费者不断的消费任务。

  • Server :业务主体,我们可以使用用server暴露的接口方法进行所有任务编排的操作。如果是简单的使用那么了解它就够了。
  • Broker :数据存储层接口,主要功能是将数据放入任务队列和取出,控制任务并发,延迟也在这层。
  • Worker:数据处理层结构,主要是操作 ServerBrokerBackend 进行任务的获取,执行,处理执行状态及结果等。
  • Task: 数据处理层,这一层包括TaskSignatureGroupChainChord等结构,主要是处理任务编排的逻辑。
  • Backend:数据持久化层接口,主要用于更新获取任务执行结果,状态等。

machinery_framework

Machinery基础工作流程

machinery-work-flow

Machinery 基本的工作流程如下:

  1. 由 Server 生成并发布任务,推送到 Broker 中
  2. Worker 通过 Key 向 Broker 订阅任务,当 Key 相同的任务到达时,Worker 消费任务
  3. Worker 执行任务
  4. Worker 将执行结果(终态:SUCCESS、FAILURE)存储至 Backend 模块

任务编排

Machinery一共提供了三种任务编排方式:

  • Groups:执行一组异步任务,任务之间互不影响。
  • Chord:执行一组同步任务,执行完成后,在调用一个回调函数。
  • Chain:执行一组同步任务,任务有次序之分,上个任务的出参可作为下个任务的入参。

Kratos下如何应用Machinery?

我们将分布式任务队列以transport.Server的形式整合进微服务框架Kratos

目前,go里面有两个分布式任务队列可用:

我已经对这两个库进行了支持:

Docker部署依赖组件

在本文中,我们仅使用Redis来做演示。因此,我们使用Docker的方式安装Redis的服务器:

docker pull bitnami/redis:latest

docker run -itd \
    --name redis-test \
    -p 6379:6379 \
    -e ALLOW_EMPTY_PASSWORD=yes \
    bitnami/redis:latest

安装依赖库

我们需要在项目中安装Asynq的依赖库:

go get -u github.com/tx7do/kratos-transport/transport/machinery

创建Kratos服务器

首先,我们要创建Server

package server

import (
    ...
    "github.com/tx7do/kratos-transport/transport/machinery"
)

// NewMachineryServer create a machinery server.
func NewMachineryServer(cfg *conf.Bootstrap, _ log.Logger, svc *service.TaskService) *machinery.Server {
   
    ctx := context.Background()

    srv := machinery.NewServer(
        machinery.WithBrokerAddress(cfg.Server.Machinery.Brokers, 0, machinery.BrokerTypeRedis),
        machinery.WithResultBackendAddress(cfg.Server.Machinery.Brokers, 0, machinery.BackendTypeRedis),
    )

    registerMachineryTasks(ctx, srv, svc)

    return srv
}

注册任务回调

然后,把回调函数注册进服务器:

const (
    testTask1        = "test_task_1"
    testDelayTask    = "test_delay_task"
    testPeriodicTask = "test_periodic_task"

    addTask         = "add"
    multiplyTask    = "multiply"
    sumIntTask      = "sum_ints"
    sumFloatTask    = "sum_floats"
    concatTask      = "concat"
    splitTask       = "split"
    panicTask       = "panic_task"
    longRunningTask = "long_running_task"
)

func registerMachineryTasks(ctx context.Context, srv *machinery.Server, svc *service.TaskService) {
   
    _ = srv.HandleFunc(testTask1, svc.HandleTask)
    _ = srv.HandleFunc(testDelayTask, svc.HandleDelayTask)
    _ = srv.HandleFunc(testPeriodicTask, svc.HandlePeriodicTask)
    _ = srv.HandleFunc(addTask, svc.HandleAdd)
    _ = srv.HandleFunc(multiplyTask, svc.HandleMultiply)
}

Machinery服务器注册到Kratos

接着,调用kratos.Server把Machinery服务器注册到Kratos里去:

func newApp(ll log.Logger, rr registry.Registrar, ks *machinery.Server) *kratos.App {
   
    return kratos.New(
        kratos.ID(Service.GetInstanceId()),
        kratos.Name(Service.Name),
        kratos.Version(Service.Version),
        kratos.Metadata(Service.Metadata),
        kratos.Logger(ll),
        kratos.Server(
            ks,
        ),
        kratos.Registrar(rr),
    )
}

实现任务回调方法

最后,我们就可以在Service里愉快的玩耍了:

package service

type TaskService struct {
   
    log          *log.Helper
}

func NewTaskService(
    logger log.Logger,
) *TaskService {
   
    l := log.NewHelper(log.With(logger, "module", "task/service/logger-service"))
    return &TaskService{
   
        log:          l,
        statusRepo:   statusRepo,
        realtimeRepo: realtimeRepo,
    }
}

func (s *TaskService) HandleTask1() error {
   
    fmt.Println("################ 执行任务Task1 #################")
    return nil
}

func (s *TaskService) HandleDelayTask() error {
   
    fmt.Println("################ 执行延迟任务DelayTask #################")
    return nil
}

func (s *TaskService) HandlePeriodicTask() error {
   
    fmt.Println("################ 执行周期任务PeriodicTask #################")
    return nil
}

func (s *TaskService) HandleAdd(args ...int64) (int64, error) {
   
    sum := int64(0)
    for _, arg := range args {
   
        sum += arg
    }
    fmt.Printf("sum: %d\n", sum)
    return sum, nil
}

func (s *TaskService) HandleMultiply(args ...int64) (int64, error) {
   
    sum := int64(1)
    for _, arg := range args {
   
        sum *= arg
    }
    fmt.Printf("mulptiply: %d\n", sum)
    return sum, nil
}

创建新任务

现在,我们万事俱备,只欠任务了。

普通任务

err = srv.NewTask(sumTask, machinery.WithArgument("int64", 1))

延迟任务

// 延迟5秒执行任务
err = srv.NewTask(testDelayTask, machinery.WithDelayTime(time.Now().UTC().Add(time.Second*5)))

周期性任务

需要注意的是,周期性任务的精度只能到分钟级

// 每分钟执行一次
err = srv.NewPeriodicTask("*/1 * * * ?", testPeriodicTask)

Group(分组任务)工作流

// add(1, 1)
// add(5, 5)
// (1 + 1) = 2
// (5 + 5) = 10

err = srv.NewGroup(
    machinery.WithTask(addTask, machinery.WithArgument("int64", 1), machinery.WithArgument("int64", 1)),
    machinery.WithTask(addTask, machinery.WithArgument("int64", 5), machinery.WithArgument("int64", 5)),
)

Chord(和弦任务)工作流

// multiply(add(1, 1), add(5, 5))
// (1 + 1) * (5 + 5) = 2 * 10 = 20

err = srv.NewChord(
    machinery.WithTask(addTask, machinery.WithArgument("int64", 1), machinery.WithArgument("int64", 1)),
    machinery.WithTask(addTask, machinery.WithArgument("int64", 5), machinery.WithArgument("int64", 5)),
    machinery.WithTask(multiplyTask),
)

Chain(链式任务)工作流

// multiply(4, add(5, 5, add(1, 1)))
//   4 * (5 + 5 + (1 + 1))   # task1: add(1, 1)        returns 2
// = 4 * (5 + 5 + 2)         # task2: add(5, 5, 2)     returns 12
// = 4 * (12)                # task3: multiply(4, 12)  returns 48
// = 48

err = srv.NewChain(
    machinery.WithTask(addTask, machinery.WithArgument("int64", 1), machinery.WithArgument("int64", 1)),
    machinery.WithTask(addTask, machinery.WithArgument("int64", 5), machinery.WithArgument("int64", 5)),
    machinery.WithTask(multiplyTask, machinery.WithArgument("int64", 4)),
)

示例代码

示例代码可以在单元测试代码中找到:https://github.com/tx7do/kratos-transport/tree/main/transport/machinery/server_test.go

参考资料

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
存储 安全 Java
管理 Spring 微服务中的分布式会话
在微服务架构中,管理分布式会话是确保用户体验一致性和系统可扩展性的关键挑战。本文探讨了在 Spring 框架下实现分布式会话管理的多种方法,包括集中式会话存储和客户端会话存储(如 Cookie),并分析了它们的优缺点。同时,文章还涵盖了与分布式会话相关的安全考虑,如数据加密、令牌验证、安全 Cookie 政策以及服务间身份验证。此外,文中强调了分布式会话在提升系统可扩展性、增强可用性、实现数据一致性及优化资源利用方面的显著优势。通过合理选择会话管理策略,结合 Spring 提供的强大工具,开发人员可以在保证系统鲁棒性的同时,提供无缝的用户体验。
126 0
|
7月前
|
监控 Java API
Spring Boot 3.2 结合 Spring Cloud 微服务架构实操指南 现代分布式应用系统构建实战教程
Spring Boot 3.2 + Spring Cloud 2023.0 微服务架构实践摘要 本文基于Spring Boot 3.2.5和Spring Cloud 2023.0.1最新稳定版本,演示现代微服务架构的构建过程。主要内容包括: 技术栈选择:采用Spring Cloud Netflix Eureka 4.1.0作为服务注册中心,Resilience4j 2.1.0替代Hystrix实现熔断机制,配合OpenFeign和Gateway等组件。 核心实操步骤: 搭建Eureka注册中心服务 构建商品
1106 3
|
12月前
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
1312 0
分布式爬虫框架Scrapy-Redis实战指南
|
11月前
|
人工智能 安全 Java
智慧工地源码,Java语言开发,微服务架构,支持分布式和集群部署,多端覆盖
智慧工地是“互联网+建筑工地”的创新模式,基于物联网、移动互联网、BIM、大数据、人工智能等技术,实现对施工现场人员、设备、材料、安全等环节的智能化管理。其解决方案涵盖数据大屏、移动APP和PC管理端,采用高性能Java微服务架构,支持分布式与集群部署,结合Redis、消息队列等技术确保系统稳定高效。通过大数据驱动决策、物联网实时监测预警及AI智能视频监控,消除数据孤岛,提升项目可控性与安全性。智慧工地提供专家级远程管理服务,助力施工质量和安全管理升级,同时依托可扩展平台、多端应用和丰富设备接口,满足多样化需求,推动建筑行业数字化转型。
378 5
|
10月前
|
安全 JavaScript 前端开发
HarmonyOS NEXT~HarmonyOS 语言仓颉:下一代分布式开发语言的技术解析与应用实践
HarmonyOS语言仓颉是华为专为HarmonyOS生态系统设计的新型编程语言,旨在解决分布式环境下的开发挑战。它以“编码创造”为理念,具备分布式原生、高性能与高效率、安全可靠三大核心特性。仓颉语言通过内置分布式能力简化跨设备开发,提供统一的编程模型和开发体验。文章从语言基础、关键特性、开发实践及未来展望四个方面剖析其技术优势,助力开发者掌握这一新兴工具,构建全场景分布式应用。
911 35
|
10月前
|
监控 Java 调度
SpringBoot中@Scheduled和Quartz的区别是什么?分布式定时任务框架选型实战
本文对比分析了SpringBoot中的`@Scheduled`与Quartz定时任务框架。`@Scheduled`轻量易用,适合单机简单场景,但存在多实例重复执行、无持久化等缺陷;Quartz功能强大,支持分布式调度、任务持久化、动态调整和失败重试,适用于复杂企业级需求。文章通过特性对比、代码示例及常见问题解答,帮助开发者理解两者差异,合理选择方案。记住口诀:单机简单用注解,多节点上Quartz;若是任务要可靠,持久化配置不能少。
882 4
|
10月前
|
人工智能 数据可视化 JavaScript
颠覆开发效率!国内首个微服务编排框架Juggle开源啦!
Juggle是国内首个开源的微服务编排框架,专注于解决企业微服务进程中接口重复开发、系统对接复杂等问题。它提供零代码、低代码和AI增强功能,通过可视化拖拽快速组装简单API为复杂接口,支持多协议、多语言脚本和流程多版本管理。相比国外框架如Conductor,Juggle更贴合国内需求,具备高效开发、企业级可靠性及信创适配等优势,助力企业实现敏捷创新与数字化转型。
颠覆开发效率!国内首个微服务编排框架Juggle开源啦!
|
9月前
|
分布式计算 Java 大数据
Java 大视界 —— 基于 Java 的大数据分布式计算在气象数据处理与天气预报中的应用进展(176)
本文围绕基于 Java 的大数据分布式计算在气象数据处理与天气预报中的应用展开,剖析行业现状与挑战,阐释技术原理,介绍其在数据处理及天气预报中的具体应用,并结合实际案例展示实施效果。
Java 大视界 -- 基于 Java 的大数据分布式存储在视频监控数据管理中的应用优化(170)
本文围绕基于 Java 的大数据分布式存储在视频监控数据管理中的应用展开,分析管理现状与挑战,阐述技术应用,结合案例和代码给出实操方案。
|
11月前
|
Cloud Native Serverless 流计算
云原生时代的应用架构演进:从微服务到 Serverless 的阿里云实践
云原生技术正重塑企业数字化转型路径。阿里云作为亚太领先云服务商,提供完整云原生产品矩阵:容器服务ACK优化启动速度与镜像分发效率;MSE微服务引擎保障高可用性;ASM服务网格降低资源消耗;函数计算FC突破冷启动瓶颈;SAE重新定义PaaS边界;PolarDB数据库实现存储计算分离;DataWorks简化数据湖构建;Flink实时计算助力风控系统。这些技术已在多行业落地,推动效率提升与商业模式创新,助力企业在数字化浪潮中占据先机。
559 12

推荐镜像

更多