Flink报错问题之udtf collect方法报错NEP如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink sql cdc 如果只处理一次全量数据问题

之前一直使用streaming api,这两天开始使用sql。

有个疑问,flink sql cdc读取mysql的数据时候,会处理 全量 + 增量数据。

那么如果同一个任务上线后,后续有变化,修改后再次上线,这个时候我并不希望处理之前过的数据。这个时候是怎么做呢?

cdc里面有进行state保存消费过的changelog的位置吗?这样我重新上线的时候从savepoint或者checkpoint进行恢复,是不是就可以了?

*来自志愿者整理的flink邮件归档



参考答案:

你说的有变化是后续的数据库进行增删改操作吗,如果是的话你从checkpoint启动就好了啊

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370026?spm=a2c6h.13066369.question.78.33bf585f4ld2dd



问题二:flink sql实时计算分位数如何实现

   目前flink sql实时计算中没有percentile函数吗?如果没有,如何实现这一功能。

   期待你的答复,谢谢!

*来自志愿者整理的flink邮件归档



参考答案:

Percentile 函数应该是 hive 内置的 UDF,在 Flink SQL 可以直接使用 Hive 的内置 UDF, 具体使用方式可查阅文档[1]. [1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/hive/hive_functions.html *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370025?spm=a2c6h.13066369.question.79.33bf585ftIMQJy



问题三:发现flinksql写hive比写hdfs慢很多

两个job,都从同一个kafka读数据,一份写入hdfs,一份写入hive,都是分钟分区,并发都是200。运行一段时间后发现写hive要落后hdfs很多,而且hive任务对应的hdfs路径下,某一分区内的文件甚至跨度2个小时之久。大家遇到过这种情况没 附上对应ddl hive: CREATE EXTERNAL TABLE hive_table ( log_timestamp BIGINT, ip STRING, raw STRING ) PARTITIONED BY (day STRING, hour STRING,minute STRING) STORED AS PARQUET TBLPROPERTIES ( 'parquet.compression'='SNAPPY', 'sink.partition-commit.policy.kind' = 'success-file', 'sink.partition-commit.success-file.name' = '_SUCCESS' );

Hdfs:

CREATE TABLE hdfs_table ( log_timestamp BIGINT, ip STRING, raw STRING, day STRING, hour STRING,minute STRING ) PARTITIONED BY (day , hour ,minute) WITH ( 'connector'='filesystem', 'path'='hdfs://xx/test.db/hdfs_table', 'format'='parquet', 'parquet.compression'='SNAPPY', 'sink.partition-commit.policy.kind' = 'success-file’, 'sink.partition-commit.success-file.name' = '_SUCCESS' );

实际hdfs文件对比:

-rw-r--r-- 3 hadoop hadoop 1514862 2020-11-26 09:26 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-150-824 -rw-r--r-- 3 hadoop hadoop 10798011 2020-11-26 09:34 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-151-830 -rw-r--r-- 3 hadoop hadoop 4002618 2020-11-26 09:35 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-151-831 -rw-r--r-- 3 hadoop hadoop 8057522 2020-11-26 09:51 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-152-844 -rw-r--r-- 3 hadoop hadoop 6675744 2020-11-26 09:52 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-152-845 -rw-r--r-- 3 hadoop hadoop 4062571 2020-11-26 09:51 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-844 -rw-r--r-- 3 hadoop hadoop 10247973 2020-11-26 09:52 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-845 -rw-r--r-- 3 hadoop hadoop 483029 2020-11-26 09:53 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-846 -rw-r--r-- 3 hadoop hadoop 9440221 2020-11-26 09:16 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-154-816 -rw-r--r-- 3 hadoop hadoop 5346956 2020-11-26 09:17 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-154-817 -rw-r--r-- 3 hadoop hadoop 4940718 2020-11-26 09:51 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-844 -rw-r--r-- 3 hadoop hadoop 9687410 2020-11-26 09:52 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-845 -rw-r--r-- 3 hadoop hadoop 51998 2020-11-26 09:53 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-846 -rw-r--r-- 3 hadoop hadoop 3518 2020-11-26 09:37 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-833 -rw-r--r-- 3 hadoop hadoop 13801987 2020-11-26 09:39 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-834 -rw-r--r-- 3 hadoop hadoop 963288 2020-11-26 09:40 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-835 -rw-r--r-- 3 hadoop hadoop 6036601 2020-11-26 09:27 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-157-825 -rw-r--r-- 3 hadoop hadoop 8864235 2020-11-26 09:29 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-157-826 -rw-r--r-- 3 hadoop hadoop 10865872 2020-11-26 09:37 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-158-833 -rw-r--r-- 3 hadoop hadoop 4031077 2020-11-26 09:39 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-158-834 -rw-r--r-- 3 hadoop hadoop 228350 2020-11-26 09:09 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-159-811 -rw-r--r-- 3 hadoop hadoop 14661395 2020-11-26 09:11 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-159-812 -rw-r--r-- 3 hadoop hadoop 5451995 2020-11-26 09:29 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-160-826 -rw-r--r-- 3 hadoop hadoop 9149301 2020-11-26 09:30 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-160-827 -rw-r--r-- 3 hadoop hadoop 4731543 2020-11-26 09:30 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-161-827

-rw-r--r-- 3 hadoop hadoop 5950562 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-17-1288 -rw-r--r-- 3 hadoop hadoop 8922364 2020-11-26 08:59 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-170-1287 -rw-r--r-- 3 hadoop hadoop 5898257 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-170-1288 -rw-r--r-- 3 hadoop hadoop 8848292 2020-11-26 08:59 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-171-1287 -rw-r--r-- 3 hadoop hadoop 5893106 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-171-1288 -rw-r--r-- 3 hadoop hadoop 8905617 2020-11-26 08:59 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-172-1287 -rw-r--r-- 3 hadoop hadoop 5800338 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-172-1288 -rw-r--r-- 3 hadoop hadoop 8914099 2020-11-26 08:59 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-173-1287 -rw-r--r-- 3 hadoop hadoop 5773258 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-173-1288 -rw-r--r-- 3 hadoop hadoop 8950742 2020-11-26 08:59 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-174-1287 -rw-r--r-- 3 hadoop hadoop 5829613 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-174-1288 -rw-r--r-- 3 hadoop hadoop 8808161 2020-11-26 08:59 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-175-1287 -rw-r--r-- 3 hadoop hadoop 5910085 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-175-1288 -rw-r--r-- 3 hadoop hadoop 8871508 2020-11-26 08:59 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-176-1287 -rw-r--r-- 3 hadoop hadoop 5896191 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-176-1288 -rw-r--r-- 3 hadoop hadoop 8855378 2020-11-26 08:59 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-177-1287 -rw-r--r-- 3 hadoop hadoop 5857271 2020-11-26 09:00 hdfs://xxx/test.db/hdfs_table/day=2020-11-26/hour=08/minute=59/part-633fa7d2-8d80-4e4d-ab90-f8b77c792b40-177-1288

*来自志愿者整理的flink邮件归档



参考答案:

Hi, admin 结合这个 issue 和你的对比结果, 我觉得应该是这个bug,这个问题在最新的分支已经修复,今天社区cut branch了,你可以帮忙在1.12的分支或master的分支上验证下吗?

祝好, Leonard [1] https://github.com/apache/flink/tree/release-1.12 https://github.com/apache/flink/tree/release-1.12 [2] https://github.com/apache/flink/tree/master https://github.com/apache/flink/tree/master

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370023?spm=a2c6h.13066369.question.80.33bf585fPsJwlU



问题四:udtf collect方法 报NEP

Hi all, 我在使用udtf 调用collect方法的时候出现空指针异常,显示collector对象为null,第一次调用正常,第二次调用数据被标记了 “INSERT”是什么意思? https://imgchr.com/i/D0HuCj https://imgchr.com/i/D0Hm5Q udtf代码: https://imgchr.com/i/D0HeUg

*来自志愿者整理的flink邮件归档



参考答案:

+I 表示是一条 insert 的数据,其它类型的可以查看: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/RowKind.java

其中 NPE 问题,你是用哪个版本的呢? 方便复制下 sql 和 udtf 吗,我看下能不能复现。 PS:“第一次调用正常” 是指? *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370020?spm=a2c6h.13066369.question.79.33bf585fU5LOfS



问题五:flink1.11.2 一直不入hbase问题

我这边测试flink入hbase发现一直入不了hbase,也没有报错。我的具体环境如下:

Hbase版本1.1.2with hdp2.6.5

Hadoop版本2.7.3

Flink版本1.11.2

Hbasesink:

create table f_volteics_gm_all_h_hbase (

msisdn string,

INFO ROW ,

PRIMARY KEY (msisdn) NOT ENFORCED

) WITH (

'connector' = 'hbase-1.4.3',

'table-name' = 'f_volteics_gm_all_h',

'zookeeper.quorum' = '192.168.6.200:2181',

'sink.buffer-flush.max-size'='0',

'sink.buffer-flush.max-rows'='0'

处理逻辑从kafka读数据,写入hbase。如下:

我将我的程序打成jar包,在yarn上运行。结果没有报错,但一直没有数据入hbase。从监控页面上看bytes sent一直为0如下:

*来自志愿者整理的flink邮件归档



参考答案:

Flink 1.11.x 版本hbase的 identifier 都是 hbase-1.4, 不是 hbase-1.4.3, 应该会报错才对,可以确认下日志里也没有报错吗? *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370019?spm=a2c6h.13066369.question.80.33bf585fxvBDuR

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
46 0
|
5月前
|
SQL Shell API
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
|
5月前
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
5月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之水位线的设置方法是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL Java Apache
实时计算 Flink版操作报错合集之使用parquet时,怎么解决报错:无法访问到java.uti.Arrays$ArrayList类的私有字段
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之遇到了关于MySqIValidator类缺失的错误,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL 存储 资源调度
实时计算 Flink版操作报错合集之启动项目时报错缺少MySqlValidator类,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
2月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1385 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎

相关产品

  • 实时计算 Flink版