【Flink】小白级入门,Flink sql 的基础用法(下)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 【Flink】小白级入门,Flink sql 的基础用法
// TODO 下面代码仅供参考,具体测试根据自己时间环境来
//    以下只是一些简单的案例,后面会逐步深入复杂sql和原理层面
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * @author 857hub
 */
public class ClickhouseSinkApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().
                        //      useBlinkPlanner().
                                build()
        );
        tEnv.getConfig().getConfiguration().setString(PipelineOptions.NAME, "sql test");
        // sources
        String source = "CREATE TABLE source (\n" +
                "  `id` int,\n" +
                "  `name` varchar.\n" +
                "  `ts` timestamp(3),\n" +
                // 指定watermark 允许延迟5s
                "WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"+
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'test1',\n" +
                "  'properties.bootstrap.servers' = '172.16.100.109:9092',\n" +
                "  'properties.group.id' = 'xzw',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")";
        String source2 = "CREATE TABLE source2 (\n" +
                "  `id` int,\n" +
                "  `name` varchar,\n" +
                "  `ts` timestamp(3)\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'test2',\n" +
                "  'properties.bootstrap.servers' = '172.16.100.109:9092',\n" +
                "  'properties.group.id' = 'xzw',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")";
        // clickhouse sink 由我自己定义,后面会对sql自定义source和sink进行讲解
        String sink = "CREATE TABLE sink (\n" +
                "       `id` INT,\n" +
                "       `name` VARCHAR\n" +
                ") WITH (\n" +
                // 需要自定义接信息参数    --  option
                "      'connector' = 'xzw_ck',\n" +
                "      'url' = 'jdbc:clickhouse://localhost:8123/default',\n" +
                "      'table-name' = 'test',\n" +
                "      'username' = 'default',\n" +
                "      'password' = '123456'\n" +
                "      )";
      // 执行 source sink sql
        tEnv.executeSql(source);
        tEnv.executeSql(source2);
        tEnv.executeSql(sink);
      /*
      由于是简单使用,没有在场景应用,简单介绍一下区别,可以根据们不同的区别在自己项目中使用
       left json : 无论是否join上都返回左表的数据
       inner join : 只有join上才会返回匹配后的结果
       full outer join : 两边的数据都会返回,无论是否join上,没有的则为null
       interval join : 基于时间范围内的join,在指定的时间范围内返回join上的数据
      */
        String joinSql = "select * from source1 s1" +
                "left join source2 s2" +
                // 内连接
//                "inner join source2" || "join source2"
                // 全连接
//                "full outer join source2"
                // 时间范围join
//                "s1.ts >= s2.ts AND s1.ts < s2.ts + INTERVAL '10' MINUTE" +
                " on s1.id =s2.id "
                ;
        Table joinTable = tEnv.sqlQuery(joinSql);
        // 分组排序,取topN,  如果要是去重 rnum=1即可实现去重操作
        String insertSql = "insert into sink select id,name from(" +
                "select *," +
                "row_number() over(partition by id order by ts) as rnum " +
                "from "+joinTable+" where rnum < 5 " +
                ")";
        // add insert sql
        TableResult tableResult = executeSql(tEnv, "insert into sink select * from source", "*",insertSql);
        // 随意使用
        // Optional<JobClient> jobClient = tableResult.getJobClient();
    }
  // 添加多个sql具体执行
    private static TableResult executeSql(StreamTableEnvironment tEnv, final String... sqls) {
        StatementSet statementSet = tEnv.createStatementSet();
        for (String sql : sqls) {
            if ("*".equals(sql) || sql.length()>=27) {
                continue;
            }
            statementSet.addInsertSql(sql);
        }
        return statementSet.execute();
    }
}
maven 依赖
<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.2</flink.version>
        <scala.version>2.11</scala.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!---->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.2.4</version>
        </dependency>
    </dependencies>
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
3月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
652 43
|
3月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
232 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
4月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
675 1
|
3月前
|
SQL 分布式计算 大数据
SparkSQL 入门指南:小白也能懂的大数据 SQL 处理神器
在大数据处理的领域,SparkSQL 是一种非常强大的工具,它可以让开发人员以 SQL 的方式处理和查询大规模数据集。SparkSQL 集成了 SQL 查询引擎和 Spark 的分布式计算引擎,使得我们可以在分布式环境下执行 SQL 查询,并能利用 Spark 的强大计算能力进行数据分析。
|
8月前
|
SQL Oracle 关系型数据库
【YashanDB知识库】共享利用Python脚本解决Oracle的SQL脚本@@用法
【YashanDB知识库】共享利用Python脚本解决Oracle的SQL脚本@@用法
|
10月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
1823 27
|
8月前
|
SQL Oracle 关系型数据库
【YashanDB知识库】共享利用Python脚本解决Oracle的SQL脚本@@用法
本文来自YashanDB官网,介绍如何处理Oracle客户端sql*plus中使用@@调用同级目录SQL脚本的场景。崖山数据库23.2.x.100已支持@@用法,但旧版本可通过Python脚本批量重写SQL文件,将@@替换为绝对路径。文章通过Oracle示例展示了具体用法,并提供Python脚本实现自动化处理,最后调整批处理脚本以适配YashanDB运行环境。
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
494 13
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
303 9
下一篇
开通oss服务