Flink报错问题之报类型转换错误如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink-1.11 KafkaDynamicTableSouce groupBy 结果怎样发送到

INSERT INTO kafka_dws_artemis_out_order select warehouse_id, count(*)

from kafka_ods_artemis_out_order group by warehouse_id;

[ERROR] Could not execute SQL statement. Reason:

org.apache.flink.table.api.TableException: Table sink

'myhive.wanglei.kafka_dws_artemis_out_order' doesn't support consuming

update changes which is produced by node

GroupAggregate(groupBy=[warehouse_id], select=[warehouse_id, COUNT(*) AS

EXPR$1])

在 Flink-1.10 中可以更改 KafkaTableSinkBase 让它 implements RetractStream 实现。

我看现在 Flink-1.11 中是用了 KafkaDynamicSource, KafkaDynamicSink,这样怎样改动才能让

GroupBy 的结果也发送到 Kafka 呢?

*来自志愿者整理的flink邮件归档



参考答案:

DynamicTableSink有一个方法是getChangelogMode,可以通过这个方法来指定这个sink接收什么种类的数据*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370230?spm=a2c6h.12873639.article-detail.63.6f9243783Lv0fl



问题二:FlinkSQL 任务提交后 任务名称问题

代码大概是这样子的,一张kafka source表,一张es Sink表,最后通过tableEnv.executeSql("insert into esSinkTable select ... from kafkaSourceTable")执行

任务提交后任务名称为“inset-into_某某catalog_某某database.某某Table”

这样很不友好啊,能不能我自己指定任务名称呢?

*来自志愿者整理的flink邮件归档



参考答案:

在zeppelin中你可以指定insert 语句的job name,如下图,(对Zeppelin感兴趣的,可以加入钉钉群:32803524)

%flink.ssql(jobName="my job")

insert into sink_kafka select status, direction, cast(event_ts/1000000000

as timestamp(3)) from source_kafka where status <> 'foo'*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370228?spm=a2c6h.12873639.article-detail.64.6f9243783Lv0fl



问题三:state无法从checkpoint中恢复

state无法从checkpoint中恢复 配置代码env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//作业失败后不重启

env.setRestartStrategy(RestartStrategies.noRestart());

env.getCheckpointConfig().setCheckpointTimeout(500);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

env.setStateBackend(new RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints")); 使用状态的代码private transient ListState<String> counts;

@Override

public void open(Configuration parameters) throws Exception {

StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder(Time.minutes(30))

.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)

.build();

ListStateDescriptor<String> lastUserLogin = new ListStateDescriptor<>("lastUserLogin", String.class);

lastUserLogin.enableTimeToLive(ttlConfig);

counts = getRuntimeContext().getListState(lastUserLogin);

}

我重启了task managers 后。发现 counts 里面的数据都丢失了

*来自志愿者整理的flink邮件归档



参考答案:

1 counts 的数据丢失了能否详细描述一下呢?你预期是什么,看到什么现象

2 能否把你关于 counts 的其他代码也贴一下

  1. 你的作业是否从 checkpoint 恢复了呢?这个可以从 JM log 来查看
  2. 如果你确定是数据有丢失的话,或许你可以使用 state-process-api[1] 看一下是序列化出去有问题,还是 restore 回来有问题

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370227?spm=a2c6h.12873639.article-detail.65.6f9243783Lv0fl



问题四:sql 内嵌josn数组解析报 类型转换报错

hi all我这边有个嵌套的json数组,报类型转换错误(ts AS CAST(FROM_UNIXTIME(hiido_time) AS TIMESTAMP(3)),这里报错),是不是不能这么写

create table hiido_push_sdk_mq (

datas   ARRAY<ROW<from string,hdid string,event string,hiido_time bigint,ts AS CAST(FROM_UNIXTIME(hiido_time) AS TIMESTAMP(3)),WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE>>

) with (

'connector' = 'kafka',

'topic' = 'hiido_pushsdk_event',

'properties.bootstrap.servers' = 'kafkafs002-core001.yy.com:8103,kafkafs002-core002.yy.com:8103,kafkafs002-core003.yy.com:8103',

'properties.group.id' = 'push_click_sql_version_consumer',

'scan.startup.mode' = 'latest-offset',

'format.type' = 'json');

错误如下:

[ERROR] 2020-07-17 20:17:50,640(562284338) --> [http-nio-8080-exec-10] com.yy.push.flink.sql.gateway.sql.parse.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:77): parseBySqlParser, parse: com.yy.push.flink.sql.gateway.context.JobContext$1@5d5f32d1, stmt: create table hiido_push_sdk_mq (    datas   ARRAY<ROW<from string,hdid string,event string,hiido_time bigint,ts AS CAST(FROM_UNIXTIME(hiido_time) AS TIMESTAMP(3)),WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE>>) with ('connector' = 'kafka','topic' = 'hiido_pushsdk_event','properties.bootstrap.servers' = 'kafkafs002-core001.yy.com:8103,kafkafs002-core002.yy.com:8103,kafkafs002-core003.yy.com:8103','properties.group.id' = 'push_click_sql_version_consumer','scan.startup.mode' = 'latest-offset','format.type' = 'json'), error info: SQL parse failed. Encountered "AS" at line 1, column 115.

Was expecting one of:

   "ROW" ...

   <BRACKET_QUOTED_IDENTIFIER> ...

   <QUOTED_IDENTIFIER> ...

   <BACK_QUOTED_IDENTIFIER> ...

   <IDENTIFIER> ...

   <UNICODE_QUOTED_IDENTIFIER> ...

   "STRING" ...

   "BYTES" ...

   "ARRAY" ...

   "MULTISET" ...

   "RAW" ...

   "BOOLEAN" ...

   "INTEGER" ...

   "INT" ...

   "TINYINT" ...

   "SMALLINT" ...

   "BIGINT" ...

   "REAL" ...

   "DOUBLE" ...

   "FLOAT" ...

   "BINARY" ...

   "VARBINARY" ...

   "DECIMAL" ...

   "DEC" ...

   "NUMERIC" ...

   "ANY" ...

   "CHARACTER" ...

   "CHAR" ...

   "VARCHAR" ...

   "DATE" ...

   "TIME" ...

   "TIMESTAMP" ...

*来自志愿者整理的flink邮件归档



参考答案:

计算列只能写在最外层,不能在嵌套类型里面有计算列。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370226?spm=a2c6h.12873639.article-detail.66.6f9243783Lv0fl



问题五:flink不带参数的udf始终返回第一次调用的结果

我有一个不带参数的udf,用于返回系统当前时间的字符串格式,但是调用时每次都返回这个udf第一次调用的结果,所以拿到的时间全部都是一样的

udf的实时如下:

public class GetTimeFunc extends ScalarFunction {

public String eval() {

return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());

}

}

请问,针对这种没有入参的udf,flink内部是有做什么优化吗,导致每次调用返回的结果都一样?

*来自志愿者整理的flink邮件归档



参考答案:

是的,这种就被当做常量被优化掉了。

你可以覆盖一下ScalarFunction#isDeterministic方法,说明你这个函数时非确定性的,就不会被优化掉了。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370224?spm=a2c6h.12873639.article-detail.67.6f9243783Lv0fl

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
9月前
|
SQL Shell API
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
|
9月前
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
9月前
|
SQL Java Apache
实时计算 Flink版操作报错合集之使用parquet时,怎么解决报错:无法访问到java.uti.Arrays$ArrayList类的私有字段
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
9月前
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之遇到了关于MySqIValidator类缺失的错误,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
9月前
|
SQL 存储 资源调度
实时计算 Flink版操作报错合集之启动项目时报错缺少MySqlValidator类,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
9月前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
9月前
|
SQL Oracle NoSQL
实时计算 Flink版操作报错合集之报错“找不到对应的归档日志文件”,怎么处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
8月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
6月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
2470 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
6月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
285 56

相关产品

  • 实时计算 Flink版