企业级分布式批处理方案

简介: 在企业级大数据量批处理需求场景中,如何通过分布式方式来有效地提升处理效率。本文将就常见批处理框架Spring Batch与SchdulerX进行比较讨论。同时基于阿里巴巴分布式任务调度平台SchedulerX2.0,实现一个分布式并行批处理方案,展示其相关的功能特性。

前言

先来讲解下在分布式任务调度SchedulerX中的可视化MapReduce模型是什么,从字面来理解可视化MapReduce就是一个任务切分多个子任务并行处理。在简单单机场景下就是开启多线程来同时处理一个大任务,在多个机器下可以有多台机器同时并行处理同一个任务,分布式调度的可视化MapReduce模型就是为使用者在代码开发层面屏蔽上述并行协调的逻辑,让使用者可以通过简单的业务逻辑开发完成并行任务设计开发。该模型场景与很多批处理框架的处理模型有其相似及可替代之处。

可视化MapReduce模型结构

批处理框架对比

常见的Spring Batch批处理框架,其提供的批处理能力与分布式任务调度中的可视化MapReduce能力有着相似的功能。Spring Batch任务批处理框主要提供:单进程多线程处理、分布式多进程处理两种方式。在单进程多线程处理模式下,用户可自行定一个Job作为一个批处理任务单元,在Job是由一个或多个Step步骤进行串联或并行组成,每一个Step又分别由reader、process、writer构成来完成每一步任务的读取、处理、输出。

对于Spring Batch框架个人觉得单进程下多线程实践意义并不是太大,主要是如果在较小批量数据任务处理采用该框架来做有点费功夫,完全可以自行开线程池来解决问题;那么在数据量较大的批处理处理场景下,单进程的处理能力显然是存在处理能力扩展瓶颈的。因此数据批处理场景下可能分布式多进程远程协调处理能力更具有实践意义。在Spring Batch中提供了远程分片/分区处理能力,将某一个Step中过根据特定的规则将任务拆分成多个子任务并分发给集群中其他的worker来处理,以实现远程并行处理能力。其相关的远程交互能力常见是借助第三方消息中间件来完成子任务分发和执行结果汇聚。

优劣分析

SchedulerX的可视化MapReduce与Spring Batch框架,综合比较结果:

  • 批处理能力:两者都支持单机多线程、分布式多进程处理,Spring Batch在批处理细节能力完备,SchdulerX可视化MapReduce定义更简洁。
  • 定时调度:SchdulerX本身自带定时调度能力;Spring Batch无该能力需集成三方定时框架。
  • 管控能力:SchdulerX具备任务可视化管控能力;Spring Batch常见采用程序或文件配置任务,管控台需额外搭建且能力较弱。
  • 集成难度:Spring Batch开源框架集成,分布式并行能力需额外第三方中间件集成搭建;SchdulerX集成SDK自动对接阿里云公有云平台,无需额外组件实现分布式并行处理弹性能力。

开发可视化MapReduce任务

下面讲解如果在任务调度平台上创建可视化MapReduce任务,对于业务应用开发者而言主要是完成下面的步骤:

1、注册应用

任务调度平台上创建应用分组,构建一个属于自己业务应用的服务接入点

2、集成对接

在自己的业务应用程序中添加SDK集成依赖,配置相应的应用分组接入信息,部署发布应用集群即可。在控制台->应用管理实例信息中即可查看应用节点的对接信息。

<dependency><groupId>com.aliyun.schedulerx</groupId><artifactId>schedulerx2-spring-boot-starter</artifactId><version>1.4.2</version></dependency>
# 命名空间ID
spring.schedulerx2.namespace=********-ba41-d40c7153c838
# 对接平台服务地址,本地调试可采用如下“公网”;其他region详见官方文档
spring.schedulerx2.endpoint=acm.aliyun.com 
# 平台上创建的应用分组id
spring.schedulerx2.groupId=********
# 平台上创建的应用分组key
spring.schedulerx2.appKey=********

