定制UDTF想要拆分字符串、但是报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 毫无头绪、有大佬遇到过吗?谢谢! 好像是 udf 和 udtf 一起使用时出现的~下面有可复现的例子,谢谢
class myKerasMLP(ScalarFunction):
def eval(self, *args): ...
return str(trueY[0][0]) + '|' + str(trueY[0][1])
注册UDF函数 myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING())
class SplitStr(TableFunction): def eval(self, str_value): str_arr = str_value.split('|') yield str_arr[0], str_arr[1] yield str_arr[0], str_arr[1]
注册UDTF函数 splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()])
t_env.register_function('train_and_predict', myKerasMLP) t_env.register_function("splitStr", splitStr)
==================
t_env.sql_query(""" select A.hotime , A.before_ta , A.before_rssi , A.after_ta , A.after_rssil , A.nb_tath , A.nb_rssith , nbr_rssi , nbr_ta from (SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil, nb_tath, nb_rssith) predict FROM source) as A, LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta) """).insert_into("predict_sink")
==================== 报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 Traceback (most recent call last): File "C:/Users/Administrator.XTZ-02012091146/PycharmProjects/pythonProject/kerasTest/UdtfNtPredictPyFlink.py", line 280, in t_env.execute('NT重连预测参数') File "D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py", line 1057, in execute return JobExecutionResult(self._j_tenv.execute(job_name)) File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.py", line 1286, in call answer, self.gateway_client, self.target_id, self.name) File "D:\tools\Python3.6.5\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco return f(*a, **kw) File "D:\tools\Python3.6.5\lib\site-packages\py4j\protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o25.execute. : java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 at java.util.ArrayList.rangeCheck(ArrayList.java:657)
==================== 这段SQL可以执行 t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil, nb_tath, nb_rssith) predict FROM source """).insert_into("print_table")
====================== 简单可复现的例子 ========================
=======================SQL 源================= /* Navicat MySQL Data Transfer
Source Server : localhost Source Server Version : 50717 Source Host : localhost:3306 Source Database : nufront-nt
Target Server Type : MYSQL Target Server Version : 50717 File Encoding : 65001
Date: 2021-03-13 14:23:41 */
SET FOREIGN_KEY_CHECKS=0;
-- Table structure for test
DROP TABLE IF EXISTS test
; CREATE TABLE test
( hotime
varchar(5) DEFAULT NULL, before_ta
varchar(5) DEFAULT NULL, before_rssi
varchar(10) DEFAULT NULL, after_ta
varchar(5) DEFAULT NULL, after_rssil
varchar(10) DEFAULT NULL, nb_tath
varchar(5) DEFAULT NULL, nb_rssith
varchar(10) DEFAULT NULL, predict
varchar(50) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Records of test
INSERT INTO test
VALUES ('35', '8', '-62', '136', '-65', '20', '-65', '22.30014|-63.884907'); INSERT INTO test
VALUES ('43', '8', '-71', '248', '-73', '20', '-65', '20.598848|-65.127464'); INSERT INTO test
VALUES ('82', '216', '-74', '208', '-74', '20', '-65', '14.919615|-66.15158');
================== 程序 ===================
import logging import os
from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes from pyflink.table.udf import ScalarFunction, TableFunction, udf, udtf
env = StreamExecutionEnvironment.get_execution_environment() table_env = StreamTableEnvironment.create(env) env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.get_config().get_configuration().set_boolean("python.fn-execution.4memory.managed", True) t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", "80m")
class SplitStr(TableFunction): def eval(self, str_value): str_arr = str_value.split('|') yield str_arr[0], str_arr[1]
splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()]) t_env.register_function("splitStr", splitStr)
class SplitStr(TableFunction): def eval(self, str_value): str_arr = str_value.split('|') yield str_arr[0], str_arr[1]
class myKerasMLP(ScalarFunction): def eval(self, *args):
a = '' for u in args: a += u + "|" return a
myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING()) t_env.register_function('train_and_predict', myKerasMLP)
t_env.execute_sql(""" CREATE TABLE print_table ( hotime STRING , before_ta STRING , before_rssi STRING , after_ta STRING , after_rssil STRING , nb_tath STRING , nb_rssith STRING , predict STRING ) WITH ( 'connector' = 'print' ) """)
t_env.execute_sql(""" CREATE TABLE source ( hotime STRING , before_ta STRING , before_rssi STRING , after_ta STRING , after_rssil STRING , nb_tath STRING , nb_rssith STRING , predict STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/nufront-nt', 'table-name' = 'test', 'username' = 'root', 'password' = '123456' ) """)
t_env.execute_sql(""" CREATE TABLE predict_sink ( hotime STRING , before_ta STRING , before_rssi STRING , after_ta STRING , after_rssil STRING , nb_tath STRING , nb_rssith STRING , nbr_rssi STRING , nbr_ta STRING ) WITH ( 'connector' = 'print' ) """)
#######################################
#######################################
#######################################
#######################################
####################################### ##可执行 ####################################### t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta) predict FROM source """).insert_into("print_table")
t_env.execute('pyflink UDTF')*来自志愿者整理的flink邮件归档
经过排查,这个确实一个bug。问题出在没有正确处理在sub-query中使用的python udf。我已经创建JIRA[1] 来记录这个问题了。目前的workaroud方案是使用Table API。 具体可以参考下面的代码:
a = t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(nb_tath, nb_rssith) predict FROM source """) result = a.join_lateral("splitStr(predict) as (nbr_rssi, nbr_ta)") [1] https://issues.apache.org/jira/browse/FLINK-21856*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。