对Flink流处理模型的抽象

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 对Flink流处理模型的抽象

作为目前最为高效的流处理框架之一,Flink在我们的大数据平台产品中得到了广泛运用。为了简化开发,我们对Flink做了一些封装,以满足我们自己的产品需求。


我们开发的一个基于大数据平台的数据仓库,选择了Flink作为数据处理的底层框架。我们主要看重于它在流处理的低延迟性,消息传递保证的extractly once特性;它为流处理和批处理提供了相对统一的API,支持Java、Scala和Python等主流开发语言,同时还较好地支持了SQL。Flink搭建了非常棒的基础设施,例如它可以和ZooKeeper、YARN集成起来,保证处理功能的高可用性与水平扩展的集群能力,同时还提供了相对开放的扩展能力,使得我们可以较容易地在已有功能基础之上实现定制开发。


我们基于Flink开发了自己的底层框架“海纳(haina)”,这是取“海纳百川有容乃大”之意。haina以库的形式为我们的产品提供了数据采集、治理和共享等功能,是整个平台最核心的数据处理基础设施,逻辑架构如下图所示:


image.png

抽象的流处理模型


由于我们的产品对数据的处理主要包括三个方面:采集、治理与共享,这之间流转的皆为采集器从上游系统采集获得的数据。我们结合Flink的架构,并参考了Apex、Storm、Flume等其他流处理框架,抽象出自己的流处理模型。这个模型中各个概念之间的关系与层次如下图所示:


image.png


在这个流处理模型中,一个Job对应一个实际的物理环境(Environment)。多数情况下,为了保证Job运行的独立性,可以为每个Job分配一个单独的运行节点,提供专有的运行资源。每个Job核心的逻辑概念是Flow,它由Source、Processor和Sink组成,它们都是Flink的Operator,其中Processor对应于Flink的Transformation Operator。在实时流处理中,一个典型的Processor其实就是我们常用的map、filter或flatMap函数。例如:


public class ArchiveJsonMetaProcessor implements MapFunction<String, String> {
   private String target;
   public ArchiveJsonMetaProcessor(String target) {
       this.target = target;
   }
   @Override
   public String map(String msg) {
       JSONObject message = JSONObject.parseObject(msg);
       String messageId = message.getString("messageId");
       String originTimestamp = message.getString("originalTimestamp");
       String archivedTimestamp = DateUtil.transformTime(new Date(), DateUtil.YYYYMMDDHHMMSSS);
       ArchivedMetaData archivedMetaData = ArchivedMetaData.fillArchivedMetaData(messageId, target, originTimestamp, archivedTimestamp);
       return JSON.toJSONString(archivedMetaData);
   }
}


Processor的设计原则


我们之所以要抽象出Processor概念,是因为我们遵循了管道-过滤器模式,希望每个operator都是一个最小的可以重用的逻辑单元。管道就是我们定义的Flow,Source是管道的上游入口,Sink是管道的下游出口,每个细粒度的Processor就是每个负责处理数据流的过滤器。我们的底层框架haina实现了这些逻辑单元,至于它们该如何组装,则交由框架的使用者。正因为此,我们制定了Processor的设计原则,其根本思想就是保持Processor的细粒度,严格分离与业务无关和有关的Processor,保证Processor在组成Flow时尽可能被重用。


如下为设计Processor应该遵循的原则:

  • 业务上对数据流的处理可以拆分为多个阶段,每个Processor对应一个阶段。
  • 尽可能把有副作用的和无副作用的职责分离到不同的Processor。
  • 把需要访问外部资源的职责尽可能分离到不同的Processor。
  • 尽可能确保Processor的代码短小,这样可以保证将Processor真正的职责转移到别的类,例如对象的转换逻辑。转移出去的类与Flink平台无关,有利于编写和运行单元测试。
  • 每个Processor的上游与下游,即MapFunction或其他接口对应的类型参数T与O,应尽量采用平台定义的模型对象,而非如String之类的基础类型。这样就能保证调用者对Processor进行组装时,通过编译就能检查到不必要的组装错误。
  • 每个Processor的命名采用动宾短语,并以Processor作为类的后缀。例如将一条航班信息拆分成多个机位信息,命名为SplitFlightToStandsProcessor。好的命名可以帮助我们更容易发现它,进而促进调用者对它的重用。例如在IntelliJ中,就可以直接以*Processor来搜索所有的Processor,然后根据它的命名就能推测出这个Processor到底是做什么的。
  • 每个Processor需要的外部数据,都通过Processor的构造函数来传递。
  • 每个Processor都应该实现Flink提供的transfomation接口。
  • 第一个Processor接收的是String类型的消息,则要求必须对传入的消息进行验证。用于验证的Processor应该实现FilterFunction<String>。
  • 应保证每个Processor都不要抛出出人意料不可控制的异常,否则可能导致执行Job时出现错误从而导致整个Application停止或者重启。


