谁让你再使用cron发送延时消息,你直接给他一jio!(文末送书)

简介: 谁让你再使用cron发送延时消息,你直接给他一jio!(文末送书)

前言


在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消;还有在线商城完成订单后48小时不评价 ,自动5星好评。像这类在某事件触发后一段时间内执行的需求任务我们称之为 延时任务

那么如何实现延迟任务呢?

第一反应是利用cron方案来实现:

启动一个cron定时任务,每隔一段时间执行一次,比如30分钟,找到那些超时的数据,直接更新状态,或者拿出来执行一些操作。如果数据量比较大,需要分页查询,分页update,这将是一个for循环更新操作。

cron方案是很常见的一种方案,但是常见的不一定是最好的,主要有以下几个问题:

  • 当数据量大的时候轮询效率低;
  • 时效性不够好,如果每小时轮询一次,最差的情况时间误差会达到1小时;
  • 如果通过增加cron轮询频率来减少时间误差,则会出现轮询低效和重复计算的问题;

既然cron方案不是很理想,那就请出我们今天的主角,使用RocketMQ的延时消息解决。在创建订单的时候发送一条延时消息到RocketMQ,30分钟后消费者消费消息去检查订单的状态,如果发现订单未支付则取消订单释放库存。


实现


RocketMQ延迟队列的核心思路是:所有的延迟消息由producer发出之后,都会存放到同一个topic(SCHEDULE_TOPIC_XXXX)下,不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由定时线程读取转换为普通的消息存的真实指定的topic下,此时对于consumer端此消息才可见,从而被consumer消费。

注意: RocketMQ不支持任意时间的延时,只支持以下几个固定的延时等级

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

下面我们结合SprintBoot利用RocketMQ发送延时消息

  • 引入RocketMQ组件
<dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>


  • 增加RocketMQ的配置
rocketmq:
  name-server: 172.31.0.44:9876
  producer:
    group: delay-group


  • 编写生产者
@Component
@Slf4j
public class DelayProduce {
    @Autowired
    private RocketMQTemplate rocketMQTemplatet;
    public void sendDelayMessage(String topic,String message,int delayLevel){
       SendResult sendResult = rocketMQTemplatet.syncSend(topic, MessageBuilder.withPayload(message).build(), 2000, delayLevel);
        log.info("sendtime is {}", DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss").format(LocalDateTime.now()));
        log.info("sendResult is{}",sendResult);
    }
}


  • 编写消费者
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "delay-topic",
        consumerGroup = "delay-group"
)
public class DelayConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("received message time is {}", DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss").format(LocalDateTime.now()));
        log.info("received message is {}",message);
    }
}


  • 测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayProduceTest {
    @Autowired
    private DelayProduce delayProduce;
    @Test
    public void sendDelayMessage() {
        delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",5);
    }
}

这里delayLevel设置成5,对应RocketMQ的延时等级就是1分钟后投递消息。

  • 运行结果

发送时间消费时间


修改延时级别

RocketMQ的延迟等级可以进行修改,以满足自己的业务需求,可以修改/添加新的level。例如:你想支持1天的延迟,修改最后一个level的值为1d,这个时候依然是18个level;也可以增加一个1d,这个时候总共就有19个level。

  • 打开RocketMQ的配置文件,修改messageDelayLevel 属性
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
storePathRootDir = /app/rocketmq/data
messageDelayLevel=90s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

这次将延时等级1修改成了90s,生产者发送消息后需要90s后再进行消息投递。修改完成后重启RocketMQ。

nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &

  • 使用延时等级1发送消息
public void sendDelayMessage() {
 delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",1);
}
  • 测试

发送时间消费时间

通过比对发送时间与消费时间证明延时等级修改生效。

以上,希望对你有所帮助。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
JSON API 开发者
钉钉如何调用「创建钉钉待办任务」接口?
钉钉如何调用「创建钉钉待办任务」接口?
563 0
|
关系型数据库 Go 数据处理
高效数据迁移:使用Go语言优化ETL流程
在本文中,我们将探索Go语言在处理大规模数据迁移任务中的独特优势,以及如何通过Go语言的并发特性来优化数据提取、转换和加载(ETL)流程。不同于其他摘要,本文不仅展示了Go语言在ETL过程中的应用,还提供了实用的代码示例和性能对比分析。
|
11月前
|
人工智能 Cloud Native Serverless
从零到一:阿里云CAP助你轻松高效构建云应用
云原生应用开发平台CAP是阿里云提供的一站式应用开发及生命周期管理平台。它内置丰富的Serverless和AI应用模板、先进的开发者工具和企业级应用管理功能,帮助个人和企业开发者快速构建、部署和管理云上应用,大幅提升研发、部署和运维效能。CAP支持Web应用、AI应用、ETL数据处理等多种场景,提供图形化、低代码的流程编排能力,助力开发者高效构建复杂业务流程。
|
监控 Linux
在Linux中,如何进行系统性能瓶颈分析?
在Linux中,如何进行系统性能瓶颈分析?
|
安全 Go 数据库
「有问必答」秒杀系统 Go并发编程实践!
「有问必答」秒杀系统 Go并发编程实践!
264 2
|
消息中间件 存储 监控
超详细:这份全网首发的Kafka技术手册,从基础到实战一应俱全
Kafka正在爆炸式增长。超过三分之一的财富500强企业都使用Kafka。这些公司包括十大旅游公司,十大银行中的七家,十大保险公司中的八家,十大电信公司中的九家,以及更多。LinkedIn,微软和Netflix每天使用Kafka(1,000,000,000,000)处理万亿级的消息。Kafka用于实时数据流,收集大数据或进行实时分析(或两者兼而有之)。Kafka与内存微服务一起使用以提供可靠性,它可用于向 CEP(复杂事件流系统)和IoT / IFTTT式自动化系统提供事件。
|
消息中间件 缓存 前端开发
新项目,不妨采用这种架构分层,很优雅!
新项目,不妨采用这种架构分层,很优雅!
397 0
|
NoSQL Java Redis
SpringBoot中如何实现限流,这种方式才叫优雅!
SpringBoot中如何实现限流,这种方式才叫优雅!
452 0
|
Kubernetes Cloud Native 网络协议
微服务上云后本地如何联调?
微服务上云后本地如何联调?
407 0
|
存储 NoSQL 算法
SpringBoot 如何进行限流?老鸟们还可以这样玩!
SpringBoot 如何进行限流?老鸟们还可以这样玩!
278 0