我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用

简介: 我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用

我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用

嗨,大家好,我是小魔童哪吒,咱们从今天开始进入开源组件的学习,一边学习一边总结一边分享

文章提纲如下:

  • RabbitMQ 成员组成
  • RabbitMQ 的六种工作模式编码

RabbitMQ 成员组成

  • 生产者 producer
  • 消费者 consumer
  • 交换机 exchange

用于接受、分配消息

  • 消息 message
  • 队列 queue

用于存储生产者的消息

  • 信道 channel AMQP

消息推送使用的通道

  • 连接 connections

生成者或者消费者与Rabbit 建立的TCP 连接

  • 路由键 routingKey

用于把生成者的数据分配到交换器上

  • 绑定键 BindingKey

用于把交换器的消息绑定到队列上

  • 连接管理器 ConnectionFactory

应用程序与 Rabbit 之间建立连接的管理器,程序代码中使用

RabbitMQ 的六种工作模式编码

single 模式

  • 消息产生者将消息放入队列
  • 消息的消费者监听消息队列,如果队列中有消息就消费掉

目录如下:

.
├── consumer.go
├── go.mod
├── go.sum
├── main.go
└── xmtmq
    └── xmtmq.go

实际编码如下:

每种模式的编码思路如下:

生产者 / 消费者

  • 连接 RabbitMQ 的 server
  • 初始化连接 connection
  • 初始化通道 channel
  • 初始化交换机 exchange
  • 初始化队列 queue
  • 使用路由key,绑定队列 bind , key
  • 生产消息 / 消费消息 produce , consume

消息xmtmq.go

package xmtmq
import (
   "github.com/streadway/amqp"
   "log"
)
// single 模式
// 定义 RabbitMQ 的数据结构
// go get github.com/streadway/amqp
type RabbitMQ struct {
   conn      *amqp.Connection // 连接
   channel   *amqp.Channel    // 通道
   QueueName string           // 队列名
   Exchange  string           // 交换机
   Key       string           // 路由键
   MQUrl     string           // MQ的虚拟机地址
}
// New 一个 RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
   if rbt == nil || rbt.QueueName == ""  || rbt.MQUrl == "" {
      log.Panic("please check QueueName,Exchange,MQUrl ...")
   }
   conn, err := amqp.Dial(rbt.MQUrl)
   if err != nil {
      log.Panicf("amqp.Dial error : %v", err)
   }
   rbt.conn = conn
   channel, err := rbt.conn.Channel()
   if err != nil {
      log.Panicf("rbt.conn.Channel error : %v", err)
   }
   rbt.channel = channel
}
func RabbitMQFree(rbt *RabbitMQ){
   if rbt == nil{
      log.Printf("rbt is nil,free failed")
      return
   }
   rbt.channel.Close()
   rbt.conn.Close()
}
func (rbt *RabbitMQ) Init() {
   // 申请队列
   _, err := rbt.channel.QueueDeclare(
      rbt.QueueName, // 队列名
      true,          // 是否持久化
      false,         // 是否自动删除
      false,         // 是否排他
      false,         // 是否阻塞
      nil,           // 其他参数
   )
   if err != nil {
      log.Printf("rbt.channel.QueueDeclare error : %v", err)
      return
   }
}
// 生产消息
func (rbt *RabbitMQ) Produce(data []byte) {
   // 向队列中加入数据
   err := rbt.channel.Publish(
      rbt.Exchange,        // 交换机
      rbt.QueueName,       // 队列名
      false,    // 若为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
      false,    // 若为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
      amqp.Publishing{
         ContentType: "text/plain",
         Body:        data,
      },
   )
   if err != nil {
      log.Printf("rbt.channel.Publish error : %v", err)
      return
   }
   return
}
// 消费消息
func (rbt *RabbitMQ) Consume() {
   // 消费数据
   msg, err := rbt.channel.Consume(
      rbt.QueueName,    // 队列名
      "xmt",    // 消费者的名字
      true,     // 是否自动应答
      false,    // 是否排他
      false,    // 若为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
      false,    // 是否阻塞
      nil,         // 其他属性
   )
   if err != nil {
      log.Printf("rbt.channel.Consume error : %v", err)
      return
   }
   for data := range msg {
      log.Printf("received data is %v", string(data.Body))
   }
}

main.go

