开发者社区> 问答> 正文

pyflink UDTF有问题想要求助?

自定义 UDTF 想要拆分字符串,但是执行时报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7,毫无头绪,有大佬遇到过吗?谢谢!这个问题好像是 UDF 和 UDTF 一起使用时才会出现,下面有可复现的例子,谢谢。

class myKerasMLP(ScalarFunction):

def eval(self, *args): ...

返回预测结果

return str(trueY[0][0]) + '|' + str(trueY[0][1])

# 注册UDF函数
myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING())

class SplitStr(TableFunction):
    """Table function that splits a '|'-delimited string into two columns."""

    def eval(self, str_value):
        """Yield the first two '|'-separated fields of ``str_value`` as a row.

        Raises ``IndexError`` if ``str_value`` contains no '|' separator.
        """
        str_arr = str_value.split('|')
        # NOTE(review): the same row is emitted twice in this excerpt, while
        # the reproducible example further down emits it once; confirm which
        # is intended.
        yield str_arr[0], str_arr[1]
        yield str_arr[0], str_arr[1]

# 注册UDTF函数
splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()])

# Register both Python functions under the names the SQL statements use.
t_env.register_function('train_and_predict', myKerasMLP)
t_env.register_function("splitStr", splitStr)

==================

# Query reported as failing with IndexOutOfBoundsException: the sub-query
# computes `predict` with the Python UDF `train_and_predict`, and the outer
# query expands it with the Python UDTF `splitStr` through LATERAL TABLE.
# The result is written to the `predict_sink` table.
t_env.sql_query(""" select A.hotime , A.before_ta , A.before_rssi , A.after_ta , A.after_rssil , A.nb_tath , A.nb_rssith , nbr_rssi , nbr_ta from (SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil, nb_tath, nb_rssith) predict FROM source) as A, LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta) """).insert_into("predict_sink")

==================== 报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 Traceback (most recent call last): File "C:/Users/Administrator.XTZ-02012091146/PycharmProjects/pythonProject/kerasTest/UdtfNtPredictPyFlink.py", line 280, in t_env.execute('NT重连预测参数') File "D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py", line 1057, in execute return JobExecutionResult(self._j_tenv.execute(job_name)) File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.py", line 1286, in call answer, self.gateway_client, self.target_id, self.name) File "D:\tools\Python3.6.5\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco return f(*a, **kw) File "D:\tools\Python3.6.5\lib\site-packages\py4j\protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o25.execute. : java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 at java.util.ArrayList.rangeCheck(ArrayList.java:657)

==================== 这段SQL可以执行

