ActiveMQ消息可靠性-持久性

简介: 三个方面保证消息的可靠性  1.消息的持久  2.事物  3.签收

三个方面保证消息的可靠性


  1.消息的持久


  2.事物


  3.签收


一:PERSISTENT:持久性


    参数说明:1.持久


         2.非持久



  Java里面设置持久化和非持久



  持久:


    将持久性设置为持久



    宕机前,数据正常,未被消费



    服务器恢复后,数据仍然存在,未被消费的消息为3 



  非持久


    设置为非持久



    宕机前



    服务器恢复后,消息全部丢失



  上面例子指定了持久化和非持久化,如果不指定的话默认为持久化


  以上是针对队列模式(queue)


  主题模式(Topic)


    主题模式的持久化是针对订阅者,因为订阅模式下生产者将消息发送出去就啥也不管了,如果没有订阅者,消息就等于是废消息,一点意义也没有,所以应该在消费者端进行持久化


    实验:


       1.首先消费者先订阅:如图一个订阅者(在线)



    代码:


package com.steak.activemq.test;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class Consumer {
private static final StringACTIVE_URL ="tcp://127.0.0.1:61616";
    private static final StringQUEUE ="topic_persist";
    public static void main(String[] args)throws JMSException, IOException {
//创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory(ACTIVE_URL);
        //通过连接工厂,获得连接
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.setClientID("刘牌");//订阅者
        //创建session,第一个参数叫事物,第二个叫签收
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //创建目的地
        Topic topic = session.createTopic(QUEUE);
        //持久订阅者
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark");
        connection.start();
        Message message = topicSubscriber.receive();
        while (null != message){
TextMessage textMessage = (TextMessage) message;
            System.out.println("topic持久化的消息  "+textMessage.getText());
            //如果1秒钟以后收不到消息,自动断开,相当于取关
            message = topicSubscriber.receive(1000L);
        }
session.close();
        connection.close();
    }
}


     2.然后启动生产者(发布者):此时发布了三条,订阅者收到了三条



package com.steak.activemq.test;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Producer {
private static final StringACTIVE_URL ="tcp://127.0.0.1:61616";
    private static final StringQUEUE ="topic_persist";
    public static void main(String[] args)throws JMSException {
//创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory(ACTIVE_URL);
        //通过连接工厂,获得连接
        Connection connection = activeMQConnectionFactory.createConnection();
        //创建session
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //创建目的地
        Topic topic = session.createTopic(QUEUE);
        //创建消息的生产者
        MessageProducer messageProducer = session.createProducer(topic);
            connection.start();
        //通过使用messageProducer生产消息发送到MQ队列里
        for (int i =0 ; i <3 ; i++){
//创建消息
            TextMessage textMessage = session.createTextMessage("消息 "+i);
            //通过messageProducer发送消息
            messageProducer.send(textMessage);
        }
//关闭资源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("消息发送完成");
    }
}


  因为我们设置了1秒钟过后如果收不到消息就断开连接,所以消费者从在线变为离线




    如果设置为receive(),则一直监听(相当于微信公众号一直都关注,一直都能收到消息)


  无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收到的消息都接收过来(相当于我取关了,不能收到消息,但是我再此关注,我也能把我取关的这段时间的消息都收到),前提时注册过一次,第一注册以前的消息肯定是收不到

目录
相关文章
|
7月前
|
消息中间件 存储 运维
|
26天前
|
消息中间件
使用RabbitMQ如何保证消息不丢失 ?
RabbitMQ通过发布者确认、回执机制、消息持久化及消费者确认等方案,确保消息从发送到接收的每个环节都能有效防止丢失。即便如此,特殊情况下仍可能丢失,如系统故障等。为此,可设计消息状态表,记录消息ID、内容、交换机、路由键、发送时间和签收状态等,结合定时任务检查并重发未签收消息,以进一步提升消息传输的可靠性。
46 1
|
4月前
|
消息中间件 存储 运维
RabbitMQ-消息消费时的可靠性保障
将这些实践融入到消息消费的处理逻辑中,可以很大程度上保障RabbitMQ中消息消费的可靠性,确保消息系统的稳定性和数据的一致性。这些措施的实施,需要在系统的设计和开发阶段充分考虑,以及在后续的维护过程中不断的调整和完善。
64 0
|
5月前
|
消息中间件 Kafka 索引
微服务数据问题之Broker宕机MetaQ保证数据的可靠性如何解决
微服务数据问题之Broker宕机MetaQ保证数据的可靠性如何解决
|
5月前
|
消息中间件 存储 Kafka
深入Kafka:如何保证数据一致性与可靠性?
**Kafka一致性详解:** 讲解了幂等性如何通过ProducerID和SequenceNumber确保消息唯一,防止重复处理,维持数据一致性。Kafka利用Zookeeper进行控制器和分区Leader选举,应对节点变动,防止脑裂,确保高可用性。实例中,电商平台用Kafka处理订单,保证每个订单仅处理一次,即使在异常情况下。关注微信公众号“软件求生”获取更多技术内容。
849 0
|
6月前
|
消息中间件 Kafka API
深入解析Kafka消息传递的可靠性保证机制
深入解析Kafka消息传递的可靠性保证机制
89 0
|
消息中间件 存储 Kafka
如何保证MQ中消息的可靠性传输?
如何保证MQ中消息的可靠性传输?
118 1
|
7月前
|
消息中间件 Kafka
面试官:你说说Kafka是怎么保证消息可靠性的
面试官:那要是Kafka消费堆积了怎么办。每个topic是分为多个分区给不同Broker处理,要合理分配分区数量来提高Broker的消息处理能力。比如3个Broker2个分区,可以改为3个Broker3个分区
105 1
面试官:你说说Kafka是怎么保证消息可靠性的
|
消息中间件 存储 Java
RabbitMQ如何保证消息的可靠性
RabbitMQ如何保证消息的可靠性
100 0
|
消息中间件
如何保证消息的可靠性,避免消息丢失
如何保证消息的可靠性,避免消息丢失
109 0