2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二.五)

本文涉及的产品
RDS Agent(兼容OpenClaw),2核4GB
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
简介: 2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二.五)

引言

大家好,我是ChinaManor,直译过来就是中国码农的意思,我希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,平凡但不甘于平庸的人。

下面为大家带来阿里巴巴极度热推的Flink,实时数仓是未来的方向,学好Flink,月薪过万不是梦!!

相关教程直通车:

2021年最新最全Flink系列教程_Flink快速入门(概述,安装部署)(一)

2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二)

2021年最新最全Flink系列教程__Flink高级API(三)

day02-03_流批一体API

今日目标

  • 流处理原理初探
  • 流处理概念(理解)
  • 程序结构之数据源Source(掌握)
  • 程序结构之数据转换Transformation(掌握)
  • 程序结构之数据落地Sink(掌握)
  • Flink连接器Connectors(理解)

流处理原理初探

  • Flink的角色分配
  1. JobMaster 老大, 主要负责 集群的管理, 故障的恢复, checkpoint 检查点设置
  2. taskmanager worker 小弟, 具体负责任务的执行节点
  3. client 提交任务的界面
  • taskmanager 执行能力
  1. taskslot 静态的概念
  2. parallelism 并行度 动态概念

  • 每个节点就是一个 task 任务
    每个任务拆分成多个并行处理的任务, 多个线程就有多个子任务,就叫子任务 subtask
  • 流图 StreamGraph 逻辑执行流图 DataFlow
    operator chain 操作链
  • JobGraph
    ExecuteGraph 物理执行计划
  • Event 事件 带有时间戳的
  • Operator 传递模式 : one to one 模式, redistributing模式

  • Flink之执行图

流处理概念

数据的时效性

  • 强调的是数据的处理时效
    处理的时间窗口, 按月, 按天, 按小时还是秒级处理

流处理和批处理

  • 批处理是有界的数据
  • 处理完整的数据集, 比如排序数据, 计算全局的状态, 生成最终的输入概述.
  • 批量计算: 统一收集数据->存储到DB->对数据进行批量处理
  • 流处理是无界的数据
  • 窗口操作来划分数据的边界进行计算
  • 流式计算,顾名思义,就是对数据流进行处理
  • 在Flink1.12时支持流批一体 既支持流处理也支持批处理。

  • 流批一体 Flink1.12.x 批处理和流处理
  • 可复用性: 作业在流模式或者批处理两种模式自由切换, 无需重写任何代码.
  • 维护简单: 统一的 API 意味着流和批可以共用同一组 connector,维护同一套代码.
编程模型

  • source - 读取数据源
  • transformation - 数据转换 map flatMap groupBy keyBy sum
  • sink - 落地数据 addSink print

Source

基于集合的Source

  • 开发和测试使用
  • 分类
1.env.fromElements(可变参数);
2.env.fromColletion(各种集合);
# 过期
3.env.generateSequence(开始,结束);
4.env.fromSequence(开始,结束);
  • 使用集合 Source
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
/**
 * Author itcast
 * Date 2021/6/16 9:29
 * 需求: 通过集合source打印结果,查看如何使用
 * 开发步骤:
 * 1. 创建流环境
 * 2. 从集合中读取数据
 * 3. 打印输出
 * 4. 运行执行
 */
public class SourceDemo01 {
    public static void main(String[] args) throws Exception {
        //1. 创建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. 从集合中读取数据
        //2.1 fromElement 从元素集合
        DataStreamSource<String> source1 = env.fromElements("hello world", "hello spark", "hello flink");
        //2.2 fromCollection  从集合列表
        ArrayList<String> strings = new ArrayList<>();
        strings.add("hello world");
        strings.add("hello flink");
        DataStreamSource<String> source2 = env.fromCollection(strings);
        //2.3 fromSequence   从序列
        DataStreamSource<Long> source3 = env.fromSequence(1, 10);
        //3. 打印输出
        source1.print();
        //4. 运行执行
        env.execute();
    }
}
  • socket 数据源 wordcount 统计
