Spring + activeMQ整合,并写了一个简单的小栗子

简介: Spring + activeMQ整合,并写了一个简单的小栗子

消息中间件有很多的用途和优点:

1. 将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块;

2. 负责建立网络通信的通道,进行数据的可靠传送。

3. 保证数据不重发,不丢失

4. 能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务


MQ


首先简单的介绍一下MQ,MQ英文名MessageQueue,中文名也就是大家用的消息队列,干嘛用的呢,说白了就是一个消息的接受和转发的容器,可用于消息推送。


这里主要介绍一下ActiveMQ

官方网站下载地址: 官方网站下载地址

08092532_ZZC2.png

Windows下下载完直接解压就行,然后到解压的目录,到bin目录下的32或64运行activemq.bat脚本文件。

运行效果图:

image.png


启动默认端口号为:8161,在浏览器输入:localhost:8161即可访问如下页面:


image.png


这样ActiveMQ安装,启动就完成了。

接下来开始使用ActiveMQ。

项目进本结构:


image.png



1.创建一个Maven项目,在pom.xml引入jar包。

<!-- spring-mq包 -->
   <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webmvc</artifactId>
      <version>3.2.8.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-spring</artifactId>
      <version>5.14.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>3.2.8.RELEASE</version>
    </dependency>
    <!-- json的转换 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.4</version>
    </dependency>


2.配置mq的链接信息spring-config.properties

这里默认端口号为:61616,账号密码默认为:admin

#mq  link Properties
activemq_url=tcp://192.168.1.102:61616
activemq_username=admin
activemq_password=admin

3.配置spring+activeMQ的配置文件,这里贴上完成代码,里面有详细注释。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"  
    xmlns:jee="http://www.springframework.org/schema/jee" 
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:jpa="http://www.springframework.org/schema/data/jpa"
    xmlns:amq="http://activemq.apache.org/schema/core"  
    xmlns:jms="http://www.springframework.org/schema/jms" 
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
        http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa-1.3.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
        http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.2.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.2.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.14.5.xsd
        ">
    <context:annotation-config/>
    <context:component-scan base-package="com"/>
    <!-- 读取配置文件 -->
    <bean id="propertyPlaceholderConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <array>
                <value>classpath:properties/spring-config.properties</value>
            </array>
        </property>
    </bean>
    <!-- ActiveMQ 连接工厂 -->  
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
    <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码--> 
    <amq:connectionFactory id="amqConnectionFactory" brokerURL="${activemq_url}" userName="${activemq_username}" password="${activemq_password}"/>        
    <!-- 这里可以采用连接池的方式连接PooledConnectionFactoryBean -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 配置连接 -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
        <!-- 会话的最大连接数 -->
        <property name="sessionCacheSize" value="100"/>
    </bean>
    <!-- 定义消息队列topic类型,queue的方式差不多 -->
    <bean id="topic" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- 定义名称 -->
        <constructor-arg index="0" value="topic"/>
    </bean>
    <!-- 配置JMS模板(topic),Spring提供的JMS工具类,它发送、接收消息。 -->
    <!-- 为了测试发送消息,保留jmsTemplate的配置,实际不存在发送,只需要配置监听即可 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestination" ref="topic"/>
        <!-- 非pub/sub模型(发布/订阅),true为topic,false为queue --> 
        <property name="pubSubDomain" value="true"/>
    </bean>
    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
        <property name="pubSubDomain" value="false" />
    </bean>
    <!-- 监听方式,这种方式更实用,可以一直监听消息 -->
    <bean id="topicMessageListen" class="com.mq.TopicMessageListen"/>    
    <bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <!-- 注册activemq名称 -->
        <property name="destination" ref="topic"/>
        <property name="messageListener" ref="topicMessageListen"/>
    </bean>
</beans>

4.配置好了,接下来就写实现

创建一个MqSendUtil.java的通用发送方法。

package com.util;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.context.ApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class MqSendUtil {
    private  ApplicationContext ac = new ClassPathXmlApplicationContext("classpath:spring/applicationContext.xml");
    private  JmsTemplate jmsTemplate = (JmsTemplate) ac.getBean("jmsTopicTemplate");
    public void send(final String message){
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                TextMessage msg = session.createTextMessage();
                msg.setText(message);
                System.out.println("发送数据++++++++++++发送数据:"+message);
                return msg;
            }
        });
    }
}

监听发送的消息TopicMessageListen.java

package com.mq;
import java.util.Map;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import com.alibaba.fastjson.JSON;
public class TopicMessageListen implements MessageListener{
    //监听接口获取数据
    public void onMessage(Message message) {        
        try {
            System.out.println("------------获取到的数据:"+message);
            TextMessage tm = (TextMessage)(message);
            String aa = tm.getText();
            Map<String, Object> map = JSON.parseObject(aa);
            System.out.println("------------------aa的值:"+map.get("aa"));
            System.out.println("------------------bb的值:"+map.get("bb"));
            //在这里可以进行操作。
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


调用发送util实现发送TopicSendMessage.java

package com.mq;
import java.util.HashMap;
import java.util.Map;
import com.alibaba.fastjson.JSON;
import com.util.MqSendUtil;
public class TopicSendMessage {
    public static void main(String[] args) {
        MqSendUtil mq = new MqSendUtil();
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("aa", "Thread2 aa的值");
        map.put("bb", "Thread2 bb的值");
        mq.send(JSON.toJSONString(map));
        System.out.println("Thread2 请求的数据:"+map);
    }
}


运行图:


image.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
消息中间件 Java Kafka
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
|
6月前
|
消息中间件 监控 Java
使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收
使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收
531 3
|
5月前
|
消息中间件 Java Apache
使用Spring Boot实现与ActiveMQ的消息队列集成
使用Spring Boot实现与ActiveMQ的消息队列集成
|
消息中间件 Java
springboot整合ActiveMQ(点对点+发布订阅)
springboot整合ActiveMQ(点对点+发布订阅)
|
7月前
|
消息中间件 XML Java
SpringBoot2.0整合ActiveMQ
SpringBoot2.0整合ActiveMQ
75 2
|
7月前
|
消息中间件 Java
SpringBoot使用ActiveMq同时支持点对点推送和发布订阅
SpringBoot使用ActiveMq同时支持点对点推送和发布订阅
43 0
|
7月前
|
消息中间件 Java Kafka
SpringBoot整合 ActiveMQ快速入门 实现点对点推送
SpringBoot整合 ActiveMQ快速入门 实现点对点推送
70 0
|
消息中间件 Java 测试技术
53分布式电商项目 - Spring集成ActiveMQ
53分布式电商项目 - Spring集成ActiveMQ
31 0
|
消息中间件 Java Spring
ActiveMQ整合springboot
ActiveMQ整合springboot
|
消息中间件 Java
SpringBoot整合ActiveMq实现Queue和Topic两种模式
SpringBoot整合ActiveMq实现Queue和Topic两种模式
136 4
SpringBoot整合ActiveMq实现Queue和Topic两种模式