package main
import (
   "fmt"
   "log"
   "time"
   "xmt/xmtmq"
)
/*
RabbimtMQ single 模式 案例
应用场景:简单消息队列的使用,一个生产者一个消费者
生产消息
*/
func main() {
  // 设置日志
   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
   rbt := &xmtmq.RabbitMQ{
      QueueName: "xmtqueue",
      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
   }
   xmtmq.NewRabbitMQ(rbt)
   var index = 0
   for {
       // 生产消息
      rbt.Produce([]byte(fmt.Sprintf("hello wolrd %d ", index)))
      log.Println("发送成功 ", index)
      index++
      time.Sleep(1 * time.Second)
   }
}

consumer.go

package main
import (
   "log"
   "xmt/xmtmq"
)
func main() {
   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
   rbt := &xmtmq.RabbitMQ{
      QueueName: "xmtqueue",
      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
   }
   xmtmq.NewRabbitMQ(rbt)
   rbt.Consume()
}

运行的时候,打开2个终端

终端1:go run main.go

终端2:go run consumer.go

work 模式

多个消费端消费同一个队列中的消息,队列采用轮询的方式将消息是平均发送给消费者,此处的资源是竞争关系

当生产者生产消息的速度大于消费者消费的速度,就要考虑用 work 工作模式,这样能提高处理速度提高负载

work 模式与 single 模式类似, 只是work 模式比 single 模式多了一些消费者

基于single 模式,开一个终端3 : go run consumer.go

publish / subscribe 模式

publish / subscribe 发布订阅模式 , 相对于Work queues模式多了一个交换机,此处的资源是共享的

用于场景

  • 邮件群发
  • 群聊天
  • 广播(广告等)

目录和上述编码保持一致:

xmtmq.go

开始用到交换机 exchange ,fanout 类型

生产端先把消息发送到交换机,再由交换机把消息发送到绑定的队列中,每个绑定的队列都能收到由生产端发送的消息

package xmtmq
import (
   "github.com/streadway/amqp"
   "log"
)
// publish 模式
// 定义 RabbitMQ 的数据结构
// go get github.com/streadway/amqp
type RabbitMQ struct {
   conn      *amqp.Connection // 连接
   channel   *amqp.Channel    // 通道
   QueueName string           // 队列名
   Exchange  string           // 交换机
   Key       string           // 路由键
   MQUrl     string           // MQ的虚拟机地址
}
// New 一个 RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
   if rbt == nil || rbt.Exchange == "" || rbt.MQUrl == "" {
      log.Panic("please check Exchange,MQUrl ...")
   }
   conn, err := amqp.Dial(rbt.MQUrl)
   if err != nil {
      log.Panicf("amqp.Dial error : %v", err)
   }
   rbt.conn = conn
   channel, err := rbt.conn.Channel()
   if err != nil {
      log.Panicf("rbt.conn.Channel error : %v", err)
   }
   rbt.channel = channel
}
func RabbitMQFree(rbt *RabbitMQ) {
   if rbt == nil {
      log.Printf("rbt is nil,free failed")
      return
   }
   rbt.channel.Close()
   rbt.conn.Close()
}
func (rbt *RabbitMQ) Init() {
   // 1、创建交换机
   err := rbt.channel.ExchangeDeclare(
      rbt.Exchange,        // 交换机
      amqp.ExchangeFanout, // 交换机类型
      true,                // 是否持久化
      false,               //是否自动删除
      false,               //true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
      false,               // 是否阻塞
      nil,                 // 其他属性
   )
   if err != nil {
      log.Printf("rbt.channel.ExchangeDeclare error : %v", err)
      return
   }
}
// 生产消息 publish
func (rbt *RabbitMQ) PublishMsg(data []byte) {
   // 1、向队列中加入数据
   err := rbt.channel.Publish(
      rbt.Exchange, // 交换机
      "",           // 队列名
      false,        // 若为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
      false,        // 若为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
      amqp.Publishing{
         ContentType: "text/plain",
         Body:        data,
      },
   )
   if err != nil {
      log.Printf("rbt.channel.Publish error : %v", err)
      return
   }
   return
}
// 消费消息
func (rbt *RabbitMQ) SubscribeMsg() {
   // 1、创建队列
   q, err := rbt.channel.QueueDeclare(
      "", // 此处我们传入的是空,则是随机产生队列的名称
      true,
      false,
      false,
      false,
      nil,
   )
   if err != nil {
      log.Printf("rbt.channel.QueueDeclare error : %v", err)
      return
   }
   // 2、绑定队列
   err = rbt.channel.QueueBind(
      q.Name,       // 队列名字
      "",           // 在publish模式下,这里key 为空
      rbt.Exchange, // 交换机名称
      false,        // 是否阻塞
      nil,          // 其他属性
   )
   if err != nil {
      log.Printf("rbt.channel.QueueBind error : %v", err)
      return
   }
   // 3、消费数据
   msg, err := rbt.channel.Consume(
      q.Name, // 队列名
      "xmt",  // 消费者的名字
      true,   // 是否自动应答
      false,  // 是否排他
      false,  // 若为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
      false,  // 是否阻塞
      nil,    // 其他属性
   )
   if err != nil {
      log.Printf("rbt.channel.Consume error : %v", err)
      return
   }
   for data := range msg {
      log.Printf("received data is %v", string(data.Body))
   }
}

