消息中间件系列四、认识AMQP和RabbiyMq的简单使用

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
性能测试 PTS,5000VUM额度
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: AMQP AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制 。

前言:这是中间件一个系列的文章之一,有需要的朋友可以看看这个系列的其他文章:
消息中间件系列一、消息中间件的基本了解
消息中间件系列二、Windows下的activeMQ和rabbitMQ的安装
消息中间件系列三、JMS和activeMQ的简单使用
消息中间件系列四、认识AMQP和RabbiyMq的简单使用
消息中间件系列五、rabbit消息的确认机制
消息中间件系列六,rabbit与spring集成实战

AMQP

AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制 。

一、AMQP的一些技术术语

  • AMQP模型(AMQP Model):一个由关键实体和语义表示的逻辑框架,遵从AMQP规范的服务器必须提供这些 实体和语义。为了实现本规范中定义的语义,客户端可以发送命令来控制AMQP服务器。
  • 连接(Connection):一个网络连接,比如TCP/IP套接字连接。
  • 会话(Session):端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。
  • 信道(Channel):多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。虚拟的连接,建立在真实的tcp连接之上的。信道的创建没有限制的。
  • 交换器(Exchange):服务器中的实体,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • 消息队列(Message Queue):一个命名实体,用来保存消息直到发送给消费者。
  • 绑定器(Binding):消息队列和交换器之间的关联。
  • 绑定器关键字(Binding Key):绑定的名称。一些交换器类型可能使用这个名称作为定义绑定器路由行为的模式。
  • 路由关键字(路由键)(Routing Key):一个消息头,交换器可以用这个消息头决定如何路由某条消息。
  • 持久存储(Durable):一种服务器资源,当服务器重启时,保存的消息数据不会丢失。
  • 临时存储(Transient):一种服务器资源,当服务器重启时,保存的消息数据会丢失。
  • 持久化(Persistent):服务器将消息保存在可靠磁盘存储中,当服务器重启时,消息不会丢失。
  • 非持久化(Non-Persistent):服务器将消息保存在内存中,当服务器重启时,消息可能丢失。
  • 消费者(Consumer):一个从消息队列中请求消息的客户端应用程序。
  • 生产者(Producer):一个向交换器发布消息的客户端应用程序。
    消息处理过程:

队列通过路由键绑定到交换器,生产者把消息发送到了交换器,交换器根据绑定的路由键将消息路由到特定的队列,订阅了队列的消费者进行接收。

_1

说明(下文会证明这三点)

  • 如果消息达到无人订阅的队列会一直在队列中等待,rabbitmq会默认队列是无限长度的。
  • 如果多个消费者订阅到同一队列,消息会轮询的方式发送给消费者,每个消息只会发送给一个消费者
  • 消息路由到了不存在的队列,消息会忽略,当消息不存在,消息丢失了。

二、创建自定义队列

  生产者和消费者都可以调用declareQueue方法创建队列,当没有目标队列时才会创建,如果已经存在相同的队列了就不在重复创建。
  相关参数:exclusive 队列为应用程序私有,auto-delete 最后一个消费者取消订阅时,队列会自动删除,durable 队列持久化。
  如果消费者订阅了队列,就不能再声明队列了。
先贴上后面会说到的一段代码,演示是怎么创建队列的

        Channel channel = connection.createChannel();//信道
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器
        String queueName = "direct_queue";
        //创建队列,参数分别为:队列名,durable是否进行持久化,exlusive是否私有,auto-delete没有消费者是自动删除,arguments相关参数
        channel.queueDeclare(queueName,false,false,false,null);

检测队列是否存在

可以通过调用方法QueueDeclarePassive判断队列是否存在,看方法名是“消极的声明创建”的意思,事实上它没有去声明队列,所谓消极,去看看有没有名为xxx的queue,如果有我就把名字什么的信息告诉你,没有就直接报错,用来确认queue是否存在。

三、四种交换器:

1、direct:

路由键完全匹配时,消息投放到对应队列。Amqp实现都必须有一个direct交换器(默认交换器),名称为空白字符。队列不声明交换器,会自动绑定到默认交换器,队列的名称作为路由键。
补充:一个direct队列可以绑定多个路由键,一条消息可以发给多个direct队列(都有绑定相同的路由键),一个direct队列(有多个路由键)也可以接收不同类型的消息。

_2

2、Fanout:

可以理解为广播,绑定这中交换器的队列,可以接收该交换器上任何类型的消息。

