Rabbitmq 搭建使用案例 [附源码]

简介: Rabbitmq 搭建使用案例 [附源码]

RabbitMQ搭建

docker

docker run -d --hostname rabbitmq --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_MANAGEMENT_PLUGIN=rabbitmq_management_agent -p 15672:15672 -p 5672:5672 rabbitmq:management

代码

golang

生产者
package main

import (
  "flag"
  "fmt"
  amqp "github.com/rabbitmq/amqp091-go"
  "log"
  "strconv"
  "time"
)

func main() {
  var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL")
  var exchange = flag.String("exchange", "logs", "Exchange name")
  var key = flag.String("key", "log", "Routing key")

  flag.Parse()

  // 连接到RabbitMQ服务器
  conn, err := amqp.Dial(*url)
  if err != nil {
    log.Fatalf("Failed to connect to RabbitMQ: %v", err)
  }
  defer conn.Close()

  // 创建一个通道
  ch, err := conn.Channel()
  if err != nil {
    log.Fatalf("Failed to open a channel: %v", err)
  }
  defer ch.Close()

  // 声明一个交换机
  err = ch.ExchangeDeclare(
    *exchange, // name: 交换机名称
    "fanout",  // kind: 交换机类型
    true,      // durable: 是否持久化
    false,     // autoDelete: 没有队列绑定时是否自动删除
    false,     // internal: 是否是内部交换机
    false,     // noWait: 是否需要等待服务器响应
    nil,       // args: 其他参数
  )
  if err != nil {
    log.Fatalf("Failed to declare an exchange: %v", err)
  }

  // 发送消息
  body := "Hello World!" + fmt.Sprintf(time.Now().String())

  for i := 0; i < 20; i++ {
    body = strconv.Itoa(i) + body
    err = ch.Publish(
      *exchange, // 交换机名称
      *key,      // 路由键
      false,     // 强制发布
      false,     // 立即发布
      amqp.Publishing{
        ContentType:  "text/plain",
        DeliveryMode: amqp.Persistent,
        Body:         []byte(body),
        Expiration:   "10000", // 3000 3秒
      })
  }

  if err != nil {
    log.Fatalf("Failed to publish a message: %v", err)
  }

  fmt.Printf(" [x] Sent %s", body)
}

消费者
package main

import (
  "flag"
  "fmt"
  "log"

  amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
  var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL")
  var exchange = flag.String("exchange", "logs", "Exchange name")
  var key = flag.String("key", "log", "Routing key")

  flag.Parse()

  // 连接到RabbitMQ服务器
  conn, err := amqp.Dial(*url)
  if err != nil {
    log.Fatalf("Failed to connect to RabbitMQ: %v", err)
  }
  defer conn.Close()

  // 创建一个通道
  ch, err := conn.Channel()
  if err != nil {
    log.Fatalf("Failed to open a channel: %v", err)
  }
  defer ch.Close()

  // 声明一个交换机
  err = ch.ExchangeDeclare(
    *exchange, // name: 交换机名称
    "fanout",  // kind: 交换机类型
    true,      // durable: 是否持久化
    false,     // autoDelete: 没有队列绑定时是否自动删除
    false,     // internal: 是否是内部交换机
    false,     // noWait: 是否需要等待服务器响应
    nil,       // args: 其他参数
  )
  if err != nil {
    log.Fatalf("Failed to declare an exchange: %v", err)
  }

  // 声明一个队列
  q, err := ch.QueueDeclare(
    "queue01", // 随机生成队列名称
    true,      // 持久化
    false,     // 删除
    false,     // 独占
    false,     // 不等消息
    nil,       // 其他参数
  )
  if err != nil {
    log.Fatalf("Failed to declare a queue: %v", err)
  }

  // 绑定队列到交换机
  err = ch.QueueBind(
    q.Name,    // 队列名称
    *key,      // 路由键
    *exchange, // 交换机名称
    false,     // 现在绑定
    nil,       // 其他参数
  )
  if err != nil {
    log.Fatalf("Failed to bind a queue: %v", err)
  }

  // 接收消息
  msgs, err := ch.Consume(
    q.Name,       // 队列名称
    "consumer01", // 消费者标签
    false,        // 自动ack
    false,        // 不独占
    false,        // 不等消息
    false,        // 不从服务器获取消息
    nil,          // 其他参数
  )
  if err != nil {
    log.Fatalf("Failed to register a consumer: %v", err)
  }

  fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")
  for d := range msgs {
    // 输出接收到的消息
    fmt.Printf(" [x] Received %s\n", d.Body)
    err = ch.Ack(d.DeliveryTag, true)
    if err != nil {
      log.Fatalf("Failed to ack message: %v", err)
    }
  }
}

可视化

看板

http://localhost:15672/

账户密码

admin
admin

消费进度

http://localhost:15672/#/queues

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 存储 Java
深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
61 2
|
3月前
|
消息中间件 存储 监控
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
106 12
|
4月前
|
传感器 数据可视化 网络协议
DIY可视化整合MQTT生成UniApp源码
DIY可视化整合MQTT生成UniApp源码
73 0
|
6月前
|
消息中间件 Arthas Java
RocketMQ—一次连接namesvr失败的案例分析
项目组在使用RocketMQ时遇到Consumer连接Name Server失败的问题,异常显示连接特定地址失败。通过Arthas工具逐步分析代码执行路径,定位到创建Channel返回空值导致异常。进一步跟踪发现,问题源于Netty组件在初始化`ByteBufAllocator`时出现错误。分析依赖后确认存在Netty版本冲突。解决方法为排除冲突的Netty包,仅保留兼容版本。
427 0
RocketMQ—一次连接namesvr失败的案例分析
|
8月前
|
数据采集 监控 物联网
MQTT协议在智能制造中的应用案例与效益分析
【6月更文挑战第8天】MQTT协议在智能制造中的应用案例与效益分析
238 1
|
9月前
|
消息中间件 小程序 网络性能优化
蓝易云 - 直播小程序源码有用的协议知识:MQTT协
在直播小程序源码中,MQTT协议可以用于实现实时消息推送,如弹幕、聊天消息、礼物信息等。通过使用MQTT协议,可以确保消息的实时性和可靠性,从而提高用户体验。
208 0
|
9月前
|
消息中间件 JSON Java
RabbitMQ入门指南(六):消息转换器及其案例
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了RabbitMQ默认转换器、JSON转换器及其案例等内容。
204 0
|
9月前
|
消息中间件 微服务
RabbitMQ入门指南(四):交换机与案例解析
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了交换机在RabbitMQ中的作用与类型、交换机案例(Fanout交换机、Direct交换机、Topic交换机)等内容。
462 0
|
9月前
|
消息中间件 Java 调度
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
88 1
|
9月前
|
消息中间件 Java RocketMQ
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
60 1