【flink番外篇】1、flink的23种常用算子介绍及详细示例(2)- keyby、reduce和Aggregations

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文主要介绍Flink 的3种常用的operator(keyby、reduce和Aggregations)及以具体可运行示例进行说明

Flink 系列文章


一、Flink 专栏


Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。


1、Flink 部署系列

本部分介绍Flink的部署、配置相关基础内容。


2、Flink基础系列

本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。


3、Flik Table API和SQL基础系列

本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。


4、Flik Table API和SQL提高与应用系列

本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。


5、Flink 监控系列

本部分和实际的运维、监控工作相关。


二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。


两专栏的所有文章入口点击:Flink 系列文章汇总索引


本文主要介绍Flink 的3种常用的operator(keyby、reduce和Aggregations)及以具体可运行示例进行说明。


本文除了maven依赖外,没有其他依赖。


本专题分为五篇,即:

【flink番外篇】1、flink的23种常用算子介绍及详细示例(1)- map、flatmap和filter

【flink番外篇】1、flink的23种常用算子介绍及详细示例(2)- keyby、reduce和Aggregations

【flink番外篇】1、flink的23种常用算子介绍及详细示例(3)-window、distinct、join等

【flink番外篇】1、flink的23种常用算子介绍及详细示例(4)- union、window join、connect、outputtag、cache、iterator、project

【flink番外篇】1、flink的23种常用算子介绍及详细示例(完整版)


一、Flink的23种算子说明及示例

本文示例中使用的maven依赖和java bean 参考本专题的第一篇中的maven和java bean。

6、KeyBy

DataStream → KeyedStream

按照指定的key来对流中的数据进行分组

KeyBy 在逻辑上是基于 key 对流进行分区。在内部,它使用 hash 函数对流进行分区。它返回 KeyedDataStream 数据流。将同一Key的数据放到同一个分区。


分区结果和KeyBy下游算子的并行度强相关。如下游算子只有一个并行度,不管怎么分,都会分到一起。

对于POJO类型,KeyBy可以通过keyBy(fieldName)指定字段进行分区。

对于Tuple类型,KeyBy可以通过keyBy(fieldPosition)指定字段进行分区。

对于一般类型,如上,KeyBy可以通过keyBy(new KeySelector {…})指定字段进行分区。



importjava.util.Arrays;
importorg.apache.flink.api.common.functions.MapFunction;
importorg.apache.flink.api.common.typeinfo.Types;
importorg.apache.flink.api.java.functions.KeySelector;
importorg.apache.flink.api.java.tuple.Tuple;
importorg.apache.flink.api.java.tuple.Tuple2;
importorg.apache.flink.streaming.api.datastream.DataStreamSource;
importorg.apache.flink.streaming.api.datastream.KeyedStream;
importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.datastreamapi.User;
/*** @author alanchan**/publicclassTestKeyByDemo {
publicstaticvoidmain(String[] args) throwsException {
StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//  env.setParallelism(4);// 设置数据分区数量keyByFunction6(env);
env.execute();
    }
// 构造User数据源publicstaticDataStreamSourcesource(StreamExecutionEnvironmentenv) {
DataStreamSourcesource=env.fromCollection(Arrays.asList(
newUser(1, "alan1", "1", "1@1.com", 12, 1000), 
newUser(2, "alan2", "2", "2@2.com", 19, 200),
newUser(3, "alan1", "3", "3@3.com", 28, 1500), 
newUser(5, "alan1", "5", "5@5.com", 15, 500), 
newUser(4, "alan2", "4", "4@4.com", 30, 400)));
returnsource;
    }
// 按照name进行keyby 对于POJO类型,KeyBy可以通过keyBy(fieldName)指定字段进行分区publicstaticvoidkeyByFunction1(StreamExecutionEnvironmentenv) throwsException {
DataStreamSourcesource=source(env);
KeyedStreamsink=source.keyBy(newKeySelector() {
@OverridepublicStringgetKey(Uservalue) throwsException {
returnvalue.getName();
    }
  });
sink.map(user-> {
System.out.println("当前线程ID:"+Thread.currentThread().getId() +",user:"+user.toString());
returnuser;
  });
sink.print();
    }
// lambda 对于POJO类型,KeyBy可以通过keyBy(fieldName)指定字段进行分区publicstaticvoidkeyByFunction2(StreamExecutionEnvironmentenv) throwsException {
DataStreamSourcesource=source(env);
KeyedStreamsink=source.keyBy(user->user.getName());
// 演示keyby后的数据输出sink.map(user-> {
System.out.println("当前线程ID:"+Thread.currentThread().getId() +",user:"+user.toString());
returnuser;
  });
sink.print();
    }
// 对于Tuple类型,KeyBy可以通过keyBy(fieldPosition)指定字段进行分区。lambdapublicstaticvoidkeyByFunction3(StreamExecutionEnvironmentenv) throwsException {
DataStreamSourcesource=source(env);
SingleOutputStreamOperator>userTemp=source.map((MapFunction>) user-> {
returnnewTuple2(user.getName(), user);
  }).returns(Types.TUPLE(Types.STRING, Types.POJO(User.class)));
KeyedStream, Tuple>sink=userTemp.keyBy(0);
// 演示keyby后的数据输出sink.map(user-> {
System.out.println("当前线程ID:"+Thread.currentThread().getId() +",user:"+user.f1.toString());
returnuser.f1;
  });
sink.print();
    }
// 对于Tuple类型,KeyBy可以通过keyBy(fieldPosition)指定字段进行分区。publicstaticvoidkeyByFunction4(StreamExecutionEnvironmentenv) throwsException {
DataStreamSourcesource=source(env);
SingleOutputStreamOperator>userTemp=source.map(newMapFunction>() {
@OverridepublicTuple2map(Uservalue) throwsException {
returnnewTuple2(value.getName(), value);
    }
  });
KeyedStream, String>sink=userTemp.keyBy(newKeySelector, String>() {
@OverridepublicStringgetKey(Tuple2value) throwsException {
returnvalue.f0;
    }
  });
// 演示keyby后的数据输出sink.map(user-> {
System.out.println("当前线程ID:"+Thread.currentThread().getId() +",user:"+user.f1.toString());
returnuser.f1;
  });
//  sink.map(new MapFunction, String>() {////      @Override//      public String map(Tuple2 value) throws Exception {//    System.out.println("当前线程ID:" + Thread.currentThread().getId() + ",user:" + value.f1.toString());//    return null;//      }});sink.print();
    }
// 对于一般类型,如上,KeyBy可以通过keyBy(new KeySelector {...})指定字段进行分区。// 按照name的前4位进行keybypublicstaticvoidkeyByFunction5(StreamExecutionEnvironmentenv) throwsException {
DataStreamSourcesource=source(env);
KeyedStreamsink=source.keyBy(newKeySelector() {
@OverridepublicStringgetKey(Uservalue) throwsException {
//    String temp = value.getName().substring(0, 4);returnvalue.getName().substring(0, 4);
    }
  });
sink.map(user-> {
System.out.println("当前线程ID:"+Thread.currentThread().getId() +",user:"+user.toString());
returnuser;
  });
sink.print();
    }
// 对于一般类型,如上,KeyBy可以通过keyBy(new KeySelector {...})指定字段进行分区。 lambda// 按照name的前4位进行keybypublicstaticvoidkeyByFunction6(StreamExecutionEnvironmentenv) throwsException {
DataStreamSourcesource=source(env);
KeyedStreamsink=source.keyBy(user->user.getName().substring(0, 4));
sink.map(user-> {
System.out.println("当前线程ID:"+Thread.currentThread().getId() +",user:"+user.toString());
returnuser;
  });
sink.print();
    }
}



7、Reduce

KeyedStream → DataStream

对集合中的元素进行聚合。Reduce 返回单个的结果值,并且 reduce 操作每处理一个元素总是创建一个新值。常用的方法有 average, sum, min, max, count,使用 reduce 方法都可实现。基于ReduceFunction进行滚动聚合,并向下游算子输出每次滚动聚合后的结果。

注意: Reduce会输出每一次滚动聚合的结果。


importjava.util.Arrays;
importorg.apache.flink.api.common.functions.ReduceFunction;
importorg.apache.flink.streaming.api.datastream.DataStreamSource;
importorg.apache.flink.streaming.api.datastream.KeyedStream;
importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.datastreamapi.User;
/*** @author alanchan**/publicclassTestReduceDemo {
publicstaticvoidmain(String[] args) throwsException {
StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//  env.setParallelism(4);// 设置数据分区数量reduceFunction2(env);
env.execute();
    }
// 构造User数据源publicstaticDataStreamSourcesource(StreamExecutionEnvironmentenv) {
DataStreamSourcesource=env.fromCollection(Arrays.asList(
newUser(1, "alan1", "1", "1@1.com", 12, 1000), 
newUser(2, "alan2", "2", "2@2.com", 19, 200),
newUser(3, "alan1", "3", "3@3.com", 28, 1500), 
newUser(5, "alan1", "5", "5@5.com", 15, 500), 
newUser(4, "alan2", "4", "4@4.com", 30, 400)));
returnsource;
    }
