RabbitMQ-从基础到实战(3)— 消息的交换(上)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介:

转载请注明出处:http://www.cnblogs.com/4----/p/6549865.html

0.目录

RabbitMQ-从基础到实战(1)— Hello RabbitMQ

RabbitMQ-从基础到实战(2)— 防止消息丢失

RabbitMQ-从基础到实战(4)— 消息的交换(中)

RabbitMQ-从基础到实战(5)— 消息的交换(下)

RabbitMQ-从基础到实战(6)— 与Spring集成

1.简介

在前面的例子中,每个消息都只对应一个消费者,即使有多个消费者在线,也只会有一个消费者接收并处理一条消息,这是消息中间件的一种常用方式。

另外一种方式,生产者生产一条消息,广播给一个或多个队列,所有订阅了这个队列的消费者,都可以消费这条消息,这就是消息订阅。

官方教程列举了这样一个场景,生产者发出一条记录日志的消息,消费者1接收到后写日志到硬盘,消费者2接收到后打印日志到屏幕。工作中还有很多这样的场景有待发掘,适当的使用消息订阅后可以成倍的增加效率。

2.RabbitMQ的交换中心(Exchange)

在前两章的例子中,我们涉及到了三个概念

  1. 生产者
  2. 队列
  3. 消费者

这不禁让我们以为,生产者生产消息后直接到发送到队列,消费者从队列中获取消息,再消费掉。

其实这是错误的,在RabbitMQ中,生产者不会直接把消息发送给队列,实际上,生产者甚至不知道一条消息会不会被发送到队列上。

正确的概念是,生产者会把消息发送给RabbitMQ的交换中心(Exchange),Exchange的一侧是生产者,另一侧则是一个或多个队列,由Exchange决定一条消息的生命周期--发送给某些队列,或者直接丢弃掉。

这个概念在官方文档中被称作RabbitMQ消息模型的核心思想(core idea)

如下图,其中X代表的是Exchange。

image

RabbitMQ中,有4种类型的Exchange

  • direct    通过消息的routing key比较queue的key,相等则发给该queue,常用于相同应用多实例之间的任务分发
    • 默认类型   本身是一个direct类型的exchange,routing key自动设置为queue name。注意,direct不等于默认类型,默认类型是在queue没有指定exchange时的默认处理方式,发消息时,exchange字段也要相应的填成空字符串“”
  • topic    话题,通过可配置的规则分发给绑定在该exchange上的队列,通过地理位置推送等场景适用
  • headers    当分发规则很复杂,用routing key不好表达时适用,忽略routing key,用header取代之,header可以为非字符串,例如Integer或者String
  • fanout    分发给所有绑定到该exchange上的队列,忽略routing key,适用于MMO游戏、广播、群聊等场景

更详细的介绍,请看官方文档

3.临时队列

可以对一个队列命名是十分重要的,在消费者消费消息时,要指明消费哪个队列的消息(下面的queue),这样就可以让多个消费者同时分享一个队列

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

上述记录日志的场景中,有以下几个特点

  • 所有消费者都需要监听所有的日志消息,因此每个消费者都需要一个单独的队列,不需要和别人分享
  • 消费者只关心最新的消息,连接到RabbitMQ之前的消息不需要关心,因此,每次连接时需要创建一个队列,绑定到相应的exchange上,连接断开后,删除该队列

自己声明队列是比较麻烦的,因此,RabbitMQ提供了简便的获取临时队列的方法,该队列会在连接断开后销毁

String queueName = channel.queueDeclare().getQueue();

这行代码会获取一个名字类似于“amq.gen-JzTY20BRgKO-HjmUJj0wLg”的临时队列

4.绑定

再次注意,在RabbitMQ中,消息是发送到Exchange的,不是直接发送的Queue。因此,需要把Queue和Exchange进行绑定,告诉RabbitMQ把指定的Exchange上的消息发送的这个队列上来

绑定队列使用此方法

Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

其中,queue是队列名,exchange是要绑定的交换中心,routingKey就是这个queue的routingKey

5.实践

下面来实现上述场景,生产者发送日志消息,消费者1记录日志,消费者2打印日志

下面的代码中,把连接工厂等方法放到了构造函数中,也就是说,每new一个对象,都会创建一个连接,在生产环境这样做是很浪费性能的,每次创建一个connection都会建立一次TCP连接,生产环境应使用连接池。而Channel又不一样,多个Channel是共用一个TCP连接的,因此可以放心的获取Channel(本结论出自官方文档对Channel的定义)

