微服务实践之分布式定时任务

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 微服务实践之分布式定时任务

承接上篇:上篇文章讲到改造 go-zero 生成的 app module 中的 gateway & RPC 。本篇讲讲如何接入 异步任务 以及 log的使用

Delay Job

日常任务开放中,我们会有很多异步、批量、定时、延迟任务要处理,go-zero中有 go-queue,推荐使用 go-queue 去处理,go-queue 本身也是基于 go-zero 开发的,其本身是有两种模式:

  • dq: 依赖于beanstalkd ,分布式,可存储,延迟、定时设置,关机重启可以重新执行,消息不会丢,使用非常简单,go-queue中使用了redis setnx保证了每个消息只被消费一次,使用场景主要是用来做日常任务使用
  • kq:依赖于 kafka ,这个就不多介绍啦,大名鼎鼎的 kafka ,使用场景主要是做日志用

我们主要说一下dq,kq使用也一样的,只是依赖底层不同,如果没使用过beanstalkd,没接触过beanstalkd的可以先google一下,使用起来还是挺容易的。

我在jobs下使用goctl新建了一个message-job.api服务

info(
 title: //消息任务
 desc: // 消息任务
 author: "Mikael"
 email: "13247629622@163.com"
)
type BatchSendMessageReq {}
type BatchSendMessageResp {}
service message-job-api {
 @handler batchSendMessageHandler // 批量发送短信
 post batchSendMessage(BatchSendMessageReq) returns(BatchSendMessageResp)
}

因为不需要使用路由,所以handler下的routes.go被我删除了,在handler下新建了一个jobRun.go,内容如下:

package handler
import (
 "fishtwo/lib/xgo"
 "fishtwo/app/jobs/message/internal/svc"
)
/**
* @Description 启动job
* @Author Mikael
* @Date 2021/1/18 12:05
* @Version 1.0
**/
func JobRun(serverCtx *svc.ServiceContext)  {
 xgo.Go(func() {
  batchSendMessageHandler(serverCtx)
    //...many job
 })
}

其实xgo.Go就是 go batchSendMessageHandler(serverCtx) ,封装了一下go携程,防止野生goroutine panic

然后修改一下启动文件message-job.go

package main
import (
   "flag"
   "fmt"
   "fishtwo/app/jobs/message/internal/config"
   "fishtwo/app/jobs/message/internal/handler"
   "fishtwo/app/jobs/message/internal/svc"
   "github.com/tal-tech/go-zero/core/conf"
   "github.com/tal-tech/go-zero/rest"
)
var configFile = flag.String("f", "etc/message-job-api.yaml", "the config file")
func main() {
   flag.Parse()
   var c config.Config
   conf.MustLoad(*configFile, &c)
   ctx := svc.NewServiceContext(c)
   server := rest.MustNewServer(c.RestConf)
   defer server.Stop()
   handler.JobRun(ctx)
   fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
   server.Start()
}

主要是handler.RegisterHandlers(server, ctx) 修改为handler.JobRun(ctx)

接下来,我们就可以引入dq了,首先在etc/xxx.yaml下添加dqConf

.....
DqConf:
  Beanstalks:
    - Endpoint: 127.0.0.1:7771
      Tube: tube1
    - Endpoint: 127.0.0.1:7772
      Tube: tube2
  Redis:
    Host: 127.0.0.1:6379
    Type: node

我这里本地用不同端口,模拟开了2个节点,7771、7772

在internal/config/config.go添加配置解析对象

type Config struct {
 ....
 DqConf dq.DqConf
}

修改handler/batchsendmessagehandler.go

package handler
import (
 "context"
 "fishtwo/app/jobs/message/internal/logic"
 "fishtwo/app/jobs/message/internal/svc"
 "github.com/tal-tech/go-zero/core/logx"
)
func batchSendMessageHandler(ctx *svc.ServiceContext){
 rootCxt:= context.Background()
 l := logic.NewBatchSendMessageLogic(context.Background(), ctx)
 err := l.BatchSendMessage()
 if err != nil{
  logx.WithContext(rootCxt).Error("【JOB-ERR】 : %+v ",err)
 }
}

