干货|SpringBoot JMS(ActiveMQ)API实践应用详解

简介: Active是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。AC-tiveMQ使用Apache提供的...

 image.gif

image.gif

前言

Active是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。AC-tiveMQ使用Apache提供的授权,任何人都可以对其实现代码进行修改。

ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。ActiveMQ实现了JMS标准并提供了很多附加的特性。本文将带大家详细介绍ActiveMQ的API的使用。

公众号:「浅羽的IT小屋」

1. JMS的概念?

「什么是JMS呢:」

    • JMS---------JAVA    Message    Service
    • JAVA的消息服务,是sun公司提供的接口,只是一个规范,这个规范就类似于JDBC是一样的,使用的时候是需要当前规范的实现产品的。

    「JMS能干什么呢:」

      • 能够将信息发布到目的地
      • 可以从目的地来消费这个消息

      2、两种通信模型

      「队列的通信概念:」

        • 特点:当我们同一个队列有多个消费者的时候,多个消费者的数据之和才是原来队列中的所有数据
        • 队列的通信模型最大的适用场景:流量的消峰,高并发的处理

        「主题的通信模型:」

          • 特点:当我们队列有多个消费者的时候,那么这多个消费者消费到的数据是一样的
          • 主题消费者通信模型的适用场景:微服务下服务之间的异步通信

          3. MQ的实现产品

          「实现产品:」

            • ActiveMQ
            • RabbitMQ
            • RockerMQ
            • Kafka(这个设计的初衷是做分布式的日志的,后来因为日志有严格的顺序问题,这个时候人们就用Kafka来做消息队列了)

            4、JMS中常见的名词

            「常见的名词:」

              • ActiveMQConnectionFactory:这个是创建连接的工厂
              • ConnectionFactory:连接的工厂
              • Connection:连接JAVA对MQ的一个连接
              • Destination:目的地
              • 生产者(Producer)
              • 消费者(Consumer)
              • Session:会话(每一次对MQ的操作都称为一次会话)
              • Queue:队列
              • Topic:主题

              5、什么是消息队列

              「消息队列简单的说就是用来存放临时数据的地方:」

                • 生产者----------->存储介质上
                • 消费者----------->存储介质上

                「消息队列类似于快递公司:」

                  • 你可以将东西交给快递公司
                  • 目标人也可以从快递公司去取东西

                  6. ActiveMQ是什么

                  「含义:」

                    • ActiveMQ就是一个JMS的实现产品,它能够实现JMS下的所有功能

                    7、ActiveMQ能干什么

                    「主要作用:」

                      • 流量消峰处理
                      • 微服务下模块的异步通信
                      • 处理高并发下的订单
                      • 处理第三方平台的高并发
                      • 协助消息表可以完成分布式事务的最终一致性

                      8、ActiveMQ的安装

                      「ActiveMQ的安装和配置:」

                      1、官网下载Linux版的ActiveMQ(最新版本为5.13.4)
                             https://activemq.apache.org/download.html
                             2、解压安装
                            tar -zxvf apache-activemq-5.13.4-bin.tar.gz
                             3、配置(这里采用默认配置,无需修改)
                            vim /usr/lical/activemq-1/conf/activemq.xml
                             4、启动
                            cd /usr/local/activemq-1/bin
                      ./activemq start
                            5、打开管理界面(管理界面可以查看并管理所有队列及消息)
                               http://192.168.1.100:8161
                             启动成功后,可以浏览 http://localhost:8161/admin/
                             默认用户名、密码:admin/admin
                             管理界面是用jetty做容器的,如果想修改管理界面的端口,可以编辑../conf/jetty.xml,找到下面这一段:
                            <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
                          <!-- the default port       number for the web console -->
                          <property name="host" value="0.0.0.0"/>
                          <property name="port" value="8161"/>
                           </bean>
                             用户名/密码是在 ../conf/jetty-realm.properties 里,比如要增加一个管理员jimmy/123456,可参考下面修改:
                             1
                      2
                      3admin: admin, admin
                      jimmy: 123456, admin
                      user: user, user
                             注:管理界面有一个小坑,ActiveMQ 5.13.2与jdk1.8兼容性有点问题,如果使用jdk1.8,管理界面进入Queues标签页时,偶尔会报错,但是并不影响消息正常收发,只是无法从界面上查看队列情况,如果出现该问题,可将jdk版本降至1.7,同时最好清空data目录下的所有数据,再重启activemq即可。

                      image.gif

                      9. ActiveMQ的API的使用

                      「AcatveMQ的API使用:」

                        • 队列的使用(生产者)
                        package com.qy.mq.queue;
                        import org.apache.activemq.ActiveMQConnectionFactory;
                        import org.apache.activemq.Message;
                        import javax.jms.*;
                        /**
                         * @Auther: qianyu
                         * @Date: 2020/11/04 14:12
                         * @Description:生产者
                         */
                        public class Producer {
                            //准备发布的这个地址
                            private  static final String PATH="tcp://10.7.182.87:61616";
                            //ActiveMQ下的用户名
                            private static final String userName="admin";
                            //ActiveMQ下的密码
                            private static final String password="admin";
                            public static void main(String[] args) throws JMSException {
                                //第一步:创建连接的工厂
                                ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
                                //通过这个工厂获取连接
                                Connection connection = activeMQConnectionFactory.createConnection();
                                //第三步:打开这个连接
                                connection.start();
                                //第四步:创建操作MQ的这个会话
                                /**
                                 * 第一个参数:是否使用事务
                                 * 第二个参数:客户端的应答模式
                                 *     第一种:自动应答
                                 *     第二种:客户端手动应答
                                 */
                                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                                //需要发送消息的目的地(queue操作的是队列)
                                Destination destination=session.createQueue("wqqq");
                                //生产者来生产这个消息
                                //要有生产者
                                MessageProducer messageProducer = session.createProducer(destination);
                                //发送很多的消息到消息队列中去
                        //        for (int i=0;i<100;i++){
                                    //需要准备发送的消息
                        //            TextMessage textMessage = session.createTextMessage("我是浅羽:"+i);
                                    //研究消息的类型
                                  /*  BytesMessage bytesMessage = session.createBytesMessage();
                                    bytesMessage.setByteProperty("www",new Byte("123"));
                                    //接下来就可以发送消息了
                                    messageProducer.send(bytesMessage);*/
                                //创建map类型的message
                                /*MapMessage mapMessage = session.createMapMessage();
                                mapMessage.setInt("www1",123);
                                messageProducer.send(mapMessage);*/
                                ObjectMessage objectMessage = session.createObjectMessage(new User(1, "qianyu", "123"));
                                messageProducer.send(objectMessage);
                        //        }
                            }
                        }

                        image.gif

                          • 队列的使用(消费者)
                          package com.qy.mq.queue;
                          import org.apache.activemq.ActiveMQConnectionFactory;
                          import javax.jms.*;
                          import java.io.Serializable;
                          /**
                           * @Auther: qianyu
                           * @Date: 2020/11/04 14:13
                           * @Description:消费者
                           */
                          public class Consumer {
                              //准备发布的这个地址
                              private  static final String PATH="tcp://10.7.182.87:61616";
                              //ActiveMQ下的用户名
                              private static final String userName="admin";
                              //ActiveMQ下的密码
                              private static final String password="admin";
                              public static void main(String[] args) throws JMSException {
                                  //第一步:创建连接的工厂
                                  ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
                                  //通过这个工厂获取连接
                                  Connection connection = activeMQConnectionFactory.createConnection();
                                  //第三步:打开这个连接
                                  connection.start();
                                  //第四步:创建操作MQ的这个会话
                                  /**
                                   * 第一个参数:是否使用事务
                                   * 第二个参数:客户端的应答模式
                                   *     第一种:自动应答
                                   *     第二种:客户端手动应答
                                   */
                                  Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                                  //需要发送消息的目的地(queue操作的是队列)
                                  Destination destination=session.createQueue("wqqq");
                                  //创建我们的消费者了
                                  MessageConsumer messageConsumer = session.createConsumer(destination);
                                  //接下来就可以接收我们的消息了
                                  //Message message = messageConsumer.receive();
                                  //接收消息并指定这个超时的时间
                          //      Message message = messageConsumer.receive(5000);
                                  //接收消息没有就不等待了 直接over了  不接收了
                          //      Message message = messageConsumer.receiveNoWait();
                                  //给定当前的路径设置监听器
                                  messageConsumer.setMessageListener(new MessageListener() {
                                      @Override
                                      public void onMessage(Message message) {
                                         /* BytesMessage bytesMessage= (BytesMessage) message;
                                          try {
                                              System.out.println("获取到的数据是:"+bytesMessage.getByteProperty("www"));
                                          } catch (JMSException e) {
                                              e.printStackTrace();
                                          }*/
                                         /*MapMessage mapMessage= (MapMessage) message;
                                          try {
                                              System.out.println("获取到的数据是:"+mapMessage.getInt("www1"));
                                          } catch (JMSException e) {
                                              e.printStackTrace();
                                          }*/
                                         //测试对象类型的消息的发送和接收
                                         ObjectMessage objectMessage= (ObjectMessage) message;
                                          try {
                                              User user = (User) objectMessage.getObject();
                                              System.out.println("接收到的数据是:"+user);
                                          } catch (JMSException e) {
                                              e.printStackTrace();
                                          }
                                        /*  //我们知道是一个字符串类型的消息
                                          TextMessage textMessage= (TextMessage) message;
                                          //接下来就可以打印这个消息了
                                          try {
                                              System.out.println("消费者1---接收到的消息是:"+textMessage.getText());
                                          } catch (JMSException e) {
                                              e.printStackTrace();
                                          }*/
                                          try {
                                              //这句话就表示的是客户端来手动的进行应答
                                              message.acknowledge();
                                          } catch (JMSException e) {
                                              e.printStackTrace();
                                          }
                                      }
                                  });
                              }
                          }

                          image.gif

                            • 主题模型的生产者
                            package com.qy.mq.topic;
                            import org.apache.activemq.ActiveMQConnectionFactory;
                            import javax.jms.*;
                            /**
                             * @Auther: qianyu
                             * @Date: 2020/11/04 15:17
                             * @Description:
                             */
                            public class Producer {
                                //准备发布的这个地址
                                private  static final String PATH="tcp://10.7.182.87:61616";
                                //ActiveMQ下的用户名
                                private static final String userName="admin";
                                //ActiveMQ下的密码
                                private static final String password="admin";
                                public static void main(String[] args) throws JMSException {
                                    //第一步:创建连接的工厂
                                    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
                                    //通过这个工厂获取连接
                                    Connection connection = activeMQConnectionFactory.createConnection();
                                    //第三步:打开这个连接
                                    connection.start();
                                    //第四步:创建操作MQ的这个会话
                                    /**
                                     * 第一个参数:是否使用事务
                                     * 第二个参数:客户端的应答模式
                                     *     第一种:自动应答
                                     *     第二种:客户端手动应答
                                     */
                                    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                                    //需要发送消息的目的地(下面创建的就应该是主题模型的地址)
                                    Destination destination=session.createTopic("topic222");
                                    //生产者来生产这个消息
                                    //要有生产者
                                    MessageProducer messageProducer = session.createProducer(destination);
                                    //发送很多的消息到消息队列中去
                                    for (int i=0;i<100;i++){
                                        //需要准备发送的消息
                                        TextMessage textMessage = session.createTextMessage("我是浅羽:"+i);
                                        //接下来就可以发送消息了
                                        messageProducer.send(textMessage);
                                    }
                                }
                            }

                            image.gif

                              • 主题模型的消费者
                              package com.qy.mq.topic;
                              import org.apache.activemq.ActiveMQConnectionFactory;
                              import javax.jms.*;
                              /**
                               * @Auther: qianyu
                               * @Date: 2020/11/04 15:19
                               * @Description:
                               */
                              public class Consumer {
                                  //准备发布的这个地址
                                  private  static final String PATH="tcp://10.7.182.87:61616";
                                  //ActiveMQ下的用户名
                                  private static final String userName="admin";
                                  //ActiveMQ下的密码
                                  private static final String password="admin";
                                  public static void main(String[] args) throws JMSException {
                                      //第一步:创建连接的工厂
                                      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
                                      //通过这个工厂获取连接
                                      Connection connection = activeMQConnectionFactory.createConnection();
                                      //第三步:打开这个连接
                                      connection.start();
                                      //第四步:创建操作MQ的这个会话
                                      /**
                                       * 第一个参数:是否使用事务
                                       * 第二个参数:客户端的应答模式
                                       *     第一种:自动应答
                                       *     第二种:客户端手动应答
                                       */
                                      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                                      //需要发送消息的目的地(queue操作的是队列)
                                      Destination destination=session.createTopic("topic222");
                                      //创建我们的消费者了
                                      MessageConsumer messageConsumer = session.createConsumer(destination);
                                      //接下来就可以接收我们的消息了
                                      //给定当前的路径设置监听器
                                      messageConsumer.setMessageListener(new MessageListener() {
                                          @Override
                                          public void onMessage(Message message) {
                                              //我们知道是一个字符串类型的消息
                                              TextMessage textMessage= (TextMessage) message;
                                              //接下来就可以打印这个消息了
                                              try {
                                                  System.out.println("消费者1---接收到的消息是:"+textMessage.getText());
                                              } catch (JMSException e) {
                                                  e.printStackTrace();
                                              }
                                              try {
                                                  message.acknowledge();
                                              } catch (JMSException e) {
                                                  e.printStackTrace();
                                              }
                                          }
                                      });
                                  }
                              }

                              image.gif

                              本篇关于ActiveMQ的介绍就先到这里结束了,后续会出更多关于ActiveMQ系列更多文章,谢谢大家支持!

                              image.gif


                              image.gif

                              点个赞,证明你还爱我

                              相关实践学习
                              RocketMQ一站式入门使用
                              从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
                              消息队列 MNS 入门课程
                              1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
                              相关文章
                              |
                              8天前
                              |
                              设计模式 Java API
                              Java 可扩展 API 设计:打造灵活的应用架构
                              【4月更文挑战第27天】设计可扩展的 API 是构建灵活、易于维护的应用程序架构的关键。Java 提供了丰富的工具和技术来实现这一目标,使开发者能够构建具有高度可扩展性的应用程序。
                              26 4
                              |
                              8天前
                              |
                              存储 缓存 安全
                              API在Visual Basic中的应用:连接外部服务与扩展功能
                              【4月更文挑战第27天】本文探讨了在Visual Basic中使用API连接外部服务和扩展功能的方法,涵盖了API的基本概念、种类及如何使用本地和Web API。通过DllImport调用本地API,利用HttpClient和WebClient与Web API交互,同时强调了第三方API的使用和SOA架构中的API角色。安全性、性能优化和错误处理是实践中的关键点。案例研究和最佳实践有助于开发者更有效地利用API,提升Visual Basic应用程序的功能和灵活性。随着API技术的发展,Visual Basic将持续支持开发者创造更强大的应用。
                              |
                              2月前
                              |
                              人工智能 关系型数据库 Serverless
                              Serverless 应用引擎常见问题之API生成的函数镜像改为自定义的镜像如何解决
                              Serverless 应用引擎(Serverless Application Engine, SAE)是一种完全托管的应用平台,它允许开发者无需管理服务器即可构建和部署应用。以下是Serverless 应用引擎使用过程中的一些常见问题及其答案的汇总:
                              39 3
                              |
                              6天前
                              |
                              机器学习/深度学习 算法 安全
                              深度学习在图像识别中的应用与挑战构建高效可扩展的RESTful API:后端开发的实战指南
                              【4月更文挑战第30天】 随着计算机视觉技术的飞速发展,深度学习在图像识别领域取得了显著的成果。本文将探讨深度学习技术在图像识别中的应用及其所面临的挑战。首先,我们将介绍深度学习的基本原理和关键技术,然后分析其在图像识别中的优势和应用案例。最后,我们将讨论当前深度学习在图像识别领域所面临的主要挑战和未来的发展趋势。
                              |
                              7天前
                              |
                              运维 Serverless API
                              Serverless 应用引擎产品使用之在阿里函数计算中开启函数计算 API 接口如何解决
                              阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
                              106 6
                              |
                              7天前
                              |
                              弹性计算 运维 Serverless
                              Serverless 应用引擎产品使用之在阿里函数计算中,使用阿里云API或SDK从函数计算调用ECS实例的服务如何解决
                              阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
                              33 4
                              |
                              7天前
                              |
                              缓存 运维 Serverless
                              Serverless 应用引擎产品使用之阿里函数计算中。将本地电脑上的项目文件部署到阿里云函数计算(FC)上并实现对外提供API和WebUI如何解决
                              阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
                              25 1
                              |
                              7天前
                              |
                              运维 Serverless 5G
                              Serverless 应用引擎产品使用之调用阿里云函数计算API时获取有效的鉴权令牌如何解决
                              阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
                              8 1
                              |
                              9天前
                              |
                              缓存 监控 NoSQL
                              7min到40s:SpringBoot 启动优化实践!
                              7min到40s:SpringBoot 启动优化实践!
                              22 1
                              |
                              12天前
                              |
                              缓存 人工智能 API
                              【Python+微信】【企业微信开发入坑指北】2. 如何利用企业微信API主动给用户发应用消息
                              【Python+微信】【企业微信开发入坑指北】2. 如何利用企业微信API主动给用户发应用消息
                              10 0