main.go

package main
import (
   "fmt"
   "log"
   "time"
   "xmt/xmtmq"
)
/*
RabbimtMQ publish 模式 案例
应用场景:邮件群发,群聊天,广播(广告)
生产消息
*/
func main() {
   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
   rbt := &xmtmq.RabbitMQ{
      Exchange:  "xmtPubEx",
      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
   }
   xmtmq.NewRabbitMQ(rbt)
   rbt.Init()
   var index = 0
   for {
      rbt.PublishMsg([]byte(fmt.Sprintf("hello wolrd %d ", index)))
      log.Println("发送成功 ", index)
      index++
      time.Sleep(1 * time.Second)
   }
   xmtmq.RabbitMQFree(rbt)
}

consumer.go

package main
import (
   "log"
   "xmt/xmtmq"
)
func main() {
   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
   rbt := &xmtmq.RabbitMQ{
      Exchange: "xmtPubEx",
      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
   }
   xmtmq.NewRabbitMQ(rbt)
   rbt.SubscribeMsg()
   xmtmq.RabbitMQFree(rbt)
}

执行的操作和上述保持一致

终端1:go run main.go

终端2:go run consumer.go

终端3:go run consumer.go

效果和上述single 模式和 work模式的明显区别是:发布订阅模式的案例,生产者生产的消息,对应的消费者消费其生产的内容

routing 模式

消息生产者将消息发送给交换机按照路由判断,路由是字符串 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息

**应用场景:**从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景,例如处理错误,处理特定消息等

生产者处理流程:

声明队列并声明交换机 -> 创建连接 -> 创建通道 -> 通道声明交换机 -> 通道声明队列 -> 通过通道使队列绑定到交换机并指定该队列的routingkey(通配符) -> 制定消息 -> 发送消息并指定routingkey(通配符)

消费者处理流程:

声明队列并声明交换机 -> 创建连接 -> 创建通道 -> 通道声明交换机 -> 通道声明队列 -> 通过通道使队列绑定到交换机并指定routingkey(通配符) -> 重写消息消费方法 -> 执行消息方法

目录结构如下:

.
├── consumer2.go
├── consumer.go
├── go.mod
├── go.sum
├── main.go
└── xmtmq
    └── xmtmq.go

xmtmq.go

  • 用到交换机 为 direct 类型
  • 用到路由键
