Flink教程(28)- Flink性能优化

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink教程(28)- Flink性能优化

01 引言

在前面的博客,我们学习了FlinkMetrics监控了,有兴趣的同学可以参阅下:

本文主要讲解Flink性能优化

02 History Server

flinkHistoryServer主要是用来存储和查看任务的历史记录,具体信息可以看官网:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/advanced/historyserver.html

相关配置:

# Directory to upload completed jobs to. Add this directory to the list of
# monitored directories of the HistoryServer as well (see below).
# 将已完成的作业上传到的目录
jobmanager.archive.fs.dir: hdfs://node01:8020/completed-jobs/
# The address under which the web-based HistoryServer listens.
# 基于 Web 的 HistoryServer 的地址
historyserver.web.address: 0.0.0.0
# The port under which the web-based HistoryServer listens.
# 基于 Web 的 HistoryServer 的端口号
historyserver.web.port: 8082
# Comma separated list of directories to monitor for completed jobs.
# 以逗号分隔的目录列表,用于监视已完成的作业
historyserver.archive.fs.dir: hdfs://node01:8020/completed-jobs/
# Interval in milliseconds for refreshing the monitored directories.
# 刷新受监控目录的时间间隔(以毫秒为单位)
historyserver.archive.fs.refresh-interval: 10000

参数解析:

  • jobmanager.archive.fs.dir:flink job运行完成后的日志存放目录
  • historyserver.archive.fs.dir:flink history进程的hdfs监控目录
  • historyserver.web.address:flink history进程所在的主机
  • historyserver.web.port:flink history进程的占用端口
  • historyserver.archive.fs.refresh-interval:刷新受监视目录的时间间隔(以毫秒为单位)。

默认启动端口8082:

  • bin/historyserver.sh (start|start-foreground|stop)

03 序列化

Java 原生的序列化方式:

  • 优点:好处是比较简单通用,只要对象实现了 Serializable接口即可;
  • 缺点:效率比较低,而且如果用户没有指定 serialVersionUID的话,很容易出现作业重新编译后,之前的数据无法反序列化出来的情况(这也是 Spark Streaming Checkpoint的一个痛点,在业务使用中经常出现修改了代码之后,无法从 Checkpoint 恢复的问题)

对于分布式计算来讲,数据的传输效率非常重要。好的序列化框架可以通过较低的序列化时间和较低的内存占用大大提高计算效率和作业稳定性。

在数据序列化上,FlinkSpark 采用了不同的方式

  • Spark 对于所有数据默认采用 Java 原生序列化方式,用户也可以配置使用 Kryo;相比于 Java 原生序列化方式,无论是在序列化效率还是序列化结果的内存占用上,Kryo 则更好一些(Spark 声称一般 Kryo 会比 Java 原生节省 10x 内存占用);Spark 文档中表示它们之所以没有把 Kryo 设置为默认序列化框架的唯一原因是因为 Kryo 需要用户自己注册需要序列化的类,并且建议用户通过配置开启 Kryo。
  • Flink 则是自己实现了一套高效率的序列化方法。

04 复用对象

比如如下代码:

stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = ...
            // A new Tuple instance is created on every execution
            collector.collect(new Tuple2<>(userName, changesCount));
        }
    }

可以看出,apply函数每执行一次,都会新建一个Tuple2类的实例,因此增加了对垃圾收集器的压力。解决这个问题的一种方法是反复使用相同的实例:

stream
        .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
    // Create an instance that we will reuse on every call
    private Tuple2<String, Long> result = new Tuple<>();
    @Override
    public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
        long changesCount = ...
        // Set fields on an existing object instead of creating a new one
        result.f0 = userName;
        // Auto-boxing!! A new Long value may be created
        result.f1 = changesCount;
        // Reuse the same Tuple2 object
        collector.collect(result);
    }
}

这种做法其实还间接创建了Long类的实例。

为了解决这个问题,Flink有许多所谓的value class:IntValue、LongValue、StringValue、FloatValue等。下面介绍一下如何使用它们:

stream
        .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
    // Create a mutable count instance
    private LongValue count = new LongValue();
    // Assign mutable count to the tuple
    private Tuple2<String, LongValue> result = new Tuple<>("", count);
    @Override
    // Notice that now we have a different return type
    public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, LongValue>> collector) throws Exception {
        long changesCount = ...
        // Set fields on an existing object instead of creating a new one
        result.f0 = userName;
        // Update mutable count value
        count.setValue(changesCount);
        // Reuse the same tuple and the same LongValue instance
        collector.collect(result);
    }
}

05 数据倾斜

我们的flink程序中如果使用了keyBy等分组的操作,很容易就出现数据倾斜的情况,数据倾斜会导致整体计算速度变慢,有些子节点甚至接受不到数据,导致分配的资源根本没有利用上。

带有窗口的操作:

  • 带有窗口的每个窗口中所有数据的分布不平均,某个窗口处理数据量太大导致速率慢
  • 导致Source数据处理过程越来越慢
  • 再导致所有窗口处理越来越慢

不带有窗口的操作:

  • 有些子节点接受处理的数据很少,甚至得不到数据,导致分配的资源根本没有利用上

WebUI体现:

WebUISubtasks中打开每个窗口可以看到每个窗口进程的运行情况:如上图,数据分布很不均匀,导致部分窗口数据处理缓慢。

优化方式:

  • 对key进行均匀的打散处理(hash,加盐等)
  • 自定义分区器
  • 使用Rebalabce

注意:Rebalance是在数据倾斜的情况下使用,不倾斜不要使用,否则会因为shuffle产生大量的网络开销。

06 总结

本文主要从History Server、序列化、 复用对象、 数据倾斜来讲解了Flink的性能优化,谢谢大家的阅读,本文完!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
4月前
|
SQL 运维 API
Apache Flink 学习教程----持续更新
Apache Flink 学习教程----持续更新
269 0
|
4月前
|
流计算
JD Flink教程
JD Flink教程
36 0
|
4月前
|
Apache 流计算
Apache Flink教程
Apache Flink教程
251 0
|
2月前
|
SQL 缓存 监控
14个Flink SQL性能优化实践分享
【7月更文挑战第12天】 1. **合理设置并行度**: 根据数据量和资源调整以提高处理速度. 2. **优化数据源**: 使用分区表并进行预处理减少输入量. 3. **数据缓存**: 采用 `BROADCAST` 或 `REPARTITION` 缓存常用数据. 4. **索引和分区**: 创建索引并按常用字段分区. 5. **避免不必要的计算**: 检查并移除多余的计算步骤. 6. **调整内存配置**: 分配足够内存避免性能下降. 7. **优化连接操作**: 选择适合大表和小表的连接方式. 8. **数据类型优化**: 选择合适类型以节省资源. ........
|
1月前
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
77 2
|
4月前
|
SQL 数据采集 监控
14个Flink SQL性能优化实践分享
本文档详细列举了Apache Flink SQL的性能调优策略。主要关注点包括:增加数据源读取并行度、优化状态管理(如使用RocksDB状态后端并设置清理策略)、调整窗口操作以减少延迟、避免类型转换和不合理的JOIN操作、使用广播JOIN、注意SQL查询复杂度、控制并发度和资源调度、自定义源码实现、执行计划分析、异常检测与恢复、监控报警、数据预处理与清洗、利用高级特性(如容器化部署和UDF)以及数据压缩与序列化。此外,文档还强调了任务并行化、网络传输优化、系统配置调优、数据倾斜处理和任务调度策略。通过这些方法,可以有效解决性能问题,提升Flink SQL的运行效率。
223 5
|
4月前
|
SQL 资源调度 监控
Flink SQL性能优化实践
Apache Flink流处理性能优化指南:探索数据源读取并行度、状态管理、窗口操作的优化策略,包括设置默认并行度、使用RocksDB状态后端、调整窗口大小。调优方法涉及数据源分区、JOIN条件优化、使用Broadcast JOIN。注意SQL复杂度、并发控制与资源调度,如启用动态资源分配。源码层面优化自定义Source和Sink,利用执行计划分析性能瓶颈。异常检测与恢复通过启用检查点,监控任务性能。预处理数据、使用DISTINCT去重,结合UDF提高效率。选择高效序列化框架和启用数据压缩,优化网络传输和系统配置。处理数据倾斜,均衡数据分布,动态调整资源和任务优先级,以提升整体性能。
126 2
|
4月前
|
SQL 缓存 网络协议
Blink实时计算:Explorer大基数表的写入性能优化
在研发实时数据的过程中碰到了需要update写入Explore的大基数实时数据表的场景。本文记录了经过一系列方式调优后,在流量正常的情况下,任务不再出现explorer链接失败报错和延迟的全过程。
|
4月前
|
存储 缓存 监控
Flink性能优化小结
Flink性能优化小结
|
4月前
|
存储 数据采集 监控
Flink中的性能优化有哪些方法?请举例说明。
Flink中的性能优化有哪些方法?请举例说明。
87 0