Flink报错问题之flink 1.11 upsert结果报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:[sql-client] 通过sql-client提交sql怎么设置checkpointing.in

通过sql-client提交sql怎么设置checkpointing.interval?

我看了一下sql-client-defaults.yaml中的execution, 并没有发现这个参数。请教大家一下。

谢谢

*来自志愿者整理的flink邮件归档



参考答案:

现在还不支持在sql-client-defaults.yaml 里配置 checkpointing.interval,

你可以配置在flink-conf.yaml里*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370216?spm=a2c6h.12873639.article-detail.73.6f9243783Lv0fl



问题二:flink1.11 set yarn slots failed

使用如下命令,设置Number of slots per TaskManager /opt/flink-1.11.0/bin/flink run-application -t yarn-application

-Djobmanager.memory.process.size=1024m

-Dtaskmanager.memory.process.size=2048m

-ys 4 \

发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number of slots per TaskManager? 另外,有哪些方式可以增Flink UI中的大Available Task Slots的值,现在每次提交作业都是0

*来自志愿者整理的flink邮件归档



参考答案:

’-t是新引入的参数,是不支持以前的-yxxx参数的

你需要使用-Dtaskmanager.numberOfTaskSlots=4这样来设置*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370215?spm=a2c6h.12873639.article-detail.74.6f9243783Lv0fl



问题三:Flink-1.11内置connector测试问题求解

小白在测试flink 1.11新特性新内置的三个connector时,在本地创建图片[1]中的任务并进行数据打印时,控制台只打印了表schema,而没有按内置的datagen connector规则产生数据,请问可能是什么原因呢?谢谢解答!

[1] https://postimg.cc/PprT9XV6

*来自志愿者整理的flink邮件归档



参考答案:

目前 1.11 版本中的 tableResult.print 只支持 exactly once 语义,需要配置 checkpoint。

1.12 里准备支持 at least once 语义,用户可以不用配置 checkpoint。目前 pr [1] 正在reivew 。

[1] https://github.com/apache/flink/pull/12867*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370213?spm=a2c6h.12873639.article-detail.75.6f9243783Lv0fl



问题四:flink 1.11 upsert结果出错

各位大佬好,请教一个问题flink从Kafka读数,写入mysql,对mysql结果根据主键进行数据更新,看官网是支持“on DUPLICATE”的,但是在执行中报错是这个导致的语法问题。完整代码如下,是在linux下,直接python *.py执行的。请问下这个是不支持吗,还是怎么写呢!

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode

from pyflink.table import StreamTableEnvironment, EnvironmentSettings

source="""

CREATE TABLE kafka_source_tab (    

trck_id VARCHAR,

score  INT

) WITH (

'connector' = 'kafka',

'topic' = 'alarm_test_g',  

'scan.startup.mode' = 'earliest-offset',

'properties.bootstrap.servers' = '10.2.2.73:2181',

'properties.bootstrap.servers' = '10.2.2.73:9092',

'format' = 'json'

)

"""

sink="""

CREATE TABLE g_source_tab (

trck_id VARCHAR,

score  INT,

PRIMARY KEY (trck_id) NOT ENFORCED

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',

'table-name' = 'g',  

'username' = 'root',

'password' = '123456t',

'sink.buffer-flush.interval' = '1s'

)

"""

env = StreamExecutionEnvironment.get_execution_environment()

env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

env.set_parallelism(1)

env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()

t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

t_env.execute_sql(source)

t_env.execute_sql(sink)

table_result1=t_env.execute_sql('''Insert into g_source_tab (trck_id,score) VALUES (select

                      trck_id,score from kafka_source_tab ) ON DUPLICATE KEY UPDATE score=score+1''')

table_result1.get_job_client().get_job_execution_result().result()

*来自志愿者整理的flink邮件归档



参考答案:

这个语法 Flink 还不支持的,官网上说的 Flink 的 JDBC connector 实现 幂等写入[1]的方式,就是有相同pk的数据在写入数据库时,翻译成数据库 upsert SQL的方式,这里说的语法是数据库的 SQL 语法 。

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#idempotent-writes https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#idempotent-writes*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370209?spm=a2c6h.12873639.article-detail.76.6f9243783Lv0fl



问题五:flink1.11 sql kafka 抽取事件时间

大家好!

使用flink1.11 sql接入kafka ,format为csv

从eventTime字段中抽取事件时间

rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(eventTime / 1000, 'yyyy-MM-dd HH:mm:ss'))

eventTime可能存在脏数据(非13位的毫秒时间戳),设置了 'csv.ignore-parse-errors' = 'true', 那么eventTime会被设置为null,此时会报一个异常:

Caused by: java.lang.RuntimeException: RowTime field should not be null, please convert it to a non-null long value.

有没有什么好的方式可以解决

*来自志愿者整理的flink邮件归档



参考答案:

我感觉可以通过计算列的方式来解决呀,你只需要在计算rowtime这个列的时候保证它不是null即可,如果是null,可以设置一个默认值之类的?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370212?spm=a2c6h.12873639.article-detail.77.6f9243783Lv0fl

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
SQL Shell API
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
|
4月前
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
4月前
|
SQL Java Apache
实时计算 Flink版操作报错合集之使用parquet时,怎么解决报错:无法访问到java.uti.Arrays$ArrayList类的私有字段
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之遇到了关于MySqIValidator类缺失的错误,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
SQL 存储 资源调度
实时计算 Flink版操作报错合集之启动项目时报错缺少MySqlValidator类,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
SQL Oracle NoSQL
实时计算 Flink版操作报错合集之报错“找不到对应的归档日志文件”,怎么处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到iava.lang.NoClassDefFoundError: ververica/cdc/common/utils/StrinaUtils错误,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
Oracle 关系型数据库 数据库连接
实时计算 Flink版操作报错合集之为什么使用StartupOptions.latest()能够正常启动而切换到StartupOptions.specificOffset时遇到报错
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
存储 缓存 Java
实时计算 Flink版操作报错合集之怎么处理在运行作业时遇到报错::ClassCastException
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。

相关产品

  • 实时计算 Flink版