【Flink-API】之复习一系列Transformation/Sink操作

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 【Flink-API】之复习一系列Transformation/Sink操作

一、Map


1.1 介绍


1.DataStream->DataStream 数据集转换。

2.数据集合中的元素一一映射的关系。


1.2 MapFunction

public class Map01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        DataStream<Integer> nums = env.fromElements(1, 2, 3, 4, 5);
        // 实现一:map 方法做映射,输入输出一直的2倍
        SingleOutputStreamOperator<Integer> result1 = nums.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return value * 2;
            }
        });
        // 实现二:lambada 表达式的应用
        SingleOutputStreamOperator<Integer> result2 = nums.map(i -> i * 2);
        // sink
        result1.print();
        result2.print();
        env.execute();
    }
}

结果如下:


20200919211256120.png

1.3 RichMapFunction


1.open()方法 在构造方法之后,map方法之前,只执行一次,初始化方法 使用数据库

2.Configuration为全局的配置

3.close()方法 销毁之前执行一次,通常为资源额的释放

程序如下:

        // RichMapFunction
        nums.map(new RichMapFunction<Integer, Integer>() {
            // open 在构造方法之后,map方法之前,只执行一次,初始化方法 使用数据库
            // Configuration为全局的配置
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
            }
            @Override
            public Integer map(Integer value) throws Exception {
                return value * 10;
            }
            // close 销毁之前执行一次,通常为资源额的释放
            @Override
            public void close() throws Exception {
                super.close();
            }
        });

二、FlatMap


2.1 介绍


输入一个元素,会被切分成多个元素。一对多。


2.2 FlatMapFunction

public class FlatMap01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.fromElements("GW001001 GW002002 GW003003", "GW001001 GW002002 GW003003", "GW001001 GW002002 GW003003");
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                //实现一:jdk8的流式处理 lambada表达式【推荐】
                //Arrays.stream(line.split(" ")).forEach(out::collect);
                //实现二:
                //Arrays.asList(line.split(" ")).forEach(w -> out.collect(w));
                //实现三:最原始的方式
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        words.print();
        env.execute();
    }
}

结果如下:

20200919212649465.png


2.3 RichFlatMapFunction


雷同方法 open() close()方法。


三、Filter


3.1 介绍


1.ture是留下,false过滤

2.实现对输入的数据进行逻辑判断,判断是否是奇数?


3.2 FilterFunction

public class Filter01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> nums = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
        //实现一:ture是留下,false时过滤
        SingleOutputStreamOperator<Integer> filter1 = nums.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value % 2 != 0;
            }
        });
        //实现二:lambada表达式:filter2
        //SingleOutputStreamOperator<Integer> filter2 = nums.filter(i -> i >= 5);
        SingleOutputStreamOperator<Integer> filter2 = nums.filter(i -> {
            //换行要有return
            return i >= 5;
        });
        filter1.print();
        filter2.print();
        env.execute();
    }
}


计算结果:

20200919214414832.png

四、KeyBy


实时计算的算子


4.1 lambda实现


1.实现输入一个,返回一个,使用lambada表达式 代替new Function,利用虚拟机开设一个socket端口号,实现实时聚合计算。

2.虚拟机centos中:nc -lk 8888

3.元组也是一个特殊的集合,角标 0 开始 最大Tuple25

4.代码实现:

public class KeyBy01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        //使用lambada表达式 代替new Function
        //输入一个返回一个
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(w -> Tuple2.of(w, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        //元组也是一个特殊的集合,角标 0 开始 最大Tuple25
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        //聚合
        keyed.print();
        env.execute("KeyBy01");
    }
}

运行结果:


20200920085206896.png

4.2 KeyBy自定义实体类


1.实体类WordAndCount

public class WordAndCount {
    private String word;
    private Long counts;
}

2.keyby sum

public class KeyBy02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8808);
        //输入一个返回一个
        SingleOutputStreamOperator<WordAndCount> wordAndOne = lines.map(new MapFunction<String, WordAndCount>() {
            @Override
            public WordAndCount map(String value) throws Exception {
                return Turbine.of(value,1L);
            }
        });
        //根据实体类的字段进行聚合
        KeyedStream<WordAndCount, Tuple> keyed = wordAndOne.keyBy("word");
        //聚合
        SingleOutputStreamOperator<WordAndCount> sumed = keyed.sum("counts");
        keyed.print();
        sumed.print();
        env.execute();
    }
}

运行结果:

2020092009265921.png

4.3 keyBy多字段进行分组


代码:

public class KeyBy03 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8828);
        //山东,烟台,2000
        //山东,烟台,2000
        //山东,烟台,2000
        SingleOutputStreamOperator<Tuple3<String, String, Double>> provinceCityMoney = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {
            @Override
            public Tuple3<String, String, Double> map(String line) throws Exception {
                //切分
                String[] fields = line.split(",");
                String province = fields[0];
                String city = fields[1];
                double money = Double.parseDouble(fields[2]);
                return Tuple3.of(province, city, money);
            }
        });
        //按照省份,城市分组  多个字段进行分组,最后一个字段进行聚合
        /**
         * 如果是自己定义的bean实体类,可以进行将字段写进去
         */
        SingleOutputStreamOperator<Tuple3<String, String, Double>> summed = provinceCityMoney.keyBy(0, 1).sum(2);
        summed.print();
        env.execute();
    }
}


运行结果:

20200920093945101.png

五、Reduce


聚合,sum只可以加法,而reduce可以乘法,可以自定义算子。

