go rabbitmq 使用教程 ,go rabbitmq 简单队列,go rabbitmq work模式,go rabbitmq 订阅模式

简介: go rabbitmq 使用教程 ,go rabbitmq 简单队列,go rabbitmq work模式,go rabbitmq 订阅模式

使用Go的过程记录了全部的rabbitmq的go代码,方便自己下次Copy,go的资料比较少,seo估计很好做,流量速度过来。

【一】.简单队列.生产者将消息发送到队列,消费者从队列中获取消息。

1.0.connection code

func NewRabbitMQ() *amqp.Channel {

// 获取connection

amqUrl := "amqp://admin:elecfans@spiderqueue.elecfans.net:5672/"

connection, err := amqp.Dial(amqUrl)

if err != nil {

panic(fmt.Sprintf("获取connection异常:%s\n", err))

}


// 获取channel

channel, err := connection.Channel()

if err != nil {

panic(fmt.Sprintf("获取channel异常:%s\n", err))

}


return channel

}

1.1.client code:

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明队列(不存在自动创建)

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 生产_发送消息到队列

message := "ic元器件活动来新单啦"

err = channel.Publish(

   // 交换机

   "",

   // 队列名称

   queueName,

   // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

   false,

   // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

   false,

   amqp.Publishing{

       ContentType: "text/plain",

       // 队列和消息同时设置持久化

       DeliveryMode: 2,

       Body:         []byte(message),

   },

)

if err != nil {

   fmt.Printf("发送消息到队列异常:%s", err)

   return

}

1.2.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "ic订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}


// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

【二】.Work模式.一个生产者,多个消费者,一个消息只能被一个消费者获取到

2.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明队列(不存在自动创建)

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 生产_发送消息到队列

message := "ic元器件活动来新单啦,订单id"

messageSize := 10

for i := 0; i < messageSize; i++ {

   // 方便观察消费者

   time.Sleep(time.Second * 1)

   err = channel.Publish(

       // 交换机

       "",

       // 队列名称

       queueName,

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       false,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message + strconv.Itoa(i)),

       },

   )

   if err != nil {

       fmt.Printf("发送消息到队列异常:%s", err)

       return

   }

}

2.1.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 设置同一时间服务器只会发送一条消息给消费者

channel.Qos(

   // 每次获取多少条

   10,

   // 预加载数量(rabbitMq不支持)

   0,

   // false->对当前队列可用 true->对channel可用(rabbitMq不支持)

   false,

)


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "ic订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}


// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

【三】.订阅模式(fanout).

一个生产者,多个消费者

每个消费者拥有自己的队列

生产者将消息发送到交换机

每个队列自己去绑定交换机

(交换机没有储存能力,发送到没有任何队列绑定的交换机则消息丢失)

3.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明交换机

exchangeName := "notice"

err := channel.ExchangeDeclare(

   // 交换机名称

   exchangeName,

   // 交换机类型

   "fanout",

   // 持久化

   true,

   // true->当所有绑定都与交换器解绑后,会自动删除此交换器

   false,

   // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器

   false,

   // 是否非阻塞

   false,

   // 其他参数

   nil,

)

if err != nil {

   fmt.Printf("声明交换机异常:%s", err)

   return

}


// 生产_发送消息到交换机

message := "最新消息,华秋全场元器件3折起"

messageSize := 10

for i := 0; i < messageSize; i++ {

   // 方便观察消费者

   time.Sleep(time.Second * 1)

   err = channel.Publish(

       // 交换机

       exchangeName,

       // 路由key

       "",

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       false,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message + strconv.Itoa(i)),

       },

   )

   if err != nil {

       fmt.Printf("发送消息到队列异常:%s", err)

       return

   }

}

【四】.直接匹配(direct)

4.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明交换机

exchangeName := "pcb_layout_order"

err := channel.ExchangeDeclare(

   // 交换机名称

   exchangeName,

   // 交换机类型

   "direct",

   // 持久化

   true,

   // true->当所有绑定都与交换器解绑后,会自动删除此交换器

   false,

   // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器

   false,

   // 是否非阻塞

   false,

   // 其他参数

   nil,

)

