go rabbitmq 使用教程 ,go rabbitmq 简单队列,go rabbitmq work模式,go rabbitmq 订阅模式

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: go rabbitmq 使用教程 ,go rabbitmq 简单队列,go rabbitmq work模式,go rabbitmq 订阅模式

使用Go的过程记录了全部的rabbitmq的go代码,方便自己下次Copy,go的资料比较少,seo估计很好做,流量速度过来。

【一】.简单队列.生产者将消息发送到队列,消费者从队列中获取消息。

1.0.connection code

func NewRabbitMQ() *amqp.Channel {

// 获取connection

amqUrl := "amqp://admin:elecfans@spiderqueue.elecfans.net:5672/"

connection, err := amqp.Dial(amqUrl)

if err != nil {

panic(fmt.Sprintf("获取connection异常:%s\n", err))

}


// 获取channel

channel, err := connection.Channel()

if err != nil {

panic(fmt.Sprintf("获取channel异常:%s\n", err))

}


return channel

}

1.1.client code:

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明队列(不存在自动创建)

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 生产_发送消息到队列

message := "ic元器件活动来新单啦"

err = channel.Publish(

   // 交换机

   "",

   // 队列名称

   queueName,

   // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

   false,

   // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

   false,

   amqp.Publishing{

       ContentType: "text/plain",

       // 队列和消息同时设置持久化

       DeliveryMode: 2,

       Body:         []byte(message),

   },

)

if err != nil {

   fmt.Printf("发送消息到队列异常:%s", err)

   return

}

1.2.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "ic订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}


// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

【二】.Work模式.一个生产者,多个消费者,一个消息只能被一个消费者获取到

2.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明队列(不存在自动创建)

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 生产_发送消息到队列

message := "ic元器件活动来新单啦,订单id"

messageSize := 10

for i := 0; i < messageSize; i++ {

   // 方便观察消费者

   time.Sleep(time.Second * 1)

   err = channel.Publish(

       // 交换机

       "",

       // 队列名称

       queueName,

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       false,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message + strconv.Itoa(i)),

       },

   )

   if err != nil {

       fmt.Printf("发送消息到队列异常:%s", err)

       return

   }

}

2.1.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 设置同一时间服务器只会发送一条消息给消费者

channel.Qos(

   // 每次获取多少条

   10,

   // 预加载数量(rabbitMq不支持)

   0,

   // false->对当前队列可用 true->对channel可用(rabbitMq不支持)

   false,

)


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "ic订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}


// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

【三】.订阅模式(fanout).

一个生产者,多个消费者

每个消费者拥有自己的队列

生产者将消息发送到交换机

每个队列自己去绑定交换机

(交换机没有储存能力,发送到没有任何队列绑定的交换机则消息丢失)

3.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明交换机

exchangeName := "notice"

err := channel.ExchangeDeclare(

   // 交换机名称

   exchangeName,

   // 交换机类型

   "fanout",

   // 持久化

   true,

   // true->当所有绑定都与交换器解绑后,会自动删除此交换器

   false,

   // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器

   false,

   // 是否非阻塞

   false,

   // 其他参数

   nil,

)

if err != nil {

   fmt.Printf("声明交换机异常:%s", err)

   return

}


// 生产_发送消息到交换机

message := "最新消息,华秋全场元器件3折起"

messageSize := 10

for i := 0; i < messageSize; i++ {

   // 方便观察消费者

   time.Sleep(time.Second * 1)

   err = channel.Publish(

       // 交换机

       exchangeName,

       // 路由key

       "",

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       false,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message + strconv.Itoa(i)),

       },

   )

   if err != nil {

       fmt.Printf("发送消息到队列异常:%s", err)

       return

   }

}

【四】.直接匹配(direct)

4.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明交换机

exchangeName := "pcb_layout_order"

err := channel.ExchangeDeclare(

   // 交换机名称

   exchangeName,

   // 交换机类型

   "direct",

   // 持久化

   true,

   // true->当所有绑定都与交换器解绑后,会自动删除此交换器

   false,

   // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器

   false,

   // 是否非阻塞

   false,

   // 其他参数

   nil,

)

if err != nil {

   fmt.Printf("声明交换机异常:%s", err)

   return

}


// 生产_发送消息到交换机

allRouteKey := []string{

   "order_insert", // 新增订单

   "order_delete", // 删除订单

}


// 循环发送到两个路由key

message := "订单id1事件"

for _, routeKey := range allRouteKey {

   err = channel.Publish(

       // 交换机

       exchangeName,

       // 路由key

       routeKey,

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       false,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message),

       },

   )

}

4.1.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "notice_queue"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 队列绑定交换机+绑定订单新增key

exchangeName := "pcb_layout_order"

allRouteKey := []string{

   "order_insert", // 新增订单

   "order_delete", // 删除订单

}

for _, routeKey := range allRouteKey {

   channel.QueueBind(

       // 队列名称

       queueName,

       // 绑定的键

       routeKey,

       // 交换机名称

       exchangeName,

       // 是否阻塞处理

       false,

       // 其他参数

       nil,

   )

}