自定义Source与Sink


针对Source与Sink,除了重用Flink本身提供的source与sink之外,我们还开发了大量的满足自己需求的自定义Source与Sink。例如,我们为独立开发的ESB系统提供了Source,为关系型数据库和WebService提供了具有轮询能力的Source,为ElasticSearch开发了满足批量添加数据的Sink,同时还实现了具有回调能力的自定义Sink。如下就是针对Oracle编写的具有轮询能力的自定义Source:


public class OracleClobSource extends RichSourceFunction<String> {
   private static final Logger log = LoggerFactory.getLogger(OracleClobSource.class);
   private JdbcGateway jdbcGateway;
   private Long period;
   private DSLContext executor;
   public OracleClobSource(JdbcGateway jdbcGateway, Long period) {
       this.jdbcGateway = jdbcGateway;
       this.period = period;
   }
   @Override
   public void open(Configuration parameters) throws Exception {
       executor = jdbcGateway.executor();
   }
   @Override
   public void run(SourceContext<String> ctx) throws Exception {
       while (true) {
           Result<Record> results = executor.select().from(table(tableName)).where(rownum().le(10)).fetch();
           for (Record record : results) {
               String msgcontext = record.get(field(filedName), String.class);
               ctx.collect(msgcontext);
               Long id = record.get(field("ID"), Long.class);
               executor.delete(table(tableName)).where(field("ID").equal(id)).execute();
           }
           Thread.sleep(period);
       }
   }
   @Override
   public void cancel() {
       executor.close();
   }
}


为了便于使用,我们还为这些内置与定制source和sink分别定义了静态工厂。


Flow与Job


Flow相当于是传递DataStream的拓扑图,由Source、Processor和Sink组成。我们之所以引入这个概念,一方面是为Job提供更粗粒度的重用单元,另一方面也承担了封装业务流程的主要职责。例如,一个航班数据从上游系统进入我们的大数据平台,需要进行多次数据格式的转换、验证与治理,我们就可以定义一个FlightFlow来完成这些细小职责的组装:


public class FlightFlow extends AbstractFlow {
   public FlightFlow(Environment env, Config config) {
       super(env, config);
   }
   private static final String FLIGHT = "flight";
   @Override
   public void run() {
       DataStream<String> source = env.addSource(sourcesFactory.createSslKafkaSource("INBOUND"));
       SingleOutputStreamOperator flightStream = source
               .filter(new FilterFlightProcessor())
               .map(new TransformXmlToJsonProcessor())
               .map(new TransformJsonStringToJsonObjectProcessor())
               .map(new TransformJsonMessageToFlightsProcessor())
               .map(new FlattenDomainModelProcessor());
       flightStream
               .map(new DeletedDiscusedRecordsProcessor(DETAIL, FLIGHT))
               .map(new DeleteFullModelProcessor(DETAIL, FLIGHT))
               .map(new InsertFullModelProcessor(DETAIL, FLIGHT))
               .map(new UpdateChangedRecordsProcessor(DETAIL, FLIGHT))
               .map(new TransformModelToEventProcessor())
               .map(new TransformObjectToJsonProcessor())
               .addSink(sinksFactory.createSslKafkaSink("trigger"));
   }
}


Job与Flow之间的关系是一对多关系。这种关系可以根据资源情况与业务需求的不同随时调整。因而我们引入配置方式来保证这种灵活性。Job是一个容器,通过它可以传入Flink Job的执行环境,然后在配置文件中配置Job与Flow之间的关系。如下配置文件就是为数据探针任务配置的Job,它包含了三个完全不同的Flow,运行同一个Flink集群上:


<job name="AirportToKafkaJob">
   <flow name="FlightToKafkaFlow" flowClassName="haina.airprobe.flow.FlightToKafkaFlow"/>
   <flow name="PassengerToKafkaFlow" flowClassName="haina.airprobe.flow.PassengerToKafkaFlow"/>
   <flow name="AcdmToKafkaFlow" flowClassName="haina.airprobe.flow.AcdmToKafkaFlow"/>
</job>


内核与外部应用


haina在针对flink的流处理模型进行了抽象和扩展开发后,就形成了围绕flink为核心的逻辑架构。如下图所示:


image.png


flink是haina的核心,提供了基本的运算、运行和部署的能力,而haina则根据我们产品的需求对flink进行扩展,并遵循前面提及的抽象流处理模型提供各个可以被重用的细粒度组成单元,并实现了通用的组成逻辑,简化了开发工作量。图中所示的air-probe、air-jobs等模块满足不同的数据处理功能。这些模块都是建立在haina之上薄薄的一层应用,只需要创建满足业务流程处理的Flow,并配置Flow与Job的关系即可。


同时,我们还自行开发了一套配置框架,可以简化整个大数据平台要使用到的外部资源,包括YARN、Flink、Kafka、ElasticSearch、RabbitMQ、ZooKeeper等,并在AbstractJob中完成了Flink执行环境与具体Job之间的绑定以及对外部环境的使用。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
482 5
|
4月前
|
SQL 算法 API
读Flink源码谈设计:图的抽象与分层
前阵子组里的小伙伴问我“为什么Flink从我们的代码到真正可执行的状态,要经过这么多个graph转换?这样做有什么好处嘛?”我早期看到这里的设计时的确有过相同的疑惑,当时由于手里还在看别的东西,查阅过一些资料后就翻页了。如今又碰到了这样的问题,不妨就在这篇文章中好好搞清楚。
538 0
读Flink源码谈设计:图的抽象与分层
|
5月前
|
Java Linux API
flink入门-流处理
flink入门-流处理
110 0
|
6月前
|
分布式计算 资源调度 监控
没有监控的流处理作业与茫茫大海中的裸泳无异 - 附 flink 与 spark 作业监控脚本实现
没有监控的流处理作业与茫茫大海中的裸泳无异 - 附 flink 与 spark 作业监控脚本实现
|
11天前
|
机器学习/深度学习 分布式计算 BI
Flink实时流处理框架原理与应用:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Flink实时流处理框架的原理,包括运行时架构、数据流模型、状态管理和容错机制、资源调度与优化以及与外部系统的集成。此外,还介绍了Flink在实时数据管道、分析、数仓与BI、机器学习等领域的应用实践。同时,文章提供了面试经验与常见问题解析,如Flink与其他系统的对比、实际项目挑战及解决方案,并展望了Flink的未来发展趋势。附带Java DataStream API代码样例,为学习和面试准备提供了实用素材。
33 0
|
2月前
|
运维 监控 数据处理
【天衍系列 03】深入理解Flink的Watermark:实时流处理的时间概念与乱序处理
【天衍系列 03】深入理解Flink的Watermark:实时流处理的时间概念与乱序处理
|
3月前
|
存储 数据挖掘 Apache
【Flink】Flink 有状态的流处理
【1月更文挑战第26天】【Flink】Flink 有状态的流处理
|
3月前
|
关系型数据库 数据处理 流计算
【Flink】Flink 流处理和批处理
【1月更文挑战第26天】【Flink】Flink 流处理和批处理
|
3月前
|
机器学习/深度学习 算法 物联网
实时计算Flink版:引领流处理的新时代
实时计算Flink版:引领流处理的新时代
|
4月前
|
流计算
Flink CDC-sql怎样导数据使starrocks支持主键模型delete的配置吗?目前只能更新和插入,但是删除不行
Flink CDC-sql怎样导数据使starrocks支持主键模型delete的配置吗?目前只能更新和插入,但是删除不行
96 1