FlinkSQL使用DDL语句创建kafka源表

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 在Flink1.9.x版本中,社区版本的 Flink 新增 了一个 SQL DDL 的新特性,但是暂时还不支持流式的一些概念的定义,比如说水位(watermark). 下面主要介绍一下怎么使用DDL创建kafak源表.定义create table语句从kafka中读取数据 """ |CREATE TABLE PERSON ( | name VARCHAR COMMENT '姓名', | age VARCHAR COMMENT '年龄', | city VARCH

在Flink1.9.x版本中,社区版本的 Flink 新增 了一个 SQL DDL 的新特性,但是暂时还不支持流式的一些概念的定义,比如说水位(watermark). 下面主要介绍一下怎么使用DDL创建kafak源表.


定义create table语句从kafka中读取数据


"""
              |CREATE TABLE PERSON (
              |    name VARCHAR COMMENT '姓名',
              |    age VARCHAR COMMENT '年龄',
              |    city VARCHAR COMMENT '所在城市',
              |    address VARCHAR COMMENT '家庭住址',
              |    ts TIMESTAMP COMMENT '时间戳'
              |)
              |WITH (
              |    'connector.type' = 'kafka', -- 使用 kafka connector
              |    'connector.version' = '0.11',  -- kafka 版本
              |    'connector.topic' = 'xxx',  -- kafka topic
              |    'connector.startup-mode' = 'latest-offset', -- 从最新的 offset 开始读取
              |    'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息
              |    'connector.properties.0.value' = 'xxx',
              |    'connector.properties.1.key' = 'bootstrap.servers',
              |    'connector.properties.1.value' = 'xxx',
              |    'update-mode' = 'append',
              |    'format.type' = 'json',  -- 数据源格式为 json
              |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
              |)

如上面的 sql,基本语法是 create table () with ()


with 后面是一些基本的属性,比如 connector.type 描述了 从 kafka 中读取数据


connector.version 描述了 使用的是哪个版本的 kafka


connector.topic 描述了 从 哪个 topic 中读取数据


connector.startup-mode 描述了 从 哪个位置开始读取数据 等等。


可能有同学会觉得其中的 connector.properties.0.key 等参数比较奇怪,社区计划将在下一个版本中改进并简化 connector 的参数配置。


需要注意的是:


1,表的字段要区分大小写


2,字段里面不能有timestamp ,watermark等关键字


完整的代码如下


package flink.ddl
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
/**
  * @program: Flink1.9.0
  * @description: ${description}
  * @author: JasonLee
  * @create: 2020-01-14 19:49
  */
object FlinkKafkaDDLDemo {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(3)
        val settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build()
        val tEnv = StreamTableEnvironment.create(env, settings)
        val createTable =
            """
              |CREATE TABLE PERSON (
              |    name VARCHAR COMMENT '姓名',
              |    age VARCHAR COMMENT '年龄',
              |    city VARCHAR COMMENT '所在城市',
              |    address VARCHAR COMMENT '家庭住址',
              |    ts TIMESTAMP COMMENT '时间戳'
              |)
              |WITH (
              |    'connector.type' = 'kafka', -- 使用 kafka connector
              |    'connector.version' = '0.11',  -- kafka 版本
              |    'connector.topic' = 'xxx',  -- kafka topic
              |    'connector.startup-mode' = 'latest-offset', -- 从最新的 offset 开始读取
              |    'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息
              |    'connector.properties.0.value' = 'xxx',
              |    'connector.properties.1.key' = 'bootstrap.servers',
              |    'connector.properties.1.value' = 'xxx',
              |    'update-mode' = 'append',
              |    'format.type' = 'json',  -- 数据源格式为 json
              |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
              |)
            """.stripMargin
        tEnv.sqlUpdate(createTable)
        val query =
            """
              |SELECT name,COUNT(age) FROM PERSON GROUP BY name
            """.stripMargin
        val result = tEnv.sqlQuery(query)
        result.toRetractStream[Row].print()
        tEnv.execute("Flink SQL DDL")
    }
}


我们把上面的代码提交到集群,可以看到任务的DAG图,如下图所示:



然后我们向上面配置的topic写入数据,可以看下Flink的UI输出的结果:



可以看到这是上面那个SQL打印的结果.也可以通过DDL直接把结果写入mysql库中,今天就主要介绍到这里,后面会带来更多相关的内容.


相关文章
|
14天前
|
消息中间件 JSON Kafka
实时计算 Flink版操作报错合集之kafka源表没有指定group.id,遇到报错,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
消息中间件 SQL Kafka
Flink CDC中flinksql消费kafka的数据写入doris中,没报错,但是也没有输出,大家有遇到过么?
Flink CDC中flinksql消费kafka的数据写入doris中,没报错,但是也没有输出,大家有遇到过么?
152 0
|
8月前
|
消息中间件 关系型数据库 MySQL
FlinkSQL 实时采集Kafka内容到MySQL(实战记录)
FlinkSQL 实时采集Kafka内容到MySQL(实战记录)
68 0
|
消息中间件 存储 SQL
使用Flink 读取kafka Catalog 源表数据展开json写入下游hologres表示例操作
本文主要介绍如何使用Flink 读取kafka Catalog 源表数据写入下游hologres存储,并演示'json.infer-schema.flatten-nested-columns.enable参数,递归的展开JSON中的嵌套列的效果。
使用Flink 读取kafka Catalog 源表数据展开json写入下游hologres表示例操作
|
消息中间件 关系型数据库 MySQL
(7)FlinkSQL将kafka数据写入到mysql方式二
FlinkSQL将kafka数据写入到mysql方式二
(7)FlinkSQL将kafka数据写入到mysql方式二
|
消息中间件 关系型数据库 MySQL
(6)FlinkSQL将kafka数据写入到mysql方式一
(6)FlinkSQL将kafka数据写入到mysql方式一
(6)FlinkSQL将kafka数据写入到mysql方式一
|
SQL 消息中间件 存储
【FlinkSQL实战系列】Flink SQL CDC 实时同步 Mysql 的 Binlog 数据到 kafka
什么是 CDC ? CDC,Change Data Capture,变更数据获取的简称,使用 CDC 我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括 INSERT,DELETE,UPDATE 等. 要解决什么问题 ?
|
15天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
15天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
14天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
743 0