ActiveMQ学习记录

简介: ActiveMQ学习记录

下载安装地址:archive.apache.org/dist/activemq

Maven依赖:

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>5.3.6</version>
</dependency>

image.gif

    • 注意:不要直接导入activemq-all这个依赖不让会有以来冲突,这个包太大了,可以分开导入

    MQ的产品种类和对比

    image.gif编辑

    现有主流的MQ:

      • kafka

      编程语言:scala。

      大数据领域的主流MQ。

        • rabbitmq

        编程语言:erlang

        基于erlang语言,不好修改底层,不要查找问题的原因,不建议选用。

          • rocketmq

          编程语言:java

          适用于大型项目。适用于集群。

            • activemq

            编程语言:java

            适用于中小型项目。

            MQ的主要作用

              1. 解耦,解除各个系统之间的耦合性
              2. 削峰,抵御大量的流量,保护主要业务
              3. 异步,调用者无需等待

              MQ的两种模式:

                1. Queue模式:一对一
                2. topic主题模式:一对多image.gif编辑

                image.gif编辑

                JMS编码规范

                image.gif编辑

                image.gif编辑

                队列消息Queue总结

                同步阻塞方式(receive):

                订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。

                异步非阻塞方式(监听器onMessage()):

                订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。

                队列消息Queue特点

                image.gif编辑

                Queue模式代码

                gitee地址:https://gitee.com/mantianx/Mq_Sub/tree/master/ActiveMq_Pro/src/main/java/icu/look/smile/basic/queue

                Topic

                在发布订阅消息传递域中,目的地被称为主题(topic)

                发布/订阅消息传递域的特点如下:

                (1)生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;

                (2)生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。

                (3)生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。

                Topic模式代码

                gitee地址:https://gitee.com/mantianx/Mq_Sub/tree/master/ActiveMq_Pro/src/main/java/icu/look/smile/basic/topic

                Topic和Queue的对比

                image.gif编辑


                JMS规范

                  • jms是什么?
                    什么是Java消息服务?
                    Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。image.gif
                    image.gif

                  消息头

                  JMS的消息头有哪些属性:

                  JMSDestination:消息目的地

                  JMSDeliveryMode:消息持久化模式

                  JMSExpiration:消息过期时间,设为0表示永不过期

                  JMSPriority:消息的优先级,0~4普通,5~9加急,默认4

                  JMSMessageID:消息的唯一标识符。后面我们会介绍如何解决幂等性。

                  说明: 消息的生产者可以set这些属性,消息的消费者可以get这些属性。

                  这些属性在send方法里面也可以设置,可以用雪花算法。

                  消息体

                  image.gif编辑  

                  消息属性

                  如果需要除消息头字段之外的值,那么可以使用消息属性。他是识别/去重/重点标注等操作,非常有用的方法。

                  他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。

                    • 示例代码:
                    for (int i = 1; i < 4 ; i++) {
                                TextMessage textMessage = session.createTextMessage("topic_name--" + i);
                                // 调用Message的set*Property()方法,就能设置消息属性。根据value的数据类型的不同,有相应的API。
                                textMessage.setStringProperty("From","ZhangSan@qq.com");
                                textMessage.setByteProperty("Spec", (byte) 1);
                                textMessage.setBooleanProperty("Invalide",true);
                                messageProducer.send(textMessage);
                            }

                    image.gif

                    if (null != message  && message instanceof TextMessage){
                                        TextMessage textMessage = (TextMessage)message;
                                        try {
                                          System.out.println("消息体:"+textMessage.getText());
                                          System.out.println("消息属性:"+textMessage.getStringProperty("From"));
                                          System.out.println("消息属性:"+textMessage.getByteProperty("Spec"));
                                          System.out.println("消息属性:"+textMessage.getBooleanProperty("Invalide"));
                                        }catch (JMSException e) {
                                        }
                                    }

                    image.gif

                    消息的持久和非持久

                      • Queue的持久和非持久image.gif编辑
                        • 注意:默认是持久

                          消息的签收机制

                            • 签收的几种方式
                              • 自动签收(AUTO_ACKNOWLEDGE):该方式是默认的。该种方式,无需我们程序做任何操作,框架会帮我们自动签收收到的消息。
                              • 手动签收(CLIENT_ACKNOWLEDGE):手动签收。该种方式,需要我们手动调用Message.acknowledge(),来签收消息。如果不签收消息,该消息会被我们反复消费,只到被签收。
                              • 允许重复消息(DUPS_OK_ACKNOWLEDGE):多线程或多个消费者同时消费到一个消息,因为线程不安全,可能会重复消费。该种方式很少使用到。
                              • 事务下的签收(SESSION_TRANSACTED):开始事务的情况下,可以使用该方式。该种方式很少使用到。
                                • 事务和签收的关系
                                  • 在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。事务优先于签收,开始事务后,签收机制不再起任何作用。
                                  • 非事务性会话中,消息何时被确认取决于创建会话时的应答模式。
                                  • 生产者事务开启,只有commit后才能将全部消息变为已消费。
                                  • 事务偏向生产者,签收偏向消费者。也就是说,生产者使用事务更好点,消费者使用签收机制更好点。
                                    • 例子:
                                    • DeliveryTopicConsumer:
                                    package icu.look.smile.basic.deliverytopic.consumer;
                                    import org.apache.activemq.ActiveMQConnectionFactory;
                                    import javax.jms.*;
                                    import java.io.IOException;
                                    import java.util.Properties;
                                    /*
                                     * @Author gcq
                                     * @Description :
                                     * topic持久化
                                     * @Date 16:38 2c021/5/7
                                     **/
                                    @SuppressWarnings("all")
                                    public class DeliveryTopicConsumer {
                                        private static  ActiveMQConnectionFactory factory = null;
                                        private static  Connection connection = null;
                                        private static  Session session = null;
                                        private static  TopicSubscriber durableSubscriber = null;
                                        public static void main(String[] args) {
                                            Properties properties = new Properties();
                                            try {
                                                properties.load(DeliveryTopicConsumer.class.getClassLoader().getResourceAsStream("mq-detail.properties"));
                                                factory = new ActiveMQConnectionFactory(
                                                        (String)properties.get("BROCK_USERNAME") ,
                                                        (String)properties.get("BROCK_PASSWORD"),
                                                        (String) properties.get("BROCK_URL")
                                                        );
                                                connection = factory.createConnection();
                                    //            ClienID一定要在session之前创建connection之后
                                                connection.setClientID("clinet1");
                                                //ture提交事务,结合commit
                                                session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                                                Topic topic = session.createTopic("topic6");
                                                durableSubscriber = session.createDurableSubscriber(topic, "reamaker2...");
                                                connection.start();
                                                durableSubscriber.setMessageListener((message) -> {
                                                        if (null != message & message instanceof  TextMessage){
                                                            try {
                                                                System.out.println("detail:"+((TextMessage)message).getText());
                                                                message.acknowledge();
                                                                session.commit();
                                                            } catch (JMSException e) {
                                                                e.printStackTrace();
                                                                try {
                                                                    session.rollback();
                                                                } catch (JMSException jmsException) {
                                                                    jmsException.printStackTrace();
                                                                }
                                                            }
                                                        }
                                                });
                                            } catch (IOException | JMSException e) {
                                                e.printStackTrace();
                                            }finally {
                                                if (connection != null ){
                                                    try {
                                                        System.in.read();
                                                        durableSubscriber.close();
                                                        session.close();
                                                        connection.close();
                                                    } catch (JMSException | IOException e) {
                                                        e.printStackTrace();
                                                    }
                                                }
                                            }
                                        }
                                    }

                                    image.gif

                                    • DeliveryTopicProducer:
                                    package icu.look.smile.basic.deliverytopic.producer;
                                    import icu.look.smile.basic.deliverytopic.consumer.DeliveryTopicConsumer;
                                    import org.apache.activemq.ActiveMQConnectionFactory;
                                    import javax.jms.*;
                                    import java.io.IOException;
                                    import java.util.Date;
                                    import java.util.Properties;
                                    import java.util.concurrent.TimeUnit;
                                    /*
                                     * @Author gcq
                                     * @Description :
                                     * topic持久化
                                     * @Date 16:38 2021/5/7
                                     **/
                                    public class DeliveryTopicProducer {
                                        private static ActiveMQConnectionFactory factory = null;
                                        private static Connection connection = null;
                                        private static Session session = null;
                                        private static MessageProducer producer =null;
                                        public static void main(String[] args) throws JMSException {
                                            Properties properties = new Properties();
                                            try {
                                                properties.load(DeliveryTopicConsumer.class.getClassLoader().getResourceAsStream("mq-detail.properties"));
                                                factory = new ActiveMQConnectionFactory(
                                                        (String) properties.get("BROCK_USERNAME"),
                                                        (String) properties.get("BROCK_PASSWORD"),
                                                        (String) properties.get("BROCK_URL")
                                                );
                                                connection = factory.createConnection();
                                                session = connection.createSession(true, Session.SESSION_TRANSACTED);
                                                Topic topic = session.createTopic("topic6");
                                                producer = session.createProducer(topic);
                                                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                                                connection.start();
                                                while (true){
                                                    try {
                                                TextMessage textMessage = session.createTextMessage("msg:" + new Date() + "new!!!");
                                                producer.send(textMessage);
                                                session.commit();
                                                        TimeUnit.SECONDS.sleep(1);
                                                    } catch (InterruptedException e) {
                                                        e.printStackTrace();
                                                        session.rollback();
                                                    }
                                                }
                                            } catch (IOException | JMSException e) {
                                                e.printStackTrace();
                                                session.rollback();
                                            }finally {
                                                if (producer != null){
                                                    try {
                                                        producer.close();
                                                        session.close();
                                                        connection.close();
                                                    } catch ( JMSException e) {
                                                        e.printStackTrace();
                                                    }
                                                }
                                            }
                                        }
                                    }

                                    image.gif

                                      JMS点对点总结

                                      点对点模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和我们平时给朋友发送短信类似。

                                      如果在Session关闭时有部分消息己被收到但还没有被签收(acknowledged),那当消费者下次连接到相同的队列时,这些消息还会被再次接收

                                      队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势

                                        • JMS的发布订阅总结

                                        JMS Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作topic。

                                        主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。

                                        主题使得消息订阅者和消息发布者保持互相独立不需要解除即可保证消息的传送

                                          • 非持久订阅

                                          非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收发到某个主题的消息。

                                          如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。

                                          一句话:先订阅注册才能接受到发布,只给订阅者发布消息。

                                            • 持久订阅

                                            客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户再次连接到MQ的时候,会根据消费者的ID得到所有当自己处于离线时发送到主题的消息

                                            当持久订阅状态下,不能恢复或重新派送一个未签收的消息。

                                            持久订阅才能恢复或重新派送一个未签收的消息。

                                              • 非持久和持久化订阅如何选择

                                              当所有的消息必须被接收,则用持久化订阅。当消息丢失能够被容忍,则用非持久订阅。

                                              ActiveMQ的Broker(嵌入式mq)

                                                • 相当于一个ActiveMQ服务器实例。说白了,Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可用性。这种方式,我们实际开发中很少采用,因为他缺少太多了东西,如:日志,数据存储等等。
                                                • maven依赖坐标:
                                                <dependency>
                                                    <groupId>com.fasterxml.jackson.core</groupId>
                                                    <artifactId>jackson-databind</artifactId>
                                                    <version>2.10.1</version>
                                                </dependency>

                                                image.gif

                                                • Broker来启动类:
                                                package icu.look.smile.broker;
                                                import org.apache.activemq.broker.BrokerService;
                                                /*
                                                 * @Author gcq
                                                 * @Description :
                                                 * activemq 服务器
                                                 * @Date 9:05 2021/5/8
                                                 **/
                                                public class EmActiveMqServer {
                                                    public static void main(String[] args) throws Exception {
                                                        BrokerService brokerService = new BrokerService();
                                                        brokerService.setPopulateJMSXUserID(true);
                                                        brokerService.addConnector("tcp://localhost:61616");
                                                        brokerService.start();
                                                    }
                                                }


                                                Spring整合ActiveMQ

                                                gitee地址:https://gitee.com/mantianx/Mq_Sub/tree/master/ActiveMq_Spring

                                                SpringBoot整合ActiveMQ【Queue】

                                                gitee地址:https://gitee.com/mantianx/Mq_Sub/tree/master/ActiveMq_SpringBoot

                                                相关实践学习
                                                RocketMQ一站式入门使用
                                                从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
                                                消息队列 MNS 入门课程
                                                1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
                                                相关文章
                                                |
                                                7月前
                                                |
                                                消息中间件 存储 网络协议
                                                我们一起来学RabbitMQ 五:RabbitMQ 应知应会的面试题
                                                我们一起来学RabbitMQ 五:RabbitMQ 应知应会的面试题
                                                |
                                                消息中间件 Java Linux
                                                activeMQ入门安装
                                                activeMQ入门安装
                                                214 0
                                                activeMQ入门安装
                                                |
                                                消息中间件 SQL 存储
                                                超详细的RabbitMQ入门,看这篇就够了!
                                                RabbitMQ入门,看这篇就够了
                                                187498 58
                                                |
                                                7月前
                                                |
                                                消息中间件 Java Linux
                                                消息中间件系列教程(02) -ActiveMQ -安装&入门案例
                                                消息中间件系列教程(02) -ActiveMQ -安装&入门案例
                                                35 0
                                                |
                                                消息中间件 存储 网络协议
                                                爆肝3万字,为你吃透RabbitMQ,最详细的RabbitMQ讲解(VIP典藏版)
                                                早在之前就了解到了消息中间件,但是一直没有系统的学习,最近花了一段时间系统学习了当下最为主流的 RabbitMQ 消息队列,学习过程中也随时记录,刚开始学习的时候懵懵懂懂,做的笔记都比较杂乱,系统学习完后我将笔记内容不断反复修改,对章节进行设计调整,最终整合出了以下好理解、案例多、超详细的 RabbitMQ 学习笔记,希望能帮到大家~
                                                爆肝3万字,为你吃透RabbitMQ,最详细的RabbitMQ讲解(VIP典藏版)
                                                |
                                                消息中间件 网络协议
                                                【精华】RabbitMQ整理(30分钟读完)
                                                RabbitMQ的组成部分,Exchange的4种类型,RabbitMQ的6种工作模式,相关概念:死信队列。
                                                106 1
                                                【精华】RabbitMQ整理(30分钟读完)
                                                |
                                                消息中间件 存储 缓存
                                                RocketMQ最新版源码剖析注释笔记 关注我的技术博客:
                                                RocketMQ最新版源码剖析注释笔记 关注我的技术博客:
                                                216 0
                                                |
                                                消息中间件 存储 网络协议
                                                常见的RabbitMQ实战居然还能这样搞?
                                                常见的消息队列很多,主要包括 RabbitMQ、Kafka、RocketMQ 和 ActiveMQ,相关的选型可以看我之前的系列, 这篇文章只讲 RabbitMQ,先讲原理,后搞实战。 文章很长,如果你能一次性看完,“大哥,请收下我的膝盖”,建议大家先收藏,啥时需要面试,或者工作中遇到了,可以再慢慢看。 不 BB,直接上思维导图:
                                                |
                                                消息中间件 存储 自然语言处理
                                                RabbitMQ入门知识整理
                                                RabbitMQ入门知识整理
                                                172 0
                                                RabbitMQ入门知识整理
                                                |
                                                消息中间件 Java API
                                                ActiveMQ入门案例
                                                ActiveMQ入门案例
                                                ActiveMQ入门案例