_3

3、Topic:

主题,使来自不同源头的消息到达同一个队列

_4

路由键中的“*”和“#”

“.”会把路由键分为好几个标识符,“*”匹配一个标识符,“#”匹配一个或者多个;
例如:xxx.yyy.zzzz 可以: xxx.*. zzzz , xxx.# , #.zzzz

4、Headers:

匹配消息头,其余与direct一样,实用性不大

举例说明

日志处理场景:
1、有交换器(topic)log_exchange,日志级别有 error,info,warning,应用模块有 user,order,email,路由键的规则是 日志级别+“.”+应用模块名(例如info.user)
2、发送邮件失败,报告一个email的error,basicPublic(message,"log-exchange","error.email")
队列的绑定:queueBind("email-error-queue","log-exchange","error.email")
要监听email所有的日志怎么办?
queueBind("email-log-queue","log-exchange"," *.email")
监听所有模块所有级别日志?
queuebind(“all-log-queue”,"log-exchange","#")
“.”会把路由键分为好几个标识符,“*”匹配一个标识符,“#”匹配一个或者多个(xxx.yyy.zzzz 可以: xxx.*. zzzz , xxx.# , #.zzzz)。

补充:生产者要把消息发送到消费者,两者必须绑定同一个的交换器。

四、虚拟主机

Vhost,真实rabbitmq服务器上的mini型虚拟的mq服务器。有自己的权限机制。Vhost提供了一个逻辑上的分离,可以区分客户端,避免队列和交换器的名称冲突。RabbitMq包含了一个缺省的vhost :“/”,用户名guest,口令 guest(guest用户只能在本机访问)。

五、消息持久化

1、队列是必须持久化
2、交换器也必须是持久化
3、消息的投递模式必须(int型) 2
以上条件全部满足,消息才能持久化
问题:持久化会带来性能的严重下降(下降10倍)

六、Rabbit简要架构介绍

_5

消息队列的使用过程大概如下:
(1)客户端连接到消息队列服务器,打开一个channel(信道)。(生产者和消费者都可操作)
(2)客户端声明一个exchange(交换器),并设置相关属性。(生产者和消费者都可操作)
(3)客户端声明一个queue(队列),并设置相关属性。(一般在消费者中操作)
(4)客户端使用routing key(路由键),在exchange(交换器)和queue(队列)之间建立好绑定关系。(一般在消费者中操作)
(5)客户端投递消息到exchange(交换器)。(一般在生产者中进行)
客户端先给指定交换器(exchange)发送消息,交换器再根据路由键把消息发送给相应的队列,而订阅了相应队列的客户端就能收到消息。

七、使用RabbitMq原生Java客户端进行消息通信

1、添加依赖

客户端Jar包和源码包下载地址:
http://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.0.0/amqp-client-5.0.0.jar
http://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.0.0/amqp-client-5.0.0-sources.jar
如果是引入jar包的形式还需要引入slf4j-api-1.6.1.jar。

如果是Maven工程加入:

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.0.0</version>
</dependency>

注意:5系列的版本最好使用JDK8及以上, 低于JDK8可以使用4.x(具体的版本号到Maven的中央仓库查)的版本。

2、生产者通过DIRECT类型的交换器发布消息

AMQP一样要连接工厂、连接,与JMS中间件不同的是,AMQP多了信道、交换器、路由键等这些概念。处理过程看下面代码:

package dongnaoedu.normal;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DirectProducer {

    private final static String EXCHANGE_NAME = "direct_logs";//交换器

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        //gust用户只能在本机访问
        //非本机访问需要设置以下属性
/*        factory.setUsername(..);
        factory.setPort();
        factory.setVirtualHost();*/
        Connection connection = factory.newConnection();//连接

        Channel channel = connection.createChannel();//信道

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//定义连接交换器

        String[]serverities = {"error","info","warning"};//路由键

        for(int i=0;i<3;i++){
            String server = serverities[i];
            String message = "Hello world_"+(i+1);
            //basicProperties是设置一些消息属性的,不需要可以传null
            //通过信道把消息、路由键传给交换器,交换器会根据路由键把消息传给相应的队列
            channel.basicPublish(EXCHANGE_NAME,server,null,message.getBytes());
            System.out.println("Sent "+server+":"+message);
        }
        channel.close();
        connection.close();
    }
}

3、生产者通过FANOUT类型的交换器发布消息

