Flink报错问题之udtf collect方法报错NEP如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink sql cdc 如果只处理一次全量数据问题

之前一直使用streaming api,这两天开始使用sql。

有个疑问,flink sql cdc读取mysql的数据时候,会处理 全量 + 增量数据。

那么如果同一个任务上线后,后续有变化,修改后再次上线,这个时候我并不希望处理之前过的数据。这个时候是怎么做呢?

cdc里面有进行state保存消费过的changelog的位置吗?这样我重新上线的时候从savepoint或者checkpoint进行恢复,是不是就可以了?

*来自志愿者整理的flink邮件归档



参考答案:

你说的有变化是后续的数据库进行增删改操作吗,如果是的话你从checkpoint启动就好了啊

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370026?spm=a2c6h.13066369.question.78.33bf585f4ld2dd



问题二:flink sql实时计算分位数如何实现

   目前flink sql实时计算中没有percentile函数吗?如果没有,如何实现这一功能。

   期待你的答复,谢谢!

*来自志愿者整理的flink邮件归档



参考答案:

Percentile 函数应该是 hive 内置的 UDF,在 Flink SQL 可以直接使用 Hive 的内置 UDF, 具体使用方式可查阅文档[1]. [1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/hive/hive_functions.html *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370025?spm=a2c6h.13066369.question.79.33bf585ftIMQJy



问题三:发现flinksql写hive比写hdfs慢很多

两个job,都从同一个kafka读数据,一份写入hdfs,一份写入hive,都是分钟分区,并发都是200。运行一段时间后发现写hive要落后hdfs很多,而且hive任务对应的hdfs路径下,某一分区内的文件甚至跨度2个小时之久。大家遇到过这种情况没 附上对应ddl hive: CREATE EXTERNAL TABLE hive_table ( log_timestamp BIGINT, ip STRING, raw STRING ) PARTITIONED BY (day STRING, hour STRING,minute STRING) STORED AS PARQUET TBLPROPERTIES ( 'parquet.compression'='SNAPPY', 'sink.partition-commit.policy.kind' = 'success-file', 'sink.partition-commit.success-file.name' = '_SUCCESS' );

Hdfs:

CREATE TABLE hdfs_table ( log_timestamp BIGINT, ip STRING, raw STRING, day STRING, hour STRING,minute STRING ) PARTITIONED BY (day , hour ,minute) WITH ( 'connector'='filesystem', 'path'='hdfs://xx/test.db/hdfs_table', 'format'='parquet', 'parquet.compression'='SNAPPY', 'sink.partition-commit.policy.kind' = 'success-file’, 'sink.partition-commit.success-file.name' = '_SUCCESS' );

实际hdfs文件对比:

-rw-r--r-- 3 hadoop hadoop 1514862 2020-11-26 09:26 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-150-824 -rw-r--r-- 3 hadoop hadoop 10798011 2020-11-26 09:34 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-151-830 -rw-r--r-- 3 hadoop hadoop 4002618 2020-11-26 09:35 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-151-831 -rw-r--r-- 3 hadoop hadoop 8057522 2020-11-26 09:51 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-152-844 -rw-r--r-- 3 hadoop hadoop 6675744 2020-11-26 09:52 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-152-845 -rw-r--r-- 3 hadoop hadoop 4062571 2020-11-26 09:51 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-844 -rw-r--r-- 3 hadoop hadoop 10247973 2020-11-26 09:52 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-845 -rw-r--r-- 3 hadoop hadoop 483029 2020-11-26 09:53 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-846 -rw-r--r-- 3 hadoop hadoop 9440221 2020-11-26 09:16 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-154-816 -rw-r--r-- 3 hadoop hadoop 5346956 2020-11-26 09:17 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-154-817 -rw-r--r-- 3 hadoop hadoop 4940718 2020-11-26 09:51 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-844 -rw-r--r-- 3 hadoop hadoop 9687410 2020-11-26 09:52 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-845 -rw-r--r-- 3 hadoop hadoop 51998 2020-11-26 09:53 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-846 -rw-r--r-- 3 hadoop hadoop 3518 2020-11-26 09:37 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-833 -rw-r--r-- 3 hadoop hadoop 13801987 2020-11-26 09:39 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-834 -rw-r--r-- 3 hadoop hadoop 963288 2020-11-26 09:40 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-835 -rw-r--r-- 3 hadoop hadoop 6036601 2020-11-26 09:27 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-157-825 -rw-r--r-- 3 hadoop hadoop 8864235 2020-11-26 09:29 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-157-826 -rw-r--r-- 3 hadoop hadoop 10865872 2020-11-26 09:37 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-158-833 -rw-r--r-- 3 hadoop hadoop 4031077 2020-11-26 09:39 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-158-834 -rw-r--r-- 3 hadoop hadoop 228350 2020-11-26 09:09 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-159-811 -rw-r--r-- 3 hadoop hadoop 14661395 2020-11-26 09:11 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-159-812 -rw-r--r-- 3 hadoop hadoop 5451995 2020-11-26 09:29 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-160-826 -rw-r--r-- 3 hadoop hadoop 9149301 2020-11-26 09:30 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-160-827 -rw-r--r-- 3 hadoop hadoop 4731543 2020-11-26 09:30 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-161-827

-rw-r--r-- 3 hadoop hadoop 5950562 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-17-1288 -rw-r--r-- 3 hadoop hadoop 8922364 2020-11-26 08:59 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-170-1287 -rw-r--r-- 3 hadoop hadoop 5898257 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-170-1288 -rw-r--r-- 3 hadoop hadoop 8848292 2020-11-26 08:59 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-171-1287 -rw-r--r-- 3 hadoop hadoop 5893106 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-171-1288 -rw-r--r-- 3 hadoop hadoop 8905617 2020-11-26 08:59 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-172-1287 -rw-r--r-- 3 hadoop hadoop 5800338 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-172-1288 -rw-r--r-- 3 hadoop hadoop 8914099 2020-11-26 08:59 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-173-1287 -rw-r--r-- 3 hadoop hadoop 5773258 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-173-1288 -rw-r--r-- 3 hadoop hadoop 8950742 2020-11-26 08:59 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-174-1287 -rw-r--r-- 3 hadoop hadoop 5829613 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-174-1288 -rw-r--r-- 3 hadoop hadoop 8808161 2020-11-26 08:59 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-175-1287 -rw-r--r-- 3 hadoop hadoop 5910085 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-175-1288 -rw-r--r-- 3 hadoop hadoop 8871508 2020-11-26 08:59 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-176-1287 -rw-r--r-- 3 hadoop hadoop 5896191 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-176-1288 -rw-r--r-- 3 hadoop hadoop 8855378 2020-11-26 08:59 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-177-1287 -rw-r--r-- 3 hadoop hadoop 5857271 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-177-1288

*来自志愿者整理的flink邮件归档



参考答案:

Hi, admin 结合这个 issue 和你的对比结果, 我觉得应该是这个bug,这个问题在最新的分支已经修复,今天社区cut branch了,你可以帮忙在1.12的分支或master的分支上验证下吗?

祝好, Leonard [1] https://github.com/apache/flink/tree/release-1.12 https://github.com/apache/flink/tree/release-1.12 [2] https://github.com/apache/flink/tree/master https://github.com/apache/flink/tree/master

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370023?spm=a2c6h.13066369.question.80.33bf585fPsJwlU



问题四:udtf collect方法 报NEP

Hi all, 我在使用udtf 调用collect方法的时候出现空指针异常,显示collector对象为null,第一次调用正常,第二次调用数据被标记了 “INSERT”是什么意思? https://imgchr.com/i/D0HuCj https://imgchr.com/i/D0Hm5Q udtf代码: https://imgchr.com/i/D0HeUg

*来自志愿者整理的flink邮件归档



参考答案:

+I 表示是一条 insert 的数据,其它类型的可以查看: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/RowKind.java

其中 NPE 问题,你是用哪个版本的呢? 方便复制下 sql 和 udtf 吗,我看下能不能复现。 PS:“第一次调用正常” 是指? *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370020?spm=a2c6h.13066369.question.79.33bf585fU5LOfS



问题五:flink1.11.2 一直不入hbase问题

我这边测试flink入hbase发现一直入不了hbase,也没有报错。我的具体环境如下:

Hbase版本1.1.2with hdp2.6.5

Hadoop版本2.7.3

Flink版本1.11.2

Hbasesink:

create table f_volteics_gm_all_h_hbase (

msisdn string,

INFO ROW ,

PRIMARY KEY (msisdn) NOT ENFORCED

) WITH (

'connector' = 'hbase-1.4.3',

'table-name' = 'f_volteics_gm_all_h',

'zookeeper.quorum' = '192.168.6.200:2181',

'sink.buffer-flush.max-size'='0',

'sink.buffer-flush.max-rows'='0'

处理逻辑从kafka读数据,写入hbase。如下:

我将我的程序打成jar包,在yarn上运行。结果没有报错,但一直没有数据入hbase。从监控页面上看bytes sent一直为0如下:

*来自志愿者整理的flink邮件归档



参考答案:

Flink 1.11.x 版本hbase的 identifier 都是 hbase-1.4, 不是 hbase-1.4.3, 应该会报错才对,可以确认下日志里也没有报错吗? *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370019?spm=a2c6h.13066369.question.80.33bf585fxvBDuR

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
26天前
|
流计算
在Flink中,你可以通过以下方法为join操作设置并行度
【2月更文挑战第27天】在Flink中,你可以通过以下方法为join操作设置并行度
21 3
|
19天前
|
Java 关系型数据库 MySQL
Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
【2月更文挑战第33天】Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
32 2
|
24天前
|
Java 关系型数据库 MySQL
Flink CDC有见这个报错不?
【2月更文挑战第29天】Flink CDC有见这个报错不?
17 2
|
26天前
|
存储 关系型数据库 MySQL
Flink CDC产品常见问题之写hudi的时候报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
26天前
|
资源调度 关系型数据库 测试技术
Flink CDC产品常见问题之没有报错但是一直监听不到数据如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
26天前
|
缓存 监控 Java
Flink CDC产品常见问题之flink集群jps命令报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
29天前
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之用superset连接starrocks报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
29天前
|
Oracle 关系型数据库 MySQL
flink cdc 增量问题之增量数据会报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
476 5
|
25天前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1037 1
官宣|Apache Flink 1.19 发布公告

相关产品

  • 实时计算 Flink版