Rabbitmq 搭建使用案例 [附源码]

简介: Rabbitmq 搭建使用案例 [附源码]

RabbitMQ搭建

docker

docker run -d --hostname rabbitmq --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_MANAGEMENT_PLUGIN=rabbitmq_management_agent -p 15672:15672 -p 5672:5672 rabbitmq:management

代码

golang

生产者
package main

import (
  "flag"
  "fmt"
  amqp "github.com/rabbitmq/amqp091-go"
  "log"
  "strconv"
  "time"
)

func main() {
  var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL")
  var exchange = flag.String("exchange", "logs", "Exchange name")
  var key = flag.String("key", "log", "Routing key")

  flag.Parse()

  // 连接到RabbitMQ服务器
  conn, err := amqp.Dial(*url)
  if err != nil {
    log.Fatalf("Failed to connect to RabbitMQ: %v", err)
  }
  defer conn.Close()

  // 创建一个通道
  ch, err := conn.Channel()
  if err != nil {
    log.Fatalf("Failed to open a channel: %v", err)
  }
  defer ch.Close()

  // 声明一个交换机
  err = ch.ExchangeDeclare(
    *exchange, // name: 交换机名称
    "fanout",  // kind: 交换机类型
    true,      // durable: 是否持久化
    false,     // autoDelete: 没有队列绑定时是否自动删除
    false,     // internal: 是否是内部交换机
    false,     // noWait: 是否需要等待服务器响应
    nil,       // args: 其他参数
  )
  if err != nil {
    log.Fatalf("Failed to declare an exchange: %v", err)
  }

  // 发送消息
  body := "Hello World!" + fmt.Sprintf(time.Now().String())

  for i := 0; i < 20; i++ {
    body = strconv.Itoa(i) + body
    err = ch.Publish(
      *exchange, // 交换机名称
      *key,      // 路由键
      false,     // 强制发布
      false,     // 立即发布
      amqp.Publishing{
        ContentType:  "text/plain",
        DeliveryMode: amqp.Persistent,
        Body:         []byte(body),
        Expiration:   "10000", // 3000 3秒
      })
  }

  if err != nil {
    log.Fatalf("Failed to publish a message: %v", err)
  }

  fmt.Printf(" [x] Sent %s", body)
}

消费者
package main

import (
  "flag"
  "fmt"
  "log"

  amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
  var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL")
  var exchange = flag.String("exchange", "logs", "Exchange name")
  var key = flag.String("key", "log", "Routing key")

  flag.Parse()

  // 连接到RabbitMQ服务器
  conn, err := amqp.Dial(*url)
  if err != nil {
    log.Fatalf("Failed to connect to RabbitMQ: %v", err)
  }
  defer conn.Close()

  // 创建一个通道
  ch, err := conn.Channel()
  if err != nil {
    log.Fatalf("Failed to open a channel: %v", err)
  }
  defer ch.Close()

  // 声明一个交换机
  err = ch.ExchangeDeclare(
    *exchange, // name: 交换机名称
    "fanout",  // kind: 交换机类型
    true,      // durable: 是否持久化
    false,     // autoDelete: 没有队列绑定时是否自动删除
    false,     // internal: 是否是内部交换机
    false,     // noWait: 是否需要等待服务器响应
    nil,       // args: 其他参数
  )
  if err != nil {
    log.Fatalf("Failed to declare an exchange: %v", err)
  }

  // 声明一个队列
  q, err := ch.QueueDeclare(
    "queue01", // 随机生成队列名称
    true,      // 持久化
    false,     // 删除
    false,     // 独占
    false,     // 不等消息
    nil,       // 其他参数
  )
  if err != nil {
    log.Fatalf("Failed to declare a queue: %v", err)
  }

  // 绑定队列到交换机
  err = ch.QueueBind(
    q.Name,    // 队列名称
    *key,      // 路由键
    *exchange, // 交换机名称
    false,     // 现在绑定
    nil,       // 其他参数
  )
  if err != nil {
    log.Fatalf("Failed to bind a queue: %v", err)
  }

  // 接收消息
  msgs, err := ch.Consume(
    q.Name,       // 队列名称
    "consumer01", // 消费者标签
    false,        // 自动ack
    false,        // 不独占
    false,        // 不等消息
    false,        // 不从服务器获取消息
    nil,          // 其他参数
  )
  if err != nil {
    log.Fatalf("Failed to register a consumer: %v", err)
  }

  fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")
  for d := range msgs {
    // 输出接收到的消息
    fmt.Printf(" [x] Received %s\n", d.Body)
    err = ch.Ack(d.DeliveryTag, true)
    if err != nil {
      log.Fatalf("Failed to ack message: %v", err)
    }
  }
}

可视化

看板

http://localhost:15672/

账户密码

admin
admin

消费进度

http://localhost:15672/#/queues

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
8月前
|
消息中间件 存储 数据库
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
88642 18
|
8月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
8月前
|
消息中间件 Apache 开发工具
RocketMQ-初体验RocketMQ(08)-IDEA拉取调测RocketMQ源码
RocketMQ-初体验RocketMQ(08)-IDEA拉取调测RocketMQ源码
73 0
|
2月前
|
消息中间件 存储 监控
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
77 12
|
2月前
|
消息中间件 存储 Java
深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
41 2
|
3月前
|
传感器 数据可视化 网络协议
DIY可视化整合MQTT生成UniApp源码
DIY可视化整合MQTT生成UniApp源码
54 0
|
5月前
|
消息中间件 Arthas Java
RocketMQ—一次连接namesvr失败的案例分析
项目组在使用RocketMQ时遇到Consumer连接Name Server失败的问题,异常显示连接特定地址失败。通过Arthas工具逐步分析代码执行路径,定位到创建Channel返回空值导致异常。进一步跟踪发现,问题源于Netty组件在初始化`ByteBufAllocator`时出现错误。分析依赖后确认存在Netty版本冲突。解决方法为排除冲突的Netty包,仅保留兼容版本。
327 0
RocketMQ—一次连接namesvr失败的案例分析
|
7月前
|
数据采集 监控 物联网
MQTT协议在智能制造中的应用案例与效益分析
【6月更文挑战第8天】MQTT协议在智能制造中的应用案例与效益分析
203 1
|
8月前
|
消息中间件 小程序 网络性能优化
蓝易云 - 直播小程序源码有用的协议知识:MQTT协
在直播小程序源码中,MQTT协议可以用于实现实时消息推送,如弹幕、聊天消息、礼物信息等。通过使用MQTT协议,可以确保消息的实时性和可靠性,从而提高用户体验。
198 0
|
8月前
|
消息中间件 Java 调度
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
70 1