修改logic下batchsendmessagelogic.go,写我们的consumer消费逻辑

package logic
import (
   "context"
   "fishtwo/app/jobs/message/internal/svc"
   "fmt"
   "github.com/tal-tech/go-zero/core/logx"
)
type BatchSendMessageLogic struct {
   logx.Logger
   ctx    context.Context
   svcCtx *svc.ServiceContext
}
func NewBatchSendMessageLogic(ctx context.Context, svcCtx *svc.ServiceContext) BatchSendMessageLogic {
   return BatchSendMessageLogic{
    Logger: logx.WithContext(ctx),
    ctx:    ctx,
    svcCtx: svcCtx,
   }
}
func (l *BatchSendMessageLogic) BatchSendMessage() error {
   fmt.Println("job BatchSendMessage start")
   l.svcCtx.Consumer.Consume(func(body []byte) {
    fmt.Printf("job BatchSendMessage %s \n" + string(body))
   })
   fmt.Printf("job BatchSendMessage finish \n")
   return nil
}

这样就大功告成了,启动message-job.go就ok课

go run message-job.go

之后我们就可以在业务代码中向dq添加任务,它就可以自动消费了

producer.Delay 向dq中投递5个延迟任务:

producer := dq.NewProducer([]dq.Beanstalk{
  {
   Endpoint: "localhost:7771",
   Tube:     "tube1",
  },
  {
   Endpoint: "localhost:7772",
   Tube:     "tube2",
  },
 })
 for i := 1000; i < 1005; i++ {
  _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1)
  if err != nil {
   fmt.Println(err)
  }
 }

producer.At 可以指定某个时间执行,非常好用,感兴趣的朋友自己可以研究下。

错误日志

在前面说到gateway改造时候,如果眼神好的童鞋,在上面的httpresult.go中已经看到了log的身影:

我们在来看下rpc中怎么处理的

是的,我在每个rpc启动的main中加入了grpc拦截器 https://www.yuque.com/tal-tech/go-zero/ttzlo1,那让我们看看grpc拦截器里面做了什么

然后我代码里面使用github/pkg/errors这个包去处理错误的,这个包还是很好用的

所以呢:

我们在 grpc 中打印日志 logx.WithContext(ctx).Errorf("[RPC-SRV-ERR] %+v",err)

api 中打印日志 logx.WithContext(r.Context()).Error("[GATEWAY-SRV-ERR] : %+v ",err)

go-zero 中打印日志,使用logx.WithContext会把trace-id带入,这样一个请求下来,比如

user-api --> user-srv --> message-srv

那如果 messsage-srv 出错,他们三个是同一个 trace-id ,是不是就可以在elk通过输入这个trace-id一次性搜索出来这条请求报错堆栈信息呢?当然你也可以接入 jaeger、zipkin、skywalking 等,这个我暂时还没接入。

框架地址

