Flink中异步AsyncIO的实现 (源码分析)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink中异步AsyncIO的实现 (源码分析)

先上张图整体了解Flink中的异步io

阿里贡献给flink的,优点就不说了嘛,官网上都有,就是写库不会柱塞性能更好

然后来看一下, Flink 中异步io主要分为两种

  一种是有序Ordered

  一种是无序UNordered

主要区别是往下游output的顺序(注意这里顺序不是写库的顺序既然都异步了写库的顺序自然是无法保证的),有序的会按接收的顺序继续往下游output发送,无序就是谁先处理完谁就先往下游发送

两张图了解这两种模式的实现

有序:record数据会通过异步线程写库,Emitter是一个守护进程,会不停的拉取queue头部的数据,如果头部的数据异步写库完成,Emitter将头数据往下游发送,如果头元素还没有异步写库完成,柱塞    

无序:record数据会通过异步线程写库,这里有两个queue,一开始放在uncompleteedQueue,当哪个record异步写库成功后就直接放到completedQueue中,Emitter是一个守护进程,completedQueue只要有数据,会不停的拉取queue数据往下游发送

可以看到原理还是很简单的,两句话就总结完了,就是利用queue和java的异步线程,现在来看下源码

这里AsyncIO在Flink中被设计成operator中的一种,自然去OneInputStreamOperator的实现类中去找

于是来看一下AsyncWaitOperator.java

看到它的open方法(open方法会在taskmanager启动job的时候全部统一调用,可以翻一下以前的文章)

这里启动了一个守护线程Emitter,来看下线程具体做了什么

1处拉取数据,2处就是常规的将拉取到的数据往下游emit,Emitter拉取数据,这里先不讲因为分为有序的和无序的

这里已经知道了这个Emitter的作用是循环的拉取数据往下游发送

回到AsyncWaitOperator.java在它的open方法初始化了Emitter,那它是如何处理接收到的数据的呢,看它的ProcessElement()方法

其实主要就是三个个方法

先是!!!将record封装成了一个包装类StreamRecordQueueEntry,主要是这个包装类的构造方法中,创建了一个CompleteableFuture(这个的complete方法其实会等到用户代码执行的时候用户自己决定什么时候完成)

1处主要就是讲元素加入到了对应的queue,这里也分为两种有序和无序的

这里也先不讲这两种模式加入数据的区别

接着2处就是调用用户的代码了,来看看官网的异步io的例子

给了一个Future作为参数,用户自己起了一个线程(这里思考一下就知道了为什么要新起一个异步线程去执行,因为如果不起线程的话,那processElement方法就柱塞了,无法异步了)去写库读库等,然后调用了这个参数的complete方法(也就是前面那个包装类中的CompleteableFuture)并且传入了一个结果

看下complete方法源码

这个resultFuture是每个record的包装类StreamRecordQueueEntry的其中一个属性是一个CompletableFuture

那现在就清楚了,用户代码在自己新起的线程中当自己的逻辑执行完以后会使这个异步线程结束,并输入一个结果

那这个干嘛用的呢

最开始的图中看到有序和无序实现原理,有序用一个queue,无序用两个queue分别就对应了

OrderedStreamElementQueue类中

UnorderedStreamElementQueue类中

回到前面有两个地方没有细讲,一是两种模式的Emitter是如何拉取数据的,二是两种模式下数据是如何加入OrderedStreamElementQueue的

有序模式:


1.先来看一下有序模式的,Emitter的数据拉取,和数据的加入

    其tryPut()方法

    onComplete方法

      onCompleteHandler方法

  这里比较绕,先将接收的数据加入queue中,然后onComplete()中当上一个异步线程getFuture() 其实就是每个元素包装类里面的那个CompletableFuture,当他结束时(会在用户方法用户调用complete时结束)异步调用传入的对象的 accept方法,accept方法中调用了onCompleteHandler()方法,onCompleteHandler方法中会判断queue是否为空,以及queue的头元素是否完成了用户的异步方法,当完成的时候,就会将headIsCompleted这个对象signalAll()唤醒


2.接着看有序模式Emitter的拉取数据

  这里有序方式拉取数据的逻辑很清晰,如果为空或者头元素没有完成用户的异步方法,headIsCompleted这个对象会wait住(上面可以知道,当加入元素的到queue且头元素完成异步方法的时候会signalAll())然后将头数据返回,往下游发送

这样就实现了有序发送,因为Emitter只拉取头元素且已经完成用户异步方法的头元素

无序模式:

  这里和有序模式就大同小异了,只是变成了,接收数据后直接加入uncompletedQueue,当数据完成异步方法的时候就,放到completedQueue里面去并signalAll(),只要completedqueue里面有数据,Emitter就拉取往下发

这样就实现了无序模式,也就是异步写入谁先处理完就直接放到完成队列里面去,然后往下发,不用管接收数据的顺序

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
29天前
|
SQL 消息中间件 关系型数据库
实时计算 Flink版产品使用问题之MySQL当维表如何开启异步操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之实现异步删除操作如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
监控 分布式数据库 流计算
Flink 异步IO优化任务
Flink 异步IO优化任务
57 0
|
8月前
|
SQL API 流计算
Flink SQL代码补全提示(源码分析)
Flink SQL代码补全提示(源码分析)
58 0
|
8月前
|
SQL 前端开发 API
Flink教程(22)- Flink高级特性(异步IO)
Flink教程(22)- Flink高级特性(异步IO)
179 0
|
10月前
|
存储 大数据 API
大数据Flink异步IO
大数据Flink异步IO
76 0
|
10月前
|
存储 前端开发 Java
Flink 异步 I/O 解析
Flink 在内部处理数据是,由于业务的复杂性,不可避免的会与外部系统做数据交互,那么其中的延迟会对流处理的整个工作进度起决定性影响,本文使用几个案例来说明异步I/O的使用方式
|
11月前
|
SQL 存储 缓存
Flink进行Paimon写入源码分析
本文主要解析了Flink写入Paimon的核心流程。
|
11月前
|
存储 消息中间件 缓存
Flink进行Hudi写入源码分析
本文主要解析了Flink将DataStream写入到Hudi表的核心流程
|
SQL API Apache
Flink SQL代码补全提示(源码分析)
使用过Navicat的童鞋都知道,当我们写SQL的时候,工具会根据我们输入的内容弹出提示,这样可以很方便我们去写SQL
396 0
Flink SQL代码补全提示(源码分析)