FANOUT和DIRECT在原生代码的实现方式上基本一样,只有在调用channel的exchangeDeclare方法声明交换器时把交换器类型改成FANOUT即可。

package dongnaoedu.normal;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class FanoutProducer {
    private final static String EXCHANGE_NAME = "fanout_logs";//交换器
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
/*        factory.setUsername(..);
        factory.setPort();
        factory.setVirtualHost();*/
        Connection connection = factory.newConnection();//连接
        Channel channel = connection.createChannel();//信道
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//交换器
        String[]serverities = {"error","info","warning"};//路由键
        for(int i=0;i<3;i++){
            String server = serverities[i];
            String message = "Hello world_"+(i+1);
            channel.basicPublish(EXCHANGE_NAME,server,null,message.getBytes());
            System.out.println("Sent "+server+":"+message);
        }
        channel.close();
        connection.close();
    }
}

4、指定路由键的消费者

package dongnaoedu.normal;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerError {

    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] argv) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        Connection connection = factory.newConnection();//连接
        Channel channel = connection.createChannel();//信道
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器
        //声明随机队列
        String queueName = channel.queueDeclare().getQueue();
        String server = "error";
        //队列和交换器的绑定
        channel.queueBind(queueName,EXCHANGE_NAME,server);//把队列按路由键绑定到交换器上
        System.out.println("Waiting message.......");

        Consumer consumerB = new DefaultConsumer(channel){
                //消息的回调监听
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
            }
        };
        //三个参数:队列名,是否自动确认,消息监听回调
        channel.basicConsume(queueName,true,consumerB);//对消息进行消费
    }
}

补充:这里涉及到rabbit的确认机制,相关内容我会在下一篇博客消息中间件系列五、rabbit消息的确认机制再做详细介绍。

5、绑定多个路由键的消费者

package dongnaoedu.normal;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerAll {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        Connection connection = factory.newConnection();//连接
        Channel channel = connection.createChannel();//信道
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器
        //声明随机队列
        String queueName = channel.queueDeclare().getQueue();
        String[]serverities = {"error","info","warning"};
        for(String server:serverities){
            //队列和交换器的绑定
            channel.queueBind(queueName,EXCHANGE_NAME,server);//server是路由建,一个队列可以绑定多个路由建
        }
        System.out.println("Waiting message.......");

        Consumer consumerA = new DefaultConsumer(channel){
            //消息的回调监听
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);//getRoutingKey拿到路由建
            }
        };
        //三个参数:队列名,是否自动确认,消息监听回调
        channel.basicConsume(queueName,true,consumerA);//对消息进行消费
    }
}

启动消费者之后,两个消费者的打印信息都是一样的,会一直监听消息,当生产者发了消息的时候消费者会调用handleDelivery监听方法。

image

此时有两个消费者,连接,通道,交换器队列的信息如下

image
image
image
image

可以看到产生了两个队列,所以这两个消费者接收消息是互不影响的。

再启动生产者DirectProducer
image
这个简单的生产者执行完了就退出了,
ConsumerError通过路由键的匹配只能收到error类的消息
image
ConsumerAll收到了全部的消息:
image

6、自己创建队列,不同消费者订阅相同的队列

在上面例子的基础上进行改动,
调用通道的queueDeclare方法即可创建指定名字的队列,相关参数在目录二解释过了

package dongnaoedu.normal;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerAll {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        Connection connection = factory.newConnection();//连接
        Channel channel = connection.createChannel();//信道
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器
        //声明随机队列
//        String queueName = channel.queueDeclare().getQueue();
        String queueName = "direct_queue";
        //创建队列,参数分别为:队列名,durable是否进行持久化,exlusive是否私有,auto-delete没有消费者是自动删除,arguments相关参数
        channel.queueDeclare(queueName,false,false,false,null);
        String[]serverities = {"error","info","warning"};
        for(String server:serverities){
            //队列和交换器的绑定
            channel.queueBind(queueName,EXCHANGE_NAME,server);//server是路由建,一个队列可以绑定多个路由建
        }
        System.out.println("Waiting message.......");

        Consumer consumerA = new DefaultConsumer(channel){
            //消息的回调监听
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);//getRoutingKey拿到路由建
            }
        };
        //三个参数:队列,自动确认,消息监听回调
        channel.basicConsume(queueName,true,consumerA);//对消息进行消费
    }
}
package dongnaoedu.normal;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerError {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        Connection connection = factory.newConnection();//连接
        Channel channel = connection.createChannel();//信道
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器
//        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//交换器
        //声明随机队列
//        String queueName = channel.queueDeclare().getQueue();
        String queueName = "direct_queue";
 //       direct_queue这个队列已经在ConsumerAll 绑定了三个路由键,这里不用再绑定了,也不用重复创建了
        //参数分别为:队列名,durable是否进行持久化,exlusive是否私有,auto-delete没有消费者是自动删除,arguments相关参数
//        channel.queueDeclare(queueName,false,false,false,null);
 //       String server = "error";
//        channel.queueBind(queueName,EXCHANGE_NAME,server);//把队列按路由键绑定到交换器上
        
        System.out.println("Waiting message.......");

        Consumer consumerB = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
            }
        };
        channel.basicConsume(queueName,true,consumerB);
    }
}

