最近分布式系统开发小结: Slave模块Executors设计

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

更新一段我在linkedin上对这个项目的描述,目前项目已经开发完在使用了。本文并不是最新的设计。

背景
解决HDFS/Hive/RDBMS/FTP/MongoDB等数据源之间的批量数据同步问题


特性
跨机房场景下的链路优化;多路输入和输出的任务模型;数据容错和可持久化;任务失败恢复


任务调度
把任务配置解析为物理执行计划,Master控制任务的调度和失败恢复,基于Mesos完成资源分配和任务调度。Slave分布在各个数据中心,具体传输任务的调起做到链路优化选择。高并发场景下,增加Mesos Slave节点来保证可扩展性(CPU和MEM资源),Master将元数据记录在ZK上,并通过争抢ZK锁实现互备。


数据传输
传输组件分为Input、Cache和Output三种Executor,各自进程内通过双队列优化传输速度。数据以bundle为单位传输,通常上百行为一个bundle且可压缩,Netty作网络通信。Input端异步备份一份数据在BookKeeper,Cache使用Beanstalkd做消息队列,Output端处理Bundle成功或失败,会有守护线程异步删除或更新beanstalkd内的Message(类似Storm Topology里的Ack),Executors会把bundle传输状态更新在ZookKeeper上,某一Executor挂掉都可以在一台slave上重新调起并恢复任务继续进行。Input和Output端的Reader和Writer是插件化的。


====================================  我是更新线 ====================================


之前在最近分布式系统开发小结里,提到了一个在开发中的系统的大致设计,本文是我负责部分的一个详细设计。在阅读本文前可以先浏览下之前那篇文章,对于系统的功能和概况有个基本了解。


1. Slave总体设计


Slave模块主要需要实现不同的Mesos Executors,包括Input, MemoryStorage和Output三种Executor。每个Dpump任务会由Scheduler Manager经过逻辑执行计划和物理执行计划的拆分,从Knowledge Center获取知识,最终将切分后的Task分配给相应的Slaves执行,并通过Mesos Master,分配资源并调起Slave上的各自的Executor。三种Executors的执行逻辑图如下。


数据通过Bundle形式在三种Executor之间的流通,每个Bundle有唯一ID、一个String[]、以及一个Index。Index用于标记每个Bundle最后数据输出的最新成功行,即我们容错粒度控制在行级别。对Input、Cache、Output作一个简单介绍:
  •  Input,也叫Reader。每个Task内只有一个Input Executor,负责从数据源(HDFS、FTP、MySQL、MongoDB等)读出数据,将数据经过切分、处理、压缩后通过Netty流式传输给MemoryStorage。

  • Cache,也叫MemoryStorage。每个Task内只有一个Cache Executor,负责从Input端接收Bundle,将Bundle存取往一个队列内,当有Output连接的时候,将Bundle取出输送给Output

  • Output,也叫Writer。每个Task可能有多个Output Executor,负责将数据最终输出到数据目的源。Output从Cache端得到Bundle的过程也是流式的。

整个Task的流通都是流式的,且Slave之间的网络通信使用的是Netty这个NIO框架,传输过程中还涉及到Bundle高效的正反序列化和压缩、解压缩。最重要的一点是Input、Cache、Output三个部分各自都有容错设计,其中Input和Output通过向Zookeeper记录和获取Bundle状态保证处理Bundle的不重不漏,而Cache通过对队列内消息内容的钝化,保证自身已保存的Bundle不丢失,并能在新的Cache Executor起来后,可以继续为Output提供Bundle输出。


2. Slave 详细设计

下面详细介绍三种Executor的设计,阅读过程中请参考这张Task进程图。



2.1 Input设计


2.1.1 数据流通

每个Input负责一次Job(每个Job对应多个Tasks)内最小粒度的文件块读取,比如可能是一个HDFS Block,一张Hive表的一个分区甚至是一张MySQL表。

