2021 Latest and Most Complete Flink Tutorial Series: A First Look at Flink Internals and the Unified Stream/Batch API (Part 2)


day02 - Unified Stream/Batch API

Today's Goals

  • Stream processing concepts (understand)
  • Program structure - data source (Source) (master)
  • Program structure - data transformation (Transformation) (master)
  • Program structure - data sink (Sink) (master)
  • Flink connectors (understand)

Stream Processing Concepts

Data Timeliness

  • Emphasizes how promptly data is processed after it is generated
    For example, website access logs or pages fetched by a crawler lose much of their value if they are not processed soon after they are produced

Stream Processing vs. Batch Processing

  • Stream processing handles unbounded data
  • Window operations draw boundaries within the unbounded stream so it can be computed over
  • Batch processing handles bounded data
  • Since Flink 1.12 stream and batch are unified: the same API supports both stream processing and batch processing, as sketched below
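
A minimal sketch of choosing the execution mode on the unified API (the sample elements are arbitrary):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RuntimeModeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // STREAMING: classic unbounded stream execution (the default)
        // BATCH: bounded execution with batch-style scheduling
        // AUTOMATIC: Flink picks BATCH when every source is bounded, otherwise STREAMING
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.fromElements("hello world", "hello flink") // a bounded source
                .print();
        env.execute();
    }
}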

Programming Model

  • source
  • transformation
  • sink
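
Putting the three parts together, a minimal skeleton looks like the sketch below (the input path data/hello.txt is only illustrative):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProgramStructureSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.readTextFile("data/hello.txt")       // 1. source
                .map(line -> line.toUpperCase()) // 2. transformation
                .returns(Types.STRING)
                .print();                        // 3. sink
        env.execute();                           // trigger execution
    }
}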

Source

File-based

  • Requirement: env.readTextFile(local/HDFS file or directory); //compressed files are also supported
package cn.itcast.sz22.day02;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Author itcast
 * Date 2021/5/5 9:50
 * env.readTextFile(local/HDFS file or directory); //compressed files are also supported
 */
public class FileSourceDemo {
    public static void main(String[] args) throws Exception {
        //create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //read a file, e.g. hdfs://node1:8020/user/root/xxx.txt
        //read a gzip-compressed .gz file
        DataStreamSource<String> source1 = env.readTextFile("data/hello.txt");
        DataStreamSource<String> source2 = env.readTextFile("D:\\_java_workspace\\sz22\\data\\hello.txt.gz");
        //print the text
        source1.print();
        source2.print("source2:");
        //execute the streaming environment
        env.execute();
    }
}

Collection-based: fromElements

  • Requirement
    1.env.fromElements(varargs);
    2.env.fromCollection(any collection);
    3.env.generateSequence(from, to);
    4.env.fromSequence(from, to);
  • Case
package cn.itcast.sz22.day02;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
/**
 * Author itcast
 * Date 2021/5/5 9:20
 * 1. Create the (stream) execution environment
 * 2. Get the data sources
 * 3. Print the data
 * 4. Execute
 */
public class SourceDemo01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //1.env.fromElements(varargs);
        DataStreamSource<String> source1 = env.fromElements("hello world", "hello flink");
        //2.env.fromCollection(any collection);
        ArrayList<String> list = new ArrayList<>();
        list.add("hello hadoop");
        list.add("hello flink");
        DataStreamSource<String> source2 = env.fromCollection(list);
        //3.env.generateSequence(from, to);
        DataStreamSource<Long> source3 = env.generateSequence(1, 10).setParallelism(1);
        //4.env.fromSequence(from, to);
        DataStreamSource<Long> source4 = env.fromSequence(10, 20);
        //print the output
        source1.print("source1");
        source2.print("source2");
        source3.print("source3");
        source4.print("source4");
        //execute
        env.execute();
    }
}

Custom source

  • The SourceFunction variants
    SourceFunction: non-parallel source (parallelism can only be 1)
    RichSourceFunction: non-parallel source with rich lifecycle methods (parallelism can only be 1)
    ParallelSourceFunction: parallel source (parallelism can be >= 1)
    RichParallelSourceFunction: parallel source with rich lifecycle methods (parallelism can be >= 1) - this is what the Kafka source covered later uses
  • Requirement: generate one order record per second (order ID, user ID, order amount, timestamp)
package cn.itcast.sz22.day02;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
import java.util.UUID;
/**
 * Author itcast
 * Date 2021/5/5 10:15
 * Generate one order record per second (order ID, user ID, order amount, timestamp)
 * Requirements:
 * - randomly generate the order ID (UUID)
 * - randomly generate the user ID (0-2)
 * - randomly generate the order amount (0-100)
 * - use the current system time as the timestamp
 */
public class CustomSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //custom source
        Random rn = new Random();
        DataStreamSource<Order> source = env.addSource(new ParallelSourceFunction<Order>() {
            //flag used to stop the source when cancel() is called
            boolean flag = true;
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while (flag) {
                    //randomly generate the order ID (UUID)
                    String oid = UUID.randomUUID().toString();
                    //randomly generate the user ID (0-2)
                    int uid = rn.nextInt(3);
                    //randomly generate the order amount (0-100)
                    int money = rn.nextInt(101);
                    //use the current system time as the timestamp
                    long timestamp = System.currentTimeMillis();
                    //wrap the fields into an Order and emit it
                    ctx.collect(new Order(oid, uid, money, timestamp));
                    //sleep for one second
                    Thread.sleep(1000);
                }
            }
            @Override
            public void cancel() {
                flag = false;
            }
        }).setParallelism(1);
        //print the output
        source.print();
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order{
        private String uuid;
        private int uid;
        private int money;
        private Long timestamp;
    }
}
  • Read data from MySQL with a custom source
  • Initialize the database
CREATE DATABASE if not exists bigdata;
USE bigdata;
CREATE TABLE if not exists `t_student` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `name` varchar(255) DEFAULT NULL,
    `age` int(11) DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
INSERT INTO `t_student` VALUES ('1', 'jack', '18');
INSERT INTO `t_student` VALUES ('2', '张三', '19');
INSERT INTO `t_student` VALUES ('3', 'rose', '20');
INSERT INTO `t_student` VALUES ('4', 'tom', '19');
INSERT INTO `t_student` VALUES ('5', '李四', '18');
INSERT INTO `t_student` VALUES ('6', '王五', '20');
package cn.itcast.sz22.day02;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
/**
 * Author itcast
 * Date 2021/5/5 10:32
 * Desc Read from MySQL with a custom RichSourceFunction
 */
public class MySQLSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.env: set the parallelism to 1
        env.setParallelism(1);
        //2.source: a custom source that connects to MySQL and emits the table contents every 2 seconds
        DataStreamSource<Student> source = env.addSource(new RichSourceFunction<Student>() {
            Connection conn;
            PreparedStatement ps;
            boolean flag = true;
            @Override
            public void open(Configuration parameters) throws Exception {
                //open the database connection
                conn = DriverManager.getConnection("jdbc:mysql://192.168.88.100:3306/bigdata?useSSL=false"
                        , "root", "123456");
                //SQL that reads the table
                String sql = "select `id`,`name`,age from t_student";
                //prepare the statement
                ps = conn.prepareStatement(sql);
            }
            @Override
            public void run(SourceContext<Student> ctx) throws Exception {
                while (flag) {
                    ResultSet rs = ps.executeQuery();
                    while (rs.next()) {
                        int id = rs.getInt("id");
                        String name = rs.getString("name");
                        int age = rs.getInt("age");
                        Student student = new Student(id, name, age);
                        ctx.collect(student);
                    }
                    //re-query every 2 seconds instead of spinning in a tight loop
                    Thread.sleep(2000);
                }
            }
            @Override
            public void cancel() {
                flag = false;
            }
            @Override
            public void close() throws Exception {
                ps.close();
                conn.close();
            }
        });
        //3.print the source
        //4.execute
        //recap: create a static inner class Student with fields id, name, age
        //create a static inner class MySQLSource extending RichParallelSourceFunction<Student> (implemented inline above)
        // implement open(): obtain the database connection (MySQL 5.7)
        //   jdbc:mysql://192.168.88.100:3306/bigdata?useSSL=false
        // implement run()
        source.print();
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Student {
        private int id;
        private String name;
        private int age;
    }
}

Socket

  • Install netcat: yum install nc -y, then start a listener with nc -lk 9999 and type words into it
  • Requirement: receive data over a socket and compute a word count
package cn.itcast.sz22.day02;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
 * Author itcast
 * Date 2021/5/5 9:59
 * Desc Word count over a socket source
 */
public class SocketSourceDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.source socketSource
        DataStreamSource<String> source = env.socketTextStream("192.168.88.100", 9999);
        //3.process the data - transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = source
                .flatMap((String value, Collector<String> out) -> Arrays
                        .stream(value.split(" ")).forEach(out::collect))
                .returns(Types.STRING)
                .map(value -> Tuple2.of(value, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1);
        //3.1 split each line on spaces into individual words
        //3.2 map each word to (word, 1)
        //3.3 group the data by the word (key)
        //3.4 aggregate the counts (value) within each group, i.e. sum
        //4.output the result - sink
        result.print();
        //5.trigger execution - execute
        env.execute();
    }
}

Transformation

Types of transformation operations

  • Single-record operations: map, filter
  • Operations over multiple records, e.g. the data inside a window: reduce
  • Merging streams: union and join combine multiple streams into one (a window-join sketch follows this list)
  • Splitting streams: divide one data stream into several, using split (legacy) or OutputTag side outputs
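
The cases below demonstrate filter/reduce, union/connect and side-output splitting; join is not shown there, so here is a minimal window-join sketch (the two sample streams and their fields are made up for illustration; with bounded demo sources a processing-time window may close before it fires, so in practice this is used with unbounded sources such as sockets or Kafka):

package cn.itcast.sz22.day02;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class WindowJoinSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //two sample streams keyed by the same user id
        DataStream<Tuple2<String, Integer>> orders =
                env.fromElements(Tuple2.of("u1", 100), Tuple2.of("u2", 200));
        DataStream<Tuple2<String, String>> users =
                env.fromElements(Tuple2.of("u1", "jack"), Tuple2.of("u2", "rose"));
        //join the two streams on the user id within 5-second tumbling processing-time windows
        DataStream<String> joined = orders
                .join(users)
                .where(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) { return value.f0; }
                })
                .equalTo(new KeySelector<Tuple2<String, String>, String>() {
                    @Override
                    public String getKey(Tuple2<String, String> value) { return value.f0; }
                })
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, String>, String>() {
                    @Override
                    public String join(Tuple2<String, Integer> order, Tuple2<String, String> user) {
                        return user.f1 + " spent " + order.f1;
                    }
                });
        joined.print();
        env.execute();
    }
}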

Case

  • Count the words in the stream, filtering out the sensitive word heihei
package cn.itcast.sz22.day02;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
 * Author itcast
 * Date 2021/5/5 9:59
 * 1.filter: remove the word heihei
 * 2.reduce: aggregate the counts
 */
public class SocketSourceFilterDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.source socketSource
        DataStreamSource<String> source = env.socketTextStream("192.168.88.100", 9998);
        //3.process the data - transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = source
                .flatMap((String value, Collector<String> out) -> Arrays
                        .stream(value.split(" ")).forEach(out::collect))
                .returns(Types.STRING)
                //filter out the word heihei - boolean filter(T value)
                .filter(word-> !word.equals("heihei"))
                .map(value -> Tuple2.of(value, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                //.sum(1);
                //T reduce(T value1, T value2)
                // hadoop,1 hadoop,1 => hadoop,1+1
        .reduce((Tuple2<String,Integer> a,Tuple2<String,Integer> b)->Tuple2.of(a.f0,a.f1+b.f1));
        //3.1 split each line on spaces into individual words
        //3.2 map each word to (word, 1)
        //3.3 group the data by the word (key)
        //3.4 aggregate the counts (value) within each group, i.e. sum
        //4.output the result - sink
        result.print();
        //5.trigger execution - execute
        env.execute();
    }
}

Merge and Split

  • connect: merges streams of different data types
  • union: merges streams of the same data type
Case

Requirement: union two String-typed streams,

and connect a String-typed stream with a Long-typed stream

package cn.itcast.sz22.day02;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
/**
 * Author itcast
 * Date 2021/5/5 11:24
 * Union two String-typed streams
 * Connect a String-typed stream with a Long-typed stream
 *
 */
public class UnionAndConnectDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);
        //2.Source
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
        DataStream<String> ds2 = env.fromElements("hadoop", "spark", "flink");
        DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);
        //3. transformation
        //3.1 union
        DataStream<String> union = ds1.union(ds2);
        union.print("union:");
        //3.2 connect
        ConnectedStreams<String, Long> connect = ds1.connect(ds3);
        SingleOutputStreamOperator<String> source2 = connect.map(new CoMapFunction<String, Long, String>() {
            @Override
            public String map1(String value) throws Exception {
                return "string->string:" + value;
            }
            @Override
            public String map2(Long value) throws Exception {
                return "Long->Long:" + value;
            }
        });
        //print the output
        source2.print("connect:");
        env.execute();
    }
}

Split

  • Split one data stream into multiple data streams
Case
  • Requirement: split the stream into odd and even numbers and read the resulting side-output streams
package cn.itcast.sz22.day02;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
 * Author itcast
 * Date 2021/5/5 11:35
 * Split the stream into odd and even numbers and read the resulting side-output streams
 */
public class SplitStreamDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);
        //2.Source: e.g. the numbers 1-20
        DataStreamSource<Long> source = env.fromSequence(1, 20);
        //define two output tags, one for odd and one for even numbers, with element type Long
        OutputTag<Long> odd = new OutputTag<>("odd", TypeInformation.of(Long.class));
        OutputTag<Long> even = new OutputTag<>("even", TypeInformation.of(Long.class));
        //process the source and route each element to the odd or even side output
        SingleOutputStreamOperator<Long> processDS = source.process(new ProcessFunction<Long, Long>() {
            @Override
            public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
                if (value % 2 == 0) {
                    ctx.output(even, value);
                } else {
                    ctx.output(odd, value);
                }
            }
        });
        //3.get the two side-output streams
        DataStream<Long> evenDS = processDS.getSideOutput(even);
        DataStream<Long> oddDS = processDS.getSideOutput(odd);
        //4.sink: print the output
        evenDS.printToErr("even");
        oddDS.print("odd");
        //5.execute
        env.execute();
    }
}

Repartitioning

  • Requirement: use repartitioning to balance the load across the parallel subtasks (CPUs)
package cn.itcast.sz22.day02;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Author itcast
 * Date 2021/5/5 14:46
 * Desc Use rebalance to even out data skew across subtasks
 */
public class TransformationRebalance {
    public static void main(String[] args) throws Exception {
        //1.env: set the parallelism to 3
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.enableCheckpointing(1000);
        // parallelism 3 uses 3 task slots
        env.setParallelism(3);
        //2.source: fromSequence 0-100
        DataStreamSource<Long> source = env.fromSequence(0, 100);
        //3.Transformation
        //the filter below keeps only values > 10; the data may end up skewed across the partitions
        //boolean filter(T value) throws Exception;
        //keeps 11 ~ 100
        //by default the remaining 90 records are distributed over the 3 partitions (possibly unevenly)
        DataStream<Long> filterDS = source.filter(w -> w > 10);
        //3.1 map each Long to (partition number / subtask index, 1)
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapDS = filterDS.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Long value) throws Exception {
                // getRuntimeContext().getIndexOfThisSubtask(): index of the subtask (CPU) processing this record: 0, 1 or 2
                // (subtask index, count 1), e.g. (0,1) (1,1) (2,1) (1,1) (2,1) (0,1)
                return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), 1);
            }
        });
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> result = mapDS.keyBy(t -> t.f0).sum(1);
        //getRuntimeContext() provides the subtask index
        //return Tuple2(subtask index, 1)
        //group by subtask id / partition number and count how many elements each subtask/partition received
        //3.2 repeat the steps above, but call rebalance after the filter and before the map
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapDS1 = filterDS
                //rebalance redistributes the data round-robin across the subtasks
                .rebalance()
                .map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Long value) throws Exception {
                return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), 1);
            }
        });
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> result1 = mapDS1.keyBy(t -> t.f0).sum(1);
        //result.print("重分布之前");
        result1.print("重分布之后");
        //4.sink
                //result1.print();//有可能出现数据倾斜
                //result2.print();
        //5.execute
        env.execute();
    }
}
  • Case: apply the various partitioning strategies to the elements of a stream and print the results
package cn.itcast.sz22.day02;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Author itcast
 * Desc Apply the various physical partitioning strategies and print the results
 */
public class TransformationDemo05 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2.Source
        DataStream<String> linesDS = env.readTextFile("data/input/words.txt");
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.Transformation
        DataStream<Tuple2<String, Integer>> result1 = tupleDS.global();
        DataStream<Tuple2<String, Integer>> result2 = tupleDS.broadcast();
        DataStream<Tuple2<String, Integer>> result3 = tupleDS.forward();
        DataStream<Tuple2<String, Integer>> result4 = tupleDS.shuffle();
        DataStream<Tuple2<String, Integer>> result5 = tupleDS.rebalance();
        DataStream<Tuple2<String, Integer>> result6 = tupleDS.rescale();
        DataStream<Tuple2<String, Integer>> result7 = tupleDS.partitionCustom(new Partitioner<String>() {
            @Override
            public int partition(String key, int numPartitions) {
                //the key here is the word itself, so partition on the word
                //(parsing the word as an integer would throw NumberFormatException)
                return key.equals("hello") ? 0 : 1;
            }
        }, t -> t.f0);
        //4.sink
        //result1.print();
        //result2.print();
        //result3.print();
        //result4.print();
        //result5.print();
        //result6.print();
        result7.print();
        //5.execute
        env.execute();
    }
}

Sink

  • Print to the console
  • Write to a file

  • Case
package cn.itcast.sz22.day02;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
/**
 * Author itcast
 * Desc
 * 1.ds.print outputs directly to the console
 * 2.ds.printToErr() outputs to the console in red (stderr)
 * 3.ds.collect collects the distributed data into a local collection
 * 4.ds.setParallelism(1).writeAsText("local/HDFS path", WriteMode.OVERWRITE)
 */
public class SinkDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.source
        //DataStream<String> ds = env.fromElements("hadoop", "flink");
        DataStream<String> ds = env.readTextFile("data/input/words.txt");
        //3.transformation
        //4.sink
        //4.1 console output
        ds.print();
        ds.printToErr();
        //4.2 write to a file
        ds.writeAsText("/data/output/words.txt", FileSystem.WriteMode.OVERWRITE)
                .setParallelism(1);
        //
        /*ds.addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
            }
        })*/
        //Note:
        //with parallelism = 1 the output path is a single file
        //with parallelism > 1 the output path is a directory
        //5.execute
        env.execute();
    }
}
  • Case: save data from a Flink collection to MySQL with a custom sink
package cn.itcast.sz22.day02;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
 * Author itcast
 * Date 2021/5/5 16:00
 * Sink a collection of Student records into a MySQL database
 */
public class SinkMySQLDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source: define the Student objects
        DataStream<Student> studentDS = env.fromElements(new Student(null, "tonyma", 18));
        studentDS.addSink(new RichSinkFunction<Student>() {
            Connection conn;
            PreparedStatement ps;
            boolean flag = true;
            @Override
            public void open(Configuration parameters) throws Exception {
                //initialization: open the MySQL connection
                conn = DriverManager.getConnection("jdbc:mysql://192.168.88.100:3306/bigdata?useSSL=false"
                        , "root", "123456");
                String sql="INSERT INTO t_student(`id`,`name`,`age`) values(null,?,?)";
                ps = conn.prepareStatement(sql);
            }
            @Override
            public void invoke(Student value, Context context) throws Exception {
                ps.setString(1,value.getName());
                ps.setInt(2,value.getAge());
                ps.executeUpdate();
            }
            @Override
            public void close() throws Exception {
                ps.close();
                conn.close();
            }
        });
        //3.Transformation: not needed here
        //4.Sink: custom MySQL sink (implemented inline above)
        //5.execute
        //recap: create a Student class with 3 fields: id, name, age
        //create a MySQLSink class extending RichSinkFunction<Student>
        //implement open, invoke and close
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Student {
        private String id;
        private String name;
        private int age;
    }
}

Connectors

JDBC

  • The officially provided JDBC connector can write to a database; with the appropriate configuration it offers exactly-once semantics (the basic JdbcSink used below is at-least-once)
  • Requirement: store the data into MySQL
package cn.itcast.sz22.day02;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Author itcast
 * Date 2021/5/5 16:16
 * Desc Write to MySQL with the official JDBC connector (JdbcSink)
 */
public class SinkJDBCMySQLDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Student> studentDS = env.fromElements(new Student(null, "dehua", 22));
        studentDS.addSink(JdbcSink.sink(
                "insert into t_student(`id`,`name`,`age`) values (null,?,?)",
                (ps, s) -> {
                    ps.setString(1,s.getName());
                    ps.setInt(2,s.getAge());
                },
                JdbcExecutionOptions.builder().build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://192.168.88.100:3306/bigdata?useSSL=false")
                        .withUsername("root")
                        .withPassword("123456")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .build())
        );
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Student {
        private String id;
        private String name;
        private int age;
    }
}

Kafka

  • Where consumption starts (the start position) - see the sketch after this list

  • Automatic discovery of new partitions and topics

  • Setting FlinkKafkaConsumer properties
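
A minimal sketch of the start-position setters on FlinkKafkaConsumer (the topic name, properties and timestamp are illustrative; only the last setter called takes effect). The full consumer example follows below.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class KafkaStartPositionSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "flink");
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);
        //pick exactly one start position (the last call wins):
        consumer.setStartFromGroupOffsets();            //default: resume from the committed group offsets
        consumer.setStartFromEarliest();                //read from the earliest available record
        consumer.setStartFromLatest();                  //read only new records
        consumer.setStartFromTimestamp(1620000000000L); //read records from the given epoch-millis timestamp onward
    }
}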

package cn.itcast.sz22.day02;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
/**
 * Author itcast
 * Date 2021/5/5 17:23
 * Requirement: use FlinkKafkaConsumer from flink-connector-kafka_2.12 to consume data from Kafka and do a WordCount
 * The following parameters need to be set:
 * 1.the topic to subscribe to
 * 2.the deserialization schema
 * 3.consumer property - cluster address
 * 4.consumer property - consumer group id (a default is used if not set, but defaults are hard to manage)
 * 5.consumer property - offset reset policy, e.g. earliest/latest...
 * 6.dynamic partition discovery (Flink can detect when the number of Kafka partitions changes/increases!)
 * 7.if checkpointing is not enabled, offsets can be auto-committed; once checkpointing is covered later, offsets are committed together with each checkpoint and to the default topic
 */
public class FlinkKafkaConsumerDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //enable checkpointing
        env.enableCheckpointing(5000);
        //2.Source
        Properties props  = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "flink");
        props.setProperty("auto.offset.reset","latest");
        props.setProperty("flink.partition-discovery.interval-millis","5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("flink_kafka"
                , new SimpleStringSchema(), props);
        consumer.setStartFromEarliest();
        DataStreamSource<String> source = env.addSource(consumer).setParallelism(1);
        source.print();
        env.execute();
    }
}

Redis

  • Flink-Sink-Redis
  • Case: write the word-count result to Redis
package cn.itcast.sz22.day02;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
 * Author itcast
 * Date 2021/5/5 18:03
 * Desc Write the socket word count to Redis
 */
public class FlinkRedisSink {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source: read data from a socket
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        //3.Transformation
        //3.1 split each line and collect the individual words
        DataStream<String> flatMapDS = source.flatMap((String value, Collector<String> out) ->
                Arrays.stream(value.split(" "))
                        .forEach(out::collect))
                .returns(Types.STRING);
        //O map(T value)
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapDS = flatMapDS
                .map((word) -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapDS.keyBy(t -> t.f0).sum(1);
        //4.Sink
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("node1").build();
        result.addSink(new RedisSink<Tuple2<String, Integer>>(config, new RedisMapperEx()));
        env.execute();
        // * finally, store the result in Redis via a FlinkJedisPoolConfig
        // * note: the data structure used in Redis is a hash, i.e. a map
        // * key            value
        // * WordCount      (word, count)
        //-1.create the RedisConfig before creating the RedisSink
        //connect to a standalone Redis
        //5.execute
    }
    public static class RedisMapperEx implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "hashOpt");
        }
        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }
        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1 + "";
        }
    }
}