问题一:Flink1.11执行sql当判空使用<> null,程序直接结束怎么办?
环境:flink1.11: 代码如下: val dataStreamEnv: StreamExecutionEnvironment = FlinkUtils.streamEnv val tableEnv: StreamTableEnvironment = FlinkUtils.streamTableEnv val sql = """SELECT CASE WHEN kafka_table.log_type = 'detect' AND kafka_table.event_level = 3 THEN 3 ELSE 0 END as weight, kafka_table.src_ip as kafka_table_src_ip_0, kafka_table.dev_type as kafka_table_dev_type_0 FROM kafka_table WHERE kafka_table.event_time >= unix_timestamp() - 60 * 60 * 5 AND kafka_table.src_ip <> null AND kafka_table.event_level > 0 AND kafka_table.dev_type = 1
val data:Table = tableEnv.sqlQuery(sql) val result = tableEnv.toRetractStreamRow result.print("====>") """
现象:如果判空条件为kafka_table.src_ip <> null,则程序直接结束,没有任何报错,而使用kafka_table.src_ip is not null 可以正常运行并一直产生数据。
疑问:我明白is not null是正确的用法,问题是用<> null 为什么程序会直接结束而且没有任何报错,感觉像是当作批处理去运行了。*来自志愿者整理的flink邮件归档
参考回答:
这是因为kafka_table.src_ip <>
null是恒等于false的,所以这个计算过程就被优化掉了,最后你的作业的逻辑就变成了一个单纯的values,里面没有一条数据。
关于为什么kafka_table.src_ip <> null,这个可以了解一下关于three-value-logic[1].
简单来说,在标准SQL里面,boolean类型是有三种值的,正常的= <>这种算子跟null比较的时候,结果都是unknown,
然后这个在filter条件里面会被视作false。
[1] https://modern-sql.com/concept/three-valued-logic
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359292?spm=a2c6h.13262185.0.0.51804c79pMhZZN
问题二:关于statement输出结果疑问
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view("simple_source", table)
table_env.execute_sql("""
CREATE TABLE first_sink_table (
id BIGINT,
data VARCHAR
) WITH (
'connector' = 'print'
)
""")
table_env.execute_sql("""
CREATE TABLE second_sink_table (
id BIGINT,
data VARCHAR
) WITH (
'connector' = 'print'
)
""")
‘# 创建一个statement对象
statement_set = table_env.create_statement_set()
‘# 使用TABLE API 将table表插进first_sink_table表里面
statement_set.add_insert("first_sink_table", table)
‘# 使用SQL将table表插进second_sink_table表里面
statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")
’# 执行查询
statement_set.execute().wait()我执行上面脚本之后,输出以下的结果。在我理解输出内容的应该是1,2,1,2;而不是1,1,2,2。4> +I(1,Hi)
4> +I(1,Hi)
4> +I(2,Hello)
4> +I(2,Hello)*来自志愿者整理的flink邮件归档
参考回答:
奥,那你理解错了。这里面其实细分成2种情况: - sink1和sink2,通过operator chain之后,和source节点位于同一个物理节点:由于执行的时候,在一个JVM里,每处理一条数据,先发送给其中一个sink,再发送给另外一个sink。然后处理下一条数据 - sink1 和sink2,和source节点处于不同一个物理节点:这个时候是纯异步的,每处理一条数据,会分别通过网络发送给sink1和sink2,同时网络有buffer,所以sink1和sink2收到数据的顺序是完全不确定的。
但是不管是上面哪种情况,都不是先把数据全部发送给其中一个sink;再全部发送给第二个sink。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359295?spm=a2c6h.13262185.0.0.51804c79pMhZZN
问题三:pyflink UDTF有问题想要求助
定制UDTF想要拆分字符串、但是报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 毫无头绪、有大佬遇到过吗?谢谢! 好像是 udf 和 udtf 一起使用时出现的~下面有可复现的例子,谢谢
class myKerasMLP(ScalarFunction):
def eval(self, *args): ...
返回预测结果
return str(trueY[0][0]) + '|' + str(trueY[0][1])
注册UDF函数 myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING())
class SplitStr(TableFunction): def eval(self, str_value): str_arr = str_value.split('|') yield str_arr[0], str_arr[1] yield str_arr[0], str_arr[1]
注册UDTF函数 splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()])
t_env.register_function('train_and_predict', myKerasMLP) t_env.register_function("splitStr", splitStr)
==================
t_env.sql_query(""" select A.hotime , A.before_ta , A.before_rssi , A.after_ta , A.after_rssil , A.nb_tath , A.nb_rssith , nbr_rssi , nbr_ta from (SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil, nb_tath, nb_rssith) predict FROM source) as A, LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta) """).insert_into("predict_sink")
==================== 报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 Traceback (most recent call last): File "C:/Users/Administrator.XTZ-02012091146/PycharmProjects/pythonProject/kerasTest/UdtfNtPredictPyFlink.py", line 280, in t_env.execute('NT重连预测参数') File "D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py", line 1057, in execute return JobExecutionResult(self._j_tenv.execute(job_name)) File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.py", line 1286, in call answer, self.gateway_client, self.target_id, self.name) File "D:\tools\Python3.6.5\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco return f(*a, **kw) File "D:\tools\Python3.6.5\lib\site-packages\py4j\protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o25.execute. : java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 at java.util.ArrayList.rangeCheck(ArrayList.java:657)
==================== 这段SQL可以执行 t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil, nb_tath, nb_rssith) predict FROM source """).insert_into("print_table")
+I(37,14,-66,92,-74,24,-65,22.621065|-64.12096) +I(291,136,-76,136,-78,22,-65,19.479145|-65.958)
====================== 简单可复现的例子 ========================
=======================SQL 源================= /* Navicat MySQL Data Transfer
Source Server : localhost Source Server Version : 50717 Source Host : localhost:3306 Source Database : nufront-nt
Target Server Type : MYSQL Target Server Version : 50717 File Encoding : 65001
Date: 2021-03-13 14:23:41 */
SET FOREIGN_KEY_CHECKS=0;
-- Table structure for test
DROP TABLE IF EXISTS test
; CREATE TABLE test
( hotime
varchar(5) DEFAULT NULL, before_ta
varchar(5) DEFAULT NULL, before_rssi
varchar(10) DEFAULT NULL, after_ta
varchar(5) DEFAULT NULL, after_rssil
varchar(10) DEFAULT NULL, nb_tath
varchar(5) DEFAULT NULL, nb_rssith
varchar(10) DEFAULT NULL, predict
varchar(50) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Records of test
INSERT INTO test
VALUES ('35', '8', '-62', '136', '-65', '20', '-65', '22.30014|-63.884907'); INSERT INTO test
VALUES ('43', '8', '-71', '248', '-73', '20', '-65', '20.598848|-65.127464'); INSERT INTO test
VALUES ('82', '216', '-74', '208', '-74', '20', '-65', '14.919615|-66.15158');
================== 程序 ===================
-- coding: utf-8 -
import logging import os
from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes from pyflink.table.udf import ScalarFunction, TableFunction, udf, udtf
env = StreamExecutionEnvironment.get_execution_environment() table_env = StreamTableEnvironment.create(env) env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
设置该参数以使用 UDF
t_env.get_config().get_configuration().set_boolean("python.fn-execution.4memory.managed", True) t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", "80m")
class SplitStr(TableFunction): def eval(self, str_value): str_arr = str_value.split('|') yield str_arr[0], str_arr[1]
splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()]) t_env.register_function("splitStr", splitStr)
class SplitStr(TableFunction): def eval(self, str_value): str_arr = str_value.split('|') yield str_arr[0], str_arr[1]
class myKerasMLP(ScalarFunction): def eval(self, *args):
拼接参数
a = '' for u in args: a += u + "|" return a
myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING()) t_env.register_function('train_and_predict', myKerasMLP)
t_env.execute_sql(""" CREATE TABLE print_table ( hotime STRING , before_ta STRING , before_rssi STRING , after_ta STRING , after_rssil STRING , nb_tath STRING , nb_rssith STRING , predict STRING ) WITH ( 'connector' = 'print' ) """)
t_env.execute_sql(""" CREATE TABLE source ( hotime STRING , before_ta STRING , before_rssi STRING , after_ta STRING , after_rssil STRING , nb_tath STRING , nb_rssith STRING , predict STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/nufront-nt', 'table-name' = 'test', 'username' = 'root', 'password' = '123456' ) """)
t_env.execute_sql(""" CREATE TABLE predict_sink ( hotime STRING , before_ta STRING , before_rssi STRING , after_ta STRING , after_rssil STRING , nb_tath STRING , nb_rssith STRING , nbr_rssi STRING , nbr_ta STRING ) WITH ( 'connector' = 'print' ) """)
#######################################
可执行
#######################################
t_env.sql_query("""
select A.hotime ,
A.before_ta ,
A.before_rssi ,
A.after_ta ,
A.after_rssil ,
A.nb_tath ,
A.nb_rssith ,
nbr_rssi ,
nbr_ta
from (SELECT
hotime ,
before_ta ,
before_rssi ,
after_ta ,
after_rssil ,
nb_tath ,
nb_rssith ,
predict
FROM
source) as A,LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
""").insert_into("predict_sink")
#######################################
执行报错
#######################################
t_env.sql_query("""
select A.hotime ,
A.before_ta ,
A.before_rssi ,
A.after_ta ,
A.after_rssil ,
A.nb_tath ,
A.nb_rssith ,
nbr_rssi ,
nbr_ta
from (SELECT
hotime ,
before_ta ,
before_rssi ,
after_ta ,
after_rssil ,
nb_tath ,
nb_rssith ,
train_and_predict(nb_tath, nb_rssith) predict
FROM
source) as A,LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
""").insert_into("predict_sink")
####################################### ##可执行 ####################################### t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta) predict FROM source """).insert_into("print_table")
t_env.execute('pyflink UDTF')*来自志愿者整理的flink邮件归档
参考回答:
经过排查,这个确实一个bug。问题出在没有正确处理在sub-query中使用的python udf。我已经创建JIRA[1] 来记录这个问题了。目前的workaroud方案是使用Table API。 具体可以参考下面的代码:
a = t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(nb_tath, nb_rssith) predict FROM source """) result = a.join_lateral("splitStr(predict) as (nbr_rssi, nbr_ta)") [1] https://issues.apache.org/jira/browse/FLINK-21856
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359298?spm=a2c6h.13262185.0.0.51804c79pMhZZN
问题四:Flink1.11如何实现Tumble Window后基于event time倒序取第一条作统计?
Hi 大家好 现在想对5分钟的kafka数据开窗,因为是DTS同步消息数据,会有update 和 delete,所以需要对相同user_id的数据根据事件时间倒序第一条,统计最后一次status(状态字段)共有多少人。 marketingMapDS: DataStream[(String, String, Long)] | tEnv.createTemporaryView("test", marketingMapDS,"status","status","status", "upd_user_id", $"upd_time".rowtime) val resultSQL = """ |SELECT t.status, | COUNT(t.upd_user_id) as num |FROM ( | SELECT *, | ROW_NUMBER() OVER (PARTITION BY upd_user_id ORDER BY upd_time DESC) as row_num | FROM test |) t |WHERE t.row_num = 1 |GROUP BY t.status, TUMBLE(t.upd_time, INTERVAL '5' MINUTE) |""".stripMargin val table2 = tEnv.sqlQuery(resultSQL) val resultDS = tEnv.toRetractStreamRow |
这样写后会报以下错: | Exception in thread "main" org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[upd_user_id], orderBy=[upd_time DESC], select=[status, upd_user_id, upd_time]) |
所以想实现该需求,请问还可以怎么实现。。。
TABLE API 可以实现 类似 ROW_NUMBER() OVER 这样功能吗? | val table = tEnv.fromDataStream(marketingMapDS, "status","status","status", "upd_user_id", "updtime".rowtime).window(Tumbleover5.millison"updtime".rowtime).window(Tumbleover5.millison"upd_time".rowtime) .window(Tumble over 5.millis on "upd_time" as "w") .groupBy($"w") ??? |
Flink新手一个。。。请大佬指点~*来自志愿者整理的flink邮件归档
参考回答:
GroupWindowAggregate不支持update或者delete的datasource。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359300?spm=a2c6h.13262185.0.0.51804c79pMhZZN
问题五:Flink sql 如何实现全局row_number()分组排序?
在做实时数仓的时候,有需求要使用flink sql实现全局的row_number(),请教下各位有啥方案吗?
目前想的是,将流进行row number处理后存储到hbase中,然后每次处理流数据都和hbase进行关联,row_number处理后将最新结果存入hbase中,即通过对hbase的实时读写实现全局row_number(). 请问以上方法可行不,,实时读hbase关联,然后在写入最新数据到hbase,效率会有问题吗,这样能满足实时的需求吗?*来自志愿者整理的flink邮件归档
参考回答:
直接 SQL Top-N 即可: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359314?spm=a2c6h.13262185.0.0.54e839c0D2mgIx