package xmtmq
import (
   "github.com/streadway/amqp"
   "log"
)
// routing 模式
// 定义 RabbitMQ 的数据结构
// go get github.com/streadway/amqp
type RabbitMQ struct {
   conn      *amqp.Connection // 连接
   channel   *amqp.Channel    // 通道
   QueueName string           // 队列名
   Exchange  string           // 交换机
   Key       string           // 路由键
   MQUrl     string           // MQ的虚拟机地址
}
// New 一个 RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
   if rbt == nil || rbt.Exchange == "" || rbt.QueueName == "" || rbt.Key == "" || rbt.MQUrl == "" {
      log.Panic("please check Exchange,,QueueName,Key,MQUrl ...")
   }
   conn, err := amqp.Dial(rbt.MQUrl)
   if err != nil {
      log.Panicf("amqp.Dial error : %v", err)
   }
   rbt.conn = conn
   channel, err := rbt.conn.Channel()
   if err != nil {
      log.Panicf("rbt.conn.Channel error : %v", err)
   }
   rbt.channel = channel
}
func RabbitMQFree(rbt *RabbitMQ) {
   if rbt == nil {
      log.Printf("rbt is nil,free failed")
      return
   }
   rbt.channel.Close()
   rbt.conn.Close()
}
func (rbt *RabbitMQ) Init() {
   // 1、创建交换机
   err := rbt.channel.ExchangeDeclare(
      rbt.Exchange, // 交换机
      amqp.ExchangeDirect,     // 交换机类型
      true,         // 是否持久化
      false,        //是否自动删除
      false,        //true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
      false,        // 是否阻塞
      nil,          // 其他属性
   )
   if err != nil {
      log.Printf("rbt.channel.ExchangeDeclare error : %v", err)
      return
   }
   // 2、创建队列
   _, err = rbt.channel.QueueDeclare(
      rbt.QueueName, // 此处我们传入的是空,则是随机产生队列的名称
      true,
      false,
      false,
      false,
      nil,
   )
   if err != nil {
      log.Printf("rbt.channel.QueueDeclare error : %v", err)
      return
   }
   // 3、绑定队列
   err = rbt.channel.QueueBind(
      rbt.QueueName, // 队列名字
      rbt.Key,       // routing,这里key 需要填
      rbt.Exchange,  // 交换机名称
      false,         // 是否阻塞
      nil,           // 其他属性
   )
   if err != nil {
      log.Printf("rbt.channel.QueueBind error : %v", err)
      return
   }
}
// 生产消息 publish
func (rbt *RabbitMQ) ProduceRouting(data []byte) {
   // 1、向队列中加入数据
   err := rbt.channel.Publish(
      rbt.Exchange, // 交换机
      rbt.Key,      // key
      false,        // 若为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
      false,        // 若为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
      amqp.Publishing{
         ContentType: "text/plain",
         Body:        data,
      },
   )
   if err != nil {
      log.Printf("rbt.channel.Publish error : %v", err)
      return
   }
   return
}
// 消费消息
func (rbt *RabbitMQ) ConsumeRoutingMsg() {
   // 4、消费数据
   msg, err := rbt.channel.Consume(
      rbt.QueueName, // 队列名
      "",     // 消费者的名字
      true,   // 是否自动应答
      false,  // 是否排他
      false,  // 若为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
      false,  // 是否阻塞
      nil,    // 其他属性
   )
   if err != nil {
      log.Printf("rbt.channel.Consume error : %v", err)
      return
   }
   for data := range msg {
      log.Printf("received data is %v", string(data.Body))
   }
}

main.go

package main
import (
   "fmt"
   "log"
   "time"
   "xmt/xmtmq"
)
/*
RabbimtMQ routing 模式 案例
应用场景:从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景,例如处理错误,处理特定消息等
生产消息
*/
func main() {
   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
   rbt1 := &xmtmq.RabbitMQ{
      Exchange: "xmtPubEx2",
      Key: "xmt1",
      QueueName: "Routingqueuexmt1",
      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
   }
   xmtmq.NewRabbitMQ(rbt1)
   rbt1.Init()
   rbt2 := &xmtmq.RabbitMQ{
      Exchange: "xmtPubEx2",
      Key: "xmt2",
      QueueName: "Routingqueuexmt2",
      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
   }
   xmtmq.NewRabbitMQ(rbt2)
   rbt2.Init()
   var index = 0
   for {
      rbt1.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt1  %d ", index)))
      log.Println("发送成功xmt1  ", index)
      rbt2.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt2  %d ", index)))
      log.Println("发送成功xmt2  ", index)
      index++
      time.Sleep(1 * time.Second)
   }
   xmtmq.RabbitMQFree(rbt1)
   xmtmq.RabbitMQFree(rbt2)
}

consumer.go

package main
import (
   "log"
   "xmt/xmtmq"
)
func main() {
   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
   rbt := &xmtmq.RabbitMQ{
      Exchange: "xmtPubEx2",
      Key: "xmt1",
      QueueName: "Routingqueuexmt1",
      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
   }
   xmtmq.NewRabbitMQ(rbt)
   rbt.ConsumeRoutingMsg()
   xmtmq.RabbitMQFree(rbt)
}

consumer2.go