if err != nil {

   fmt.Printf("声明交换机异常:%s", err)

   return

}


// 生产_发送消息到交换机

allRouteKey := []string{

   "order_insert", // 新增订单

   "order_delete", // 删除订单

}


// 循环发送到两个路由key

message := "订单id1事件"

for _, routeKey := range allRouteKey {

   err = channel.Publish(

       // 交换机

       exchangeName,

       // 路由key

       routeKey,

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       false,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message),

       },

   )

}

4.1.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "notice_queue"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 队列绑定交换机+绑定订单新增key

exchangeName := "pcb_layout_order"

allRouteKey := []string{

   "order_insert", // 新增订单

   "order_delete", // 删除订单

}

for _, routeKey := range allRouteKey {

   channel.QueueBind(

       // 队列名称

       queueName,

       // 绑定的键

       routeKey,

       // 交换机名称

       exchangeName,

       // 是否阻塞处理

       false,

       // 其他参数

       nil,

   )

}


// 设置同一时间服务器只会发送一条消息给消费者

channel.Qos(

   // 每次获取多少条

   10,

   // 预加载数量(rabbitMq不支持)

   0,

   // false->对当前队列可用 true->对channel可用(rabbitMq不支持)

   false,

)


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "ic订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}

// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

【五】.直接匹配(topic)

topic同样根据key匹配到队列,#匹配一个或者多个,*匹配一个.(切记:发往topic交换器的routing_key它必须是.分隔的几个词)

5.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明交换机

exchangeName := "smt_steel_order"

err := channel.ExchangeDeclare(

   // 交换机名称

   exchangeName,

   // 交换机类型

   "topic",

   // 持久化

   true,

   // true->当所有绑定都与交换器解绑后,会自动删除此交换器

   false,

   // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器

   false,

   // 是否非阻塞

   false,

   // 其他参数

   nil,

)

if err != nil {

   fmt.Printf("声明交换机异常:%s", err)

   return

}


// 生产_发送消息到交换机

allRouteKey := []string{

   "order.insert", // 新增订单

   "order.delete", // 删除订单

}

for _, routeKey := range allRouteKey {

   //fmt.Print(routeKey)

   message := "来自" + routeKey + "的消息"

   err = channel.Publish(

       // 交换机

       exchangeName,

       // 路由key

       routeKey,

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       true,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message),

       },

   )

}

5.1.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "notice_queue"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 队列绑定交换机+绑定订单新增key

exchangeName := "smt_steel_order"

routeKey := "order.#"

channel.QueueBind(

   // 队列名称

   queueName,

   // 绑定的路由

   routeKey,

   // 交换机名称

   exchangeName,

   // 是否阻塞处理

   false,

   // 其他参数

   nil,

)


// 设置同一时间服务器只会发送一条消息给消费者

channel.Qos(

   // 每次获取多少条

   10,

   // 预加载数量(rabbitMq不支持)

   0,

   // false->对当前队列可用 true->对channel可用(rabbitMq不支持)

   false,

)


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "smt订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}


// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
9天前
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
40 3
|
3月前
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
38 0
rabbitmq基础教程(ui,java,springamqp)
|
5月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
87 2
|
5月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
88 1
|
5月前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
253 2
|
5月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
5月前
|
消息中间件 监控 Ubuntu
RabbitMQ安装配置,超详细版教程
以上步骤为您提供了在Linux环境下安装RabbitMQ的详细过程。安装Erlang作为基础,然后通过添加官方源并安装RabbitMQ本身,最后对服务进行配置并启用Web管理界面。这些步骤操作简单直观,只需要跟随上述指南,即可在短时间内将RabbitMQ服务器运行起来,并进行进一步的配置和管理。不要忘记硬件和网络资源对性能的影响,确保RabbitMQ能够满足您的应用需求。
313 0
|
6月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
|
6月前
|
消息中间件 存储 Kafka
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决