2021 Latest and Most Complete Flink Series Tutorial: Flink Table & SQL (Parts 6 and 7)


day06-07_FlinkSQL&Table

Today's Goals

  • Understand the history of Flink Table & SQL
  • Understand why the Table API & SQL are used
  • Master batch processing development with Flink Table & SQL
  • Master stream processing development with Flink Table & SQL
  • Master the common development cases
  • Learn the common Flink SQL operators

Flink Table & SQL

  • Flink Table & SQL is a higher-level abstraction; underneath, everything is translated into the Flink Runtime stream execution
  • Batch processing is treated as a special case of stream processing
  • Flink SQL follows the ANSI SQL standard
  • Before Flink 1.9 there were two Table APIs: a DataStream-based Table API (stream processing) and a DataSet-based Table API (batch processing)
  • The Planner (query planner) builds an abstract syntax tree and performs parsing, optimization and code generation (codegen), finally producing code that the Flink Runtime executes directly
  • There are two planners, the old Planner and the Blink Planner; the Blink Planner implements unified stream/batch processing and is the default (a minimal planner-selection sketch follows this list)
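  • A minimal planner-selection sketch (assuming Flink 1.11+ and the imports used in the full examples later in this section):
// choose the Blink planner (the default) in streaming mode when creating the table environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings blinkSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()      // the old planner would be selected with .useOldPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, blinkSettings);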

Flink Table & SQL Program Structure

  • Import the pom dependencies (jar coordinates)
<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink execution planner (the old planner, used before Flink 1.9) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Blink execution planner (the default since Flink 1.11) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

  • Ways to create tables in Flink Table & SQL (see also the DDL sketch after the snippet below)
// table is the result of a simple projection query 
Table projTable = tableEnv.from("X").select(...);
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
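  • Besides registering an existing Table object as a view, a table can also be declared directly with SQL DDL. A minimal sketch (the table name, columns and the filesystem connector options here are illustrative assumptions):
// register a table through DDL; connector, path and format are placeholders
tableEnv.executeSql(
        "CREATE TABLE csv_source (" +
        "  user_id BIGINT," +
        "  amount INT" +
        ") WITH (" +
        "  'connector' = 'filesystem'," +
        "  'path' = 'file:///tmp/input'," +
        "  'format' = 'csv'" +
        ")");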
  • The four categories of SQL statements (a minimal Flink SQL sketch follows the list)
  1. DDL (Data Definition Language): create and drop databases and tables
  2. DML (Data Manipulation Language): insert, delete and update data
  3. DCL (Data Control Language): manage privileges on data, e.g. GRANT and REVOKE
  4. DQL (Data Query Language): query data from tables, from basic queries to complex queries, multi-table queries and subqueries
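  • As a rough illustration of how the DDL, DQL and DML categories appear in Flink SQL (the table names and columns are assumptions; DCL statements such as GRANT/REVOKE are generally not part of Flink SQL itself):
// DDL: define a table backed by the built-in datagen test connector
tableEnv.executeSql("CREATE TABLE orders (user_id BIGINT, amount INT) " +
        "WITH ('connector' = 'datagen')");
// DQL: query it
Table bigOrders = tableEnv.sqlQuery("SELECT user_id, amount FROM orders WHERE amount > 2");
// DML: write the query result into another table (assumed to have been created earlier)
bigOrders.executeInsert("big_orders");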
  • Requirement
    Union two DataStreams through the Flink Table & SQL API: rows of ds1 with amount > 2 UNION ALL rows of ds2 with amount < 2
  • Development steps
package cn.itcast.flink.sql;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.util.Arrays;
import static org.apache.flink.table.api.Expressions.$;
/**
 * Author itcast
 * Date 2021/6/22 9:45
 * Desc TODO
 */
public class FlinkTableAPIDemo {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment: create the stream environment and the stream table environment, set parallelism to 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        //create the stream table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,settings);
        //2. Source: create the data sets
        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2)));
        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                new Order(2L, "pen", 3),
                new Order(2L, "rubber", 3),
                new Order(4L, "beer", 1)));
        //3. Register tables: convert the data streams into tables
        // convert a DataStream into a Table via fromDataStream
        Table orderTableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
        // register the other DataStream as a temporary view
        tEnv.createTemporaryView("orderTableB",orderB,$("user"), $("product"), $("amount"));
        //4. Run the query: rows of orderA with amount > 2 UNION ALL rows of orderB with amount < 2
        Table result = tEnv.sqlQuery("" +
                "select * from " + orderTableA + " where amount>2 " +
                "union all " +
                "select * from orderTableB where amount<2");
        //4.1 convert the result table into an append stream
        //print the field names and types
        result.printSchema();
        DataStream<Row> resultDS = tEnv.toAppendStream(result, Row.class);
        //5. print the result
        resultDS.print();
        //6. execute the job
        env.execute();
        // entity class: user:Long, product:String, amount:int
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        public Long user;
        public String product;
        public int amount;
    }
}
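  • A note on the two conversion methods used in these examples: toAppendStream only works for queries that append rows (such as the UNION ALL above), while aggregating queries whose results are updated later must use toRetractStream, whose Boolean flag marks an insert (true) or a retraction of an earlier row (false). A minimal sketch reusing the result table from the code above:
// append-only query: each result row is emitted exactly once
DataStream<Row> appendStream = tEnv.toAppendStream(result, Row.class);
// an updating query (e.g. with GROUP BY) would need a retract stream instead
// (Tuple2 here is org.apache.flink.api.java.tuple.Tuple2)
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(result, Row.class);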

Dynamic Tables & Continuous Queries

  • A dynamic table is an unbounded table: data continuously flows in and results continuously flow out
  • Requirement: use SQL and the Table API to run continuous aggregations over a DataStream (the SQL demo below groups orders by user; the Table API demo that follows counts words).
package cn.itcast.flink.sql;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.util.Arrays;
import static org.apache.flink.table.api.Expressions.$;
/**
 * Author itcast
 * Date 2021/6/22 11:16
 * Desc TODO
 */
public class FlinkSQLDemo {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment: get the stream execution environment and the stream table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //stream table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2. Source: create the order data sets
        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2)));
        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                new Order(2L, "pen", 3),
                new Order(2L, "rubber", 3),
                new Order(4L, "beer", 1)));
        //3. Create the temporary view t_order
        tEnv.createTemporaryView("t_order",orderA,$("user"),$("product"),$("amount"));
        //4. Run the query: total order amount per user (user is a reserved keyword, so it is escaped with backticks)
        Table table = tEnv.sqlQuery(
                "select `user`,sum(amount) as totalAmount " +
                        " from t_order " +
                        " group by `user` "
        );
        //5. Convert the result into a retract stream (the query is an updating aggregation)
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(table, Row.class);
        //6. Print the result
        result.print();
        //7. Execute
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        public Long user;
        public String product;
        public int amount;
    }
}
  • Requirement
    Word count: using the Flink Table API, find the words whose occurrence count equals 2 and print them as a data stream
  • Development steps
package cn.itcast.flink.sql;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
/**
 * Author itcast
 * Date 2021/6/22 11:29
 * Desc TODO
 */
public class FlinkTableDemo {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2. Source
        DataStream<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("World", 1),
                new WC("Hello", 1)
        );
        //3. Register the table
        Table table = tEnv.fromDataStream(input, $("word"), $("frequency"));
        //4. Filter and group with the Flink Table API, equivalent to:
        // select word, count(frequency) as frequency
        // from table
        // group by word
        // having count(frequency) = 2;
        Table filter = table
                .groupBy($("word"))
                .select($("word"),
                        $("frequency").count().as("frequency"))
                .filter($("frequency").isEqual(2));
        //5. Convert the result table into a DataStream
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(filter, Row.class);
        //6. Print the result
        result.print();
        //7. Execute
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WC {
        public String word;
        public long frequency;
    }
}
  • Requirement
    Use Flink SQL to compute, every 5 seconds, each user's total number of orders, maximum order amount and minimum order amount,
    i.e. every 5 seconds compute those statistics over the last 5 seconds for each user.
    With the DataStream API this would be a time-based tumbling window;
    here the same requirement is implemented with the Flink Table & SQL API.
  • Development steps
package cn.itcast.flink.SQL;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import static org.apache.flink.table.api.Expressions.$;
/**
 * Author itcast
 * Date 2021/6/23 8:42
 * Desc TODO
 */
public class FlinkTableWindow {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment: create the stream execution environment and the stream table environment
        //stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Flink table settings
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        //stream table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        //2. Source: a custom Order source that emits one record per second
        DataStreamSource<Order> source = env.addSource(new MyOrder());
        //3. Transformation: assign timestamps and a watermark with 2 seconds of allowed out-of-orderness
        SingleOutputStreamOperator<Order> watermarkDS = source.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(
                Duration.ofSeconds(2)
        ).withTimestampAssigner((element, recordTimestamp) -> element.createTime));
        //4. Register the table: create a temporary view and declare createTime as the event-time (rowtime) attribute
        tEnv.createTemporaryView("t_order",
                watermarkDS,$("orderId"),$("userId"),$("money"),$("createTime").rowtime());
        //5. SQL: group by userId and a 5-second tumbling window, computing the order count, max and min amount
        String sql="SELECT userId,count(orderId) totalCount,max(money) maxMoney,min(money) minMoney " +
                "FROM t_order " +
                "group by userId," +
                "tumble(createTime,interval '5' second)";
        //6. Run the query and get the result table
        Table resultTable = tEnv.sqlQuery(sql);
        //7. Sink: toRetractStream → true marks an inserted row, false marks a retracted (deleted) row
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(resultTable, Row.class);
        //8. Print the result
        result.print();
        //9. Execute
        env.execute();
    }
    public static class MyOrder extends RichSourceFunction<Order> {
        Random rm = new Random();
        boolean flag = true;
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            while(flag) {
                String oid = UUID.randomUUID().toString();
                int uid = rm.nextInt(3);
                int money = rm.nextInt(101);
                long createTime = System.currentTimeMillis();
                //emit the record
                ctx.collect(new Order(oid, uid, money, createTime));
                Thread.sleep(1000);
            }
        }
        @Override
        public void cancel() {
            flag = false;
        }
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        //order id
        private String orderId;
        //user id
        private Integer userId;
        //order amount
        private Integer money;
        //event time
        private Long createTime;
    }
}
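  • If the window boundaries themselves are needed in the output, the group-window auxiliary functions can be selected as well; a minimal sketch of the query string, using the same t_order view and columns as above:
String sqlWithWindowBounds = "SELECT userId, " +
        "  tumble_start(createTime, interval '5' second) AS winStart, " +
        "  tumble_end(createTime, interval '5' second) AS winEnd, " +
        "  count(orderId) AS totalCount, max(money) AS maxMoney, min(money) AS minMoney " +
        "FROM t_order " +
        "GROUP BY userId, tumble(createTime, interval '5' second)";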
  • Requirement: use the Flink Table API to compute, per user id, the total number of orders, the maximum amount and the minimum amount
  • Development steps
package cn.itcast.flink.SQL;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;
/**
 * Author itcast
 * Date 2021/6/23 9:20
 * Desc TODO
 */
public class FlinkTableAPIWindow {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment: create the stream execution environment and the stream table environment
        //stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Flink table settings
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        //stream table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        //2. Source: a custom Order source that emits one record per second
        DataStreamSource<Order> source = env.addSource(new MyOrder());
        //3. Transformation: assign timestamps and a watermark with 2 seconds of allowed out-of-orderness
        SingleOutputStreamOperator<Order> watermarkDS = source.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(
                Duration.ofSeconds(2)
        ).withTimestampAssigner((element, recordTimestamp) -> element.createTime));
        //4. Register the table: create a temporary view and declare createTime as the event-time (rowtime) attribute
        tEnv.createTemporaryView("t_order",
                watermarkDS,$("orderId"),$("userId"),$("money"),$("createTime").rowtime());
        //5. Table API query
        // get a Table object for the registered view
        Table t_order = tEnv.from("t_order");
        //6. Table API: per user id, compute the order count, maximum amount and minimum amount
        Table resultTable = t_order
                //6.1 define a 5-second tumbling window on the rowtime attribute
                .window(Tumble.over(lit(5).second())
                .on($("createTime")).as("tumbleWindow"))
                //6.2 group by the window and the user id
                .groupBy($("tumbleWindow"), $("userId"))
                //6.3 select the user id, total order count, maximum and minimum amount
                .select($("userId"), $("orderId").count().as("totalCount")
                        , $("money").max().as("maxMoney")
                        , $("money").min().as("minMoney"));
        //7. Sink: toRetractStream → true marks an inserted row, false marks a retracted (deleted) row
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(resultTable, Row.class);
        //8. Print the result
        result.print();
        //9. Execute
        env.execute();
    }
    public static class MyOrder extends RichSourceFunction<Order> {
        Random rm = new Random();
        boolean flag = true;
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            while(flag) {
                String oid = UUID.randomUUID().toString();
                int uid = rm.nextInt(3);
                int money = rm.nextInt(101);
                long createTime = System.currentTimeMillis();
                //emit the record
                ctx.collect(new Order(oid, uid, money, createTime));
                Thread.sleep(1000);
            }
        }
        @Override
        public void cancel() {
            flag = false;
        }
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        //order id
        private String orderId;
        //user id
        private Integer userId;
        //order amount
        private Integer money;
        //event time
        private Long createTime;
    }
}
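  • In the Table API the window start and end can be selected as window properties on the window alias; a minimal sketch of an alternative select clause (same window alias and view as above):
Table resultWithBounds = t_order
        .window(Tumble.over(lit(5).second()).on($("createTime")).as("tumbleWindow"))
        .groupBy($("tumbleWindow"), $("userId"))
        .select($("userId"),
                $("tumbleWindow").start().as("winStart"),
                $("tumbleWindow").end().as("winEnd"),
                $("orderId").count().as("totalCount"));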
  • Requirement: map the JSON strings in a Kafka topic onto a Flink table, filter/group/aggregate that table and write the result into a Kafka-backed table. Could this be done with the plain Flink DataStream API instead of Flink Table? Yes:
  1. Read the Kafka source with FlinkKafkaConsumer
  2. Convert each JSON string into a Java bean
  3. Filter with Flink's filter operator: .filter(t -> t.status.equals("success"))
  4. Map each object back to a JSON string, e.g. JSON.toJSONString(bean)
  5. Write to Kafka with FlinkKafkaProducer
  • Here Flink Table & SQL is used to implement the filter status = "success" (a Table API sketch also follows the code below)
  • Development steps
package cn.itcast.flink.SQL;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
 * Author itcast
 * Date 2021/6/23 9:46
 * Desc TODO
 */
public class FlinkTableKafka {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //create the stream table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2. Source
        //map a Kafka topic directly onto the input table
        TableResult inputTable = tEnv.executeSql(
                "CREATE TABLE input_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +  //连接的数据源是 kafka
                        "  'topic' = 'input_kafka',\n" + //映射的主题topic
                        "  'properties.bootstrap.servers' = 'node1:9092,node2:9092,node3:9092',\n" + //kafka地址
                        "  'properties.group.id' = 'default',\n" + //kafka消费的消费组
                        "  'scan.startup.mode' = 'latest-offset',\n" + //从最新的位置扫描
                        "  'format' = 'json'\n" +  //扫描的数据是格式:json格式
                        ")"
        );
        //map another Kafka topic onto an output table
        TableResult outputTable = tEnv.executeSql(
                "CREATE TABLE output_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'output_kafka',\n" +
                        "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                        "  'format' = 'json',\n" +
                        "  'sink.partitioner' = 'round-robin'\n" +  //分区的方式,轮训
                        ")"
        );
        String sql = "select " +
                "user_id," +
                "page_id," +
                "status " +
                "from input_kafka " +
                "where status = 'success'";
        Table ResultTable = tEnv.sqlQuery(sql);
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class);
        resultDS.print();
        //write the records that satisfy status = 'success' into the output_kafka table
        tEnv.executeSql("insert into output_kafka select * from "+ResultTable);
        //7. execute
        env.execute();
    }
}
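  • As mentioned above, the same filter can also be written with the Table API instead of a SQL string. A minimal sketch (it assumes the input_kafka and output_kafka tables created by the DDL in this example, and the static import of Expressions.$):
// Table API version of the filter: read input_kafka, keep status = 'success', write to output_kafka
Table filtered = tEnv.from("input_kafka")
        .filter($("status").isEqual("success"))
        .select($("user_id"), $("page_id"), $("status"));
filtered.executeInsert("output_kafka");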
  • Summary
  1. Create a temporary table input_kafka on top of the input_kafka topic
  2. Create an output_kafka table on top of the output_kafka topic
  3. Read every record and keep only those with status = "success"
  4. insert into output_kafka select * from input_kafka (with the filter applied)
  5. Consume the result directly from the output_kafka topic
  • Connector options (from the Flink Kafka SQL connector documentation; a DDL sketch combining several of them follows this table)
Option Required Default Type Description
connector required (none) String Specify what connector to use, for Kafka use 'kafka'.
topic required for sink (none) String Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like 'topic-1;topic-2'. Note, only one of “topic-pattern” and “topic” can be specified for sources. When the table is used as sink, the topic name is the topic to write data to. Note topic list is not supported for sinks.
topic-pattern optional (none) String The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note, only one of “topic-pattern” and “topic” can be specified for sources.
properties.bootstrap.servers required (none) String Comma separated list of Kafka brokers.
properties.group.id required by source (none) String The id of the consumer group for Kafka source, optional for Kafka sink.
properties.* optional (none) String This can set and pass arbitrary Kafka configurations. Suffix names must match the configuration key defined in Kafka Configuration documentation. Flink will remove the “properties.” key prefix and pass the transformed key and values to the underlying KafkaClient. For example, you can disable automatic topic creation via 'properties.allow.auto.create.topics' = 'false'. But there are some configurations that do not support to set, because Flink will override them, e.g. 'key.deserializer' and 'value.deserializer'.
format required (none) String The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. Note: Either this option or the 'value.format' option are required.
key.format optional (none) String The format used to deserialize and serialize the key part of Kafka messages. Please refer to the formats page for more details and more format options. Note: If a key format is defined, the 'key.fields' option is required as well. Otherwise the Kafka records will have an empty key.
key.fields optional [] List Defines an explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and thus a key is undefined. The list should look like 'field1;field2'.
key.fields-prefix optional (none) String Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and 'key.fields' will work with prefixed names. When constructing the data type of the key format, the prefix will be removed and the non-prefixed names will be used within the key format. Please note that this option requires that 'value.fields-include' must be set to 'EXCEPT_KEY'.
value.format required (none) String The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. Note: Either this option or the 'format' option are required.
value.fields-include optional ALL Enum. Possible values: [ALL, EXCEPT_KEY]. Defines a strategy for how to deal with key columns in the data type of the value format. By default, 'ALL' physical columns of the table schema will be included in the value format, which means that key columns appear in the data type for both the key and value format.
scan.startup.mode optional group-offsets String Startup mode for Kafka consumer, valid values are 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'. See the following Start Reading Position for more details.
scan.startup.specific-offsets optional (none) String Specify offsets for each partition in case of 'specific-offsets' startup mode, e.g. 'partition:0,offset:42;partition:1,offset:300'.
scan.startup.timestamp-millis optional (none) Long Start from the specified epoch timestamp (milliseconds) used in case of 'timestamp' startup mode.
scan.topic-partition-discovery.interval optional (none) Duration Interval for consumer to discover dynamically created Kafka topics and partitions periodically.
sink.partitioner optional 'default' String Output partitioning from Flink's partitions into Kafka's partitions. Valid values are: 'default' (use the Kafka default partitioner to partition records), 'fixed' (each Flink partition ends up in at most one Kafka partition), 'round-robin' (a Flink partition is distributed to Kafka partitions sticky round-robin; it only works when record keys are not specified), or a custom FlinkKafkaPartitioner subclass, e.g. 'org.mycompany.MyPartitioner'. See the following Sink Partitioning for more details.
sink.semantic optional at-least-once String Defines the delivery semantic for the Kafka sink. Valid enumerations are 'at-least-once', 'exactly-once' and 'none'. See Consistency guarantees for more details.
sink.parallelism optional (none) Integer Defines the parallelism of the Kafka sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.
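  • A hedged sketch of how a few of these options combine in one table definition (topic, broker address and the timestamp value are illustrative placeholders):
tEnv.executeSql(
        "CREATE TABLE input_kafka_ts (\n" +
        "  `user_id` BIGINT,\n" +
        "  `status` STRING\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = 'input_kafka',\n" +
        "  'properties.bootstrap.servers' = 'node1:9092',\n" +
        "  'properties.group.id' = 'default',\n" +
        "  'scan.startup.mode' = 'timestamp',\n" +                 // start reading from a point in time
        "  'scan.startup.timestamp-millis' = '1624435200000',\n" + // epoch millis, placeholder value
        "  'format' = 'json'\n" +
        ")");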

Common Flink SQL Operators

  • Common operator: Join, e.g. FULL JOIN (a minimal sketch follows)
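  • A minimal sketch of a FULL OUTER JOIN between two registered views (the view names t_left and t_right and their columns are assumptions; note that an unbounded join like this keeps state for both inputs):
// FULL OUTER JOIN: rows without a match on either side are padded with NULLs
Table joined = tEnv.sqlQuery(
        "SELECT l.id, l.name, r.amount " +
        "FROM t_left l " +
        "FULL OUTER JOIN t_right r ON l.id = r.id");
// the result is updating, so convert it with toRetractStream when emitting it as a DataStream
DataStream<Tuple2<Boolean, Row>> joinedStream = tEnv.toRetractStream(joined, Row.class);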

WordCount in Multiple Languages

  • WordCount implemented in Scala (a minimal Java counterpart follows the Scala code)
package cn.itcast.flink.scala.demo
import cn.itcast.flink.SQL.WordCountData
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig
object HelloWorld {
  def main(args: Array[String]): Unit = {
    //1. Create the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //get the program arguments from the current context
    val params: ParameterTool = ParameterTool.fromArgs(args)
    //make the parameters available globally
    env.getConfig.setGlobalJobParameters(params)
    //enable checkpointing
    env.enableCheckpointing(1000)
    //use a filesystem state backend for the checkpoints
    env.setStateBackend(new FsStateBackend("file:///d:/chk"))
    //retain the externalized checkpoints when the job is cancelled
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.getCheckpointConfig.setCheckpointInterval(60000)
    //restart strategy: 3 attempts with a 3-second delay
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000))
    //parallelism
    env.setParallelism(1)
    //2. Read the source data
    val source: DataStream[String] = env.fromElements(WordCountData.WORDS: _*)
    //3. Transformations
    val result: DataStream[(String, Int)] = source.flatMap(_.split("\\W+"))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
    //4. Print the result
    result.print()
    //5. Execute the streaming job
    env.execute()
  }
}
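  • For comparison, a minimal Java counterpart of the same WordCount (a sketch that inlines a few sample sentences instead of the WordCountData class used above):
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class HelloWorldJava {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // sample input instead of WordCountData.WORDS
        DataStream<String> source = env.fromElements("hello flink", "hello world");
        DataStream<Tuple2<String, Integer>> result = source
                // split each line into (word, 1) pairs
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\W+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                // lambdas lose generic type information, so declare the result type explicitly
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // group by the word and sum the counts
                .keyBy(t -> t.f0)
                .sum(1);
        result.print();
        env.execute();
    }
}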

Common Problems

  • Module creation and package import issues
  • Operator state case - Checkpoint
    If the source is not throttled to one record per second, the state does not appear to be saved and the job starts from the beginning every time.
    Reason: a checkpoint takes about 1 second, but the source throws an exception after every 5 records; a complete checkpoint of the state has not been taken before the failure, so after each restart consumption starts over from the beginning.

  • Kafka Tool cannot connect to the node1:9092 cluster
  1. Make node1, node2 and node3 resolvable
# in the Windows HOSTS file
192.168.88.161 node1 node1.itcast.cn
192.168.88.162 node2 node2.itcast.cn
192.168.88.163 node3 node3.itcast.cn
  2. Check that the Windows and Linux firewalls and any antivirus software are turned off
  3. In the Kafka configuration file set
  • advertised.listeners=192.168.88.161:9092
  4. Restart ZooKeeper and Kafka
  • When concatenating Flink SQL strings, the fragments must be separated by spaces, otherwise a parse/validation error is thrown
Table result = tEnv.sqlQuery("" +
                "select * from " + orderTableA + " where amount>2 " +
                "union all " +
                "select * from orderTableB where amount<2");
  • Flink Table & SQL: Table.printSchema()
    prints the schema of the current table, i.e. its field names and field types



