微服务从代码到k8s部署应有尽有系列(八、各种队列)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 微服务从代码到k8s部署应有尽有系列(八、各种队列)

我们用一个系列来讲解从需求到上线、从代码到k8s部署、从日志到监控等各个方面的微服务完整实践。

整个项目使用了go-zero开发的微服务,基本包含了go-zero以及相关go-zero作者开发的一些中间件,所用到的技术栈基本是go-zero项目组的自研组件,基本是go-zero全家桶了。

实战项目地址:https://github.com/Mikaelemmmm/go-zero-looklook

1、概述

消息队列有很多种,有rabbitmq、rocketmq、kafka等常用的,其中go-queue(https://github.com/zeromicro/go-queue)是go-zero官方开发的消息队列组件,其中分为2类,一种是kq、一种是dq,kq是基于kafka的消息队列,dq是基于beanstalkd的延迟队列,但是go-queue不支持定时任务。具体想更多了解go-queue的我之前也写过一篇教程可以去看一下这里不细说了。

本项目采用的是go-queue做消息队列,asynq做延迟队列、定时队列

为什么使用asynq的几个原因

  • 直接基于redis,一般项目都有redis,而asynq本身就是基于redis所以可以少维护一个中间件
  • 支持消息队列、延迟队列、定时任务调度 , 因为希望项目支持定时任务而asynq直接就支持
  • 有webui界面,每个任务都可以暂停、归档、通过ui界面查看成功失败、监控

为什么asynq支持消息队列还在使用go-queue?

  • kafka的吞吐是业绩出名的,如果前期量不大可以直接用asynq
  • 没啥目的,就是想给你们演示一下go-queue

在我们使用go-zero的时候,goctl给我们带了很大的便利,但是目前go-zero只有生成api、rpc,很多同学在群里问定时任务、延迟队列、消息队列如何生成,目录结构该怎样做,其实go-zero是为我们设计好了的,就是serviceGroup,使用serviceGroup管理你的服务。

2、如何使用

在前面订单、消息等场景我们其实已经演示过了,这里再额外单独补充一次

我们还是拿order-mq来举例子,显然使用goctl生成api、rpc不是我们想要的,那我们就自己使用serviceGroup改造,目录结构还是延续api的基本差不多,只是将handler改成了listen , 将logic换成了mqs。

2.1 在main中代码如下

var configFile = flag.String("f", "etc/order.yaml", "Specify the config file")
func main() {
    flag.Parse()
    var c config.Config
    conf.MustLoad(*configFile, &c)
    // log, prometheus, trace, metricsUrl
    if err := c.SetUp(); err != nil {
        panic(err)
    }
    serviceGroup := service.NewServiceGroup()
    defer serviceGroup.Stop()
    for _, mq := range listen.Mqs(c) {
        serviceGroup.Add(mq)
    }
    serviceGroup.Start()
}
  • 首先我们要定义配置以及解析配置。
  • 其次为什么我们要在这里加SetUp而api、rpc不需要呢?因为api、rpc都是在MustNewServer中已经框架写的,但是我们用serviceGroup管理没有,可以手动点进去SetUp看看,这个方法中包含了log、prometheus、trace、metricsUrl的定义,一个方法可以省很多事情,这样我们直接修改配置文件就可以实现日志、监控、链路追踪了。
  • 接下来就是go-zero的serivceGroup管理服务了,serviceGroup是用来管理一组service的,那service其实就是一个接口,代码如下
    Service (代码在go-zero/core/service/servicegroup.go)
// Service is the interface that groups Start and Stop methods.
Service interface {
    Starter // Start
    Stopper // Stop
}
  • 所以,只要你的服务实现了这两个接口,就可以加入到serviceGroup统一管理
    那可以看到我们把所有的mq都实现这个接口,然后统一放到都 list.Mqs中,在启动服务即可

2.2 mq分类管理

go-zero-looklook/app/order/cmd/mq/internal/listen目录下代码

该目录下代码是统一管理不同类型mq,因为我们要管理kq、asynq可能后续还有rabbitmq、rocketmq等等,所以在这里做了分类方便维护

统一管理在go-zero-looklook/app/order/cmd/mq/internal/listen/listen.go,然后在main中调用listen.Mqs可以获取所有mq一起start

// 返回所有消费者
func Mqs(c config.Config) []service.Service {
    svcContext := svc.NewServiceContext(c)
    ctx := context.Background()
    var services []service.Service
    // kq :消息队列.
    services = append(services, KqMqs(c, ctx, svcContext)...)
    // asynq:延迟队列、定时任务
    services = append(services, AsynqMqs(c, ctx, svcContext)...)
    // other mq ....
    return services
}

go-zero-looklook/app/order/cmd/mq/internal/listen/asynqMqs.go就是定义的asynq

// asynq
// 定时任务、延迟任务
func AsynqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {
   return []service.Service{
      // 监听延迟队列
      deferMq.NewAsynqTask(ctx, svcContext),
      // 监听定时任务
   }
}

go-zero-looklook/app/order/cmd/mq/internal/listen/asynqMqs.go就是定义的kq (go-queue的kafka)

// kq
// 消息队列
func KqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {
    return []service.Service{
        // 监听消费流水状态变更
        kq.MustNewQueue(c.PaymentUpdateStatusConf, kqMq.NewPaymentUpdateStatusMq(ctx, svcContext)),
        // .....
    }
}

2.3 实际业务

编写实际业务,我们就在go-zero-looklook/app/order/cmd/mq/internal/listen/mqs下,这里为了方便维护,也是做了分类

  • deferMq : 延迟队列
  • kq:消息队列

2.3.1 延迟队列

// 监听关闭订单
type AsynqTask struct {
   ctx    context.Context
   svcCtx *svc.ServiceContext
}
func NewAsynqTask(ctx context.Context, svcCtx *svc.ServiceContext) *AsynqTask {
   return &AsynqTask{
      ctx:    ctx,
      svcCtx: svcCtx,
   }
}
func (l *AsynqTask) Start() {
   fmt.Println("AsynqTask start ")
   srv := asynq.NewServer(
      asynq.RedisClientOpt{Addr: l.svcCtx.Config.Redis.Host, Password: l.svcCtx.Config.Redis.Pass},
      asynq.Config{
         Concurrency: 10,
         Queues: map[string]int{
            "critical": 6,
            "default":  3,
            "low":      1,
         },
      },
   )
   mux := asynq.NewServeMux()
   // 关闭民宿订单任务
   mux.HandleFunc(asynqmq.TypeHomestayOrderCloseDelivery, l.closeHomestayOrderStateMqHandler)
   if err := srv.Run(mux); err != nil {
      log.Fatalf("could not run server: %v", err)
   }
}
func (l *AsynqTask) Stop() {
   fmt.Println("AsynqTask stop")
}

因为 asynq 要先启动,然后定义路由任务,所以我们在asynqTask.go中做了统一的路由管理,之后我们每个业务都单独的在deferMq的文件夹下面定义一个文件(如“延迟关闭订单:closeHomestayOrderState.go”),这样每个业务一个文件,跟go-zero的api、rpc的logic一样,维护很方便

closeHomestayOrderState.go 关闭订单逻辑

package deferMq
import (
    "context"
    "encoding/json"
    "looklook/app/order/cmd/rpc/order"
    "looklook/app/order/model"
    "looklook/common/asynqmq"
    "looklook/common/xerr"
    "github.com/hibiken/asynq"
    "github.com/pkg/errors"
)
func (l *AsynqTask) closeHomestayOrderStateMqHandler(ctx context.Context, t *asynq.Task) error {
    var p asynqmq.HomestayOrderCloseTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return errors.Wrapf(xerr.NewErrMsg("解析asynq task payload err"), "closeHomestayOrderStateMqHandler payload err:%v, payLoad:%+v", err, t.Payload())
    }
    resp, err := l.svcCtx.OrderRpc.HomestayOrderDetail(ctx, &order.HomestayOrderDetailReq{
        Sn: p.Sn,
    })
    if err != nil || resp.HomestayOrder == nil {
        return errors.Wrapf(xerr.NewErrMsg("获取订单失败"), "closeHomestayOrderStateMqHandler 获取订单失败 or 订单不存在 err:%v, sn:%s ,HomestayOrder : %+v", err, p.Sn, resp.HomestayOrder)
    }
    if resp.HomestayOrder.TradeState == model.HomestayOrderTradeStateWaitPay {
        _, err := l.svcCtx.OrderRpc.UpdateHomestayOrderTradeState(ctx, &order.UpdateHomestayOrderTradeStateReq{
            Sn:         p.Sn,
            TradeState: model.HomestayOrderTradeStateCancel,
        })
        if err != nil {
            return errors.Wrapf(xerr.NewErrMsg("关闭订单失败"), "closeHomestayOrderStateMqHandler 关闭订单失败  err:%v, sn:%s ", err, p.Sn)
        }
    }
    return nil
}

