开发者社区官方技术圈

阿里云开发者社区官方技术圈,用户产品功能发布、用户反馈收集等。

阿里云开发者社区官方技术圈,用户产品功能发布、用户反馈收集等。

一般任务的反压并不会直接影响实时任务,但是任务中反压的节点是处于一个高负载情况,会造成任务的延迟越来越大,如果是持续性的反压,意味着任务本身存在瓶颈,可能导致潜在的不稳定或者数据延迟,尤其是数据量较大的场景下,反压的影响主要体现在Flink中checkpoint生成上,主要影响两个方面:

•任务出现反压,上游数据流阻塞,会使数据管道中数据处理速度变慢,数据处理被阻塞也会导致 checkpoint barrier 流经整个数据管道的时长变长,因而 checkpoint总体时间(End to End Duration)变长甚至是checkpoint失败。

•因为为保证 EOS(Exactly-Once-Semantics,准确一次),在对齐checkpoint场景中,算子接收多个管道输入,输入较快的管道数据state会被缓存起来,等待输入较慢的管道数据barrier对齐,这样由于输入较快管道数据没被处理,反压一直存在,较快的数据进入后一直积压可能导致OOM或者内存资源耗尽的不稳定问题。

这个影响对于数据量大的生产环境的作业来说是十分危险的,因为 checkpoint是保证数据一致性的关键,checkpoint 时间变长有可能导致 checkpoint 超时失败,而 state 大小同样可能拖慢 checkpoint 甚至导致 OOM 从而导致实时任务异常,而且不能能失败的checkpoint进行快速恢复。

以上内容摘自《企业级云原生白皮书项目实战》电子书,点击https://developer.aliyun.com/ebook/download/7774可下载完整版

胡嘞嘞 评论 0

反压是在Flink实时数据处理中经常遇到的问题,是在实时数据流的管道某个节点上游产生数据的速度大于该节点处理数据速度出现瓶颈。反压会从该节点向上游传递,一直到数据源,并降低数据源的摄入速度。这在流数据处理中非常常见,很多场景可以导致反压的出现,比如, GC导致短时间数据积压,数据的波动带来的一段时间内需处理的数据量有突增,任务负载大节点内存CPU使用率较高,以及任务checkpoint本身异常都可能造成反压。

具体到flink任务上查看,出现反压的时候,在flink UI上可以看到一个 task 发生反压警告,意味着它生产数据的速率比下游 task 消费数据的速率要快。 在工作流中数据记录是从上游向下游流动的(例如:从 Source 到 Sink)。

以一个简单的 Source -> Sink job 为例。如果看到 Source 发生了警告,意味着Sink 消费数据的速率比 Source 生产数据的速率要慢。 Sink 正在向上游的 Source算子产生反压。

image.png

在Flink WebUI 集合了所有 subTasks 的反压和繁忙指标的最大值,并在 Job-Graph 中将集合的值进行显示。除了显示原始的数值,tasks 也用颜色进行了标记,使检查更加容易。闲置的 tasks 为蓝色,完全被反压的 tasks 为黑色,完全繁忙的tasks 被标记为红色。 中间的所有值都表示为这三种颜色之间的过渡色。

在Job Overview 旁的 Back Pressure 选项卡中,可以具体看到当前节点的backpressured(反压)和Busy情况来具体判断节点的反压情况。

image.png

如果你看到 subtasks 的状态为 OK 表示没有反压。HIGH 表示这个 subtask 被反压。状态用如下定义:

•OK: 0% <= 反压比例 <= 10%

•LOW: 10% < 反压比例 <= 50%

•HIGH: 50% < 反压比例 <= 100%

除此之外,你还可以找到每一个 subtask 被反压、闲置或是繁忙的时间百分比。

以上内容摘自《企业级云原生白皮书项目实战》电子书,点击https://developer.aliyun.com/ebook/download/7774可下载完整版

胡嘞嘞 评论 0

1

回答

image.png

