Flink问题之提交作业失败如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:Flink 中 shuffle Partitioner 和 rebalance partitoner 有什么区别?


问题如上,辛苦大神解答下*来自志愿者整理的flink邮件归档


参考回答:

ShufflePartitioner:

public int selectChannel(SerializationDelegate record) { return random.nextInt(numberOfChannels); }

RebalancePartitioner public int selectChannel(SerializationDelegate record) { nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels; return nextChannelToSendTo; }

一个随机,一个严格 round-robin。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359357?spm=a2c6h.13262185.0.0.54e839c0D2mgIx


问题二:关于pyflink LATERAL TABLE 问题请教


定制UDTF想要拆分字符串、但是报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 毫无头绪、有大佬遇到过吗?谢谢!

class myKerasMLP(ScalarFunction):

def eval(self, *args): ...

返回预测结果

return str(trueY[0][0]) + '|' + str(trueY[0][1])

注册UDF函数 myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING())

class SplitStr(TableFunction): def eval(self, str_value): str_arr = str_value.split('|') yield str_arr[0], str_arr[1] yield str_arr[0], str_arr[1]

注册UDTF函数 splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()])

t_env.register_function('train_and_predict', myKerasMLP) t_env.register_function("splitStr", splitStr)

==================

t_env.sql_query(""" select A.hotime , A.before_ta , A.before_rssi , A.after_ta , A.after_rssil , A.nb_tath , A.nb_rssith , nbr_rssi , nbr_ta from (SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil, nb_tath, nb_rssith) predict FROM source) as A, LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta) """).insert_into("predict_sink")

==================== 报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 Traceback (most recent call last): File "C:/Users/Administrator.XTZ-02012091146/PycharmProjects/pythonProject/kerasTest/UdtfNtPredictPyFlink.py", line 280, in t_env.execute('NT重连预测参数') File "D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py", line 1057, in execute return JobExecutionResult(self._j_tenv.execute(job_name)) File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.py", line 1286, in call answer, self.gateway_client, self.target_id, self.name) File "D:\tools\Python3.6.5\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco return f(*a, **kw) File "D:\tools\Python3.6.5\lib\site-packages\py4j\protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o25.execute. : java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 at java.util.ArrayList.rangeCheck(ArrayList.java:657)

==================== 这段SQL可以执行 t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil, nb_tath, nb_rssith) predict FROM source """).insert_into("print_table")


+I(37,14,-66,92,-74,24,-65,22.621065|-64.12096) +I(291,136,-76,136,-78,22,-65,19.479145|-65.958) ------------------------------*来自志愿者整理的flink邮件归档


参考回答:

apache-flink 1.11.1


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359385?spm=a2c6h.13262185.0.0.54e839c0LrOat6


问题三:1.12 yarn-per-job提交作业失败怎么办?


> 我在用这个命令提交的时候会报

> flink Application rejected by queue placement policy

> 这个应该是没有指定queue

> 但是我在命令中加了-yqu 这个参数,在web界面看quene 的时候,不是我指定的,而是default 。

> 另外,我用旧命令提交作业可以正常运行。请问有人碰到过这个问题吗。*来自志愿者整理的flink邮件归档


参考回答:

从 Flink 1.12 开始,-yqu 等 YARN 相关的参数被移除了,可以使用 [1] 来代替。

[1 ]https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#yarn-application-queue


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359388?spm=a2c6h.13262185.0.0.54e839c0LrOat6


问题四:Pyflink如何对接HBase?


请问pyflink如何对接hbase,有没有案例?*来自志愿者整理的flink邮件归档


参考回答:

1)在PyFlink Table API中可以使用所有SQL中支持的connector,所以HBase connector也自然支持,具体使用方式可以看一下文档: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#how-to-use-connectors 2)HBase connector的使用方式可以看一下: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hbase.html


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359389?spm=a2c6h.13262185.0.0.54e839c0ROdVpg


问题五:PyFlink UDTF 运行一段时间后报 NullPointerException怎么办?


PyFlink UDTF 运行一段时间后报 NullPointerException怎么办?


参考回答:

这个问题应该和这个JIRA有关系:https://issues.apache.org/jira/browse/FLINK-21434

目前已经在master和1.12.3上修复了,但是1.12.3还没有release。你要不cherry-pick这个fix,自己build一个版本,验证一下?


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359392?spm=a2c6h.13262185.0.0.54e839c0ROdVpg

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
消息中间件 分布式计算 大数据
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
534 0
|
消息中间件 分布式计算 大数据
大数据-128 - Flink 并行度设置 细节详解 全局、作业、算子、Slot
大数据-128 - Flink 并行度设置 细节详解 全局、作业、算子、Slot
895 0
|
Oracle 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何从savepoint重新启动作业
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
机器学习/深度学习 人工智能 运维
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
|
监控 Serverless Apache
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力
|
监控 Serverless 数据库
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
|
监控 Java Serverless
美团 Flink 大作业部署问题之想在Serverless平台上实时查看Spring Boot应用的日志要怎么操作
美团 Flink 大作业部署问题之想在Serverless平台上实时查看Spring Boot应用的日志要怎么操作
|
Java 流计算
美团 Flink 大作业部署问题之files-to-delete 的执行为什么能够异步进行呢
美团 Flink 大作业部署问题之files-to-delete 的执行为什么能够异步进行呢
|
缓存 流计算
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
124 0
|
分布式计算 流计算
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
157 0

相关产品

  • 实时计算 Flink版