分布式批处理框架在大促场景下的设计与实现

简介: 分布式批处理框架在大促场景下的设计与实现




在本次双十一之前,我们上线了新版的批处理框架,完整支撑了大促的招商。通过SDK接入,可以直接在业务应用中实现任务逻辑,接入便捷;通过中心化调度与任务分发,处理过程提效明显。


背景


在B端系统中,批处理能力是不可或缺的,它可以帮助用户批量完成一系列动作,降低重复操作的成本。在大促招商系统中,我们也需要一套在线批处理框架,来支持商家的批量商品报名、批量主图打标、一键发布、导出已报商品等操作,让商家可以批量上传数据、管理操作记录、查看批量操作的结果明细。这些任务输入数据的来源五花八门,有Excel、DB、OpenSearch等,框架需要能支持各种类型输入数据的解析。同时,招商系统的应用数量较多,需要能同时支持各个域的应用便捷接入,最好是只需要引包,然后实现任务逻辑即可。在大数据量的场景下,框架需要能支持对不同类型的任务实例精细化调度,同时保证系统的吞吐量和稳定性。


整体方案


 架构设计


业务容器为接入框架的应用,任务中心是批处理框架的中心应用。实例的调度、状态变更在任务中心完成,方便做中心化管理;实例的执行逻辑在业务容器中实现,所以在执行时需要回调业务容器。



 模型设计


要在单条数据项维度调度任务实例,除了任务注册信息、任务实例外,还需要引入子任务实例的模型。任务注册信息对象含有某个任务的任务类型、执行限流值等信息。用户每次批量上传数据都会生成一个主任务实例,单条数据项对应一个子任务实例。


 主要流程


核心流程借鉴了MapReduce的思想,将一个大任务拆分后分发到多机去执行,最后再进行结果汇总。业务容器在接入时需要实现一个任务的主实例拆分、子实例执行和结果合并逻辑。主实例拆分时会将用户输入数据解析为子实例,落DB存储;子实例执行是单条数据项的执行逻辑;结果合并是将子实例的处理结果统计后展示(如生成任务明细Excel)。


实例被调度触发后任务中心会调用业务容器中实现的主实例拆分方法,主实例被拆分后需要分批回传数据。任务中心执行实例时会将子实例扫出来,通过rocketmq自发自收将子实例分发到任务中心的整个集群,接到消息后调用业务容器执行子实例,拿到结果后更新子实例状态,消费成功。通过ScheduleX任务定时扫描执行中主实例的子实例状态,当子实例全部执行完成后回调业务容器执行结果合并逻辑,最终将任务归档。


 状态机


主任务

子任务


关键技术点


 调度执行


  • 限流组件


限流组件使用的是guava包中的实现,任务注册时需要分别配置主实例和子实例执行的限流值,限流也在任务类型维度来做,调度时根据任务的key取到对应的限流器。限流器缓存在机器本地,过期后重新查询任务注册信息,新建限流器。目前只在单机维度做了限流,用集群限流数除以任务中心的集群机器数,得到单机限流值。



  • 主实例调度


任务实例创建后,会尝试获取一次令牌,如果能获取到,那主实例会直接被触发,执行后续流程。如果没有获取到令牌,任务会停留在待触发状态,等待ScheduleX任务定时捞起重新尝试执行。



  • 子实例调度


主实例在执行时会先在任务中心通过rocketmq将所有子实例分发到整个集群,然后同步调用业务容器执行并获取到子实例的结果。子实例的限流是通过控制消息消费速率实现的。在任务中心消费rocketmq时阻塞获取令牌。因为所有任务类型共用一个topic,消息分发速率也做了上限控制,否则大实例会导致消息积压从而阻塞其他类型任务的执行。


如果在更新子实例状态前机器重启,利用mq的重试机制,对于幂等的任务类型,可直接重新执行,不支持幂等的任务类型在消息重试时子实例直接更新为失败即可。


 中心与客户端通信


执行任务的实现逻辑时,需要回调业务容器,SDK中需要有支持供任务中心主动发起的通信方式。具体实现为:

  1. 客户端侧:提供Dubbo接口,业务应用启动时注册服务,用Dubbo的group区分不同应用(group不能重复,所以直接使用应用名称作为group)
  2. 任务中心侧:注册为所有业务应用服务的消费者,在需要回调业务容器时,先根据任务注册的应用找到对应消费者,通过消费者向业务应用发起调用