对比增量同步能力,基于日志的方式,可以很好的做到增量同步;而基于查询的方式是很难做到增量同步的。

对比全量同步能力,基于查询或者日志的 CDC 方案基本都支持,除了 Canal。

而对比全量 + 增量同步的能力,只有 Flink CDC、Debezium、Oracle Goldengate支持较好。

从架构角度去看,该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的水平扩展上,更重要的是在大数据场景下分布式系统接入能力。例如 Flink CDC 的数据入湖或者入仓的时候,下游通常是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接入分布式系统能力上看,Flink CDC 的架构能够很好地接入此类系统。

在数据转换 / 数据清洗能力上,当数据进入到 CDC 工具的时候是否能较方便的对数据做一些过滤或者清洗,甚至聚合?在 Flink CDC 上操作相当简单,可以通过Flink SQL 去操作这些数据;但是像 DataX、Debezium 等则需要通过脚本或者模板 去做,所以用户的使用门槛会比较高。

另外,在生态方面,这里指的是下游的一些数据库或者数据源的支持。FlinkCDC 下游有丰富的 Connector,例如写入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常见的一些系统,也支持各种自定义 connector。

以上内容摘自《企业级云原生白皮书项目实战》电子书,点击https://developer.aliyun.com/ebook/download/7774可下载完整版

胡嘞嘞 评论 0

为什么需要单独来谈PyODPS DataFrame(以下简写为DF,此处特指PyODPSDataFrame,请注意和Pandas DataFrame区分)?因为在同一段PyODPS代码中,需要特别留意区分普通代码脚本和DF相关脚本。在开发者看来明明是同一套代码,但是其中的DF部分却是在不同的环境里执行的,下面借助这张图进一步阐释。

image.png

总体上看,可以简单地这么记忆:DF代码会在上图框选的MaxCompute内部执行,非DF代码则会在紫色部分(“本地环境”)执行。

以DataStudio为例,如果是在DataWorks的DataStudio创建了PyODPS2/PyODPS3节点,实际跑这段代码的环境是DataWorks的调度资源组机器(gateway机器),DataWorks已经帮用户配置好了相应的runtime(pyodps依赖、python interpreter等)。因而,这些代码在执行时,行为与普通Python code的执行行为类似,import第三方包时,引用的是“本地”的包,如果您使用的是独享调度资源组,则可通过DataWorks控制台的调度资源组运维助手,进行pip命令的下发来安装必要的三方依赖。

image.png

PyODPS DF支持对所有Sequence实现调用map方法,传入自定义函数闭包,实现对于MaxCompute表中某一列的每一个元素逐个调用自定义函数进行处理。如上述代码片段的handle函数。handle函数传入map方法时,会被提取为闭包和字节码,DF使用闭包和字节码生成一个MaxCompute的UDF,在执行时实际等效于:

select this_handle_udf(pyodps_iris.sepal_length)

由此可见,此部分的DF代码执行,发生在了MaxCompute集群内部了,即上图中的MaxCompute Executor机器上执行。

进一步来说说如果在自定义函数里使用到了三方包,因为自定义函数在Max-Compute Executor机器上执行,所以无法引用“本地环境”的包,import建议放在自定义函数内部进行,并且在DataWorks上上传三方包资源后,需要点击“提交”确保资源被正确上传到MaxCompute集群内部。

以上内容摘自《企业级云原生白皮书项目实战》电子书,点击https://developer.aliyun.com/ebook/download/7774可下载完整版

胡嘞嘞 评论 0

MMA目前主要面向Hadoop数据迁移的场景,支持Hive到MaxCompute迁移。MMA实现了Hive的 UDTF,通过Hive的分布式能力,实现Hive数据向MaxCompute的高并发传输。通过连接用户的Hive Metastore服务,抓取Hive Metadata,并利用这些数据生成用于创建MaxCompute表和分区的DDL语句以及用于迁移数据的HiveUDTF SQL。

以下是整个迁移链路的简要示意图,供参考:

image.png

