ActiveMQ系列:高级特性之异步投递、延时投递与定时投递
前面有介绍与基础、应用实践部分,有兴趣的可以移步:
初步认识了ActiveMQ:https://blog.csdn.net/qq_26975307/article/details/98875098
结合JavaSE进行初尝试:https://blog.csdn.net/qq_26975307/article/details/98968854
详细讲讲JMS:https://blog.csdn.net/qq_26975307/article/details/99408962
JMS的可靠性:https://phubing.blog.csdn.net/article/details/99412285
结合 Spring,基于配置文件的使用 ActiveMQ:https://phubing.blog.csdn.net/article/details/99413883
结合 SpringBoot,基于 application.xml 使用ActiveMQ:https://blog.csdn.net/qq_26975307/article/details/99415899
ActiveMQ传输协议:https://blog.csdn.net/qq_26975307/article/details/100147542
ActiveMQ 的持久化机制:https://blog.csdn.net/qq_26975307/article/details/100161024
基于LevelDB和 Zookeeper 的数据复制集群:https://blog.csdn.net/qq_26975307/article/details/100560821
此篇开始 高级特性之异步投递、延时投递与定时投递,有兴趣可以留言共同探讨
1、本文大纲一览
1.引入消息队列之后该如何保证其高可用性
2.异步投递 Async Sends
3.延迟投递和定时投递
4.分发策略
5.ActiveMQ 消费重试机制
6.死信队列
7.如何保证消息不被重复消费呢?幂等性问题?
引入消息队列之后该如何保证其高可用性?上篇不是说了嘛,集群啊:https://blog.csdn.net/qq_26975307/article/details/100560821
2、异步投递
2.1、官网拜读
脱离官网谈ActiveMQ 都是虾扯蛋:http://activemq.apache.org/async-sends
2.1.1、说明(Background)
ActiveMQ supports sending messages to a broker in sync or async mode. The mode used has a huge impact in the latency of the send call. Since latency is typically a huge factor in the throughput that can achieved by producer, using async sends can increase the performance of your system dramatically.
The good news is that ActiveMQ sends message in async mode by default in several cases. It is only in cases where the JMS specification required the use of sync sending that we default to sync sending. The cases that we are forced to send in sync mode are when persistent messages are being sent outside of a transaction.
If you are not using transactions and are sending persistent messages, then each send is synch and blocks until the broker has sent back an acknowledgement to the producer that the message has been safely persisted to disk. This ack provides that guarantee that the message will not be lost but it also costs a huge latency penalty since the client is blocked.
Many high performance applications are designed to be tolerate a small amount of message loss in failure scenarios. If your application has been designed in this fashion, you can enable the use of async sends to increase throughput even when using persistent messages.
(ActiveMQ支持以同步或异步模式向代理发送消息。使用的模式对发送呼叫的延迟有很大影响。由于延迟通常是生产者可以实现的吞吐量的一个重要因素,因此使用异步发送可以显着提高系统性能。
好消息是ActiveMQ在几种情况下默认以异步模式发送消息。只有在JMS规范要求使用同步发送的情况下,我们才会默认同步发送。我们被迫以同步模式发送的情况是在事务之外发送持久性消息的情况。
如果您没有使用事务并且正在发送持久性消息,则每个发送都是同步并阻塞,直到代理向生产者发回确认消息已被安全地保存到磁盘。此ack保证消息不会丢失,但由于客户端被阻止,因此也会造成巨大的延迟损失。
许多高性能应用程序旨在容忍故障情况下的少量消息丢失。如果您的应用程序是以这种方式设计的,则即使使用持久性消息,也可以启用异步发送以提高吞吐量。)
2.1.3、是什么?
ActiveMQ 支持同步、异步两种发送的模式将消息发送到 broker,模式的选择对发送延时有巨大的影响。producer 能达到怎样的产出率(产出率 = 发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送的性能。
ActiveMQ 默认使用异步发送的模式:除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。
如果你没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻塞 producer 直到 broker 返回一个确认,表示消息己经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。
很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。
异步投递可以最大化 produer 端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升 Producer 性能;
不过这也带来了额外的问题,就是需要消耗较多的 Client 端内存同时也会导致 broker 端性能消耗增加;此外它不能有效的确保消息的发送成功。在 useAsyncSend=true 的情况下客户端需要容忍消息丢失的可能。
像不像当年小学五年级左右的游泳池进出水问题?mmp......
一个游泳池有两条进水管和一条出水管,进水管A的时间为60分钟才能装满池水,进水管B的时间为40分钟才能装满一池水,出水管的时间为50分钟能放完一池水,问三条水管同时使用游泳池的水多久能装满?
2.1.3、为什么?
对于一个slow consumer,使用同步发送消息可能出现 Producer 堵塞等情况,慢消费者适合使用异步发送。
2.1.4、官网推荐配置
1、Configuring Async Send using a Connection URI(使用连接URI配置异步发送)
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
2、Configuring Async Send at the ConnectionFactory Level(在ConnectionFactory 级别配置异步发送)
其实就是在代码里面增加 activeMQConnectionFactory.setuseAsyncsend(true);
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
3、Configuring Async Send at the Connection Level(在连接级别配置异步发送)
((ActiveMQConnection)connection).setUseAsyncSend(true);
2.1.5、异步发送如何确认发送成功?
异步发送丢失消息的场景是:生产者设置 UseAsyncSend=true,使用 producer.send(msg)持续发送消息。
由于消息不阻塞,生产者会认为所有 send 的消息均被成功发送至 MQ。如果 MQ 突然宕机,此时生产者端内存中尚未被发送至 MQ 的消息都会丢失。
所以,正确的异步方法是需要接收回调的
同步发送和异步发送的区别就在此,同步发送等send不阻塞了就表示一定发送成功了,异步发送需要接收回执并由客户端再判断一次是否发送成功。
JmsProduce_Asyncsend
package com.phubing.jms_async; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQMessageProducer; import org.apache.activemq.AsyncCallback; import javax.jms.*; import java.util.UUID; public class JmsProduce_AsynCsend { public static final String ACTIVEMQ_URL = "tcp://192.168.177.130:61616"; public static final String QUEUE_NAME = "queue-async"; public static void main(String[] args) throws Exception{ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); activeMQConnectionFactory.setUseAsyncSend(true); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); ActiveMQMessageProducer activeMOMessageProducer = (ActiveMQMessageProducer)session.createProducer(queue); TextMessage message = null; for(int i=1;i<=3;i++){ message=session. createTextMessage("message--"+i); message. setJMSMessageID(UUID.randomUUID().toString()); String msgID = message. getJMSMessageID(); activeMOMessageProducer.send(message, new AsyncCallback() { @Override public void onSuccess() { System.out.println(msgID + "已经发送成功"); } @Override public void onException(JMSException e) { System.out.println(msgID + "发送失败了"); } }); } activeMOMessageProducer.close(); session.close(); connection.close(); System.out.println("*****消息发送到队列完成"); } }
消费者与之前的没什么差别,这里就不写了,跑起来,看看控制台 与 Linux
3、延时投递和定时投递
3.1、官网拜读
http://activemq.apache.org/delay-and-schedule-message-delivery.html
ActiveMQ from version 5.4 has an optional persistent scheduler built into the ActiveMQ message broker. It is enabled by setting the broker schedulerSupport attribute to true in the Xml Configuration. An ActiveMQ client can take advantage of a delayed delivery by using the following message properties:
3.2、延时投递和定时投递的四个属性
3.2、Cron表达式 - WIKI百科
https://en.wikipedia.org/wiki/Cron
3.3、撸码看看?
3.3.1、先在 activemq.xml 中配置 schedulerSupport 属性为 true
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${ activemq. data}" schedulerSupport="true" />
3.3.2、Java代码里面封装的辅助消息类型:ScheduledMessage(代码无差,在生产者中 messageProducer.send(msg) 增加三个)-----实际懒得写,改天有空补上14
message.setLongProperty(ScheduledMessage.AMO_SCHEDULED_DELAY, delay); message.setLongProperty(ScheduledMes sage.AMO_SCHEDULED_PERIOD, period); message.setIntProperty(ScheduledMessage.AMO_SCHEDULED_REPEAT, repeat);
跑起来,看 ActiveMQ 控制台,就这样。
(未完待续......下篇开始 ActiveMQ 的死信队列与消费重试机制 )