Spring5整合ActiveMQ实现queue和topic消息模式

简介: Spring5整合ActiveMQ实现queue和topic消息模式

一、引入spring和ActiveMQ的依赖。


    <properties>
        <spring.version>5.0.2.RELEASE</spring.version>
        <activemq.version>5.15.5</activemq.version>
    </properties>
    <dependencies>
        <!--activemq需要的jar包 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>${activemq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>${activemq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
    </dependencies>

二、整合queue消息模式


1.application-queue.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">
    <!--声明消息工厂-->
    <bean id="jmsFactory"
          class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>
    <!--声明目的地 队列-->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-active-queue"/>
    </bean>
    <!-- Spring的JMS模版操作activeMQ工具 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="defaultDestination" ref="destinationQueue"/>
        <!-- 消息转化器 -->
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
</beans>


2.QuenProducer.java消息生产者


public class QuenProducer {
    public static void main(String[] args) {
        // 创建IOC容器
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:application-queue.xml");
        JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
        //指定队列名称地址
        //jmsTemplate.setDefaultDestination(new ActiveMQQueue("hello-quen"));
        // 默认队列
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                //创建消息
                TextMessage textMessage = session.createTextMessage("spring整合ActiveMQ-queue");
                return textMessage;
            }
        });
        System.out.println("消息发送成功!!!");
    }
}


3.QuenConsumer.java消息消费者


public class QuenConsumer {
    public static void main(String[] args) throws JMSException {
        //创建IOC容器
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:application-queue.xml");
        JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
        //消费队列名称地址
        //jmsTemplate.setDefaultDestination(new ActiveMQQueue("hello-quen"));
        //如果不设置监听地址,默认目的地
        TextMessage textMessage = (TextMessage) jmsTemplate.receive();
        System.out.println(textMessage.getText());
        //返回的是对象 消息转换器的作用 bean里面有配置
        //Object object = jmsTemplate.receiveAndConvert();
        //System.out.println("消息是:"+ object);
    }
}

三、整合Topic消息模式


1.application-topic.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">
    <!--声明消息工厂-->
    <bean id="jmsFactory"
          class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>
    <!--声明目的地 队列-->
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-active-topic"/>
    </bean>
    <!-- Spring的JMS模版操作activeMQ工具 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="defaultDestination" ref="destinationTopic"/>
        <!-- 消息转化器 -->
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
</beans>


2.生产者和消费者,和queue模式一样,只需要修改配置文件即可。


四、监听者模式


1.application-queue-listener.xml


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">
    <!--声明消息工厂-->
    <bean id="jmsFactory"
          class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>
    <!--声明目的地 队列-->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-active-queue"/>
    </bean>
    <!-- Spring的JMS模版操作activeMQ工具 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="defaultDestination" ref="destinationQueue"/>
        <!-- 消息转化器 -->
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
    <!--声明一个监听器对象-->
    <bean id="myListener" class="com.wang.listener.MyListener"></bean>
    <!--配置监听容器-->
    <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="destination" ref="destinationQueue"/>
        <property name="messageListener" ref="myListener"/>
    </bean>
</beans>

2.MyListener.java,监听相当于消费者。

public class MyListener implements MessageListener {
    public void onMessage(Message message) {
        if (message instanceof TextMessage){
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("监听器收到消息,不需要配置消费者了!!!,内容是:"+ textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

3.QuenProducerListener.java 只需要配置生产者就可以了。上面监听相当于消费者。


public class QuenProducerListener {
    public static void main(String[] args) {
        // 创建IOC容器
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:application-queue-listener.xml");
        JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
        //指定队列名称地址
        //jmsTemplate.setDefaultDestination(new ActiveMQQueue("hello-quen"));
        // 默认队列
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                //创建消息
                TextMessage textMessage = session.createTextMessage("spring整合ActiveMQ-queue");
                return textMessage;
            }
        });
        System.out.println("消息发送成功!!!");
    }
}



目录
相关文章
|
5月前
|
Java Spring 容器
Spring注解开发定义bean及纯注解开发模式
Spring注解开发定义bean及纯注解开发模式
35 0
|
5月前
|
SQL Java 数据库
Spring Authorization Server 1.1 扩展实现 OAuth2 密码模式与 Spring Cloud 的整合实战(上)
Spring Authorization Server 1.1 扩展实现 OAuth2 密码模式与 Spring Cloud 的整合实战(上)
|
5月前
|
安全 前端开发 Java
Spring Authorization Server 1.1 扩展实现 OAuth2 密码模式与 Spring Cloud 的整合实战(下)
Spring Authorization Server 1.1 扩展实现 OAuth2 密码模式与 Spring Cloud 的整合实战(下)
|
4月前
|
消息中间件 Java Spring
一文看懂Spring Boot整合Rabbit MQ实现多种模式的生产和消费
一文看懂Spring Boot整合Rabbit MQ实现多种模式的生产和消费
66 0
|
4月前
|
安全 Java 数据安全/隐私保护
Spring Security OAuth 认证流程浅析:授权码模式
【1月更文挑战第16天】上一篇[Spring Security OAuth 认证流程浅析:密码模式],简单分析了密码模式授权流程的源码,这篇来试着分析 OAuth 中最具代表性的授权码模式。
75 4
|
4月前
|
安全 Java 数据安全/隐私保护
Spring Security OAuth 认证流程浅析:密码模式
【1月更文挑战第15天】从 Spring Security OAuth 源码分析其实现原理。这篇我们先分析最常用也相对简单的密码模式。
84 0
|
4月前
|
消息中间件 Java Spring
RabbitMQ各种模式的含义与Spring Boot实例详解
RabbitMQ各种模式的含义与Spring Boot实例详解
36 0
|
4月前
|
监控 安全 关系型数据库
微服务+Java+Spring Cloud +UniApp +MySql智慧工地综合管理云平台源码,SaaS模式
微服务+Java+Spring Cloud +UniApp +MySql智慧工地综合管理云平台源码,SaaS模式
111 0
|
7月前
|
XML Java 数据格式
Spring中JavaBean的生命周期及模式
Spring中JavaBean的生命周期及模式
25 0
|
7月前
|
设计模式 Java 开发者
Spring框架中JavaBean的生命周期及单例模式与多列模式
Spring框架中JavaBean的生命周期及单例模式与多列模式
82 0