我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用
嗨,大家好,我是小魔童哪吒,咱们从今天开始进入开源组件的学习,一边学习一边总结一边分享
文章提纲如下:
- RabbitMQ 成员组成
- RabbitMQ 的六种工作模式编码
RabbitMQ 成员组成
- 生产者 producer
- 消费者 consumer
- 交换机 exchange
用于接受、分配消息
- 消息 message
- 队列 queue
用于存储生产者的消息
- 信道 channel AMQP
消息推送使用的通道
- 连接 connections
生成者或者消费者与Rabbit 建立的TCP 连接
- 路由键 routingKey
用于把生成者的数据分配到交换器上
- 绑定键 BindingKey
用于把交换器的消息绑定到队列上
- 连接管理器 ConnectionFactory
应用程序与 Rabbit 之间建立连接的管理器,程序代码中使用
RabbitMQ 的六种工作模式编码
single 模式
- 消息产生者将消息放入队列
- 消息的消费者监听消息队列,如果队列中有消息就消费掉
目录如下:
. ├── consumer.go ├── go.mod ├── go.sum ├── main.go └── xmtmq └── xmtmq.go
实际编码如下:
每种模式的编码思路如下:
生产者 / 消费者
- 连接 RabbitMQ 的 server
- 初始化连接 connection
- 初始化通道 channel
- 初始化交换机 exchange
- 初始化队列 queue
- 使用路由key,绑定队列 bind , key
- 生产消息 / 消费消息 produce , consume
消息xmtmq.go
package xmtmq import ( "github.com/streadway/amqp" "log" ) // single 模式 // 定义 RabbitMQ 的数据结构 // go get github.com/streadway/amqp type RabbitMQ struct { conn *amqp.Connection // 连接 channel *amqp.Channel // 通道 QueueName string // 队列名 Exchange string // 交换机 Key string // 路由键 MQUrl string // MQ的虚拟机地址 } // New 一个 RabbitMQ func NewRabbitMQ(rbt *RabbitMQ) { if rbt == nil || rbt.QueueName == "" || rbt.MQUrl == "" { log.Panic("please check QueueName,Exchange,MQUrl ...") } conn, err := amqp.Dial(rbt.MQUrl) if err != nil { log.Panicf("amqp.Dial error : %v", err) } rbt.conn = conn channel, err := rbt.conn.Channel() if err != nil { log.Panicf("rbt.conn.Channel error : %v", err) } rbt.channel = channel } func RabbitMQFree(rbt *RabbitMQ){ if rbt == nil{ log.Printf("rbt is nil,free failed") return } rbt.channel.Close() rbt.conn.Close() } func (rbt *RabbitMQ) Init() { // 申请队列 _, err := rbt.channel.QueueDeclare( rbt.QueueName, // 队列名 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否阻塞 nil, // 其他参数 ) if err != nil { log.Printf("rbt.channel.QueueDeclare error : %v", err) return } } // 生产消息 func (rbt *RabbitMQ) Produce(data []byte) { // 向队列中加入数据 err := rbt.channel.Publish( rbt.Exchange, // 交换机 rbt.QueueName, // 队列名 false, // 若为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者 false, // 若为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者 amqp.Publishing{ ContentType: "text/plain", Body: data, }, ) if err != nil { log.Printf("rbt.channel.Publish error : %v", err) return } return } // 消费消息 func (rbt *RabbitMQ) Consume() { // 消费数据 msg, err := rbt.channel.Consume( rbt.QueueName, // 队列名 "xmt", // 消费者的名字 true, // 是否自动应答 false, // 是否排他 false, // 若为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者 false, // 是否阻塞 nil, // 其他属性 ) if err != nil { log.Printf("rbt.channel.Consume error : %v", err) return } for data := range msg { log.Printf("received data is %v", string(data.Body)) } }
main.go
package main import ( "fmt" "log" "time" "xmt/xmtmq" ) /* RabbimtMQ single 模式 案例 应用场景:简单消息队列的使用,一个生产者一个消费者 生产消息 */ func main() { // 设置日志 log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rbt := &xmtmq.RabbitMQ{ QueueName: "xmtqueue", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt) var index = 0 for { // 生产消息 rbt.Produce([]byte(fmt.Sprintf("hello wolrd %d ", index))) log.Println("发送成功 ", index) index++ time.Sleep(1 * time.Second) } }
consumer.go
package main import ( "log" "xmt/xmtmq" ) func main() { log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rbt := &xmtmq.RabbitMQ{ QueueName: "xmtqueue", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt) rbt.Consume() }
运行的时候,打开2个终端
终端1:go run main.go
终端2:go run consumer.go
work 模式
多个消费端消费同一个队列中的消息,队列采用轮询的方式将消息是平均发送给消费者,此处的资源是竞争关系
当生产者生产消息的速度大于消费者消费的速度,就要考虑用 work 工作模式,这样能提高处理速度提高负载
work 模式与 single 模式类似, 只是work 模式比 single 模式多了一些消费者
基于single
模式,开一个终端3 : go run consumer.go
publish / subscribe 模式
publish / subscribe
发布订阅模式 , 相对于Work queues模式多了一个交换机,此处的资源是共享的
用于场景
- 邮件群发
- 群聊天
- 广播(广告等)
目录和上述编码保持一致:
xmtmq.go
开始用到交换机 exchange ,fanout 类型
生产端先把消息发送到交换机,再由交换机把消息发送到绑定的队列中,每个绑定的队列都能收到由生产端发送的消息
package xmtmq import ( "github.com/streadway/amqp" "log" ) // publish 模式 // 定义 RabbitMQ 的数据结构 // go get github.com/streadway/amqp type RabbitMQ struct { conn *amqp.Connection // 连接 channel *amqp.Channel // 通道 QueueName string // 队列名 Exchange string // 交换机 Key string // 路由键 MQUrl string // MQ的虚拟机地址 } // New 一个 RabbitMQ func NewRabbitMQ(rbt *RabbitMQ) { if rbt == nil || rbt.Exchange == "" || rbt.MQUrl == "" { log.Panic("please check Exchange,MQUrl ...") } conn, err := amqp.Dial(rbt.MQUrl) if err != nil { log.Panicf("amqp.Dial error : %v", err) } rbt.conn = conn channel, err := rbt.conn.Channel() if err != nil { log.Panicf("rbt.conn.Channel error : %v", err) } rbt.channel = channel } func RabbitMQFree(rbt *RabbitMQ) { if rbt == nil { log.Printf("rbt is nil,free failed") return } rbt.channel.Close() rbt.conn.Close() } func (rbt *RabbitMQ) Init() { // 1、创建交换机 err := rbt.channel.ExchangeDeclare( rbt.Exchange, // 交换机 amqp.ExchangeFanout, // 交换机类型 true, // 是否持久化 false, //是否自动删除 false, //true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定 false, // 是否阻塞 nil, // 其他属性 ) if err != nil { log.Printf("rbt.channel.ExchangeDeclare error : %v", err) return } } // 生产消息 publish func (rbt *RabbitMQ) PublishMsg(data []byte) { // 1、向队列中加入数据 err := rbt.channel.Publish( rbt.Exchange, // 交换机 "", // 队列名 false, // 若为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者 false, // 若为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者 amqp.Publishing{ ContentType: "text/plain", Body: data, }, ) if err != nil { log.Printf("rbt.channel.Publish error : %v", err) return } return } // 消费消息 func (rbt *RabbitMQ) SubscribeMsg() { // 1、创建队列 q, err := rbt.channel.QueueDeclare( "", // 此处我们传入的是空,则是随机产生队列的名称 true, false, false, false, nil, ) if err != nil { log.Printf("rbt.channel.QueueDeclare error : %v", err) return } // 2、绑定队列 err = rbt.channel.QueueBind( q.Name, // 队列名字 "", // 在publish模式下,这里key 为空 rbt.Exchange, // 交换机名称 false, // 是否阻塞 nil, // 其他属性 ) if err != nil { log.Printf("rbt.channel.QueueBind error : %v", err) return } // 3、消费数据 msg, err := rbt.channel.Consume( q.Name, // 队列名 "xmt", // 消费者的名字 true, // 是否自动应答 false, // 是否排他 false, // 若为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者 false, // 是否阻塞 nil, // 其他属性 ) if err != nil { log.Printf("rbt.channel.Consume error : %v", err) return } for data := range msg { log.Printf("received data is %v", string(data.Body)) } }
main.go
package main import ( "fmt" "log" "time" "xmt/xmtmq" ) /* RabbimtMQ publish 模式 案例 应用场景:邮件群发,群聊天,广播(广告) 生产消息 */ func main() { log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rbt := &xmtmq.RabbitMQ{ Exchange: "xmtPubEx", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt) rbt.Init() var index = 0 for { rbt.PublishMsg([]byte(fmt.Sprintf("hello wolrd %d ", index))) log.Println("发送成功 ", index) index++ time.Sleep(1 * time.Second) } xmtmq.RabbitMQFree(rbt) }
consumer.go
package main import ( "log" "xmt/xmtmq" ) func main() { log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rbt := &xmtmq.RabbitMQ{ Exchange: "xmtPubEx", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt) rbt.SubscribeMsg() xmtmq.RabbitMQFree(rbt) }
执行的操作和上述保持一致
终端1:go run main.go
终端2:go run consumer.go
终端3:go run consumer.go
效果和上述single
模式和 work
模式的明显区别是:发布订阅模式的案例,生产者生产的消息,对应的消费者消费其生产的内容
routing 模式
消息生产者将消息发送给交换机按照路由判断,路由是字符串 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息
**应用场景:**从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景,例如处理错误,处理特定消息等
生产者处理流程:
声明队列并声明交换机 -> 创建连接 -> 创建通道 -> 通道声明交换机 -> 通道声明队列 -> 通过通道使队列绑定到交换机并指定该队列的routingkey(通配符) -> 制定消息 -> 发送消息并指定routingkey(通配符)
消费者处理流程:
声明队列并声明交换机 -> 创建连接 -> 创建通道 -> 通道声明交换机 -> 通道声明队列 -> 通过通道使队列绑定到交换机并指定routingkey(通配符) -> 重写消息消费方法 -> 执行消息方法
目录结构如下:
. ├── consumer2.go ├── consumer.go ├── go.mod ├── go.sum ├── main.go └── xmtmq └── xmtmq.go
xmtmq.go
- 用到交换机 为
direct
类型 - 用到路由键
package xmtmq import ( "github.com/streadway/amqp" "log" ) // routing 模式 // 定义 RabbitMQ 的数据结构 // go get github.com/streadway/amqp type RabbitMQ struct { conn *amqp.Connection // 连接 channel *amqp.Channel // 通道 QueueName string // 队列名 Exchange string // 交换机 Key string // 路由键 MQUrl string // MQ的虚拟机地址 } // New 一个 RabbitMQ func NewRabbitMQ(rbt *RabbitMQ) { if rbt == nil || rbt.Exchange == "" || rbt.QueueName == "" || rbt.Key == "" || rbt.MQUrl == "" { log.Panic("please check Exchange,,QueueName,Key,MQUrl ...") } conn, err := amqp.Dial(rbt.MQUrl) if err != nil { log.Panicf("amqp.Dial error : %v", err) } rbt.conn = conn channel, err := rbt.conn.Channel() if err != nil { log.Panicf("rbt.conn.Channel error : %v", err) } rbt.channel = channel } func RabbitMQFree(rbt *RabbitMQ) { if rbt == nil { log.Printf("rbt is nil,free failed") return } rbt.channel.Close() rbt.conn.Close() } func (rbt *RabbitMQ) Init() { // 1、创建交换机 err := rbt.channel.ExchangeDeclare( rbt.Exchange, // 交换机 amqp.ExchangeDirect, // 交换机类型 true, // 是否持久化 false, //是否自动删除 false, //true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定 false, // 是否阻塞 nil, // 其他属性 ) if err != nil { log.Printf("rbt.channel.ExchangeDeclare error : %v", err) return } // 2、创建队列 _, err = rbt.channel.QueueDeclare( rbt.QueueName, // 此处我们传入的是空,则是随机产生队列的名称 true, false, false, false, nil, ) if err != nil { log.Printf("rbt.channel.QueueDeclare error : %v", err) return } // 3、绑定队列 err = rbt.channel.QueueBind( rbt.QueueName, // 队列名字 rbt.Key, // routing,这里key 需要填 rbt.Exchange, // 交换机名称 false, // 是否阻塞 nil, // 其他属性 ) if err != nil { log.Printf("rbt.channel.QueueBind error : %v", err) return } } // 生产消息 publish func (rbt *RabbitMQ) ProduceRouting(data []byte) { // 1、向队列中加入数据 err := rbt.channel.Publish( rbt.Exchange, // 交换机 rbt.Key, // key false, // 若为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者 false, // 若为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者 amqp.Publishing{ ContentType: "text/plain", Body: data, }, ) if err != nil { log.Printf("rbt.channel.Publish error : %v", err) return } return } // 消费消息 func (rbt *RabbitMQ) ConsumeRoutingMsg() { // 4、消费数据 msg, err := rbt.channel.Consume( rbt.QueueName, // 队列名 "", // 消费者的名字 true, // 是否自动应答 false, // 是否排他 false, // 若为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者 false, // 是否阻塞 nil, // 其他属性 ) if err != nil { log.Printf("rbt.channel.Consume error : %v", err) return } for data := range msg { log.Printf("received data is %v", string(data.Body)) } }
main.go
package main import ( "fmt" "log" "time" "xmt/xmtmq" ) /* RabbimtMQ routing 模式 案例 应用场景:从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景,例如处理错误,处理特定消息等 生产消息 */ func main() { log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rbt1 := &xmtmq.RabbitMQ{ Exchange: "xmtPubEx2", Key: "xmt1", QueueName: "Routingqueuexmt1", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt1) rbt1.Init() rbt2 := &xmtmq.RabbitMQ{ Exchange: "xmtPubEx2", Key: "xmt2", QueueName: "Routingqueuexmt2", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt2) rbt2.Init() var index = 0 for { rbt1.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt1 %d ", index))) log.Println("发送成功xmt1 ", index) rbt2.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt2 %d ", index))) log.Println("发送成功xmt2 ", index) index++ time.Sleep(1 * time.Second) } xmtmq.RabbitMQFree(rbt1) xmtmq.RabbitMQFree(rbt2) }
consumer.go
package main import ( "log" "xmt/xmtmq" ) func main() { log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rbt := &xmtmq.RabbitMQ{ Exchange: "xmtPubEx2", Key: "xmt1", QueueName: "Routingqueuexmt1", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt) rbt.ConsumeRoutingMsg() xmtmq.RabbitMQFree(rbt) }
consumer2.go
package main import ( "log" "xmt/xmtmq" ) func main() { log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rbt := &xmtmq.RabbitMQ{ Exchange: "xmtPubEx2", Key: "xmt2", QueueName: "Routingqueuexmt2", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt) rbt.ConsumeRoutingMsg() xmtmq.RabbitMQFree(rbt) }
topic 模式
话题模式,一个消息被多个消费者获取,消息的目标 queue 可用 BindingKey
的通配符
Topics 模式实际上是路由模式的一种
他俩的最大的区别是 : Topics 模式发送消息和消费消息的时候是通过通配符去进行匹配的
- *号代表可以同通配一个单词
- #号代表可以通配零个或多个单词
编码的案例与上述 routing 模式保持一直,只是 exchange 为 topic
类型
如下是上述几种模式涉及到的交换机
和队列
rpc 模式
RPC
远程过程调用,客户端远程调用服务端的方法 ,使用 MQ
可以实现 RPC
的异步调用
目录结构为:
. ├── consumer.go ├── go.mod ├── go.sum ├── main.go └── xmtmq └── xmtmq.go
- 客户端即是生产者也是消费者,向
RPC
请求队列发送RPC
调用消息,同时监听RPC
响应队列 - 服务端监听
RPC
请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果 - 服务端将
RPC
方法 的结果发送到RPC
响应队列。 - 客户端监听
RPC
响应队列,接收到RPC
调用结果
xmtmq.go
package xmtmq import ( "github.com/streadway/amqp" "log" "math/rand" ) // rpc 模式 // 定义 RabbitMQ 的数据结构 // go get github.com/streadway/amqp type RabbitMQ struct { conn *amqp.Connection // 连接 channel *amqp.Channel // 通道 QueueName string // 队列名 Exchange string // 交换机 Key string // 路由键 MQUrl string // MQ的虚拟机地址 } // New 一个 RabbitMQ func NewRabbitMQ(rbt *RabbitMQ) { if rbt == nil || rbt.QueueName == "" || rbt.MQUrl == "" { log.Panic("please check QueueName,Exchange,MQUrl ...") } conn, err := amqp.Dial(rbt.MQUrl) if err != nil { log.Panicf("amqp.Dial error : %v", err) } rbt.conn = conn channel, err := rbt.conn.Channel() if err != nil { log.Panicf("rbt.conn.Channel error : %v", err) } rbt.channel = channel } func RabbitMQFree(rbt *RabbitMQ) { if rbt == nil { log.Printf("rbt is nil,free failed") return } rbt.channel.Close() rbt.conn.Close() } // 生产消息 func (rbt *RabbitMQ) Produce(data []byte) { // 申请队列 q, err := rbt.channel.QueueDeclare( rbt.QueueName, // 队列名 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否阻塞 nil, // 其他参数 ) if err != nil { log.Printf("rbt.channel.QueueDeclare error : %v", err) return } err = rbt.channel.Qos(1, 0, false) if err != nil { log.Printf("rbt.channel.Qos error : %v", err) return } d, err := rbt.channel.Consume( q.Name, "", false, false, false, false, nil) if err != nil { log.Printf("rbt.channel.Consume error : %v", err) return } for msg := range d { log.Println("received msg is ", string(msg.Body)) err := rbt.channel.Publish( "", msg.ReplyTo, false, false, amqp.Publishing{ ContentType: "test/plain", CorrelationId: msg.CorrelationId, Body: data, }) if err != nil { log.Printf("rbt.channel.Publish error : %v", err) return } msg.Ack(false) log.Println("svr response ok ") } return } func randomString(l int) string { bytes := make([]byte, l) for i := 0; i < l; i++ { bytes[i] = byte(rand.Intn(l)) } return string(bytes) } // 消费消息 func (rbt *RabbitMQ) Consume() { // 申请队列 q, err := rbt.channel.QueueDeclare( "", // 队列名 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否阻塞 nil, // 其他参数 ) if err != nil { log.Printf("rbt.channel.QueueDeclare error : %v", err) return } // 消费数据 msg, err := rbt.channel.Consume( q.Name, // 队列名 "xmt", // 消费者的名字 true, // 是否自动应答 false, // 是否排他 false, // 若为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者 false, // 是否阻塞 nil, // 其他属性 ) if err != nil { log.Printf("rbt.channel.Consume error : %v", err) return } id := randomString(32) err = rbt.channel.Publish( "", rbt.QueueName, false, false, amqp.Publishing{ ContentType: "test/plain", CorrelationId: id, ReplyTo: q.Name, Body: []byte("321"), }) if err != nil { log.Printf("rbt.channel.Publish error : %v", err) return } for data := range msg { log.Printf("received data is %v", string(data.Body)) } }
main.go
package main import ( "fmt" "log" "xmt/xmtmq" ) /* RabbimtMQ rpc 模式 案例 应用场景:简单消息队列的使用,一个生产者一个消费者 生产消息 */ func main() { log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rbt := &xmtmq.RabbitMQ{ QueueName: "xmtqueue", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt) rbt.Produce([]byte(fmt.Sprintf("hello wolrd"))) }
consumer.go
package main import ( "log" "math/rand" "time" "xmt/xmtmq" ) func main() { log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rand.Seed(time.Now().UTC().UnixNano()) rbt := &xmtmq.RabbitMQ{ QueueName: "xmtqueue", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt) rbt.Consume() }
咱们先运行消费者,多运行几个,可以看到咱们的队列中已经有数据了,咱们运行的是2个消费者,因此此处是 2
再运行生产者,就能看到生产者将消费者发送的消息消费掉,并且通过 CorrelationId
找到对应消费者监听的队列,将数据发送到队列中
消费者监听的队列有数据了,消费者就取出来进行消费
总结
RabbitMQ
的六种工作模式:
- single 模式
- work 模式
- publish / subscribe 模式
- routing 模式
- topic 模式
- rpc 模式
参考资料:
欢迎点赞,关注,收藏
朋友们,你的支持和鼓励,是我坚持分享,提高质量的动力
好了,本次就到这里
技术是开放的,我们的心态,更应是开放的。拥抱变化,向阳而生,努力向前行。
我是小魔童哪吒,欢迎点赞关注收藏,下次见~