go-micro集成RabbitMQ实战和原理

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: go-micro集成RabbitMQ实战和原理

在go-micro中异步消息的收发是通过Broker这个组件来完成的,底层实现有RabbitMQ、Kafka、Redis等等很多种方式,这篇文章主要介绍go-micro使用RabbitMQ收发数据的方法和原理。

Broker的核心功能

Broker的核心功能是Publish和Subscribe,也就是发布和订阅。它们的定义是:

Publish(topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)

发布

发布第一个参数是topic(主题),用于标识某类消息。

发布的数据是通过Message承载的,其包括消息头和消息体,定义如下:

type Message struct {
  Header map[string]string
  Body   []byte
}

消息头是map,也就是一组KV(键值对)。

消息体是字节数组,在发送和接收时需要开发者进行编码和解码的处理。

订阅

订阅的第一个参数也是topic(主题),用于过滤出要接收的消息。

订阅的数据是通过Handler处理的,Handler是一个函数,其定义如下:


type Event interface {
  Topic() string
  Message() *Message
  Ack() error
  Error() error
}
  • Topic() 用于获取当前消息的topic,也是发布者发送时的topic。
  • Message() 用于获取消息体,也是发布者发送时的Message,其中包括Header和Body。
  • Ack() 用于通知Broker消息已经收到了,Broker可以删除消息了,可用来保证消息至少被消费一次。
  • Error() 用于获取Broker处理消息过成功的错误。

开发者订阅数据时,需要实现Handler这个函数,接收Event的实例,提取数据进行处理,根据不同的Broker,可能还需要调用Ack(),处理出现错误时,返回error。

go-micro集成RabbitMQ实战

大概了解了Broker的定义之后,再来看下如何使用go-micro收发RabbitMQ消息。

启动一个RabbitMQ

如果你已经有一个RabbitMQ服务器,请跳过这个步骤。

这里介绍一个使用docker快速启动RabbitMQ的方法,当然前提是你得安装了docker。

执行如下命令启动一个rabbitmq的docker容器:

docker run --name rabbitmq1 -p 5672:5672 -p 15672:15672 -d rabbitmq
rabbitmq-plugins enable rabbitmq_management
cd /etc/rabbitmq/conf.d/
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf

最后重启容器:

docker restart rabbitmq1

最后浏览器中输入 http://127.0.0.0:15672 即可访问,默认用户名和密码都是 guest 。

编写收发函数

为了方便演示,先来定义发布消息和接收消息的函数。其中发布函数使用了go-micro提供的Event类型,还有其它类型也可以提供Publish的功能,这里发送的数据格式是Json字符串。接收消息的函数名称可以随意取,但是参数和返回值必须符合规范,也就是下边代码中的样子,这个函数也可以是绑定到某个类型的。

// 定义一个发布消息的函数:每隔1秒发布一条消息
func loopPublish(event micro.Event) {
  for {
    time.Sleep(time.Duration(1) * time.Second)
    curUnix := strconv.FormatInt(time.Now().Unix(), 10)
    msg := "{\"Id\":" + curUnix + ",\"Name\":\"张三\"}"
    event.Publish(context.TODO(), msg)
  }
}
// 定义一个接收消息的函数:将收到的消息打印出来
func handle(ctx context.Context, msg interface{}) (err error) {
  defer func() {
    if r := recover(); r != nil {
      err = errors.New(fmt.Sprint(r))
      log.Println(err)
    }
  }()
  b, err := json.Marshal(msg)
  if err != nil {
    log.Println(err)
    return
  }
  log.Println(string(b))
  return
}

编写主体代码

这里先给出代码,里面提供了一些注释,后边还会有详细介绍。