/**
 * Author itcast
 * Desc
 * SocketSource
 */
public class SourceDemo03 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2.source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);
        //3.处理数据-transformation
        //3.1每一行数据按照空格切分成一个个的单词组成一个集合
        DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                //value就是一行行的数据
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);//将切割处理的一个个的单词收集起来并返回
                }
            }
        });
        //3.2对集合中的每个单词记为1
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                //value就是进来一个个的单词
                return Tuple2.of(value, 1);
            }
        });
        //3.3对数据按照单词(key)进行分组
        //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
        //3.4对各个组内的数据按照数量(value)进行聚合就是求sum
        DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);
        //4.输出结果-sink
        result.print();
        //5.触发执行-execute
        env.execute();
    }
}
  • 自定义数据源 - 随机数据
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Random;
import java.util.UUID;
/**
 * Author itcast
 * Date 2021/6/16 10:18
 * 每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
 * 要求:
 * - 随机生成订单ID(UUID)
 * - 随机生成用户ID(0-2)
 * - 随机生成订单金额(0-100)
 * - 时间戳为当前系统时间
 *
 * SourceFunction:非并行数据源(并行度只能=1)
 * RichSourceFunction:多功能非并行数据源(并行度只能=1)
 * ParallelSourceFunction:并行数据源(并行度能够>=1)
 * RichParallelSourceFunction:多功能并行数据源(并行度能够>=1)--后续学习的Kafka数据源使用的就是该接口
 */
public class CustomSource01 {
    public static void main(String[] args) throws Exception {
        //1.env 创建 StreamExectution
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.source,创建自动生成 Order 数据源
        DataStreamSource<Order> source = env.addSource(new MyOrderSource());
        //3.打印数据源
        source.print();
        //4.执行
        env.execute();
        //定义实体类 Order 包括四个字段 oid uid money currentTime
        //定义静态内部类 MyOrderSource 继承 RichParallelSourceFunction
        //每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
        //要求:
        //- 随机生成订单ID(UUID)
        //- 随机生成用户ID(0-2)
        //- 随机生成订单金额(0-100)
        //- 时间戳为当前系统时间
    }
    public static class MyOrderSource extends RichParallelSourceFunction<Order> {
        boolean flag = true;
        Random rn = new Random();
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            //每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
            //要求:
            while(flag) {
                //- 随机生成订单ID(UUID)
                String oid = UUID.randomUUID().toString();
                //- 随机生成用户ID(0-2)
                int uid = rn.nextInt(3);
                //- 随机生成订单金额(0-100)
                int money = rn.nextInt(101);
                //- 时间戳为当前系统时间
                long currentTime = System.currentTimeMillis();
                ctx.collect(new Order(oid,uid,money,currentTime));
                //一秒钟休息一下
                Thread.sleep(1000);
            }
        }
        @Override
        public void cancel() {
            flag = false;
        }
    }
    //创建 Order 对象
    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    public static class Order{
        private String oid;
        private int uid;
        private int money;
        private long currentTime;
    }
}
  • 自定义数据源 - 从MySQL数据库中读取 t_student表中数据( 这种场景用的非常少 - 自定义数据源 )
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
/**
 * Author itcast
 * Date 2021/6/16 10:37
 * 需求:
 * 从MySql数据库中读取 t_student 表数据
 * 开发步骤:
 * //1.env 设置并行度为 1
 * //2.source,创建连接MySQL数据源 数据源,每2秒钟生成一条数据
 * //3.打印数据源
 * //4.执行
 * //创建静态内部类 Student ,字段为 id:int name:String age:int
 * //创建静态内部类 MySQLSource 继承RichParallelSourceFunction<Student>
 * // 实现 open 方法 ,创建 connection 和 prepareStatement
 * // 获取数据库连接 mysql5.7版本
 *  jdbc:mysql://192.168.88.163:3306/bigdata?useSSL=false
 * // 实现 run 方法, 每 5秒钟创建一条数据
 * // 实现 close 方法
 */
