大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(正在更新!)

章节内容

上节我们完成了如下的内容:


Flink DataStream API

Rich并行源 并行源

Flink针对DataStream提供了大量的已经实现的算子

Map

DataStream -> DataStream,获取一个元素并产生一个元素。

以下是将输入流中的值*2的映射函数:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

FlatMap

DataStream -> DataStream

获取一个元素,产生0个、1个、多个元素。

以下是一个FlapMap函数切分句子:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

Filter

DataSteam -> DataStream

返回一个布尔值,如果满足条件就筛选出来。

以下是去掉0的值:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

KeyBy

DataStream -> KeyedStream

将流逻辑上划分为不相交的分区。所有具有相同键的记录被分配到相同的分区。在函数的内部,keyBy() 使用哈希分区实现。

有不同的方法来指定键。 此转换返回一个 KeyedStream。

dataStream.keyBy(value -> value.getSomeKey())
dataStream.keyBy(value -> value.f0)

Reduce

KeyedStream -> DataStream

"滚动"归约是一种在数据流处理中常用的操作。简单来说,它是一个累积的过程,逐个处理流中的元素并将结果逐步更新。


键控数据流:这是指数据流中的元素根据某个键(如ID或类别)进行分组。

当前元素:这是指数据流中正在处理的当前记录。

上一个归约值:这是指之前已经处理过并累积的结果。

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});

举一个例子:

部分和是一种典型的归约操作,它逐步计算出流中各元素的累积和。例如,对于一个输入数据流 [1, 2, 3, 4, 5],部分和的输出将是 [1, 3, 6, 10, 15]。也就是说:


第一个元素1的部分和是1。

第二个元素2的部分和是1+2=3。

第三个元素3的部分和是1+2+3=6。

以此类推。

通过这样的处理方式,你可以在数据流中实时地看到每一步累积的结果。

Fold

KeyedStream -> DataStream

对于带有初始值的键控数据流进行滚动折叠,将当前元素与上一个折叠值相互结合,并发出新值。

一个折叠函数,应用序列是:(1,2,3,4,5),会发出:start-1、start-1-2、start-1-2-3…

DataStream<String> result =
    keyedStream.fold("start", new FoldFunction<Integer, String>() {
        @Override
        public String fold(String current, Integer value) {
            return current + "-" + value;
        }
});

Aggregations

KeyedStream -> DataStream

对KeyedStream进行滚动聚合,min 和 minBy 的区别在于,min 返回最小值,而 minBy 返回该字段中具有最小元素值(max 和 maxBy 同理)。

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

Window

KeyedStream -> WindowedStream

可以在已经分区的KeyedStream上定义窗口,窗口根据某些特性(例如:最近5秒内到达的数据),对每个键中的数据进行分组。

 // Last 5 seconds of data
dataStream.keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(5)));

WindowAll

DataStream -> AllWindowStream

可以在常规的 DataStream 上定义窗口,窗口根据某些特性(例如:最近5秒内到达的数据)对所有流事件进行分组。

// Last 5 seconds of data
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); 

Window Apply

WindowedStream -> DataStream

AllWindowedStream -> DataStream

对整个窗口应用一个通用函数:


windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple,
Window>() {
    public void apply (Tuple tuple,
        Window window,
        Iterable<Tuple2<String, Integer>> values,
        Collector<Integer> out) throws Exception {
            int sum = 0;
            for (value t: values) {
                sum += t.f1;
            }
            out.collect (new Integer(sum));
        }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer,
Window>() {
    public void apply (Window window,
        Iterable<Tuple2<String, Integer>> values,
        Collector<Integer> out) throws Exception {
            int sum = 0;
            for (value t: values) {
                sum += t.f1;
            }
            out.collect (new Integer(sum));
        }
});

Window Reduce

WindowedStream -> DataStream

对于一个功能性Reduce函数应用于窗口,并返回简化的值。

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
    Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }
});

Window Fold

WindowedStream -> DataStream

应用一个函数式的折叠函数到窗口,并返回折叠后的值。

示例函数在序列(1,2,3,4,5)上应用时,将折叠成字符:start-1-2-3-4-5

windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
});

Aggregations on windows

WindowedStream -> DataStream

聚合窗口中的内容,min 和 minBy 的区别于,min 返回最小值,而 minBy 返回在此字段中具有最小值的元素(max 和 maxBy 同理)

windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");

Union

DataStream -> DataStream

将两个或者多个数据流合并,创建一个包含所有数据流中所有元素的新数据流。

注意:如果一个数据流于自身合并,结果数据流中每个元素将出现两次。

dataStream.union(otherStream1, otherStream2, ...);

Window Join

DataStream, DataStream -> DataStream

将两个数据流按给定的键和一个公共窗口进行连接。

dataStream
.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});

Interval Join

KeyedStream, KeyedStream -> DataStream

将两个具有相同键的键控流中的元素e1和e2在给定时间间隔内连接起来,使得e2的时间戳满足 e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound

// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper
bound
.upperBoundExclusive(true) // optional
.lowerBoundExclusive(true) // optional
.process(new IntervalJoinFunction() {...});

Window CoGroup

DataStream, DataStream -> DataStream

将两个数据流在给定键和公共窗口上进行合并。

dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...});

Connect

DataStrean, DataStream -> ConnectedStreams

将两个数据流连接起来,保留他们的类型,连接允许在两个数据流之间共享。

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams =
someStream.connect(otherStream);

CoMap, CoFlatMap

ConnetedStreams -> DataStream

类似Map和FlatMap,只不过是连接流。

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }
    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
    @Override
    public void flatMap1(Integer value, Collector<String> out) {
        out.collect(value.toString());
    }
    @Override
    public void flatMap2(String value, Collector<String> out) {
        for (String word: value.split(" ")) {
            out.collect(word);
        }
    }
});


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
143 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
83 5
|
2月前
|
分布式计算 监控 大数据
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
87 0
|
9天前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
80 27
zdl
|
1月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
159 56
|
1月前
|
存储 机器学习/深度学习 大数据
量子计算与大数据:处理海量信息的新方法
量子计算作为革命性的计算范式,凭借量子比特和量子门的独特优势,展现出在大数据处理中的巨大潜力。本文探讨了量子计算的基本原理、在大数据处理中的应用及面临的挑战与前景,展望了其在金融、医疗和物流等领域的广泛应用。
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
72 1
|
2月前
|
存储 机器学习/深度学习 大数据
量子计算与大数据:处理海量信息的新方法
【10月更文挑战第31天】量子计算凭借其独特的量子比特和量子门技术,为大数据处理带来了革命性的变革。相比传统计算机,量子计算在计算效率、存储容量及并行处理能力上具有显著优势,能有效应对信息爆炸带来的挑战。本文探讨了量子计算如何通过量子叠加和纠缠等原理,加速数据处理过程,提升计算效率,特别是在金融、医疗和物流等领域中的具体应用案例,同时也指出了量子计算目前面临的挑战及其未来的发展方向。
|
2月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
86 1
|
2月前
|
消息中间件 分布式计算 Kafka
大数据平台的毕业设计02:Spark与实时计算
大数据平台的毕业设计02:Spark与实时计算
120 0