源码分析ElasticJob任务错过机制(misfire)与幂等性

简介: 任务在调度执行中,由于某种原因未执行完毕,下一次调度任务触发后,在同一个Job实例中,会出现两个线程处理同一个分片上的数据,这样就会造成两个线程可能处理到相同的数据。为了避免同一条数据可能会被多次执行的问题,ElasticJob引入幂等机制,确保同一条数据不会再被多个Job同时处理,也避免同一条数据在同一个Job实例的多个线程处理。

任务在调度执行中,由于某种原因未执行完毕,下一次调度任务触发后,在同一个Job实例中,会出现两个线程处理同一个分片上的数据,这样就会造成两个线程可能处理到相同的数据。为了避免同一条数据可能会被多次执行的问题,ElasticJob引入幂等机制,确保同一条数据不会再被多个Job同时处理,也避免同一条数据在同一个Job实例的多个线程处理。再重申一次ElastciJob的分布式是数据的分布式,一个任务在多个Job实例上运行,每个Job实例处理该Job的部分数据(数据分片)。

本文重点分析ElasticJob是如何做到如下两点的。

  1. ElasticJob如何确保在同一个Job实例中多个线程不会处理相同的数据。
  2. ElasticJob如何确保数据不会被多个Job实例处理。
    为了解决上述这种情况,ElasticJob引入任务错过补偿执行(misfire)与幂等机制(monitorExecution)

1、ElasticJob如何确保在同一个Job实例中多个线程不会处理相同的数据。

场景:例如任务调度周期为每5s执行一次,正常每次调度任务处理需要耗时2s,如果在某一段时间由于数据库压力变大,导致原本只需要2s就能处理完成的任务,现在需要16s才能运行,在这个数据处理的过程中,每5s又会触发一次调度(任务处理),如果不加以控制的话,在同一个实例上根据分片条件去查询数据库,查询到的数据有可能相同(部分相同),这样同一条任务数据将被多次运行,如果这个任务时处理转账业务,如果在业务方法不实现幂等,则会引发非常严重的问题,那ElasticJob是否可以避免这个问题呢?

答案是肯定。elasticJob提供了一个配置参数:monitorExecution=true,开启幂等性。

一个任务触发后,将执行任务处理逻辑,其入口:AbstractElasticJobExecutor#misfireIfRunning

if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {  // @1
       if (shardingContexts.isAllowSendJobEvent()) {  // @2
             jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, 
                    shardingContexts.getShardingItemParameters().keySet()));
       }
      return;
}

代码@1:在一个调度任务触发后如果上一次任务还未执行,则需要设置该分片状态为mirefire,表示错失了一次任务执行。

代码@2:如果该分片被设置为mirefire并开启了事件跟踪,将事件跟踪保存在数据库中。

接下来详细分析JobFacade.misfireIfRunning的实现逻辑:

/**
     * 如果当前分片项仍在运行则设置任务被错过执行的标记.
     * 
     * @param items 需要设置错过执行的任务分片项
     * @return 是否错过本次执行
     */
    public boolean misfireIfHasRunningItems(final Collection<Integer> items) {
        if (!hasRunningItems(items)) {
            return false;
        }
        setMisfire(items);
        return true;
    }

如果存在未完成的分片,则调用setMisfire(items)方法,ElasticJob在开启monitorExecution(true)【幂等机制】机制的情况下,在分片任务开始时会创建${namespace}/jobname/sharding/{item}/running节点,在任务结束后会删除该目录,所以在判断是否有分片正在运行时,只需判断是否存在上述节点即可。如果存在,调用setMisfire方法。

PS:如果ElasticJob为开启幂等(monitorExecution)的情况下,才会创建${namespace}/jobname/sharding

/{item}/running,misfire机制才能生效。

ExecutionService#setMisfire

/**
     * 设置任务被错过执行的标记.
     *
     * @param items 需要设置错过执行的任务分片项
     */
    public void setMisfire(final Collection<Integer> items) {
        for (int each : items) {
            jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
        }
    }

