Flink1.9 Create table 语句转换 为 Operation流程分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 本文主要描述 Flink1.9 新提供的 create table sql 通过 Calcite 解析,校验 并注册到catalog的过程

本文主要描述 Flink1.9 新提供的 create table sql 通过 Calcite 解析,校验 并注册到catalog的过程。
样例SQL:

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP
) WITH (
    'connector.type' = 'kafka', -- 使用 kafka connector
    'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
    'connector.topic' = 'xc_user_behavior',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取
    'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息
    'connector.properties.0.value' = '172.16.8.107:2181', 
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = '172.16.8.107:9092', 
    'update-mode' = 'append',
    'format.type' = 'json',  -- 数据源格式为 json
    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
)

带着两个疑问:1:如何自定义create table 语法(之后会另外写一篇), 2:create table 的表信息是如何注册到catalog上的,供之后的select 语句查询;

一:StreamTableEnvironment.create() 关注:Planner 的初始化

  1. 实例化一个CatalogManager(元数据管理器)

       --初始化一个GenericInMemroyCatalog作为默认的Catalog  
    
  2. 实例化StreamPlanner(本文以流作为背景,所以实例化目标为StreamPlanner)
  • 初始化StreamPlanner继承的超类PlannerContext
  • 可以看到提供了统一生成Calcite FrameworkConfig的函数createFrameworkConfig,关注其中对defaultSchema的设置,(之前实例的CatalogManager的包装类CatalogManagerCalciteSchema作为默认的Schema),另外ParserFactory参数也需要关注下,FlinkSqlParserImpl.FACTORY 这个是flink 基于Calcite 扩展的支持DDL的解析类;
  • 初始化查询优化器:

        RelOptPlanner planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), frameworkConfig.getContext());//基于cost的优化器    

    到这一步Planner环境初始化相关的操作已经完成。

二:解析Create table Sql语句:StreamTableEnvironment.sqlUpdate(createSqlStr)

  1. StreamTableEnvironment 调用 之前初始化环境过程中创建的 StreamPlanner 解析create table 语句 生成 CreateTableOperation;

         其中生成Operation 的部分主体是:
  • 生成Planner
  • PlannerContext基于统一提供的createFrameworkConfig() 生成FlinkPlannerImpl。
  • FlinkPlannerImpl 解析 DDL语句 生成SqlNode(如何根据Calcite自定义Table create 这个会后会另外单独写一篇)。
  • 调用validate() 对create table 信息进行校验, 包括table 的column, primaryKeys,uniqueKeys,partitionKey设置是否正确,这个校验不包括对create table 语法的校验,create 语法的校验在parse阶段已经处理过;
  • org.apache.flink.table.planner.operations.SqlToOperationConverter#convert:将SqlNode转换为org.apache.flink.table.operations.ddl.CreateTableOperation,当然convert()方法包含对Query, Create, Drop, Insert 的处理,本文只关注convertCreateTable模块;
  1. 调用TableEnvironmentImpl#registerCatalogTableInternal 把生成的CreateTableOperation注册到CatalogManager 上, 注册的元数据信息之后可以被sql 查询引用;

调用GenericInMemoryCatalog#createTable 将表的信息添加到catalog 元数据上;

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6月前
|
自然语言处理 监控 数据挖掘
【Flink】Flink中的窗口分析
【4月更文挑战第19天】【Flink】Flink中的窗口分析
|
1月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
66 5
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
224 2
|
1月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
73 0
|
3月前
|
消息中间件 监控 Kafka
联通实时计算平台问题之实时计算平台的数据处理流程是什么样的
联通实时计算平台问题之实时计算平台的数据处理流程是什么样的
|
3月前
|
消息中间件 数据挖掘 Kafka
揭秘数据洪流中的救世主:Confluent与Flink的实时分析奇迹!
【8月更文挑战第9天】在现代数据处理中,实时数据分析至关重要。Confluent Platform与Apache Flink的组合提供了一套高效的数据流处理方案。Confluent Platform基于Apache Kafka构建,负责数据的收集、传输及管理;而Flink则擅长实时处理这些数据流,进行复杂分析。首先需配置Confluent Platform,包括设置Zookeeper、Kafka brokers及相关服务。
70 1
|
5月前
|
消息中间件 分布式计算 Kafka
深度分析:Apache Flink及其在大数据处理中的应用
Apache Flink是低延迟、高吞吐量的流处理框架,以其状态管理和事件时间处理能力脱颖而出。与Apache Spark Streaming相比,Flink在实时性上更强,但Spark生态系统更丰富。Apache Storm在低延迟上有优势,而Kafka Streams适合轻量级流处理。选型考虑延迟、状态管理、生态系统和运维成本。Flink适用于实时数据分析、复杂事件处理等场景,使用时注意资源配置、状态管理和窗口操作的优化。
|
6月前
|
Oracle 关系型数据库 数据库
实时计算 Flink版操作报错合集之错误信息"ORA-65040: operation not allowed from within a pluggable database"如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
330 2
|
5月前
|
SQL Apache HIVE
实时计算 Flink版操作报错合集之CTAS(Create Table As Select)目标库为StarRocks时报错,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
存储 NoSQL MongoDB
阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference
本文整理自阿里云 Flink 团队归源老师关于阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference 的研究。
47188 2
阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference

热门文章

最新文章