一、AMQP 概述
AMQP(Advanced Message Queuing Protocol),高级消息队列协议。
简单回忆一下JMS的消息模型,可能会有助于理解AMQP的消息模型。在JMS中,有三个主要的参与者:消息的生产者、消息的消费者以及在生产者和消费者之间传递消息的通道(队列或主题)。在JMS中,通道有助于解耦消息的生产者和消费者,但是这两者依然会与通道相耦合。与之不同的是,AMQP的生产者并不会直接将消息发布到队列中。AMQP在消息的生产者以及传递信息的队列之间引入了一种间接的机制:Exchange。如下图:
哈哈,笔主从今天开始也要学着自己画图了。
来看看 AMQP 消息的通信过程。首先,生产者把消息发给 Exchange,并带有一个 routing key。其次,Exchange 和 队列 之间 通过 binging 通信,binging 上也有 一个 routing key,AMQP定义了四种不同类型的Exchange,每一种都有不同的路由算法,根据Exchange的算法不同,它可能会使用消息的routing key或参数,并与 binding 的routing key或参数进行对比,来决定是否要将信息放到队列中。然后,消费者从每个队列中取出消息。
Exchange 的路由算法:
- Direct:如果 消息的routing key 与 binding的routing key 直接匹配的话,消息将会路由到该队列上;
- Topic:如果 消息的routing key 与 binding的routing key 符合通配符匹配的话,消息将会路由到该队列上;
- Headers:如果 消息参数表中的头信息和值 都与 bingding参数表中 相匹配,消息将会路由到该队列上;
- Fanout:不管消息的routing key和参数表的头信息/值是什么,消息将会路由到所有队列上。
AMQP 与 JMS 的区别:
1、AMQP为消息定义了线路层(wire-level protocol)的协议,而JMS所定义的是API规范。JMS的API协议能够确保所有的实现都能通过通用的API来使用,但是并不能保证某个JMS实现所发送的消息能够被另外不同的JMS实现所使用。而AMQP的线路层协议规范了消息的格式,消息在生产者和消费者间传送的时候会遵循这个格式。这样AMQP在互相协作方面就要优于JMS——它不仅能跨不同的AMQP实现,还能跨语言和平台。
2、JMS 支持TextMessage、MapMessage 等复杂的消息类型;而AMQP 仅支持 byte[] 消息类型(复杂的类型可序列化后发送),个人认为这也是它能够跨平台和跨语言使用的原因之一。
3、由于Exchange 提供的路由算法,AMQP可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 队列 和 主题/订阅 方式两种。
二、Spring 集成 RabbitMQ
RabbitMQ是一个流行的开源消息代理,它实现了AMQP。Spring AMQP为RabbitMQ提供了支持,包括RabbitMQ连接工厂、模板以及Spring配置命名空间。
首先,需要安装 RabbitMQ,我们可以在 http://www.rabbitmq.com/download.html 上找到安装指南,具体怎么安装,不是这篇博文的重点,请笔友们自行解决。
接下来,让我们一起来看看,Spring 和 RabbitMQ 的集成:
1、pom 依赖
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.0.3.RELEASE</version> </dependency>
2、连接工厂 和 admin
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.userName}" password="${rabbitmq.password}"/> <rabbit:admin connection-factory="connectionFactory"/>
admin 元素会自动创建一个RabbitMQ管理组件,它会自动创建队列、Exchange以及binding
3、声明队列、Exchange以及binding
声明队列:
<rabbit:queue name="queue1"/> <rabbit:queue name="queue2"/> <rabbit:queue name="queue3"/> <rabbit:queue name="queue4"/> <rabbit:queue name="queue5"/> <rabbit:queue name="queue6"/>
声明 Exchange 以及 binding:
direct-exchange:
<rabbit:direct-exchange name="directExchange"> <rabbit:bindings> <rabbit:binding key="queue1" queue="queue1"/> <rabbit:binding key="queue2" queue="queue2"/> <rabbit:binding key="queue3" queue="queue3"/> </rabbit:bindings> </rabbit:direct-exchange>
如果消息的routing key 与 routing key 直接匹配的话,消息将会路由到该队列上。
topic-exchange
<rabbit:topic-exchange name="topicExchange"> <rabbit:bindings> <rabbit:binding pattern="routing.*" queue="queue2"/> <rabbit:binding pattern="routing.*" queue="queue3"/> </rabbit:bindings> </rabbit:topic-exchange>
消息的 routing key 与 binding的routing key 符合通配符匹配的话,消息将会路由到该队列上。
这个通配符匹配特别坑,贼坑!我本来写了个 "routing*" ,自以为能匹配 "routingrrr" 这样的字符,不行!然后我又写了个"routing?"、"rounting.",预想着能不能匹配单个任意字符,不行!
终于我得出了一个结论,只能使用 "*"(匹配 0 个或任意多个)通配符,并且,并且!"*" 前面一定要有 个 "." ! 太可怕了,不知道我总结的对不对哈!
headers-exchange
<rabbit:headers-exchange name="headersExchange"> <rabbit:bindings> <rabbit:binding queue="queue4" key="betty" value="rubble" /> <rabbit:binding queue="queue5" key="barney" value="rubble" /> </rabbit:bindings> </rabbit:headers-exchange>
消息参数表中的头信息和值都与bingding参数表中相匹配,消息将会路由到该队列上。
这个用法比较少用,也比较难用,原因是因为它仅支持 发送 byte[] 的消息类型。
fanout-exchange
<rabbit:fanout-exchange name="fanoutExchange"> <rabbit:bindings> <rabbit:binding queue="queue5"/> <rabbit:binding queue="queue6"/> </rabbit:bindings> </rabbit:fanout-exchange>
这个是最简单粗暴的匹配规则,不管消息的routing key和参数表的头信息/值是什么,消息将会路由到所有队列上。
4、发送和接收消息
还是Spring的那一套,Spring 为我们提供了一个模板 bean(rabbitTemplate) 来发送和接收消息。其中,像前文提到的 jmsTemplate 那样,rabbitTemplate 也为我们 提供了 convertAndSend() 方法来自动转换和发送消息,提供了receiveAndConvret() 方法来接收和自动转换成对象(消息和对象之间默认的消息转换器是SimpleMessageConverter,它适用于String、Serializable实例以及字节数组)。另外,rabbitTemplate 也照常提供了 send() 和 receive() 方法来发送和接收消息,不过貌似仅支持发送字节数组...
配置 rabbitTemplate:
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="directExchange" routing-key="queue1"/>
下面仅演示 通配符路由方式 和 header 路由方式 发送和接收消息。其他具体详细的内容可参考我下面附上的源码:
通配符路由方式:
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:applicationContext.xml") public class TopicExchange { @Autowired private RabbitTemplate rabbitTemplate; @Test public void convertAndSend(){ List<String> list = new ArrayList<>(); list.add("java"); list.add("python"); list.add("c++"); rabbitTemplate.convertAndSend("topicExchange","routing.123", list); } @Test public void receiveAndConvert(){ List<String> queue2List =(List) rabbitTemplate.receiveAndConvert("queue2"); printList(queue2List); System.out.println("----------------华丽的分隔符-----------------"); List<String> queue3List =(List) rabbitTemplate.receiveAndConvert("queue3"); printList(queue3List); } private <E> void printList(List<E> list){ if (list != null && list.size() > 0){ for (Object o : list){ System.out.println("-----------------"+ o +"---------------"); } } } }
header 路由方式:
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:applicationContext.xml") public class HeadersExchangeTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void convertAndSend(){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("betty", "rubble"); messageProperties.setHeader("fred", "flintstone"); messageProperties.setHeader("barney", "rubble"); String str = new String("Hello RabbitMQ"); Message message = new Message(str.getBytes(), messageProperties); rabbitTemplate.convertAndSend("headersExchange","",message); } @Test public void receiveAndConvert(){ Message queue4 = rabbitTemplate.receive("queue4"); System.out.println("第一个输出:" + new String(queue4.getBody())); Message queue5 = rabbitTemplate.receive("queue5"); System.out.println("第三个输出:" + new String(queue5.getBody())); } }
5、定义消息驱动的AMQP POJO
用 receive()和 receiveAndConvert()方法都会立即返回,如果队列中没有等待的消息时,将会得到 null。Spring AMQP提供了消息驱动POJO的支持,也就是相当于一个监听器,监听某些队列,当消息到达指定队列的时候,可以立即调用方法处理该消息。
listener-container 配置:
<rabbit:listener-container connection-factory="connectionFactory" concurrency="2" prefetch="3" type="direct"> <rabbit:listener ref="handlerListener" method="handler" queue-names="queue5,queue6"/> </rabbit:listener-container>
其中,ref 指定Spring bean 的 id,method 指定 该bean中处理队列中消息的方法,queue-names 指定要监听哪些队列,队列之间用 "," 分隔。
三、结语
祝大家五一节快乐!
演示源码下载链接:https://github.com/JMCuixy/SpringMessageRabbitMQ
参考资料:1、《Spring 实战第四版》