前言:这是中间件一个系列的文章之一,有需要的朋友可以看看这个系列的其他文章:
消息中间件系列一、消息中间件的基本了解
消息中间件系列二、Windows下的activeMQ和rabbitMQ的安装
消息中间件系列三、JMS和activeMQ的简单使用
消息中间件系列四、认识AMQP和RabbiyMq的简单使用
消息中间件系列五、rabbit消息的确认机制
消息中间件系列六,rabbit与spring集成实战
一、JMS
1、什么是JMS
JMS(JAVA Message Service,java消息服务)本质是API,Java平台消息中间件的规范,java应用程序之间进行消息交换。并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
2、JMS规范中的点对点 (P2P) 模式:
P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
P2P的特点:
- 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
- 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
- 接收者在成功接收消息之后需向队列应答成功
如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。
3、JMS规范中的主题模式(Pub/sub发布订阅):
包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
Pub/Sub的特点
- 每个消息可以有多个消费者
- 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息
- 为了消费消息,订阅者必须保持运行的状态,如果消息生产者发布了消息之后,订阅者没有运行,则会错过消息,当订阅者再次运行也接受不到消息了。
为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。
4、消息消费
在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。
(1)同步
订阅者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻塞;
(2)异步
订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。
5、JMS对象模型包含如下几个要素:
1)连接工厂:创建一个JMs连接
创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。
2)JMS连接:客户端和服务器之间的一个连接。
Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。
3)JMS会话:客户和服务器会话的状态,建立在连接之上的
Session是操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。
4)JMS目的Destination:消息队列
Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。
5)JMS生产者:消息的生成
消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。
6)JMS消费者:接收消息
消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。
7)Broker
简单来说就是消息队列服务器实体。
6、JMS规范中的消息类型
- TextMessage
- MapMessage
- ObjectMessage
- BytesMessage
- StreamMessage
二、activeMQ的简单使用
activeMQ是JMS规范中的一种消息中间件。
下面先实现一个最简单的消息提供者和消息消费者,熟悉一下JMS规范。
说明一下:原生点对点和发布订阅模式,只要在session创建队列的时候改一下即可,其他都可以不变,下面代码会有注释。
首先通过maven引入activemq包
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.8.0</version>
</dependency>
消息生成者代码
package com.dongnaoedu;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProducer {
//默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
//发送的消息数量
private static final int SENDNUM = 10;
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageProducer messageProducer;
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);//连接工程
try {
connection = connectionFactory.createConnection();//连接
connection.start();
/*
createSession参数取值
* 1、为true表示启用事务
* 2、消息的确认模式
* AUTO_ACKNOWLEDGE 自动签收
* CLIENT_ACKNOWLEDGE 客户端自行调用acknowledge方法签收
* DUPS_OK_ACKNOWLEDGE 不是必须签收,消费可能会重复发送
* 在第二次重新传送消息的时候,消息
头的JmsDelivered会被置为true标示当前消息已经传送过一次,
客户端需要进行消息的重复处理控制。
* */
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);//会话
destination = session.createQueue("HelloWAM");//点对点消息队列,如果要创建发布订阅模式,是要改成session.createTopic("HelloWAM");,整个类其他地方都不用变
messageProducer = session.createProducer(destination);//消息生产者
for(int i=0;i<SENDNUM;i++){
String msg = "发送消息"+i+" "+System.currentTimeMillis();
TextMessage message = session.createTextMessage(msg);
System.out.println("发送消息:"+msg);
messageProducer.send(message);
}
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}finally {
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
将发送的消息打印出来:
在可视化界面可以看到已经生成了队列,并且已经有了消息。
消息消费者代码:
package com.dongnaoedu;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory;//连接工厂
Connection connection = null;//连接
Session session;//会话 接受或者发送消息的线程
Destination destination;//消息的目的地
MessageConsumer messageConsumer;//消息的消费者
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JmsConsumer.USERNAME,
JmsConsumer.PASSWORD, JmsConsumer.BROKEURL);
try {
//通过连接工厂获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个连接HelloWorld的消息队列
destination = session.createQueue("HelloWAM");//点对点消息队列,如果要创建发布订阅模式,是要改成session.createTopic("HelloWAM");,整个类其他地方都不用变
//创建消息消费者
messageConsumer = session.createConsumer(destination);
//读取消息
while(true){
TextMessage textMessage = (TextMessage)messageConsumer.receive(10000);
if(textMessage != null){
System.out.println("Accept msg : "+textMessage.getText());
}else{
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
接收到的消息:
可以看到消息已经被处理了
三、activeMQ与spring整合实战
消息生产者项目
1、用maven引入相关的包,pom文件:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dongnaoedu</groupId>
<artifactId>am_spring_producer</artifactId>
<packaging>war</packaging>
<version>1.0-SNAPSHOT</version>
<name>am_spring_producer Maven Webapp</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-web-api</artifactId>
<version>7.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>4.3.11.RELEASE</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
<version>1.2</version>
</dependency>
<!--日志-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.13</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.0.13</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-access</artifactId>
<version>1.0.13</version>
</dependency>
<!--JSON-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.7.4</version>
</dependency>
<!-- xbean -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
<version>1.3.1</version>
</dependency>
<!--ActiveMq-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.8.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.11.RELEASE</version>
</dependency>
</dependencies>
<build>
<finalName>am_spring_producer</finalName>
<resources>
<resource>
<directory>${basedir}/src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
</resource>
</resources>
</build>
</project>
2、web.xml配置
<!DOCTYPE web-app PUBLIC
"-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
"http://java.sun.com/dtd/web-app_2_3.dtd" >
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">
<display-name>ActiveMQSpringProducer</display-name>
<servlet-mapping>
<servlet-name>default</servlet-name>
<url-pattern>*.js</url-pattern>
</servlet-mapping>
<!-- Spring 编码过滤器 start -->
<filter>
<filter-name>characterEncoding</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
<init-param>
<param-name>forceEncoding</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>characterEncoding</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<!-- Spring 编码过滤器 End -->
<!-- Spring Application Context Listener Start -->
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:applicationContext.xml</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<!-- Spring Application Context Listener End -->
<!-- Spring MVC Config Start -->
<servlet>
<servlet-name>SpringMVC</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring-mvc.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>SpringMVC</servlet-name>
<!-- Filter all resources -->
<url-pattern>/</url-pattern>
</servlet-mapping>
<!-- Spring MVC Config End -->
</web-app>
3、spring配置文件:applicationContext.xml:
注意:与spring整合的时候要加上命名空间:
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms=http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"
JMS对象模型的连接工厂、连接、消费者、生产者等可以通过配置文件注入spring容器。
<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
<!-- 配置扫描路径 -->
<context:component-scan base-package="com.dongnaoedu">
<context:exclude-filter type="annotation"
expression="org.springframework.stereotype.Controller"/>
</context:component-scan>
<!-- ActiveMQ 连接工厂 -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" />
<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connection"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="100"></property>
</bean>
<!-- Spring JmsTemplate 的消息生产者 start-->
<!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connection"></constructor-arg>
<!-- 队列模式-->
<property name="pubSubDomain" value="false"></property>
</bean>
<!-- 定义JmsTemplate的Topic类型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connection"></constructor-arg>
<!-- true:发布订阅模式; false:队列模式-->
<property name="pubSubDomain" value="true"></property>
</bean>
<!--Spring JmsTemplate 的消息生产者 end-->
<!--接收消费者应答的监听器-->
<!--
消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。
EJB中的MDB(Message-Driven Bean)就是一种MessageListener。
-->
<jms:listener-container destination-type="queue" container-type="default"
connection-factory="connection" acknowledge="auto">
<jms:listener destination="tempqueue" ref="getResponse"></jms:listener>
</jms:listener-container>
</beans>
4、做成网页实战,需要配置springMVC;spring-MVC.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">
<!-- <mvc:default-servlet-handler />-->
<mvc:resources mapping="/js/**" location="/js/"/>
<mvc:annotation-driven
content-negotiation-manager="contentNegotiationManager" />
<context:component-scan base-package="com.dongnaoedu">
<context:include-filter type="annotation"
expression="org.springframework.stereotype.Controller" />
</context:component-scan>
<bean id="stringHttpMessageConverter"
class="org.springframework.http.converter.StringHttpMessageConverter">
<property name="supportedMediaTypes">
<list>
<bean class="org.springframework.http.MediaType">
<constructor-arg index="0" value="text" />
<constructor-arg index="1" value="plain" />
<constructor-arg index="2" value="UTF-8" />
</bean>
</list>
</property>
</bean>
<bean id="mappingJackson2HttpMessageConverter"
class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter" />
<bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter">
<property name="messageConverters">
<list>
<ref bean="stringHttpMessageConverter" />
<ref bean="mappingJackson2HttpMessageConverter" />
</list>
</property>
</bean>
<bean id="contentNegotiationManager"
class="org.springframework.web.accept.ContentNegotiationManagerFactoryBean">
<property name="mediaTypes">
<map>
<entry key="html" value="text/html" />
<entry key="pdf" value="application/pdf" />
<entry key="xsl" value="application/vnd.ms-excel" />
<entry key="xml" value="application/xml" />
<entry key="json" value="application/json" />
</map>
</property>
<property name="defaultContentType" value="text/html" />
</bean>
<bean id="viewResolver"
class="org.springframework.web.servlet.view.ContentNegotiatingViewResolver">
<property name="order" value="0" />
<property name="contentNegotiationManager" ref="contentNegotiationManager" />
<property name="viewResolvers">
<list>
<bean class="org.springframework.web.servlet.view.BeanNameViewResolver" />
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
<property name="viewClass"
value="org.springframework.web.servlet.view.JstlView" />
<property name="prefix" value="/WEB-INF/pages/" />
<property name="suffix" value=".jsp"></property>
</bean>
</list>
</property>
<property name="defaultViews">
<list>
<bean class="org.springframework.web.servlet.view.json.MappingJackson2JsonView">
<property name="extractValueFromSingleKeyModel" value="true" />
</bean>
</list>
</property>
</bean>
</beans>
5、监听接口:
package com.dongnaoedu.mq.producer.queue;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* 动脑学院-Mark老师
* 创建日期:2017/11/02
* 创建时间: 22:19
* 接收消费者应答的类
*/
@Component
public class GetResponse implements MessageListener {
public void onMessage(Message message) {
try {
String textMsg = ((TextMessage)message).getText();
System.out.println("GetResponse accept response : "+textMsg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
6、发布模式的简单消息生产者:
package com.dongnaoedu.mq.producer.topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
/**
*
* @author lgy
* @description Topic生产者发送消息到Topic
*
*/
@Component("topicSender")
public class TopicSender {
@Autowired
@Qualifier("jmsTopicTemplate")
private JmsTemplate jmsTemplate;
public void send(String queueName,final String message){
jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message msg = session.createTextMessage(message);
return msg;
}
});
}
}
7、点对点模式消息生产者:
package com.dongnaoedu.mq.producer.queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import javax.jms.*;
import java.awt.peer.SystemTrayPeer;
/**
*
* @author lgy
* @description 队列消息生产者,发送消息到队列
*
*/
@Component("queueSender")
public class QueueSender {
@Autowired
@Qualifier("jmsQueueTemplate")//在xml配置好的队列模式
private JmsTemplate jmsTemplate;
@Autowired
private GetResponse getResponse;
public void send(String queueName,final String message){
//jmsTemplate的send方法传递两个参数,第一个传队列名,第二个传消息创建接口的实现对象
jmsTemplate.send(queueName, new MessageCreator() {
//消息创建接口只有这么一个方法,框架会把session会话对象传给方法,此方法需要返回一个Message消息对象,框架会发送消息到队列
public Message createMessage(Session session) throws JMSException {
//创建会话消息
Message msg = session.createTextMessage(message);
//配置消费者应答相关内容
//获取临时的消息队列
Destination tempDest = session.createTemporaryQueue();
MessageConsumer responseConsumer = session.createConsumer(tempDest);//获取应答的消息消费者
responseConsumer.setMessageListener(getResponse);//设置消费者返回消息时的监听接口
msg.setJMSReplyTo(tempDest);//告诉消费者应答消息发送到临时队列
//消费者应答的id,发送出的消息和应答消息进行匹配
String uid = System.currentTimeMillis()+"";
msg.setJMSCorrelationID(uid);
return msg;
}
});
}
}
8、controller:
package com.dongnaoedu.controller;
import com.dongnaoedu.mq.producer.queue.QueueSender;
import com.dongnaoedu.mq.producer.topic.TopicSender;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
/**
*
* @author lgy
* @description controller测试
*/
@Controller
@RequestMapping("/activemq")
public class ActivemqController {
@Resource
QueueSender queueSender;
@Resource
TopicSender topicSender;
/**
* 发送消息到队列
* Queue队列:仅有一个订阅者会收到消息,消息一旦被处理就不会存在队列中
* @param message
* @return String
*/
@ResponseBody
@RequestMapping("queueSender")
public String queueSender(@RequestParam("message")String message){
String opt="";
try {
queueSender.send("test.queue",message);
opt = "suc";
} catch (Exception e) {
opt = e.getCause().toString();
}
return opt;
}
/**
* 发送消息到主题
* Topic主题 :放入一个消息,所有订阅者都会收到
* 这个是主题目的地是一对多的
* @param message
* @return String
*/
@ResponseBody
@RequestMapping("topicSender")
public String topicSender(@RequestParam("message")String message){
String opt = "";
try {
topicSender.send("test.topic",message);
opt = "suc";
} catch (Exception e) {
opt = e.getCause().toString();
}
return opt;
}
}
调用不同的接口即可发布消息到对应的队列中。
消息消费者项目
项目的pom文件和web.xml以及spring-mvc.xml基本和生产者一样,且不是重点,这里不再浪费篇幅,就不贴出来了。
1、applicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
<!-- 配置扫描路径 -->
<context:component-scan base-package="com.dongnaoedu">
<context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
</context:component-scan>
<!-- ActiveMQ 连接工厂 -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" />
<!-- Spring Caching连接工厂 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="100" />
</bean>
<!-- 消息消费者 start-->
<!-- 定义Queue监听器 -->
<jms:listener-container destination-type="queue" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener>
<jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener>
</jms:listener-container>
<!-- 定义Topic监听器 -->
<jms:listener-container destination-type="topic" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.topic" ref="topicReceiver1"></jms:listener>
<jms:listener destination="test.topic" ref="topicReceiver2"></jms:listener>
</jms:listener-container>
<!-- 消息消费者 end -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory"></constructor-arg>
<!-- 队列模式-->
<property name="pubSubDomain" value="false"></property>
</bean>
</beans>
2、发布订阅模式的消息监听:
package com.dongnaoedu.mq.consumer.topic;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
*
* @author lgy
* @description Topic消息监听器
*
*/
@Component
public class TopicReceiver2 implements MessageListener {
public void onMessage(Message message) {
try {
String textMsg = ((TextMessage)message).getText();
System.out.println("TopicReceiver2 accept msg : "+textMsg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
可以同时设置多个监听,有消息发布时,多个监听器都能监听到消息
package com.dongnaoedu.mq.consumer.topic;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
*
* @author lgy
* @description Topic消息监听器
*
*/
@Component
public class TopicReceiver1 implements MessageListener {
public void onMessage(Message message) {
try {
String textMsg = ((TextMessage)message).getText();
System.out.println("TopicReceiver1 accept msg : "+textMsg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
在消息生产者项目的前端界面输入消息内容:“123456”,在点击发送Topic消息,以发布订阅模式发布消息;消费项目这边的两个发布订阅模式监听器都能收到消息并打印出来:
3、点对点模式的消息监听
package com.dongnaoedu.mq.consumer.queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
*
* @author Mark
* @description 队列消息监听器
*
*/
@Component
public class QueueReceiver1 implements MessageListener {
@Autowired
private ReplyTo replyTo;
public void onMessage(Message message) {
try {
String textMsg = ((TextMessage)message).getText();
System.out.println("QueueReceiver1 accept msg : "+textMsg);
//do my 业务工作
replyTo.send(textMsg,message);//执行回复
} catch (JMSException e) {
e.printStackTrace();
}
}
}
可以给消息生产者回复消息:
package com.dongnaoedu.mq.consumer.queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
/**
* 负责向消息提供者发送应答信息
*/
@Component
public class ReplyTo {
@Autowired
private JmsTemplate jmsTemplate;
public void send(final String consumerMsg, Message produerMessage) throws JMSException {
//消息回复,getJMSReplyTo回去回复的临时队列
jmsTemplate.send(produerMessage.getJMSReplyTo(), new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message msg = session.createTextMessage("QueueReceiver1 accept msg"
+consumerMsg);
return msg;
}
});
}
}
可以有多个点对点队列监听器,当同一队列有多个监听器时,会轮询给监听器发消息
package com.dongnaoedu.mq.consumer.queue;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
*
* @author Mark
* @description 队列消息监听器
*
*/
@Component
public class QueueReceiver2 implements MessageListener {
public void onMessage(Message message) {
try {
String textMsg = ((TextMessage)message).getText();
System.out.println("QueueReceiver2 accept msg : "+textMsg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
在生产项目连续发布6条点对点消息,消费项目这边接收到的消息打印结果为:
上图是两个监听器轮询接收到消息并打印出来的结果。