[原创]JMS-ActiveMQ基础与SpringBoot整合

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: [原创]JMS-ActiveMQ基础与SpringBoot整合

ActiveMQ实现了JMS规范。


# ActiveMQ中相关概念术语


  1. Destination目的地消息将要发送的地方,包括:QueueTopic,它们都对Destination接口进行了实现
  1. PTP模式 - Queue
  2. 发布订阅模式 - Topic
    MessageProvider需要指定Destination才能发送消息,MessageConsumer需要指定Destination才能接收和消费消息。
  1. Producer消息生产者
    消息生产者,负责将消息发送到目的地Destination
  2. Consumer消息消费者
    消息消费者,负责从目的地Destination消费消息。
  3. Message消息本体
  4. ConnectionFactory连接工厂
    用于创建连接的工厂
  5. Connection连接
    用户访问ActiveMQ
  6. Session会话
    一次持久有效有状态的访问,由Connection创建,是具体操作消息的基础支撑。
    JMS中定义了两种消息模型:点对点(point to point, queue)发布/订阅(publish/subscribe,topic)。主要区别就是是能否重复消费


# JMS中Queue模式与Topic模式对比


Topic Queue
概要 Publish Subscribe messaging 发布订阅消息 Point-to-Point 点对点
有无状态 topic数据默认不落地,是无状态的。 Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。
完整性保障 并不保证publisher发布的每条数据,Subscriber都能接受到。 Queue保证每条数据都能被receiver接收。
消息是否会丢失 一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。 Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。
消息发布接收策略 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。

1. PTP Queue不可重复消费


消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。

消息被消费以后(消费者ack应答确认/事务模式),queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。


Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费、其它的则不能消费此消息了。


当消费者不存在时,消息会一直保存,直到有消费消费


image.png


2. 发布订阅模式 Topic 可以重复消费


消息生产者(发布)将消息发布到Topic中,同时有多个消息消费者(订阅该Topic)消费该消息。


和点对点方式不同,发布到topic的消息会被所有订阅者消费。


当生产者发布消息,不管是否有消费者。都不会保存消息。如果生产者向队列发送消息时,没有消费者订阅该队列,则消息全部丢失。否则向所有订阅了该Topic的消费者发送同样的消息(即:消费者必须在线)


image.png



# 在SpringBoot中使用ActiveMQ


ActiveMQ管理地址: http://localhost:8161/admin/


  1. PTP模式
  • 依赖
//jms-active
    compile 'org.springframework.boot:spring-boot-starter-activemq'
    //active连接池-1.5.13依赖
    compile 'org.apache.activemq:activemq-pool'
  • 配置信息


