FlinkSQL使用DDL语句创建kafka源表

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 在Flink1.9.x版本中,社区版本的 Flink 新增 了一个 SQL DDL 的新特性,但是暂时还不支持流式的一些概念的定义,比如说水位(watermark). 下面主要介绍一下怎么使用DDL创建kafak源表.定义create table语句从kafka中读取数据 """ |CREATE TABLE PERSON ( | name VARCHAR COMMENT '姓名', | age VARCHAR COMMENT '年龄', | city VARCH

在Flink1.9.x版本中,社区版本的 Flink 新增 了一个 SQL DDL 的新特性,但是暂时还不支持流式的一些概念的定义,比如说水位(watermark). 下面主要介绍一下怎么使用DDL创建kafak源表.


定义create table语句从kafka中读取数据


"""
              |CREATE TABLE PERSON (
              |    name VARCHAR COMMENT '姓名',
              |    age VARCHAR COMMENT '年龄',
              |    city VARCHAR COMMENT '所在城市',
              |    address VARCHAR COMMENT '家庭住址',
              |    ts TIMESTAMP COMMENT '时间戳'
              |)
              |WITH (
              |    'connector.type' = 'kafka', -- 使用 kafka connector
              |    'connector.version' = '0.11',  -- kafka 版本
              |    'connector.topic' = 'xxx',  -- kafka topic
              |    'connector.startup-mode' = 'latest-offset', -- 从最新的 offset 开始读取
              |    'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息
              |    'connector.properties.0.value' = 'xxx',
              |    'connector.properties.1.key' = 'bootstrap.servers',
              |    'connector.properties.1.value' = 'xxx',
              |    'update-mode' = 'append',
              |    'format.type' = 'json',  -- 数据源格式为 json
              |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
              |)

如上面的 sql,基本语法是 create table () with ()


with 后面是一些基本的属性,比如 connector.type 描述了 从 kafka 中读取数据


connector.version 描述了 使用的是哪个版本的 kafka


connector.topic 描述了 从 哪个 topic 中读取数据


connector.startup-mode 描述了 从 哪个位置开始读取数据 等等。


可能有同学会觉得其中的 connector.properties.0.key 等参数比较奇怪,社区计划将在下一个版本中改进并简化 connector 的参数配置。


需要注意的是:


1,表的字段要区分大小写


2,字段里面不能有timestamp ,watermark等关键字


完整的代码如下


package flink.ddl
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
/**
  * @program: Flink1.9.0
  * @description: ${description}
  * @author: JasonLee
  * @create: 2020-01-14 19:49
  */
object FlinkKafkaDDLDemo {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(3)
        val settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build()
        val tEnv = StreamTableEnvironment.create(env, settings)
        val createTable =
            """
              |CREATE TABLE PERSON (
              |    name VARCHAR COMMENT '姓名',
              |    age VARCHAR COMMENT '年龄',
              |    city VARCHAR COMMENT '所在城市',
              |    address VARCHAR COMMENT '家庭住址',
              |    ts TIMESTAMP COMMENT '时间戳'
              |)
              |WITH (
              |    'connector.type' = 'kafka', -- 使用 kafka connector
              |    'connector.version' = '0.11',  -- kafka 版本
              |    'connector.topic' = 'xxx',  -- kafka topic
              |    'connector.startup-mode' = 'latest-offset', -- 从最新的 offset 开始读取
              |    'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息
              |    'connector.properties.0.value' = 'xxx',
              |    'connector.properties.1.key' = 'bootstrap.servers',
              |    'connector.properties.1.value' = 'xxx',
              |    'update-mode' = 'append',
              |    'format.type' = 'json',  -- 数据源格式为 json
              |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
              |)
            """.stripMargin
        tEnv.sqlUpdate(createTable)
        val query =
            """
              |SELECT name,COUNT(age) FROM PERSON GROUP BY name
            """.stripMargin
        val result = tEnv.sqlQuery(query)
        result.toRetractStream[Row].print()
        tEnv.execute("Flink SQL DDL")
    }
}


我们把上面的代码提交到集群,可以看到任务的DAG图,如下图所示:



然后我们向上面配置的topic写入数据,可以看下Flink的UI输出的结果:



可以看到这是上面那个SQL打印的结果.也可以通过DDL直接把结果写入mysql库中,今天就主要介绍到这里,后面会带来更多相关的内容.


相关文章
|
7天前
|
消息中间件 SQL Kafka
Flink CDC中flinksql消费kafka的数据写入doris中,没报错,但是也没有输出,大家有遇到过么?
Flink CDC中flinksql消费kafka的数据写入doris中,没报错,但是也没有输出,大家有遇到过么?
118 0
|
6月前
|
消息中间件 关系型数据库 MySQL
FlinkSQL 实时采集Kafka内容到MySQL(实战记录)
FlinkSQL 实时采集Kafka内容到MySQL(实战记录)
60 0
|
消息中间件 存储 SQL
使用Flink 读取kafka Catalog 源表数据展开json写入下游hologres表示例操作
本文主要介绍如何使用Flink 读取kafka Catalog 源表数据写入下游hologres存储,并演示'json.infer-schema.flatten-nested-columns.enable参数,递归的展开JSON中的嵌套列的效果。
使用Flink 读取kafka Catalog 源表数据展开json写入下游hologres表示例操作
|
消息中间件 关系型数据库 MySQL
(7)FlinkSQL将kafka数据写入到mysql方式二
FlinkSQL将kafka数据写入到mysql方式二
(7)FlinkSQL将kafka数据写入到mysql方式二
|
消息中间件 关系型数据库 MySQL
(6)FlinkSQL将kafka数据写入到mysql方式一
(6)FlinkSQL将kafka数据写入到mysql方式一
(6)FlinkSQL将kafka数据写入到mysql方式一
|
SQL 消息中间件 存储
【FlinkSQL实战系列】Flink SQL CDC 实时同步 Mysql 的 Binlog 数据到 kafka
什么是 CDC ? CDC,Change Data Capture,变更数据获取的简称,使用 CDC 我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括 INSERT,DELETE,UPDATE 等. 要解决什么问题 ?
|
7天前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
46 0
|
5天前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版产品使用合集之想要加快消费 Kafka 数据的速度,该怎么配置参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
17 2
|
7天前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
76 2

热门文章

最新文章