func main() {
  // RabbitMQ的连接参数
  rabbitmqUrl := "amqp://guest:guest@127.0.0.1:5672/"
  exchangeName := "amq.topic"
  subcribeTopic := "test"
  queueName := "rabbitmqdemo_test"
  // 默认是application/protobuf,这里演示用的是Json,所以要改下
  server.DefaultContentType = "application/json"
  // 创建 RabbitMQ Broker
  b := rabbitmq.NewBroker(
    broker.Addrs(rabbitmqUrl),           // RabbitMQ访问地址,含VHost
    rabbitmq.ExchangeName(exchangeName), // 交换机的名称
    rabbitmq.DurableExchange(),          // 消息在Exchange中时会进行持久化处理
    rabbitmq.PrefetchCount(1),           // 同时消费的最大消息数量
  )
  // 创建Service,内部会初始化一些东西,必须在NewSubscribeOptions前边
  service := micro.NewService(
    micro.Broker(b),
  )
  service.Init()
  // 初始化订阅上下文:这里不是必需的,订阅会有默认值
  subOpts := broker.NewSubscribeOptions(
    rabbitmq.DurableQueue(),   // 队列持久化,消费者断开连接后,消息仍然保存到队列中
    rabbitmq.RequeueOnError(), // 消息处理函数返回error时,消息再次入队列
    rabbitmq.AckOnSuccess(),   // 消息处理函数没有error返回时,go-micro发送Ack给RabbitMQ
  )
  // 注册订阅
  micro.RegisterSubscriber(
    subcribeTopic,    // 订阅的Topic
    service.Server(), // 注册到的rpcServer
    handle,           // 消息处理函数
    server.SubscriberContext(subOpts.Context), // 订阅上下文,也可以使用默认的
    server.SubscriberQueue(queueName),         // 队列名称
  )
  // 发布事件消息
  event := micro.NewEvent(subcribeTopic, service.Client())
  go loopPublish(event)
  log.Println("Service is running ...")
  if err := service.Run(); err != nil {
    log.Println(err)
  }
}

主要逻辑是:

1、先创建一个RabbitMQ Broker,它实现了标准的Broker接口。其中主要的参数是RabbitMQ的访问地址和RabbitMQ交换机,PrefetchCount是订阅者(或称为消费者)使用的。

2、然后通过 NewService 创建go-micro服务,并将broker设置进去。这里边会初始化很多东西,最核心的是创建一个rpcServer,并将rpcServer和这个broker绑定起来。

3、然后是通过 RegisterSubscriber 注册订阅,这个注册有两个层面的功能:一是如果RabbitMQ上还不存在这个队列时创建队列,并订阅指定topic的消息;二是定义go-micro程序从这个RabbitMQ队列接收数据的处理方式。

这里详细看下订阅的参数:

func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error
  • topic:go-micro使用的是Topic模式,发布者发送消息的时候要指定一个topic,订阅者根据需要只接收某个或某几个topic的消息;
  • s:消息从RabbitMQ接收后会进入这个Server进行处理,它是NewService的时候内部创建的;
  • h:使用了上一步创建的接收消息的函数 handle,Server中的方法会调用这个函数;
  • opts 是订阅的一些选项,这里需要指定RabbitMQ队列的名称;另外SubscriberContext定义了订阅的一些行为,这里DurableQueue设置RabbitMQ订阅消息的持久化方式,一般我们都希望消息不丢失,这个设置的作用是即使程序与RabbitMQ的连接断开,消息也会保存在RabbitMQ队列中;AckOnSuccess和RequeueOnError定义了程序处理消息出现错误时的行为,如果handle返回error,消息会重新返回RabbitMQ,然后再投递给程序。

4、然后这里为了演示,通过NewEvent创建了一个Event,通过它每隔一秒发送1条消息。

5、最后通过service.Run()把这个程序启动起来。

辛苦写了半天,看一下这个程序的运行效果:

1689147237515.png


注意一般发布者和订阅者是在不同的程序中,这里只是为了方便演示,才把他们放在一个程序中。所以如果只是发布消息,就不需要订阅的代码,如果只是订阅,也不需要发布消息的代码,大家使用的时候根据需要自己裁剪吧。

go-micro集成RabbitMQ的处理流程

这个部分来看一下消息在go-micro和RabbitMQ中是怎么流转的,我画了一个示意图:

1689147463763.png

这个图有点复杂,这里详细讲解下。

