Flink报错问题之使用Watermark报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink keyedState 能不能存储泛型或抽象类型

下面的业务逻辑

robot 传感器上报的信息,先按 robotId keyBy,之后要遍历很多的规则。每个规则存储一个之前的对象,实现如下: private transient MapState<String, robotData> state; for (Entry<String, IChargerTwoRecordRule> entry : RulesFactory.getChargerTwoRecordRules().entrySet()) { String faultName = entry.getKey(); IChargerTwoRecordRule rule = entry.getValue(); RobotData old = state.get(faultName);

rule.handleLogMsg(old, current);

}

现在有部分规则存储的对象不能用 RobotData 表示,有没有可能用类似泛型或继承的方式实现 MapState value 存储不同类型的数据呢?

比如

MapState<String, Object> state;

之后根据不同的规则 把 Object 转换成具体的类*来自志愿者整理的flink邮件归档



参考答案:

可以的。不过你在声明MapStateDescriptor的时候要用 GenericTypeInfo了,并且会有一定的性能损失。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364570?spm=a2c6h.13066369.question.1.6ad26382scKk5a



问题二:flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert

"请问使用mysql数据库时,使用flinksql,已经设置主键的情况下,相同主键的记录没有更新,而是越来越多, 是目前不支持还是我使用的方法不对呢? 版本:flink 1.11.1

关键的2个sql如下

create table open_and_close_terminal_minute_1 ( request_date varchar ,terminal_no varchar ,logon_time varchar ,logout_time varchar ,insert_time varchar ,PRIMARY KEY (request_date,terminal_no) NOT ENFORCED ) with ( 'connector' = 'jd…… 'url' = 'jdbc:mys……se', 'table-name' = 'c……, 'driver' = 'com.m…… 'username' = 'ana…… 'password' = 'ana…… 'sink.buffer-flus…… )

upsert into open_and_close_terminal_minute_1 select request_date ,terminal_no ,logon_time ,logout_time ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19) from ( select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,10) as request_date ,cast(terminalNo as varchar) as terminal_no ,DATE_FORMAT(min(times),'yyyy-MM-dd HH:mm:ss.SSS') as logon_time ,DATE_FORMAT(max(times),'yyyy-MM-dd HH:mm:ss.SSS') as logout_time from caslog INNER join itoa_b_terminal_shop for system_time as of caslog.proc_time on cast(caslog.terminalNo as varchar)= itoa_b_terminal_shop.rowkey where errCode=0 and attr=0 group by TUMBLE(times, INTERVAL '1' MINUTE),terminalNo

)"*来自志愿者整理的flink邮件归档



参考答案:

这个版本是支持的。 其中插入语句是 "insert into " 而不是 “update into”?

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364568?spm=a2c6h.13066369.question.4.6ad26382jp5Qtg



问题三:使用flink1.11.1的debezium-changelog目前是否不支持Watermark

报错日志:

Currently, defining WATERMARK on a changelog source is not supported



参考答案:

是的。 目前还不支持。

1.12 版本会支持。

你定义 watermark 目的是什么呢?做 window 聚合?

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364564?spm=a2c6h.13066369.question.3.6ad26382ClZfBe



问题四:Flink未来会弃用TableSourceFactory吗

FLIP-95都实现后有了DynamicTableSourceFactory那么TableSourceFactory会弃用吗?*来自志愿者整理的flink邮件归档



参考答案:

Hi, 据我了解会弃用的,新的connector都会用DynamicTableSourceFactory,一般稳定一两个版本后社区会弃用, 另外这个是比较底层的实现,sql用户应该感知不到,如果有自定义connector的开发都建议用DynamicTableSourceFactory。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364561?spm=a2c6h.13066369.question.4.6ad263821rGQsS



问题五:zookeeper更换leader对flink的影响

按照我在工作中经验,有过几次需要重启zk集群,我是单个zk节点逐个重启。结论是导致了flink集群中任务的全部自动重启(基于最近一次的ckpt)。这对任务还是有一定影响的,因为ckpt是10分钟一次,会导致瞬间压力变高。

问下这个合理嘛,还是我配置的有问题or操作有问题。*来自志愿者整理的flink邮件归档



参考答案:

哈哈, 我的也是, flink和ZK断开连接的话, 任务会全部重启, 这边测试了各种场景, 比如部署HA方案, 部署多个jobmanager都测试过, 任务都是会重启的, 同样不知道如何解决.

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364560?spm=a2c6h.13066369.question.5.6ad26382lwbkmO

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
97 0
|
3月前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
52 0
|
5月前
|
SQL Shell API
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
|
5月前
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
5月前
|
SQL Java Apache
实时计算 Flink版操作报错合集之使用parquet时,怎么解决报错:无法访问到java.uti.Arrays$ArrayList类的私有字段
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之遇到了关于MySqIValidator类缺失的错误,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL 存储 资源调度
实时计算 Flink版操作报错合集之启动项目时报错缺少MySqlValidator类,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL Oracle NoSQL
实时计算 Flink版操作报错合集之报错“找不到对应的归档日志文件”,怎么处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
监控 Apache 流计算
时间的守卫者:揭秘Flink中Watermark如何掌控数据流的时空秩序?
【8月更文挑战第26天】Apache Flink是一款功能强大的流处理框架,其Watermark机制为核心,确保了系统即使面对数据乱序或延迟也能准确处理时间相关的特性。Watermark作为一种特殊事件,标记了所有在此之前发生事件的最晚时间点,这对于时间窗口操作至关重要。
83 0

相关产品

  • 实时计算 Flink版