go rabbitmq 使用教程 ,go rabbitmq 简单队列,go rabbitmq work模式,go rabbitmq 订阅模式

简介: go rabbitmq 使用教程 ,go rabbitmq 简单队列,go rabbitmq work模式,go rabbitmq 订阅模式

使用Go的过程记录了全部的rabbitmq的go代码,方便自己下次Copy,go的资料比较少,seo估计很好做,流量速度过来。

【一】.简单队列.生产者将消息发送到队列,消费者从队列中获取消息。

1.0.connection code

func NewRabbitMQ() *amqp.Channel {

// 获取connection

amqUrl := "amqp://admin:elecfans@spiderqueue.elecfans.net:5672/"

connection, err := amqp.Dial(amqUrl)

if err != nil {

panic(fmt.Sprintf("获取connection异常:%s\n", err))

}


// 获取channel

channel, err := connection.Channel()

if err != nil {

panic(fmt.Sprintf("获取channel异常:%s\n", err))

}


return channel

}

1.1.client code:

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明队列(不存在自动创建)

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 生产_发送消息到队列

message := "ic元器件活动来新单啦"

err = channel.Publish(

   // 交换机

   "",

   // 队列名称

   queueName,

   // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

   false,

   // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

   false,

   amqp.Publishing{

       ContentType: "text/plain",

       // 队列和消息同时设置持久化

       DeliveryMode: 2,

       Body:         []byte(message),

   },

)

if err != nil {

   fmt.Printf("发送消息到队列异常:%s", err)

   return

}

1.2.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "ic订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}


// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

【二】.Work模式.一个生产者,多个消费者,一个消息只能被一个消费者获取到

2.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明队列(不存在自动创建)

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 生产_发送消息到队列

message := "ic元器件活动来新单啦,订单id"

messageSize := 10

for i := 0; i < messageSize; i++ {

   // 方便观察消费者

   time.Sleep(time.Second * 1)

   err = channel.Publish(

       // 交换机

       "",

       // 队列名称

       queueName,

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       false,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message + strconv.Itoa(i)),

       },

   )

   if err != nil {

       fmt.Printf("发送消息到队列异常:%s", err)

       return

   }

}

2.1.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 设置同一时间服务器只会发送一条消息给消费者

channel.Qos(

   // 每次获取多少条

   10,

   // 预加载数量(rabbitMq不支持)

   0,

   // false->对当前队列可用 true->对channel可用(rabbitMq不支持)

   false,

)


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "ic订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}


// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

【三】.订阅模式(fanout).

一个生产者,多个消费者

每个消费者拥有自己的队列

生产者将消息发送到交换机

每个队列自己去绑定交换机

(交换机没有储存能力,发送到没有任何队列绑定的交换机则消息丢失)

3.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明交换机

exchangeName := "notice"

err := channel.ExchangeDeclare(

   // 交换机名称

   exchangeName,

   // 交换机类型

   "fanout",

   // 持久化

   true,

   // true->当所有绑定都与交换器解绑后,会自动删除此交换器

   false,

   // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器

   false,

   // 是否非阻塞

   false,

   // 其他参数

   nil,

)

if err != nil {

   fmt.Printf("声明交换机异常:%s", err)

   return

}


// 生产_发送消息到交换机

message := "最新消息,华秋全场元器件3折起"

messageSize := 10

for i := 0; i < messageSize; i++ {

   // 方便观察消费者

   time.Sleep(time.Second * 1)

   err = channel.Publish(

       // 交换机

       exchangeName,

       // 路由key

       "",

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       false,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message + strconv.Itoa(i)),

       },

   )

   if err != nil {

       fmt.Printf("发送消息到队列异常:%s", err)

       return

   }

}

【四】.直接匹配(direct)

4.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明交换机

exchangeName := "pcb_layout_order"

err := channel.ExchangeDeclare(

   // 交换机名称

   exchangeName,

   // 交换机类型

   "direct",

   // 持久化

   true,

   // true->当所有绑定都与交换器解绑后,会自动删除此交换器

   false,

   // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器

   false,

   // 是否非阻塞

   false,

   // 其他参数

   nil,

)

if err != nil {

   fmt.Printf("声明交换机异常:%s", err)

   return

}


// 生产_发送消息到交换机

allRouteKey := []string{

   "order_insert", // 新增订单

   "order_delete", // 删除订单

}


// 循环发送到两个路由key

message := "订单id1事件"

for _, routeKey := range allRouteKey {

   err = channel.Publish(

       // 交换机

       exchangeName,

       // 路由key

       routeKey,

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       false,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message),

       },

   )

}

4.1.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "notice_queue"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 队列绑定交换机+绑定订单新增key

exchangeName := "pcb_layout_order"

allRouteKey := []string{

   "order_insert", // 新增订单

   "order_delete", // 删除订单

}

for _, routeKey := range allRouteKey {

   channel.QueueBind(

       // 队列名称

       queueName,

       // 绑定的键

       routeKey,

       // 交换机名称

       exchangeName,

       // 是否阻塞处理

       false,

       // 其他参数

       nil,

   )

}


// 设置同一时间服务器只会发送一条消息给消费者

channel.Qos(

   // 每次获取多少条

   10,

   // 预加载数量(rabbitMq不支持)

   0,

   // false->对当前队列可用 true->对channel可用(rabbitMq不支持)

   false,

)


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "ic订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}

// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

【五】.直接匹配(topic)