主实例的拆分和结果合并采用异步调用,子实例考虑到已经是拆分后的结果,目前只支持同步调用。


 实例探活


考虑到主实例的拆分和结果合并执行时间较长,极端场景下有几十万条数据的读写,所以主实例的拆分和结果合并对客户端的调用都是异步的。在异步场景下需要考虑如何对业务应用进行探活并重试,否则一旦机器重启,正在执行的任务实例便会停留在中间状态,产生大量脏数据。


探活采用的方案是客户端侧上报心跳结合任务中心侧定时任务检测心跳。客户端接收到请求后会在本地定时上报任务实例的心跳,即更新DB中实例的心跳时间,执行结束后不再上报。任务中心通过ScheduleX任务扫表检测心跳超时的任务实例,重新向客户端发起请求。



以上解决的是业务应用重启时实例执行中断的问题,如果任务中心应用重启,也会导致对实例的部分操作中断(如子实例分发),同样也用心跳时间探活来解决,对超时的实例,重新执行当前状态的操作,防止实例永久停留在中间状态。

 客户端实现

业务容器接入时,只需要实现SDK中的MapReduceTask类并实现主任务实例拆分、子任务实例执行和结果合并的方法,其余逻辑在SDK中内置,无需关心。定义类时还需要声明任务类型的唯一标识key,用于在客户端匹配任务类型和具体实现。具体demo如下:

@BatchTask(key = "myDemo")
public class MyDemo implements MapReduceTask {
    /**
     * 主任务实例拆分
     *
     * @param context context
     * @return {@link TaskResult}
     */
    @Override
    public TaskResult processInstance(ExecuteContext executeContext) {
        while (true) {
            // 分批读取输入数据
            ...
            // 生成子实例
            List<SubInstance> subInstances = ...;
            // 提交数据
            executeContext.commit(subInstances);
        }
        return TaskResult.success();
    }
    /**
     * 子任务实例执行
     *
     * @param context context
     * @return {@link TaskResult}
     */
    @Override
    public TaskResult map(SubExecuteContext subExecuteContext) {
        // do something
        ...
        return TaskResult.success();
    }
    /**
     * 结果合并
     *
     * @param context 上下文
     * @return {@link TaskResult}
     */
    @Override
    public TaskResult reduce(ExecuteContext executeContext) {
        do {
            // 读取子实例
            List<SubInstance> subInstanceList = executeContext.read(pageSize);
            if (CollectionUtils.isEmpty(subInstanceList)) {
                break;
            }
            // 构建结果明细
            ...
        } while (subInstanceList.size() == pageSize);
        // 生成结果数据
        Map<String, Object> resultInfoMap = ...;
        return TaskResult.success(resultInfoMap);
    }
}

在主任务的拆分与结果合并逻辑中,分别会需要对子实例进行读与写操作,所以在任务执行上下文ExecuteContext中,提供了commit()和read()方法供客户端调用,其中写逻辑需要在业务容器中构建好子实例对象并提交。


考虑到子实例的读写如果每次通过Dubbo调用任务中心,高频次读写下会增加网络超时等异常的数量,所以采用了直连DB的方案,SDK中内置了与DB的交互层。为了提高写DB的效率,commit后的数据会存在任务执行上下文的缓冲区中,超过阈值后会向DB批量插入一次数据,最后清空缓冲区,插入剩余数据。


效果与展望

新版批处理框架上线后目前完成了批量报名、一键报名、导出已报商品、批量撤销等多个商家关键操作的迁移与接入,完整、稳定支撑了本次双十一大促的招商,处理了130W+的任务实例。目前系统整体较稳定,任务支持灵活配置,系统整体可监控。


关于未来优化的方向:因为子实例是通过mq分发的,导致如果大量子实例执行被限流,可能会阻塞后面其他任务没有被限流的子实例的消息。目前消息做了业务身份的隔离,发送速率上限也做了粗略地控制,未来会尝试通过匹配消息单机发送与集群消费的速率、不同类型任务的分组消费,提高整个系统的在极端场景下的吞吐量。


团队介绍


淘天核心技术团队,持续建设全网比价、用增、交易、招商、选品、搭建、投放等能力,支撑大促(双11、618等)和日销业务。简单、高效、纯粹的技术文化,在使命与责任中互相成就,共同成长。
Base杭州职位热招中:Java开发工程师、前端工程师、测试开发工程师、数据分析师。详情联系:zhengchunfu.zcf@taobao.com