2.3.2 kq消息队列

看go-zero-looklook/app/order/cmd/mq/internal/mqs/kq文件夹下,因为kq跟asynq不太一样,它本身就是使用go-zero的Service管理的,已经实现了starter、stopper接口了,所以我们在/Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/listen/kqMqs.go中直接定义好一个go-queue业务扔给serviceGroup,去交给main启动就好了 , 我们的业务代码只需要实现go-queue的Consumer直接写我们自己业务即可。

1)/Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/listen/kqMqs.go

func KqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {
    return []service.Service{
        // 监听消费流水状态变更
        kq.MustNewQueue(c.PaymentUpdateStatusConf, kqMq.NewPaymentUpdateStatusMq(ctx, svcContext)),
        // .....
    }
}

可以看到kq.MustNewQueue本身返回就是 queue.MessageQueue , queue.MessageQueue又实现了Start、Stop

2)业务中

/Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/mqs/kq/paymentUpdateStatus.go

func (l *PaymentUpdateStatusMq) Consume(_, val string) error {
    fmt.Printf(" PaymentUpdateStatusMq Consume val : %s \n", val)
    // 解析数据
    var message kqueue.ThirdPaymentUpdatePayStatusNotifyMessage
    if err := json.Unmarshal([]byte(val), &message); err != nil {
        logx.WithContext(l.ctx).Error("PaymentUpdateStatusMq->Consume Unmarshal err : %v , val : %s", err, val)
        return err
    }
    // 执行业务..
    if err := l.execService(message); err != nil {
        logx.WithContext(l.ctx).Error("PaymentUpdateStatusMq->execService  err : %v , val : %s , message:%+v", err, val, message)
        return err
    }
    return nil
}

