(7)FlinkSQL将kafka数据写入到mysql方式二

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: FlinkSQL将kafka数据写入到mysql方式二
public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("CREATE TABLE WaterSensor (" +
                "id STRING," +
                "ts BIGINT," +
                "vc BIGINT," +
//                "`pt` TIMESTAMP(3),"+
//                "WATERMARK FOR pt AS pt - INTERVAL '10' SECOND" +
                "pt as PROCTIME() " +
                ") WITH (" +
                "'connector' = 'kafka'," +
                "'topic' = 'kafka_data_waterSensor'," +
                "'properties.bootstrap.servers' = '127.0.0.1:9092'," +
                "'properties.group.id' =  'test'," +
                "'scan.startup.mode' = 'earliest-offset'," +
//                "'json.fail-on-missing-field' = 'false'," +
//                "'json.ignore-parse-errors' = 'true'," +
                "'format' = 'json'" +
                ")"
        );

        tableEnv.executeSql("CREATE TABLE flinksink (" +
                "componentname STRING," +
                "componentcount BIGINT NOT NULL," +
                "componentsum BIGINT" +
                ") WITH (" +
                "'connector.type' = 'jdbc'," +
                "'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +
                "'connector.table' = 'flinksink'," +
                "'connector.driver' =  'com.mysql.cj.jdbc.Driver'," +
                "'connector.username' = 'root'," +
                "'connector.password' = 'root'," +
                "'connector.write.flush.max-rows'='3'\r\n" +
                ")"
        );

        Table result = tableEnv.sqlQuery(
                "SELECT " +
                        "id as componentname, " +                //window_start, window_end,
                        "COUNT(ts) as componentcount ,SUM(ts) as componentsum " +
                        "FROM TABLE( " +
                        "TUMBLE( TABLE WaterSensor , " +
                        "DESCRIPTOR(pt), " +
                        "INTERVAL '10' SECOND)) " +
                        "GROUP BY id , window_start, window_end"
        );

//        //方式一:写入数据库
////        result.executeInsert("flinksink").print(); //;.insertInto("flinksink");
//
        //方式二:写入数据库
        tableEnv.createTemporaryView("ResultTable", result);
        tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();

        env.execute();
    }
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
14天前
|
缓存 NoSQL 关系型数据库
13- Redis和Mysql如何保证数据⼀致?
该内容讨论了保证Redis和MySQL数据一致性的几种策略。首先提到的两种方法存在不一致风险:先更新MySQL再更新Redis,或先删Redis再更新MySQL。第三种方案是通过MQ异步同步以达到最终一致性,适用于一致性要求较高的场景。项目中根据不同业务需求选择不同方案,如对一致性要求不高的情况不做处理,时效性数据设置过期时间,高一致性需求则使用MQ确保同步,最严格的情况可能涉及分布式事务(如Seata的TCC模式)。
38 6
|
21天前
|
SQL 关系型数据库 MySQL
轻松入门MySQL:保障数据完整性,MySQL事务在进销存管理系统中的应用(12)
轻松入门MySQL:保障数据完整性,MySQL事务在进销存管理系统中的应用(12)
|
28天前
|
关系型数据库 MySQL
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
21 0
|
30天前
Mybatis+mysql动态分页查询数据案例——测试类HouseDaoMybatisImplTest)
Mybatis+mysql动态分页查询数据案例——测试类HouseDaoMybatisImplTest)
20 1
|
13天前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
32 0
|
30天前
Mybatis+mysql动态分页查询数据案例——工具类(MybatisUtil.java)
Mybatis+mysql动态分页查询数据案例——工具类(MybatisUtil.java)
15 1
|
1天前
|
存储 数据可视化 关系型数据库
MySQL字段的时间类型该如何选择?千万数据下性能提升10%~30%🚀
本文探讨MySQL中时间类型的选择,阐述datetime、timestamp、整形时间戳等类型特点以及它们在千万级数据量下的查询性能
MySQL字段的时间类型该如何选择?千万数据下性能提升10%~30%🚀
|
27天前
|
关系型数据库 MySQL
MySQL查询当天昨天明天本月上月今年等数据
MySQL查询当天昨天明天本月上月今年等数据
19 2
|
27天前
|
关系型数据库 MySQL 开发工具
MySQL分组后,组内排序,然后取每组的第一条数据
MySQL分组后,组内排序,然后取每组的第一条数据
15 1
|
28天前
|
canal SQL 关系型数据库
MySQL数据直接实时同步到ES
MySQL数据直接实时同步到ES
33 0