Input内还分有Writer、Buffer(双队列)和Reader。Writer是一个单线程,从数据源获取数据并切分好Bundle,每个Bundle有唯一ID和定长的字符串数组,然后将Bundle存入双队列的输入头,在双队列的读出头有若干个Reader线程抢占Bundle,每个Reader获取到Bundle后释放锁并做二次处理、压缩,最终Reader通过Netty Client将Bundle包装成一个传输格式,以二进制流的方式通过Channel流向Cache。


2.1.2 容错

Writer端切分Bundle保证了从同一个数据源的同份文件块读取数据生成Bundle是有序的,每次Netty往Channel里写入一份Bundle的时候,会通过Companion线程异步更新此Task下znode内的Bitmap,该Bitmap标记每个Bundle在Input端是否被传输。每次Input启动的时候,Netty会读取znode上的Bitmap缓存在内存里,发送Bundle前根据id作一次校对。所以当Input挂掉或重启时,可以保证发送给Cache的Bundle不重不漏。

2.2 Cache设计


2.2.1 消息队列

Cache本身是一个Netty Server,接收Input和Output多个Netty Client的连接,并对不同的Channel做不同Event处理。Cache Executor需要一个多状态的消息队列,这里采用的是Beanstalkd队列,下图为该Beanstalkd内消息(job)的状态变化图。



 每次Cache将新的Bundle put进Beanstalkd的时候需要选择一个tube(管道),Beanstalkd可以开启多个独立的tube,tube内存放jobs,每个job有自己唯一的job id,而job消息体就是我们的bundleBytes(Bundle存入Cache直接存的就是序列化后的byte[])。

每个job存入queue后是ready状态,被reserve之后,就不能被客户端再次获取到,即Cache每次会从每个tube里按顺序reserve一个job,并发送消息体给Output(一个output对应一个tube),这个过程保证每个job被消费一次,且只能被一个Output消费。如果Output端消费成功,则该job会被delete掉;如果该job消费失败,则会被重新置为ready,重新置为ready可能是因为超时(每个job被reserve的时候都有一个Time-To-Run时间设置)了,也可以是客户端release掉该job。

2.2.2 Acker设计

这里,对于tube内job的后续处理交给Acker这个线程来做。Acker的设计灵感来源于Storm。Storm Topology内每个bolt对tuple的执行和处理最终都会给Spout一个ack响应,而拓扑过程中整棵Tuple树的成功/失败执行状态会由Acker守护进程进行跟踪,以此来保证每个tuple被完全处理,而acker对tuple的跟踪算法是Storm的主要突破之一。

Cache端的Acker线程会监听zookeeper上znode树上各个节点的事件变化,从而掌握被Output消费的所有Bundle的最后状态,对应地删除、释放,或者更新Queue里的job。需要注意的是这里还涉及到一个更新job的过程。前面提到Bundle内维护了一个index,而Output消费bundle的时候,如果是数据行写了一半出现了异常或者挂掉了,我们需要记录bundle内数据行的最新index并将此信息也记录在znode上。对于这种最坏情况,Acker负责将该fail的job从queue里delete掉,并更改job内bundle bytes内容,重置新的index,再把新的job put进queue里。这是我们最不希望看到的情况,同时也是我们对Bundle能做的最细粒度的容错设计。

2.2.3 容错

Beanstalkd启动之后可以打开binlog开关,binlog是Beanstalkd容错恢复的机制,将内存里的消息队列结构映射到硬盘上。对于Cache的容错设计,直观的办法在于将这份binlog存在NFS或HDFS上,来保证Cache挂掉重启后,能获取到之前保存的Bundle数据,继续提供服务。

2.3 Output设计


