消息中间件有很多的用途和优点:
1. 将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块;
2. 负责建立网络通信的通道,进行数据的可靠传送。
3. 保证数据不重发,不丢失
4. 能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务
MQ
首先简单的介绍一下MQ,MQ英文名MessageQueue,中文名也就是大家用的消息队列,干嘛用的呢,说白了就是一个消息的接受和转发的容器,可用于消息推送。
这里主要介绍一下ActiveMQ
官方网站下载地址: 官方网站下载地址
Windows下下载完直接解压就行,然后到解压的目录,到bin目录下的32或64运行activemq.bat脚本文件。
运行效果图:
启动默认端口号为:8161,在浏览器输入:localhost:8161即可访问如下页面:
这样ActiveMQ安装,启动就完成了。
接下来开始使用ActiveMQ。
项目进本结构:
1.创建一个Maven项目,在pom.xml引入jar包。
<!-- spring-mq包 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>3.2.8.RELEASE</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-spring</artifactId> <version>5.14.5</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>3.2.8.RELEASE</version> </dependency> <!-- json的转换 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.4</version> </dependency>
2.配置mq的链接信息spring-config.properties
这里默认端口号为:61616,账号密码默认为:admin
#mq link Properties activemq_url=tcp://192.168.1.102:61616 activemq_username=admin activemq_password=admin
3.配置spring+activeMQ的配置文件,这里贴上完成代码,里面有详细注释。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:util="http://www.springframework.org/schema/util" xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa-1.3.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.2.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.14.5.xsd "> <context:annotation-config/> <context:component-scan base-package="com"/> <!-- 读取配置文件 --> <bean id="propertyPlaceholderConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <array> <value>classpath:properties/spring-config.properties</value> </array> </property> </bean> <!-- ActiveMQ 连接工厂 --> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码--> <amq:connectionFactory id="amqConnectionFactory" brokerURL="${activemq_url}" userName="${activemq_username}" password="${activemq_password}"/> <!-- 这里可以采用连接池的方式连接PooledConnectionFactoryBean --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 配置连接 --> <property name="targetConnectionFactory" ref="amqConnectionFactory"/> <!-- 会话的最大连接数 --> <property name="sessionCacheSize" value="100"/> </bean> <!-- 定义消息队列topic类型,queue的方式差不多 --> <bean id="topic" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 定义名称 --> <constructor-arg index="0" value="topic"/> </bean> <!-- 配置JMS模板(topic),Spring提供的JMS工具类,它发送、接收消息。 --> <!-- 为了测试发送消息,保留jmsTemplate的配置,实际不存在发送,只需要配置监听即可 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> <property name="defaultDestination" ref="topic"/> <!-- 非pub/sub模型(发布/订阅),true为topic,false为queue --> <property name="pubSubDomain" value="true"/> </bean> <!-- 定义JmsTemplate的Queue类型 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="connectionFactory" /> <!-- 非pub/sub模型(发布/订阅),即队列模式 --> <property name="pubSubDomain" value="false" /> </bean> <!-- 监听方式,这种方式更实用,可以一直监听消息 --> <bean id="topicMessageListen" class="com.mq.TopicMessageListen"/> <bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <!-- 注册activemq名称 --> <property name="destination" ref="topic"/> <property name="messageListener" ref="topicMessageListen"/> </bean> </beans>
4.配置好了,接下来就写实现
创建一个MqSendUtil.java的通用发送方法。
package com.util; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.xbean.spring.context.ClassPathXmlApplicationContext; import org.springframework.context.ApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class MqSendUtil { private ApplicationContext ac = new ClassPathXmlApplicationContext("classpath:spring/applicationContext.xml"); private JmsTemplate jmsTemplate = (JmsTemplate) ac.getBean("jmsTopicTemplate"); public void send(final String message){ jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage msg = session.createTextMessage(); msg.setText(message); System.out.println("发送数据++++++++++++发送数据:"+message); return msg; } }); } }
监听发送的消息TopicMessageListen.java
package com.mq; import java.util.Map; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import com.alibaba.fastjson.JSON; public class TopicMessageListen implements MessageListener{ //监听接口获取数据 public void onMessage(Message message) { try { System.out.println("------------获取到的数据:"+message); TextMessage tm = (TextMessage)(message); String aa = tm.getText(); Map<String, Object> map = JSON.parseObject(aa); System.out.println("------------------aa的值:"+map.get("aa")); System.out.println("------------------bb的值:"+map.get("bb")); //在这里可以进行操作。 } catch (Exception e) { e.printStackTrace(); } } }
调用发送util实现发送TopicSendMessage.java
package com.mq; import java.util.HashMap; import java.util.Map; import com.alibaba.fastjson.JSON; import com.util.MqSendUtil; public class TopicSendMessage { public static void main(String[] args) { MqSendUtil mq = new MqSendUtil(); Map<String, Object> map = new HashMap<String, Object>(); map.put("aa", "Thread2 aa的值"); map.put("bb", "Thread2 bb的值"); mq.send(JSON.toJSONString(map)); System.out.println("Thread2 请求的数据:"+map); } }
运行图: