Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文主要介绍Flink 的3种常用的operator(keyby、reduce和Aggregations)及以具体可运行示例进行说明。
本文除了maven依赖外,没有其他依赖。
本专题分为五篇,即:
【flink番外篇】1、flink的23种常用算子介绍及详细示例(1)- map、flatmap和filter
【flink番外篇】1、flink的23种常用算子介绍及详细示例(2)- keyby、reduce和Aggregations
【flink番外篇】1、flink的23种常用算子介绍及详细示例(3)-window、distinct、join等
【flink番外篇】1、flink的23种常用算子介绍及详细示例(4)- union、window join、connect、outputtag、cache、iterator、project
【flink番外篇】1、flink的23种常用算子介绍及详细示例(完整版)
一、Flink的23种算子说明及示例
本文示例中使用的maven依赖和java bean 参考本专题的第一篇中的maven和java bean。
6、KeyBy
DataStream → KeyedStream
按照指定的key来对流中的数据进行分组
KeyBy 在逻辑上是基于 key 对流进行分区。在内部,它使用 hash 函数对流进行分区。它返回 KeyedDataStream 数据流。将同一Key的数据放到同一个分区。
分区结果和KeyBy下游算子的并行度强相关。如下游算子只有一个并行度,不管怎么分,都会分到一起。
对于POJO类型,KeyBy可以通过keyBy(fieldName)指定字段进行分区。
对于Tuple类型,KeyBy可以通过keyBy(fieldPosition)指定字段进行分区。
对于一般类型,如上,KeyBy可以通过keyBy(new KeySelector {…})指定字段进行分区。
importjava.util.Arrays; importorg.apache.flink.api.common.functions.MapFunction; importorg.apache.flink.api.common.typeinfo.Types; importorg.apache.flink.api.java.functions.KeySelector; importorg.apache.flink.api.java.tuple.Tuple; importorg.apache.flink.api.java.tuple.Tuple2; importorg.apache.flink.streaming.api.datastream.DataStreamSource; importorg.apache.flink.streaming.api.datastream.KeyedStream; importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment; importorg.datastreamapi.User; /*** @author alanchan**/publicclassTestKeyByDemo { publicstaticvoidmain(String[] args) throwsException { StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); // env.setParallelism(4);// 设置数据分区数量keyByFunction6(env); env.execute(); } // 构造User数据源publicstaticDataStreamSourcesource(StreamExecutionEnvironmentenv) { DataStreamSourcesource=env.fromCollection(Arrays.asList( newUser(1, "alan1", "1", "1@1.com", 12, 1000), newUser(2, "alan2", "2", "2@2.com", 19, 200), newUser(3, "alan1", "3", "3@3.com", 28, 1500), newUser(5, "alan1", "5", "5@5.com", 15, 500), newUser(4, "alan2", "4", "4@4.com", 30, 400))); returnsource; } // 按照name进行keyby 对于POJO类型,KeyBy可以通过keyBy(fieldName)指定字段进行分区publicstaticvoidkeyByFunction1(StreamExecutionEnvironmentenv) throwsException { DataStreamSourcesource=source(env); KeyedStreamsink=source.keyBy(newKeySelector() { publicStringgetKey(Uservalue) throwsException { returnvalue.getName(); } }); sink.map(user-> { System.out.println("当前线程ID:"+Thread.currentThread().getId() +",user:"+user.toString()); returnuser; }); sink.print(); } // lambda 对于POJO类型,KeyBy可以通过keyBy(fieldName)指定字段进行分区publicstaticvoidkeyByFunction2(StreamExecutionEnvironmentenv) throwsException { DataStreamSourcesource=source(env); KeyedStreamsink=source.keyBy(user->user.getName()); // 演示keyby后的数据输出sink.map(user-> { System.out.println("当前线程ID:"+Thread.currentThread().getId() +",user:"+user.toString()); returnuser; }); sink.print(); } // 对于Tuple类型,KeyBy可以通过keyBy(fieldPosition)指定字段进行分区。lambdapublicstaticvoidkeyByFunction3(StreamExecutionEnvironmentenv) throwsException { DataStreamSourcesource=source(env); SingleOutputStreamOperator>userTemp=source.map((MapFunction>) user-> { returnnewTuple2(user.getName(), user); }).returns(Types.TUPLE(Types.STRING, Types.POJO(User.class))); KeyedStream, Tuple>sink=userTemp.keyBy(0); // 演示keyby后的数据输出sink.map(user-> { System.out.println("当前线程ID:"+Thread.currentThread().getId() +",user:"+user.f1.toString()); returnuser.f1; }); sink.print(); } // 对于Tuple类型,KeyBy可以通过keyBy(fieldPosition)指定字段进行分区。publicstaticvoidkeyByFunction4(StreamExecutionEnvironmentenv) throwsException { DataStreamSourcesource=source(env); SingleOutputStreamOperator>userTemp=source.map(newMapFunction>() { publicTuple2map(Uservalue) throwsException { returnnewTuple2(value.getName(), value); } }); KeyedStream, String>sink=userTemp.keyBy(newKeySelector, String>() { publicStringgetKey(Tuple2value) throwsException { returnvalue.f0; } }); // 演示keyby后的数据输出sink.map(user-> { System.out.println("当前线程ID:"+Thread.currentThread().getId() +",user:"+user.f1.toString()); returnuser.f1; }); // sink.map(new MapFunction, String>() {//// @Override// public String map(Tuple2 value) throws Exception {// System.out.println("当前线程ID:" + Thread.currentThread().getId() + ",user:" + value.f1.toString());// return null;// }});sink.print(); } // 对于一般类型,如上,KeyBy可以通过keyBy(new KeySelector {...})指定字段进行分区。// 按照name的前4位进行keybypublicstaticvoidkeyByFunction5(StreamExecutionEnvironmentenv) throwsException { DataStreamSourcesource=source(env); KeyedStreamsink=source.keyBy(newKeySelector() { publicStringgetKey(Uservalue) throwsException { // String temp = value.getName().substring(0, 4);returnvalue.getName().substring(0, 4); } }); sink.map(user-> { System.out.println("当前线程ID:"+Thread.currentThread().getId() +",user:"+user.toString()); returnuser; }); sink.print(); } // 对于一般类型,如上,KeyBy可以通过keyBy(new KeySelector {...})指定字段进行分区。 lambda// 按照name的前4位进行keybypublicstaticvoidkeyByFunction6(StreamExecutionEnvironmentenv) throwsException { DataStreamSourcesource=source(env); KeyedStreamsink=source.keyBy(user->user.getName().substring(0, 4)); sink.map(user-> { System.out.println("当前线程ID:"+Thread.currentThread().getId() +",user:"+user.toString()); returnuser; }); sink.print(); } }
7、Reduce
KeyedStream → DataStream
对集合中的元素进行聚合。Reduce 返回单个的结果值,并且 reduce 操作每处理一个元素总是创建一个新值。常用的方法有 average, sum, min, max, count,使用 reduce 方法都可实现。基于ReduceFunction进行滚动聚合,并向下游算子输出每次滚动聚合后的结果。
注意: Reduce会输出每一次滚动聚合的结果。
importjava.util.Arrays; importorg.apache.flink.api.common.functions.ReduceFunction; importorg.apache.flink.streaming.api.datastream.DataStreamSource; importorg.apache.flink.streaming.api.datastream.KeyedStream; importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment; importorg.datastreamapi.User; /*** @author alanchan**/publicclassTestReduceDemo { publicstaticvoidmain(String[] args) throwsException { StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); // env.setParallelism(4);// 设置数据分区数量reduceFunction2(env); env.execute(); } // 构造User数据源publicstaticDataStreamSourcesource(StreamExecutionEnvironmentenv) { DataStreamSourcesource=env.fromCollection(Arrays.asList( newUser(1, "alan1", "1", "1@1.com", 12, 1000), newUser(2, "alan2", "2", "2@2.com", 19, 200), newUser(3, "alan1", "3", "3@3.com", 28, 1500), newUser(5, "alan1", "5", "5@5.com", 15, 500), newUser(4, "alan2", "4", "4@4.com", 30, 400))); returnsource; } // 按照name进行balance进行sumpublicstaticvoidreduceFunction1(StreamExecutionEnvironmentenv) throwsException { DataStreamSourcesource=source(env); KeyedStreamkeyedStream=source.keyBy(user->user.getName()); SingleOutputStreamOperatorsink=keyedStream.reduce(newReduceFunction() { publicUserreduce(Uservalue1, Uservalue2) throwsException { doublebalance=value1.getBalance() +value2.getBalance(); returnnewUser(value1.getId(), value1.getName(), "", "", 0, balance); } }); //sink.print(); } // 按照name进行balance进行sum lambdapublicstaticvoidreduceFunction2(StreamExecutionEnvironmentenv) throwsException { DataStreamSourcesource=source(env); KeyedStreamuserKeyBy=source.keyBy(user->user.getName()); SingleOutputStreamOperatorsink=userKeyBy.reduce((user1, user2) -> { Useruser=user1; user.setBalance(user1.getBalance() +user2.getBalance()); returnuser; }); sink.print(); } }
8、Aggregations
KeyedStream → DataStream
DataStream API 支持各种聚合,例如 min,max,sum 等。 这些函数可以应用于 KeyedStream 以获得 Aggregations 聚合。
Aggregate 对KeyedStream按指定字段滚动聚合并输出每一次滚动聚合后的结果。默认的聚合函数有:sum、min、minBy、max、maxBy。
注意:
max(field)与maxBy(field)的区别: maxBy返回field最大的那条数据;而max则是将最大的field的值赋值给第一条数据并返回第一条数据。同理,min与minBy。
Aggregate聚合算子会滚动输出每一次聚合后的结果
max 和 maxBy 之间的区别在于 max 返回流中的最大值,但 maxBy 返回具有最大值的键, min 和 minBy 同理。
max以第一个比较对象的比较列值进行替换,maxBy是以整个比较对象进行替换。
importjava.util.ArrayList; importjava.util.Arrays; importjava.util.List; importorg.apache.flink.api.java.tuple.Tuple3; importorg.apache.flink.streaming.api.datastream.DataStream; importorg.apache.flink.streaming.api.datastream.DataStreamSource; importorg.apache.flink.streaming.api.datastream.KeyedStream; importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment; importorg.datastreamapi.User; /*** @author alanchan**/publicclassTestAggregationsDemo { publicstaticvoidmain(String[] args) throwsException { StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); aggregationsFunction2(env); env.execute(); } // 构造User数据源publicstaticDataStreamSourcesource(StreamExecutionEnvironmentenv) { DataStreamSourcesource=env.fromCollection(Arrays.asList( newUser(1, "alan1", "1", "1@1.com", 12, 1000), newUser(2, "alan2", "2", "2@2.com", 19, 200), newUser(3, "alan1", "3", "3@3.com", 28, 1500), newUser(5, "alan1", "5", "5@5.com", 15, 500), newUser(4, "alan2", "4", "4@4.com", 30, 400))); returnsource; } //分组统计sum、max、min、maxby、minbypublicstaticvoidaggregationsFunction(StreamExecutionEnvironmentenv) throwsException { DataStreamSourcesource=source(env); KeyedStreamuserTemp=source.keyBy(user->user.getName()); DataStreamsink=null; //1、根据name进行分区统计balance之和 alan1----2500/alan2----600// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=2500.0)// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=600.0)// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=3000.0)sink=userTemp.sum("balance"); //2、根据name进行分区统计balance的max alan1----1500/alan2----400// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1500.0)// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=400.0)// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1500.0)sink=userTemp.max("balance");//1@1.com-3000 -- 2@2.com-300//3、根据name进行分区统计balance的min alan1----500/alan2---200// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=500.0)// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)sink=userTemp.min("balance"); //4、根据name进行分区统计balance的maxBy alan2----400/alan1----1500// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)// 1> User(id=4, name=alan2, pwd=4, email=4@4.com, age=30, balance=400.0)// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)// 16> User(id=3, name=alan1, pwd=3, email=3@3.com, age=28, balance=1500.0)// 16> User(id=3, name=alan1, pwd=3, email=3@3.com, age=28, balance=1500.0)sink=userTemp.maxBy("balance"); //5、根据name进行分区统计balance的minBy alan2----200/alan1----500// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)// 16> User(id=5, name=alan1, pwd=5, email=5@5.com, age=15, balance=500.0)sink=userTemp.minBy("balance"); sink.print(); } publicstaticvoidaggregationsFunction2(StreamExecutionEnvironmentenv) throwsException { Listlist=newArrayList>(); list.add(newTuple3<>(0,3,6)); list.add(newTuple3<>(0,2,5)); list.add(newTuple3<>(0,1,6)); list.add(newTuple3<>(0,4,3)); list.add(newTuple3<>(1,1,9)); list.add(newTuple3<>(1,2,8)); list.add(newTuple3<>(1,3,10)); list.add(newTuple3<>(1,2,9)); list.add(newTuple3<>(1,5,7)); DataStreamSource>source=env.fromCollection(list); KeyedStream, Integer>tTemp=source.keyBy(t->t.f0); DataStream>sink=null; //按照分区,以第一个Tuple3的元素为基础进行第三列值比较,如果第三列值小于第一个tuple3的第三列值,则进行第三列值替换,其他的不变// 12> (0,3,6)// 11> (1,1,9)// 11> (1,1,8)// 12> (0,3,5)// 11> (1,1,8)// 12> (0,3,5)// 11> (1,1,8)// 12> (0,3,3)// 11> (1,1,7) sink=tTemp.min(2); // 按照数据分区,以第一个tuple3的元素为基础进行第三列值比较,如果第三列值小于第一个tuple3的第三列值,则进行整个tuple3的替换// 12> (0,3,6)// 11> (1,1,9)// 12> (0,2,5)// 11> (1,2,8)// 12> (0,2,5)// 11> (1,2,8)// 12> (0,4,3)// 11> (1,2,8)// 11> (1,5,7)sink=tTemp.minBy(2); sink.print(); } }
以上,本文主要介绍Flink 的3种常用的operator(keyby、reduce和Aggregations)及以具体可运行示例进行说明。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本专题分为五篇,即:
【flink番外篇】1、flink的23种常用算子介绍及详细示例(1)- map、flatmap和filter
【flink番外篇】1、flink的23种常用算子介绍及详细示例(2)- keyby、reduce和Aggregations
【flink番外篇】1、flink的23种常用算子介绍及详细示例(3)-window、distinct、join等
【flink番外篇】1、flink的23种常用算子介绍及详细示例(4)- union、window join、connect、outputtag、cache、iterator、project