package main
import (
   "log"
   "xmt/xmtmq"
)
func main() {
   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
   rbt := &xmtmq.RabbitMQ{
      Exchange: "xmtPubEx2",
      Key:      "xmt2",
      QueueName: "Routingqueuexmt2",
      MQUrl:    "amqp://guest:guest@127.0.0.1:5672/xmtmq",
   }
   xmtmq.NewRabbitMQ(rbt)
   rbt.ConsumeRoutingMsg()
   xmtmq.RabbitMQFree(rbt)
}

topic 模式

话题模式,一个消息被多个消费者获取,消息的目标 queue 可用 BindingKey 的通配符

Topics 模式实际上是路由模式的一种

他俩的最大的区别是 : Topics 模式发送消息和消费消息的时候是通过通配符去进行匹配的

  • *号代表可以同通配一个单词
  • #号代表可以通配零个或多个单词

编码的案例与上述 routing 模式保持一直,只是 exchange 为 topic类型

如下是上述几种模式涉及到的交换机队列

rpc 模式

RPC 远程过程调用,客户端远程调用服务端的方法 ,使用 MQ 可以实现 RPC 的异步调用

目录结构为:

.
├── consumer.go
├── go.mod
├── go.sum
├── main.go
└── xmtmq
    └── xmtmq.go
  • 客户端即是生产者也是消费者,向 RPC 请求队列发送 RPC 调用消息,同时监听RPC响应队列
  • 服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果
  • 服务端将RPC方法 的结果发送到RPC响应队列。
  • 客户端监听RPC响应队列,接收到RPC调用结果

xmtmq.go

package xmtmq
import (
   "github.com/streadway/amqp"
   "log"
   "math/rand"
)
// rpc 模式
// 定义 RabbitMQ 的数据结构
// go get github.com/streadway/amqp
type RabbitMQ struct {
   conn      *amqp.Connection // 连接
   channel   *amqp.Channel    // 通道
   QueueName string           // 队列名
   Exchange  string           // 交换机
   Key       string           // 路由键
   MQUrl     string           // MQ的虚拟机地址
}
// New 一个 RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
   if rbt == nil || rbt.QueueName == "" || rbt.MQUrl == "" {
      log.Panic("please check QueueName,Exchange,MQUrl ...")
   }
   conn, err := amqp.Dial(rbt.MQUrl)
   if err != nil {
      log.Panicf("amqp.Dial error : %v", err)
   }
   rbt.conn = conn
   channel, err := rbt.conn.Channel()
   if err != nil {
      log.Panicf("rbt.conn.Channel error : %v", err)
   }
   rbt.channel = channel
}
func RabbitMQFree(rbt *RabbitMQ) {
   if rbt == nil {
      log.Printf("rbt is nil,free failed")
      return
   }
   rbt.channel.Close()
   rbt.conn.Close()
}
// 生产消息
func (rbt *RabbitMQ) Produce(data []byte) {
   // 申请队列
   q, err := rbt.channel.QueueDeclare(
      rbt.QueueName, // 队列名
      true,          // 是否持久化
      false,         // 是否自动删除
      false,         // 是否排他
      false,         // 是否阻塞
      nil,           // 其他参数
   )
   if err != nil {
      log.Printf("rbt.channel.QueueDeclare error : %v", err)
      return
   }
   err = rbt.channel.Qos(1, 0, false)
   if err != nil {
      log.Printf("rbt.channel.Qos error : %v", err)
      return
   }
   d, err := rbt.channel.Consume(
      q.Name,
      "",
      false,
      false,
      false,
      false,
      nil)
   if err != nil {
      log.Printf("rbt.channel.Consume error : %v", err)
      return
   }
   for msg := range d {
      log.Println("received msg is  ", string(msg.Body))
      err := rbt.channel.Publish(
         "",
         msg.ReplyTo,
         false,
         false,
         amqp.Publishing{
            ContentType:   "test/plain",
            CorrelationId: msg.CorrelationId,
            Body:          data,
         })
      if err != nil {
         log.Printf("rbt.channel.Publish error : %v", err)
         return
      }
      msg.Ack(false)
      log.Println("svr response ok ")
   }
   return
}
func randomString(l int) string {
   bytes := make([]byte, l)
   for i := 0; i < l; i++ {
      bytes[i] = byte(rand.Intn(l))
   }
   return string(bytes)
}
// 消费消息
func (rbt *RabbitMQ) Consume() {
   // 申请队列
   q, err := rbt.channel.QueueDeclare(
      "",    // 队列名
      true,  // 是否持久化
      false, // 是否自动删除
      false, // 是否排他
      false, // 是否阻塞
      nil,   // 其他参数
   )
   if err != nil {
      log.Printf("rbt.channel.QueueDeclare error : %v", err)
      return
   }
   // 消费数据
   msg, err := rbt.channel.Consume(
      q.Name, // 队列名
      "xmt",  // 消费者的名字
      true,   // 是否自动应答
      false,  // 是否排他
      false,  // 若为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
      false,  // 是否阻塞
      nil,    // 其他属性
   )
   if err != nil {
      log.Printf("rbt.channel.Consume error : %v", err)
      return
   }
   id := randomString(32)
   err = rbt.channel.Publish(
      "",
      rbt.QueueName,
      false,
      false,
      amqp.Publishing{
         ContentType:   "test/plain",
         CorrelationId: id,
         ReplyTo:       q.Name,
         Body:          []byte("321"),
      })
   if err != nil {
      log.Printf("rbt.channel.Publish error : %v", err)
      return
   }
   for data := range msg {
      log.Printf("received data is %v", string(data.Body))
   }
}