设置misfire的方法为分配给该实例下的所有分片创建持久节点${namespace}/jobname/shading/{item}/misfire节点,注意,只要分配给该实例的任何一分片未执行完毕,则在该实例下的所有分片都增加misfire节点,然后忽略本次任务触发执行,等待任务结束后再执行。

AbstractElasticJobExecutor#execute

execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
     while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
         jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}

在任务执行完成后检查是否存在${namespace}/jobname/sharding/{item}/misfire节点,如果存在,则首先清除misfie相关的文件,然后执行任务。

ElasticJob的misfire实现方案总结:

在下一个调度周期到达之后,只要发现这个分片的任何一个分片正在执行,则为该实例分片的所有分片都设置为misfire,等任务执行完毕后,再统一执行下一次任务调度。

2、ElasticJob如何确保数据不会被多个Job实例处理

ElasticJob基于数据分片,不同分片根据分片参数(人为配置),从数据库中查询各自数据(任务数据分片),如果当节点宕机,数据会重新分片,如果任务未执行完成,然后执行分片,数据是否会被不同的任务同时处理呢?

答案是不会,因为当节点宕机后,是否需要重新分片事件监听器会监听到Job实例代表的节点删除,设置重新分片,在任务被调度执行具体处理逻辑之前,需要重新分片,重新分片的前提又是要所有的分片的任务全部执行完毕,这也依赖是否开启幂等控制(monitorExecution),如果开启,ElasticJob能感知正在执行处理逻辑的分片,重新分片需要等待当前所有任务全部运行完毕后才会触发,故不会存在不同节点处理相同数据的问题。

问答:
1、如果一个任务JOB的调度频率为每10s一次,在某个时间,该job执行耗时用了33s(平时只需执行5s),按照正常调度,应该后续会触发3次调度,那该job后执行完,会连续执行3次调度吗?
答案:在33s这次任务执行完成后,如果后面的任务执行在10s内执行完毕的话,只会触发一次,不会补偿3次,因为ElasticJob记录任务错失执行,只是创建了misfire节点,并不会记录错失的此时,因为也没这个必要。


原文发布时间为:2018-12-11
本文作者:丁威,《RocketMQ技术内幕》作者。
本文来自中间件兴趣圈,了解相关信息可以关注中间件兴趣圈

目录
相关文章
|
7月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
101 0
|
11天前
|
消息中间件 存储 监控
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
50 12
|
11天前
|
消息中间件 存储 Java
深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
29 2
|
5月前
|
消息中间件 Kafka 微服务
微服务数据问题之MetaQ设置同步异步刷盘如何解决
微服务数据问题之MetaQ设置同步异步刷盘如何解决
|
7月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
97 0
|
5月前
|
消息中间件 Java RocketMQ
MetaQ/RocketMQ 原理问题之被动控速工作的问题如何解决
MetaQ/RocketMQ 原理问题之被动控速工作的问题如何解决
|
7月前
|
Dubbo Java 应用服务中间件
微服务框架(十五)Dubbo 超时机制及服务降级
此系列文章将会描述Java框架Spring Boot、服务治理框架Dubbo、应用容器引擎Docker,及使用Spring Boot集成Dubbo、Mybatis等开源框架,其中穿插着Spring Boot中日志切面等技术的实现,然后通过gitlab-CI以持续集成为Docker镜像。 本文为Dubbo超时机制及服务降级 当服务出现创建超时的时候,TimeoutFilter会打印该创建记录的详细信息,日志级别为WARN,即为可恢复异常,或瞬时的状态不一致
|
7月前
|
缓存 负载均衡 Dubbo
从源码全面解析 dubbo 消费端服务调用的来龙去脉
从源码全面解析 dubbo 消费端服务调用的来龙去脉
|
消息中间件 存储 NoSQL
【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结
【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结
674 9
【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结
Nacos 和 Apollo中的 长轮询 定时机制,太好用了
今天这篇文章来介绍一下Nacos配置中心的原理之一:长轮询机制的应用 为方便理解与表达,这里把 Nacos 控制台和 Nacos 注册中心称为 Nacos 服务器(就是 web 界面那个),我们编写的业务服务称为 Nacso 客户端; Nacos 动态监听的长轮询机制原理图,本篇将围绕这张图剖析长轮询定时机制的原理: