Flink报错问题之报类型转换错误如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink-1.11 KafkaDynamicTableSouce groupBy 结果怎样发送到

INSERT INTO kafka_dws_artemis_out_order select warehouse_id, count(*)

from kafka_ods_artemis_out_order group by warehouse_id;

[ERROR] Could not execute SQL statement. Reason:

org.apache.flink.table.api.TableException: Table sink

'myhive.wanglei.kafka_dws_artemis_out_order' doesn't support consuming

update changes which is produced by node

GroupAggregate(groupBy=[warehouse_id], select=[warehouse_id, COUNT(*) AS

EXPR$1])

在 Flink-1.10 中可以更改 KafkaTableSinkBase 让它 implements RetractStream 实现。

我看现在 Flink-1.11 中是用了 KafkaDynamicSource, KafkaDynamicSink,这样怎样改动才能让

GroupBy 的结果也发送到 Kafka 呢?

*来自志愿者整理的flink邮件归档



参考答案:

DynamicTableSink有一个方法是getChangelogMode,可以通过这个方法来指定这个sink接收什么种类的数据*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370230?spm=a2c6h.12873639.article-detail.63.6f9243783Lv0fl



问题二:FlinkSQL 任务提交后 任务名称问题

代码大概是这样子的,一张kafka source表,一张es Sink表,最后通过tableEnv.executeSql("insert into esSinkTable select ... from kafkaSourceTable")执行

任务提交后任务名称为“inset-into_某某catalog_某某database.某某Table”

这样很不友好啊,能不能我自己指定任务名称呢?

*来自志愿者整理的flink邮件归档



参考答案:

在zeppelin中你可以指定insert 语句的job name,如下图,(对Zeppelin感兴趣的,可以加入钉钉群:32803524)

%flink.ssql(jobName="my job")

insert into sink_kafka select status, direction, cast(event_ts/1000000000

as timestamp(3)) from source_kafka where status <> 'foo'*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370228?spm=a2c6h.12873639.article-detail.64.6f9243783Lv0fl



问题三:state无法从checkpoint中恢复

state无法从checkpoint中恢复 配置代码env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//作业失败后不重启

env.setRestartStrategy(RestartStrategies.noRestart());

env.getCheckpointConfig().setCheckpointTimeout(500);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

env.setStateBackend(new RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints")); 使用状态的代码private transient ListState<String> counts;

@Override

public void open(Configuration parameters) throws Exception {

StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder(Time.minutes(30))

.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)

.build();

ListStateDescriptor<String> lastUserLogin = new ListStateDescriptor<>("lastUserLogin", String.class);

lastUserLogin.enableTimeToLive(ttlConfig);

counts = getRuntimeContext().getListState(lastUserLogin);

}

我重启了task managers 后。发现 counts 里面的数据都丢失了

*来自志愿者整理的flink邮件归档



参考答案:

1 counts 的数据丢失了能否详细描述一下呢?你预期是什么,看到什么现象

2 能否把你关于 counts 的其他代码也贴一下

  1. 你的作业是否从 checkpoint 恢复了呢?这个可以从 JM log 来查看
  2. 如果你确定是数据有丢失的话,或许你可以使用 state-process-api[1] 看一下是序列化出去有问题,还是 restore 回来有问题

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370227?spm=a2c6h.12873639.article-detail.65.6f9243783Lv0fl



问题四:sql 内嵌josn数组解析报 类型转换报错

hi all我这边有个嵌套的json数组,报类型转换错误(ts AS CAST(FROM_UNIXTIME(hiido_time) AS TIMESTAMP(3)),这里报错),是不是不能这么写

create table hiido_push_sdk_mq (

datas   ARRAY<ROW<from string,hdid string,event string,hiido_time bigint,ts AS CAST(FROM_UNIXTIME(hiido_time) AS TIMESTAMP(3)),WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE>>

) with (

'connector' = 'kafka',

'topic' = 'hiido_pushsdk_event',

'properties.bootstrap.servers' = 'kafkafs002-core001.yy.com:8103,kafkafs002-core002.yy.com:8103,kafkafs002-core003.yy.com:8103',

'properties.group.id' = 'push_click_sql_version_consumer',

'scan.startup.mode' = 'latest-offset',

'format.type' = 'json');

错误如下:

[ERROR] 2020-07-17 20:17:50,640(562284338) --> [http-nio-8080-exec-10] com.yy.push.flink.sql.gateway.sql.parse.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:77): parseBySqlParser, parse: com.yy.push.flink.sql.gateway.context.JobContext$1@5d5f32d1, stmt: create table hiido_push_sdk_mq (    datas   ARRAY<ROW<from string,hdid string,event string,hiido_time bigint,ts AS CAST(FROM_UNIXTIME(hiido_time) AS TIMESTAMP(3)),WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE>>) with ('connector' = 'kafka','topic' = 'hiido_pushsdk_event','properties.bootstrap.servers' = 'kafkafs002-core001.yy.com:8103,kafkafs002-core002.yy.com:8103,kafkafs002-core003.yy.com:8103','properties.group.id' = 'push_click_sql_version_consumer','scan.startup.mode' = 'latest-offset','format.type' = 'json'), error info: SQL parse failed. Encountered "AS" at line 1, column 115.

Was expecting one of:

   "ROW" ...

   <BRACKET_QUOTED_IDENTIFIER> ...

   <QUOTED_IDENTIFIER> ...

   <BACK_QUOTED_IDENTIFIER> ...

   <IDENTIFIER> ...

   <UNICODE_QUOTED_IDENTIFIER> ...

   "STRING" ...

   "BYTES" ...

   "ARRAY" ...

   "MULTISET" ...

   "RAW" ...

   "BOOLEAN" ...

   "INTEGER" ...

   "INT" ...

   "TINYINT" ...

   "SMALLINT" ...

   "BIGINT" ...

   "REAL" ...

   "DOUBLE" ...

   "FLOAT" ...

   "BINARY" ...

   "VARBINARY" ...

   "DECIMAL" ...

   "DEC" ...

   "NUMERIC" ...

   "ANY" ...

   "CHARACTER" ...

   "CHAR" ...

   "VARCHAR" ...

   "DATE" ...

   "TIME" ...

   "TIMESTAMP" ...

*来自志愿者整理的flink邮件归档



参考答案:

计算列只能写在最外层,不能在嵌套类型里面有计算列。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370226?spm=a2c6h.12873639.article-detail.66.6f9243783Lv0fl



问题五:flink不带参数的udf始终返回第一次调用的结果

我有一个不带参数的udf,用于返回系统当前时间的字符串格式,但是调用时每次都返回这个udf第一次调用的结果,所以拿到的时间全部都是一样的

udf的实时如下:

public class GetTimeFunc extends ScalarFunction {

public String eval() {

return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());

}

}

请问,针对这种没有入参的udf,flink内部是有做什么优化吗,导致每次调用返回的结果都一样?

*来自志愿者整理的flink邮件归档



参考答案:

是的,这种就被当做常量被优化掉了。

你可以覆盖一下ScalarFunction#isDeterministic方法,说明你这个函数时非确定性的,就不会被优化掉了。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370224?spm=a2c6h.12873639.article-detail.67.6f9243783Lv0fl

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
30天前
|
Java 关系型数据库 MySQL
Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
【2月更文挑战第33天】Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
49 2
|
1月前
|
Java 关系型数据库 MySQL
Flink CDC有见这个报错不?
【2月更文挑战第29天】Flink CDC有见这个报错不?
22 2
|
1月前
|
存储 关系型数据库 MySQL
Flink CDC产品常见问题之写hudi的时候报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
资源调度 关系型数据库 测试技术
Flink CDC产品常见问题之没有报错但是一直监听不到数据如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
缓存 监控 Java
Flink CDC产品常见问题之flink集群jps命令报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
Oracle 关系型数据库 MySQL
flink cdc 转换问题之类型转换如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之用superset连接starrocks报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
Oracle 关系型数据库 MySQL
flink cdc 增量问题之增量数据会报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之使用cdc-Oracle连接器报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
Flink CDC产品常见问题之使用cdc-Oracle连接器报错如何解决
|
1月前
|
Oracle 关系型数据库 数据处理
Flink CDC产品常见问题之flink postgresqlcdc 报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。

相关产品

  • 实时计算 Flink版