【Flink】小白级入门,Flink sql 的基础用法(下)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink】小白级入门,Flink sql 的基础用法
// TODO 下面代码仅供参考,具体测试根据自己时间环境来
//    以下只是一些简单的案例,后面会逐步深入复杂sql和原理层面
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * @author 857hub
 */
public class ClickhouseSinkApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().
                        //      useBlinkPlanner().
                                build()
        );
        tEnv.getConfig().getConfiguration().setString(PipelineOptions.NAME, "sql test");
        // sources
        String source = "CREATE TABLE source (\n" +
                "  `id` int,\n" +
                "  `name` varchar.\n" +
                "  `ts` timestamp(3),\n" +
                // 指定watermark 允许延迟5s
                "WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"+
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'test1',\n" +
                "  'properties.bootstrap.servers' = '172.16.100.109:9092',\n" +
                "  'properties.group.id' = 'xzw',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")";
        String source2 = "CREATE TABLE source2 (\n" +
                "  `id` int,\n" +
                "  `name` varchar,\n" +
                "  `ts` timestamp(3)\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'test2',\n" +
                "  'properties.bootstrap.servers' = '172.16.100.109:9092',\n" +
                "  'properties.group.id' = 'xzw',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")";
        // clickhouse sink 由我自己定义,后面会对sql自定义source和sink进行讲解
        String sink = "CREATE TABLE sink (\n" +
                "       `id` INT,\n" +
                "       `name` VARCHAR\n" +
                ") WITH (\n" +
                // 需要自定义接信息参数    --  option
                "      'connector' = 'xzw_ck',\n" +
                "      'url' = 'jdbc:clickhouse://localhost:8123/default',\n" +
                "      'table-name' = 'test',\n" +
                "      'username' = 'default',\n" +
                "      'password' = '123456'\n" +
                "      )";
      // 执行 source sink sql
        tEnv.executeSql(source);
        tEnv.executeSql(source2);
        tEnv.executeSql(sink);
      /*
      由于是简单使用,没有在场景应用,简单介绍一下区别,可以根据们不同的区别在自己项目中使用
       left json : 无论是否join上都返回左表的数据
       inner join : 只有join上才会返回匹配后的结果
       full outer join : 两边的数据都会返回,无论是否join上,没有的则为null
       interval join : 基于时间范围内的join,在指定的时间范围内返回join上的数据
      */
        String joinSql = "select * from source1 s1" +
                "left join source2 s2" +
                // 内连接
//                "inner join source2" || "join source2"
                // 全连接
//                "full outer join source2"
                // 时间范围join
//                "s1.ts >= s2.ts AND s1.ts < s2.ts + INTERVAL '10' MINUTE" +
                " on s1.id =s2.id "
                ;
        Table joinTable = tEnv.sqlQuery(joinSql);
        // 分组排序,取topN,  如果要是去重 rnum=1即可实现去重操作
        String insertSql = "insert into sink select id,name from(" +
                "select *," +
                "row_number() over(partition by id order by ts) as rnum " +
                "from "+joinTable+" where rnum < 5 " +
                ")";
        // add insert sql
        TableResult tableResult = executeSql(tEnv, "insert into sink select * from source", "*",insertSql);
        // 随意使用
        // Optional<JobClient> jobClient = tableResult.getJobClient();
    }
  // 添加多个sql具体执行
    private static TableResult executeSql(StreamTableEnvironment tEnv, final String... sqls) {
        StatementSet statementSet = tEnv.createStatementSet();
        for (String sql : sqls) {
            if ("*".equals(sql) || sql.length()>=27) {
                continue;
            }
            statementSet.addInsertSql(sql);
        }
        return statementSet.execute();
    }
}
maven 依赖
<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.2</flink.version>
        <scala.version>2.11</scala.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!---->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.2.4</version>
        </dependency>
    </dependencies>
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1天前
|
SQL 关系型数据库 数据库
实时计算 Flink版操作报错合集之在本地执行代码没有问题,但是在线执行sql命令就会报错,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在执行SQL语句时遇到了类找不到,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5天前
|
SQL XML Java
MyBatis动态SQL------------------choose用法
MyBatis动态SQL------------------choose用法
14 1
|
6天前
|
SQL 存储 安全
SQL入门与进阶:数据库查询与管理的实用指南
一、引言 在数字化时代,数据库已经成为各行各业存储、管理和分析数据的关键基础设施
|
8天前
|
SQL 数据挖掘 数据库
深入理解SQL从入门到避坑
深入理解SQL从入门到避坑
|
11天前
|
SQL 存储 API
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(5)
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】
|
SQL 存储 关系型数据库
|
8天前
|
SQL DataWorks NoSQL
DataWorks产品使用合集之如何将SQL Server中的数据转存到MongoDB
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
1月前
|
SQL API 流计算
实时计算 Flink版产品使用合集之在Mac M1下的Docker环境中开启SQL Server代理的操作步骤是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
156 1
|
1天前
|
SQL 弹性计算 API
云服务器 ECS产品使用问题之如何通过API调用阿里云服务器上SQL Server数据库中的数据
云服务器ECS(Elastic Compute Service)是各大云服务商阿里云提供的一种基础云计算服务,它允许用户租用云端计算资源来部署和运行各种应用程序。以下是一个关于如何使用ECS产品的综合指南。

热门文章

最新文章