go rabbitmq 使用教程 ,go rabbitmq 简单队列,go rabbitmq work模式,go rabbitmq 订阅模式

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: go rabbitmq 使用教程 ,go rabbitmq 简单队列,go rabbitmq work模式,go rabbitmq 订阅模式

使用Go的过程记录了全部的rabbitmq的go代码,方便自己下次Copy,go的资料比较少,seo估计很好做,流量速度过来。

【一】.简单队列.生产者将消息发送到队列,消费者从队列中获取消息。

1.0.connection code

func NewRabbitMQ() *amqp.Channel {

// 获取connection

amqUrl := "amqp://admin:elecfans@spiderqueue.elecfans.net:5672/"

connection, err := amqp.Dial(amqUrl)

if err != nil {

panic(fmt.Sprintf("获取connection异常:%s\n", err))

}


// 获取channel

channel, err := connection.Channel()

if err != nil {

panic(fmt.Sprintf("获取channel异常:%s\n", err))

}


return channel

}

1.1.client code:

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明队列(不存在自动创建)

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 生产_发送消息到队列

message := "ic元器件活动来新单啦"

err = channel.Publish(

   // 交换机

   "",

   // 队列名称

   queueName,

   // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

   false,

   // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

   false,

   amqp.Publishing{

       ContentType: "text/plain",

       // 队列和消息同时设置持久化

       DeliveryMode: 2,

       Body:         []byte(message),

   },

)

if err != nil {

   fmt.Printf("发送消息到队列异常:%s", err)

   return

}

1.2.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "ic订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}


// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

【二】.Work模式.一个生产者,多个消费者,一个消息只能被一个消费者获取到

2.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明队列(不存在自动创建)

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 生产_发送消息到队列

message := "ic元器件活动来新单啦,订单id"

messageSize := 10

for i := 0; i < messageSize; i++ {

   // 方便观察消费者

   time.Sleep(time.Second * 1)

   err = channel.Publish(

       // 交换机

       "",

       // 队列名称

       queueName,

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       false,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message + strconv.Itoa(i)),

       },

   )

   if err != nil {

       fmt.Printf("发送消息到队列异常:%s", err)

       return

   }

}

2.1.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 设置同一时间服务器只会发送一条消息给消费者

channel.Qos(

   // 每次获取多少条

   10,

   // 预加载数量(rabbitMq不支持)

   0,

   // false->对当前队列可用 true->对channel可用(rabbitMq不支持)

   false,

)


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "ic订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}


// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

【三】.订阅模式(fanout).

一个生产者,多个消费者

每个消费者拥有自己的队列

生产者将消息发送到交换机

每个队列自己去绑定交换机

(交换机没有储存能力,发送到没有任何队列绑定的交换机则消息丢失)

3.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明交换机

exchangeName := "notice"

err := channel.ExchangeDeclare(

   // 交换机名称

   exchangeName,

   // 交换机类型

   "fanout",

   // 持久化

   true,

   // true->当所有绑定都与交换器解绑后,会自动删除此交换器

   false,

   // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器

   false,

   // 是否非阻塞

   false,

   // 其他参数

   nil,

)

if err != nil {

   fmt.Printf("声明交换机异常:%s", err)

   return

}


// 生产_发送消息到交换机

message := "最新消息,华秋全场元器件3折起"

messageSize := 10

for i := 0; i < messageSize; i++ {

   // 方便观察消费者

   time.Sleep(time.Second * 1)

   err = channel.Publish(

       // 交换机

       exchangeName,

       // 路由key

       "",

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       false,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message + strconv.Itoa(i)),

       },

   )

   if err != nil {

       fmt.Printf("发送消息到队列异常:%s", err)

       return

   }

}

【四】.直接匹配(direct)

4.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明交换机

exchangeName := "pcb_layout_order"

err := channel.ExchangeDeclare(

   // 交换机名称

   exchangeName,

   // 交换机类型

   "direct",

   // 持久化

   true,

   // true->当所有绑定都与交换器解绑后,会自动删除此交换器

   false,

   // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器

   false,

   // 是否非阻塞

   false,

   // 其他参数

   nil,

)

if err != nil {

   fmt.Printf("声明交换机异常:%s", err)

   return

}


// 生产_发送消息到交换机

allRouteKey := []string{

   "order_insert", // 新增订单

   "order_delete", // 删除订单

}


// 循环发送到两个路由key

message := "订单id1事件"

for _, routeKey := range allRouteKey {

   err = channel.Publish(

       // 交换机

       exchangeName,

       // 路由key

       routeKey,

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       false,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message),

       },

   )

}

4.1.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "notice_queue"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 队列绑定交换机+绑定订单新增key

exchangeName := "pcb_layout_order"

allRouteKey := []string{

   "order_insert", // 新增订单

   "order_delete", // 删除订单

}

for _, routeKey := range allRouteKey {

   channel.QueueBind(

       // 队列名称

       queueName,

       // 绑定的键

       routeKey,

       // 交换机名称

       exchangeName,

       // 是否阻塞处理

       false,

       // 其他参数

       nil,

   )

}


