Flink问题之优化消费如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:关于Watermark的使用调试问题


想问下社区,watermark必须加在数据源上吗?顺便想问下一般用什么方式来调watermark,我自己本地有发现加在map

function之后,watermark会自动重置为默认值的情况。

谢谢!*来自志愿者整理的flink邮件归档


参考回答:

可以中途产生,走这个接口

org.apache.flink.streaming.api.datastream.DataStream#assignTimestampsAndWatermarks(org.apache.flink.api.common.eventtime.WatermarkStrategy )

麻烦贴一下你加 watermark 的代码和 pipeline 看一下啥情况


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359897?spm=a2c6h.13066354.0.0.5a8f6992GOUQsh


问题二:消息队列量级特别如何优化消费?


hi, 由于消息队列量特别大,在flink接入以后被压严重。除了修改并行度以外还有没有其他的优化方案!!!*来自志愿者整理的flink邮件归档


参考回答:

被压严重一般是sink 效率低或者计算过程有瓶颈,被压跟数据源数据多少没什么关系。你可以在web 界面查看哪个算子导致的,然后优化就可以了


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359536?spm=a2c6h.13262185.0.0.133c39c0Clkic3


问题三:Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问?


Hi 社区。

Flink 1.12.1

现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有

forword 的ETL没有作用。

insert into table_a select id,udf(a),b,c from table_b;

发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区

  1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置?
  2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗?
== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: TableSourceScan(table=[[default_catalog,
default_database, temp_table]], fields=[id...])
Stage 3 : Operator
content : ChangelogNormalize(key=[id])
ship_strategy : HASH
Stage 4 : Operator
content : Calc(select=[...])
ship_strategy : FORWARD
Stage 5 : Data Sink
content : Sink: Sink(table=[default_catalog.default_database.table_a],
fields=[id...])
ship_strategy : FORWARD
```*来自志愿者整理的flink邮件归档

参考回答:

  1. 对于 upsert-kafka 会默认加上 ChangelogNormalize
  2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json

也能用,但是要加上 table.exec.source.cdc-events-duplicate = true

参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如

forward。[1]:

https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359539?spm=a2c6h.13262185.0.0.133c39c0Clkic3


问题四:状态恢复参数顺序 -s怎么办?


../bin/flink run -s path -c class test.jar 这里面的-s 必须在最前面么,我换成 ../bin/flink run -c class test.jar -s path 不生效。*来自志愿者整理的flink邮件归档


参考回答:

你放在jar包后就当作jar的参数了 ,你可以试试这样在你的main中获取参数 s 就是你的path。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359540?spm=a2c6h.13262185.0.0.133c39c0Clkic3


问题五:Flink1.12 如何使用代码提交Batch的Sql?


我们知道如果在1.12里使用Table API来提交Batch的作业,比如: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH);

但是,如果提交Sql作业的话: StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Table result = tableEnv.sqlQuery(...); 文档里也找不到如何使用StreamTableEnvironment 来跑Batch的SQL,又或者使用BatchTableEnvironment?

感谢各位提供思路!*来自志愿者整理的flink邮件归档


参考回答:

Hi shougou.

你要找的是不是这个[1]

// ******************// BLINK BATCH QUERY// ******************import

org.apache.flink.table.api.EnvironmentSettings;import

org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings =

EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();TableEnvironment

bbTableEnv = TableEnvironment.create(bbSettings);

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#create-a-tableenvironment


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359546?spm=a2c6h.13262185.0.0.133c39c0Clkic3


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之设置什么参数可以让多张表同时写入
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 算法 调度
Flink批处理自适应执行计划优化
本文整理自阿里集团高级开发工程师孙夏在Flink Forward Asia 2024的分享,聚焦Flink自适应逻辑执行计划与Join算子优化。内容涵盖自适应批处理调度器、动态逻辑执行计划、自适应Broadcast Hash Join及Join倾斜优化等技术细节,并展望未来改进方向,如支持更多场景和智能优化策略。文章还介绍了Flink UI调整及性能优化措施,为批处理任务提供更高效、灵活的解决方案。
235 0
Flink批处理自适应执行计划优化
|
4月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
862 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
11月前
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
199 2
利用java8 的 CompletableFuture 优化 Flink 程序
|
11月前
|
存储 缓存 监控
Flink如何优化?需要注意哪些方面?
【10月更文挑战第10天】Flink如何优化?需要注意哪些方面?
377 6
|
6月前
|
存储 SQL Java
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
528 1
Flink CDC + Hologres高性能数据同步优化实践
|
存储 缓存 监控
Flink内存管理机制及其参数调优
Flink内存管理机制及其参数调优
|
SQL 分布式计算 关系型数据库
实时计算 Flink版产品使用问题之在使用FlinkCDC与PostgreSQL进行集成时,该如何配置参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
实时计算 Flink版产品使用问题之在使用FlinkCDC与PostgreSQL进行集成时,该如何配置参数
|
SQL 资源调度 流计算
慢sql治理问题之在 Flink 中, userjar 分发问题如何优化
慢sql治理问题之在 Flink 中, userjar 分发问题如何优化
|
SQL Oracle 数据处理
实时计算 Flink版产品使用问题之如何优化数据读取速度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

相关产品

  • 实时计算 Flink版