public class CustomSourceMySQL {
    public static void main(String[] args) throws Exception {
        //1.env 设置并行度为 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.source,创建连接MySQL数据源 数据源,每2秒钟生成一条数据
        DataStreamSource<Student> source = env.addSource(new MySQLSource());
        //3.打印数据源
        source.print();
        //4.执行
        env.execute();
        //创建静态内部类 Student ,字段为 id:int name:String age:int
        //创建静态内部类 MySQLSource 继承RichParallelSourceFunction<Student>
        // 实现 open 方法 ,创建 connection 和 prepareStatement
        // 获取数据库连接 mysql5.7版本
         jdbc:mysql://192.168.88.163:3306/bigdata?useSSL=false
        // 实现 run 方法, 每 5秒钟创建一条数据
        // 实现 close 方法
    }
    public static class MySQLSource extends RichSourceFunction<Student> {
        boolean flag = true;
        Connection conn = null;
        PreparedStatement ps = null;
        // open 生命周期的开始, 只做一次
        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://192.168.88.163:3306/bigdata?useSSL=false",
                    "root", "123456");
            String sql = "select id,name,age from t_student";
            // 执行 preparement
            ps = conn.prepareStatement(sql);
        }
        @Override
        public void run(SourceContext<Student> ctx) throws Exception {
            while(flag){
                // 查询结果集
                ResultSet rs = ps.executeQuery();
                while(rs.next()){
                    int id = rs.getInt("id");
                    String name = rs.getString("name");
                    int age = rs.getInt("age");
                    ctx.collect(new Student(id,name,age));
                    Thread.sleep(5000);
                }
            }
        }
        @Override
        public void cancel() {
            flag = false;
        }
        //关闭数据库, 在整个生命周期也只做一次
        @Override
        public void close() throws Exception {
            if(!ps.isClosed()) ps.close();
            if(!conn.isClosed()) conn.close();
        }
    }
    //定义 student
    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    public static class Student{
        private int id;
        private String name ;
        private int age;
    }
}

合并-拆分

  • 合并数据流 将两个数据流合并成一个数据流
  • 应用场景
    ① 将不同的数据源 电脑, app , ipad ,微信小程序的所有的订单的信息 ,统计分析,挖掘
    ② 将不同的手持设备, 电脑用户行为轨迹收集统计分析
  • union 和connect 区别
    union 和 connect 合流
    union 算子 要求数据流的类型必须保持一致.
    connect 算子 要求数据流的类型可以不一致
  • 需求: 将两个数据流合并到一起
/**
 * Author itcast
 * Desc
 */
public class TransformationDemo02 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);
        //2.Source
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
        DataStream<String> ds2 = env.fromElements("oozie", "flume", "flink");
        DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);
        //3.Transformation
        // union 算子 保证两个数据流类型保持一致
        DataStream<String> result1 = ds1.union(ds2);//合并但不去重 https://blog.csdn.net/valada/article/details/104367378
        // connect 算子 两个数据流类型可以不一样
        ConnectedStreams<String, Long> tempResult = ds1.connect(ds3);
        //interface CoMapFunction<IN1, IN2, OUT>
        DataStream<String> result2 = tempResult.map(new CoMapFunction<String, Long, String>() {
            @Override
            public String map1(String value) throws Exception {
                return "String->String:" + value;
            }
            @Override
            public String map2(Long value) throws Exception {
                return "Long->String:" + value.toString();
            }
        });
        //4.Sink
        //result1.print();
        result2.print();
        //5.execute
        env.execute();
    }
}

