Flink1.9 Create table 语句转换 为 Operation流程分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文主要描述 Flink1.9 新提供的 create table sql 通过 Calcite 解析,校验 并注册到catalog的过程

本文主要描述 Flink1.9 新提供的 create table sql 通过 Calcite 解析,校验 并注册到catalog的过程。
样例SQL:

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP
) WITH (
    'connector.type' = 'kafka', -- 使用 kafka connector
    'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
    'connector.topic' = 'xc_user_behavior',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取
    'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息
    'connector.properties.0.value' = '172.16.8.107:2181', 
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = '172.16.8.107:9092', 
    'update-mode' = 'append',
    'format.type' = 'json',  -- 数据源格式为 json
    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
)
AI 代码解读

带着两个疑问:1:如何自定义create table 语法(之后会另外写一篇), 2:create table 的表信息是如何注册到catalog上的,供之后的select 语句查询;

一:StreamTableEnvironment.create() 关注:Planner 的初始化

  1. 实例化一个CatalogManager(元数据管理器)

       --初始化一个GenericInMemroyCatalog作为默认的Catalog  
    
    AI 代码解读
  2. 实例化StreamPlanner(本文以流作为背景,所以实例化目标为StreamPlanner)
  • 初始化StreamPlanner继承的超类PlannerContext
  • 可以看到提供了统一生成Calcite FrameworkConfig的函数createFrameworkConfig,关注其中对defaultSchema的设置,(之前实例的CatalogManager的包装类CatalogManagerCalciteSchema作为默认的Schema),另外ParserFactory参数也需要关注下,FlinkSqlParserImpl.FACTORY 这个是flink 基于Calcite 扩展的支持DDL的解析类;
  • 初始化查询优化器:

        RelOptPlanner planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), frameworkConfig.getContext());//基于cost的优化器    
    AI 代码解读

    到这一步Planner环境初始化相关的操作已经完成。

二:解析Create table Sql语句:StreamTableEnvironment.sqlUpdate(createSqlStr)

  1. StreamTableEnvironment 调用 之前初始化环境过程中创建的 StreamPlanner 解析create table 语句 生成 CreateTableOperation;

         其中生成Operation 的部分主体是:
    AI 代码解读
  • 生成Planner
  • PlannerContext基于统一提供的createFrameworkConfig() 生成FlinkPlannerImpl。
  • FlinkPlannerImpl 解析 DDL语句 生成SqlNode(如何根据Calcite自定义Table create 这个会后会另外单独写一篇)。
  • 调用validate() 对create table 信息进行校验, 包括table 的column, primaryKeys,uniqueKeys,partitionKey设置是否正确,这个校验不包括对create table 语法的校验,create 语法的校验在parse阶段已经处理过;
  • org.apache.flink.table.planner.operations.SqlToOperationConverter#convert:将SqlNode转换为org.apache.flink.table.operations.ddl.CreateTableOperation,当然convert()方法包含对Query, Create, Drop, Insert 的处理,本文只关注convertCreateTable模块;
  1. 调用TableEnvironmentImpl#registerCatalogTableInternal 把生成的CreateTableOperation注册到CatalogManager 上, 注册的元数据信息之后可以被sql 查询引用;

调用GenericInMemoryCatalog#createTable 将表的信息添加到catalog 元数据上;

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
0
2
分享
相关文章
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
233 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
180 5
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
740 2
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
836 5
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
场景实践 | 基于Flink+Hologres搭建GitHub实时数据分析
基于Flink和Hologres构建的实时数仓方案在数据开发运维体验、成本与收益等方面均表现出色。同时,该产品还具有与其他产品联动组合的可能性,能够为企业提供更全面、更智能的数据处理和分析解决方案。
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
184 0
联通实时计算平台问题之实时计算平台的数据处理流程是什么样的
联通实时计算平台问题之实时计算平台的数据处理流程是什么样的
揭秘数据洪流中的救世主:Confluent与Flink的实时分析奇迹!
【8月更文挑战第9天】在现代数据处理中,实时数据分析至关重要。Confluent Platform与Apache Flink的组合提供了一套高效的数据流处理方案。Confluent Platform基于Apache Kafka构建,负责数据的收集、传输及管理;而Flink则擅长实时处理这些数据流,进行复杂分析。首先需配置Confluent Platform,包括设置Zookeeper、Kafka brokers及相关服务。
184 1
揭秘大数据时代的极速王者!Flink:颠覆性流处理引擎,让实时数据分析燃爆你的想象力!
【8月更文挑战第29天】Apache Flink 是一个高性能的分布式流处理框架,适用于高吞吐量和低延迟的实时数据处理。它采用统一执行引擎处理有界和无界数据流,具备精确状态管理和灵活窗口操作等特性。Flink 支持毫秒级处理和广泛生态集成,但学习曲线较陡峭,社区相对较小。通过实时日志分析示例,我们展示了如何利用 Flink 从 Kafka 中读取数据并进行词频统计,体现了其强大功能和灵活性。
240 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问