// 设置同一时间服务器只会发送一条消息给消费者

channel.Qos(

   // 每次获取多少条

   10,

   // 预加载数量(rabbitMq不支持)

   0,

   // false->对当前队列可用 true->对channel可用(rabbitMq不支持)

   false,

)


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "ic订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}

// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

【五】.直接匹配(topic)

topic同样根据key匹配到队列,#匹配一个或者多个,*匹配一个.(切记:发往topic交换器的routing_key它必须是.分隔的几个词)

5.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明交换机

exchangeName := "smt_steel_order"

err := channel.ExchangeDeclare(

   // 交换机名称

   exchangeName,

   // 交换机类型

   "topic",

   // 持久化

   true,

   // true->当所有绑定都与交换器解绑后,会自动删除此交换器

   false,

   // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器

   false,

   // 是否非阻塞

   false,

   // 其他参数

   nil,

)

if err != nil {

   fmt.Printf("声明交换机异常:%s", err)

   return

}


// 生产_发送消息到交换机

allRouteKey := []string{

   "order.insert", // 新增订单

   "order.delete", // 删除订单

}

for _, routeKey := range allRouteKey {

   //fmt.Print(routeKey)

   message := "来自" + routeKey + "的消息"

   err = channel.Publish(

       // 交换机

       exchangeName,

       // 路由key

       routeKey,

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       true,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message),

       },

   )

}

5.1.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "notice_queue"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 队列绑定交换机+绑定订单新增key

exchangeName := "smt_steel_order"

routeKey := "order.#"

channel.QueueBind(

   // 队列名称

   queueName,

   // 绑定的路由

   routeKey,

   // 交换机名称

   exchangeName,

   // 是否阻塞处理

   false,

   // 其他参数

   nil,

)


// 设置同一时间服务器只会发送一条消息给消费者

channel.Qos(

   // 每次获取多少条

   10,

   // 预加载数量(rabbitMq不支持)

   0,

   // false->对当前队列可用 true->对channel可用(rabbitMq不支持)

   false,

)


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "smt订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}


// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
16天前
|
缓存 NoSQL Go
通过 SingleFlight 模式学习 Go 并发编程
通过 SingleFlight 模式学习 Go 并发编程
|
4月前
|
消息中间件 Java RocketMQ
RocketMQ实战教程之RocketMQ安装
这是一篇关于RocketMQ安装的实战教程,主要介绍了在CentOS系统上使用传统安装和Docker两种方式安装RocketMQ。首先,系统需要是64位,并且已经安装了JDK 1.8。传统安装包括下载安装包,解压并启动NameServer和Broker。Docker安装则涉及安装docker和docker-compose,然后通过docker-compose.yaml文件配置并启动服务。教程还提供了启动命令和解决问题的提示。
|
1月前
|
存储 Unix 测试技术
解释Go中常见的I/O模式
解释Go中常见的I/O模式
|
1月前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
109 2
|
18天前
|
消息中间件 监控 Ubuntu
RabbitMQ安装配置,超详细版教程
以上步骤为您提供了在Linux环境下安装RabbitMQ的详细过程。安装Erlang作为基础,然后通过添加官方源并安装RabbitMQ本身,最后对服务进行配置并启用Web管理界面。这些步骤操作简单直观,只需要跟随上述指南,即可在短时间内将RabbitMQ服务器运行起来,并进行进一步的配置和管理。不要忘记硬件和网络资源对性能的影响,确保RabbitMQ能够满足您的应用需求。
60 0
|
2月前
|
网络协议 Go
Swoole与Go系列教程之TCP服务的应用
TCP(传输控制协议)的出现是为了解决计算机网络中的数据可靠传输和连接管理的问题。在早期的计算机网络中,特别是在分组交换和互联网的发展初期,网络是不可靠的,存在丢包、错误和延迟等问题。
975 0
Swoole与Go系列教程之TCP服务的应用
|
2月前
|
XML JSON Go
Swoole与Go系列教程之WebSocket服务的应用
在 WebSocket 协议出现之前,Web 应用为了能过获取到实时的数据都是通过不断轮询服务端的接口。轮询的效率、延时很低,并且很耗费资源。
1038 1
Swoole与Go系列教程之WebSocket服务的应用
|
2月前
|
设计模式 Go
Go语言设计模式:使用Option模式简化类的初始化
在Go语言中,面对构造函数参数过多导致的复杂性问题,可以采用Option模式。Option模式通过函数选项提供灵活的配置,增强了构造函数的可读性和可扩展性。以`Foo`为例,通过定义如`WithName`、`WithAge`、`WithDB`等设置器函数,调用者可以选择性地传递所需参数,避免了记忆参数顺序和类型。这种模式提升了代码的维护性和灵活性,特别是在处理多配置场景时。
58 8
|
1月前
|
缓存 算法 Go