// 设置同一时间服务器只会发送一条消息给消费者

channel.Qos(

   // 每次获取多少条

   10,

   // 预加载数量(rabbitMq不支持)

   0,

   // false->对当前队列可用 true->对channel可用(rabbitMq不支持)

   false,

)


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "ic订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}

// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

【五】.直接匹配(topic)

topic同样根据key匹配到队列,#匹配一个或者多个,*匹配一个.(切记:发往topic交换器的routing_key它必须是.分隔的几个词)

5.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明交换机

exchangeName := "smt_steel_order"

err := channel.ExchangeDeclare(

   // 交换机名称

   exchangeName,

   // 交换机类型

   "topic",

   // 持久化

   true,

   // true->当所有绑定都与交换器解绑后,会自动删除此交换器

   false,

   // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器

   false,

   // 是否非阻塞

   false,

   // 其他参数

   nil,

)

if err != nil {

   fmt.Printf("声明交换机异常:%s", err)

   return

}


// 生产_发送消息到交换机

allRouteKey := []string{

   "order.insert", // 新增订单

   "order.delete", // 删除订单

}

for _, routeKey := range allRouteKey {

   //fmt.Print(routeKey)

   message := "来自" + routeKey + "的消息"

   err = channel.Publish(

       // 交换机

       exchangeName,

       // 路由key

       routeKey,

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       true,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message),

       },

   )

}

5.1.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "notice_queue"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 队列绑定交换机+绑定订单新增key

exchangeName := "smt_steel_order"

routeKey := "order.#"

channel.QueueBind(

   // 队列名称

   queueName,

   // 绑定的路由

   routeKey,

   // 交换机名称

   exchangeName,

   // 是否阻塞处理

   false,

   // 其他参数

   nil,

)


// 设置同一时间服务器只会发送一条消息给消费者

channel.Qos(

   // 每次获取多少条

   10,

   // 预加载数量(rabbitMq不支持)

   0,

   // false->对当前队列可用 true->对channel可用(rabbitMq不支持)

   false,

)


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "smt订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}


// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
缓存 NoSQL Go
通过 SingleFlight 模式学习 Go 并发编程
通过 SingleFlight 模式学习 Go 并发编程
|
6天前
|
Go 调度 开发者
探索Go语言中的并发模式:goroutine与channel
在本文中,我们将深入探讨Go语言中的核心并发特性——goroutine和channel。不同于传统的并发模型,Go语言的并发机制以其简洁性和高效性著称。本文将通过实际代码示例,展示如何利用goroutine实现轻量级的并发执行,以及如何通过channel安全地在goroutine之间传递数据。摘要部分将概述这些概念,并提示读者本文将提供哪些具体的技术洞见。
|
27天前
|
安全 Go 调度
探索Go语言的并发模式:协程与通道的协同作用
Go语言以其并发能力闻名于世,而协程(goroutine)和通道(channel)是实现并发的两大利器。本文将深入了解Go语言中协程的轻量级特性,探讨如何利用通道进行协程间的安全通信,并通过实际案例演示如何将这两者结合起来,构建高效且可靠的并发系统。
|
27天前
|
安全 Go 开发者
破译Go语言中的并发模式:从入门到精通
在这篇技术性文章中,我们将跳过常规的摘要模式,直接带你进入Go语言的并发世界。你将不会看到枯燥的介绍,而是一段代码的旅程,从Go的并发基础构建块(goroutine和channel)开始,到高级模式的实践应用,我们共同探索如何高效地使用Go来处理并发任务。准备好,让Go带你飞。
|
3月前
|
存储 Unix 测试技术
解释Go中常见的I/O模式
解释Go中常见的I/O模式
|
4月前
|
XML JSON Go
Swoole与Go系列教程之WebSocket服务的应用
在 WebSocket 协议出现之前,Web 应用为了能过获取到实时的数据都是通过不断轮询服务端的接口。轮询的效率、延时很低,并且很耗费资源。
1057 2
Swoole与Go系列教程之WebSocket服务的应用
|
4月前
|
网络协议 Go
Swoole与Go系列教程之TCP服务的应用
TCP(传输控制协议)的出现是为了解决计算机网络中的数据可靠传输和连接管理的问题。在早期的计算机网络中,特别是在分组交换和互联网的发展初期,网络是不可靠的,存在丢包、错误和延迟等问题。
993 0
Swoole与Go系列教程之TCP服务的应用
|
4月前
|
设计模式 Go
Go语言设计模式:使用Option模式简化类的初始化
在Go语言中,面对构造函数参数过多导致的复杂性问题,可以采用Option模式。Option模式通过函数选项提供灵活的配置,增强了构造函数的可读性和可扩展性。以`Foo`为例,通过定义如`WithName`、`WithAge`、`WithDB`等设置器函数,调用者可以选择性地传递所需参数,避免了记忆参数顺序和类型。这种模式提升了代码的维护性和灵活性,特别是在处理多配置场景时。
72 8
|
3月前
|
缓存 算法 Go