两个类运行后的队列:
image

可以看到只有一个队列,两个消费者都是订阅这个队列。

生产者DirectProducer不用改变,运行DirectProducer后消费者接收信息的打印结果:

image
image

可以看到消息是轮询的方式发送给消费者,这时再把消费者关掉,可以看到队列已经没有消息了,由于创建队列的时候auto-delete设置为false,这时队列并没有删除。

image

再一次运行生产者,可以看到队列收到了3条新消息,没有被消费者消费。

image
在生产者这边,添加一个没有队列绑定的路由键,发现在direct交换器下队列和消费者和都没有受到任何影响。

        String[]serverities = {"error","info","warning","test"};//路由键
        for(int i=0;i<4;i++){
            String server = serverities[i];
            String message = "Hello world_"+(i+1);
            //basicProperties是设置一些消息属性的,不需要可以传null
            //通过信道把消息、路由键传给交换器,交换器会根据路由键把消息传给相应的队列
            channel.basicPublish(EXCHANGE_NAME,server,null,message.getBytes());
            System.out.println("Sent "+server+":"+message);
        }

这证实了上文说到的:

  • 如果多个消费者订阅到同一队列,消息会轮询的方式发送给消费者,每个消息只会发送给一个消费者。
  • 如果消息达到无人订阅的队列会一直在队列中等待,rabbitmq会默认队列是无限长度的。
  • 消息路由到了不存在的队列,消息会忽略,当消息不存在,消息丢失了。
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 存储 网络协议
MQ(消息中间件)概述及 RabbitMQ 的基本介绍
MQ(消息中间件)概述及 RabbitMQ 的基本介绍
417 0
|
消息中间件 存储 监控
【图解RabbitMQ-3】消息队列RabbitMQ介绍及核心流程
【图解RabbitMQ-3】消息队列RabbitMQ介绍及核心流程
408 0
|
5月前
|
消息中间件 存储 Java
后端开发Spring框架之消息介绍 同步异步 JMS AMQP MQTT Kafka介绍
后端开发Spring框架之消息介绍 同步异步 JMS AMQP MQTT Kafka介绍
41 0
|
消息中间件 存储 Java
消息中间件系列教程(09) -RabbitMQ -案例代码(发布订阅模式)
消息中间件系列教程(09) -RabbitMQ -案例代码(发布订阅模式)
63 0
|
消息中间件 Linux 虚拟化
消息中间件系列教程(04) -RabbitMQ -简介&安装
消息中间件系列教程(04) -RabbitMQ -简介&安装
69 0
|
6月前
|
消息中间件 存储 中间件
【SpringCloud Stream消息驱动、设计思想以及整合rabbitmq消息队列案例--学习笔记】
【SpringCloud Stream消息驱动、设计思想以及整合rabbitmq消息队列案例--学习笔记】
283 0
|
6月前
|
消息中间件 存储 Java
RabbitMQ是如何实现消息传递的?
RabbitMQ是如何实现消息传递的?
116 0
|
消息中间件 网络协议 Java
RabbitMQ消息中间件学习3:快速入门案例
rabbitmq是spring一个公司的,所以很多公司 企业选择用rabbitmq。
RabbitMQ消息中间件学习3:快速入门案例
|
消息中间件 Linux
消息队列:第五章:RabbitMQ的使用
消息队列:第五章:RabbitMQ的使用
178 0
消息队列:第五章:RabbitMQ的使用
|
消息中间件 安全 中间件
消息中间件学习笔记--RabbitMQ(二、模式)
消息中间件学习笔记--RabbitMQ(二、模式)
108 7
消息中间件学习笔记--RabbitMQ(二、模式)
下一篇
无影云桌面