3、可视化MapReduce任务开发

开发符合自己业务场景需要可视化MapReduce任务,主要分为两个步骤:1.子任务拆分逻辑;2.单个子任务业务处理逻辑。任务运行时在集群中的子任务分布式协调处理逻辑开发者都无需关心。相应代码开发完成后即可发布部署至应用集群。

@ComponentpublicclassParallelJobextendsMapReduceJobProcessor {
privatestaticfinalLoggerlogger=LoggerFactory.getLogger("schedulerx");
@OverridepublicProcessResultreduce(JobContextcontext) throwsException {
returnnewProcessResult(true);
    }
@OverridepublicProcessResultprocess(JobContextcontext) throwsException {
if(isRootTask(context)){
// ---------------------------------------------------------// Step1. 子任务的拆分逻辑开发,子任务对象可为业务自定义(内容务必简洁)// ---------------------------------------------------------logger.info("构建子任务列表...");
List<ParallelAccountInfo>list=newLinkedList();
/***  判断如果是rootTask的情况下,构建子任务对象列表*  在实际业务场景中,用户可自行根据业务场景加载子任务对象且该业务对象实现BizSubTask接口*  场景案例:*  1、从数据库中加载未被处理的客户账户信息*  2、构建省份城市地区信息列表,按区域分发任务处理*  3、根据业务标签作为子任务分类,如:电器、日用品、食品等*  4、可根据时间作为子任务分类,如:按月(1月、2月...)*/for(inti=0; i<20; i++){
list.add(newParallelAccountInfo(i, "CUS"+StringUtils.leftPad(i+"", 4, "0"),
"AC"+StringUtils.leftPad(i+"", 12, "0")));
            }
returnmap(list, "transfer");
        }else {
// ---------------------------------------------------------// Step2. 针对单个子任务的业务逻辑处理// ---------------------------------------------------------/*** 非rootTask,用户可以获取对应的子任务信息进行相应的业务处理*/ParallelAccountInfoobj= (ParallelAccountInfo)context.getTask();
// 针对获取的 obj子任务信息,强制还原为业务对象进行相应业务逻辑处理logger.info("处理子任务信息:{}", JSON.toJSONString(obj));
returnnewProcessResult(true);
        }
    }
}

4、配置任务

针对上述开发的可视化MapReduce任务在控制台->任务管理中配置创建相应的任务信息,实现调度平台上任务的触发运行。在高级设置中可配置单机子任务线程数量等相关参数。

5、查看执行结果

任务在执行后,在控制台->执行记录中选择对应的任务实例查看其执行详情,在子任务列表中会显示每一个拆分的子任务分发执行信息,包括:包括该子任务执行状态、执行节点、日志等。

新特性(业务标签)

在任务调度最新的专业版应用中提供了子任务业务标签功能,在基础版的子任务列表中主要面临以下几个困境:

  • 某子任务执行报错或持续运行中,无法快速直观知道具体是哪个业务子任务单元处理出现了问题
  • 在大量的子任务记录中,用户无法精准查询具体某个业务子任务的处理情况信息

因此可视化MapReduce任务自定义的业务标签,主要是用于解决上述需求场景。业务如何对子任务进行自定义标签设置参考如下:

publicclassParallelAccountInfoimplementsBizSubTask {
// 业务参数按业务场景需求自定义privatelongid;
privateStringname;
privateStringaccountId;
publicParallelAccountInfo(longid, Stringname, StringaccountId) {
this.id=id;
this.name=name;
this.accountId=accountId;
    }
/*** 自定义处理核心:* 业务实体需实现BizSubTask:labelMap方法,* 在该方法实现中设置对应子任务的标签信息,挑取子任务中核心业务信息作为lable* @return*/@OverridepublicMap<String, String>labelMap() {
Map<String, String>labelMap=newHashMap();
labelMap.put("户名", name);
returnlabelMap;
    }
}

