Spring Boot中集成ActiveMQ

简介: 本文介绍JMS与ActiveMQ基础概念,涵盖ActiveMQ的安装启动、Spring Boot中集成ActiveMQ实现点对点和发布/订阅消息模式,详细讲解消息生产者与消费者的配置与实现方式,帮助开发者掌握异步消息处理机制。
  1. JMS 和 ActiveMQ 介绍
    1.1 JMS 是啥
    百度百科的解释:
    JMS 即 Java 消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持。
    JMS 只是接口,不同的提供商或者开源组织对其有不同的实现,ActiveMQ 就是其中之一,它支持JMS,是 Apache 推出的。JMS 中有几个对象模型:
    连接工厂:ConnectionFactory JMS连接:Connection JMS会话:Session JMS目的:Destination JMS生产者:Producer JMS消费者:Consumer JMS消息两种类型:点对点和发布/订阅。
    可以看出 JMS 实际上和 JDBC 有点类似,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。本文主要使用 ActiveMQ。
    1.2 ActiveMQ
    ActiveMQ 是 Apache 的一个能力强劲的开源消息总线。ActiveMQ 完全支持JMS1.1和J2EE 1.4规范,尽管 JMS 规范出台已经是很久的事情了,但是 JMS 在当今的 Java EE 应用中间仍然扮演着特殊的地位。ActiveMQ 用在异步消息的处理上,所谓异步消息即消息发送者无需等待消息接收者的处理以及返回,甚至无需关心消息是否发送成功。
    异步消息主要有两种目的地形式,队列(queue)和主题(topic),队列用于点对点形式的消息通信,主题用于发布/订阅式的消息通信。本章节主要来学习一下在 Spring Boot 中如何使用这两种形式的消息。
  2. ActiveMQ安装
    使用 ActiveMQ 首先需要去官网下载,官网地址为:http://activemq.apache.org/ 本课程使用的版本是 apache-activemq-5.15.3,下载后解压缩会有一个名为 apache-activemq-5.15.3 的文件夹,没错,这就安装好了,非常简单,开箱即用。打开文件夹会看到里面有个 activemq-all-5.15.3.jar,这个 jar 我们是可以加进工程里的,但是使用 maven 的话,这个 jar 我们不需要。
    在使用 ActiveMQ 之前,首先得先启动,刚才解压后的目录中有个 bin 目录,里面有 win32 和 win64 两个目录,根据自己电脑选择其中一个打开运行里面的 activemq.bat 即可启动 ActiveMQ。 消息生产者生产消息发布到queue中,然后消息消费者从queue中取出,并且消费消息。这里需要注意:消息被消费者消费以后,queue中不再有存储,所以消息消费者不可消费到已经被消费的消息。Queue支持存在多个消息消费者,但是对一个消息而言,只会有一个消费者可以消费启动完成后,在浏览器中输入 http://127.0.0.1:8161/admin/ 来访问 ActiveMQ 的服务器,用户名和密码是 admin/admin。如下:
    image.png
    我们可以看到有 Queues 和 Topics 这两个选项,这两个选项分别是点对点消息和发布/订阅消息的查看窗口。何为点对点消息和发布/订阅消息呢?
    点对点消息:消息生产者生产消息发布到 queue 中,然后消息消费者从 queue 中取出,并且消费消息。这里需要注意:消息被消费者消费以后,queue 中不再有存储,所以消息消费者不可消费到已经被消费的消息。Queue 支持存在多个消息消费者,但是对一个消息而言,只会有一个消费者可以消费。
    发布/订阅消息:消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。下面分析具体的实现方式。
  3. ActiveMQ集成
    3.1 依赖导入和配置
    在 Spring Boot 中集成 ActiveMQ 需要导入如下 starter 依赖:

    org.springframework.boot
    spring-boot-starter-activemq

    然后在 application.yml 配置文件中,对 activemq 做一下配置:
    spring:
    activemq:

    activemq url

    broker-url: tcp://localhost:61616
    in-memory: true
    pool:

    如果此处设置为true,需要添加activemq-pool的依赖包,否则会自动配置失败,无法注入JmsMessagingTemplate

    enabled: false
    3.2 Queue 和 Topic 的创建
    首先我们需要创建两种消息 Queue 和 Topic,这两种消息的创建,我们放到 ActiveMqConfig 中来创建,如下:
    /**

    • activemq的配置
    • @author shengwu ni
      /
      @Configuration
      public class ActiveMqConfig {
      /*
      • 发布/订阅模式队列名称
        /
        public static final String TOPIC_NAME = "activemq.topic";
        /*
      • 点对点模式队列名称
        /
        public static final String QUEUE_NAME = "activemq.queue";
        @Bean
        public Destination topic() {
        return new ActiveMQTopic(TOPIC_NAME);
        }
        @Bean
        public Destination queue() {
        return new ActiveMQQueue(QUEUE_NAME);
        }
        }
        可以看出创建 Queue 和 Topic 两种消息,分别使用 new ActiveMQQueue 和 new ActiveMQTopic 来创建,分别跟上对应消息的名称即可。这样在其他地方就可以直接将这两种消息作为组件注入进来了。
        3.3 消息的发送接口
        在 Spring Boot 中,我们只要注入 JmsMessagingTemplate 模板即可快速发送消息,如下:
        /*
    • 消息发送者
    • @author shengwu ni
      /
      @Service
      public class MsgProducer {
      @Resource
      private JmsMessagingTemplate jmsMessagingTemplate;
      public void sendMessage(Destination destination, String msg) {
      jmsMessagingTemplate.convertAndSend(destination, msg);
      }
      }
      convertAndSend 方法中第一个参数是消息发送的目的地,第二个参数是具体的消息内容。
      3.4 点对点消息生产与消费
      3.4.1 点对点消息的生产
      消息的生产,我们放到 Controller 中来做,由于上面已经生成了 Queue 消息的组件,所以在 Controller 中我们直接注入进来即可。然后调用上文的消息发送方法 sendMessage 即可成功生产一条消息。
      /*
    • ActiveMQ controller
    • @author shengwu ni
      */
      @RestController
      @RequestMapping("/activemq")
      public class ActiveMqController {

      private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);

      @Resource
      private MsgProducer producer;
      @Resource
      private Destination queue;

      @GetMapping("/send/queue")
      public String sendQueueMessage() {

      logger.info("===开始发送点对点消息===");
      producer.sendMessage(queue, "Queue: hello activemq!");
      return "success";
      }
      }
      3.4.2 点对点消息的消费
      点对点消息的消费很简单,只要我们指定目的地即可,jms 监听器一直在监听是否有消息过来,如果有,则消费。
      /**

    • 消息消费者
    • @author shengwu ni
      */
      @Service
      public class QueueConsumer {

      /**

      • 接收点对点消息
      • @param msg
        */
        @JmsListener(destination = ActiveMqConfig.QUEUE_NAME)
        public void receiveQueueMsg(String msg) {
        System.out.println("收到的消息为:" + msg);
        }
        }
        可以看出,使用 @JmsListener 注解来指定要监听的目的地,在消息接收方法内部,我们可以根据具体的业务需求做相应的逻辑处理即可。
        3.4.3 测试一下
        启动项目,在浏览器中输入:http://localhost:8081/activemq/send/queue,观察控制台的输出日志,出现下面的日志说明消息发送和消费成功。
        收到的消息为:Queue: hello activemq!
        3.5 发布/订阅消息的生产和消费
        3.5.1 发布/订阅消息的生产
        和点对点消息一样,我们注入 topic 并调用 producer 的 sendMessage 方法即可发送订阅消息,如下,不再赘述:
        @RestController
        @RequestMapping("/activemq")
        public class ActiveMqController {

      private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);

      @Resource
      private MsgProducer producer;
      @Resource
      private Destination topic;

      @GetMapping("/send/topic")
      public String sendTopicMessage() {

      logger.info("===开始发送订阅消息===");
      producer.sendMessage(topic, "Topic: hello activemq!");
      return "success";
      }
      }
      3.5.2 发布/订阅消息的消费
      发布/订阅消息的消费和点对点不同,订阅消息支持多个消费者一起消费。其次,Spring Boot 中默认的时点对点消息,所以在使用 topic 时,会不起作用,我们需要在配置文件 application.yml 中添加一个配置:
      spring:
      jms:
      pub-sub-domain: true
      该配置是 false 的话,则为点对点消息,也是 Spring Boot 默认的。这样是可以解决问题,但是如果这样配置的话,上面提到的点对点消息又不能正常消费了。所以二者不可兼得,这并非一个好的解决办法。
      比较好的解决办法是,我们定义一个工厂,@JmsListener 注解默认只接收 queue 消息,如果要接收 topic 消息,需要设置一下 containerFactory。我们还在上面的那个 ActiveMqConfig 配置类中添加:
      /**

    • activemq的配置
      *
    • @author shengwu ni
      */
      @Configuration
      public class ActiveMqConfig {
      // 省略其他内容

      /**

      • JmsListener注解默认只接收queue消息,如果要接收topic消息,需要设置containerFactory
        /
        @Bean
        public JmsListenerContainerFactory topicListenerContainer(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 相当于在application.yml中配置:spring.jms.pub-sub-domain=true
        factory.setPubSubDomain(true);
        return factory;
        }
        }
        经过这样的配置之后,我们在消费的时候,在 @JmsListener 注解中指定这个容器工厂即可消费 topic 消息。如下:
        /*
    • Topic消息消费者
    • @author shengwu ni
      */
      @Service
      public class TopicConsumer1 {

      /**

      • 接收订阅消息
      • @param msg
        */
        @JmsListener(destination = ActiveMqConfig.TOPIC_NAME, containerFactory = "topicListenerContainer")
        public void receiveTopicMsg(String msg) {
        System.out.println("收到的消息为:" + msg);
        }

}
指定 containerFactory 属性为上面我们自己配置的 topicListenerContainer 即可。由于 topic 消息可以多个消费,所以该消费的类可以拷贝几个一起测试一下,这里我就不贴代码了,可以参考我的源码测试。
3.5.3 测试一下
启动项目,在浏览器中输入:http://localhost:8081/activemq/send/topic,观察控制台的输出日志,出现下面的日志说明消息发送和消费成功。
收到的消息为:Topic: hello activemq!
收到的消息为:Topic: hello activemq!

  1. 总结
    本章主要介绍了 jms 和 activemq 的相关概念、activemq 的安装与启动。详细分析了 Spring Boot 中点对点消息和发布/订阅消息两种方式的配置、消息生产和消费方式。ActiveMQ 是能力强劲的开源消息总线,在异步消息的处理上很有用,希望大家好好消化一下。
相关文章
|
13天前
|
数据采集 人工智能 安全
|
8天前
|
编解码 人工智能 自然语言处理
⚽阿里云百炼通义万相 2.6 视频生成玩法手册
通义万相Wan 2.6是全球首个支持角色扮演的AI视频生成模型,可基于参考视频形象与音色生成多角色合拍、多镜头叙事的15秒长视频,实现声画同步、智能分镜,适用于影视创作、营销展示等场景。
663 4
|
8天前
|
机器学习/深度学习 人工智能 前端开发
构建AI智能体:七十、小树成林,聚沙成塔:随机森林与大模型的协同进化
随机森林是一种基于决策树的集成学习算法,通过构建多棵决策树并结合它们的预测结果来提高准确性和稳定性。其核心思想包括两个随机性:Bootstrap采样(每棵树使用不同的训练子集)和特征随机选择(每棵树分裂时只考虑部分特征)。这种方法能有效处理大规模高维数据,避免过拟合,并评估特征重要性。随机森林的超参数如树的数量、最大深度等可通过网格搜索优化。该算法兼具强大预测能力和工程化优势,是机器学习中的常用基础模型。
350 164
|
7天前
|
机器学习/深度学习 自然语言处理 机器人
阿里云百炼大模型赋能|打造企业级电话智能体与智能呼叫中心完整方案
畅信达基于阿里云百炼大模型推出MVB2000V5智能呼叫中心方案,融合LLM与MRCP+WebSocket技术,实现语音识别率超95%、低延迟交互。通过电话智能体与座席助手协同,自动化处理80%咨询,降本增效显著,适配金融、电商、医疗等多行业场景。
359 155