对Flink流处理模型的抽象

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 对Flink流处理模型的抽象

作为目前最为高效的流处理框架之一,Flink在我们的大数据平台产品中得到了广泛运用。为了简化开发,我们对Flink做了一些封装,以满足我们自己的产品需求。


我们开发的一个基于大数据平台的数据仓库,选择了Flink作为数据处理的底层框架。我们主要看重于它在流处理的低延迟性,消息传递保证的extractly once特性;它为流处理和批处理提供了相对统一的API,支持Java、Scala和Python等主流开发语言,同时还较好地支持了SQL。Flink搭建了非常棒的基础设施,例如它可以和ZooKeeper、YARN集成起来,保证处理功能的高可用性与水平扩展的集群能力,同时还提供了相对开放的扩展能力,使得我们可以较容易地在已有功能基础之上实现定制开发。


我们基于Flink开发了自己的底层框架“海纳(haina)”,这是取“海纳百川有容乃大”之意。haina以库的形式为我们的产品提供了数据采集、治理和共享等功能,是整个平台最核心的数据处理基础设施,逻辑架构如下图所示:


image.png

抽象的流处理模型


由于我们的产品对数据的处理主要包括三个方面:采集、治理与共享,这之间流转的皆为采集器从上游系统采集获得的数据。我们结合Flink的架构,并参考了Apex、Storm、Flume等其他流处理框架,抽象出自己的流处理模型。这个模型中各个概念之间的关系与层次如下图所示:


image.png


在这个流处理模型中,一个Job对应一个实际的物理环境(Environment)。多数情况下,为了保证Job运行的独立性,可以为每个Job分配一个单独的运行节点,提供专有的运行资源。每个Job核心的逻辑概念是Flow,它由Source、Processor和Sink组成,它们都是Flink的Operator,其中Processor对应于Flink的Transformation Operator。在实时流处理中,一个典型的Processor其实就是我们常用的map、filter或flatMap函数。例如:


public class ArchiveJsonMetaProcessor implements MapFunction<String, String> {
   private String target;
   public ArchiveJsonMetaProcessor(String target) {
       this.target = target;
   }
   @Override
   public String map(String msg) {
       JSONObject message = JSONObject.parseObject(msg);
       String messageId = message.getString("messageId");
       String originTimestamp = message.getString("originalTimestamp");
       String archivedTimestamp = DateUtil.transformTime(new Date(), DateUtil.YYYYMMDDHHMMSSS);
       ArchivedMetaData archivedMetaData = ArchivedMetaData.fillArchivedMetaData(messageId, target, originTimestamp, archivedTimestamp);
       return JSON.toJSONString(archivedMetaData);
   }
}


Processor的设计原则


我们之所以要抽象出Processor概念,是因为我们遵循了管道-过滤器模式,希望每个operator都是一个最小的可以重用的逻辑单元。管道就是我们定义的Flow,Source是管道的上游入口,Sink是管道的下游出口,每个细粒度的Processor就是每个负责处理数据流的过滤器。我们的底层框架haina实现了这些逻辑单元,至于它们该如何组装,则交由框架的使用者。正因为此,我们制定了Processor的设计原则,其根本思想就是保持Processor的细粒度,严格分离与业务无关和有关的Processor,保证Processor在组成Flow时尽可能被重用。


如下为设计Processor应该遵循的原则:

  • 业务上对数据流的处理可以拆分为多个阶段,每个Processor对应一个阶段。
  • 尽可能把有副作用的和无副作用的职责分离到不同的Processor。
  • 把需要访问外部资源的职责尽可能分离到不同的Processor。
  • 尽可能确保Processor的代码短小,这样可以保证将Processor真正的职责转移到别的类,例如对象的转换逻辑。转移出去的类与Flink平台无关,有利于编写和运行单元测试。
  • 每个Processor的上游与下游,即MapFunction或其他接口对应的类型参数T与O,应尽量采用平台定义的模型对象,而非如String之类的基础类型。这样就能保证调用者对Processor进行组装时,通过编译就能检查到不必要的组装错误。
  • 每个Processor的命名采用动宾短语,并以Processor作为类的后缀。例如将一条航班信息拆分成多个机位信息,命名为SplitFlightToStandsProcessor。好的命名可以帮助我们更容易发现它,进而促进调用者对它的重用。例如在IntelliJ中,就可以直接以*Processor来搜索所有的Processor,然后根据它的命名就能推测出这个Processor到底是做什么的。
  • 每个Processor需要的外部数据,都通过Processor的构造函数来传递。
  • 每个Processor都应该实现Flink提供的transfomation接口。
  • 第一个Processor接收的是String类型的消息,则要求必须对传入的消息进行验证。用于验证的Processor应该实现FilterFunction<String>。
  • 应保证每个Processor都不要抛出出人意料不可控制的异常,否则可能导致执行Job时出现错误从而导致整个Application停止或者重启。


