2021 Latest and Most Complete Flink Tutorial Series: Flink Table & SQL (Parts 6 & 7)


day06-07_FlinkSQL&Table

Today's Goals

  • Understand the history of Flink Table & SQL
  • Understand why the Table API & SQL are worth using
  • Master batch processing development with Flink Table & SQL
  • Master stream processing development with Flink Table & SQL
  • Master the common development examples
  • Know the common Flink SQL operators

Flink Table & SQL

  • Flink Table & SQL are higher-level abstractions; under the hood everything is compiled down to the Flink Runtime and executed as a streaming program
  • Batch processing is a special case of stream processing
  • Flink SQL follows the ANSI SQL standard
  • Before Flink 1.9, Flink SQL had two Table APIs: a DataStream Table API (streaming) and a DataSet Table API (batch)
  • The planner turns a query into an abstract syntax tree and runs it through the parser, optimizer and codegen (template code generation), finally producing code that the Flink Runtime executes directly
  • There are two planners, the old planner and the Blink planner; the Blink planner implements unified stream/batch processing and is the default since Flink 1.11 (see the sketch below for how to pick one)
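A minimal sketch of how the planner is chosen when creating the table environment (assuming Flink 1.12-style APIs matching the dependencies below; class and variable names are illustrative):
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class PlannerSelection {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Blink planner in streaming mode (the default since Flink 1.11)
        EnvironmentSettings blinkSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment blinkTableEnv = StreamTableEnvironment.create(env, blinkSettings);
        // Old planner, only kept for legacy jobs
        EnvironmentSettings oldSettings = EnvironmentSettings.newInstance()
                .useOldPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment oldTableEnv = StreamTableEnvironment.create(env, oldSettings);
    }
}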

Flink Table & SQL Program Structure

  • Import the Maven pom dependencies (jar coordinates)
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- The old Flink planner (the pre-1.9 style execution planner) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- The Blink planner, the default since Flink 1.11+ -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
</dependency>

  • Ways to create a table with the Flink Table & SQL API
// table is the result of a simple projection query 
Table projTable = tableEnv.from("X").select(...);
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
  • The four categories of SQL statements (examples sketched below)
  1. DDL (Data Definition Language): create and drop databases and tables
  2. DML (Data Manipulation Language): insert, delete and update data
  3. DCL (Data Control Language): manage access permissions on data (GRANT, REVOKE)
  4. DQL (Data Query Language): query data from tables - basic queries, complex queries, multi-table queries, subqueries
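A hedged sketch of what three of these categories look like in Flink SQL, assuming a StreamTableEnvironment tEnv as in the examples below; the table names, fields and the datagen/print connectors are made up for illustration. DCL statements such as GRANT/REVOKE are not part of Flink SQL itself.
// DDL: define a source table backed by the built-in datagen connector
tEnv.executeSql(
        "CREATE TABLE demo_orders (user_id BIGINT, amount INT) " +
        "WITH ('connector' = 'datagen')");
// DDL: define a sink table backed by the built-in print connector
tEnv.executeSql(
        "CREATE TABLE demo_sink (user_id BIGINT, amount INT) " +
        "WITH ('connector' = 'print')");
// DQL: query the source table
Table big = tEnv.sqlQuery("SELECT user_id, amount FROM demo_orders WHERE amount > 2");
// DML: insert the query result into the sink table
tEnv.executeSql("INSERT INTO demo_sink SELECT user_id, amount FROM demo_orders WHERE amount > 2");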
  • Requirement
    Union two DataStreams through the Flink Table & SQL API: take the rows of ds1 with amount > 2, UNION ALL the rows of ds2 with amount < 2
  • Development steps
package cn.itcast.flink.sql;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.util.Arrays;
import static org.apache.flink.table.api.Expressions.$;
/**
 * Author itcast
 * Date 2021/6/22 9:45
 * Desc TODO
 */
public class FlinkTableAPIDemo {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment: create the stream environment and the stream table environment, set parallelism to 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        //Create the stream table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,settings);
        //2. Source: create the datasets
        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2)));
        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                new Order(2L, "pen", 3),
                new Order(2L, "rubber", 3),
                new Order(4L, "beer", 1)));
        //3. Register tables: convert the data streams into tables
        // Convert a DataStream into a Table with fromDataStream
        Table orderTableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
        // Register the other DataStream directly as a temporary view
        tEnv.createTemporaryView("orderTableB",orderB,$("user"), $("product"), $("amount"));
        //4. Run the query: rows of orderA with amount > 2, UNION ALL rows of orderB with amount < 2
        Table result = tEnv.sqlQuery("" +
                "select * from " + orderTableA + " where amount>2 " +
                "union all " +
                "select * from orderTableB where amount<2");
        //4.1 Convert the result table into an append stream
        // Print the field names and types
        result.printSchema();
        DataStream<Row> resultDS = tEnv.toAppendStream(result, Row.class);
        //5. Print the result
        resultDS.print();
        //6. Execute the job
        env.execute();
        // POJO: user:Long product:String amount:int
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        public Long user;
        public String product;
        public int amount;
    }
}
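For reference, result.printSchema() prints the field names and types of the result table; with the query above the output looks roughly like this (the exact rendering depends on the Flink version):
root
 |-- user: BIGINT
 |-- product: STRING
 |-- amount: INT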

Dynamic Tables & Continuous Queries

  • A dynamic table is an unbounded table: data keeps flowing in and out, and a query over it is a continuous query whose result is updated as new rows arrive (the retract-stream output sketched after the code below makes this visible)
  • Requirement: aggregate a DataStream with both the SQL style and the Table API style; the SQL example below totals orders per user, and the Table API example that follows counts words.
package cn.itcast.flink.sql;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.util.Arrays;
import static org.apache.flink.table.api.Expressions.$;
/**
 * Author itcast
 * Date 2021/6/22 11:16
 * Desc TODO
 */
public class FlinkSQLDemo {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment: get the stream execution environment and the stream table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //Stream table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2. Source: create the order datasets
        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2)));
        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                new Order(2L, "pen", 3),
                new Order(2L, "rubber", 3),
                new Order(4L, "beer", 1)));
        //3. Create a temporary view t_order over the orderA stream
        tEnv.createTemporaryView("t_order",orderA,$("user"),$("product"),$("amount"));
        //4. Run the query: total order amount per user
        Table table = tEnv.sqlQuery(
                "select user,sum(amount) as totalAmount " +
                        " from t_order " +
                        " group by user "
        );
        //5. Convert the result to a retract stream: Tuple2<Boolean, Row>
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(table, Row.class);
        //6. Print the result
        result.print();
        //7. Execute
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        public Long user;
        public String product;
        public int amount;
    }
}
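This is where the "continuous query" becomes visible: because the GROUP BY aggregate is updated as rows arrive, toRetractStream emits (true, row) for new or updated results and (false, row) to retract a previously emitted result. With the orderA data above, the printed output looks roughly like this (interleaving and formatting vary by version and parallelism):
(true,1,3)     user 1: first total is 3
(false,1,3)    retract the old total for user 1
(true,1,7)     user 1: new total is 7 (3 + 4)
(true,3,2)     user 3: total is 2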
  • Requirement
    Word count with the Flink Table API: print only the words whose occurrence count equals 2
  • Development steps
package cn.itcast.flink.sql;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
/**
 * Author itcast
 * Date 2021/6/22 11:29
 * Desc TODO
 */
public class FlinkTableDemo {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2.Source
        DataStream<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("World", 1),
                new WC("Hello", 1)
        );
        //3. Register the table
        Table table = tEnv.fromDataStream(input, $("word"), $("frequency"));
        //4. Group, aggregate and filter with the Table API; equivalent SQL:
        // select word,count(frequency) as frequency
        // from table
        // group by word
        // having count(frequency)=2;
        Table filter = table
                .groupBy($("word"))
                .select($("word"),
                        $("frequency").count().as("frequency"))
                .filter($("frequency").isEqual(2));
        //5. Convert the result into a DataStream
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(filter, Row.class);
        //6. Print the result
        result.print();
        //7. Execute
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WC {
        public String word;
        public long frequency;
    }
}
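For comparison, the same query written with sqlQuery instead of the Table API, a hedged sketch matching the SQL spelled out in the comments above (it can replace steps 4-5 of FlinkTableDemo):
Table filterSql = tEnv.sqlQuery(
        "SELECT word, COUNT(frequency) AS frequency " +
        "FROM " + table + " " +
        "GROUP BY word " +
        "HAVING COUNT(frequency) = 2");
DataStream<Tuple2<Boolean, Row>> resultSql = tEnv.toRetractStream(filterSql, Row.class);
resultSql.print();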
  • Requirement
    Use Flink SQL to compute, for every 5-second window, each user's total number of orders, maximum order amount and minimum order amount,
    i.e. every 5 seconds compute those three metrics over the last 5 seconds for each user.
    With the DataStream API this is just a time-based tumbling window (a hedged DataStream sketch is shown after the SQL version below);
    here we implement it with the Flink Table & SQL API.
  • Development steps
package cn.itcast.flink.SQL;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import static org.apache.flink.table.api.Expressions.$;
/**
 * Author itcast
 * Date 2021/6/23 8:42
 * Desc TODO
 */
public class FlinkTableWindow {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment: create the stream execution environment and the stream table environment
        //Prepare the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Flink table settings
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        //Prepare the stream table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        //2. Source: custom Order source that emits one record per second
        DataStreamSource<Order> source = env.addSource(new MyOrder());
        //3. Transformation: assign timestamps and a watermark with 2 seconds of allowed out-of-orderness
        SingleOutputStreamOperator<Order> watermarkDS = source.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(
                Duration.ofSeconds(2)
        ).withTimestampAssigner((element, recordTimestamp) -> element.createTime));
        //4. Register the table: create a temporary view and declare createTime as the event-time (rowtime) attribute
        tEnv.createTemporaryView("t_order",
                watermarkDS,$("orderId"),$("userId"),$("money"),$("createTime").rowtime());
        //5. Write the SQL: tumbling-window group by userId and createTime, computing the order count, max and min amount per user
        String sql="SELECT userId,count(orderId) totalCount,max(money) maxMoney,min(money) minMoney " +
                "FROM t_order " +
                "group by userId," +
                "tumble(createTime,interval '5' second)";
        //6. Run the query and get the result table
        Table resultTable = tEnv.sqlQuery(sql);
        //7. Sink: toRetractStream marks each emitted row as an insert/update (true) or a retraction (false) of a previous result
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(resultTable, Row.class);
        //8. Print the result
        result.print();
        //9. Execute
        env.execute();
    }
    public static class MyOrder extends RichSourceFunction<Order> {
        Random rm = new Random();
        boolean flag = true;
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            while(flag) {
                String oid = UUID.randomUUID().toString();
                int uid = rm.nextInt(3);
                int money = rm.nextInt(101);
                long createTime = System.currentTimeMillis();
                //Emit the record
                ctx.collect(new Order(oid, uid, money, createTime));
                Thread.sleep(1000);
            }
        }
        @Override
        public void cancel() {
            flag = false;
        }
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        //order id
        private String orderId;
        //user id
        private Integer userId;
        //order amount
        private Integer money;
        //event time
        private Long createTime;
    }
}
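For reference, a minimal sketch of the plain DataStream tumbling-window alternative mentioned in the requirement. It reuses the Order POJO and the watermarked stream watermarkDS from FlinkTableWindow above; the output format and placement are illustrative, and the extra imports are listed as comments:
// Additional imports (assumption: same project/version as the examples above):
// import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
// import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
// import org.apache.flink.streaming.api.windowing.time.Time;
// import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
// import org.apache.flink.util.Collector;
watermarkDS
        .keyBy(o -> o.getUserId())
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .process(new ProcessWindowFunction<Order, String, Integer, TimeWindow>() {
            @Override
            public void process(Integer userId, Context ctx,
                                Iterable<Order> orders, Collector<String> out) {
                long totalCount = 0;
                int maxMoney = Integer.MIN_VALUE;
                int minMoney = Integer.MAX_VALUE;
                for (Order o : orders) {
                    totalCount++;
                    maxMoney = Math.max(maxMoney, o.getMoney());
                    minMoney = Math.min(minMoney, o.getMoney());
                }
                out.collect("userId=" + userId
                        + ", totalCount=" + totalCount
                        + ", maxMoney=" + maxMoney
                        + ", minMoney=" + minMoney);
            }
        })
        .print();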
  • Requirement: compute the per-user order count, maximum amount and minimum amount with the Flink Table API (instead of SQL)
  • Development steps
package cn.itcast.flink.SQL;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;
/**
 * Author itcast
 * Date 2021/6/23 9:20
 * Desc TODO
 */
public class FlinkTableAPIWindow {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment: create the stream execution environment and the stream table environment
        //Prepare the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Flink table settings
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        //Prepare the stream table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        //2. Source: custom Order source that emits one record per second
        DataStreamSource<Order> source = env.addSource(new MyOrder());
        //3. Transformation: assign timestamps and a watermark with 2 seconds of allowed out-of-orderness
        SingleOutputStreamOperator<Order> watermarkDS = source.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(
                Duration.ofSeconds(2)
        ).withTimestampAssigner((element, recordTimestamp) -> element.createTime));
        //4. Register the table: create a temporary view and declare createTime as the event-time (rowtime) attribute
        tEnv.createTemporaryView("t_order",
                watermarkDS,$("orderId"),$("userId"),$("money"),$("createTime").rowtime());
        //5. Table API query
        // Get a Table handle for the registered view
        Table t_order = tEnv.from("t_order");
        //6. Table API aggregation: per userId, compute the order count, max amount and min amount
        Table resultTable = t_order
                //6.1 Define a tumbling event-time window of 5 seconds
                .window(Tumble.over(lit(5).second())
                .on($("createTime")).as("tumbleWindow"))
                //6.2 Group by the window and the userId
                .groupBy($("tumbleWindow"), $("userId"))
                //6.3 Select the user, the order count, and the max/min amount
                .select($("userId"), $("orderId").count().as("totalCount")
                        , $("money").max().as("maxMoney")
                        , $("money").min().as("minMoney"));
        //7. Sink: toRetractStream marks each emitted row as an insert/update (true) or a retraction (false) of a previous result
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(resultTable, Row.class);
        //8. Print the result
        result.print();
        //9. Execute
        env.execute();
    }
    public static class MyOrder extends RichSourceFunction<Order> {
        Random rm = new Random();
        boolean flag = true;
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            while(flag) {
                String oid = UUID.randomUUID().toString();
                int uid = rm.nextInt(3);
                int money = rm.nextInt(101);
                long createTime = System.currentTimeMillis();
                //Emit the record
                ctx.collect(new Order(oid, uid, money, createTime));
                Thread.sleep(1000);
            }
        }
        @Override
        public void cancel() {
            flag = false;
        }
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        //order id
        private String orderId;
        //user id
        private Integer userId;
        //order amount
        private Integer money;
        //event time
        private Long createTime;
    }
}
  • Requirement: map the JSON strings in Kafka to a Flink table, filter/group/aggregate that table, and write the result into another Kafka-backed table. Could this be done without Flink Table, using the plain Flink DataStream API? Yes, roughly like this (a hedged sketch of that approach appears after the Table code below):
  1. Read the Kafka source with FlinkKafkaConsumer
  2. Convert each JSON string into a Java bean
  3. Filter with Flink's filter operator: .filter(t -> t.status.equals("success"))
  4. Map each object back into a JSON string (e.g. JSON.toJSONString)
  5. Write to Kafka with FlinkKafkaProducer
  • Here we use the Flink Table API / SQL instead, to keep only the records with status = "success"
  • Development steps
package cn.itcast.flink.SQL;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
 * Author itcast
 * Date 2021/6/23 9:46
 * Desc TODO
 */
public class FlinkTableKafka {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Create the stream table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2. Source
        //Map the Kafka topic directly to an input table
        TableResult inputTable = tEnv.executeSql(
                "CREATE TABLE input_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +  //连接的数据源是 kafka
                        "  'topic' = 'input_kafka',\n" + //映射的主题topic
                        "  'properties.bootstrap.servers' = 'node1:9092,node2:9092,node3:9092',\n" + //kafka地址
                        "  'properties.group.id' = 'default',\n" + //kafka消费的消费组
                        "  'scan.startup.mode' = 'latest-offset',\n" + //从最新的位置扫描
                        "  'format' = 'json'\n" +  //扫描的数据是格式:json格式
                        ")"
        );
        //Map another Kafka topic to an output table
        TableResult outputTable = tEnv.executeSql(
                "CREATE TABLE output_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'output_kafka',\n" +
                        "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                        "  'format' = 'json',\n" +
                        "  'sink.partitioner' = 'round-robin'\n" +  //分区的方式,轮训
                        ")"
        );
        String sql = "select " +
                "user_id," +
                "page_id," +
                "status " +
                "from input_kafka " +
                "where status = 'success'";
        Table ResultTable = tEnv.sqlQuery(sql);
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class);
        resultDS.print();
        //Write the records that satisfy status = 'success' into the output_kafka sink table
        tEnv.executeSql("insert into output_kafka select * from "+ResultTable);
        //7. Execute
        env.execute();
    }
}
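For comparison, a hedged sketch of the DataStream-only pipeline from the numbered steps above. Assumptions beyond the original text: the fastjson library for JSON handling (as hinted by JSON.toJsonString in the steps), the same node1 broker and topics, and a simple Event POJO matching the JSON fields; all class and field names are illustrative.
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
public class KafkaDataStreamDemo {
    // Hypothetical POJO matching the JSON fields of the input topic
    public static class Event {
        public Long user_id;
        public Long page_id;
        public String status;
    }
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "default");
        // 1. Read the raw JSON strings from Kafka
        DataStream<String> source = env.addSource(
                new FlinkKafkaConsumer<>("input_kafka", new SimpleStringSchema(), props));
        // 2.-4. Parse to a bean, keep only status == "success", serialize back to JSON
        DataStream<String> result = source
                .map(json -> JSON.parseObject(json, Event.class))
                .filter(e -> "success".equals(e.status))
                .map(e -> JSON.toJSONString(e));
        // 5. Write the filtered records to the output topic
        result.addSink(new FlinkKafkaProducer<>("output_kafka", new SimpleStringSchema(), props));
        env.execute();
    }
}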
  • Summary
  1. Create a temporary table input_kafka on top of the input_kafka topic
  2. Create the output_kafka table on top of the output_kafka topic
  3. Read each record and keep only those with status = "success"
  4. insert into output_kafka select * from input_kafka (with the filter applied)
  5. Consume the result directly from the output_kafka topic
  • Kafka connector options
Option | Required | Default | Type | Description
connector | required | (none) | String | Specify which connector to use; for Kafka use 'kafka'.
topic | required for sink | (none) | String | Topic name(s) to read data from when the table is used as a source. A semicolon-separated topic list like 'topic-1;topic-2' is supported for sources; only one of 'topic-pattern' and 'topic' may be specified for sources. When the table is used as a sink, the topic is the topic to write data to; topic lists are not supported for sinks.
topic-pattern | optional | (none) | String | Regular expression for the topic names to read from. All topics whose names match the expression are subscribed when the job starts. Only one of 'topic-pattern' and 'topic' may be specified for sources.
properties.bootstrap.servers | required | (none) | String | Comma-separated list of Kafka brokers.
properties.group.id | required by source | (none) | String | The consumer group id for the Kafka source; optional for the Kafka sink.
properties.* | optional | (none) | String | Passes arbitrary Kafka configuration. The suffix must match a key from the Kafka configuration documentation; Flink strips the 'properties.' prefix and passes the key and value to the Kafka client. For example, automatic topic creation can be disabled via 'properties.allow.auto.create.topics' = 'false'. Some keys cannot be set because Flink overrides them, e.g. 'key.deserializer' and 'value.deserializer'.
format | required | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. See the formats page for details and format options. Either this option or 'value.format' is required.
key.format | optional | (none) | String | The format used to deserialize and serialize the key part of Kafka messages. If a key format is defined, the 'key.fields' option is required as well; otherwise Kafka records have an empty key.
key.fields | optional | [] | List | Explicit list of physical columns from the table schema that configure the data type for the key format. Empty by default, so no key is defined. The list looks like 'field1;field2'.
key.fields-prefix | optional | (none) | String | Custom prefix for all fields of the key format, to avoid name clashes with fields of the value format. Empty by default. If a custom prefix is defined, both the table schema and 'key.fields' use the prefixed names; the prefix is removed when constructing the key format's data type. This option requires 'value.fields-include' to be set to 'EXCEPT_KEY'.
value.format | required | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Either this option or 'format' is required.
value.fields-include | optional | ALL | Enum (ALL, EXCEPT_KEY) | Strategy for handling key columns in the data type of the value format. With 'ALL' (the default), all physical columns of the table schema are included in the value format, i.e. key columns appear in the data type of both the key and the value format.
scan.startup.mode | optional | group-offsets | String | Startup mode for the Kafka consumer; valid values are 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'. See Start Reading Position for details.
scan.startup.specific-offsets | optional | (none) | String | Offsets per partition for the 'specific-offsets' startup mode, e.g. 'partition:0,offset:42;partition:1,offset:300'.
scan.startup.timestamp-millis | optional | (none) | Long | Start from the given epoch timestamp (milliseconds) in 'timestamp' startup mode.
scan.topic-partition-discovery.interval | optional | (none) | Duration | Interval at which the consumer periodically discovers dynamically created Kafka topics and partitions.
sink.partitioner | optional | 'default' | String | How Flink's partitions are mapped to Kafka's partitions. Valid values: 'default' (use the Kafka default partitioner), 'fixed' (each Flink partition ends up in at most one Kafka partition), 'round-robin' (a Flink partition is distributed to Kafka partitions in a sticky round-robin fashion; only works when record keys are not specified), or a custom FlinkKafkaPartitioner subclass, e.g. 'org.mycompany.MyPartitioner'. See Sink Partitioning for details.
sink.semantic | optional | at-least-once | String | Delivery semantics of the Kafka sink; valid values are 'at-least-once', 'exactly-once' and 'none'. See Consistency guarantees for details.
sink.parallelism | optional | (none) | Integer | Parallelism of the Kafka sink operator. By default the parallelism is determined by the framework, matching the upstream chained operator.

Flink SQL Common Operators

  • Common operators: joins, e.g. FULL JOIN (see the sketch below)
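A hedged sketch of a FULL OUTER JOIN in Flink SQL; the table and column names (t_order, t_user, userName) are made up, and both tables are assumed to be registered in the same tEnv:
// Rows from both sides are kept; missing matches are filled with NULL
Table joined = tEnv.sqlQuery(
        "SELECT o.orderId, o.userId, u.userName " +
        "FROM t_order o " +
        "FULL OUTER JOIN t_user u " +
        "ON o.userId = u.userId");
tEnv.toRetractStream(joined, Row.class).print();
Note that an unbounded regular join like this keeps both inputs in state, so in practice a state TTL or an interval/windowed join is usually preferred.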

WordCount in Multiple Languages

  • WordCount implemented in Scala
package cn.itcast.flink.scala.demo
import cn.itcast.flink.SQL.WordCountData
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig
object HelloWorld {
  def main(args: Array[String]): Unit = {
    //1. Create the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //Get the program arguments from the current context
    val params: ParameterTool = ParameterTool.fromArgs(args)
    //Make the arguments available as global job parameters
    env.getConfig.setGlobalJobParameters(params)
    //Enable checkpointing
    env.enableCheckpointing(1000)
    //Set the state backend that stores the checkpoints
    env.setStateBackend(new FsStateBackend("file:///d:/chk"))
    //Keep the externalized checkpoints when the job is cancelled
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.getCheckpointConfig.setCheckpointInterval(60000)
    //Set the restart strategy
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000))
    //Parallelism
    env.setParallelism(1)
    //2. Read the data source
    val source: DataStream[String] = env.fromElements(WordCountData.WORDS: _*)
    //3. Transformations
    val result: DataStream[(String, Int)] = source.flatMap(_.split("\\W+"))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
    //4. Print the result
    result.print()
    //5. Execute the streaming job
    env.execute()
  }
}

Problems & Troubleshooting

  • Module creation and package import issues
  • Operator state example - Checkpoint
    If the source is not throttled to one record per second, the state is not saved and the job starts over from the beginning every time.
    Reason: a checkpoint takes about 1 s, but the source throws an exception after every 5 records; the 5 records are produced before a complete checkpoint/state backup has been taken, so after every restart consumption starts from scratch again.

  • Kafka Tool cannot connect to the node1:9092 cluster
  1. Make node1, node2 and node3 resolvable on Windows by adding them to the HOSTS file:
# Windows HOSTS file
192.168.88.161 node1 node1.itcast.cn
192.168.88.162 node2 node2.itcast.cn
192.168.88.163 node3 node3.itcast.cn
  2. Check that the Windows and Linux firewalls and any antivirus software are turned off
  3. In the Kafka configuration file set
  • advertised.listeners=192.168.88.161:9092
  4. Restart ZooKeeper and Kafka
  • Flink SQL fragments must be separated by spaces when concatenating strings, otherwise you get a syntax/semantic error:
Table result = tEnv.sqlQuery("" +
                "select * from " + orderTableA + " where amount>2 " +
                "union all " +
                "select * from orderTableB where amount<2");
  • Flink Table & SQL: Table.printSchema()
    prints the structure of the current table: the field names and field types


