精通 RabbitMQ 系列 02

简介: 精通 RabbitMQ 系列 02

阅读全文,约 15 分钟

这是江帅帅的第019篇原创

一、下载和安装 Erlang

1.1 安装依赖环境

先安装  GCC 等依赖环境

yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel

再安装 ncurses

yum -y install ncurses-devel

1.2 安装 Erlang

官网:https://www.erlang.org/downloads

或者使用 wget 下载,然后解压:

cd /usr/local/src/
wget http://erlang.org/download/otp_src_22.0.tar.gz
 
tar -zxvf otp_src_22.0.tar.gz

开始安装:

cd otp_src_22.0
./configure --prefix=/usr/local/soft
make
make install

安装位置:

whereis erlang

1.3 配置环境变量

修改 /etc/profile 配置文件

ERLANG_HOME=/usr/local/soft
export PATH=$PATH:$ERLANG_HOME/bin
export PATH ERLANG_HOME

执行

source /etc/profile

1.4 检验

erl

二、下载和安装 RabbitMQ

官网:http://www.rabbitmq.com/releases/rabbitmq-server/

下载

wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-generic-unix-3.6.15.tar.xz

解压

xz -d rabbitmq-server-generic-unix-3.6.15.tar.xz
 
tar -xvf rabbitmq-server-generic-unix-3.6.15.tar

编辑 /etc/profile 修改环境变量

三、基本介绍

3.1 工作模型

Broker 是 RabbitMQ 的服务器,消息队列的两大作用:存储消息和转发给消费者。Producter 和 Consumer 工作时,都必须跟服务器建立一个 TCP 的长链接,叫 Connection。但如果生产者和消费者都直接连接服务器的话,频繁的创建和释放一个 Connection 的话,则极其消耗性能,所以引入一个【消息通道】的虚拟连接,在保持一个 TCP 长连接中,创建和释放 Channel 来优化节省系统性能。

比如,很多使用 Client API 进行编程的接口都是基于 Channel 去完成的,包括 Spring 也针对 Channel 做了方法调用的封装。

在其他的消息中间件中,Producter(生成者)发送消息会直接就到 Queue(队列),但 RabbitMQ 不会,它先经过 Exchange(交换机)。交换机主要实现消息更加灵活的分发,通过它的类型我们就知道它为什么灵活了。

交换机没有自己运行的进程,只是一个地址的列表,消息发送过来,然后根据地址列表送到对应的队列。队列有自己的进程,用来存储消息的。交换机和队列之间,需要定义一个绑定的关系。

每一个业务系统,都需要来使用 RabbitMQ 服务器,然后创建了很多 Exchange 和 Queue,资源管理就有可能很混乱。如果某业务系统,希望单独使用一台硬件服务器,不想跟其他的混在一起,那这样是不是需要额外采购一台机器呢?

其实,我们可以创建多个 Virtual Host(虚拟机)来解决,虚拟机相当于小型的 RabbitMQ 服务器,在不同的虚拟机中就可以添加多个 Exchange 来绑定 Queue 解决问题了。虚拟机可以提高我们的资源利用率,也可以提供资源隔离,类似于变成中命名空间和包的概念。

也可以创建多个用户,对应分配一些不同虚拟机的权限。用户能够绑定虚拟机,也就能绑定到其中的交换机和队列,还有它们的绑定关系。不同的用户,不同的虚拟机,就实现了我们资源的隔离。

3.2 RabbitMQ 的几种交换机

RabbitMQ 中一共有 4 种 Exchange(交换机),分别为:

  1. direct(默认)
  2. topic
  3. fanout
  4. headers(几乎不用,性能差)
1. Direct Exchange 直连交换机

从图中可以看出,很多队列需要从直连交换机中取走消息。那么,这时需要使用 binding key 绑定关键字,将它们进行绑定。在发送消息的时候,也需要指定 routing key 路由关键字,用来对应匹配绑定关键字找到对应的队列。

例如,basicPulish("DIRECT_EXCHANGE", "key1", "cuihua");

2. Topic Exchange 主题交换机

星号 * 代表一个单词,井号 # 代表零个或多个单词。

例如:basicPulish("TOPIC_EXCHANGE", "key1", "dodo.abc");  ---- dodo.*

basicPulish("TOPIC_EXCHANGE", "key1", "dodo.aaa.bbb");  ----  dodo.#

3. Fanout Exchange 广播交换机


队列不需要和交换机指定任何的绑定关键字,发送消息的时候也不需要携带任何路由关键字,当发送一条消息到广播交换机的时候,所有相关的队列都能接收到相同的消息。

3.3 Java API 编程

  1. RabbitMQ 官网  https://www.rabbitmq.com
  2. 生产者
  3. 消费者
  4. 参数详解

案例展示

1. 生产者
public class Producer {
 
    // 交换机类型
    private static final String EXCHANGE_NAME = "excName";
    // 路由 key
    private static final String ROUTING_KEY = "rkey";
    // 队列名
    private static final String QUEUE_NAME = "qName";
    // IP 地址
    private static final String IP_ADDRESS = "192.168.207.133";
    // 端口号
    private static final int PORT = 5672;
 
    public static void main(String[] args) throws Exception {
 
        // 1. 连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("root");
        factory.setPassword("1234");
 
        // 2. 配置信息
        // 连接对象
        Connection conn = factory.newConnection();
        // 信道
        Channel channel = conn.createChannel();
        // 交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
        // 队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 绑定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
 
        // 3. 发送消息
        String msg = "cuihua is a good girl...";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
 
        // 4. 关闭资源
        channel.close();
        conn.close();
    }
}
2. 消费者
public class Consumer {
 