public class Reduce01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8828);
        //使用lambada表达式 代替new Function
        //输入一个返回一个
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(w -> Tuple2.of(w, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        //元组也是一个特殊的集合,角标 0 开始 最大Tuple25
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        //聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduced = keyed.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                String key = value1.f0;
                Integer count1 = value1.f1;
                Integer count2 = value2.f1;
                Integer counts = count1 + count2;
                return Tuple2.of(key, counts);
            }
        });
        reduced.print();
        env.execute();
    }
}

运行结果:

20200920095307419.png

六、Max


例如:输入

spark,10

spark,20

hadoop,10

和历史数据比较,求最大次数的,最大的留下,最小的丢弃

public class Max01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] fields = line.split(",");
                String words = fields[0];
                int num = Integer.parseInt(fields[1]);
                return Tuple2.of(words, num);
            }
        });
        //按照单词进行分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        //求最大次数的,最大的留下,最小的丢弃
        SingleOutputStreamOperator<Tuple2<String, Integer>> max = keyed.max(1);
        max.print();
        env.execute();
    }
}

运行结果:

20200920100605464.png


七、Sink


7.1 print()


打印到控制台也是sink.


7.2 CSV

public class AddSink01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("192.168.52.200", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] fields = line.split(",");
                String words = fields[0];
                int num = Integer.parseInt(fields[1]);
                return Tuple2.of(words, num);
            }
        });
        //按照单词进行分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> max = keyed.max(1);
        /**
         * 自定义sink,比如 写入数据库,磁盘等等
         * 不需要有返回就可以
         */
        max.addSink(new SinkFunction<Tuple2<String, Integer>>() {
            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                System.out.println(value);
            }
        });
        /**
         * 写入磁盘
         *  如果是写入scv文件 必须时tuple格式
         */
        max.writeAsCsv("F:\\out222", FileSystem.WriteMode.OVERWRITE);
        max.print();
        env.execute();
    }
}

7.3 RedisSink

public class MyRedisSink extends RichSinkFunction<Turbine> {
    //初始化redis连接
    private transient Jedis jedis;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String host = params.getRequired("redis.host");
        //String password = params.getRequired("redis.pwd");
        int db = params.getInt("redis.db", 0);
        jedis = new Jedis(host, 6379, 5000);
        //jedis.auth(password);
        jedis.select(db);
    }
    @Override
    public void invoke(Turbine value, Context context) throws Exception {
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        //写入redis
        jedis.hset(value.word, value.province, String.valueOf(value.counts));
    }
    @Override
    public void close() throws Exception {
        super.close();
        jedis.close();
    }
}

7.4 MySqlSink

public class MySqlSink extends RichSinkFunction<GW200001> {
    //最好连接不参与序列化
    private transient Connection conn = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //创建mysql连接
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/turbine?characterEncoding=UTF-8", "root", "123456");
        System.out.println("拿到连接了");
    }
    @Override
    public void invoke(GW200001 gw200001, Context context) throws Exception {
        //更新,插入,统计业务
        PreparedStatement pstm = null;
        try {
            pstm = conn.prepareStatement("insert into gw200001(wt_number, wt_date_time) values(?,?)");
            pstm.setString(1, gw200001.wt_number);
            pstm.setString(2, gw200001.wt_date_time);
            //执行sql  executeUpdate() executeQuery()
            System.out.println("执行sql");
            pstm.execute();
        } finally {
            if (pstm != null) {
                pstm.close();
                System.out.println("正常关闭");
            }
        }
    }
    @Override
    public void close() throws Exception {
        super.close();
        conn.close();
        System.out.println("正常关闭了!");
    }
}


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
5天前
|
关系型数据库 MySQL 流计算
Flink自定义sink写入mysql
Flink自定义sink写入mysql
57 0
|
5天前
|
流计算
Flink自定义source、自定义sink
Flink自定义source、自定义sink
41 0
|
5天前
|
数据处理 数据库 流计算
Flink 操作mapper、sink解析
Flink 操作mapper、sink解析
31 0
|
5天前
|
SQL 消息中间件 存储
Flink报错问题之Flink报错:Table sink 'a' doesn't support consuming update and delete changes which is produced by node如何解决
Flink报错通常是指在使用Apache Flink进行实时数据处理时遇到的错误和异常情况;本合集致力于收集Flink运行中的报错信息和解决策略,以便开发者及时排查和修复问题,优化Flink作业的稳定性。
|
7月前
|
消息中间件 关系型数据库 MySQL
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
|
4天前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用合集之支持sink到多分区的kafka ,还能保持有序吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
18 0
|
5天前
|
消息中间件 关系型数据库 MySQL
[flink 实时流基础] 输出算子(Sink)
[flink 实时流基础] 输出算子(Sink)
|
6月前
|
存储 SQL API
Flink教程(23)- Flink高级特性(Streaming File Sink)
Flink教程(23)- Flink高级特性(Streaming File Sink)
143 0
|
5天前
|
Oracle 关系型数据库 数据库
Flink Sink to Oracle 存在字段CLOB类型,如何处理错误”ORA-01461: 仅能绑定要插入LONG的LONG值“
做Flink CDC同步数据过程中,目标是Oracle数据库,其中某个字段较大被设置为CLOB类型,其中会遇到异常,”ORA-01461: 仅能绑定要插入LONG的LONG值“
|
5天前
|
SQL 消息中间件 Kubernetes
Flink报错问题之Flink报错:Incompatible types for sink column 'xxx' at position x.如何解决
Flink报错通常是指在使用Apache Flink进行实时数据处理时遇到的错误和异常情况;本合集致力于收集Flink运行中的报错信息和解决策略,以便开发者及时排查和修复问题,优化Flink作业的稳定性。