ActiveMQ系列:高级特性之异步投递、延时投递与定时投递

简介: ActiveMQ 支持同步、异步两种发送的模式将消息发送到 broker,模式的选择对发送延时有巨大的影响。producer 能达到怎样的产出率(产出率 = 发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送的性能。

ActiveMQ系列:高级特性之异步投递、延时投递与定时投递


前面有介绍与基础、应用实践部分,有兴趣的可以移步:


初步认识了ActiveMQ:https://blog.csdn.net/qq_26975307/article/details/98875098


结合JavaSE进行初尝试:https://blog.csdn.net/qq_26975307/article/details/98968854


详细讲讲JMS:https://blog.csdn.net/qq_26975307/article/details/99408962


JMS的可靠性:https://phubing.blog.csdn.net/article/details/99412285


结合 Spring,基于配置文件的使用 ActiveMQ:https://phubing.blog.csdn.net/article/details/99413883


结合 SpringBoot,基于 application.xml 使用ActiveMQ:https://blog.csdn.net/qq_26975307/article/details/99415899


ActiveMQ传输协议:https://blog.csdn.net/qq_26975307/article/details/100147542


ActiveMQ 的持久化机制https://blog.csdn.net/qq_26975307/article/details/100161024


基于LevelDB和 Zookeeper 的数据复制集群:https://blog.csdn.net/qq_26975307/article/details/100560821


此篇开始  高级特性之异步投递、延时投递与定时投递,有兴趣可以留言共同探讨


1、本文大纲一览


1.引入消息队列之后该如何保证其高可用性

2.异步投递 Async Sends

3.延迟投递和定时投递

4.分发策略

5.ActiveMQ 消费重试机制

6.死信队列

7.如何保证消息不被重复消费呢?幂等性问题?


引入消息队列之后该如何保证其高可用性?上篇不是说了嘛,集群啊:https://blog.csdn.net/qq_26975307/article/details/100560821


2、异步投递


2.1、官网拜读


脱离官网谈ActiveMQ 都是虾扯蛋http://activemq.apache.org/async-sends


2.1.1、说明(Background)


ActiveMQ supports sending messages to a broker in sync or async mode. The mode used has a huge impact in the latency of the send call. Since latency is typically a huge factor in the throughput that can achieved by producer, using async sends can increase the performance of your system dramatically.


The good news is that ActiveMQ sends message in async mode by default in several cases. It is only in cases where the JMS specification required the use of sync sending that we default to sync sending. The cases that we are forced to send in sync mode are when persistent messages are being sent outside of a transaction.


If you are not using transactions and are sending persistent messages, then each send is synch and blocks until the broker has sent back an acknowledgement to the producer that the message has been safely persisted to disk. This ack provides that guarantee that the message will not be lost but it also costs a huge latency penalty since the client is blocked.


Many high performance applications are designed to be tolerate a small amount of message loss in failure scenarios. If your application has been designed in this fashion, you can enable the use of async sends to increase throughput even when using persistent messages.


(ActiveMQ支持以同步或异步模式向代理发送消息。使用的模式对发送呼叫的延迟有很大影响。由于延迟通常是生产者可以实现的吞吐量的一个重要因素,因此使用异步发送可以显着提高系统性能。


好消息是ActiveMQ在几种情况下默认以异步模式发送消息。只有在JMS规范要求使用同步发送的情况下,我们才会默认同步发送。我们被迫以同步模式发送的情况是在事务之外发送持久性消息的情况。


如果您没有使用事务并且正在发送持久性消息,则每个发送都是同步并阻塞,直到代理向生产者发回确认消息已被安全地保存到磁盘。此ack保证消息不会丢失,但由于客户端被阻止,因此也会造成巨大的延迟损失。


许多高性能应用程序旨在容忍故障情况下的少量消息丢失。如果您的应用程序是以这种方式设计的,则即使使用持久性消息,也可以启用异步发送以提高吞吐量。)


2.1.3、是什么?


   ActiveMQ 支持同步、异步两种发送的模式将消息发送到 broker,模式的选择对发送延时有巨大的影响。producer 能达到怎样的产出率(产出率 = 发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送的性能。


   ActiveMQ 默认使用异步发送的模式:除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。


   如果你没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻塞 producer 直到 broker 返回一个确认,表示消息己经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。


   很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。


   异步投递可以最大化 produer 端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升 Producer 性能;


   不过这也带来了额外的问题,就是需要消耗较多的 Client 端内存同时也会导致 broker 端性能消耗增加;此外它不能有效的确保消息的发送成功。在 useAsyncSend=true 的情况下客户端需要容忍消息丢失的可能。


像不像当年小学五年级左右的游泳池进出水问题?mmp......


一个游泳池有两条进水管和一条出水管,进水管A的时间为60分钟才能装满池水,进水管B的时间为40分钟才能装满一池水,出水管的时间为50分钟能放完一池水,问三条水管同时使用游泳池的水多久能装满?


2.1.3、为什么?


对于一个slow consumer,使用同步发送消息可能出现 Producer 堵塞等情况,慢消费者适合使用异步发送。


2.1.4、官网推荐配置


   1、Configuring Async Send using a Connection URI(使用连接URI配置异步发送)


cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

 

2、Configuring Async Send at the ConnectionFactory Level(在ConnectionFactory 级别配置异步发送)


其实就是在代码里面增加 activeMQConnectionFactory.setuseAsyncsend(true);

((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

3、Configuring Async Send at the Connection Level(在连接级别配置异步发送)


((ActiveMQConnection)connection).setUseAsyncSend(true);


2.1.5、异步发送如何确认发送成功?


   异步发送丢失消息的场景是:生产者设置 UseAsyncSend=true,使用 producer.send(msg)持续发送消息。


   由于消息不阻塞,生产者会认为所有 send 的消息均被成功发送至 MQ。如果 MQ 突然宕机,此时生产者端内存中尚未被发送至 MQ 的消息都会丢失。


   所以,正确的异步方法是需要接收回调的


   同步发送和异步发送的区别就在此,同步发送等send不阻塞了就表示一定发送成功了,异步发送需要接收回执并由客户端再判断一次是否发送成功。


JmsProduce_Asyncsend


package com.phubing.jms_async;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;
import javax.jms.*;
import java.util.UUID;
public class JmsProduce_AsynCsend {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String QUEUE_NAME = "queue-async";
    public static void main(String[] args) throws Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        activeMQConnectionFactory.setUseAsyncSend(true);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        ActiveMQMessageProducer activeMOMessageProducer = (ActiveMQMessageProducer)session.createProducer(queue);
        TextMessage message = null;
        for(int i=1;i<=3;i++){
            message=session. createTextMessage("message--"+i);
            message. setJMSMessageID(UUID.randomUUID().toString());
            String msgID = message. getJMSMessageID();
            activeMOMessageProducer.send(message, new AsyncCallback() {
                @Override
                public void onSuccess() {
                    System.out.println(msgID + "已经发送成功");
                }
                @Override
                public void onException(JMSException e) {
                    System.out.println(msgID + "发送失败了");
                }
            });
        }
        activeMOMessageProducer.close();
        session.close();
        connection.close();
        System.out.println("*****消息发送到队列完成");
    }
}



消费者与之前的没什么差别,这里就不写了,跑起来,看看控制台 与 Linux


20190908185216177.png


3、延时投递和定时投递


3.1、官网拜读


http://activemq.apache.org/delay-and-schedule-message-delivery.html

ActiveMQ from version 5.4 has an optional persistent scheduler built into the ActiveMQ message broker. It is enabled by setting the broker schedulerSupport attribute to true in the Xml Configuration. An ActiveMQ client can take advantage of a delayed delivery by using the following message properties:


3.2、延时投递和定时投递的四个属性


20190908185823770.png


3.2、Cron表达式 - WIKI百科


https://en.wikipedia.org/wiki/Cron


3.3、撸码看看?


3.3.1、先在 activemq.xml 中配置 schedulerSupport 属性为 true


<broker xmlns="http://activemq.apache.org/schema/core"  brokerName="localhost" dataDirectory="${ activemq. data}" schedulerSupport="true" />


3.3.2、Java代码里面封装的辅助消息类型:ScheduledMessage(代码无差,在生产者中 messageProducer.send(msg) 增加三个)-----实际懒得写,改天有空补上14


message.setLongProperty(ScheduledMessage.AMO_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMes sage.AMO_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMO_SCHEDULED_REPEAT, repeat);


跑起来,看 ActiveMQ 控制台,就这样。



(未完待续......下篇开始 ActiveMQ 的死信队列与消费重试机制  )

目录
相关文章
|
5月前
|
消息中间件 数据安全/隐私保护 RocketMQ
消息队列 MQ产品使用合集之如何自定义时间间隔
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 存储 负载均衡
现代消息队列与云存储问题之消息队列支持定时消息和延迟队列的问题如何解决
现代消息队列与云存储问题之消息队列支持定时消息和延迟队列的问题如何解决
|
4月前
|
消息中间件 存储 RocketMQ
消息队列 MQ使用问题之进行超过3天的延迟消息投递,采用多次投递的策略是否有风险
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 网络性能优化 RocketMQ
消息队列 MQ产品使用合集之本地事务还没有执行完就触发了回查是什么导致的
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
6月前
|
消息中间件 存储 RocketMQ
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
63 0
|
消息中间件 弹性计算 网络安全
消息队列RocketMQ版:定时消息通知功能体验
本实验场景介绍消息队列RocketMQ版的定时(延时)消息收发功能,体验发送若干条自定义延迟触发的消息,观察消息是否按照预期的投递时间投递。
|
消息中间件 Kafka
MQ 学习日志(八) 消息队列的延时以及过期失效问题处理
消息队列的延时以及过期失效问题处理
309 0
|
消息中间件 Shell RocketMQ
RocketMQ进阶-延时消息
RocketMQ进阶-延时消息
1116 0
|
消息中间件 存储 算法
RocketMQ 消息集成:多类型业务消息——定时消息
本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。
466 0
RocketMQ  消息集成:多类型业务消息——定时消息
|
消息中间件 存储 算法
【视频】定时消息 | 学习笔记
快速学习【视频】定时消息
138 0
【视频】定时消息 | 学习笔记