分流 select 和 outputside

  • 将一个数据流分成多个数据流
  • 应用场景
    ① 服务器日志 分流出正常的日志, 告警日志, 报错日志
  • 需求 - 将数据流拆分成 偶数 和 奇数
  • 开发步骤
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
 * Author itcast
 * Date 2021/6/16 11:29
 * 需求: 拆分数据
 * 开发步骤:
 * //1.env
 * //2.Source 比如 1-20之间的数字
 * //定义两个输出tag 一个奇数 一个偶数,指定类型为Long
 * //对source的数据进行process处理区分奇偶数
 * //3.获取两个侧输出流
 * //4.sink打印输出
 * //5.execute
 */
public class SplitDataStream {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.Source 比如 1-20之间的数字
        DataStreamSource<Long> source = env.fromSequence(1, 21);
        //定义两个侧输出流 一个奇数 一个偶数, 指定类型为Long
        // 需要指定对应的数据类型, 默认OutputTag 会用通用类型,需要手动
        OutputTag<Long> odd = new OutputTag<Long>("odd", TypeInformation.of(Long.class));
        OutputTag<Long> even = new OutputTag<Long>("even", TypeInformation.of(Long.class));
        //对source的数据进行process处理区分奇偶数
        SingleOutputStreamOperator<Long> result = source.process(new ProcessFunction<Long, Long>() {
            @Override
            public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
                if (value % 2 == 0) {
                    ctx.output(even, value);
                } else {
                    ctx.output(odd, value);
                }
            }
        });
        //3.获取两个侧输出流
        //result.print();
        result.getSideOutput(even).print("偶数");
        result.getSideOutput(odd).print("奇数");
        //4.sink打印输出
        //5.execute
        env.execute();
    }
}

数据重平衡 rebalance

  • 将数据均匀大散到各个节点上, 计算更均匀。
  • 需求: 使用3个线程将100个大于10的90个数字,均匀计算
  • 代码
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Author itcast
 * Date 2021/6/17 15:00
 * 需求: 3个线程处理90个数字, 大于10的数字
 */
public class RebalanceDemo {
    public static void main(String[] args) throws Exception {
        //1.env 设置并行度为3
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        //2.source fromSequence 1-100
        DataStreamSource<Long> source = env.fromSequence(1, 100);
        //3.Transformation
        //下面的操作相当于将数据随机分配一下,有可能出现数据倾斜,过滤出来大于10
        DataStream<Long> filterDS = source.filter(s -> s > 10);
        //3.1 接下来使用map操作,将Long数据转为 tuple2(分区编号/子任务编号, 1)
        /*SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapDS = filterDS.map(new RichMapFunction<Long, Tuple2<Integer*//**CPU的核心编号*//*, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Long value) throws Exception {
                //通过getRuntimeContext获取到任务Index
                int idx = getRuntimeContext().getIndexOfThisSubtask();
                //返回Tuple2(任务Index,1)
                return Tuple2.of(idx, 1);
            }
        });
        //按照子任务id/分区编号分组,统计每个子任务/分区中有几个元素
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> result1 = mapDS.keyBy(i -> i.f0)
                //对当前的数据流根据 key 进行分组聚合
                .sum(1);*/
        //3.2 重新执行以上操作在filter之后先 rebalance 再map ,同上
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapDS = filterDS
                .rebalance()
                .map(new RichMapFunction<Long, Tuple2<Integer/**CPU的核心编号*/, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Long value) throws Exception {
                //通过getRuntimeContext获取到任务Index
                int idx = getRuntimeContext().getIndexOfThisSubtask();
                //返回Tuple2(任务Index,1)
                return Tuple2.of(idx, 1);
            }
        });
        //按照子任务id/分区编号分组,统计每个子任务/分区中有几个元素
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> result2 = mapDS.keyBy(i -> i.f0)
                //对当前的数据流根据 key 进行分组聚合
                .sum(1);
        //4.sink
        //result1.print("没有重分区");
        result2.print("重分区");
        //5.execute
        env.execute();
    }
}

Sink

预定义Sink

