flink问题之做实时数仓sql保证分topic区有序如何解决

简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink 1.10 kafka collector topic 配置pattern


请教大家,flink 1.10里面kafka connector 不能配置topic pattern,后续会支持吗?


参考回答:

可以关注下:https://issues.apache.org/jira/browse/FLINK-18449

预计1.12会支持。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/371672


问题二:kafkaf To mysql 写入问题


请教两个问题 1) 用下面的代码消费kafka 发生序列化异常时,会发生JOB反复重试,重启后也是这样, 改用FlinkKafkaConsumer010类的话,有相关的解决方法,参照https://stackoverflow.com/questions/51301549/how-to-handle-exception-while-parsing-json-in-flink/51302225 不知道,用Kafka类的话,如何解决 .connect( new Kafka() .version("0.10") .topic("test-input") 2) 对于timestamp类型字段,用JDBCAppendTableSink 把DataStream 写入到mysql时,会发下面的错误LocalTimeStamp到Timestamp的转型错误 kafka消息是avro格式,字段类型设置为timestamp(3),我是把System.currentTimeMillis()写入到kafka中的 jdbc参数类型设置为Types.SQL_TIMESTAMP


参考回答:

估计需要使用Flink 1.11。

1.JSON Format有参数控制 [1]

2.是之前的bug,Flink 1.11应该是不会存在了,不确定1.10.1有没有修。

[1]

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/json.html#json-ignore-parse-errors


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/371671


问题三:rocksdb的block cache usage应该如何使用


通过 state.backend.rocksdb.metrics.block-cache-usage: true开启 rocksdb_block_cache_usage监控,上报到prometheus,对应的指标名称是 flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage。

我们的作业一个TM的内存设置如下:

taskmanager.memory.process.size: 23000m taskmanager.memory.managed.fraction: 0.4

ui上显示的Flink Managed MEM是8.48G。

通过grafana配置出来的图,如果group by的维度是host,得出来的每个TM在作业稳定后是45G,超过8.48G了。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"}) by (host)

如果维度是host,operator_name,每个operator_name维度是22G。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"}) by (host,operator_name)

请问这个指标应该如何使用?


参考回答:

默认Flink启用了rocksDB 的managed memory,这里涉及到这个功能的实现原理,简单来说,一个slot里面的所有rocksDB实例底层“托管”内存的LRU block cache均是一个,这样你可以根据taskmanager和subtask_index 作为tag来区分,你会发现在同一个TM里面的某个subtask对应的不同column_family 的block cache的数值均是完全相同的。所以不需要将这个数值进行求和统计。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/371670


问题四:flink对task分配slot问题


请教一个问题,当前同一个job下的多个task(不在一个算子链)中,都会存在某一个subTask任务过重,这些subTask会分配到同一个slot下吗?

flink在对subTask分配slot时候,会先判断slot当前存在的任务数,磁盘IO之类的吗?


参考回答:

Flink 在进行 slot sharing 的时候,不会考虑当前 slot 的任务数、磁盘 IO 这些,而是会遵循“相同 task 的多个

subtask 不能分配到同一个 slot 中”这样的一个规则。

举个例子:

如果作业中有 A, B 两个 vertex,并发为 2,那就有 A1, A2, B1, B2 这 4 个 subtask。

那么 A1 和 A2 不能放到一个 slot 中,B1 和 B2 不能够放到一个 slot 中。

所以,slot sharing 的结果只能是 (A1, B1), (A2, B2) 或 (A1, B2), (A2, B1) 这两种情况。

通常情况下,A 和 B 之间的负载可能存在较大差异,而 A1 和 A2、B1 和 B2 之间通常不会有太大差异。

因此,slot sharing 的规则使得每个 slot 中都分配了一个 A 和一个 B,各个 slot 之间的负载大体上是均衡的。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/371669


问题五:做实时数仓,sql怎么保证分topic区有序


就是我用 flink sql 通过ddl读取和写入kafka怎么设置并行度呢? flink sql 通过ddl写入kafka怎么自定义分区呢?

这样才能保证提高消费能力。和保证数据有序。 但是好像没有发现再table模式 或者sql 语句上设置 或者做自定义分区。


参考回答:

sql似乎不支持相关的设置,可以通过env或配置文件设置所有蒜子的并行度。

你可以试试流转表,可以做到细粒度的控制。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/371668

相关文章
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
862 43
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
323 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
5月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
939 1
|
11月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
2058 27
|
10月前
|
SQL 关系型数据库 OLAP
云原生数据仓库AnalyticDB PostgreSQL同一个SQL可以实现向量索引、全文索引GIN、普通索引BTREE混合查询,简化业务实现逻辑、提升查询性能
本文档介绍了如何在AnalyticDB for PostgreSQL中创建表、向量索引及混合检索的实现步骤。主要内容包括:创建`articles`表并设置向量存储格式,创建ANN向量索引,为表增加`username`和`time`列,建立BTREE索引和GIN全文检索索引,并展示了查询结果。参考文档提供了详细的SQL语句和配置说明。
352 2
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
913 14
|
SQL
数仓规范之sql编写规范
编写SQL时,应遵循以下规范:所有关键字小写,表别名按a, b, c...顺序使用,复杂逻辑多行书写,提高可读性。SELECT字段需逐行列出,避免使用*,GROUP BY字段同样处理。WHERE条件多于一个时,每条件一行。JOIN子表推荐使用嵌套查询方式1,明确关联条件,避免笛卡尔积。关键逻辑需注释,INSERT SELECT后最外层字段加注释说明用途。示例中展示了推荐的JOIN替代子查询的写法,以提高代码的可读性和维护性。
598 1
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
291 0
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
568 13

相关产品

  • 实时计算 Flink版