自定义Source与Sink


针对Source与Sink,除了重用Flink本身提供的source与sink之外,我们还开发了大量的满足自己需求的自定义Source与Sink。例如,我们为独立开发的ESB系统提供了Source,为关系型数据库和WebService提供了具有轮询能力的Source,为ElasticSearch开发了满足批量添加数据的Sink,同时还实现了具有回调能力的自定义Sink。如下就是针对Oracle编写的具有轮询能力的自定义Source:


public class OracleClobSource extends RichSourceFunction<String> {
   private static final Logger log = LoggerFactory.getLogger(OracleClobSource.class);
   private JdbcGateway jdbcGateway;
   private Long period;
   private DSLContext executor;
   public OracleClobSource(JdbcGateway jdbcGateway, Long period) {
       this.jdbcGateway = jdbcGateway;
       this.period = period;
   }
   @Override
   public void open(Configuration parameters) throws Exception {
       executor = jdbcGateway.executor();
   }
   @Override
   public void run(SourceContext<String> ctx) throws Exception {
       while (true) {
           Result<Record> results = executor.select().from(table(tableName)).where(rownum().le(10)).fetch();
           for (Record record : results) {
               String msgcontext = record.get(field(filedName), String.class);
               ctx.collect(msgcontext);
               Long id = record.get(field("ID"), Long.class);
               executor.delete(table(tableName)).where(field("ID").equal(id)).execute();
           }
           Thread.sleep(period);
       }
   }
   @Override
   public void cancel() {
       executor.close();
   }
}


为了便于使用,我们还为这些内置与定制source和sink分别定义了静态工厂。


Flow与Job


Flow相当于是传递DataStream的拓扑图,由Source、Processor和Sink组成。我们之所以引入这个概念,一方面是为Job提供更粗粒度的重用单元,另一方面也承担了封装业务流程的主要职责。例如,一个航班数据从上游系统进入我们的大数据平台,需要进行多次数据格式的转换、验证与治理,我们就可以定义一个FlightFlow来完成这些细小职责的组装:


public class FlightFlow extends AbstractFlow {
   public FlightFlow(Environment env, Config config) {
       super(env, config);
   }
   private static final String FLIGHT = "flight";
   @Override
   public void run() {
       DataStream<String> source = env.addSource(sourcesFactory.createSslKafkaSource("INBOUND"));
       SingleOutputStreamOperator flightStream = source
               .filter(new FilterFlightProcessor())
               .map(new TransformXmlToJsonProcessor())
               .map(new TransformJsonStringToJsonObjectProcessor())
               .map(new TransformJsonMessageToFlightsProcessor())
               .map(new FlattenDomainModelProcessor());
       flightStream
               .map(new DeletedDiscusedRecordsProcessor(DETAIL, FLIGHT))
               .map(new DeleteFullModelProcessor(DETAIL, FLIGHT))
               .map(new InsertFullModelProcessor(DETAIL, FLIGHT))
               .map(new UpdateChangedRecordsProcessor(DETAIL, FLIGHT))
               .map(new TransformModelToEventProcessor())
               .map(new TransformObjectToJsonProcessor())
               .addSink(sinksFactory.createSslKafkaSink("trigger"));
   }
}


Job与Flow之间的关系是一对多关系。这种关系可以根据资源情况与业务需求的不同随时调整。因而我们引入配置方式来保证这种灵活性。Job是一个容器,通过它可以传入Flink Job的执行环境,然后在配置文件中配置Job与Flow之间的关系。如下配置文件就是为数据探针任务配置的Job,它包含了三个完全不同的Flow,运行同一个Flink集群上:


<job name="AirportToKafkaJob">
   <flow name="FlightToKafkaFlow" flowClassName="haina.airprobe.flow.FlightToKafkaFlow"/>
   <flow name="PassengerToKafkaFlow" flowClassName="haina.airprobe.flow.PassengerToKafkaFlow"/>
   <flow name="AcdmToKafkaFlow" flowClassName="haina.airprobe.flow.AcdmToKafkaFlow"/>
</job>


内核与外部应用


haina在针对flink的流处理模型进行了抽象和扩展开发后,就形成了围绕flink为核心的逻辑架构。如下图所示:


image.png


flink是haina的核心,提供了基本的运算、运行和部署的能力,而haina则根据我们产品的需求对flink进行扩展,并遵循前面提及的抽象流处理模型提供各个可以被重用的细粒度组成单元,并实现了通用的组成逻辑,简化了开发工作量。图中所示的air-probe、air-jobs等模块满足不同的数据处理功能。这些模块都是建立在haina之上薄薄的一层应用,只需要创建满足业务流程处理的Flow,并配置Flow与Job的关系即可。