/**
 * Author itcast
 * Desc
 * 1.ds.print 直接输出到控制台
 * 2.ds.printToErr() 直接输出到控制台,用红色
 * 3.ds.collect 将分布式数据收集为本地集合
 * 4.ds.setParallelism(1).writeAsText("本地/HDFS的path",WriteMode.OVERWRITE)
 */
public class SinkDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.source
        //DataStream<String> ds = env.fromElements("hadoop", "flink");
        DataStream<String> ds = env.readTextFile("data/input/words.txt");
        //3.transformation
        //4.sink
        ds.print();
        ds.printToErr();
        ds.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE).setParallelism(2);
        //注意:
        //Parallelism=1为文件
        //Parallelism>1为文件夹
        //5.execute
        env.execute();
    }
}

自定义Sink

  • 需求
    将集合中的数据写入到 MySQL 中
  • 开发步骤
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
 * Author itcast
 * Date 2021/6/17 15:43
 * Desc TODO
 */
public class CustomSinkMySQL {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStream<Student> studentDS = env.fromElements(new Student(null, "tonyma", 18));
        //3.Transformation
        //4.Sink
        studentDS.addSink(new MySQLSink());
        //5.execute
        env.execute();
    }
    //实现 RichSinkFunction 来实现将数据插入到 MySQL 中 t_student 表中
    public static class MySQLSink extends RichSinkFunction<Student>{
        Connection conn ;
        PreparedStatement ps;
        //连接数据库
        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://192.168.88.163:3306/bigdata?useSSL=false",
                    "root",
                    "123456");
            String sql = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)";
            ps = conn.prepareStatement(sql);
        }
        //将数据插入到数据库
        @Override
        public void invoke(Student value, Context context) throws Exception {
            ps.setString(1,value.name);
            ps.setInt(2,value.age);
            ps.executeUpdate();
        }
        //关闭数据库
        @Override
        public void close() throws Exception {
            if(!ps.isClosed()) ps.close();
            if(!conn.isClosed()) conn.close();
        }
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

Connector

  • Flink官方提供的连接器, 用于连接 JDBC 或者 Kafka ,MQ等
JDBC 连接方式
  • 需求:将数据元素通过JDBC方式存储到MySQL数据库
/**
 * Author itcast
 * Date 2021/6/17 15:59
 * Desc TODO
 */
public class JDBCSinkMySQL {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStreamSource<Student> source = env.fromElements(new Student(null, "JackMa", 42));
        //3.将数据通过 jdbc 插入到 mysql 数据库
        source.addSink(JdbcSink.sink(
                // 输入 SQL 执行插入SQL 语句
                "INSERT INTO t_student(id,name,age) values (null,?,?)",
                    // 执行插入的赋值
                    (ps, student) -> {
                        ps.setString(1,student.name);
                        ps.setInt(2,student.age);
                    },
                //构造器
                // 执行的选项 设置批处理大小等参数
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                //4.参数配置 连接参数
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://192.168.88.163:3306/bigdata?useSSL=false")
                        .withUsername("root")
                        .withPassword("123456")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .build()));
        //5.执行环境
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}
Kafka 连接方式
  • Kafka 是消息队列
  • 需求:
    通过 Flink 将数据元素写入(producer)到 Kafka 中
package cn.itcast.flink.sink;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
/**
 * Author itcast
 * Date 2021/6/17 16:46
 * 需求: 将数据元素封装成 JSON字符串 生产到 Kafka 中
 * 步骤:
 *
 */
