消息中间件系列教程(09) -RabbitMQ -案例代码(发布订阅模式)

简介: 消息中间件系列教程(09) -RabbitMQ -案例代码(发布订阅模式)

引言

代码已上传至Github,有兴趣的同学可以下载看看:https://github.com/ylw-github/RabbitMQ-Demo

前面博客讲解了RabbitMQ的五种队列形式《消息中间件系列教程(06) -RabbitMQ -五种队列形式》,主要讲解一下五种队列的代码实现。

主要分为:

  1. 点对点队列模式(简单)
  2. 工作队列模式(公平性)
  3. 发布订阅模式
  4. 路由模式Routing
  5. 通配符模式Topics

本文主要讲解发布订阅模式。

发布订阅模式

功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者。

思路解读

  1. 一个生产者,多个消费者
  2. 每一个消费者都有自己的一个队列
  3. 生产者没有直接发消息到队列中,而是发送到交换机
  4. 每个消费者的队列都绑定到交换机上
  5. 消息通过交换机到达每个消费者的队列

该模式就是Fanout Exchange(扇型交换机)将消息路由给绑定到它身上的所有队列。注意:交换机没有存储消息功能,如果消息发送到没有绑定消费队列的交换机,消息则丢失。

1.用户发邮件案例讲解

1.新建Maven项目RabbitMQ-Demo

2.添加Maven依赖:

<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
  </dependency>
</dependencies>

3.连接工具类

package com.ylw.rabbitmq;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQConnecUtils {
    public static Connection newConnection() throws IOException, TimeoutException {
        // 1.定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2.设置服务器地址
        factory.setHost("127.0.0.1");
        // 3.设置协议端口号
        factory.setPort(5672);
        // 4.设置vhost
        factory.setVirtualHost("OrderHost");
        // 5.设置用户名称
        factory.setUsername("OrderAdmin");
        // 6.设置用户密码
        factory.setPassword("123456");
        // 7.创建新的连接
        Connection newConnection = factory.newConnection();
        return newConnection;
    }
}
1.1 生产者
public class Producer {
    private static final String EXCHANGE_NAME = "fanout_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建新的连接
        Connection connection = RabbitMQConnecUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        // 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String msg = "fanout_exchange_msg";
        // 4.发送消息
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        // System.out.println("生产者发送msg:" + msg);
        // // 5.关闭通道、连接
        // channel.close();
        // connection.close();
        // 注意:如果消费没有绑定交换机和队列,则消息会丢失
    }
}
1.2 消费者

邮件消费者:

public class ConsumerEmailFanout {
    private static final String QUEUE_NAME = "consumerFanout_email";
    private static final String EXCHANGE_NAME = "fanout_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建新的连接
        Connection connection = RabbitMQConnecUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        // 3.消费者关联队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取生产者消息:" + msg);
            }
        };
        // 5.消费者监听队列消息
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

短信消费者:

public class ConsumerSMSFanout {
    private static final String QUEUE_NAME = "ConsumerFanout_sms";
    private static final String EXCHANGE_NAME = "fanout_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建新的连接
        Connection connection = RabbitMQConnecUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        // 3.消费者关联队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("短信消费者获取生产者消息:" + msg);
            }
        };
        // 5.消费者监听队列消息
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

3. 测试

启动生产者,并关闭,让其在RabbitMQ里面注册交换机,在控制台可以看出注册成功(如果不启动,可以手动注册,如下图Add a new exchange):

启动邮件消费者和短信消费者,在控制台可以看出有两个队列:

再启动生产者,可以看到消费者消费信息:

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
18天前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
42 0
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
26天前
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
19 0
rabbitmq基础教程(ui,java,springamqp)
|
3月前
|
消息中间件 人工智能 Cloud Native
社区胜于代码,我们在阿帕奇软件基金会亚洲大会聊了聊开源中间件的未来
阿帕奇基金会亚洲大会顺利召开,阿里云消息技术负责人林清山在主论坛做了《阿里云中间件持续进化:从分布式应用架构向云原生 AI 原生应用架构全面升级》的演讲,从云厂商的视角分享了贡献开源、推动社区发展的过程,希望通过 AI 开发框架+AI 观测能力+AI 网关 + 事件驱动,一站式助力大模型应用落地。
252 15
社区胜于代码,我们在阿帕奇软件基金会亚洲大会聊了聊开源中间件的未来
|
3月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
3月前
|
消息中间件 Docker 容器
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
这篇文章提供了RabbitMQ的安装和基本使用教程,包括如何使用Docker拉取RabbitMQ镜像、创建容器、通过浏览器访问管理界面,以及如何创建交换机、队列、绑定和使用direct、fanout和topic三种类型的交换器进行消息发布和接收的测试。
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
|
3月前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
3月前
|
物联网 C# Windows
看看如何使用 C# 代码让 MQTT 进行完美通信
看看如何使用 C# 代码让 MQTT 进行完美通信
499 0
|
3月前
|
Java
MQTT(EMQX) - Java 调用 MQTT Demo 代码
MQTT(EMQX) - Java 调用 MQTT Demo 代码
128 0
MQTT(EMQX) - Java 调用 MQTT Demo 代码
|
3月前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
178 2
|
3月前
|
中间件 数据库连接 UED
Django中间件秘籍:如何用几行代码让你的应用变得超级强大?
【8月更文挑战第31天】中间件是Django框架的核心特性,位于视图与HTTP服务器之间,允许全局处理请求和响应,增强Web应用功能。通过实现`MiddlewareMixin`类的方法,如`process_request`和`process_response`,可以轻松实现请求预处理或响应后处理。中间件应用场景广泛,包括用户认证、CSRF防护和数据库连接管理等。创建并配置中间件需将其加入`settings.py`的`MIDDLEWARE`列表,顺序决定执行优先级。合理利用中间件能提高代码重用性和应用性能,带来更好的用户体验。
49 0

热门文章

最新文章