Flink问题之提交作业失败如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:Flink 中 shuffle Partitioner 和 rebalance partitoner 有什么区别?


问题如上,辛苦大神解答下*来自志愿者整理的flink邮件归档


参考回答:

ShufflePartitioner:

public int selectChannel(SerializationDelegate record) { return random.nextInt(numberOfChannels); }

RebalancePartitioner public int selectChannel(SerializationDelegate record) { nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels; return nextChannelToSendTo; }

一个随机,一个严格 round-robin。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359357?spm=a2c6h.13262185.0.0.54e839c0D2mgIx


问题二:关于pyflink LATERAL TABLE 问题请教


定制UDTF想要拆分字符串、但是报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 毫无头绪、有大佬遇到过吗?谢谢!

class myKerasMLP(ScalarFunction):

def eval(self, *args): ...

返回预测结果

return str(trueY[0][0]) + '|' + str(trueY[0][1])

注册UDF函数 myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING())

class SplitStr(TableFunction): def eval(self, str_value): str_arr = str_value.split('|') yield str_arr[0], str_arr[1] yield str_arr[0], str_arr[1]

注册UDTF函数 splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()])

t_env.register_function('train_and_predict', myKerasMLP) t_env.register_function("splitStr", splitStr)

==================

t_env.sql_query(""" select A.hotime , A.before_ta , A.before_rssi , A.after_ta , A.after_rssil , A.nb_tath , A.nb_rssith , nbr_rssi , nbr_ta from (SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil, nb_tath, nb_rssith) predict FROM source) as A, LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta) """).insert_into("predict_sink")

==================== 报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 Traceback (most recent call last): File "C:/Users/Administrator.XTZ-02012091146/PycharmProjects/pythonProject/kerasTest/UdtfNtPredictPyFlink.py", line 280, in t_env.execute('NT重连预测参数') File "D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py", line 1057, in execute return JobExecutionResult(self._j_tenv.execute(job_name)) File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.py", line 1286, in call answer, self.gateway_client, self.target_id, self.name) File "D:\tools\Python3.6.5\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco return f(*a, **kw) File "D:\tools\Python3.6.5\lib\site-packages\py4j\protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o25.execute. : java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 at java.util.ArrayList.rangeCheck(ArrayList.java:657)

==================== 这段SQL可以执行 t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil, nb_tath, nb_rssith) predict FROM source """).insert_into("print_table")


+I(37,14,-66,92,-74,24,-65,22.621065|-64.12096) +I(291,136,-76,136,-78,22,-65,19.479145|-65.958) ------------------------------*来自志愿者整理的flink邮件归档


参考回答:

apache-flink 1.11.1


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359385?spm=a2c6h.13262185.0.0.54e839c0LrOat6


问题三:1.12 yarn-per-job提交作业失败怎么办?


> 我在用这个命令提交的时候会报

> flink Application rejected by queue placement policy

> 这个应该是没有指定queue

> 但是我在命令中加了-yqu 这个参数,在web界面看quene 的时候,不是我指定的,而是default 。

> 另外,我用旧命令提交作业可以正常运行。请问有人碰到过这个问题吗。*来自志愿者整理的flink邮件归档


参考回答:

从 Flink 1.12 开始,-yqu 等 YARN 相关的参数被移除了,可以使用 [1] 来代替。

[1 ]https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#yarn-application-queue


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359388?spm=a2c6h.13262185.0.0.54e839c0LrOat6


问题四:Pyflink如何对接HBase?


请问pyflink如何对接hbase,有没有案例?*来自志愿者整理的flink邮件归档


参考回答:

1)在PyFlink Table API中可以使用所有SQL中支持的connector,所以HBase connector也自然支持,具体使用方式可以看一下文档: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#how-to-use-connectors 2)HBase connector的使用方式可以看一下: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hbase.html


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359389?spm=a2c6h.13262185.0.0.54e839c0ROdVpg


问题五:PyFlink UDTF 运行一段时间后报 NullPointerException怎么办?


PyFlink UDTF 运行一段时间后报 NullPointerException怎么办?


参考回答:

这个问题应该和这个JIRA有关系:https://issues.apache.org/jira/browse/FLINK-21434

目前已经在master和1.12.3上修复了,但是1.12.3还没有release。你要不cherry-pick这个fix,自己build一个版本,验证一下?


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359392?spm=a2c6h.13262185.0.0.54e839c0ROdVpg

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
9天前
|
资源调度 监控 关系型数据库
在Flink CDC作业提交过程中,出现超时问题可能与多种因素有关
【2月更文挑战第8天】在Flink CDC作业提交过程中,出现超时问题可能与多种因素有关
103 11
|
8天前
|
SQL Java API
实时计算 Flink版产品使用合集之什么情况作业会被认为是有限流作业呢二者该怎么区分
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
21 0
|
1天前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版操作报错之当将两个连接器放在同一个作业中时,MySQL作业无法启动,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6天前
|
SQL Oracle 关系型数据库
实时计算 Flink版操作报错合集之连接器换成2.4.2之后,mysql作业一直报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
20 3
|
7天前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用合集之在进行数据同步作业时,有什么方法可以用于检查源端和目标端的数据一致性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
15 0
|
8天前
|
SQL 关系型数据库 数据处理
实时计算 Flink版产品使用合集之作业原本只配置了采集一张表,现在想增加一张表,这张新增的表将会增量采集还是重新全量采集
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
31 0
|
8天前
|
关系型数据库 MySQL Java
实时计算 Flink版产品使用合集之作业的检查点总是超时失败,该怎么解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
30 0
|
8天前
|
消息中间件 SQL API
实时计算 Flink版产品使用合集之如何配置让CDC作业不去同步无主键的表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
19 2
|
9天前
|
消息中间件 Java Kafka
Apache Hudi + Flink作业运行指南
Apache Hudi + Flink作业运行指南
180 1
|
9天前
|
SQL 消息中间件 Kafka
Flink报错问题之SQL作业中调用UDTF报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

热门文章

最新文章

相关产品

  • 实时计算 Flink版