首先分成三块:RabbitMQ、消息发布部分、消息接收部分,这里用不同的颜色进行了区分。

  • RabbitMQ不是本文的重点,就把它看成一个整体就行了。
  • 消息发布部分:从生产者程序调用Event.Publish开始,然后调用Client.Publish,到这里为止,都是在go-micro的核心模块中进行处理;然后再调用Broker.Publish,这里的Broker是RabbitMQ插件的Broker实例,从这里开始进入了RabbiitMQ插件部分,然后再依次通过RabbitMQ Connection的Publish方法、RabbitMQ Channle的Publish方法,最终发送到RabbitMQ中。
  • 消息接收部分:Service.Run内部会调用rpcServer.Start,这个方法内部会调用Broker.Subscribe,这个方法是RabbitMQ插件中定义的,它会读取RegisterSubscriber时的一些RabbitMQ队列设置,然后再依次传递到RabbitMQ Connection的Consume方法、RabbitMQ Channel的ConsumeQueue方法,最终连接到RabbitMQ,并在RabbitMQ上设置好要订阅的队列;这些方法还会返回一个类型为amqp.Delivery的Go Channel,Broker.Subscribe不断的从这个Go Channel中读取数据,然后再发送到调用Broker.Subscribe时传入的一个消息处理方法中,这里就是rpcServer.HandleEvnet,消息经过一些处理后再进入rpcServer内部的路由处理模块,这里就是route.ProcessMessage,这个方法内部会根据当前消息的topic查找RegisterSubscriber时注册的订阅,并最终调用到当时注册的用于接收消息的函数。

这个处理过程还可以划分为业务部分、核心模块部分和插件部分。

  • 首先创建一个插件的Broker实现,把它注册到核心模块的rpcServer中;
  • 消息的发送从业务部分进入核心模块部分,再进入具体实现Broker的插件部分;
  • 消息的接收则首先进入插件部分,然后再流转到核心模块部分,再流转到业务部分。

从上边的图中可以看到消息都需要经过这个RabbitMQ插件进行处理,实际上可以只使用这个插件,就能实现消息的发送和接收。这个演示代码我已经提交到了Github,有兴趣的同学可以在文末获取Github仓库的地址。

从上边这些划分中,我们可以理解到设计者的整体设计思路,把握关键节点,用好用对,出现问题时可以快速定位。

填的几个坑

不能接收其它框架发布的消息

这个是因为route.ProcessMessage查找订阅时使用了go-micro专用的一个头信息:

// get the subscribers by topic
  subs, ok := router.subscribers[msg.Topic()]

这个msg.Topic返回的是如下实例中的topic字段:

  rpcMsg := &rpcMessage{
    topic:       msg.Header["Micro-Topic"],
    contentType: ct,
    payload:     &raw.Frame{Data: msg.Body},
    codec:       cf,
    header:      msg.Header,
    body:        msg.Body,
  }

其它框架不会有这么一个头信息,除非专门适配go-micro。

因为使用RabbitMQ的场景下,整个开发都是围绕RabbitMQ做的,而且go-micro的处理逻辑没有考虑RabbitMQ订阅可以使用通配符的情况,发布消息的Topic、接收消息的Topic与Micro-Topic的值匹配时都是按照是否相等的原则处理的,因此可以用RabbitMQ消息自带的topic来设置这个消息头。rabbitmq.rbroker.Subscribe 中接收到消息后,就可以进行这个设置:

// Messages sent from other frameworks to rabbitmq do not have this header.
    // The 'RoutingKey' in the message can be used as this header.
    // Then the message can be transfered to the subscriber which bind this topic.
    msgTopic := header["Micro-Topic"]
    if msgTopic == "" {
      header["Micro-Topic"] = msg.RoutingKey
    }

这样go-micro开发的消费者程序就能接收其它框架发布的消息了,其它框架无需适配。

RabbitMQ重启后订阅者和发布者无限阻塞

go-micro的RabbitMQ插件底层使用另一个库:github.com/streadway/amqp

对于发布者,RabbitMQ断开连接时amqp库会通过Go Channel同步通知go-micro,然后go-micro可以发起重新连接。问题出现在这个同步通知上,go-micro的RabbitMQ插件设置了接收连接和通道的关闭通知,但是只处理了一个通知就去重新连接了,这就导致有一个Go Channel一直阻塞,而这个阻塞会导致某个锁不能释放,这个锁又是Publish时候需要的,因此导致发布者无限阻塞。解决办法就是外层增加一个循环,等所有的通知都收到了,再去做重新连接。