https://github.com/tal-tech/go-zero

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
11天前
|
运维 Kubernetes 调度
阿里云容器服务 ACK One 分布式云容器企业落地实践
阿里云容器服务ACK提供强大的产品能力,支持弹性、调度、可观测、成本治理和安全合规。针对拥有IDC或三方资源的企业,ACK One分布式云容器平台能够有效解决资源管理、多云多集群管理及边缘计算等挑战,实现云上云下统一管理,提升业务效率与稳定性。
|
18天前
|
运维 监控 Java
后端开发中的微服务架构实践与挑战####
在数字化转型加速的今天,微服务架构凭借其高度的灵活性、可扩展性和可维护性,成为众多企业后端系统构建的首选方案。本文深入探讨了微服务架构的核心概念、实施步骤、关键技术考量以及面临的主要挑战,旨在为开发者提供一份实用的实践指南。通过案例分析,揭示微服务在实际项目中的应用效果,并针对常见问题提出解决策略,帮助读者更好地理解和应对微服务架构带来的复杂性与机遇。 ####
|
18天前
|
机器学习/深度学习 存储 运维
分布式机器学习系统:设计原理、优化策略与实践经验
本文详细探讨了分布式机器学习系统的发展现状与挑战,重点分析了数据并行、模型并行等核心训练范式,以及参数服务器、优化器等关键组件的设计与实现。文章还深入讨论了混合精度训练、梯度累积、ZeRO优化器等高级特性,旨在提供一套全面的技术解决方案,以应对超大规模模型训练中的计算、存储及通信挑战。
48 4
|
18天前
|
算法 NoSQL Java
微服务架构下的接口限流策略与实践#### 一、
本文旨在探讨微服务架构下,面对高并发请求时如何有效实施接口限流策略,以保障系统稳定性和服务质量。不同于传统的摘要概述,本文将从实际应用场景出发,深入剖析几种主流的限流算法(如令牌桶、漏桶及固定窗口计数器等),通过对比分析它们的优缺点,并结合具体案例,展示如何在Spring Cloud Gateway中集成自定义限流方案,实现动态限流规则调整,为读者提供一套可落地的实践指南。 #### 二、
42 3
|
16天前
|
消息中间件 运维 安全
后端开发中的微服务架构实践与挑战####
在数字化转型的浪潮中,微服务架构凭借其高度的灵活性和可扩展性,成为众多企业重构后端系统的首选方案。本文将深入探讨微服务的核心概念、设计原则、关键技术选型及在实际项目实施过程中面临的挑战与解决方案,旨在为开发者提供一套实用的微服务架构落地指南。我们将从理论框架出发,逐步深入至技术细节,最终通过案例分析,揭示如何在复杂业务场景下有效应用微服务,提升系统的整体性能与稳定性。 ####
31 1
|
18天前
|
监控 安全 持续交付
构建高效微服务架构:策略与实践####
在数字化转型的浪潮中,微服务架构凭借其高度解耦、灵活扩展和易于维护的特点,成为现代企业应用开发的首选。本文深入探讨了构建高效微服务架构的关键策略与实战经验,从服务拆分的艺术到通信机制的选择,再到容器化部署与持续集成/持续部署(CI/CD)的实践,旨在为开发者提供一套全面的微服务设计与实现指南。通过具体案例分析,揭示如何避免常见陷阱,优化系统性能,确保系统的高可用性与可扩展性,助力企业在复杂多变的市场环境中保持竞争力。 ####
35 2
|
18天前
|
消息中间件 运维 API
后端开发中的微服务架构实践####
本文深入探讨了微服务架构在后端开发中的应用,从其定义、优势到实际案例分析,全面解析了如何有效实施微服务以提升系统的可维护性、扩展性和灵活性。不同于传统摘要的概述性质,本摘要旨在激发读者对微服务架构深度探索的兴趣,通过提出问题而非直接给出答案的方式,引导读者深入
37 1
|
19天前
|
负载均衡 监控 API
后端开发中的微服务架构实践与挑战
本文深入探讨了微服务架构在后端开发中的应用,分析了其优势和面临的挑战,并通过案例分析提出了相应的解决策略。微服务架构以其高度的可扩展性和灵活性,成为现代软件开发的重要趋势。然而,它同时也带来了服务间通信、数据一致性等问题。通过实际案例的剖析,本文旨在为开发者提供有效的微服务实施指导,以优化系统性能和用户体验。
|
19天前
|
弹性计算 Kubernetes API
构建高效后端服务:微服务架构的深度剖析与实践####
本文深入探讨了微服务架构的核心理念、设计原则及实现策略,旨在为开发者提供一套系统化的方法论,助力其构建灵活、可扩展且易于维护的后端服务体系。通过案例分析与实战经验分享,揭示了微服务在提升开发效率、优化资源利用及增强系统稳定性方面的关键作用。文章首先概述了微服务架构的基本概念,随后详细阐述了其在后端开发中的应用优势与面临的挑战,最后结合具体实例,展示了如何从零开始规划并实施一个基于微服务的后端项目。 ####
|
20天前
|
存储 运维 数据可视化
如何为微服务实现分布式日志记录
如何为微服务实现分布式日志记录
35 1
下一篇
DataWorks