当用户通过 MMA client 向 MMA server 提交一个迁移 Job 后,MMA 首先会将该 Job 的配置记录在元数据中,并初始化其状态为 PENDING。

随后,MMA 调度器将会把这个 Job 状态置为 RUNNING,向 Hive 请求这张表的元数据,并开始调度执行。这个 Job 在 MMA 中会被拆分为若干 个 Task,每一个Task 负责表中的一部分数据的传输,每个 Task 又会拆分为若干个 Action 进行具体传输和验证。在逻辑结构上,每一个 Job 将会包含若干个 Task 组成的 DAG,而每一个 Task 又会包含若干个 Action 组成的 DAG。整体的流程大致如下:

image.png

上图中数据传输的原理是利用Hive的分布式计算能力,实现了一个Hive UDTF,在Hive UDTF中实现了上传数据至MaxCompute的逻辑,并将一个数据迁移任务转化为一个或多个形如:

SELECT UDTF(*) FROM hive_db.hive_table;

的Hive SQL。在执行上述Hive SQL时,数据将被Hive读出并传入UDTF,UDTF会通过MaxCompute的 Tunnel SDK将数据写入MaxCompute。

当某一个Task的所有Action执行成功后,MMA会将这个Task负责的部分数据的迁移状态置为 SUCCEEDED。当该Job对应的所有Task都成功后,这张表的迁移结束。

当某一个Task的某一个Action执行失败,MMA会将这个Task负责的部分数据的迁移状态置为 FAILED,并生成另一个Task负责这部分数据,直到成功或达到重试次数上限。

当表中数据发生变化时(新增数据,新增分区,或已有分区数据变化),可以重新提交迁移任务,此时MMA会重新扫描Hive中元数据,发现数据变化,并迁移发生变化的表或分区。

以上内容摘自《企业级云原生白皮书项目实战》电子书,点击https://developer.aliyun.com/ebook/download/7774可下载完整版

胡嘞嘞 评论 0

有DataWorks等数据平台开发经验的大数据从业者对业务流程、DAG、调度编排等词汇耳熟能详,这些词汇都描述或提示了大数据开发的一般流程。通常数据开发的总体流程包括数据产生、数据收集与存储、数据分析与处理、数据提取和数据展现与分享。

image.png

这里以DataWorks来举例说明,一般是需要在DataStudio数据开发页面中,创建某个分析需求的业务流程,然后在业务流程中配合使用各类节点(逻辑类、数据同步类、各类计算引擎节点等),最终将这些不同类型的节点,根据业务逻辑关系,编排成有向无环图(DAG)。

如果是简单的A表B表的周期离线同步,那只要在业务流程中添加一个离线同步节点,完整源端、目标端数据源配置及网络打通,基于脚本或向导配置好管道,即可完成。但实际业务场景下,数据同步通常不能通过一个或多个简单离线同步或者实时同步任务完成,而是由多个离线同步、实时同步和数据处理等任务组合完成,这就会导致数据同步场景下的配置复杂度非常高。

为了解决上述问题,DataWorks提出了面向业务场景的同步任务配置化方案(我们称之为同步解决方案),支持不同数据源的一键同步功能,例如,“一键实时同步至MaxCompute”、“一键实时同步至Hologres”功能等,通过此类功能,只需要进行简单的配置,就可以完成一个复杂业务场景。而通过传统的手工拖拽节点进行编排的方式,可能需要操作5+甚至10+个节点,配置项达到上百个(涉及周期、参数、依赖关系等各类配置)。

例如:一键实时同步至MaxCompute(独立merge天周期),可能包含了5个数据开发节点和2个资源文件。7个文件可以在一个解决方案向导中轻松配置完成。

image.png

以上内容摘自《企业级云原生白皮书项目实战》电子书,点击https://developer.aliyun.com/ebook/download/7774可下载完整版

胡嘞嘞 评论 0

公告

阿里云开发者社区官方技术圈,用户产品功能发布、用户反馈收集等。

展开