干货|SpringBoot JMS(ActiveMQ)API实践应用详解

简介: Active是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。AC-tiveMQ使用Apache提供的...

 image.gif

image.gif

前言

Active是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。AC-tiveMQ使用Apache提供的授权,任何人都可以对其实现代码进行修改。

ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。ActiveMQ实现了JMS标准并提供了很多附加的特性。本文将带大家详细介绍ActiveMQ的API的使用。

公众号:「浅羽的IT小屋」

1. JMS的概念?

「什么是JMS呢:」

    • JMS---------JAVA    Message    Service
    • JAVA的消息服务,是sun公司提供的接口,只是一个规范,这个规范就类似于JDBC是一样的,使用的时候是需要当前规范的实现产品的。

    「JMS能干什么呢:」

      • 能够将信息发布到目的地
      • 可以从目的地来消费这个消息

      2、两种通信模型

      「队列的通信概念:」

        • 特点:当我们同一个队列有多个消费者的时候,多个消费者的数据之和才是原来队列中的所有数据
        • 队列的通信模型最大的适用场景:流量的消峰,高并发的处理

        「主题的通信模型:」

          • 特点:当我们队列有多个消费者的时候,那么这多个消费者消费到的数据是一样的
          • 主题消费者通信模型的适用场景:微服务下服务之间的异步通信

          3. MQ的实现产品

          「实现产品:」

            • ActiveMQ
            • RabbitMQ
            • RockerMQ
            • Kafka(这个设计的初衷是做分布式的日志的,后来因为日志有严格的顺序问题,这个时候人们就用Kafka来做消息队列了)

            4、JMS中常见的名词

            「常见的名词:」

              • ActiveMQConnectionFactory:这个是创建连接的工厂
              • ConnectionFactory:连接的工厂
              • Connection:连接JAVA对MQ的一个连接
              • Destination:目的地
              • 生产者(Producer)
              • 消费者(Consumer)
              • Session:会话(每一次对MQ的操作都称为一次会话)
              • Queue:队列
              • Topic:主题

              5、什么是消息队列

              「消息队列简单的说就是用来存放临时数据的地方:」

                • 生产者----------->存储介质上
                • 消费者----------->存储介质上

                「消息队列类似于快递公司:」

                  • 你可以将东西交给快递公司
                  • 目标人也可以从快递公司去取东西

                  6. ActiveMQ是什么

                  「含义:」

                    • ActiveMQ就是一个JMS的实现产品,它能够实现JMS下的所有功能

                    7、ActiveMQ能干什么

                    「主要作用:」

                      • 流量消峰处理
                      • 微服务下模块的异步通信
                      • 处理高并发下的订单
                      • 处理第三方平台的高并发
                      • 协助消息表可以完成分布式事务的最终一致性

                      8、ActiveMQ的安装

                      「ActiveMQ的安装和配置:」

                      1、官网下载Linux版的ActiveMQ(最新版本为5.13.4)
                             https://activemq.apache.org/download.html
                             2、解压安装
                            tar -zxvf apache-activemq-5.13.4-bin.tar.gz
                             3、配置(这里采用默认配置,无需修改)
                            vim /usr/lical/activemq-1/conf/activemq.xml
                             4、启动
                            cd /usr/local/activemq-1/bin
                      ./activemq start
                            5、打开管理界面(管理界面可以查看并管理所有队列及消息)
                               http://192.168.1.100:8161
                             启动成功后,可以浏览 http://localhost:8161/admin/
                             默认用户名、密码:admin/admin
                             管理界面是用jetty做容器的,如果想修改管理界面的端口,可以编辑../conf/jetty.xml,找到下面这一段:
                            <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
                          <!-- the default port       number for the web console -->
                          <property name="host" value="0.0.0.0"/>
                          <property name="port" value="8161"/>
                           </bean>
                             用户名/密码是在 ../conf/jetty-realm.properties 里,比如要增加一个管理员jimmy/123456,可参考下面修改:
                             1
                      2
                      3admin: admin, admin
                      jimmy: 123456, admin
                      user: user, user
                             注:管理界面有一个小坑,ActiveMQ 5.13.2与jdk1.8兼容性有点问题,如果使用jdk1.8,管理界面进入Queues标签页时,偶尔会报错,但是并不影响消息正常收发,只是无法从界面上查看队列情况,如果出现该问题,可将jdk版本降至1.7,同时最好清空data目录下的所有数据,再重启activemq即可。

                      image.gif

                      9. ActiveMQ的API的使用

                      「AcatveMQ的API使用:」

                        • 队列的使用(生产者)
                        package com.qy.mq.queue;
                        import org.apache.activemq.ActiveMQConnectionFactory;
                        import org.apache.activemq.Message;
                        import javax.jms.*;
                        /**
                         * @Auther: qianyu
                         * @Date: 2020/11/04 14:12
                         * @Description:生产者
                         */
                        public class Producer {
                            //准备发布的这个地址
                            private  static final String PATH="tcp://10.7.182.87:61616";
                            //ActiveMQ下的用户名
                            private static final String userName="admin";
                            //ActiveMQ下的密码
                            private static final String password="admin";
                            public static void main(String[] args) throws JMSException {
                                //第一步:创建连接的工厂
                                ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
                                //通过这个工厂获取连接
                                Connection connection = activeMQConnectionFactory.createConnection();
                                //第三步:打开这个连接
                                connection.start();
                                //第四步:创建操作MQ的这个会话
                                /**
                                 * 第一个参数:是否使用事务
                                 * 第二个参数:客户端的应答模式
                                 *     第一种:自动应答
                                 *     第二种:客户端手动应答
                                 */
                                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                                //需要发送消息的目的地(queue操作的是队列)
                                Destination destination=session.createQueue("wqqq");
                                //生产者来生产这个消息
                                //要有生产者
                                MessageProducer messageProducer = session.createProducer(destination);
                                //发送很多的消息到消息队列中去
                        //        for (int i=0;i<100;i++){
                                    //需要准备发送的消息
                        //            TextMessage textMessage = session.createTextMessage("我是浅羽:"+i);
                                    //研究消息的类型
                                  /*  BytesMessage bytesMessage = session.createBytesMessage();
                                    bytesMessage.setByteProperty("www",new Byte("123"));
                                    //接下来就可以发送消息了
                                    messageProducer.send(bytesMessage);*/
                                //创建map类型的message
                                /*MapMessage mapMessage = session.createMapMessage();
                                mapMessage.setInt("www1",123);
                                messageProducer.send(mapMessage);*/
                                ObjectMessage objectMessage = session.createObjectMessage(new User(1, "qianyu", "123"));
                                messageProducer.send(objectMessage);
                        //        }
                            }
                        }

                        image.gif

                          • 队列的使用(消费者)
                          package com.qy.mq.queue;
                          import org.apache.activemq.ActiveMQConnectionFactory;
                          import javax.jms.*;
                          import java.io.Serializable;
                          /**
                           * @Auther: qianyu
                           * @Date: 2020/11/04 14:13
                           * @Description:消费者
                           */
                          public class Consumer {
                              //准备发布的这个地址
                              private  static final String PATH="tcp://10.7.182.87:61616";
                              //ActiveMQ下的用户名
                              private static final String userName="admin";
                              //ActiveMQ下的密码
                              private static final String password="admin";
                              public static void main(String[] args) throws JMSException {
                                  //第一步:创建连接的工厂
                                  ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
                                  //通过这个工厂获取连接
                                  Connection connection = activeMQConnectionFactory.createConnection();
                                  //第三步:打开这个连接
                                  connection.start();
                                  //第四步:创建操作MQ的这个会话
                                  /**
                                   * 第一个参数:是否使用事务
                                   * 第二个参数:客户端的应答模式
                                   *     第一种:自动应答
                                   *     第二种:客户端手动应答
                                   */
                                  Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                                  //需要发送消息的目的地(queue操作的是队列)
                                  Destination destination=session.createQueue("wqqq");
                                  //创建我们的消费者了
                                  MessageConsumer messageConsumer = session.createConsumer(destination);
                                  //接下来就可以接收我们的消息了
                                  //Message message = messageConsumer.receive();
                                  //接收消息并指定这个超时的时间
                          //      Message message = messageConsumer.receive(5000);
                                  //接收消息没有就不等待了 直接over了  不接收了
                          //      Message message = messageConsumer.receiveNoWait();
                                  //给定当前的路径设置监听器
                                  messageConsumer.setMessageListener(new MessageListener() {
                                      @Override
                                      public void onMessage(Message message) {
                                         /* BytesMessage bytesMessage= (BytesMessage) message;
                                          try {
                                              System.out.println("获取到的数据是:"+bytesMessage.getByteProperty("www"));
                                          } catch (JMSException e) {
                                              e.printStackTrace();
                                          }*/
                                         /*MapMessage mapMessage= (MapMessage) message;
                                          try {
                                              System.out.println("获取到的数据是:"+mapMessage.getInt("www1"));
                                          } catch (JMSException e) {
                                              e.printStackTrace();
                                          }*/
                                         //测试对象类型的消息的发送和接收
                                         ObjectMessage objectMessage= (ObjectMessage) message;
                                          try {
                                              User user = (User) objectMessage.getObject();
                                              System.out.println("接收到的数据是:"+user);
                                          } catch (JMSException e) {
                                              e.printStackTrace();
                                          }
                                        /*  //我们知道是一个字符串类型的消息
                                          TextMessage textMessage= (TextMessage) message;
                                          //接下来就可以打印这个消息了
                                          try {
                                              System.out.println("消费者1---接收到的消息是:"+textMessage.getText());
                                          } catch (JMSException e) {
                                              e.printStackTrace();
                                          }*/
                                          try {
                                              //这句话就表示的是客户端来手动的进行应答
                                              message.acknowledge();
                                          } catch (JMSException e) {
                                              e.printStackTrace();
                                          }
                                      }
                                  });
                              }
                          }

                          image.gif

                            • 主题模型的生产者
                            package com.qy.mq.topic;
                            import org.apache.activemq.ActiveMQConnectionFactory;
                            import javax.jms.*;
                            /**
                             * @Auther: qianyu
                             * @Date: 2020/11/04 15:17
                             * @Description:
                             */
                            public class Producer {
                                //准备发布的这个地址
                                private  static final String PATH="tcp://10.7.182.87:61616";
                                //ActiveMQ下的用户名
                                private static final String userName="admin";
                                //ActiveMQ下的密码
                                private static final String password="admin";
                                public static void main(String[] args) throws JMSException {
                                    //第一步:创建连接的工厂
                                    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
                                    //通过这个工厂获取连接
                                    Connection connection = activeMQConnectionFactory.createConnection();
                                    //第三步:打开这个连接
                                    connection.start();
                                    //第四步:创建操作MQ的这个会话
                                    /**
                                     * 第一个参数:是否使用事务
                                     * 第二个参数:客户端的应答模式
                                     *     第一种:自动应答
                                     *     第二种:客户端手动应答
                                     */
                                    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                                    //需要发送消息的目的地(下面创建的就应该是主题模型的地址)
                                    Destination destination=session.createTopic("topic222");
                                    //生产者来生产这个消息
                                    //要有生产者
                                    MessageProducer messageProducer = session.createProducer(destination);
                                    //发送很多的消息到消息队列中去
                                    for (int i=0;i<100;i++){
                                        //需要准备发送的消息
                                        TextMessage textMessage = session.createTextMessage("我是浅羽:"+i);
                                        //接下来就可以发送消息了
                                        messageProducer.send(textMessage);
                                    }
                                }
                            }

                            image.gif

                              • 主题模型的消费者
                              package com.qy.mq.topic;
                              import org.apache.activemq.ActiveMQConnectionFactory;
                              import javax.jms.*;
                              /**
                               * @Auther: qianyu
                               * @Date: 2020/11/04 15:19
                               * @Description:
                               */
                              public class Consumer {
                                  //准备发布的这个地址
                                  private  static final String PATH="tcp://10.7.182.87:61616";
                                  //ActiveMQ下的用户名
                                  private static final String userName="admin";
                                  //ActiveMQ下的密码
                                  private static final String password="admin";
                                  public static void main(String[] args) throws JMSException {
                                      //第一步:创建连接的工厂
                                      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
                                      //通过这个工厂获取连接
                                      Connection connection = activeMQConnectionFactory.createConnection();
                                      //第三步:打开这个连接
                                      connection.start();
                                      //第四步:创建操作MQ的这个会话
                                      /**
                                       * 第一个参数:是否使用事务
                                       * 第二个参数:客户端的应答模式
                                       *     第一种:自动应答
                                       *     第二种:客户端手动应答
                                       */
                                      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                                      //需要发送消息的目的地(queue操作的是队列)
                                      Destination destination=session.createTopic("topic222");
                                      //创建我们的消费者了
                                      MessageConsumer messageConsumer = session.createConsumer(destination);
                                      //接下来就可以接收我们的消息了
                                      //给定当前的路径设置监听器
                                      messageConsumer.setMessageListener(new MessageListener() {
                                          @Override
                                          public void onMessage(Message message) {
                                              //我们知道是一个字符串类型的消息
                                              TextMessage textMessage= (TextMessage) message;
                                              //接下来就可以打印这个消息了
                                              try {
                                                  System.out.println("消费者1---接收到的消息是:"+textMessage.getText());
                                              } catch (JMSException e) {
                                                  e.printStackTrace();
                                              }
                                              try {
                                                  message.acknowledge();
                                              } catch (JMSException e) {
                                                  e.printStackTrace();
                                              }
                                          }
                                      });
                                  }
                              }

                              image.gif

                              本篇关于ActiveMQ的介绍就先到这里结束了,后续会出更多关于ActiveMQ系列更多文章,谢谢大家支持!

                              image.gif


                              image.gif

                              点个赞,证明你还爱我

                              相关实践学习
                              消息队列RocketMQ版:基础消息收发功能体验
                              本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
                              消息队列 MNS 入门课程
                              1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
                              相关文章
                              |
                              8天前
                              |
                              API 开发工具 开发者
                              探究亚马逊国际获得AMAZON商品详情 API 接口功能、作用与实际应用示例
                              亚马逊提供的Amazon Product Advertising API或Selling Partner API,使开发者能编程访问亚马逊商品数据,包括商品标题、描述、价格等。支持跨境电商和数据分析,提供商品搜索和详情获取等功能。示例代码展示了如何使用Python和boto3库获取特定商品信息。使用时需遵守亚马逊政策并注意可能产生的费用。
                              |
                              13天前
                              |
                              监控 搜索推荐 安全
                              探究亚马逊详情API接口:开发与应用
                              在数字化时代,亚马逊作为全球领先的电商平台,为商家和消费者提供了丰富的商品信息和便捷的购物体验。本文深入探讨了亚马逊详情API接口的获取与运用,帮助开发者和商家实时监控商品数据、分析市场趋势、优化价格策略、分析竞争对手、构建推荐系统及自动化营销工具,从而在竞争中占据优势。文章还提供了Python调用示例和注意事项,确保API使用的安全与高效。
                              41 3
                              |
                              15天前
                              |
                              Prometheus 监控 Java
                              深入探索:自制Agent监控API接口耗时实践
                              在微服务架构中,监控API接口的调用耗时对于性能优化至关重要。通过监控接口耗时,我们可以识别性能瓶颈,优化服务响应速度。本文将分享如何自己动手实现一个Agent来统计API接口的调用耗时,提供一种实用的技术解决方案。
                              27 3
                              |
                              17天前
                              |
                              搜索推荐 数据挖掘 API
                              API接口在电商的应用及收益
                              本文探讨了API接口技术在电商领域的应用及其带来的收益。API接口作为连接电商平台与外部系统的桥梁,实现了高效、实时的数据交换和集成,提升了用户体验、运营效率和市场竞争力。具体应用包括库存管理、支付网关、物流跟踪、自动化业务流程、个性化推荐和精准营销等方面。通过实战案例分析,展示了亚马逊和小型电商公司如何利用API接口实现自动化管理,提高了工作效率和客户满意度。未来,API接口技术将更加注重智能化、标准化、安全性和跨界合作。
                              51 3
                              |
                              18天前
                              |
                              监控 安全 应用服务中间件
                              微服务架构下的API网关设计策略与实践####
                              本文深入探讨了在微服务架构下,API网关作为系统统一入口点的设计策略、实现细节及其在实际应用中的最佳实践。不同于传统的摘要概述,本部分将直接以一段精简的代码示例作为引子,展示一个基于NGINX的简单API网关配置片段,随后引出文章的核心内容,旨在通过具体实例激发读者兴趣,快速理解API网关在微服务架构中的关键作用及实现方式。 ```nginx server { listen 80; server_name api.example.com; location / { proxy_pass http://backend_service:5000;
                              |
                              23天前
                              |
                              XML API 网络架构
                              深入理解RESTful API设计原则与实践
                              【10月更文挑战第26天】在数字化浪潮中,API(应用程序编程接口)成为连接不同软件组件的桥梁。本文将深入浅出地探讨如何根据REST(Representational State Transfer)原则设计高效、易于维护和扩展的API,同时分享一些实用的代码示例,帮助开发者构建更加健壮和用户友好的服务。
                              |
                              27天前
                              |
                              数据采集 Java 数据安全/隐私保护
                              Spring Boot 3.3中的优雅实践:全局数据绑定与预处理
                              【10月更文挑战第22天】 在Spring Boot应用中,`@ControllerAdvice`是一个强大的工具,它允许我们在单个位置处理多个控制器的跨切面关注点,如全局数据绑定和预处理。这种方式可以大大减少重复代码,提高开发效率。本文将探讨如何在Spring Boot 3.3中使用`@ControllerAdvice`来实现全局数据绑定与预处理。
                              61 2
                              |
                              28天前
                              |
                              JSON 供应链 API
                              京东商品评价API的获取和应用
                              京东商品评价API是电商数据分析的重要工具,帮助开发者和商家获取商品的用户评价数据,包括评分、评论内容和购买时间等。通过分析这些数据,商家可以优化产品和服务,提升客户满意度,制定更有效的营销策略。本文介绍了获取和应用京东商品评价API的详细步骤,包括注册账号、获取权限、阅读文档和编写代码调用API。示例代码展示了如何使用Python调用API并处理响应数据。
                              87 2
                              |
                              29天前
                              |
                              JSON API 开发者
                              淘宝商品评价API的获取与应用
                              在数字化时代,电商平台如淘宝成为消费者购物的主要渠道。本文介绍如何使用淘宝开放平台的商品评论API获取并利用评论数据,以优化产品和服务,提升用户体验。内容涵盖API的重要性、准备工作、调用流程及代码实现,帮助开发者高效获取和分析数据。
                              50 3
                              |
                              29天前
                              |
                              缓存 数据挖掘 API
                              淘宝商品类目API的获取与应用探索
                              淘宝商品类目API是淘宝开放平台提供的关键服务,允许开发者获取淘宝商品的类目信息,包括根类目、子类目及属性信息。本文介绍API的获取方法、应用场景及使用技巧,帮助电商从业者和开发者更好地利用类目数据,提升商品管理、搜索推荐及数据分析等能力。
                              69 1
                              下一篇
                              无影云桌面