public class KafkaProducerDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source 生成一个元素 Student
        DataStreamSource<Student> studentDS = env.fromElements(new Student(102, "Oking", 25));
        //3.Transformation
        //注意:目前来说我们使用Kafka使用的序列化和反序列化都是直接使用最简单的字符串,所以先将Student转为字符串
        //3.1 map 方法 将 Student转换成字符串
        SingleOutputStreamOperator<String> mapDS = studentDS.map(new MapFunction<Student, String>() {
            @Override
            public String map(Student value) throws Exception {
                //可以直接调用JSON的toJsonString,也可以转为JSON
                String json = JSON.toJSONString(value);
                return json;
            }
        });
        //4.Sink
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.88.161:9092");
        //根据参数实例化 FlinkKafkaProducer
        //4.1如果不需要复杂的参数设置,只需要将数据存储到 kafka 消息队列中,使用第一个重载方法
        //  如果需要设置复杂的 kafka 的配置的时候, 使用除了第一个之外的重载方法
        //  如果需要设置仅一次语义 Semantic ,可以使用最后两个
        /*FlinkKafkaProducer producer = new FlinkKafkaProducer(
                "192.168.88.161:9092,192.168.88.162:9092,192.168.88.163:9092",
                "flink_kafka",
                new SimpleStringSchema()
        );*/
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(
                "flink_kafka",
                new KafkaSerializationSchemaWrapper(
                        "flink_kafka",
                        new FlinkFixedPartitioner(),
                        false,
                        new SimpleStringSchema()
                ),
                props,
                //支持仅一次语义的方式进行提交数据
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
        mapDS.addSink(producer);
        // ds.addSink 落地到kafka集群中
        //5.execute
        env.execute();
        //测试 /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}
从 kafka 集群中消费数据
  • 需求
    读取 kafka 中的数据到控制台
  • 开发步骤
/**
 * Author itcast
 * Date 2021/6/17 16:46
 * 需求: 将数据元素封装成 JSON字符串 生产到 Kafka 中
 * 步骤:
 *
 */
public class KafkaProducerDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source 生成一个元素 Student
        DataStreamSource<Student> studentDS = env.fromElements(new Student(104, "chaoxian", 25));
        //3.Transformation
        //注意:目前来说我们使用Kafka使用的序列化和反序列化都是直接使用最简单的字符串,所以先将Student转为字符串
        //3.1 map 方法 将 Student转换成字符串
        SingleOutputStreamOperator<String> mapDS = studentDS.map(new MapFunction<Student, String>() {
            @Override
            public String map(Student value) throws Exception {
                //可以直接调用JSON的toJsonString,也可以转为JSON
                String json = JSON.toJSONString(value);
                return json;
            }
        });
        //4.Sink
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.88.161:9092");
        //根据参数实例化 FlinkKafkaProducer
        //4.1如果不需要复杂的参数设置,只需要将数据存储到 kafka 消息队列中,使用第一个重载方法
        //  如果需要设置复杂的 kafka 的配置的时候, 使用除了第一个之外的重载方法
        //  如果需要设置仅一次语义 Semantic ,可以使用最后两个
        /*FlinkKafkaProducer producer = new FlinkKafkaProducer(
                "192.168.88.161:9092,192.168.88.162:9092,192.168.88.163:9092",
                "flink_kafka",
                new SimpleStringSchema()
        );*/
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(
                "flink_kafka",
                new KafkaSerializationSchemaWrapper(
                        "flink_kafka",
                        new FlinkFixedPartitioner(),
                        false,
                        new SimpleStringSchema()
                ),
                props,
                //支持仅一次语义的方式进行提交数据
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
        mapDS.addSink(producer);
        // ds.addSink 落地到kafka集群中
        //5.execute
        env.execute();
        //测试 /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}
Flink写入到 Redis 数据库
  • Redis 是支持缓存的内存数据库,支持持久化
  • 使用场景
  1. 热数据处理 , 缓存机制
  2. 去重
  3. 五种数据类型 String Hash set Zset List
  • 需求:
    通过 Flink 将数据写入到 Redis 中
package cn.itcast.flink.sink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
/**
 * Author itcast
 * Desc
 * 需求:
 * 接收消息并做WordCount,
 * 最后将结果保存到Redis
 * 注意:存储到Redis的数据结构:使用hash也就是map
 * key            value
 * WordCount    (单词,数量)
 */
public class ConnectorsDemo_Redis {
    public static void main(String[] args) throws Exception {
        //1.env 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source 从 socket 中读取数据
        DataStream<String> linesDS = env.socketTextStream("192.168.88.163", 9999);
        //3.Transformation
        //3.1切割并记为1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分组
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
        //4.Sink
        result.print();
        // * 最后将结果保存到Redis
        // * 注意:存储到Redis的数据结构:使用hash也就是map
        // * key            value
        // * WordCount      (单词,数量)
        //-1.创建RedisSink之前需要创建RedisConfig
        //连接单机版Redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("192.168.88.163")
                .setDatabase(2)
                .build();
        //-3.创建并使用RedisSink
        result.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
        //5.execute
        env.execute();
    }
    /**
     * -2.定义一个Mapper用来指定存储到Redis中的数据结构
     */
    public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            //使用哪一种数据类型, key:WordCount
            return new RedisCommandDescription(RedisCommand.HSET, "WordCount");
        }
        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            // 存储数据的 key
            return data.f0;
        }
        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            // 存储数据的 value
            return data.f1.toString();
        }
    }
}