我们在paymentUpdateStatus.go中只需要实现接口Consume 就可以接受来自kq传过来的kafka的消息了,我们只管在我们Consumer中处理我们业务即可

3、定时任务

关于定时任务,目前go-zero-looklook没有使用,这里我也说明一下

  • 如果你想简单一点直接使用cron(裸机、k8s都有),
  • 如果稍微复杂一点可以使用https://github.com/robfig/cron包,在代码中定义时间
  • 使用 xxl-job、gocron 分布式定时任务系统接入
  • asynq 的 shedule

这里因为项目用的asynq,我就演示一下asynq的shedule吧

分为client与server , client用来定义调度时间,server是到了时间接受client的消息触发来执行我们写的业务的,实际业务我们应该写在server,client用来定义业务调度时间的

asynqtest/docker-compose.yml

version: '3'
services:
  #asynqmon asynq延迟队列、定时队列的webui
  asynqmon:
    image: hibiken/asynqmon:latest
    container_name: asynqmon_asynq
    ports:
      - 8980:8080
    command:
      - '--redis-addr=redis:6379'
      - '--redis-password=G62m50oigInC30sf'
    restart: always
    networks:
      - asynqtest_net
    depends_on:
      - redis
  
  #redis容器
  redis:
    image: redis:6.2.5
    container_name: redis_asynq
    ports:
      - 63779:6379
    environment:
      # 时区上海
      TZ: Asia/Shanghai
    volumes:
      # 数据文件
      - ./data/redis/data:/data:rw
    command: "redis-server --requirepass G62m50oigInC30sf  --appendonly yes"
    privileged: true
    restart: always
    networks:
      - asynqtest_net
networks:
  asynqtest_net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.22.0.0/16

asynqtest/shedule/client/client.go

package main
import (
    "asynqtest/tpl"
    "encoding/json"
    "log"
    "github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:63779"
const redisPwd = "G62m50oigInC30sf"
func main() {
    // 周期性任务
    scheduler := asynq.NewScheduler(
        asynq.RedisClientOpt{
            Addr:     redisAddr,
            Password: redisPwd,
        }, nil)
    payload, err := json.Marshal(tpl.EmailPayload{Email: "546630576@qq.com", Content: "发邮件呀"})
    if err != nil {
        log.Fatal(err)
    }
    task := asynq.NewTask(tpl.EMAIL_TPL, payload)
    // 每隔1分钟同步一次
    entryID, err := scheduler.Register("*/1 * * * *", task)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("registered an entry: %q\n", entryID)
    if err := scheduler.Run(); err != nil {
        log.Fatal(err)
    }
}

asynqtest/shedule/server/server.go

package main
import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "asynqtest/tpl"
    "github.com/hibiken/asynq"
)
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "127.0.0.1:63779", Password: "G62m50oigInC30sf"},
        asynq.Config{
            Concurrency: 10,
            Queues: map[string]int{
                "critical": 6,
                "default":  3,
                "low":      1,
            },
        },
    )
    mux := asynq.NewServeMux()
    // 关闭民宿订单任务
    mux.HandleFunc(tpl.EMAIL_TPL, emailMqHandler)
    if err := srv.Run(mux); err != nil {
        log.Fatalf("could not run server: %v", err)
    }
}
func emailMqHandler(ctx context.Context, t *asynq.Task) error {
    var p tpl.EmailPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return fmt.Errorf("emailMqHandler err:%+v", err)
    }
    fmt.Printf("p : %+v \n", p)
    return nil
}