// 按照name进行balance进行sumpublicstaticvoidreduceFunction1(StreamExecutionEnvironmentenv) throwsException {
DataStreamSourcesource=source(env);
KeyedStreamkeyedStream=source.keyBy(user->user.getName());
SingleOutputStreamOperatorsink=keyedStream.reduce(newReduceFunction() {
@OverridepublicUserreduce(Uservalue1, Uservalue2) throwsException {
doublebalance=value1.getBalance() +value2.getBalance();
returnnewUser(value1.getId(), value1.getName(), "", "", 0, balance);
    }
  });
//sink.print();
    }
// 按照name进行balance进行sum lambdapublicstaticvoidreduceFunction2(StreamExecutionEnvironmentenv) throwsException {
DataStreamSourcesource=source(env);
KeyedStreamuserKeyBy=source.keyBy(user->user.getName());
SingleOutputStreamOperatorsink=userKeyBy.reduce((user1, user2) -> {
Useruser=user1;
user.setBalance(user1.getBalance() +user2.getBalance());
returnuser;
  });
sink.print();
    }
}




8、Aggregations

KeyedStream → DataStream

DataStream API 支持各种聚合,例如 min,max,sum 等。 这些函数可以应用于 KeyedStream 以获得 Aggregations 聚合。

Aggregate 对KeyedStream按指定字段滚动聚合并输出每一次滚动聚合后的结果。默认的聚合函数有:sum、min、minBy、max、maxBy。

注意:

max(field)与maxBy(field)的区别: maxBy返回field最大的那条数据;而max则是将最大的field的值赋值给第一条数据并返回第一条数据。同理,min与minBy。

Aggregate聚合算子会滚动输出每一次聚合后的结果

max 和 maxBy 之间的区别在于 max 返回流中的最大值,但 maxBy 返回具有最大值的键, min 和 minBy 同理。

max以第一个比较对象的比较列值进行替换,maxBy是以整个比较对象进行替换。


importjava.util.ArrayList;
importjava.util.Arrays;
importjava.util.List;
importorg.apache.flink.api.java.tuple.Tuple3;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.datastream.DataStreamSource;
importorg.apache.flink.streaming.api.datastream.KeyedStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.datastreamapi.User;
/*** @author alanchan**/publicclassTestAggregationsDemo {
publicstaticvoidmain(String[] args) throwsException {
StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
aggregationsFunction2(env);
env.execute();
    }
// 构造User数据源publicstaticDataStreamSourcesource(StreamExecutionEnvironmentenv) {
DataStreamSourcesource=env.fromCollection(Arrays.asList(
newUser(1, "alan1", "1", "1@1.com", 12, 1000), 
newUser(2, "alan2", "2", "2@2.com", 19, 200),
newUser(3, "alan1", "3", "3@3.com", 28, 1500), 
newUser(5, "alan1", "5", "5@5.com", 15, 500), 
newUser(4, "alan2", "4", "4@4.com", 30, 400)));
returnsource;
    }
//分组统计sum、max、min、maxby、minbypublicstaticvoidaggregationsFunction(StreamExecutionEnvironmentenv) throwsException {
DataStreamSourcesource=source(env);
KeyedStreamuserTemp=source.keyBy(user->user.getName());
DataStreamsink=null;
//1、根据name进行分区统计balance之和 alan1----2500/alan2----600//  16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)//  1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)//  16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=2500.0)//  1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=600.0)//  16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=3000.0)sink=userTemp.sum("balance");
//2、根据name进行分区统计balance的max alan1----1500/alan2----400//   1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)//   16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)//   16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1500.0)//   1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=400.0)//   16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1500.0)sink=userTemp.max("balance");//1@1.com-3000 --  2@2.com-300//3、根据name进行分区统计balance的min  alan1----500/alan2---200//  16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)//  16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)//  1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)//  16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=500.0)//  1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)sink=userTemp.min("balance");
//4、根据name进行分区统计balance的maxBy alan2----400/alan1----1500//  1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)//  1> User(id=4, name=alan2, pwd=4, email=4@4.com, age=30, balance=400.0)//  16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)//  16> User(id=3, name=alan1, pwd=3, email=3@3.com, age=28, balance=1500.0)//  16> User(id=3, name=alan1, pwd=3, email=3@3.com, age=28, balance=1500.0)sink=userTemp.maxBy("balance");
//5、根据name进行分区统计balance的minBy alan2----200/alan1----500//  1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)//  1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)//  16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)//  16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)//  16> User(id=5, name=alan1, pwd=5, email=5@5.com, age=15, balance=500.0)sink=userTemp.minBy("balance");
sink.print();
    }
