消息中间件系列教程(11) -RabbitMQ -案例代码(通配符模式)

简介: 消息中间件系列教程(11) -RabbitMQ -案例代码(通配符模式)

引言

代码已上传至Github,有兴趣的同学可以下载看看:https://github.com/ylw-github/RabbitMQ-Demo

前面博客讲解了RabbitMQ的五种队列形式《消息中间件系列教程(06) -RabbitMQ -五种队列形式》,主要讲解一下五种队列的代码实现。

主要分为:

  1. 点对点队列模式(简单)
  2. 工作队列模式(公平性)
  3. 发布订阅模式
  4. 路由模式Routing
  5. 通配符模式Topics

本文主要讲解通配符模式。

配符模式

功能:此模式实在路由key模式的基础上,使用了通配符来管理消费者接收消息。生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;

符号#:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor 符号*:只能匹配一个词lazy.*

可以匹配lazy.irs或者lazy.cor

1. 邮件短信案例

1.新建Maven项目RabbitMQ-Demo

2.添加Maven依赖:

<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
  </dependency>
</dependencies>

3.连接工具类

package com.ylw.rabbitmq;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQConnecUtils {
    public static Connection newConnection() throws IOException, TimeoutException {
        // 1.定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2.设置服务器地址
        factory.setHost("127.0.0.1");
        // 3.设置协议端口号
        factory.setPort(5672);
        // 4.设置vhost
        factory.setVirtualHost("OrderHost");
        // 5.设置用户名称
        factory.setUsername("OrderAdmin");
        // 6.设置用户密码
        factory.setPassword("123456");
        // 7.创建新的连接
        Connection newConnection = factory.newConnection();
        return newConnection;
    }
}
1.1 生产者
public class Producer {
    private static final String EXCHANGE_NAME = "topic_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建新的连接
        Connection connection = RabbitMQConnecUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        // 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String routingKey = "log.info.error";
        String msg = "topic_exchange_msg" + routingKey;
        // 4.发送消息
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
        System.out.println("生产者发送msg:" + msg);
        // // 5.关闭通道、连接
        channel.close();
        connection.close();
        // 注意:如果消费没有绑定交换机和队列,则消息会丢失
    }
}
1.2 消费者

邮件消费者:

public class ConsumerEmailDirect {
    private static final String QUEUE_NAME = "consumer_topic_email";
    private static final String EXCHANGE_NAME = "topic_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建新的连接
        Connection connection = RabbitMQConnecUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        // 3.消费者关联队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.#");
        // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取生产者消息:" + msg);
            }
        };
        // 5.消费者监听队列消息
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

短信消费者:

public class ConsumerSMSDirect {
    private static final String QUEUE_NAME = "consumer_topic_sms";
    private static final String EXCHANGE_NAME = "topic_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建新的连接
        Connection connection = RabbitMQConnecUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        // 3.消费者关联队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.*");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("短信消费者获取生产者消息:" + msg);
            }
        };
        // 5.消费者监听队列消息
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

3. 测试

启动生产者,并关闭,让其在RabbitMQ里面注册交换机,在控制台可以看出注册成功(如果不启动,可以手动注册,如下图Add a new exchange):

启动邮件消费者和短信消费者,在控制台可以看出有两个队列:

再启动生产者,可以看到邮件消费者消费信息

而短信消费者没有消费信息:

因为生产者的RouteKey为log.info.error,邮件消费者的匹配的RouteKey为log.#(匹配log后面的所有),SMS消费者的匹配的RouteKey为log.*(匹配log后面的一个)。所以只有邮件消费者能消费消息。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
21天前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
68 3
|
2天前
|
消息中间件 存储 Kafka
RocketMQ DLedger融合模式是什么
RocketMQ DLedger融合模式是什么
|
12天前
|
消息中间件 监控 负载均衡
中间件RabbitMQ性能瓶颈
【7月更文挑战第13天】
49 11
|
15天前
|
设计模式 中间件 PHP
探索PHP中的中间件模式
【7月更文挑战第12天】在现代的Web开发中,设计模式的应用对于代码的可维护性、扩展性和复用性至关重要。本文将深入探讨PHP语言中如何实现中间件模式,这是一种用于管理HTTP请求和响应的处理流程的设计模式。我们将通过具体示例来展示中间件模式如何在PHP项目中提升代码结构,并分析其在处理Web请求时的优势。
|
14天前
|
设计模式 开发框架 中间件
探索PHP中的中间件模式
【7月更文挑战第13天】在现代的Web开发中,设计模式扮演着至关重要的角色。本文将通过PHP语言的视角,深入探讨中间件模式的概念、实现及其在Web开发中的应用。我们将从中间件的定义开始,逐步过渡到如何在PHP框架中实现中间件,以及如何利用中间件来简化代码结构、增强应用的可扩展性和维护性。文章最后会提供一些实际案例,帮助读者更好地理解和运用中间件模式。
|
20天前
|
设计模式 中间件 PHP
深入理解PHP中的中间件模式
【7月更文挑战第7天】在Web开发的海洋中,PHP作为一艘灵活且强大的船,承载着无数的项目和解决方案。本文将揭开PHP中一个不为人知的角落——中间件模式,它如同船上的指南针,指引着请求的处理方向。我们将从中间件的定义出发,探索其在PHP中的应用实例,并深入分析其工作原理与实现方式,最终通过代码示例来揭示这一模式如何在实际应用中发挥巨大作用。文章旨在为读者提供对PHP中间件模式的全面认识,帮助开发者更好地利用这一模式优化项目架构。
|
16天前
|
设计模式 中间件 测试技术
PHP中的中间件模式解析与实践
【7月更文挑战第11天】在现代Web开发中,中间件模式已成为设计高效、可维护应用程序的关键。本文深入探讨了PHP环境下中间件模式的实现方法,并提供了一个实际示例来演示如何利用中间件优化请求处理流程。
17 1
|
16天前
|
存储 设计模式 监控
深入理解PHP中的中间件模式
【7月更文挑战第11天】本文将探索PHP中实现中间件模式的奥秘,从理论到实践,逐步剖析如何通过中间件提升代码的可维护性和扩展性。我们将摒弃传统的摘要形式,而是以一次虚拟的开发者对话引入话题,展现中间件在PHP项目中的应用价值和实现策略。
|
15天前
|
设计模式 负载均衡 中间件
深入理解PHP中的中间件模式
【7月更文挑战第12天】中间件模式在PHP开发中扮演着至关重要的角色,它允许开发者在请求处理流程中注入自定义的逻辑。本文将深入探讨中间件的工作原理、常见应用场景以及如何实现自己的中间件。通过实例演示,我们将揭示中间件模式如何优化应用架构,增强代码的可维护性和可扩展性。
11 0
|
2月前
|
消息中间件 存储 负载均衡
消息中间件的选择:RabbitMQ是一个明智的选择
消息中间件的选择:RabbitMQ是一个明智的选择
70 0