topic同样根据key匹配到队列,#匹配一个或者多个,*匹配一个.(切记:发往topic交换器的routing_key它必须是.分隔的几个词)

5.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明交换机

exchangeName := "smt_steel_order"

err := channel.ExchangeDeclare(

   // 交换机名称

   exchangeName,

   // 交换机类型

   "topic",

   // 持久化

   true,

   // true->当所有绑定都与交换器解绑后,会自动删除此交换器

   false,

   // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器

   false,

   // 是否非阻塞

   false,

   // 其他参数

   nil,

)

if err != nil {

   fmt.Printf("声明交换机异常:%s", err)

   return

}


// 生产_发送消息到交换机

allRouteKey := []string{

   "order.insert", // 新增订单

   "order.delete", // 删除订单

}

for _, routeKey := range allRouteKey {

   //fmt.Print(routeKey)

   message := "来自" + routeKey + "的消息"

   err = channel.Publish(

       // 交换机

       exchangeName,

       // 路由key

       routeKey,

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       true,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message),

       },

   )

}

5.1.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "notice_queue"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 队列绑定交换机+绑定订单新增key

exchangeName := "smt_steel_order"

routeKey := "order.#"

channel.QueueBind(

   // 队列名称

   queueName,

   // 绑定的路由

   routeKey,

   // 交换机名称

   exchangeName,

   // 是否阻塞处理

   false,

   // 其他参数

   nil,

)


// 设置同一时间服务器只会发送一条消息给消费者

channel.Qos(

   // 每次获取多少条

   10,

   // 预加载数量(rabbitMq不支持)

   0,

   // false->对当前队列可用 true->对channel可用(rabbitMq不支持)

   false,

)


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "smt订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}


// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
人工智能 安全 算法
Go入门实战:并发模式的使用
本文详细探讨了Go语言的并发模式,包括Goroutine、Channel、Mutex和WaitGroup等核心概念。通过具体代码实例与详细解释,介绍了这些模式的原理及应用。同时分析了未来发展趋势与挑战,如更高效的并发控制、更好的并发安全及性能优化。Go语言凭借其优秀的并发性能,在现代编程中备受青睐。
430 33
|
消息中间件 监控 数据挖掘
【有奖实践】轻量消息队列(原 MNS)订阅 OSS 事件实时处理文件变动
当你需要对对象存储 OSS(Object Storage Service)中的文件变动进行实时处理、同步、监听、业务触发、日志记录等操作时,你可以通过设置 OSS 的事件通知规则,自定义关注的文件,并将 OSS 事件推送到轻量消息队列(原 MNS)的队列或主题中,开发者的服务即可及时收到相关通知,并通过消费消息进行后续的业务处理。
359 104
|
12月前
|
消息中间件 监控 Docker
Docker环境下快速部署RabbitMQ教程。
就这样,你成功地用魔法召唤出了RabbitMQ,还把它和你的应用程序连接了起来。现在,消息会像小溪流水一样,在你的系统中自由流淌。别忘了,兔子们不喜欢孤独,他们需要你细心的关怀,不时地监控它们,确保他们的世界运转得井井有条。
711 18
|
缓存 NoSQL Go
通过 SingleFlight 模式学习 Go 并发编程
通过 SingleFlight 模式学习 Go 并发编程
|
监控 Linux PHP
【02】客户端服务端C语言-go语言-web端PHP语言整合内容发布-优雅草网络设备监控系统-2月12日优雅草简化Centos stream8安装zabbix7教程-本搭建教程非docker搭建教程-优雅草solution
【02】客户端服务端C语言-go语言-web端PHP语言整合内容发布-优雅草网络设备监控系统-2月12日优雅草简化Centos stream8安装zabbix7教程-本搭建教程非docker搭建教程-优雅草solution
559 20
|
消息中间件 对象存储
轻量消息队列(原 MNS)订阅 OSS 事件实践
使用轻量消息队列订阅OSS事件,实时处理文件变动,赢取ins风U型枕(限量500个)。访问活动页面,完成实操并上传截图即可参与领奖。活动时间:即日起至2025年2月28日16:00。奖品数量有限,先到先得,快来报名吧!
376 2
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
289 0
rabbitmq基础教程(ui,java,springamqp)
|
消息中间件 测试技术
通过轻量消息队列(原MNS)主题HTTP订阅+ARMS实现自定义数据多渠道告警
轻量消息队列(原MNS)以其简单队列模型、轻量化协议及按量后付费模式,成为阿里云产品间消息传输首选。本文通过创建主题、订阅、配置告警集成等步骤,展示了该产品在实际应用中的部分功能,确保消息的可靠传输。
442 2
|
Go 调度 开发者
探索Go语言中的并发模式:goroutine与channel
在本文中,我们将深入探讨Go语言中的核心并发特性——goroutine和channel。不同于传统的并发模型,Go语言的并发机制以其简洁性和高效性著称。本文将通过实际代码示例,展示如何利用goroutine实现轻量级的并发执行,以及如何通过channel安全地在goroutine之间传递数据。摘要部分将概述这些概念,并提示读者本文将提供哪些具体的技术洞见。
|
安全 Go 调度
探索Go语言的并发模式:协程与通道的协同作用
Go语言以其并发能力闻名于世,而协程(goroutine)和通道(channel)是实现并发的两大利器。本文将深入了解Go语言中协程的轻量级特性,探讨如何利用通道进行协程间的安全通信,并通过实际案例演示如何将这两者结合起来,构建高效且可靠的并发系统。