ActiveMQ系列:JMS的可靠性

简介: 持久化消息这是队列的的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。

ActiveMQ系列:JMS的可靠性


前面有介绍与基础部分,有兴趣的可以移步:


初步认识了ActiveMQ:https://blog.csdn.net/qq_26975307/article/details/98875098


结合JavaSE进行初尝试:https://blog.csdn.net/qq_26975307/article/details/98968854


详细讲讲JMS:https://blog.csdn.net/qq_26975307/article/details/99408962


此篇来讲讲JMS的可靠性


1、JMS的可靠性


JMS的可靠性在于三点:


   1.1、PERSISTENT:持久性


   1.2、事务


   1.1、Acknowledge:签收


1.1、PERSISTENT:持久性


非持久:


messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);


非持久化:当服务器宕机,消息不存在。


持久:

messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

持久化:当服务器宕机,消息依然存在。


1.2、事务


1、producer提交时的事务


   false:只要执行send,就进入到队列中。关闭事务,那第2个签收参数的设置需要有效


   true:先执行send再执行commit,消息才被真正的提交到队列中消息需要批量发送,需要缓冲区处理


2、在事务性会话中,当一个事务被成功提交则消息被自动签收。


     如果事务回滚,则消息会被再次传送。


3、非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)


4、生产事务开启,只有commit后才能将全部消息变为已消费


5、事务偏生产者


6、签收偏消费者


1.3、结论


持久化消息这是队列的的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。


可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。


2、ActiveMQ 结合 JavaSE 的事务栗子


2.1、消息生产者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 *
 */
public class JmsProduce_TX {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws Exception{
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3、创建session会话
        /**
         * 两个参数: 1、事务;2、签收
         *
         */
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5、创建消息的生产者
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //6、通过使用消息生产者生产消息并发送到MQ的队列中
        for(int i=1;i<=3;i++){
            //7、创建消息
            //可以理解为最简单的字符串
            TextMessage textMessage = session.createTextMessage("msg-" + i);
            /**
             * String消息设置属性
            textMessage.setStringProperty("c01", "vip");
             */
            /**
             * 在每一条消息发送之前,可以设置消息的请求头属性
            MapMessage mapMessage = session.createMapMessage();
            mapMessage.setString("map-k1", "map-v1");
            producer.send(mapMessage);
             */
            //8、通过消息生产者发送给MQ
            producer.send(textMessage);
        }
        //在Session关闭之前先commit
        session.commit();
        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到MQ完成********");
    }
}


顺带一提:


对于生产者而言,事务的开启和关闭是不一样的,如果为true,一定要在生产者关闭之前commit,统一批处理提交


统一批处理不是多此一举?


commit有点类似数据库的业务数据提交,根据事务的特性,要么一起成功,要么一起失败

假如其中某条消息挂了,到try代码块中的时候,本次session的数据不提交,可以回滚,容错机制保证了程序的高可用。


2.2、消息消费者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * @ClassName JmsConsumer_Queue
 * @Description TODO
 * @Author phubing
 * @Date 2019-07-24 21:42
 * @Version 1.0
 **/
public class JmsConsumer_TX {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws Exception {
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3、创建session会话
        /**
         * 两个参数: 1、事务;2、签收
         *
         */
        //Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //客户端需要调用ack方法手动确认消费
        //Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
        //类似带副本的消息:不同消费者查看该消息时可能存在状态不一致的情况,此时允许消息重复签收(实际用的比较少)
        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5、创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        /**
         * 通过监听方式来消费消息
         *
         */
        //如果是接口,则可以直接new ,使用其匿名内部类
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(null != message && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("监听到的消息:"+textMessage.getText());
                        System.out.println("监听到的属性消息:"+textMessage.getStringProperty("c01"));
                        textMessage.acknowledge();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }else{
                }
                /*
                if(null != message && message instanceof MapMessage){
                    MapMessage mapMessage = (MapMessage) message;
                    try {
                        System.out.println("监听到的Map消息:"+mapMessage.getString("map-k1"));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }else{
                }
                */
            }
        });
        //press any key to exit
        System.in.read();
        /*
        *
        //6、接收消息
        while(true){
            //若consumer.receive()不指定超时时间,则会一直等待直到有消息进来
            TextMessage message = (TextMessage) consumer.receive(3000L);
            if(null != message){
                System.out.println("消费者接收到消息:"+message.getText());
            }else {
                break;
            }
        }
        */
        session.commit();
        consumer.close();
        session.close();
        connection.close();
    }
}


问:MQ挂了,那么消息的持久化和丢失情况分别如何?

答:看是否设置消息持久化



问:消息默认的持久化模式

答:默认使用持久化



问:对于消费着而言,事务的开启与否又有何区别呢?

答: 开启了事务之后,没有commit,则MQ会认为已消费的消息并没有被消费,继续运行仍然会重复消费(消息重复消费异常)


2.3、消息签收的方式


自动签收(默认):Session.AUTO_ACKNOWL EDGE


手动签收:Session.CLIENT_ACKNOWL EDGE(客户端调用acknowledge方法手动签收)- message.acknowledge();


允许重复消费:Session.DUPS_OK ACKNOWLEDGE(需求不大)



3、不是总结的总结


3.1、点对点(Queue)


点对点模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。


和发送短信类似。


1:如果在Session关闭时有部分消息已被收到但还没有被签收(acknowledged),那当消费者下次连接到相同的队列时,


     这些消息还会被再次接收


2:队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,


     充分体现了异步传输模式的优势


3.2、发布订阅(Topic)


JMSPub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作topic主题可以被认为是消息的传输中介,


发布者( publisher )发布消息到主题,订阅者( subscribe )从主题订阅消息。


主题使得消息订阅者和消息发布者保持互相独立,不需要接触即可保证消息的传送。


3.3、非持久订阅


非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收到发送到某个主题的消息。


如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。


一句话:先要订阅注册才能接受到发布,只给订阅者发布消息。


3.4、持久订阅


   客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户再次连接到MQ时会根据消费者的ID得到所有当自己处于离线时发送到主题的消息。


   非持久订阅状态下,不能恢复或重新派送一个未签收的消息。


   持久订阅才能恢复或重新派送一个未签收的消息。


3.5、用哪个?


当所有的消息必须被接收,则用持久订阅。


当丢失消息能够被容忍,则用非持久订阅


未完待续......(下篇结合Spring,基于配置文件的使用ActiveMQ)

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3天前
|
消息中间件 存储 数据库
RabbitMQ之MQ的可靠性
RabbitMQ之MQ的可靠性
|
3天前
|
消息中间件 SQL Java
RabbitMQ之消费者可靠性
RabbitMQ之消费者可靠性
|
3天前
|
消息中间件 安全 Java
SpringBoot基于RabbitMQ实现消息可靠性
SpringBoot基于RabbitMQ实现消息可靠性
38 0
|
3天前
|
消息中间件
RabbitMQ之发送者(生产者)可靠性
RabbitMQ之发送者(生产者)可靠性
|
9月前
|
消息中间件 Java API
可靠消息传递的选择:深入了解 Apache ActiveMQ 消息队列
在分布式系统中,可靠的消息传递是实现异步通信、解耦和数据同步的关键。Apache ActiveMQ,作为一款开源的消息队列系统,为开发者提供了一个强大的工具来实现可靠的消息传递。本文将为您详细介绍 Apache ActiveMQ 的核心概念、特性以及在分布式架构中的应用。
117 0
|
消息中间件 机器学习/深度学习 XML
ActiveMQ系列:ActiveMQ传输协议
ActiveMQ支持的client-broker通讯协议有:TCP、NIO、UDP、SSL、Http(s)、VM。其中配置 Transport Connector 的文件在activeMQ安装目录的 conf/activemq.xml 中的 &lt;transportConnectors&gt; 标签之内。
232 0
ActiveMQ系列:ActiveMQ传输协议
|
11月前
|
消息中间件 测试技术
RabbitMQ中的消息可靠性问题(上)
RabbitMQ中的消息可靠性问题(上)
104 0
|
11月前
|
消息中间件 安全 Java
RabbitMQ中的消息可靠性问题(下)
RabbitMQ中的消息可靠性问题(下)
85 0
|
消息中间件 存储 NoSQL
springcloud:springboot整合RabbitMQ|RabbitMQ保证消息可靠性(三)
上一章我们讲解了rabbitmq的四种交换机类型、七种通讯方式。本章我们将整合springboot来向大家完整演示rabbitmq的使用,并说明如何保证消息的可靠性。
435 0
springcloud:springboot整合RabbitMQ|RabbitMQ保证消息可靠性(三)
|
消息中间件 安全 Java
03 RabbitMQ之消息可靠性问题
消息从发送,到消费者接收,会经历多个过程