阅读全文,约 15 分钟
这是江帅帅的第019篇原创
一、下载和安装 Erlang
1.1 安装依赖环境
先安装 GCC 等依赖环境
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
再安装 ncurses
yum -y install ncurses-devel
1.2 安装 Erlang
官网:https://www.erlang.org/downloads
或者使用 wget 下载,然后解压:
cd /usr/local/src/ wget http://erlang.org/download/otp_src_22.0.tar.gz tar -zxvf otp_src_22.0.tar.gz
开始安装:
cd otp_src_22.0 ./configure --prefix=/usr/local/soft make make install
安装位置:
whereis erlang
1.3 配置环境变量
修改 /etc/profile 配置文件
ERLANG_HOME=/usr/local/soft export PATH=$PATH:$ERLANG_HOME/bin export PATH ERLANG_HOME
执行
source /etc/profile
1.4 检验
erl
二、下载和安装 RabbitMQ
官网:http://www.rabbitmq.com/releases/rabbitmq-server/
下载
wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-generic-unix-3.6.15.tar.xz
解压
xz -d rabbitmq-server-generic-unix-3.6.15.tar.xz tar -xvf rabbitmq-server-generic-unix-3.6.15.tar
编辑 /etc/profile 修改环境变量
三、基本介绍
3.1 工作模型
Broker 是 RabbitMQ 的服务器,消息队列的两大作用:存储消息和转发给消费者。Producter 和 Consumer 工作时,都必须跟服务器建立一个 TCP 的长链接,叫 Connection。但如果生产者和消费者都直接连接服务器的话,频繁的创建和释放一个 Connection 的话,则极其消耗性能,所以引入一个【消息通道】的虚拟连接,在保持一个 TCP 长连接中,创建和释放 Channel 来优化节省系统性能。
比如,很多使用 Client API 进行编程的接口都是基于 Channel 去完成的,包括 Spring 也针对 Channel 做了方法调用的封装。
在其他的消息中间件中,Producter(生成者)发送消息会直接就到 Queue(队列),但 RabbitMQ 不会,它先经过 Exchange(交换机)。交换机主要实现消息更加灵活的分发,通过它的类型我们就知道它为什么灵活了。
交换机没有自己运行的进程,只是一个地址的列表,消息发送过来,然后根据地址列表送到对应的队列。队列有自己的进程,用来存储消息的。交换机和队列之间,需要定义一个绑定的关系。
每一个业务系统,都需要来使用 RabbitMQ 服务器,然后创建了很多 Exchange 和 Queue,资源管理就有可能很混乱。如果某业务系统,希望单独使用一台硬件服务器,不想跟其他的混在一起,那这样是不是需要额外采购一台机器呢?
其实,我们可以创建多个 Virtual Host(虚拟机)来解决,虚拟机相当于小型的 RabbitMQ 服务器,在不同的虚拟机中就可以添加多个 Exchange 来绑定 Queue 解决问题了。虚拟机可以提高我们的资源利用率,也可以提供资源隔离,类似于变成中命名空间和包的概念。
也可以创建多个用户,对应分配一些不同虚拟机的权限。用户能够绑定虚拟机,也就能绑定到其中的交换机和队列,还有它们的绑定关系。不同的用户,不同的虚拟机,就实现了我们资源的隔离。
3.2 RabbitMQ 的几种交换机
RabbitMQ 中一共有 4 种 Exchange(交换机),分别为:
- direct(默认)
- topic
- fanout
- headers(几乎不用,性能差)
1. Direct Exchange 直连交换机
从图中可以看出,很多队列需要从直连交换机中取走消息。那么,这时需要使用 binding key 绑定关键字,将它们进行绑定。在发送消息的时候,也需要指定 routing key 路由关键字,用来对应匹配绑定关键字找到对应的队列。
例如,basicPulish("DIRECT_EXCHANGE", "key1", "cuihua");
2. Topic Exchange 主题交换机
星号 * 代表一个单词,井号 # 代表零个或多个单词。
例如:basicPulish("TOPIC_EXCHANGE", "key1", "dodo.abc"); ---- dodo.*
basicPulish("TOPIC_EXCHANGE", "key1", "dodo.aaa.bbb"); ---- dodo.#
3. Fanout Exchange 广播交换机
队列不需要和交换机指定任何的绑定关键字,发送消息的时候也不需要携带任何路由关键字,当发送一条消息到广播交换机的时候,所有相关的队列都能接收到相同的消息。
3.3 Java API 编程
- RabbitMQ 官网 https://www.rabbitmq.com
- 生产者
- 消费者
- 参数详解
案例展示
1. 生产者
public class Producer { // 交换机类型 private static final String EXCHANGE_NAME = "excName"; // 路由 key private static final String ROUTING_KEY = "rkey"; // 队列名 private static final String QUEUE_NAME = "qName"; // IP 地址 private static final String IP_ADDRESS = "192.168.207.133"; // 端口号 private static final int PORT = 5672; public static void main(String[] args) throws Exception { // 1. 连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername("root"); factory.setPassword("1234"); // 2. 配置信息 // 连接对象 Connection conn = factory.newConnection(); // 信道 Channel channel = conn.createChannel(); // 交换器 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); // 队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 绑定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 3. 发送消息 String msg = "cuihua is a good girl..."; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); // 4. 关闭资源 channel.close(); conn.close(); } }
2. 消费者
public class Consumer { // 队列名 private static final String QUEUE_NAME = "qName"; // IP 地址 private static final String IP_ADDRESS = "192.168.207.133"; // 端口号 private static final int PORT = 5672; public static void main(String[] args) throws Exception { // 指定地址和端口 Address[] addresses = new Address[]{ new Address(IP_ADDRESS, PORT) }; // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("root"); factory.setPassword("1234"); // 连接方式 // 创建连接方式 Connection conn = factory.newConnection(addresses); // 创建信道 final Channel channel = conn.createChannel(); // 设置客户端最多接收未被 ack 的消息个数 channel.basicQos(64); com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("msg : " + new String(body)); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e){ e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, consumer); TimeUnit.SECONDS.sleep(5); channel.close(); conn.close(); } }
3.4 进阶知识
- 怎么自动删除没人消费的消息?
- 无法路由的消息,去了哪里?
- 可以让消息优先得到消费吗?
- 如何实现延迟发送消息?
- MQ 怎么实现 RPC?
- RabbitMQ 流量控制怎么做?设置队列大小有用吗?
1. 怎么自动删除没人消费的消息?
TTL:Time To Live
a. 设置一个队列的消息
b. 指定消息的 TTL
死信队列 Dead Letter Queue
2. 无法路由的消息,去了哪里?
死信交换机 DLX:Dead Letter Exchange
哪些情况消息会变成死信?
a. 消息过期 b. Reject Nack 并且 requeue == false c. 队列达到最大长度 —— 先入队的消息会被删除
3. 可以让消息优先得到消费吗?
设置 x-max-priority 属性,指定优先数字。
4. 如果实现延迟发送消息?
1)延迟队列的插件。
2)实现的思路:TTL + DLX
5. MQ 怎么实现 RPC?
为什么要实现 RPC?我们原来在一个项目中,或者一个代码工程中,需要去调用一个其他业务模块的方法是很简单的。但现在是在不同的网络节点上、不同语言实现的系统上,还要实现与方法调用一样的方式去工作。
6. RabbitMQ 流量控制怎么做?设置队列大小有用吗?
设置 x-max-length 是没有太大作用的,是因为正常情况下,消费者是不会停止的,其中最大长度是堆积消息的长度,而不是说把消费者停掉,如果消费者区消息的速度比发消息的速度还快的话,那永远达不到最大长度。
1)服务端
没有特别明确的方式能设置只接收指定量的消息。
我们的消息要么存放在内存中,要么存放在磁盘中,或者同时存放在内存和磁盘中。流量的控制,需要从硬件资源的角度去控制,有一个 Flow Control 策略,从内存使用率控制来考虑。比如,当前机器内存是 2G 的话,当内存的可用空间达到 0.4 限度的时候,就不再接收消息,所有消息生产者就会被阻塞。
或者是,我们磁盘的可用空间用到只剩 1G 的时候,就不再接收消息了。
2)消费端
在官网的 RabbitMQ Tutorials 第二个 Work queues 中可以看到,两个消费者去轮询接收消息。但这种平均分发的方式,不一定会公平,平均 ≠ 不公平。是因为,第一个消费者收到是奇数消息,第二个消费者能够收到是偶数消息。那如果我们每个奇数消息都需要很长时间来处理时,第一个消费者第一个消息都还没处理完的时候,第二个消费者有可能已经处理了好多个消息。
可以针对我们每个消息通道或者消费者,去设置 prefetch count 预取数量,对应的方法是 channel.basicQos()。
如果设置 channel.basicQos(1),也就是说,当前还有 1 条消息在处理,还没有给队列发送应答的时候,这个队列就不会再发送消息了。