Flink问题之优化消费如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:关于Watermark的使用调试问题


想问下社区,watermark必须加在数据源上吗?顺便想问下一般用什么方式来调watermark,我自己本地有发现加在map

function之后,watermark会自动重置为默认值的情况。

谢谢!*来自志愿者整理的flink邮件归档


参考回答:

可以中途产生,走这个接口

org.apache.flink.streaming.api.datastream.DataStream#assignTimestampsAndWatermarks(org.apache.flink.api.common.eventtime.WatermarkStrategy )

麻烦贴一下你加 watermark 的代码和 pipeline 看一下啥情况


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359897?spm=a2c6h.13066354.0.0.5a8f6992GOUQsh


问题二:消息队列量级特别如何优化消费?


hi, 由于消息队列量特别大,在flink接入以后被压严重。除了修改并行度以外还有没有其他的优化方案!!!*来自志愿者整理的flink邮件归档


参考回答:

被压严重一般是sink 效率低或者计算过程有瓶颈,被压跟数据源数据多少没什么关系。你可以在web 界面查看哪个算子导致的,然后优化就可以了


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359536?spm=a2c6h.13262185.0.0.133c39c0Clkic3


问题三:Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问?


Hi 社区。

Flink 1.12.1

现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有

forword 的ETL没有作用。

insert into table_a select id,udf(a),b,c from table_b;

发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区

  1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置?
  2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗?
== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: TableSourceScan(table=[[default_catalog,
default_database, temp_table]], fields=[id...])
Stage 3 : Operator
content : ChangelogNormalize(key=[id])
ship_strategy : HASH
Stage 4 : Operator
content : Calc(select=[...])
ship_strategy : FORWARD
Stage 5 : Data Sink
content : Sink: Sink(table=[default_catalog.default_database.table_a],
fields=[id...])
ship_strategy : FORWARD
```*来自志愿者整理的flink邮件归档

参考回答:

  1. 对于 upsert-kafka 会默认加上 ChangelogNormalize
  2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json

也能用,但是要加上 table.exec.source.cdc-events-duplicate = true

参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如

forward。[1]:

https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359539?spm=a2c6h.13262185.0.0.133c39c0Clkic3


问题四:状态恢复参数顺序 -s怎么办?


../bin/flink run -s path -c class test.jar 这里面的-s 必须在最前面么,我换成 ../bin/flink run -c class test.jar -s path 不生效。*来自志愿者整理的flink邮件归档


参考回答:

你放在jar包后就当作jar的参数了 ,你可以试试这样在你的main中获取参数 s 就是你的path。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359540?spm=a2c6h.13262185.0.0.133c39c0Clkic3


问题五:Flink1.12 如何使用代码提交Batch的Sql?


我们知道如果在1.12里使用Table API来提交Batch的作业,比如: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH);

但是,如果提交Sql作业的话: StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Table result = tableEnv.sqlQuery(...); 文档里也找不到如何使用StreamTableEnvironment 来跑Batch的SQL,又或者使用BatchTableEnvironment?

感谢各位提供思路!*来自志愿者整理的flink邮件归档


参考回答:

Hi shougou.

你要找的是不是这个[1]

// ******************// BLINK BATCH QUERY// ******************import

org.apache.flink.table.api.EnvironmentSettings;import

org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings =

EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();TableEnvironment

bbTableEnv = TableEnvironment.create(bbSettings);

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#create-a-tableenvironment


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359546?spm=a2c6h.13262185.0.0.133c39c0Clkic3


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
消息中间件 缓存 关系型数据库
Flink CDC产品常见问题之upsert-kafka增加参数报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
4月前
|
消息中间件 缓存 监控
Flink背压原理以及解决优化
Flink背压原理以及解决优化
135 0
|
4月前
|
SQL 关系型数据库 MySQL
使用CTAS 把mysql 表同步数据 到hologres ,Flink有什么参数可以使hologres 的字段都小写吗?
使用CTAS 把mysql 表同步数据 到hologres ,Flink有什么参数可以使hologres 的字段都小写吗?
281 0
|
5月前
|
资源调度 流计算
Flink 指标参数源码解读(读取数量、发送数量、发送字节数、接收字节数等)(下)
Flink 指标参数源码解读(读取数量、发送数量、发送字节数、接收字节数等)(下)
84 1
|
1月前
|
存储 SQL Oracle
flink cdc 时区问题之文档添加参数无效如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
消息中间件 Oracle 关系型数据库
Flink CDC 数据源问题之参数配置如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
40 0
|
2月前
|
Java 流计算
【极数系列】Flink配置参数如何获取?(06)
【极数系列】Flink配置参数如何获取?(06)
|
3月前
|
消息中间件 存储 Kafka
在Flink中,可以通过配置`KafkaConsumer`的`properties`参数来设置两个不同的SASL机制
【1月更文挑战第19天】【1月更文挑战第91篇】在Flink中,可以通过配置`KafkaConsumer`的`properties`参数来设置两个不同的SASL机制
79 3
|
4月前
|
监控 分布式数据库 流计算
Flink 异步IO优化任务
Flink 异步IO优化任务
38 0
|
4月前
|
缓存 监控 分布式数据库
Flink应用优化实践
Flink应用优化实践
35 0

相关产品

  • 实时计算 Flink版