一、概念
- 异步消息简介
与远程调用机制以及REST接口类似,异步消息也是用于应用程序之间通信的。
RMI、Hessian、Burlap、HTTP invoker和Web服务在应用程序之间的通信机制是同步的,即客户端应用程序直接与远程服务相交互,并且一直等到远程过程完成后才继续执行。而消息是异步发送的,客户端不需要等待服务处理消息,甚至不需要等待消息投递完成。客户端发送消息,然后继续执行,这是因为客户端假定服务最终可以收到并处理这条消息。
- 优缺点
优点:
-
- 异步通信。客户端无需等待服务端的响应,节省时间,提升客户端的效率。
- 面向消息与解耦。客户端不需要与特定的方法签名绑定,任何可以处理数据的队列或主题订阅者都可以处理由客户端发送的消息,而客户端不必了解远程服务的任何规范。
- 位置独立。由于客户端并不直接与服务端通信,而是把消息交由消息代理。因此,只要服务能够从队列或主题中获取消息即可,消息客户端根本不需要关注服务来自哪里。而且可以使用服务器集群监听同一个消息代理提升服务器负载。
缺点:
-
- 增加复杂度。毫无疑问,消息代理这个东西是多出来的,需要维护成本。
- 暂时的不一致性。异步消息方式可以确保最终的一致性,但是可能存在客户端把消息给了消息队列,而服务端暂时还没处理这个队列导致的暂时不一致性问题。
- 应用场景
-
- 客户端并不需要服务端的反馈,诸如此类的非核心流程异步化处理。
- 流量削峰。比如很多的秒杀场景,用户的请求,服务器接收后,首先写入消息队列,接着再根据业务做后续处理。
- 日志处理。将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。
- 消息通讯。消息队列一般都内置了高效的通信机制,因此也可以用于单纯的消息通讯,比如实现点对点消息队列或者聊天室等。
- 消息模型
点对点消息模型
在点对点模型中,每一条消息都有一个发送者和一个接收者,如图17.3所示。当消息代理得到消息时,它将消息放入一个队列中。当接收者请求队列中的下一条消息时,消息会从队列中取出,并投递给接收者。因为消息投递后会从队列中删除,这样就可以保证消息只能投递给一个接收者。
发布-订阅消息模型
在发布—订阅消息模型中,消息会发送给一个主题。与队列类似,多个接收者都可以监听一个主题。但是,与队列不同的是,消息不再是只投递给一个接收者,而是主题的所有订阅者都会接收到此消息的副本,如图17.4所示。
二、集成实现JMS
Java消息服务(Java Message Service ,JMS)是一个Java标准,定义了使用消息代理的通用API。借助JMS,所有遵从规范的实现都使用通用的接口,这就类似于JDBC为数据库操作提供了通用的接口一样。
Spring通过基于模板的抽象为JMS功能提供了支持,这个模板也就是JmsTemplate。使用JmsTemplate,能够非常容易地在消息生产方发送队列和主题消息,在消费消息的那一方,也能够非常容易地接收这些消息。Spring还提供了消息驱动POJO的理念:这是一个简单的Java对象,它能够以异步的方式响应队列或主题上到达的消息。
接下来让我们来看看在Spring中如何集成实现JMS:
-
搭建消息代理
我们首先需要一个消息代理,作为客户端和服务端通信的中介。ActiveMQ是一个伟大的开源消息代理产品,也是使用JMS进行异步消息传递的最佳选择。下载地址:http://activemq.apache.org/ ,下载完成后解压缩到本地硬盘,在bin目录下,我们可以看到为各种操作系统所创建的对应子目录。在这些子目录下,我们可以找到用于启动ActiveMQ的脚本。
启动好ActiveMQ后,添加如下的 pom 依赖:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.3</version> </dependency>
-
建立连接工厂、消息目的地
连接工厂:
<!--1、ActiveMQ 工厂 2、amq命名空间方式 3、默认监听端口61616 4、默认用户名:admin 密码:admin --> <amq:connectionFactory id="connectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin"/>
消息目的地:
消息目的地又分为 队列 和 主题 两种:
<!--1、定义消息目的地,可以是队列或者主题两种方式 2、借助physicalName属性指定消息通道的名称--> <amq:queue id="queueDestination" physicalName="queueName"/> <amq:topic id="topicDestination" physicalName="topicName"/>
-
使用 JmsTemplate
为了消除冗余和重复的JMS代码,Spring 给出的解决方案就是JmsTemplate。JmsTemplate可以创建连接、获得会话以及发送和接收消息。这使得我们可以专注于构建要发送的消息或者处理接收到的消息。另外,JmsTemplate可以处理所有抛出的笨拙的JMSException异常。
<!--1、jmsTemplate 2、defaultDestination 定义了默认的消息目的地 3、messageConverter 消息转换器 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" p:connectionFactory-ref="connectionFactory" p:defaultDestination-ref="queueDestination" p:messageConverter-ref="messageConverter"/> <!--MessageConvert--> <bean id="messageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
JmsTemplate 可以非常的简单的实现消息的发送和接收功能,让我们来看看吧!
发送消息(convertAndSend):
@Autowired private JmsOperations jmsOperations; /** * jmsOperations.convertAndSend() 方法,"queueName" 不填写,用默认的 Destination */ @Test public void convertAndSend(){ Map<String ,Object> map = new HashMap<>(16); map.put("java", "java"); map.put("python", "python"); map.put("c++", "c++"); jmsOperations.convertAndSend("queueName", map); }
接收消息(receiveAndConvert):
@Autowired private JmsOperations jmsOperations; /** * jmsOperations 的 receiveAndConvert() 方法 */ @Test public void receiveAndConvert(){ Map<String, Object> map = (Map) jmsOperations.receiveAndConvert("queueName"); }
这里有几点需要说明一下;
1、除了 convertAndSend() 和 receiveAndConvert() 方法,JmsTempalte 还支持 send() 和 receive() 方法来发送和接收消息,就是写起来麻烦点,还要自己处理 JMSException。可参考我的源码~
2、convertAndSend() 和 receiveAndConvert() 方法 如果不指定 消息通道名称,即上面的 "queueName"。采用JmsTemplate 默认设置的,即 defaultDestination 关联的消息目的地中的消息通道。
3、convertAndSend() 和 receiveAndConvert() 方法 能便捷的实现 发送和接收消息功能,原因是 消息转换器 !发送时,JmsTemplate 先把消息内容转换成对应Message;接收时,JmsTemplate 再把对应Message 转换回消息内容。JmsTemplate 定义了多个消息转换器。如上,我用了 SimpleMessageConverter 转换器,也就是 JmsTemplate 中默认使用的转换器(不设置用的就是这个转换器)。如果需要,还可自定义转换器呢!
-
创建消息监听器
使用JmsTemplate接收消息的最大缺点在于receive()和receiveAndConvert()方法都是同步的。这意味着接收者必须耐心等待消息的到来,因此这些方法会一直被阻塞,直到有可用消息(或者直到超时)。同步接收异步发送的消息,是不是感觉很怪异?
如果一发送消息就能被对应的方法处理,岂不美哉?
<jms:listener-container connection-factory="connectionFactory"> <jms:listener destination="queueName" ref="queueMessageHandler" method="handle"/> <jms:listener destination="topicName" ref="topicMessageHandler" method="handle"/> </jms:listener-container>
在这里,我们在消息监听器容器中包含了消息监听器。消息监听器容器(message listener container)是一个特殊的bean,它可以监控JMS目的地并等待消息到达。一旦有消息到达,它取出消息,然后把消息传给任意一个对此消息感兴趣的消息监听器。注意!关键词 任意一个 !说明即使多个消息监听器监听同一个消息通道,仍然只会有一个消息监听器执行!!另外,destination 指的是消息通道的名称,并不是JMS目的地的 id 。ref 连接的是 Spring 的 bean 。methon 指的是这个bean中处理这个 消息的方法,需要注意的是 这个方法的形参!如果放入消息通道的数据类型是 字符串的话,那这个方法的形参也要用字符串接收;如果放入消息通道的数据类型是 集合的话,那这个方法的形参也要用对应集合类型接收。
三、使用基于消息的RPC
为了支持基于消息的RPC,Spring提供了JmsInvokerServiceExporter,它可以把bean导出为基于消息的服务;同时,为客户端提供了JmsInvokerProxyFactoryBean来使用这些服务。
- 导出基于JMS的服务
把bean导出为基于消息的服务,利用的是Spring的 JmsInvokerServiceExporter,如下:
<bean id="jmsServer" class="org.springframework.jms.remoting.JmsInvokerServiceExporter" p:serviceInterface="org.springframework.message.activemq.rpc.JmsServer" p:service-ref="jmsServerImpl"/>
这个bean的属性描述了导出的服务应该是什么样子的。service-ref 属性设置为 jmsServerImpl 的引用,它是远程服务的实现。同时,serviceInterface 属性设置为远程服务对外提供接口的全限定类名。
JmsInvokerServiceExporter 可以充当JMS监听器来进行服务间的通信。即客户端 调用这个服务的时候,就可以立即 用这个服务的实现 来处理客户端的调用啦!因为我们监听了这个服务!如下:
<jms:listener-container connection-factory="connectionFactory"> <!--利用jms监听器导出消息服务--> <jms:listener destination="sparta" ref="jmsServer"/> </jms:listener-container>
我们为JMS监听器容器指定了连接工厂,所以它能够知道如何连接消息代理,而<jms:listener>声明指定了远程消息的目的地。
- 使用基于JMS的服务
JmsInvokerProxyFactoryBean 是一个远程代理工厂bean,代理了通过JmsInvokerServiceExporter所导出的JMS服务。它隐藏了访问远程服务的细节,并提供一个易用的接口,通过该接口客户端与远程服务进行交互。
<!--远程代理工厂 bean ,供客户端访问--> <bean id="jmsServerProxy" class="org.springframework.jms.remoting.JmsInvokerProxyFactoryBean" p:serviceInterface="org.springframework.message.activemq.rpc.JmsServer" p:connectionFactory-ref="connectionFactory" p:queueName="sparta"/>
对于serviceInterface,指定了代理应该通过 JmsServer 接口暴露功能。queueName 指定要连接的消息代理的名称。
测试类:
@Test public void test01(){ ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml"); JmsServer service = (JmsServer)context.getBean("jmsServerProxy"); service.doServer("Hello Message"); }
tips:使用基于消息的RPC,只研究了用法。还没想到用在什么场景~ 各位指教?
演示代码下载:https://github.com/JMCuixy/SpringMessage
参考资料:《Spring 实战第四版》