对于订阅者,RabbitMQ断开连接时,它会一直阻塞在某个Go Channel上,直到它返回一个值,这个值代表连接已经重新建立,订阅者可以重建消费通道。问题也是出现在这个阻塞的Go Channel上,因为这个Go Channel在每次收到amqp的关闭通知时会重新赋值,而订阅者等待的Go Channel可能是之前的旧值,永远也不会返回,订阅者也就无限阻塞了。解决办法呢,就是在select时增加一个time.After,让等待的Go Channel有机会更新到新值。

代码就不贴了,有兴趣的可以到Github中去看:github.com/go-micro/pl…

关于这两个问题的修改已经合并到官方仓库中,大家去get最新的代码就可以了。

这两个坑填了,基本上就能满足我的需要了。当然可能还有其它的坑,比如go-micro的RabbitMQ插件好像没有发布者确认的功能,这个要实现,还得好好想想怎么改。


好了,以上就是本文的主要内容。

老规矩,代码已经上传到Github,欢迎访问:github.com/bosima/go-d…


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 弹性计算 Kubernetes
RabbitMQ与容器化技术的集成实践
【8月更文第28天】RabbitMQ 是一个开源消息代理和队列服务器,用于在分布式系统中存储、转发消息。随着微服务架构的普及,容器化技术(如 Docker 和 Kubernetes)成为了部署和管理应用程序的标准方式。本文将探讨如何使用 Docker 和 Kubernetes 在生产环境中部署和管理 RabbitMQ 服务,同时保证高可用性和弹性伸缩能力。
50 3
|
2月前
|
Shell Go API
Go语言grequests库并发请求的实战案例
Go语言grequests库并发请求的实战案例
|
2天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
14天前
|
Dart Android开发
鸿蒙Flutter实战:03-鸿蒙Flutter开发中集成Webview
本文介绍了在OpenHarmony平台上集成WebView的两种方法:一是使用第三方库`flutter_inappwebview`,通过配置pubspec.lock文件实现;二是编写原生ArkTS代码,自定义PlatformView,涉及创建入口能力、注册视图工厂、处理方法调用及页面构建等步骤。
27 0
|
2月前
|
Kubernetes Go 持续交付
一个基于Go程序的持续集成/持续部署(CI/CD)
本教程通过一个简单的Go程序示例,展示了如何使用GitHub Actions实现从代码提交到Kubernetes部署的CI/CD流程。首先创建并版本控制Go项目,接着编写Dockerfile构建镜像,再配置CI/CD流程自动化构建、推送Docker镜像及部署应用。此流程基于GitHub仓库,适用于快速迭代开发。
42 3
|
2月前
|
Kubernetes 持续交付 Go
创建一个基于Go程序的持续集成/持续部署(CI/CD)流水线
创建一个基于Go程序的持续集成/持续部署(CI/CD)流水线
|
2月前
|
监控 关系型数据库 MySQL
zabbix agent集成percona监控MySQL的插件实战案例
这篇文章是关于如何使用Percona监控插件集成Zabbix agent来监控MySQL的实战案例。
41 2
zabbix agent集成percona监控MySQL的插件实战案例
|
24天前
|
消息中间件 数据采集 中间件
RabbitMQ的使用—实战
RabbitMQ的使用—实战
54 0
|
2月前
|
安全 大数据 Go
深入探索Go语言并发编程:Goroutines与Channels的实战应用
在当今高性能、高并发的应用需求下,Go语言以其独特的并发模型——Goroutines和Channels,成为了众多开发者眼中的璀璨明星。本文不仅阐述了Goroutines作为轻量级线程的优势,还深入剖析了Channels作为Goroutines间通信的桥梁,如何优雅地解决并发编程中的复杂问题。通过实战案例,我们将展示如何利用这些特性构建高效、可扩展的并发系统,同时探讨并发编程中常见的陷阱与最佳实践,为读者打开Go语言并发编程的广阔视野。
|
2月前
|
消息中间件 缓存 Java
RocketMQ的JAVA落地实战
RocketMQ作为一款高性能、高可靠、高实时、分布式特点的消息中间件,其核心作用主要体现在异步处理、削峰填谷以及系统解耦三个方面。
135 0