FlinkSQL使用DDL语句创建kafka源表

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 在Flink1.9.x版本中,社区版本的 Flink 新增 了一个 SQL DDL 的新特性,但是暂时还不支持流式的一些概念的定义,比如说水位(watermark). 下面主要介绍一下怎么使用DDL创建kafak源表.定义create table语句从kafka中读取数据 """ |CREATE TABLE PERSON ( | name VARCHAR COMMENT '姓名', | age VARCHAR COMMENT '年龄', | city VARCH

在Flink1.9.x版本中,社区版本的 Flink 新增 了一个 SQL DDL 的新特性,但是暂时还不支持流式的一些概念的定义,比如说水位(watermark). 下面主要介绍一下怎么使用DDL创建kafak源表.


定义create table语句从kafka中读取数据


"""
              |CREATE TABLE PERSON (
              |    name VARCHAR COMMENT '姓名',
              |    age VARCHAR COMMENT '年龄',
              |    city VARCHAR COMMENT '所在城市',
              |    address VARCHAR COMMENT '家庭住址',
              |    ts TIMESTAMP COMMENT '时间戳'
              |)
              |WITH (
              |    'connector.type' = 'kafka', -- 使用 kafka connector
              |    'connector.version' = '0.11',  -- kafka 版本
              |    'connector.topic' = 'xxx',  -- kafka topic
              |    'connector.startup-mode' = 'latest-offset', -- 从最新的 offset 开始读取
              |    'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息
              |    'connector.properties.0.value' = 'xxx',
              |    'connector.properties.1.key' = 'bootstrap.servers',
              |    'connector.properties.1.value' = 'xxx',
              |    'update-mode' = 'append',
              |    'format.type' = 'json',  -- 数据源格式为 json
              |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
              |)

如上面的 sql,基本语法是 create table () with ()


with 后面是一些基本的属性,比如 connector.type 描述了 从 kafka 中读取数据


connector.version 描述了 使用的是哪个版本的 kafka


connector.topic 描述了 从 哪个 topic 中读取数据


connector.startup-mode 描述了 从 哪个位置开始读取数据 等等。


可能有同学会觉得其中的 connector.properties.0.key 等参数比较奇怪,社区计划将在下一个版本中改进并简化 connector 的参数配置。


需要注意的是:


1,表的字段要区分大小写


2,字段里面不能有timestamp ,watermark等关键字


完整的代码如下


package flink.ddl
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
/**
  * @program: Flink1.9.0
  * @description: ${description}
  * @author: JasonLee
  * @create: 2020-01-14 19:49
  */
object FlinkKafkaDDLDemo {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(3)
        val settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build()
        val tEnv = StreamTableEnvironment.create(env, settings)
        val createTable =
            """
              |CREATE TABLE PERSON (
              |    name VARCHAR COMMENT '姓名',
              |    age VARCHAR COMMENT '年龄',
              |    city VARCHAR COMMENT '所在城市',
              |    address VARCHAR COMMENT '家庭住址',
              |    ts TIMESTAMP COMMENT '时间戳'
              |)
              |WITH (
              |    'connector.type' = 'kafka', -- 使用 kafka connector
              |    'connector.version' = '0.11',  -- kafka 版本
              |    'connector.topic' = 'xxx',  -- kafka topic
              |    'connector.startup-mode' = 'latest-offset', -- 从最新的 offset 开始读取
              |    'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息
              |    'connector.properties.0.value' = 'xxx',
              |    'connector.properties.1.key' = 'bootstrap.servers',
              |    'connector.properties.1.value' = 'xxx',
              |    'update-mode' = 'append',
              |    'format.type' = 'json',  -- 数据源格式为 json
              |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
              |)
            """.stripMargin
        tEnv.sqlUpdate(createTable)
        val query =
            """
              |SELECT name,COUNT(age) FROM PERSON GROUP BY name
            """.stripMargin
        val result = tEnv.sqlQuery(query)
        result.toRetractStream[Row].print()
        tEnv.execute("Flink SQL DDL")
    }
}


我们把上面的代码提交到集群,可以看到任务的DAG图,如下图所示:



然后我们向上面配置的topic写入数据,可以看下Flink的UI输出的结果:



可以看到这是上面那个SQL打印的结果.也可以通过DDL直接把结果写入mysql库中,今天就主要介绍到这里,后面会带来更多相关的内容.


相关文章
|
3月前
|
消息中间件 JSON Kafka
实时计算 Flink版操作报错合集之kafka源表没有指定group.id,遇到报错,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
消息中间件 SQL Kafka
Flink CDC中flinksql消费kafka的数据写入doris中,没报错,但是也没有输出,大家有遇到过么?
Flink CDC中flinksql消费kafka的数据写入doris中,没报错,但是也没有输出,大家有遇到过么?
279 0
|
10月前
|
消息中间件 关系型数据库 MySQL
FlinkSQL 实时采集Kafka内容到MySQL(实战记录)
FlinkSQL 实时采集Kafka内容到MySQL(实战记录)
99 0
|
消息中间件 存储 SQL
使用Flink 读取kafka Catalog 源表数据展开json写入下游hologres表示例操作
本文主要介绍如何使用Flink 读取kafka Catalog 源表数据写入下游hologres存储,并演示'json.infer-schema.flatten-nested-columns.enable参数,递归的展开JSON中的嵌套列的效果。
使用Flink 读取kafka Catalog 源表数据展开json写入下游hologres表示例操作
|
消息中间件 关系型数据库 MySQL
(7)FlinkSQL将kafka数据写入到mysql方式二
FlinkSQL将kafka数据写入到mysql方式二
(7)FlinkSQL将kafka数据写入到mysql方式二
|
消息中间件 关系型数据库 MySQL
(6)FlinkSQL将kafka数据写入到mysql方式一
(6)FlinkSQL将kafka数据写入到mysql方式一
(6)FlinkSQL将kafka数据写入到mysql方式一
|
SQL 消息中间件 存储
【FlinkSQL实战系列】Flink SQL CDC 实时同步 Mysql 的 Binlog 数据到 kafka
什么是 CDC ? CDC,Change Data Capture,变更数据获取的简称,使用 CDC 我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括 INSERT,DELETE,UPDATE 等. 要解决什么问题 ?
|
8天前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
42 9
|
21天前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
46 3
|
2天前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章

下一篇
云函数