Output在最终的Bundle消费阶段,会把数据导向新的数据源。每个Output获取的Bundle来自于Cache里的一个tube,而每个Bundle的执行情况也会由Companion线程异步更新到Zookeeper上。

 对于Output来说,它只需要关心从Cache端获取的每个Bundle都照常处理就可以了,不需要关心这个Bundle之前是否被消费过,被消费到哪里。原因在于,Cache端的job状态的变更和job的更新可以由Acker保障,而Acker是从zk上得到这些job的状态并对Queue异步更新。如果Acker挂了,只要重新起一个线程获取znode上最新的状态就可以了。对于Output来说,能传过来的Bundle,对应到queue里就是ready状态的job,这个job可能被消费过了,但是他的index也因此得到了更新,Output端对于所有Bundle的处理是一致的,唯一需要关心的是Output需要把Bundle的信息异步更新给zk,如果Output挂了,重新起一个Output接着从Cache读Bundle就可以了。


3. Slave模块总结

Slave模块三种Executor的设计,主要考虑的是各个Executor挂掉之后,怎样保证数据处理的不重复和不遗漏。我们依赖Zookeeper的可靠性,记录、更新、判断Bundle的状态,做到Input、Cache、Output各司其职,最到最小粒度的容错。Executor本身的失败和重启则由Mesos保障,Mesos作为资源管理系统,由Master监控Slave上各个Executor的执行状况,通过回调,可以在合适的Slave上再次启动挂掉的Executor进程,保证业务Task的顺利进行。



(全文完)


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
7月前
|
消息中间件 NoSQL Java
Java高级开发:高并发+分布式+高性能+Spring全家桶+性能优化
Java高架构师、分布式架构、高可扩展、高性能、高并发、性能优化、Spring boot、Redis、ActiveMQ、Nginx、Mycat、Netty、Jvm大型分布式项目实战学习架构师之路
|
7月前
|
NoSQL Java Redis
【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的分布式锁的功能组件(一)
【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的分布式锁的功能组件
91 0
|
2月前
|
NoSQL Java Redis
开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!
本文详细介绍了 Redisson 延迟队列(DelayedQueue)的实现原理,包括基本使用、内部数据结构、基本流程、发送和获取延时消息以及初始化延时队列等内容。文章通过代码示例和流程图,逐步解析了延迟消息的发送、接收及处理机制,帮助读者深入了解 Redisson 延迟队列的工作原理。
|
7月前
|
NoSQL Java Redis
【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的分布式锁的功能组件(二)
【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的分布式锁的功能组件
46 0
|
4月前
|
开发者 云计算 数据库
从桌面跃升至云端的华丽转身:深入解析如何运用WinForms与Azure的强大组合,解锁传统应用向现代化分布式系统演变的秘密,实现性能与安全性的双重飞跃——你不可不知的开发新模式
【8月更文挑战第31天】在数字化转型浪潮中,传统桌面应用面临新挑战。本文探讨如何融合Windows Forms(WinForms)与Microsoft Azure,助力应用向云端转型。通过Azure的虚拟机、容器及无服务器计算,可轻松解决性能瓶颈,满足全球用户需求。文中还提供了连接Azure数据库的示例代码,并介绍了集成Azure Storage和Functions的方法。尽管存在安全性、网络延迟及成本等问题,但合理设计架构可有效应对,帮助开发者构建高效可靠的现代应用。
33 0
|
7月前
|
分布式计算 负载均衡 Java
构建高可用性Java应用:介绍分布式系统设计与开发
构建高可用性Java应用:介绍分布式系统设计与开发
76 0
|
7月前
|
缓存 监控 负载均衡
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(数据缓存不一致分析)
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(数据缓存不一致分析)
132 2
|
7月前
|
存储 负载均衡 NoSQL
【分布式技术架构】「Tomcat技术专题」 探索Tomcat集群架构原理和开发分析指南
【分布式技术架构】「Tomcat技术专题」 探索Tomcat集群架构原理和开发分析指南
146 1
|
7月前
|
缓存 应用服务中间件 数据库
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
162 1
|
7月前
|
存储 缓存 安全
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(存穿透、缓存击穿和缓存雪崩)
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(存穿透、缓存击穿和缓存雪崩)
114 1