相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
872 0
分布式爬虫框架Scrapy-Redis实战指南
|
6月前
|
监控 Java 调度
SpringBoot中@Scheduled和Quartz的区别是什么?分布式定时任务框架选型实战
本文对比分析了SpringBoot中的`@Scheduled`与Quartz定时任务框架。`@Scheduled`轻量易用,适合单机简单场景,但存在多实例重复执行、无持久化等缺陷;Quartz功能强大,支持分布式调度、任务持久化、动态调整和失败重试,适用于复杂企业级需求。文章通过特性对比、代码示例及常见问题解答,帮助开发者理解两者差异,合理选择方案。记住口诀:单机简单用注解,多节点上Quartz;若是任务要可靠,持久化配置不能少。
652 4
|
6月前
|
NoSQL 算法 安全
redis分布式锁在高并发场景下的方案设计与性能提升
本文探讨了Redis分布式锁在主从架构下失效的问题及其解决方案。首先通过CAP理论分析,Redis遵循AP原则,导致锁可能失效。针对此问题,提出两种解决方案:Zookeeper分布式锁(追求CP一致性)和Redlock算法(基于多个Redis实例提升可靠性)。文章还讨论了可能遇到的“坑”,如加从节点引发超卖问题、建议Redis节点数为奇数以及持久化策略对锁的影响。最后,从性能优化角度出发,介绍了减少锁粒度和分段锁的策略,并结合实际场景(如下单重复提交、支付与取消订单冲突)展示了分布式锁的应用方法。
513 3
|
6月前
|
存储 NoSQL Java
从扣减库存场景来讲讲redis分布式锁中的那些“坑”
本文从一个简单的库存扣减场景出发,深入分析了高并发下的超卖问题,并逐步优化解决方案。首先通过本地锁解决单机并发问题,但集群环境下失效;接着引入Redis分布式锁,利用SETNX命令实现加锁,但仍存在死锁、锁过期等隐患。文章详细探讨了通过设置唯一标识、续命机制等方法完善锁的可靠性,并最终引出Redisson工具,其内置的锁续命和原子性操作极大简化了分布式锁的实现。最后,作者剖析了Redisson源码,揭示其实现原理,并预告后续关于主从架构下分布式锁的应用与性能优化内容。
337 0
|
11月前
|
存储 监控 数据可视化
常见的分布式定时任务调度框架
分布式定时任务调度框架用于在分布式系统中管理和调度定时任务,确保任务按预定时间和频率执行。其核心概念包括Job(任务)、Trigger(触发器)、Executor(执行器)和Scheduler(调度器)。这类框架应具备任务管理、任务监控、良好的可扩展性和高可用性等功能。常用的Java生态中的分布式任务调度框架有Quartz Scheduler、ElasticJob和XXL-JOB。
4188 66
|
10月前
|
数据采集 人工智能 分布式计算
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
阿里云推出的MaxFrame是链接大数据与AI的分布式Python计算框架,提供类似Pandas的操作接口和分布式处理能力。本文从部署、功能验证到实际场景全面评测MaxFrame,涵盖分布式Pandas操作、大语言模型数据预处理及企业级应用。结果显示,MaxFrame在处理大规模数据时性能显著提升,代码兼容性强,适合从数据清洗到训练数据生成的全链路场景...
546 5
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
916 53
|
10月前
|
人工智能 分布式计算 大数据
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
479 8
|
11月前
|
NoSQL Java Redis
秒杀抢购场景下实战JVM级别锁与分布式锁
在电商系统中,秒杀抢购活动是一种常见的营销手段。它通过设定极低的价格和有限的商品数量,吸引大量用户在特定时间点抢购,从而迅速增加销量、提升品牌曝光度和用户活跃度。然而,这种活动也对系统的性能和稳定性提出了极高的要求。特别是在秒杀开始的瞬间,系统需要处理海量的并发请求,同时确保数据的准确性和一致性。 为了解决这些问题,系统开发者们引入了锁机制。锁机制是一种用于控制对共享资源的并发访问的技术,它能够确保在同一时间只有一个进程或线程能够操作某个资源,从而避免数据不一致或冲突。在秒杀抢购场景下,锁机制显得尤为重要,它能够保证商品库存的扣减操作是原子性的,避免出现超卖或数据不一致的情况。
316 10
|
11月前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
424 2