ActiveMQ系列:JMS的可靠性

简介: 持久化消息这是队列的的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。

ActiveMQ系列:JMS的可靠性


前面有介绍与基础部分,有兴趣的可以移步:


初步认识了ActiveMQ:https://blog.csdn.net/qq_26975307/article/details/98875098


结合JavaSE进行初尝试:https://blog.csdn.net/qq_26975307/article/details/98968854


详细讲讲JMS:https://blog.csdn.net/qq_26975307/article/details/99408962


此篇来讲讲JMS的可靠性


1、JMS的可靠性


JMS的可靠性在于三点:


   1.1、PERSISTENT:持久性


   1.2、事务


   1.1、Acknowledge:签收


1.1、PERSISTENT:持久性


非持久:


messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);


非持久化:当服务器宕机,消息不存在。


持久:

messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

持久化:当服务器宕机,消息依然存在。


1.2、事务


1、producer提交时的事务


   false:只要执行send,就进入到队列中。关闭事务,那第2个签收参数的设置需要有效


   true:先执行send再执行commit,消息才被真正的提交到队列中消息需要批量发送,需要缓冲区处理


2、在事务性会话中,当一个事务被成功提交则消息被自动签收。


     如果事务回滚,则消息会被再次传送。


3、非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)


4、生产事务开启,只有commit后才能将全部消息变为已消费


5、事务偏生产者


6、签收偏消费者


1.3、结论


持久化消息这是队列的的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。


可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。


2、ActiveMQ 结合 JavaSE 的事务栗子


2.1、消息生产者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 *
 */
public class JmsProduce_TX {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws Exception{
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3、创建session会话
        /**
         * 两个参数: 1、事务;2、签收
         *
         */
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5、创建消息的生产者
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //6、通过使用消息生产者生产消息并发送到MQ的队列中
        for(int i=1;i<=3;i++){
            //7、创建消息
            //可以理解为最简单的字符串
            TextMessage textMessage = session.createTextMessage("msg-" + i);
            /**
             * String消息设置属性
            textMessage.setStringProperty("c01", "vip");
             */
            /**
             * 在每一条消息发送之前,可以设置消息的请求头属性
            MapMessage mapMessage = session.createMapMessage();
            mapMessage.setString("map-k1", "map-v1");
            producer.send(mapMessage);
             */
            //8、通过消息生产者发送给MQ
            producer.send(textMessage);
        }
        //在Session关闭之前先commit
        session.commit();
        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到MQ完成********");
    }
}


顺带一提:


对于生产者而言,事务的开启和关闭是不一样的,如果为true,一定要在生产者关闭之前commit,统一批处理提交


统一批处理不是多此一举?


commit有点类似数据库的业务数据提交,根据事务的特性,要么一起成功,要么一起失败

假如其中某条消息挂了,到try代码块中的时候,本次session的数据不提交,可以回滚,容错机制保证了程序的高可用。


2.2、消息消费者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * @ClassName JmsConsumer_Queue
 * @Description TODO
 * @Author phubing
 * @Date 2019-07-24 21:42
 * @Version 1.0
 **/
public class JmsConsumer_TX {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws Exception {
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3、创建session会话
        /**
         * 两个参数: 1、事务;2、签收
         *
         */
        //Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //客户端需要调用ack方法手动确认消费
        //Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
        //类似带副本的消息:不同消费者查看该消息时可能存在状态不一致的情况,此时允许消息重复签收(实际用的比较少)
        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5、创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        /**
         * 通过监听方式来消费消息
         *
         */
        //如果是接口,则可以直接new ,使用其匿名内部类
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(null != message && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("监听到的消息:"+textMessage.getText());
                        System.out.println("监听到的属性消息:"+textMessage.getStringProperty("c01"));
                        textMessage.acknowledge();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }else{
                }
                /*
                if(null != message && message instanceof MapMessage){
                    MapMessage mapMessage = (MapMessage) message;
                    try {
                        System.out.println("监听到的Map消息:"+mapMessage.getString("map-k1"));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }else{
                }
                */
            }
        });
        //press any key to exit
        System.in.read();
        /*
        *
        //6、接收消息
        while(true){
            //若consumer.receive()不指定超时时间,则会一直等待直到有消息进来
            TextMessage message = (TextMessage) consumer.receive(3000L);
            if(null != message){
                System.out.println("消费者接收到消息:"+message.getText());
            }else {
                break;
            }
        }
        */
        session.commit();
        consumer.close();
        session.close();
        connection.close();
    }
}


问:MQ挂了,那么消息的持久化和丢失情况分别如何?

答:看是否设置消息持久化



问:消息默认的持久化模式

答:默认使用持久化



问:对于消费着而言,事务的开启与否又有何区别呢?

答: 开启了事务之后,没有commit,则MQ会认为已消费的消息并没有被消费,继续运行仍然会重复消费(消息重复消费异常)


2.3、消息签收的方式


自动签收(默认):Session.AUTO_ACKNOWL EDGE


手动签收:Session.CLIENT_ACKNOWL EDGE(客户端调用acknowledge方法手动签收)- message.acknowledge();


允许重复消费:Session.DUPS_OK ACKNOWLEDGE(需求不大)



3、不是总结的总结


3.1、点对点(Queue)


点对点模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。


和发送短信类似。


1:如果在Session关闭时有部分消息已被收到但还没有被签收(acknowledged),那当消费者下次连接到相同的队列时,


     这些消息还会被再次接收


2:队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,


     充分体现了异步传输模式的优势


3.2、发布订阅(Topic)


JMSPub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作topic主题可以被认为是消息的传输中介,


发布者( publisher )发布消息到主题,订阅者( subscribe )从主题订阅消息。


主题使得消息订阅者和消息发布者保持互相独立,不需要接触即可保证消息的传送。


3.3、非持久订阅


非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收到发送到某个主题的消息。


如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。


一句话:先要订阅注册才能接受到发布,只给订阅者发布消息。


3.4、持久订阅


   客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户再次连接到MQ时会根据消费者的ID得到所有当自己处于离线时发送到主题的消息。


   非持久订阅状态下,不能恢复或重新派送一个未签收的消息。


   持久订阅才能恢复或重新派送一个未签收的消息。


3.5、用哪个?


当所有的消息必须被接收,则用持久订阅。


当丢失消息能够被容忍,则用非持久订阅


未完待续......(下篇结合Spring,基于配置文件的使用ActiveMQ)

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
数据处理 Python
Pandas数据处理 | apply() 函数用法指南!
本文介绍一下关于 Pandas 中 apply() 函数的几个常见用法,apply() 函数的自由度较高,可以直接对 Series 或者 DataFrame 中元素进行逐元素遍历操作,方便且高效,具有类似于 Numpy 的特性。
|
7月前
|
传感器 安全 物联网
《分布式软总线:解锁未来柔性电子设备互联新境界》
柔性电子设备正以前所未有的速度融入生活,从可折叠手机到智能穿戴设备,其轻薄便携、可变形特性为人们带来全新体验。然而,设备间互联互通的需求日益迫切,分布式软总线技术应运而生。该技术融合Wi-Fi、蓝牙、NFC等优势,实现设备自发现与自组网,屏蔽通信协议差异,优化数据传输效率。它在智能家居、智能医疗及可穿戴设备领域展现出巨大潜力,助力设备协同工作,提升用户体验。尽管面临兼容性与安全性挑战,未来结合AI、区块链等技术,分布式软总线将推动柔性电子设备进入更智能化、安全化的全新时代。
293 3
|
人工智能 编解码 5G
虚拟现实(VR)与增强现实(AR)的融合:开启全新交互时代
【6月更文挑战第17天】虚拟现实(VR)与增强现实(AR)融合成混合现实(MR),打造全新交互体验。MR结合VR的沉浸感和AR的现实增强,应用于教育、游戏、设计和营销,带来创新教学方式、沉浸式游戏体验和高效设计工具。尽管面临技术挑战,随着5G和AI的发展,MR有望引领未来交互的革命。
|
存储 SQL 关系型数据库
使用MySQL Workbench进行数据库备份
【9月更文挑战第13天】以下是使用MySQL Workbench进行数据库备份的步骤:启动软件后,通过“Database”菜单中的“管理连接”选项配置并选择要备份的数据库。随后,选择“数据导出”,确认导出的数据库及格式(推荐SQL格式),设置存储路径,点击“开始导出”。完成后,可在指定路径找到备份文件,建议定期备份并存储于安全位置。
1628 11
|
安全 搜索推荐 应用服务中间件
Web安全-目录遍历漏洞
Web安全-目录遍历漏洞
558 2
|
机器学习/深度学习 数据可视化 自动驾驶
YOLO11-seg分割:具有切片操作的SimAM注意力,魔改SimAM助力分割
本文创新地对SimAM注意力机制进行魔改,引入切片操作,显著提升了小目标特征提取能力。针对SimAM在计算整张特征图的像素差平均值时可能忽略小目标重要性的问题,通过切片操作增强了小目标的加权效果。实验结果显示,魔改后的SimAM在YOLO11-seg上的Mask mAP50从0.673提升至0.681,有效改善了小目标检测性能。
1205 2
|
Java Apache Maven
将word文档转换成pdf文件方法
在Java中,将Word文档转换为PDF文件可采用多种方法:1) 使用Apache POI和iText库,适合处理基本转换需求;2) Aspose.Words for Java,提供更高级的功能和性能;3) 利用LibreOffice命令行工具,适用于需要开源解决方案的场景。每种方法都有其适用范围,可根据具体需求选择。
|
存储 Kubernetes Docker
Kubernetes(K8S)集群管理Docker容器(概念篇)
Kubernetes(K8S)集群管理Docker容器(概念篇)
|
存储 前端开发 JavaScript
JAVA医院电子病历编辑器系统源码
电子病历编辑器极具灵活性,它既可嵌入到医院HIS系统中,作为内置编辑工具供多个模块使用,也可以独立拿出来,与第三方业务厂商展开合作,为他们提供病历书写功能,充分发挥编辑器的功能。
478 8
|
数据采集 API 开发者
爬虫:闲鱼商品详情数据接口(goodfish.item_get)
闲鱼的`goodfish.item_get`非官方API,其数据接口多为私有,适用于授权合作伙伴。获取商品详情数据可考虑官方合作、网络爬虫(需遵守反爬政策)、第三方API服务或直接联系闲鱼官方。合法合规使用数据至关重要。
1221 0