Flink之DataStream转换操作

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)Map详解


调用用户定义的MapFunction对DataStream数据进行处理,形成新的DataStream,其中数据格式可能会发生变化,常用作对数据集内数据的清洗和转换。

Map[DataStream -> DataStream]
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});


(2)FlatMap详解


处理输入一个元素产生一个或者多个元素的计算场景。

FlatMap[DataStream -> DataStream]
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});


(3)Filter详解


处理输入一个元素产生一个或者多个元素的计算场景。

Filter[DataStream -> DataStream]
dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});


(4)KeyBy详解


将数据集中相同的Key值的数据放置在相同的分区中,也就是对数据集执行Partition操作

KeyBy[DataStream -> KeyedStream

注意:两种情况不能使用KeyBy方法对数据集进行重新分区

  • 数据集类型为P0J0s类型
  • 数据集类型为数组结构
dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey"
dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple

(5)Reduce详解


对数据集滚动进行聚合处理,其中定义的ReduceFuction必须满足运算结合律和交换律。

Reduce[KeyedStream -> DataStream]
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

(6)Aggregations详解


Aggregations是DataStream接口提供的聚合算子,根据指定的字段进行聚合操作,滚动地产生一系列数据聚合结果。

Aggregations[KeyedStream -> DataStream]
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

以上Transform我统一写一个wordcount:

package com.aikfk.flink.datastream.transform;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/11 1:22 下午
 */
public class Transform {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStreamSource = env.socketTextStream("bigdata-pro-m07",9999);
        /**
         * map()
         */
        DataStream<Tuple2<String,String>> mapResult = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String line) throws Exception {
                String[] word = line.split(" ");
                return new Tuple2<>(word[0],word[1]);
            }
        });
        /**
         * flatmap()
         */
        DataStream<Tuple2<String, Integer>> flatmapResult = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        });
        /**
         * filter() -> keyBy() -> reduce()
         */
        DataStream<Tuple2<String,Integer>> result = flatmapResult.filter(new FilterFunction<Tuple2<String, Integer>>() {
            @Override
            public boolean filter(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                String saprk = "spark";
                return !stringIntegerTuple2.f0.equals(saprk);
            }
        })
            .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
            @Override
            public Object getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        })
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                    return new Tuple2<>(t1.f0,t1.f1 + t2.f1);
                }
            });
        result.print();
        env.execute("stream");
    }
}

(7)Union详解


Union主要是将两个或者两个以上的数据集合并成一个数据集,需要保证两个数据集的格式一致。

Union[DataStream -> DataStream]
package com.aikfk.flink.datastream.transform;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/11 1:22 下午
 */
public class UnionJava {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStreamSource1 = env.socketTextStream("bigdata-pro-m07",9999);
        DataStream<String> dataStreamSource2 = env.socketTextStream("bigdata-pro-m07",9998);
        DataStream<String> dataStreamSource = dataStreamSource1.union(dataStreamSource2);
        /**
         * flatmap()
         */
        DataStream<Tuple2<String, Integer>> flatmapResult = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        });
        /**
         * filter() -> keyBy() -> reduce()
         */
        DataStream<Tuple2<String,Integer>> result = flatmapResult.filter(new FilterFunction<Tuple2<String, Integer>>() {
            @Override
            public boolean filter(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                String saprk = "spark";
                return !stringIntegerTuple2.f0.equals(saprk);
            }
        })
            .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
            @Override
            public Object getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        })
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                    return new Tuple2<>(t1.f0,t1.f1 + t2.f1);
                }
            });
        result.print();
        env.execute("stream");
    }
}

(8)Connect详解


Connnect主要是为了合并两种或者多种不同数据类型的数据集,合并会保留原来的数据集的数据类型。

Connect[KeyedStream -> DataStream]
package com.aikfk.flink.datastream.transform;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/11 1:22 下午
 */
public class ConnectJava {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String,Integer>> dataStreamSource1 = env.fromElements(new Tuple2<>("spark",1),new Tuple2<>("java",3));
        DataStream<Tuple2<String,Integer>> dataStreamSource2 = env.fromElements(new Tuple2<>("hive",2),new Tuple2<>("hadoop",5));
        /**
         * connect()
         */
        ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connectedStreams = dataStreamSource1
                .connect(dataStreamSource2).keyBy(0,0);
        DataStream<Tuple2<String,Integer>> mapResult = connectedStreams.map(new CoMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map1(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return new Tuple2<>(stringIntegerTuple2.f0, stringIntegerTuple2.f1 + 10);
            }
            @Override
            public Tuple2<String, Integer> map2(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2;
            }
        });
        mapResult.print();
        env.execute("stream");
    }
}