问题

  • vmware 打开镜像文件 15.5.x 升级为 16.1.0 , 可以升级为
  • fromSequece(1,10) , CPU 12线程, from <= to
    设置的并行度大于生成的数据, 并行度为12, 生成数据只有 10 个,报这个。
  • Flink Standalone HA 高可用
    jobmanager -> log

总结

以上便是2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二.五)

愿你读过之后有自己的收获,如果有收获不妨一键三连一下~


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
算法 搜索推荐 API
京东拍立淘图片搜索 API 接口使用指南:从原理到实践
京东拍立淘图片搜索API,基于先进图像识别技术,支持上传图片、URL或拍摄实物搜索相似商品。其特点包括:搜索便捷高效,用户可快速发起搜索;精准匹配结果,通过算法捕捉商品特征确保准确;数据覆盖广泛,依托京东海量商品资源满足个性化需求;智能推荐拓展,根据用户行为挖掘潜在需求,提升购物体验。
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
938 12
Flink CDC YAML:面向数据集成的 API 设计
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
759 5
|
消息中间件 资源调度 API
Apache Flink 流批融合技术介绍
本文源自阿里云高级研发工程师周云峰在Apache Asia Community OverCode 2024的分享,内容涵盖从“流批一体”到“流批融合”的演进、技术解决方案及社区进展。流批一体已在API、算子和引擎层面实现统一,但用户仍需手动配置作业模式。流批融合旨在通过动态调整优化策略,自动适应不同场景需求。文章详细介绍了如何通过量化指标(如isProcessingBacklog和isInsertOnly)实现这一目标,并展示了针对不同场景的具体优化措施。此外,还概述了社区当前进展及未来规划,包括将优化方案推向Flink社区、动态调整算子流程结构等。
1092 31
Apache Flink 流批融合技术介绍
|
SQL 分布式计算 测试技术
概述Flink API中的4个层次
【7月更文挑战第14天】Flink的API分为4个层次:核心底层API(如ProcessFunction)、DataStream/DataSet API、Table API和SQL。
|
API 开发者 Python
API接口:原理、实现及应用
本文详细介绍了API接口在现代软件开发中的重要性及其工作原理。API接口作为应用程序间通信的桥梁,通过预定义的方法和协议实现数据和服务的共享。文章首先解释了API接口的概念,接着通过Python Flask框架示例展示了API的设计与实现过程,并强调了安全性的重要性。最后,本文还讨论了API接口在Web服务和移动应用程序等领域的广泛应用场景。
|
Kubernetes 负载均衡 API
在K8S中,api-service 和 kube-schedule 高可用原理是什么?
在K8S中,api-service 和 kube-schedule 高可用原理是什么?
|
10月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
854 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。

热门文章

最新文章