可视化MapReduce分发处理的子任务实体对象在实现上述接口逻辑后,在子任务列表中即可展现出相应的子任务标签内容信息,且可以通过模糊查询功能锁定需要查找的子任务处理情况。

运用场景

可视化MapReduce模型或者说批处理场景,在实际企业级应用中是有大量的需求存在。一些常见的使用方式如:

  • 针对分库分表数据批量并行处理,将分库或分表信息作为子任务对象在集群节点间分发实现并行处理
  • 按城市区域的物流订单数据处理,将城市和区域作为子任务对象在集群节点间分发实现并行处理
  • 鉴于可视化MapReduce子任务可视化能力,可将重点客户/订单信息作为子任务处理对象,来进行相应数据报表处理或信息推送,以实现重要子任务的可视化跟踪处理

基金销售业务案例

以下提供一个基金销售业务常见下案例以供参考,以便使用者在自己的业务场景下自由发挥。案例说明:在基金公司与基金销售公司(如:蚂蚁财富)之间每天会由投资者的账户/交易申请需要进行数据同步处理,往往采用的是文件数据交互,一个基金公司对接着N多家销售商(反之亦然),每家销售商提供的数据文件完全独立;每一个销售商的数据文件都需要经过文件校验、接口文件解析、数据校验、数据导入这么几个固定步骤。销售商上述的固定处理步骤就非常适合采用批处理并行方式以加快数据文件处理。

@ComponentpublicclassFileImportJobextendsMapReduceJobProcessor {
privatestaticfinalLoggerlogger=LoggerFactory.getLogger("schedulerx");
@OverridepublicProcessResultreduce(JobContextcontext) throwsException {
returnnewProcessResult(true);
    }
@OverridepublicProcessResultprocess(JobContextcontext) throwsException {
if(isRootTask(context)){
// ---------------------------------------------------------// Step1. 读取对接的销售商列表Code// ---------------------------------------------------------logger.info("以销售商为维度构建子任务列表...");
// 伪代码从数据库读取销售商列表,Agency类需要实现BizSubTask接口并可将// 销售商名称/编码作为子任务标签,以便控制台可视化跟踪List<Agency>agencylist=getAgencyListFromDb();
returnmap(agencylist, "fileImport");
        }else {
// ---------------------------------------------------------// Step2. 针对单个销售商进行对应文件数据的处理// ---------------------------------------------------------Agencyagency= (Agency)context.getTask();
Filefile=loadFile(agency);
logger.info("文件加载完成.");
validFile(file);
logger.info("文件校验通过.");
List<Request>request=resolveRequest(file);
logger.info("文件数据解析完成.");
List<Request>request=checkRequest(request);
logger.info("申请数据检查通过.");
importRequest(request);
logger.info("申请数据导入完成.");
returnnewProcessResult(true);
        }
    }
}

上面描述的案例主要是将基金交易清算中的一个业务环节,采用并行批处理方式来进行处理,其他后续处理的每一个环节也可以采用该方式来进行处理。后续还可以通过每一个可视化MapReduce任务节点DAG依赖编辑构建一个完整的自动业务清算流程。在SchdulerX任务调度平台处理类似案例,可有效解决以下问题:

  • 可通过动态弹性扩容来应对业务量增长带来的处理效率提升
  • 每个销售商独立可视化处理分析,可单独针对某个销售商查看其处理状态以及其处理的业务日志信息
