flink sql 可不可以实现 过滤某种操作事件
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
给你看个flink sql 过滤列子吧:
public class UdtfJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings streamSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment streamTabelEnv = StreamTableEnvironment.create(streamEnv, streamSettings);
KafkaTabelSource kafkaTabelSource = new KafkaTabelSource();
streamTabelEnv.registerTableSource("kafkaDataStream", kafkaTabelSource);//使用自定义TableSource
//注册自定义函数定义三个关键字:"KeyWord","WARNING","illegal"
streamTabelEnv.registerFunction("CountKEY", new KyeWordCount(new String[]{"KeyWord","WARNING","illegal"}));
//编写SQL
Table wordWithCount = streamTabelEnv.sqlQuery("SELECT key,COUNT(countv) AS countsum FROM kafkaDataStream LEFT JOIN LATERAL TABLE(CountKEY(response)) as T(key, countv) ON TRUE GROUP BY key");
//直接输出Retract流
streamTabelEnv.toRetractStream(wordWithCount, Row.class).print();
streamTabelEnv.execute("BLINK STREAMING QUERY");
}
}
评论
全部评论 (0)
使用自定义函数实现关键字过滤统计 自定义表函数(UDTF) 与自定义的标量函数相似,自定义表函数将零,一个或多个标量值作为输入参数。 但是,与标量函数相比,它可以返回任意数量的行作为输出,而不是单个值。
为了定义表函数,必须扩展基类TableFunction并实现评估方法。 表函数的行为由其评估方法确定。 必须将评估方法声明为公开并命名为eval。 通过实现多个名为eval的方法,可以重载TableFunction。 评估方法的参数类型确定表函数的所有有效参数。 返回表的类型由TableFunction的通用类型确定。 评估方法使用 collect(T)方法发出输出行。
自定义聚合函数(UDAGGs)将一个表聚合为一个标量值。
聚合函数适合用于累计的工作,假设您有一个包含饮料数据的表。该表由三列组成:id、name和price,共计5行。想象一下,你需要找到所有饮料的最高价格。执行max()聚合。您需要检查5行中的每一行,结果将是单个数值。
用户定义的聚合函数是通过扩展AggregateFunction类来实现的。AggregateFunction的工作原理如下。首先,它需要一个累加器,这个累加器是保存聚合中间结果的数据结构。通过调用AggregateFunction的createAccumulator()方法来创建一个空的累加器。随后,对每个输入行调用该函数的accumulator()方法来更新累加器。处理完所有行之后,将调用函数的getValue()方法来计算并返回最终结果。
用户定义的表聚合函数(UDTAGGs)将一个表(具有一个或多个属性的一个或多个行)聚合到具有多行和多列的结果表。
评论
全部评论 (0)
使用自定义函数实现关键字过滤统计 自定义表函数(UDTF) 与自定义的标量函数相似,自定义表函数将零,一个或多个标量值作为输入参数。 但是,与标量函数相比,它可以返回任意数量的行作为输出,而不是单个值。
为了定义表函数,必须扩展基类TableFunction并实现评估方法。 表函数的行为由其评估方法确定。 必须将评估方法声明为公开并命名为eval。 通过实现多个名为eval的方法,可以重载TableFunction。 评估方法的参数类型确定表函数的所有有效参数。 返回表的类型由TableFunction的通用类型确定。 评估方法使用 collect(T)方法发出输出行。 定义一个过滤字符串 记下关键字 的自定义表函数
评论
全部评论 (0)
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。
你好,我是AI助理
可以解答问题、推荐解决方案等
评论
全部评论 (0)