AMQP 0-9-1 connections are multiplexed with channels that can be thought of as "lightweight connections that share a single TCP connection".

For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them.

日志消息发送类 LogSender

复制代码
复制代码
 1 import java.io.IOException;
 2 import java.util.concurrent.TimeoutException;
 3 
 4 import org.slf4j.Logger;
 5 import org.slf4j.LoggerFactory;
 6 
 7 import com.rabbitmq.client.Channel;
 8 import com.rabbitmq.client.Connection;
 9 import com.rabbitmq.client.ConnectionFactory;
10 
11 public class LogSender {
12 
13     private Logger logger = LoggerFactory.getLogger(LogSender.class);
14     private  ConnectionFactory factory;
15     private  Connection connection;
16     private  Channel channel;
17     
18     /**
19      * 在构造函数中获取连接
20      */
21     public LogSender(){
22         super();
23         try {
24             factory = new ConnectionFactory();
25             factory.setHost("127.0.0.1");
26             connection = factory.newConnection();
27             channel = connection.createChannel();
28         } catch (Exception e) {
29             logger.error(" [X] INIT ERROR!",e);
30         }
31     }
32     /**
33      * 提供个关闭方法,现在并没有什么卵用
34      * @return
35      */
36     public boolean closeAll(){
37         try {
38             this.channel.close();
39             this.connection.close();
40         } catch (IOException | TimeoutException e) {
41             logger.error(" [X] CLOSE ERROR!",e);
42             return false;
43         }
44         return true;
45     }
46     
47     /**
48      * 我们更加关心的业务方法
49      * @param message
50      */
51     public void sendMessage(String message) {
52             try {
53                 //声明一个exchange,命名为logs,类型为fanout
54                 channel.exchangeDeclare("logs", "fanout");
55                 //exchange是logs,表示发送到此Exchange上
56                 //fanout类型的exchange,忽略routingKey,所以第二个参数为空
57                 channel.basicPublish("logs", "", null, message.getBytes());
58                 logger.debug(" [D] message sent:"+message);
59             } catch (IOException e) {
60                 e.printStackTrace();
61             }
62     }
63 }
复制代码
复制代码

在LogSender中,和之前的例子不一样的地方是,我们没有直接声明一个Queue,取而代之的是声明了一个exchange

发布消息时,第一个参数填了我们声明的exchange名字,routingKey留空,因为fanout类型忽略它。

在前面的例子中,我们routingKey填的是队列名,因为默认的exchange(exchange位空字符串时使用默认交换中心)会把队列的routingKey设置为queueName(声明队列的时候设置的,不是发送消息的时候),又是direct类型,所以可以通过queueName当做routingKey找到队列。

消费类 LogConsumer

复制代码
复制代码
 1 package com.liyang.ticktock.rabbitmq;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import org.slf4j.Logger;
 7 import org.slf4j.LoggerFactory;
 8 
 9 import com.rabbitmq.client.AMQP;
10 import com.rabbitmq.client.Channel;
11 import com.rabbitmq.client.Connection;
12 import com.rabbitmq.client.ConnectionFactory;
13 import com.rabbitmq.client.Consumer;
14 import com.rabbitmq.client.DefaultConsumer;
15 import com.rabbitmq.client.Envelope;
16 
17 public class LogConsumer {
18 
19     private Logger logger = LoggerFactory.getLogger(LogConsumer.class);
20     private ConnectionFactory factory;
21     private Connection connection;
22     private Channel channel;
23 
24     /**
25      * 在构造函数中获取连接
26      */
27     public LogConsumer() {
28         super();
29         try {
30             factory = new ConnectionFactory();
31             factory.setHost("127.0.0.1");
32             connection = factory.newConnection();
33             channel = connection.createChannel();
34             // 声明exchange,防止生产者没启动,exchange不存在
35             channel.exchangeDeclare("logs","fanout");
36         } catch (Exception e) {
37             logger.error(" [X] INIT ERROR!", e);
38         }
39     }
40 
41     /**
42      * 提供个关闭方法,现在并没有什么卵用
43      * 
44      * @return
45      */
46     public boolean closeAll() {
47         try {
48             this.channel.close();
49             this.connection.close();
50         } catch (IOException | TimeoutException e) {
51             logger.error(" [X] CLOSE ERROR!", e);
52             return false;
53         }
54         return true;
55     }
56 
57     /**
58      * 我们更加关心的业务方法
59      */
60     public void consume() {
61         try {
62             // 获取一个临时队列
63             String queueName = channel.queueDeclare().getQueue();
64             // 把刚刚获取的队列绑定到logs这个交换中心上,fanout类型忽略routingKey,所以第三个参数为空
65             channel.queueBind(queueName, "logs", "");
66             //定义一个Consumer,消费Log消息
67             Consumer consumer = new DefaultConsumer(channel) {
68                 @Override
69                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
70                         byte[] body) throws IOException {
71                     String message = new String(body, "UTF-8");
72                     logger.debug(" [D] 我是来打印日志的:"+message);
73                 }
74             };
75             //这里自动确认为true,接收到消息后该消息就销毁了
76             channel.basicConsume(queueName, true, consumer);
77         } catch (IOException e) {
78             e.printStackTrace();
79         }
80     }
81 }
复制代码
复制代码

复制一个项目,把72行改为如下代码,代表两个做不同工作的消费者

1 logger.debug(" [D] 我已经把消息写到硬盘了:"+message);

消费者App

复制代码
复制代码
1 public class App 
2 {
3     public static void main( String[] args )
4     {
5         LogConsumer consumer = new LogConsumer();
6         consumer.consume();
7     }
8 }
复制代码
复制代码

生产者App

复制代码
复制代码
1 public class App {
2     public static void main( String[] args ) throws InterruptedException{
3         LogSender sender = new LogSender();
4         while(true){
5             sender.sendMessage(System.nanoTime()+"");
6             Thread.sleep(1000);
7         }
8     }
9 }
复制代码
复制代码

把消费者打包成两个可执行的jar包,方便观察控制台

用java -jar 命令执行,结果如下

6.结束语

本章介绍了RabbitMQ中消息的交换,再次强调,RabbitMQ中,消息是通过交换中心转发到队列的,不要被默认的exchange混淆,默认的exchange会自动把queue的名字设置为它的routingKey,所以消息发布时,才能通过queueName找到该队列,其实此时queueName扮演的角色就是routingKey。

本教程是参考官方文档写出来的,后续章节会介绍更多RabbitMQ的相关知识以及项目中的实战技巧

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
103 0
|
7月前
|
消息中间件 Java RocketMQ
RocketMQ实战教程之RocketMQ安装
这是一篇关于RocketMQ安装的实战教程,主要介绍了在CentOS系统上使用传统安装和Docker两种方式安装RocketMQ。首先,系统需要是64位,并且已经安装了JDK 1.8。传统安装包括下载安装包,解压并启动NameServer和Broker。Docker安装则涉及安装docker和docker-compose,然后通过docker-compose.yaml文件配置并启动服务。教程还提供了启动命令和解决问题的提示。
|
7月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
2月前
|
消息中间件 数据采集 中间件
RabbitMQ的使用—实战
RabbitMQ的使用—实战
89 0
|
3月前
|
消息中间件 缓存 Java
RocketMQ的JAVA落地实战
RocketMQ作为一款高性能、高可靠、高实时、分布式特点的消息中间件,其核心作用主要体现在异步处理、削峰填谷以及系统解耦三个方面。
182 0
|
4月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
77 2
|
5月前
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
171 15
|
7月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
99 0
|
7月前
|
消息中间件 存储 Apache
RocketMQ实战教程之常见概念和模型
Apache RocketMQ 实战教程介绍了其核心概念和模型。消息是基本的数据传输单元,主题是消息的分类容器,支持字节、数字和短划线命名,最长64个字符。消息类型包括普通、顺序、事务和定时/延时消息。消息队列是实际存储和传输消息的容器,是主题的分区。消费者分组是一组行为一致的消费者的逻辑集合,也有命名限制。此外,文档还提到了一些使用约束和建议,如主题和消费者组名的命名规则,消息大小限制,请求超时时间等。RocketMQ 提供了多种消息模型,包括发布/订阅模型,有助于理解和优化消息处理。
|
7月前
|
消息中间件 存储 Java
RocketMQ实战教程之NameServer与BrokerServer
这是一个关于RocketMQ实战教程的概要,主要讨论NameServer和BrokerServer的角色。NameServer负责管理所有BrokerServer,而BrokerServer存储和传输消息。生产者和消费者通过NameServer找到合适的Broker进行交互,不需要直接知道Broker的具体信息。工作流程包括生产者向NameServer查询后发送消息到Broker,以及消费者同样通过NameServer获取消息进行消费。这种设计类似于服务注册中心的概念,便于系统扩展和集群管理。