目录
相关文章
|
6月前
|
人工智能 负载均衡 Java
Spring AI Alibaba 发布企业级 MCP 分布式部署方案
本文介绍了Spring AI Alibaba MCP的开发与应用,旨在解决企业级AI Agent在分布式环境下的部署和动态更新问题。通过集成Nacos,Spring AI Alibaba实现了流量负载均衡及节点变更动态感知等功能。开发者可方便地将企业内部业务系统发布为MCP服务或开发自己的AI Agent。文章详细描述了如何通过代理应用接入存量业务系统,以及全新MCP服务的开发流程,并提供了完整的配置示例和源码链接。未来,Spring AI Alibaba计划结合Nacos3的mcp-registry与mcp-router能力,进一步优化Agent开发体验。
2228 15
|
6月前
|
NoSQL 算法 安全
redis分布式锁在高并发场景下的方案设计与性能提升
本文探讨了Redis分布式锁在主从架构下失效的问题及其解决方案。首先通过CAP理论分析,Redis遵循AP原则,导致锁可能失效。针对此问题,提出两种解决方案:Zookeeper分布式锁(追求CP一致性)和Redlock算法(基于多个Redis实例提升可靠性)。文章还讨论了可能遇到的“坑”,如加从节点引发超卖问题、建议Redis节点数为奇数以及持久化策略对锁的影响。最后,从性能优化角度出发,介绍了减少锁粒度和分段锁的策略,并结合实际场景(如下单重复提交、支付与取消订单冲突)展示了分布式锁的应用方法。
425 3
|
11月前
|
消息中间件 架构师 数据库
本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
45岁资深架构师尼恩分享了一篇关于分布式事务的文章,详细解析了如何在10Wqps高并发场景下实现分布式事务。文章从传统单体架构到微服务架构下分布式事务的需求背景出发,介绍了Seata这一开源分布式事务解决方案及其AT和TCC两种模式。随后,文章深入探讨了经典ebay本地消息表方案,以及如何使用RocketMQ消息队列替代数据库表来提高性能和可靠性。尼恩还分享了如何结合延迟消息进行事务数据的定时对账,确保最终一致性。最后,尼恩强调了高端面试中需要准备“高大上”的答案,并提供了多个技术领域的深度学习资料,帮助读者提升技术水平,顺利通过面试。
本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
|
11月前
|
缓存 NoSQL Java
Spring Boot中的分布式缓存方案
Spring Boot提供了简便的方式来集成和使用分布式缓存。通过Redis和Memcached等缓存方案,可以显著提升应用的性能和扩展性。合理配置和优化缓存策略,可以有效避免常见的缓存问题,保证系统的稳定性和高效运行。
306 3
|
12月前
|
NoSQL 安全 PHP
hyperf-wise-locksmith,一个高效的PHP分布式锁方案
`hyperf-wise-locksmith` 是 Hyperf 框架下的互斥锁库,支持文件锁、分布式锁、红锁及协程锁,有效防止分布式环境下的竞争条件。本文介绍了其安装、特性和应用场景,如在线支付系统的余额扣减,确保操作的原子性。
172 4
|
3月前
|
存储 负载均衡 NoSQL
【赵渝强老师】Redis Cluster分布式集群
Redis Cluster是Redis的分布式存储解决方案,通过哈希槽(slot)实现数据分片,支持水平扩展,具备高可用性和负载均衡能力,适用于大规模数据场景。
250 2
|
3月前
|
存储 缓存 NoSQL
【📕分布式锁通关指南 12】源码剖析redisson如何利用Redis数据结构实现Semaphore和CountDownLatch
本文解析 Redisson 如何通过 Redis 实现分布式信号量(RSemaphore)与倒数闩(RCountDownLatch),利用 Lua 脚本与原子操作保障分布式环境下的同步控制,帮助开发者更好地理解其原理与应用。
182 6
|
4月前
|
存储 缓存 NoSQL
Redis核心数据结构与分布式锁实现详解
Redis 是高性能键值数据库,支持多种数据结构,如字符串、列表、集合、哈希、有序集合等,广泛用于缓存、消息队列和实时数据处理。本文详解其核心数据结构及分布式锁实现,帮助开发者提升系统性能与并发控制能力。
|
2月前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
148 0
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)

热门文章

最新文章

下一篇
开通oss服务