main.go

package main
import (
   "fmt"
   "log"
   "xmt/xmtmq"
)
/*
RabbimtMQ rpc 模式 案例
应用场景:简单消息队列的使用,一个生产者一个消费者
生产消息
*/
func main() {
   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
   rbt := &xmtmq.RabbitMQ{
      QueueName: "xmtqueue",
      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
   }
   xmtmq.NewRabbitMQ(rbt)
   rbt.Produce([]byte(fmt.Sprintf("hello wolrd")))
}

consumer.go

package main
import (
   "log"
   "math/rand"
   "time"
   "xmt/xmtmq"
)
func main() {
   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
   rand.Seed(time.Now().UTC().UnixNano())
   rbt := &xmtmq.RabbitMQ{
      QueueName: "xmtqueue",
      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
   }
   xmtmq.NewRabbitMQ(rbt)
   rbt.Consume()
}

咱们先运行消费者,多运行几个,可以看到咱们的队列中已经有数据了,咱们运行的是2个消费者,因此此处是 2

再运行生产者,就能看到生产者将消费者发送的消息消费掉,并且通过 CorrelationId 找到对应消费者监听的队列,将数据发送到队列中

消费者监听的队列有数据了,消费者就取出来进行消费

总结

RabbitMQ 的六种工作模式:

  • single 模式
  • work 模式
  • publish / subscribe 模式
  • routing 模式
  • topic 模式
  • rpc 模式

参考资料:

RabbitMQ Tutorials

欢迎点赞,关注,收藏

朋友们,你的支持和鼓励,是我坚持分享,提高质量的动力

好了,本次就到这里

技术是开放的,我们的心态,更应是开放的。拥抱变化,向阳而生,努力向前行。

我是小魔童哪吒,欢迎点赞关注收藏,下次见~

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
8天前
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
40 3
|
14天前
|
消息中间件 存储 监控
说说MQ在你项目中的应用(一)
本文总结了消息队列(MQ)在项目中的应用,主要围绕异步处理、系统解耦和流量削峰三大功能展开。通过分析短信通知和业务日志两个典型场景,介绍了MQ的实现方式及其优势。短信通知中,MQ用于异步发送短信并处理状态更新;业务日志中,Kafka作为高吞吐量的消息系统,负责收集和传输系统及用户行为日志,确保数据的可靠性和高效处理。MQ不仅提高了系统的灵活性和响应速度,还提供了重试机制和状态追踪等功能,保障了业务的稳定运行。
53 6
|
14天前
|
消息中间件 存储 中间件
说说MQ在你项目中的应用(二)商品支付
本文总结了消息队列(MQ)在支付订单业务中的应用,重点分析了RabbitMQ的优势。通过异步处理、系统解耦和流量削峰等功能,RabbitMQ确保了支付流程的高效与稳定。具体场景包括用户下单、支付请求、商品生产和物流配送等环节。相比Kafka,RabbitMQ在低吞吐量、高实时性需求下表现更优,提供了更低延迟和更高的可靠性。
29 0
|
2月前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
106 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
5月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
87 2
|
5月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
88 1
|
5月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
5月前
|
消息中间件 开发工具
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?
|
5月前
|
消息中间件 Java Maven
RabbitMQ通配符模式
RabbitMQ通配符模式
76 0