    // 队列名
    private static final String QUEUE_NAME = "qName";
    // IP 地址
    private static final String IP_ADDRESS = "192.168.207.133";
    // 端口号
    private static final int PORT = 5672;
 
    public static void main(String[] args) throws Exception {
        // 指定地址和端口
        Address[] addresses = new Address[]{
                new Address(IP_ADDRESS, PORT)
        };
 
        // 连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("root");
        factory.setPassword("1234");
 
        // 连接方式
        // 创建连接方式
        Connection conn = factory.newConnection(addresses);
        // 创建信道
        final Channel channel = conn.createChannel();
        // 设置客户端最多接收未被 ack 的消息个数
        channel.basicQos(64);
 
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 
                System.out.println("msg : " + new String(body));
 
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e){
                    e.printStackTrace();
                }
 
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
 
        channel.basicConsume(QUEUE_NAME, consumer);
 
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        conn.close();
    }
}

3.4 进阶知识

  • 怎么自动删除没人消费的消息?
  • 无法路由的消息,去了哪里?
  • 可以让消息优先得到消费吗?
  • 如何实现延迟发送消息?
  • MQ 怎么实现 RPC?
  • RabbitMQ 流量控制怎么做?设置队列大小有用吗?
1. 怎么自动删除没人消费的消息?

TTL:Time To Live

a. 设置一个队列的消息

b. 指定消息的 TTL

死信队列 Dead Letter Queue

2. 无法路由的消息,去了哪里?

死信交换机 DLX:Dead Letter Exchange

哪些情况消息会变成死信?

a. 消息过期 
b. Reject Nack 并且 requeue == false
c. 队列达到最大长度 —— 先入队的消息会被删除     


3. 可以让消息优先得到消费吗?

设置 x-max-priority 属性,指定优先数字。

4. 如果实现延迟发送消息?

1)延迟队列的插件。

2)实现的思路:TTL + DLX

5. MQ 怎么实现 RPC?

为什么要实现 RPC?我们原来在一个项目中,或者一个代码工程中,需要去调用一个其他业务模块的方法是很简单的。但现在是在不同的网络节点上、不同语言实现的系统上,还要实现与方法调用一样的方式去工作。

6. RabbitMQ 流量控制怎么做?设置队列大小有用吗?

设置 x-max-length 是没有太大作用的,是因为正常情况下,消费者是不会停止的,其中最大长度是堆积消息的长度,而不是说把消费者停掉,如果消费者区消息的速度比发消息的速度还快的话,那永远达不到最大长度。

1)服务端

没有特别明确的方式能设置只接收指定量的消息。

我们的消息要么存放在内存中,要么存放在磁盘中,或者同时存放在内存和磁盘中。流量的控制,需要从硬件资源的角度去控制,有一个 Flow Control 策略,从内存使用率控制来考虑。比如,当前机器内存是 2G 的话,当内存的可用空间达到 0.4 限度的时候,就不再接收消息,所有消息生产者就会被阻塞。

或者是,我们磁盘的可用空间用到只剩 1G 的时候,就不再接收消息了。

2)消费端

在官网的 RabbitMQ Tutorials 第二个 Work queues 中可以看到,两个消费者去轮询接收消息。但这种平均分发的方式,不一定会公平,平均 ≠ 不公平。是因为,第一个消费者收到是奇数消息,第二个消费者能够收到是偶数消息。那如果我们每个奇数消息都需要很长时间来处理时,第一个消费者第一个消息都还没处理完的时候,第二个消费者有可能已经处理了好多个消息。

可以针对我们每个消息通道或者消费者,去设置 prefetch count 预取数量,对应的方法是 channel.basicQos()。

如果设置 channel.basicQos(1),也就是说,当前还有 1 条消息在处理,还没有给队列发送应答的时候,这个队列就不会再发送消息了。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
12天前
|
消息中间件 存储 中间件
精通 RabbitMQ 系列 01
精通 RabbitMQ 系列 01
34 0
|
3月前
|
消息中间件 存储 负载均衡
什么是RabbitMQ?
RabbitMQ是一个开源的消息代理软件,用于在分布式系统中传递消息。它实现了高级消息队列协议(AMQP),提供了一种可靠的、强大的、灵活的消息传递机制,使得不同应用程序或组件之间可以轻松地进行通信。
23 0
|
7月前
|
消息中间件 Java 关系型数据库
简单讲解RabbitMQ
简单讲解RabbitMQ
30 1
|
9月前
|
消息中间件 存储
RabbitMq
RabbitMq
67 0
|
11月前
|
消息中间件 存储 JSON
关于RabbitMQ
MQ是一种应用程序键一步通讯的技术,MQ是消息队列的缩写(Message Queue) 在MQ中,消息由一个应用程序发送到一个称为队列的中间件中,接着被中间件存储,并最终被另一个或多个消费者应用程序读取和处理; MQ组成:消息——生产者——队列——中间件——消费者!
55 0
|
11月前
|
消息中间件 Java
RabbitMQ(2)
RabbitMQ(2)
|
11月前
|
消息中间件 网络协议 Java
RabbitMQ(1)
RabbitMQ(1)
|
11月前
|
消息中间件 JSON 缓存
RabbitMQ中的SpringAMQP(下)
RabbitMQ中的SpringAMQP(下)
105 0
|
11月前
|
消息中间件 存储 缓存
RabbitMQ中的SpringAMQP(上)
RabbitMQ中的SpringAMQP(上)
114 0
|
消息中间件 存储 JSON
02 RabbitMQ之SpringAMQP
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

相关实验场景

更多