asynqtest/tpl/tpl.go

package tpl
const EMAIL_TPL = "schedule:email"
type EmailPayload struct {
    Email   string
    Content string
}

启动 server.goclient.go

浏览器输入http://127.0.0.1:8980/schedulers这里 可以看到所有client定义的任务

浏览器输入http://127.0.0.1:8990/这里可以看到我们的server消费请

控制台消费情况

说一下asynq的shedule在集成到项目中的思路,可以单独启动一个服务作为调度client定义系统的定时任务调度管理,将server定义在每个业务自己的mq的asynq一起即可。

4、结尾

在这一节中,我们学会使用了消息队列、延迟队列 ,kafka可以通过管理工具去查看,至于asynq查看webui在go-zero-looklook/docker-compose-env.yml中我们已经启动好了asynqmon,直接使用http://127.0.0.1:8980 即可查看

项目地址

https://github.com/zeromicro/go-zero

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
21天前
|
关系型数据库 MySQL Docker
《docker高级篇(大厂进阶):5.Docker-compose容器编排》包括是什么能干嘛去哪下、Compose核心概念、Compose使用三个步骤、Compose常用命令、Compose编排微服务
《docker高级篇(大厂进阶):5.Docker-compose容器编排》包括是什么能干嘛去哪下、Compose核心概念、Compose使用三个步骤、Compose常用命令、Compose编排微服务
84 24
|
13天前
|
存储 Kubernetes 容器
K8S部署nexus
该配置文件定义了Nexus 3的Kubernetes部署,包括PersistentVolumeClaim、Deployment和服务。PVC请求20Gi存储,使用NFS存储类。Deployment配置了一个Nexus 3容器,内存限制为6G,CPU为1000m,并挂载数据卷。Service类型为NodePort,通过30520端口对外提供服务。所有资源位于`nexus`命名空间中。
|
23天前
|
关系型数据库 MySQL Docker
《docker高级篇(大厂进阶):5.Docker-compose容器编排》包括是什么能干嘛去哪下、Compose核心概念、Compose使用三个步骤、Compose常用命令、Compose编排微服务
《docker高级篇(大厂进阶):5.Docker-compose容器编排》包括是什么能干嘛去哪下、Compose核心概念、Compose使用三个步骤、Compose常用命令、Compose编排微服务
107 6
|
2月前
|
Kubernetes Cloud Native 微服务
云原生入门与实践:Kubernetes的简易部署
云原生技术正改变着现代应用的开发和部署方式。本文将引导你了解云原生的基础概念,并重点介绍如何使用Kubernetes进行容器编排。我们将通过一个简易的示例来展示如何快速启动一个Kubernetes集群,并在其上运行一个简单的应用。无论你是云原生新手还是希望扩展现有知识,本文都将为你提供实用的信息和启发性的见解。
|
2月前
|
Kubernetes Cloud Native 持续交付
容器化、Kubernetes与微服务架构的融合
容器化、Kubernetes与微服务架构的融合
48 1
|
2月前
|
监控 安全 持续交付
构建高效的微服务架构:从设计到部署
构建高效的微服务架构:从设计到部署
26 1
|
2月前
|
监控 持续交付 Docker
Docker 容器化部署在微服务架构中的应用有哪些?
Docker 容器化部署在微服务架构中的应用有哪些?
|
2月前
|
监控 持续交付 Docker
Docker容器化部署在微服务架构中的应用
Docker容器化部署在微服务架构中的应用
|
2月前
|
安全 持续交付 Docker
微服务架构和 Docker 容器化部署的优点是什么?
微服务架构和 Docker 容器化部署的优点是什么?
|
2月前
|
存储 监控 Docker
探索微服务架构下的容器化部署
本文旨在深入探讨微服务架构下容器化部署的关键技术与实践,通过分析Docker容器技术如何促进微服务的灵活部署和高效管理,揭示其在现代软件开发中的重要性。文章将重点讨论容器化技术的优势、面临的挑战以及最佳实践策略,为读者提供一套完整的理论与实践相结合的指导方案。