t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil, nb_tath, nb_rssith) predict FROM source """).insert_into("print_table")


+I(37,14,-66,92,-74,24,-65,22.621065|-64.12096) +I(291,136,-76,136,-78,22,-65,19.479145|-65.958)

====================== 简单可复现的例子 ========================

=======================SQL 源================= /* Navicat MySQL Data Transfer

Source Server : localhost Source Server Version : 50717 Source Host : localhost:3306 Source Database : nufront-nt

Target Server Type : MYSQL Target Server Version : 50717 File Encoding : 65001

Date: 2021-03-13 14:23:41 */

SET FOREIGN_KEY_CHECKS=0;


-- Table structure for test


DROP TABLE IF EXISTS test; CREATE TABLE test ( hotime varchar(5) DEFAULT NULL, before_ta varchar(5) DEFAULT NULL, before_rssi varchar(10) DEFAULT NULL, after_ta varchar(5) DEFAULT NULL, after_rssil varchar(10) DEFAULT NULL, nb_tath varchar(5) DEFAULT NULL, nb_rssith varchar(10) DEFAULT NULL, predict varchar(50) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8;


-- Records of test


INSERT INTO test VALUES ('35', '8', '-62', '136', '-65', '20', '-65', '22.30014|-63.884907'); INSERT INTO test VALUES ('43', '8', '-71', '248', '-73', '20', '-65', '20.598848|-65.127464'); INSERT INTO test VALUES ('82', '216', '-74', '208', '-74', '20', '-65', '14.919615|-66.15158');

================== 程序 ===================

# -*- coding: utf-8 -*-

import logging import os

from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes from pyflink.table.udf import ScalarFunction, TableFunction, udf, udtf

# Build the streaming execution environment (parallelism 1) and a
# Blink-planner, streaming-mode table environment on top of it.
env = StreamExecutionEnvironment.get_execution_environment()
# NOTE(review): `table_env` is not referenced again in the visible script;
# every later call goes through `t_env`.
table_env = StreamTableEnvironment.create(env)
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

# 设置该参数以使用 UDF

# Settings needed to run Python UDFs: enable managed memory for Python
# function execution and set the task off-heap memory size to 80m.
# The managed-memory key is `python.fn-execution.memory.managed`; a misspelled
# key would not be recognised by Flink.
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", "80m")

class SplitStr(TableFunction):
    """Table function that splits a '|'-delimited string into two columns."""

    def eval(self, str_value):
        """Yield the first two '|'-separated fields of ``str_value`` as one row.

        Raises ``IndexError`` if ``str_value`` contains no '|' separator.
        """
        str_arr = str_value.split('|')
        yield str_arr[0], str_arr[1]

# Wrap SplitStr as a UDTF taking one STRING and returning two STRING columns,
# then register it under the name used in the SQL statements.
splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()])
t_env.register_function("splitStr", splitStr)

# NOTE(review): this repeats the SplitStr definition that appears earlier in
# the script; the UDTF registered as "splitStr" was built from the earlier
# class, so this redefinition looks redundant — confirm before removing.
class SplitStr(TableFunction):
    """Table function that splits a '|'-delimited string into two columns."""

    def eval(self, str_value):
        """Yield the first two '|'-separated fields of ``str_value`` as one row.

        Raises ``IndexError`` if ``str_value`` contains no '|' separator.
        """
        str_arr = str_value.split('|')
        yield str_arr[0], str_arr[1]

class myKerasMLP(ScalarFunction):
    """Scalar function used in the reproduction: joins its arguments with '|'."""

    def eval(self, *args):
        """Return every argument followed by a '|' separator, concatenated.

        For example, ``eval('20', '-65')`` returns ``'20|-65|'``; with no
        arguments the result is the empty string. Arguments must be strings.
        """
        # Concatenate the parameters; each one is terminated by '|'.
        return ''.join(u + "|" for u in args)

# Wrap the scalar function as a UDF taking two STRING arguments and returning
# a STRING, then register it as `train_and_predict`.
# Note that the name `myKerasMLP` is rebound from the class to the UDF here.
myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING())
t_env.register_function('train_and_predict', myKerasMLP)

# Sink that prints rows carrying the seven input columns plus `predict`.
t_env.execute_sql(""" CREATE TABLE print_table ( hotime STRING , before_ta STRING , before_rssi STRING , after_ta STRING , after_rssil STRING , nb_tath STRING , nb_rssith STRING , predict STRING ) WITH ( 'connector' = 'print' ) """)

# JDBC source backed by the MySQL table `test` in database `nufront-nt`.
# NOTE(review): the connection credentials are hard-coded in the DDL; this
# looks like a local test setup — confirm before reusing elsewhere.
t_env.execute_sql(""" CREATE TABLE source ( hotime STRING , before_ta STRING , before_rssi STRING , after_ta STRING , after_rssil STRING , nb_tath STRING , nb_rssith STRING , predict STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/nufront-nt', 'table-name' = 'test', 'username' = 'root', 'password' = '123456' ) """)

# Sink that prints the seven input columns plus the two columns produced by
# the splitStr table function (`nbr_rssi`, `nbr_ta`).
t_env.execute_sql(""" CREATE TABLE predict_sink ( hotime STRING , before_ta STRING , before_rssi STRING , after_ta STRING , after_rssil STRING , nb_tath STRING , nb_rssith STRING , nbr_rssi STRING , nbr_ta STRING ) WITH ( 'connector' = 'print' ) """)

#######################################

# 可执行

#######################################

# Case reported as working: the sub-query selects the stored `predict` column
# directly (no Python UDF), and the outer query expands it with the Python
# UDTF `splitStr` through LATERAL TABLE. Rows are written to `predict_sink`.
t_env.sql_query("""

select A.hotime ,

A.before_ta ,

A.before_rssi ,

A.after_ta ,

A.after_rssil ,

A.nb_tath ,

A.nb_rssith ,

nbr_rssi ,

nbr_ta

from (SELECT

hotime ,

before_ta ,

before_rssi ,

after_ta ,

after_rssil ,

nb_tath ,

nb_rssith ,

predict

FROM

source) as A,LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)

""").insert_into("predict_sink")

#######################################

# 执行报错

#######################################

# Case reported as failing: the sub-query computes `predict` with the Python
# UDF `train_and_predict`, and the outer query feeds that value to the Python
# UDTF `splitStr` through LATERAL TABLE. Rows are written to `predict_sink`.
t_env.sql_query("""

select A.hotime ,

A.before_ta ,

A.before_rssi ,

A.after_ta ,

A.after_rssil ,

A.nb_tath ,

A.nb_rssith ,

nbr_rssi ,

nbr_ta

from (SELECT

hotime ,

before_ta ,

before_rssi ,

after_ta ,

after_rssil ,

nb_tath ,

nb_rssith ,

train_and_predict(nb_tath, nb_rssith) predict

FROM

source) as A,LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)

""").insert_into("predict_sink")

#######################################
# Works: the Python UDF is used on its own, without the UDTF.
#######################################
# The statement must sit on its own line; when fused onto the banner line it
# is part of a comment and never runs.
t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta) predict FROM source """).insert_into("print_table")

t_env.execute('pyflink UDTF')

*来自志愿者整理的flink邮件归档

展开
收起
moonlightdisco 2021-12-01 14:02:29 1044 0
1 条回答
写回答
取消 提交回答
  • 经过排查,这确实是一个 bug。问题出在没有正确处理在 sub-query 中使用的 Python UDF。我已经创建了 JIRA[1] 来记录这个问题。目前的 workaround 方案是使用 Table API,具体可以参考下面的代码:

    a = t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(nb_tath, nb_rssith) predict FROM source """)
    result = a.join_lateral("splitStr(predict) as (nbr_rssi, nbr_ta)")

    [1] https://issues.apache.org/jira/browse/FLINK-21856

    *来自志愿者整理的flink邮件归档

    2021-12-01 14:16:03
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
大批量处理excel文件到ODPS中方案 立即下载
《Apache Flink-重新定义计算》PDF下载 立即下载
LEARNINGS USING SPARK STREAMING & DATAFRAMES FOR WALMART SEARCH 立即下载