publicstaticvoidaggregationsFunction2(StreamExecutionEnvironmentenv) throwsException {
Listlist=newArrayList>();
list.add(newTuple3<>(0,3,6));
list.add(newTuple3<>(0,2,5));
list.add(newTuple3<>(0,1,6));
list.add(newTuple3<>(0,4,3));
list.add(newTuple3<>(1,1,9));
list.add(newTuple3<>(1,2,8));
list.add(newTuple3<>(1,3,10));
list.add(newTuple3<>(1,2,9));
list.add(newTuple3<>(1,5,7));
DataStreamSource>source=env.fromCollection(list);
KeyedStream, Integer>tTemp=source.keyBy(t->t.f0);
DataStream>sink=null;
//按照分区,以第一个Tuple3的元素为基础进行第三列值比较,如果第三列值小于第一个tuple3的第三列值,则进行第三列值替换,其他的不变//        12> (0,3,6)//        11> (1,1,9)//        11> (1,1,8)//        12> (0,3,5)//        11> (1,1,8)//        12> (0,3,5)//        11> (1,1,8)//        12> (0,3,3)//        11> (1,1,7)  sink=tTemp.min(2);
//     按照数据分区,以第一个tuple3的元素为基础进行第三列值比较,如果第三列值小于第一个tuple3的第三列值,则进行整个tuple3的替换//     12> (0,3,6)//     11> (1,1,9)//     12> (0,2,5)//     11> (1,2,8)//     12> (0,2,5)//     11> (1,2,8)//     12> (0,4,3)//     11> (1,2,8)//     11> (1,5,7)sink=tTemp.minBy(2);
sink.print();
        }
}


以上,本文主要介绍Flink 的3种常用的operator(keyby、reduce和Aggregations)及以具体可运行示例进行说明。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。


本专题分为五篇,即:

【flink番外篇】1、flink的23种常用算子介绍及详细示例(1)- map、flatmap和filter

【flink番外篇】1、flink的23种常用算子介绍及详细示例(2)- keyby、reduce和Aggregations

【flink番外篇】1、flink的23种常用算子介绍及详细示例(3)-window、distinct、join等

【flink番外篇】1、flink的23种常用算子介绍及详细示例(4)- union、window join、connect、outputtag、cache、iterator、project

【flink番外篇】1、flink的23种常用算子介绍及详细示例(完整版)

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
82 0
|
2月前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
38 0
|
4月前
|
消息中间件 传感器 数据处理
"揭秘实时流式计算:低延迟、高吞吐量的数据处理新纪元,Apache Flink示例带你领略实时数据处理的魅力"
【8月更文挑战第10天】实时流式计算即时处理数据流,低延迟捕获、处理并输出数据,适用于金融分析等需即时响应场景。其框架(如Apache Flink)含数据源、处理逻辑及输出目标三部分。例如,Flink可从数据流读取信息,转换后输出。此技术优势包括低延迟、高吞吐量、强容错性及处理逻辑的灵活性。
103 4
|
5月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在Flink算子内部使用异步IO可以通过什么办法实现
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如何在open算子中有办法获取到jobmanager的ip
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何将算子链断开
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
负载均衡 算法 大数据
[flink 实时流基础] 转换算子
[flink 实时流基础] 转换算子
|
7月前
|
消息中间件 关系型数据库 MySQL
[flink 实时流基础] 输出算子(Sink)
[flink 实时流基础] 输出算子(Sink)
234 1
|
7月前
|
消息中间件 网络协议 Kafka
[flink 实时流基础] flink 源算子
[flink 实时流基础] flink 源算子
|
7月前
|
消息中间件 网络协议 大数据
[flink 实时流基础]源算子和转换算子
[flink 实时流基础]源算子和转换算子

相关产品

  • 实时计算 Flink版
  • 下一篇
    DataWorks