消息中间件系列教程(10) -RabbitMQ -案例代码(路由模式)

简介: 消息中间件系列教程(10) -RabbitMQ -案例代码(路由模式)

引言

代码已上传至Github,有兴趣的同学可以下载看看:https://github.com/ylw-github/RabbitMQ-Demo

前面博客讲解了RabbitMQ的五种队列形式《消息中间件系列教程(06) -RabbitMQ -五种队列形式》,主要讲解一下五种队列的代码实现。

主要分为:

  1. 点对点队列模式(简单)
  2. 工作队列模式(公平性)
  3. 发布订阅模式
  4. 路由模式Routing
  5. 通配符模式Topics

本文主要讲解路由模式。

路由模式

功能:生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)

例如:我们可以把路由key设置为insert ,那么消费者队列key指定包含insert才可以接收消息,消费者队列key定义为update或者delete就不能接收消息。很好的控制了更新,插入和删除的操作。

采用交换机direct模式

1. 邮件短信案例讲解

1.新建Maven项目RabbitMQ-Demo

2.添加Maven依赖:

<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
  </dependency>
</dependencies>

3.连接工具类

package com.ylw.rabbitmq;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQConnecUtils {
    public static Connection newConnection() throws IOException, TimeoutException {
        // 1.定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2.设置服务器地址
        factory.setHost("127.0.0.1");
        // 3.设置协议端口号
        factory.setPort(5672);
        // 4.设置vhost
        factory.setVirtualHost("OrderHost");
        // 5.设置用户名称
        factory.setUsername("OrderAdmin");
        // 6.设置用户密码
        factory.setPassword("123456");
        // 7.创建新的连接
        Connection newConnection = factory.newConnection();
        return newConnection;
    }
}
1.1 生产者
public class Producer {
    private static final String EXCHANGE_NAME = "direct_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建新的连接
        Connection connection = RabbitMQConnecUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        // 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String routingKey = "info";
        String msg = "direct_exchange_msg" + routingKey;
        // 4.发送消息
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
        System.out.println("生产者发送msg:" + msg);
        // // 5.关闭通道、连接
        // channel.close();
        // connection.close();
        // 注意:如果消费没有绑定交换机和队列,则消息会丢失
    }
}
1.2 消费者

邮件消费者:

public class ConsumerEmailDirect {
    private static final String QUEUE_NAME = "consumer_direct_email";
    private static final String EXCHANGE_NAME = "direct_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建新的连接
        Connection connection = RabbitMQConnecUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        // 3.消费者关联队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取生产者消息:" + msg);
            }
        };
        // 5.消费者监听队列消息
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

短信消费者:

public class ConsumerSMSDirect {
    private static final String QUEUE_NAME = "consumer_direct_sms";
    private static final String EXCHANGE_NAME = "direct_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建新的连接
        Connection connection = RabbitMQConnecUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        // 3.消费者关联队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("短信消费者获取生产者消息:" + msg);
            }
        };
        // 5.消费者监听队列消息
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

3. 测试

启动生产者,并关闭,让其在RabbitMQ里面注册交换机,在控制台可以看出注册成功(如果不启动,可以手动注册,如下图Add a new exchange):

启动邮件消费者和短信消费者,在控制台可以看出有两个队列:

再启动生产者,可以看到邮件消费者消费信息

而短短信消费者没有消费信息:

这是因为短信消费者没有注册RoteKeyinfo类型的消息,所以接收不到。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
开发框架 JSON 中间件
Go语言Web开发框架实践:路由、中间件、参数校验
Gin框架以其极简风格、强大路由管理、灵活中间件机制及参数绑定校验系统著称。本文详解其核心功能:1) 路由管理,支持分组与路径参数;2) 中间件机制,实现全局与局部控制;3) 参数绑定,涵盖多种来源;4) 结构体绑定与字段校验,确保数据合法性;5) 自定义校验器扩展功能;6) 统一错误处理提升用户体验。Gin以清晰模块化、流程可控及自动化校验等优势,成为开发者的优选工具。
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
263 3
|
9月前
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
156 3
|
11月前
|
设计模式 监控 中间件
深入理解PHP中的中间件模式
【10月更文挑战第20天】探索PHP编程世界中的“交通枢纽”——中间件模式。从代码层面剖析如何实现请求和响应的高效管理,以及如何在开发中应用这一模式来增强项目的扩展性和维护性。
|
11月前
|
中间件 PHP 开发者
深入理解PHP中的中间件模式
【10月更文挑战第6天】在PHP开发中,中间件模式是一种优雅的处理请求和响应的方式。本文将带你探索中间件模式的概念、实现及其在PHP框架中的应用,同时提供实用的代码示例来加深理解。
122 4
|
10月前
|
设计模式 缓存 中间件
探索PHP中的中间件模式
【10月更文挑战第33天】在编程世界中,设计模式是解决常见问题的模板。本文将带你领略PHP中中间件模式的魅力,它如何优雅地处理请求,并保持代码的整洁与可维护性。通过实际代码示例,我们将一步步实现一个简单的中间件框架,让你轻松理解并应用到自己的项目中。
|
12月前
|
缓存 中间件 网络架构
Python Web开发实战:高效利用路由与中间件提升应用性能
在Python Web开发中,路由和中间件是构建高效、可扩展应用的核心组件。路由通过装饰器如`@app.route()`将HTTP请求映射到处理函数;中间件则在请求处理流程中插入自定义逻辑,如日志记录和验证。合理设计路由和中间件能显著提升应用性能和可维护性。本文以Flask为例,详细介绍如何优化路由、避免冲突、使用蓝图管理大型应用,并通过中间件实现缓存、请求验证及异常处理等功能,帮助你构建快速且健壮的Web应用。
96 1
|
JavaScript 前端开发 中间件
深入浅出Node.js中间件模式
【9月更文挑战第13天】本文将带你领略Node.js中间件模式的魅力,从概念到实战,一步步揭示如何利用这一强大工具简化和增强你的Web应用。我们将通过实际代码示例,展示中间件如何在不修改原有代码的情况下,为请求处理流程添加功能层。无论你是前端还是后端开发者,这篇文章都将为你打开一扇通往更高效、更可维护代码的大门。
|
设计模式 缓存 中间件
深入理解PHP中的中间件模式
【9月更文挑战第12天】本文旨在通过浅显易懂的语言和实际代码示例,引导读者了解PHP中如何实现和使用中间件模式,以及这一设计模式如何优化我们的应用程序结构。文章将逐步介绍中间件的概念、在PHP中的应用实例,以及如何自定义中间件来解决实际问题。
|
设计模式 缓存 中间件
深入理解PHP中的中间件模式
【8月更文挑战第31天】 在PHP开发中,中间件模式是一种优雅的架构设计,它允许开发者以非侵入式的方式扩展应用程序的功能。本文将通过一个简单的示例,展示如何在PHP中实现和使用中间件,以及这种模式如何提高代码的可维护性和可测试性。

热门文章

最新文章