spring:
  # activemq
  activemq:
    broker-url: failover:(tcp://localhost:61616,tcp://localhost:666)?randomize=false      # tcp://localhost:61616/故障转移,默认情况下如果某个链接失效了,则从列表中随机获取一个,如果设置了randomize=false则是严格按照列表的先后顺序的
    user: admin           # 用户名
    password: admin       # 密码
    in-memory: false      # 基于内存的activemq
    close-timeout: 15s     # 在考虑结束之前等待的时间
    pool:
      enabled: true                               # 启动连接池(是否用Pooledconnectionfactory代替普通的ConnectionFactory)
      max-connections: 10                         # 最大链接数量
      idle-timeout: 60s                           # 空闲连接存活时间
      block-if-full: true                         # 当连接请求和池满时是否阻塞。设置false会抛“JMSException异常”
      block-if-full-timeout: -1                   # 如果池仍然满,则在抛出异常之前阻塞时间
      create-connection-on-startup: true          # 是否在启动时创建连接。可以在启动时用于加热池
      maximum-active-session-per-connection: 500  # 每个连接的有效会话的最大数目。
      reconnect-on-exception: true                # 当发生"JMSException"时尝试重新连接
  jms:
    pub-sub-domain: false                  # 默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
  • 定义PTP模式下的Destination-Queue


/**
 * @author futao
 * Created on 2019-06-04.
 */
@AllArgsConstructor
@Getter
public enum ActiveMqQueueEnum {
    /**
     * springboot-test-queue=测试Queue
     */
    TEST_QUEUE("springboot-test-queue", "测试Queue");
    private String queueName;
    private String desc;
    public static final String testQueue = "springboot-test-queue";
}
/**
 * @author futao
 * Created on 2019-06-04.
 */
@Configuration
public class ActiveMqConfig {
   /**
     * The ActiveMQConnectionFactory creates ActiveMQ Connections.
     * The PooledConnectionFactory pools Connections.
     * If you only need to create one Connection and keep it around for a long time you don't need to pool.
     * If you tend to create many Connection instances over time then Pooling is better as connecting is a heavy operation and can be a performance bottleneck.
     * <p>
     * 可以在这里统一设置JmsTemplate的一些配置,也可以在具体使用到JmsTemplate的时候单独设置
     * JmsMessageTemplate是对JmsTemplate的进一步封装
     * TODO 目前看起来不起作用
     *
     * @param factory
     * @return
     */
    //    @Primary
//    @Bean
    public JmsTemplate jmsTemplate(PooledConnectionFactory factory) {
        JmsTemplate jmsTemplate = new JmsTemplate();
        //关闭事物
        jmsTemplate.setSessionTransacted(false);
        //TODO 在此设置无效
//        jmsTemplate.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
        jmsTemplate.setConnectionFactory(factory);
        return jmsTemplate;
    }
    @Bean(name = ActiveMqQueueEnum.testQueue)
    public ActiveMQQueue activeTestQueue() {
        return new ActiveMQQueue(ActiveMqQueueEnum.TEST_QUEUE.getQueueName());
    }
  /**
     * 定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
     *
     * @param pooledConnectionFactory
     * @return
     */
    @Bean(name = "jmsQueueListener")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(PooledConnectionFactory pooledConnectionFactory) {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(pooledConnectionFactory);
        factory.setSessionTransacted(false);
        factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
        return factory;
    }
}
  • 定义PTP模式下的生产者


package com.futao.springbootdemo.foundation.mq.active.ptp;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
/**
 * PTP模式生产者
 *
 * @author futao
 * Created on 2019-06-06.
 */
@Slf4j
@Component
public class PtpProducer {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    /**
     * 目的地
     */
    @Qualifier("springboot-test-queue")
    @Autowired
    private ActiveMQQueue springBootTestQueue;
    public void send(String msg) {
        jmsMessagingTemplate.convertAndSend(springBootTestQueue, msg);
        try {
            log.info("send to ActiveMQ-Queue[{}] success ,msg:[{}]", springBootTestQueue.getQueueName(), msg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
/**
 * @author futao
 * Created on 2019-06-04.
 */
@RequestMapping("/activemq")
@RestController
public class ActiveController {
  @Resource
    private PtpProducer ptpProducer;
    @PostMapping("/ptp/sender")
    public void ptpSender(@RequestParam String msg) {
        ptpProducer.send(msg);
    }
}
  • 定义PTP模式下的消费者


package com.futao.springbootdemo.foundation.mq.active.ptp;
import com.futao.springbootdemo.foundation.mq.active.ActiveMqQueueEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import javax.jms.*;
/**
 * @author futao
 * Created on 2019-06-06.
 */
@Slf4j
@Service
public class PtpConsumer {
    @JmsListener(destination = ActiveMqQueueEnum.testQueue, containerFactory = "jmsQueueListener")
    public void ptpConsumer(ActiveMQMessage message) throws JMSException {
        String text = ((TextMessage) message).getText();
        if ("节日快乐666".equalsIgnoreCase(text)) {
            message.acknowledge();    //ack手动确认
        }
        log.info("receive message from activeMQ :[{}]", text);
    }
  /**
   * 手动创建ActiveMQConnectionFactory消费消息,生产消息也类似
   */
    @Test
    public void test() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//开启ack手动确认
        MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(ActiveMqQueueEnum.TEST_QUEUE.getQueueName()));
        connection.start();
        consumer.setMessageListener(message -> {
            try {
                String text = ((TextMessage) message).getText();
                System.out.println(("收到消息:{}" + text));
                if ("节日快乐666".equalsIgnoreCase(text)) {
                    message.acknowledge();    //ack手动确认
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
        Thread.sleep(999999999);
    }
}


image.png


特点


  • 一条消息只会发送给其中某一个单独的消费者

image.png

未被确认的消息将再次发送给其他消费


image.png

发布订阅模式


  • 发布订阅模式需要将spring.jms.pub-sub-domain=true,其他配置不需要修改
  • 定义发布订阅模式下的Destination - Topic


/**
 * @author futao
 * Created on 2019-06-04.
 */
@Configuration
public class ActiveMqConfig {
  /**
     * ActiveMQ topic的定义
     */
    public static class TopicDefinition {
        public static final String activeTestTopic = "active-test-topic";
        public static final String activeProdTopic = "active-prod-topic";
    }
    /**
     * 定义一个名为BeanName为activeTestTopic的Topic:active-test-topic
     *
     * @return
     */
    @Bean(name = "activeTestTopic")
    public ActiveMQTopic activeMQTestTopic() {
        return new ActiveMQTopic(TopicDefinition.activeTestTopic);
    }
    /**
     * 定义一个名为BeanName为activeProdTopic的Topic:active-prod-topic
     *
     * @return
     */
    @Bean(name = "activeProdTopic")
    public ActiveMQTopic activeMQProdTopic() {
        return new ActiveMQTopic(TopicDefinition.activeProdTopic);
    }
}
    @PostMapping("/ps/sender")
    public void pushTest(@RequestParam String msg) {
        activeMqProducer.send(msg);
    }
  • 发布订阅模式下的消费者定义


package com.futao.springbootdemo.foundation.mq.active.topic;
import com.futao.springbootdemo.foundation.mq.active.ActiveMqConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import javax.jms.JMSException;
import javax.jms.TextMessage;
/**
 * 订阅的队列是PTP模式还是Topic模式,与这边的定义无关。取决于配置
 * # 开启topic模式
 * spring:
 * jms:
 * pub-sub-domain: true
 *
 * @author futao
 * Created on 2019-06-04.
 */
@Slf4j
@Service
public class ActiveMqConsumer {
    /**
     * 订阅testTopic  -1
     *
     * @param mqMessage
     * @throws JMSException
     */
    @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeTestTopic)
    public void testTopicConsumer1(ActiveMQMessage mqMessage) throws JMSException {
        String text = ((TextMessage) mqMessage.getMessage()).getText();
        log.info("testTopicConsumer1接收到activeMq-activeTestTopic消息:[{}]", text);
    }
    /**
     * 订阅testTopic  -2
     *
     * @param mqMessage
     * @throws JMSException
     */
    @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeTestTopic)
    public void testTopicConsumer2(ActiveMQMessage mqMessage) throws JMSException {
        String text = ((TextMessage) mqMessage.getMessage()).getText();
        log.info("testTopicConsumer2接收到activeMq-activeTestTopic消息:[{}]", text);
    }
    /**
     * 订阅prodTopic  -1
     *
     * @param mqMessage
     * @throws JMSException
     */
    @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeProdTopic)
    public void prodTopicConsumer1(ActiveMQMessage mqMessage) throws JMSException {
        String text = ((TextMessage) mqMessage.getMessage()).getText();
        log.info("prodTopicConsumer1接收到activeMq-activeProdTopic消息:[{}]", text);
    }
    /**
     * 订阅 prodTopic  -2
     *
     * @param mqMessage
     * @throws JMSException
     */
    @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeProdTopic)
    public void prodTopicConsumer2(ActiveMQMessage mqMessage) throws JMSException {
        String text = ((TextMessage) mqMessage.getMessage()).getText();
        log.info("prodTopicConsumer2接收到activeMq-activeProdTopic消息:[{}]", text);
    }
}


  • 结果展示
    **发送到Topic的消息被所有订阅了该Topic的消费者接收


image.png

image.png

# 参考资料


SpringBoot与ActiveMQ整合实现手动ACK(事务模式与ack应答模式)


# TODO:


  • 如何保证消费者将消息发送到ActiveMQ的过程中消息不丢失
  • ActiveMQ的集群与主从
  • 消息的持久化
  • 事务
  • PTP模式下消费者多久没ACK后ActiveMQ会认为该条消息消费失败呢?(是不是有个消费超时时间设置)。还是只能等到该消费者下线。
相关实践学习
容器服务Serverless版ACK Serverless 快速入门:在线魔方应用部署和监控
通过本实验,您将了解到容器服务Serverless版ACK Serverless 的基本产品能力,即可以实现快速部署一个在线魔方应用,并借助阿里云容器服务成熟的产品生态,实现在线应用的企业级监控,提升应用稳定性。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
SQL 存储 Java
SpringBoot基础学习文章
SpringBoot基础学习文章
172 0
SpringBoot基础学习文章
|
Java 容器
SpringBoot2.x基础篇:谈谈SpringBoot内提供的这几种配置绑定
`SpringBoot`在不断地版本迭代中陆续提供了不同的配置参数绑定的方式,我们可以单独获取`一个配置参数`也可以将`一系列的配置`映射绑定到`JavaBean`的属性字段,下面我们来看看这几种方式的配置绑定哪一种是你最常用到的。
SpringBoot2.x基础篇:谈谈SpringBoot内提供的这几种配置绑定
|
Java Maven Spring
SpringBoot2.x基础篇:开发你的第一个SpringBoot应用程序
`SpringBoot2.x`版本是基于`Java8`来编写的,由于内部使用到了很多新的特性,比如:`lambda`、`interface default`...,所以需要本地开发环境有`java8`的支持。
|
XML 存储 SQL
技术:Java-Web基础|MyBatis整合到SpringBoot(二)
上一篇简单介绍了下MyBatis,知道并了解什么是MyBatis,MyBatis 是一款优秀的持久层框架,它支持定制化 SQL、存储过程以及高级映射。MyBatis 避免了几乎所有的 JDBC 代码和手动设置参数以及获取结果集。MyBatis 可以使用简单的 XML 或注解来配置和映射原生信息,将接口和 Java 的 POJOs(Plain Ordinary Java Object,普通的 Java对象)映射成数据库中的记录。
技术:Java-Web基础|MyBatis整合到SpringBoot(二)
|
安全 Java 数据库连接
SpringBoot-基础配置和属性配置详解
SpringBoot-基础配置和属性配置详解
SpringBoot-基础配置和属性配置详解
|
Java Maven Spring
SpringBoot基础学习
SpringBoot基础学习
86 0
|
NoSQL Java Maven
SpringBoot基础系列@Value 之字面量及 SpEL使用知识点介绍篇
承接上一篇博文【SpringBoot 基础系列】@Value 中哪些你不知道的知识点 中提及到但没有细说的知识点,这一篇博文将来看一下@Value除了绑定配置文件中的属性配置之外,另外支持的两种姿势
231 0
SpringBoot基础系列@Value 之字面量及 SpEL使用知识点介绍篇
|
NoSQL Java Redis
SpringBoot基础系列之自定义配置源使用姿势实例演示
前面一篇博文介绍了一个@Value的一些知识点,其中提了一个点,@Value对应的配置,除了是配置文件中之外,可以从其他的数据源中获取么,如从 redis,db,http 中获取配置? 了解过 SpringCloud Config 的可以给出确切的答案,可以,而且用起来还老爽了,远程配置,支持配置动态刷新,接下来我们来看一下,在 SpringBoot 中,如何配置自定义的数据源
186 0
SpringBoot基础系列之自定义配置源使用姿势实例演示
|
JSON 安全 Java
SpringBoot基础系列之AOP结合SpEL实现日志输出中两点注意事项
使用 AOP 来打印日志大家一把都很熟悉了,最近在使用的过程中,发现了几个有意思的问题,一个是 SpEL 的解析,一个是参数的 JSON 格式输出
438 0
SpringBoot基础系列之AOP结合SpEL实现日志输出中两点注意事项
|
Java Maven 微服务
SpringBoot基础篇之@Value中哪些你不知道的知识点
看到这个标题,有点夸张了啊,@Value 这个谁不知道啊,不就是绑定配置么,还能有什么特殊的玩法不成? (如果下面列出的这些问题,已经熟练掌握,那确实没啥往下面看的必要了)
257 0
SpringBoot基础篇之@Value中哪些你不知道的知识点