同时,我们还自行开发了一套配置框架,可以简化整个大数据平台要使用到的外部资源,包括YARN、Flink、Kafka、ElasticSearch、RabbitMQ、ZooKeeper等,并在AbstractJob中完成了Flink执行环境与具体Job之间的绑定以及对外部环境的使用。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6月前
|
SQL Kubernetes 调度
Flink 流批一体在模型特征场景的使用
本文整理自B站资深开发工程师张杨老师在 Flink Forward Asia 2023 中 AI 特征工程专场中的分享。
77713 5
Flink 流批一体在模型特征场景的使用
|
6月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
775 5
|
6月前
|
SQL 算法 API
读Flink源码谈设计:图的抽象与分层
前阵子组里的小伙伴问我“为什么Flink从我们的代码到真正可执行的状态,要经过这么多个graph转换?这样做有什么好处嘛?”我早期看到这里的设计时的确有过相同的疑惑,当时由于手里还在看别的东西,查阅过一些资料后就翻页了。如今又碰到了这样的问题,不妨就在这篇文章中好好搞清楚。
557 0
读Flink源码谈设计:图的抽象与分层
|
Java Linux API
flink入门-流处理
flink入门-流处理
152 0
|
1月前
|
存储 分布式计算 API
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
82 0
|
5月前
|
监控 大数据 Java
使用Apache Flink进行大数据实时流处理
Apache Flink是开源流处理框架,擅长低延迟、高吞吐量实时数据流处理。本文深入解析Flink的核心概念、架构(包括客户端、作业管理器、任务管理器和数据源/接收器)和事件时间、窗口、状态管理等特性。通过实战代码展示Flink在词频统计中的应用,讨论其实战挑战与优化。Flink作为大数据处理的关键组件,将持续影响实时处理领域。
850 5
|
3月前
|
Java Spring 安全
Spring 框架邂逅 OAuth2:解锁现代应用安全认证的秘密武器,你准备好迎接变革了吗?
【8月更文挑战第31天】现代化应用的安全性至关重要,OAuth2 作为实现认证和授权的标准协议之一,被广泛采用。Spring 框架通过 Spring Security 提供了强大的 OAuth2 支持,简化了集成过程。本文将通过问答形式详细介绍如何在 Spring 应用中集成 OAuth2,包括 OAuth2 的基本概念、集成步骤及资源服务器保护方法。首先,需要在项目中添加 `spring-security-oauth2-client` 和 `spring-security-oauth2-resource-server` 依赖。
52 0
|
3月前
|
消息中间件 数据挖掘 Kafka
揭秘大数据时代的极速王者!Flink:颠覆性流处理引擎,让实时数据分析燃爆你的想象力!
【8月更文挑战第29天】Apache Flink 是一个高性能的分布式流处理框架,适用于高吞吐量和低延迟的实时数据处理。它采用统一执行引擎处理有界和无界数据流,具备精确状态管理和灵活窗口操作等特性。Flink 支持毫秒级处理和广泛生态集成,但学习曲线较陡峭,社区相对较小。通过实时日志分析示例,我们展示了如何利用 Flink 从 Kafka 中读取数据并进行词频统计,体现了其强大功能和灵活性。
71 0
|
3月前
|
监控 搜索推荐 数据挖掘
Flink流处理与批处理大揭秘:实时与离线,一文让你彻底解锁!
【8月更文挑战第24天】Apache Flink 是一款开源框架,擅长流处理与批处理。流处理专攻实时数据流,支持无限数据流及事件驱动应用,实现数据的连续输入与实时处理。批处理则聚焦于静态数据集,进行一次性处理。两者差异体现在处理方式与应用场景:流处理适合实时性要求高的场景(例如实时监控),而批处理更适用于离线数据分析任务(如数据挖掘)。通过提供的示例代码,读者可以直观理解两种模式的不同之处及其实际应用。
198 0
|
3月前
|
消息中间件 大数据 Kafka
Apache Flink 大揭秘:征服大数据实时流处理的神奇魔法,等你来解锁!
【8月更文挑战第5天】Apache Flink 是一款强大的开源大数据处理框架,专长于实时流处理。本教程通过两个示例引导你入门:一是计算数据流中元素的平均值;二是从 Kafka 中读取数据并实时处理。首先确保已安装配置好 Flink 和 Kafka 环境。第一个 Java 示例展示了如何创建流执行环境,生成数据流,利用 `flatMap` 转换数据,并使用 `keyBy` 和 `sum` 计算平均值。第二个示例则演示了如何设置 Kafka 消费者属性,并从 Kafka 主题读取数据。这两个示例为你提供了使用 Flink 进行实时流处理的基础。随着进一步学习,你将能应对更复杂的实时数据挑战。
71 0