(9)Side Out详解


Side Outs提供了根据条件对DataStream数据集进行拆分,原来是Split算子可以提供这个功能,但在Flink的后续版本中已经不推荐使用Split算子了。


SideOut[DataStream -> SingleOutputStreamOperator -> DataStream]
package com.aikfk.flink.datastream.transform;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/11 1:22 下午
 */
public class SideOut {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        OutputTag<Tuple2<String,Integer>> outputTag = new OutputTag<Tuple2<String,Integer>>("side-output"){};
        DataStream<Tuple2<String,Integer>> dataStreamSource = env.fromElements(
                new Tuple2<>("alex",11000),
                new Tuple2<>("lili",3200),
                new Tuple2<>("lucy",3400),
                new Tuple2<>("pony",13000),
                new Tuple2<>("tony",33000),
                new Tuple2<>("herry",4500),
                new Tuple2<>("cherry",9000),
                new Tuple2<>("jack",13450)
        );
        /**
         * mainDataStream为拆分出薪资小于10000的数据集
         */
        SingleOutputStreamOperator<Tuple2<String,Integer>> mainDataStream = dataStreamSource
                .process(new ProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void processElement(Tuple2<String, Integer> stringIntegerTuple2,
                                       Context context,
                                       Collector<Tuple2<String, Integer>> collector) throws Exception {
                if (stringIntegerTuple2.f1 > 10000){
                    context.output(outputTag,stringIntegerTuple2);
                } else {
                    collector.collect(stringIntegerTuple2);
                }
            }
        });
        /**
         * sideOutputStream为拆分出薪资大于10000的数据集
         */
        DataStream<Tuple2<String,Integer>> sideOutputStream = mainDataStream.getSideOutput(outputTag);
        sideOutputStream.print();
        /**
         * 6> (tony,33000)
         * 5> (pony,13000)
         * 2> (alex,11000)
         * 9> (jack,13450)
         */
        mainDataStream.print();
        /**
         * 13> (herry,4500)
         * 10> (lucy,3400)
         * 9> (lili,3200)
         * 14> (cherry,9000)
         */
        env.execute("stream");
    }
}

(10)Iterate详解


Iterate算子适合迭代计算场景,通过每一次的迭代计算,并将计算结果反馈到下一次迭代计算中。

SideOut[DataStream -> IterativeStream -> DataStream]
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});

(11)分区详解


自定义分区:

通过实现partitonCustom()方法对数据集创建自定义分区。

自定义分区[DataStream -> DataStream]
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
package com.aikfk.flink.datastream.transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/12 7:38 下午
 */
public class PartitionCustom {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String,Integer>> dataStream = env.socketTextStream("bigdata-pro-m07",9999)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        for (String word : s.split(" ")){
                            collector.collect(new Tuple2<>(word,1));
                        }
                    }
                });
        /**
         * 自定义分区
         */
        dataStream.partitionCustom(new Partitioner<String>() {
            @Override
            public int partition(String key, int numPartitions) {
                int partition = key.hashCode() % numPartitions;
                System.out.println("key: " + key + " partition: " + partition + " numPartitions: " + numPartitions);
                return partition;
            }
        }, new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        });
        env.execute("partition");
    }
}
key: java partition: 2 numPartitions: 16
key: java partition: 2 numPartitions: 16
key: hive partition: 0 numPartitions: 16
key: hive partition: 0 numPartitions: 16
key: hbase partition: 9 numPartitions: 16

Random分区:

通过随机的方式对数据集进行分区,分区相对比较平衡。

Random分区[DataStream -> DataStream]
dataStream.shuffle();

Rebalance分区:

通过轮训的方式对数据集进行分区,分区相对比较平衡。

Rebalance分区[DataStream -> DataStream]
dataStream.rebalance();


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
144 0
|
1月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
134 0
|
1月前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
36 0
|
4月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
存储 JSON Kubernetes
实时计算 Flink版操作报错合集之 写入hudi时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
SQL 流计算
实时计算 Flink版操作报错合集之怎么向一个未定义列的表中写入数据
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
资源调度 分布式计算 Hadoop
实时计算 Flink版操作报错合集之perjob提交给yarn,报错显示无法连接yarn- Connecting to ResourceManager,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之运行mysql to doris pipeline时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之cdc postgres数据库,当表行记录修改后报错,该如何修改
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之整库同步mysql到starRock提交任务异常,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
下一篇
无影云桌面