消息中间件系列三、JMS和activeMQ的简单使用

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 一、JMS 1、什么是JMS   JMS(JAVA Message Service,java消息服务)本质是API,Java平台消息中间件的规范,java应用程序之间进行消息交换。并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。

前言:这是中间件一个系列的文章之一,有需要的朋友可以看看这个系列的其他文章:
消息中间件系列一、消息中间件的基本了解
消息中间件系列二、Windows下的activeMQ和rabbitMQ的安装
消息中间件系列三、JMS和activeMQ的简单使用
消息中间件系列四、认识AMQP和RabbiyMq的简单使用
消息中间件系列五、rabbit消息的确认机制
消息中间件系列六,rabbit与spring集成实战

一、JMS

1、什么是JMS

  JMS(JAVA Message Service,java消息服务)本质是API,Java平台消息中间件的规范,java应用程序之间进行消息交换。并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

2、JMS规范中的点对点 (P2P) 模式:

P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

_2

P2P的特点:

  • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
  • 接收者在成功接收消息之后需向队列应答成功

如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。

3、JMS规范中的主题模式(Pub/sub发布订阅):

包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
_3

Pub/Sub的特点

  • 每个消息可以有多个消费者
  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息
  • 为了消费消息,订阅者必须保持运行的状态,如果消息生产者发布了消息之后,订阅者没有运行,则会错过消息,当订阅者再次运行也接受不到消息了。

为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。

4、消息消费

在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。

(1)同步

订阅者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻塞;

(2)异步

订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

5、JMS对象模型包含如下几个要素:

1)连接工厂:创建一个JMs连接

  创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。

2)JMS连接:客户端和服务器之间的一个连接。

  Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

3)JMS会话:客户和服务器会话的状态,建立在连接之上的

  Session是操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。

4)JMS目的Destination:消息队列

  Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。

5)JMS生产者:消息的生成

  消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

6)JMS消费者:接收消息

  消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

7)Broker

  简单来说就是消息队列服务器实体。

6、JMS规范中的消息类型

  1. TextMessage
  2. MapMessage
  3. ObjectMessage
  4. BytesMessage
  5. StreamMessage

二、activeMQ的简单使用

activeMQ是JMS规范中的一种消息中间件。
下面先实现一个最简单的消息提供者和消息消费者,熟悉一下JMS规范。
说明一下:原生点对点和发布订阅模式,只要在session创建队列的时候改一下即可,其他都可以不变,下面代码会有注释。
首先通过maven引入activemq包

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.8.0</version>
</dependency>

消息生成者代码

package com.dongnaoedu;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsProducer {

    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    //发送的消息数量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;
        Connection connection = null;
        Session session;
        Destination destination;
        MessageProducer messageProducer;

        connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);//连接工程

        try {
            connection = connectionFactory.createConnection();//连接
            connection.start();
            /*
            createSession参数取值
            * 1、为true表示启用事务
            * 2、消息的确认模式
            * AUTO_ACKNOWLEDGE  自动签收
            * CLIENT_ACKNOWLEDGE 客户端自行调用acknowledge方法签收
            * DUPS_OK_ACKNOWLEDGE 不是必须签收,消费可能会重复发送
            * 在第二次重新传送消息的时候,消息
               头的JmsDelivered会被置为true标示当前消息已经传送过一次,
               客户端需要进行消息的重复处理控制。
            * */
            session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);//会话
            destination = session.createQueue("HelloWAM");//点对点消息队列,如果要创建发布订阅模式,是要改成session.createTopic("HelloWAM");,整个类其他地方都不用变
            messageProducer = session.createProducer(destination);//消息生产者
            for(int i=0;i<SENDNUM;i++){
                String msg = "发送消息"+i+" "+System.currentTimeMillis();
                TextMessage message = session.createTextMessage(msg);
                System.out.println("发送消息:"+msg);
                messageProducer.send(message);
            }
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

将发送的消息打印出来:
A_80_H_2TYD_WSYN_R_2G

在可视化界面可以看到已经生成了队列,并且已经有了消息。
DT_309Z_EYK_W3GQ_R4W0_P

消息消费者代码:

package com.dongnaoedu;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsConsumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//连接工厂
        Connection connection = null;//连接

        Session session;//会话 接受或者发送消息的线程
        Destination destination;//消息的目的地

        MessageConsumer messageConsumer;//消息的消费者

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JmsConsumer.USERNAME,
                JmsConsumer.PASSWORD, JmsConsumer.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个连接HelloWorld的消息队列
            destination = session.createQueue("HelloWAM");//点对点消息队列,如果要创建发布订阅模式,是要改成session.createTopic("HelloWAM");,整个类其他地方都不用变

            //创建消息消费者
            messageConsumer = session.createConsumer(destination);

            //读取消息
            while(true){
                TextMessage textMessage = (TextMessage)messageConsumer.receive(10000);
                if(textMessage != null){
                    System.out.println("Accept msg : "+textMessage.getText());
                }else{
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

接收到的消息:
image

可以看到消息已经被处理了
image

三、activeMQ与spring整合实战

消息生产者项目

1、用maven引入相关的包,pom文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.dongnaoedu</groupId>
  <artifactId>am_spring_producer</artifactId>
  <packaging>war</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>am_spring_producer Maven Webapp</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>javax</groupId>
      <artifactId>javaee-web-api</artifactId>
      <version>7.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webmvc</artifactId>
      <version>4.3.11.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>jstl</artifactId>
      <version>1.2</version>
    </dependency>

    <!--日志-->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.5</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.16</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>jcl-over-slf4j</artifactId>
      <version>1.7.5</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.0.13</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-core</artifactId>
      <version>1.0.13</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-access</artifactId>
      <version>1.0.13</version>
    </dependency>

    <!--JSON-->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.7.4</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.7.4</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.7.4</version>
    </dependency>

    <!-- xbean -->
    <dependency>
      <groupId>org.apache.xbean</groupId>
      <artifactId>xbean-spring</artifactId>
      <version>3.16</version>
    </dependency>
    <dependency>
      <groupId>com.thoughtworks.xstream</groupId>
      <artifactId>xstream</artifactId>
      <version>1.3.1</version>
    </dependency>

    <!--ActiveMq-->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>4.3.11.RELEASE</version>
    </dependency>

  </dependencies>
  <build>
    <finalName>am_spring_producer</finalName>
    <resources>
      <resource>
        <directory>${basedir}/src/main/java</directory>
        <includes>
          <include>**/*.xml</include>
        </includes>
      </resource>
    </resources>
  </build>
</project>

2、web.xml配置

<!DOCTYPE web-app PUBLIC
 "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
 "http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
         version="3.0">
  <display-name>ActiveMQSpringProducer</display-name>

  <servlet-mapping>
    <servlet-name>default</servlet-name>
    <url-pattern>*.js</url-pattern>
  </servlet-mapping>

  <!-- Spring 编码过滤器 start -->
  <filter>
    <filter-name>characterEncoding</filter-name>
    <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
    <init-param>
      <param-name>encoding</param-name>
      <param-value>UTF-8</param-value>
    </init-param>
    <init-param>
      <param-name>forceEncoding</param-name>
      <param-value>true</param-value>
    </init-param>
  </filter>
  <filter-mapping>
    <filter-name>characterEncoding</filter-name>
    <url-pattern>/*</url-pattern>
  </filter-mapping>
  <!-- Spring 编码过滤器 End -->

  <!-- Spring Application Context Listener Start -->
  <context-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>classpath:applicationContext.xml</param-value>
  </context-param>
  <listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
  </listener>
  <!-- Spring Application Context Listener End -->


  <!-- Spring MVC Config Start -->
  <servlet>
    <servlet-name>SpringMVC</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
      <param-name>contextConfigLocation</param-name>
      <param-value>classpath:spring-mvc.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
  </servlet>
  <servlet-mapping>
    <servlet-name>SpringMVC</servlet-name>
    <!-- Filter all resources -->
    <url-pattern>/</url-pattern>
  </servlet-mapping>
  <!-- Spring MVC Config End -->

</web-app>

3、spring配置文件:applicationContext.xml:

注意:与spring整合的时候要加上命名空间:

xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms=http://www.springframework.org/schema/jms

http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"

JMS对象模型的连接工厂、连接、消费者、生产者等可以通过配置文件注入spring容器。

<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
     
     <!-- 配置扫描路径 -->
     <context:component-scan base-package="com.dongnaoedu">
         <context:exclude-filter type="annotation"
              expression="org.springframework.stereotype.Controller"/>
     </context:component-scan>

    <!-- ActiveMQ 连接工厂 -->
    <amq:connectionFactory id="amqConnectionFactory"
             brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" />

    <!-- Spring Caching连接工厂 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connection"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <property name="sessionCacheSize" value="100"></property>
    </bean>


    <!-- Spring JmsTemplate 的消息生产者 start-->
    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connection"></constructor-arg>
        <!-- 队列模式-->
        <property name="pubSubDomain" value="false"></property>
    </bean>


    <!-- 定义JmsTemplate的Topic类型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connection"></constructor-arg>
        <!-- true:发布订阅模式; false:队列模式-->
        <property name="pubSubDomain" value="true"></property>
    </bean>

    <!--Spring JmsTemplate 的消息生产者 end-->

    <!--接收消费者应答的监听器-->
    <!--
        消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。
        EJB中的MDB(Message-Driven Bean)就是一种MessageListener。
    -->
    <jms:listener-container destination-type="queue" container-type="default"
                            connection-factory="connection" acknowledge="auto">
        <jms:listener destination="tempqueue" ref="getResponse"></jms:listener>
    </jms:listener-container>
</beans>  

4、做成网页实战,需要配置springMVC;spring-MVC.xml

<?xml version="1.0" encoding="UTF-8"?>  
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"   
       xmlns:aop="http://www.springframework.org/schema/aop"   
       xmlns:context="http://www.springframework.org/schema/context"  
       xmlns:mvc="http://www.springframework.org/schema/mvc"   
       xmlns:tx="http://www.springframework.org/schema/tx"   
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
       xsi:schemaLocation="http://www.springframework.org/schema/aop   
        http://www.springframework.org/schema/aop/spring-aop-4.0.xsd   
        http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd   
        http://www.springframework.org/schema/mvc   
        http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd   
        http://www.springframework.org/schema/tx   
        http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">

<!--    <mvc:default-servlet-handler />-->
    <mvc:resources mapping="/js/**" location="/js/"/>
    <mvc:annotation-driven
            content-negotiation-manager="contentNegotiationManager" />

    <context:component-scan base-package="com.dongnaoedu">
        <context:include-filter type="annotation"
                                expression="org.springframework.stereotype.Controller" />
    </context:component-scan>


    <bean id="stringHttpMessageConverter"
          class="org.springframework.http.converter.StringHttpMessageConverter">
        <property name="supportedMediaTypes">
            <list>
                <bean class="org.springframework.http.MediaType">
                    <constructor-arg index="0" value="text" />
                    <constructor-arg index="1" value="plain" />
                    <constructor-arg index="2" value="UTF-8" />
                </bean>
            </list>
        </property>
    </bean>
    <bean id="mappingJackson2HttpMessageConverter"
          class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter" />

    <bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter">
        <property name="messageConverters">
            <list>
                <ref bean="stringHttpMessageConverter" />
                <ref bean="mappingJackson2HttpMessageConverter" />
            </list>
        </property>
    </bean>

    <bean id="contentNegotiationManager"
          class="org.springframework.web.accept.ContentNegotiationManagerFactoryBean">
        <property name="mediaTypes">
            <map>
                <entry key="html" value="text/html" />
                <entry key="pdf" value="application/pdf" />
                <entry key="xsl" value="application/vnd.ms-excel" />
                <entry key="xml" value="application/xml" />
                <entry key="json" value="application/json" />
            </map>
        </property>
        <property name="defaultContentType" value="text/html" />
    </bean>

    <bean id="viewResolver"
          class="org.springframework.web.servlet.view.ContentNegotiatingViewResolver">
        <property name="order" value="0" />
        <property name="contentNegotiationManager" ref="contentNegotiationManager" />

        <property name="viewResolvers">
            <list>
                <bean class="org.springframework.web.servlet.view.BeanNameViewResolver" />
                <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
                    <property name="viewClass"
                              value="org.springframework.web.servlet.view.JstlView" />
                    <property name="prefix" value="/WEB-INF/pages/" />
                    <property name="suffix" value=".jsp"></property>
                </bean>
            </list>
        </property>

        <property name="defaultViews">
            <list>
                <bean  class="org.springframework.web.servlet.view.json.MappingJackson2JsonView">
                    <property name="extractValueFromSingleKeyModel" value="true" />
                </bean>
            </list>
        </property>
    </bean>
    
</beans>  

5、监听接口:

package com.dongnaoedu.mq.producer.queue;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 动脑学院-Mark老师
 * 创建日期:2017/11/02
 * 创建时间: 22:19
 * 接收消费者应答的类
 */
@Component
public class GetResponse implements MessageListener {

    public void onMessage(Message message) {
        try {
            String textMsg = ((TextMessage)message).getText();
            System.out.println("GetResponse accept response : "+textMsg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

6、发布模式的简单消息生产者:

package com.dongnaoedu.mq.producer.topic;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 * 
 * @author lgy
 * @description   Topic生产者发送消息到Topic
 * 
 */

@Component("topicSender")
public class TopicSender {

    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTemplate;

    public void send(String queueName,final String message){
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                Message msg = session.createTextMessage(message);
                return msg;
            }
        });
    }
}

7、点对点模式消息生产者:

package com.dongnaoedu.mq.producer.queue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.jms.*;
import java.awt.peer.SystemTrayPeer;

/**
 * 
 * @author lgy
 * @description  队列消息生产者,发送消息到队列
 * 
 */
@Component("queueSender")
public class QueueSender {

    @Autowired
    @Qualifier("jmsQueueTemplate")//在xml配置好的队列模式
    private JmsTemplate jmsTemplate;

    @Autowired
    private GetResponse getResponse;

    public void send(String queueName,final String message){
        //jmsTemplate的send方法传递两个参数,第一个传队列名,第二个传消息创建接口的实现对象
        jmsTemplate.send(queueName, new MessageCreator() {
            //消息创建接口只有这么一个方法,框架会把session会话对象传给方法,此方法需要返回一个Message消息对象,框架会发送消息到队列
            public Message createMessage(Session session) throws JMSException {
                //创建会话消息
                Message msg = session.createTextMessage(message);
                //配置消费者应答相关内容
                //获取临时的消息队列
                Destination tempDest = session.createTemporaryQueue();
                MessageConsumer responseConsumer = session.createConsumer(tempDest);//获取应答的消息消费者
                responseConsumer.setMessageListener(getResponse);//设置消费者返回消息时的监听接口
                msg.setJMSReplyTo(tempDest);//告诉消费者应答消息发送到临时队列
                //消费者应答的id,发送出的消息和应答消息进行匹配
                String uid = System.currentTimeMillis()+"";
                msg.setJMSCorrelationID(uid);

                return msg;
            }
        });
    }
}

8、controller:

package com.dongnaoedu.controller;

import com.dongnaoedu.mq.producer.queue.QueueSender;
import com.dongnaoedu.mq.producer.topic.TopicSender;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;


/**
 * 
 * @author lgy
 * @description controller测试
 */
@Controller
@RequestMapping("/activemq")
public class ActivemqController {
    
    @Resource
    QueueSender queueSender;
    @Resource
    TopicSender topicSender;
    
    /**
     * 发送消息到队列
     * Queue队列:仅有一个订阅者会收到消息,消息一旦被处理就不会存在队列中
     * @param message
     * @return String
     */
    @ResponseBody
    @RequestMapping("queueSender")
    public String queueSender(@RequestParam("message")String message){
        String opt="";
        try {
            queueSender.send("test.queue",message);
            opt = "suc";
        } catch (Exception e) {
            opt = e.getCause().toString();
        }
        return opt;
    }
    
    /**
     * 发送消息到主题
     * Topic主题 :放入一个消息,所有订阅者都会收到 
     * 这个是主题目的地是一对多的
     * @param message
     * @return String
     */
    @ResponseBody
    @RequestMapping("topicSender")
    public String topicSender(@RequestParam("message")String message){
        String opt = "";
        try {
            topicSender.send("test.topic",message);
            opt = "suc";
        } catch (Exception e) {
            opt = e.getCause().toString();
        }
        return opt;
    }
    
}

II_D54_A_H3V_XB1JWQG31A

调用不同的接口即可发布消息到对应的队列中。

消息消费者项目

项目的pom文件和web.xml以及spring-mvc.xml基本和生产者一样,且不是重点,这里不再浪费篇幅,就不贴出来了。

1、applicationContext.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
     
     <!-- 配置扫描路径 -->
     <context:component-scan base-package="com.dongnaoedu">
         <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
     </context:component-scan>

    <!-- ActiveMQ 连接工厂 -->
    <amq:connectionFactory id="amqConnectionFactory"
                           brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin"  />

    <!-- Spring Caching连接工厂 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- 消息消费者 start-->

    <!-- 定义Queue监听器 -->
    <jms:listener-container destination-type="queue" container-type="default"
                            connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener>
        <jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener>
    </jms:listener-container>


    <!-- 定义Topic监听器 -->
    <jms:listener-container destination-type="topic" container-type="default"
                            connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.topic" ref="topicReceiver1"></jms:listener>
        <jms:listener destination="test.topic" ref="topicReceiver2"></jms:listener>
    </jms:listener-container>

    <!-- 消息消费者 end -->

    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory"></constructor-arg>
        <!-- 队列模式-->
        <property name="pubSubDomain" value="false"></property>
    </bean>

</beans>

2、发布订阅模式的消息监听:

package com.dongnaoedu.mq.consumer.topic;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 
 * @author lgy
 * @description  Topic消息监听器
 * 
 */
@Component
public class TopicReceiver2 implements MessageListener {

    public void onMessage(Message message) {
        try {
            String textMsg = ((TextMessage)message).getText();
            System.out.println("TopicReceiver2 accept msg : "+textMsg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

可以同时设置多个监听,有消息发布时,多个监听器都能监听到消息

package com.dongnaoedu.mq.consumer.topic;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 
 * @author lgy
 * @description  Topic消息监听器
 * 
 */
@Component
public class TopicReceiver1 implements MessageListener {


    public void onMessage(Message message) {
        try {
            String textMsg = ((TextMessage)message).getText();
            System.out.println("TopicReceiver1 accept msg : "+textMsg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    
}

在消息生产者项目的前端界面输入消息内容:“123456”,在点击发送Topic消息,以发布订阅模式发布消息;消费项目这边的两个发布订阅模式监听器都能收到消息并打印出来:
image

3、点对点模式的消息监听


package com.dongnaoedu.mq.consumer.queue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * 
 * @author Mark
 * @description  队列消息监听器
 * 
 */
@Component
public class QueueReceiver1 implements MessageListener {

    @Autowired
    private ReplyTo replyTo;

    public void onMessage(Message message) {
        try {
            String textMsg = ((TextMessage)message).getText();
            System.out.println("QueueReceiver1 accept msg : "+textMsg);
            //do my 业务工作
            replyTo.send(textMsg,message);//执行回复
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

可以给消息生产者回复消息:


package com.dongnaoedu.mq.consumer.queue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 * 负责向消息提供者发送应答信息
 */
@Component
public class ReplyTo {

    @Autowired
    private JmsTemplate jmsTemplate;

    public void send(final String consumerMsg, Message produerMessage) throws JMSException {
        //消息回复,getJMSReplyTo回去回复的临时队列
        jmsTemplate.send(produerMessage.getJMSReplyTo(), new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                Message msg = session.createTextMessage("QueueReceiver1 accept msg"
                        +consumerMsg);
                return msg;
            }
        });

    }

}

可以有多个点对点队列监听器,当同一队列有多个监听器时,会轮询给监听器发消息



package com.dongnaoedu.mq.consumer.queue;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 
 * @author Mark
 * @description  队列消息监听器
 * 
 */
@Component
public class QueueReceiver2 implements MessageListener {

    public void onMessage(Message message) {
        try {
            String textMsg = ((TextMessage)message).getText();
            System.out.println("QueueReceiver2 accept msg : "+textMsg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

在生产项目连续发布6条点对点消息,消费项目这边接收到的消息打印结果为:
image
上图是两个监听器轮询接收到消息并打印出来的结果。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 网络协议 Java
消息中间件-ActiveMQ
消息中间件-ActiveMQ
|
5月前
|
消息中间件 存储 监控
Java一分钟之-ActiveMQ:消息中间件
【6月更文挑战第11天】Apache ActiveMQ是广泛使用的开源消息中间件,支持JMS和多种消息协议。本文介绍了ActiveMQ的基础知识,包括消息队列和主题模型,以及持久化和高可用性配置。同时,提出了三个常见问题:配置不当、消息堆积和网络错误,并给出了相应的解决策略。通过Java示例代码展示了如何使用ActiveMQ发送和接收消息。正确配置、管理消息处理和持续监控是确保ActiveMQ高效运行的关键。
147 2
|
4月前
|
消息中间件 NoSQL Kafka
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
|
消息中间件 Java Maven
消息中间件系列教程(03) -ActiveMQ -点对点&发布订阅模式
消息中间件系列教程(03) -ActiveMQ -点对点&发布订阅模式
95 0
|
消息中间件 Java Linux
消息中间件系列教程(02) -ActiveMQ -安装&入门案例
消息中间件系列教程(02) -ActiveMQ -安装&入门案例
59 0
|
消息中间件 缓存 安全
SpringBoot与JMS集成(中间件为ActiveMQ)
Apache ActiveMQ是最受欢迎和强有力的开源消息和集成模式服务器,支持许多跨语言客户端和协议,便利使用企业集成模式还有许多先进的特性。
|
消息中间件 存储 缓存
消息中间件ActiveMQ常见问题解析
消息中间件ActiveMQ常见问题解析
378 5
|
消息中间件 XML 开发框架
|
消息中间件 存储 NoSQL
剑指offer之消息中间件ActiveMQ知识总结
剑指offer之消息中间件ActiveMQ知识总结
217 3
剑指offer之消息中间件ActiveMQ知识总结
|
消息中间件 Java
JAVA分布式--ActiveMQ 消息中间件(下)
JAVA分布式--ActiveMQ 消息中间件(下)
117 5
JAVA分布式--ActiveMQ 消息中间件(下)