Flink SQL Client初探

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 体验Flink SQL Client

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码): https://github.com/zq2599/blog_demos

关于Flink SQL Client

  • Flink Table & SQL的API实现了通过SQL语言处理实时技术算业务,但还是要编写部分Java代码(或Scala),并且还要编译构建才能提交到Flink运行环境,这对于不熟悉Java或Scala的开发者就略有些不友好了;
  • SQL Client的目标就是解决上述问题(官方原话with a build tool before being submitted to a cluster.

局限性

  • 遗憾的是,在Flink-1.10.0版本中,SQL Client只是个Beta版本(不适合用于生产环境),并且只能连接到本地Flink,不能像mysql、cassandra等客户端工具那样远程连接server,这些在将来的版本会解决:

在这里插入图片描述

环境信息

  • 接下来采用实战的方式对Flink SQL Client做初步尝试,环境信息如下:
  1. 电脑:MacBook Pro2018 13寸,macOS Catalina 10.15.3
  2. Flink:1.10.0
  3. JDK:1.8.0_211

本地启动flink

  • 下载flink包
  • 解压:tar -zxvf flink-1.10.0-bin-scala_2.11.tgz
  • 进目录flink-1.10.0/bin/,执行命令./start-cluster.sh启动本地flink;
  • 访问该机器的8081端口,可见本地flink启动成功:

5

启动SQL Client CLI

  • 在目录flink-1.10.0/bin/执行./sql-client.sh即可启动SQL Client CLI,如下图所示,红框中的BETA提醒着在生产环境如果要用此工具:

在这里插入图片描述

  • 第一个要掌握的是HELP命令:

在这里插入图片描述

  • 从hello world开始把,执行命令select ‘Hello world!’;,控制台输出如下图所示,输入Q可退出:

在这里插入图片描述

两种展示模式

  • 第一种是table mode,效果像是对普通数据表的查询,设置该模式的命令:
SET execution.result-mode=table;
  • 第二种是changelog mode,效果像是打印每一次数据变更的日志,设置该模式的命令:
SET execution.result-mode=changelog;
  • 设置table mode后,执行以下命令作一次简单的分组查询:
SELECT name, 
  COUNT(*) AS cnt 
  FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) 
  AS NameTable(name) 
  GROUP BY name;
  • 为了便于对比,下图同时贴上两种模式的查询结果,注意绿框中显示了该行记录是增加还是删除:

在这里插入图片描述

  • 不论是哪种模式,查询结构都保存在SQL Client CLI进程的堆内存中;
  • 在chenglog模式下,为了保证控制台可以正常输入输出,查询结果只展示最近1000条;
  • table模式下,可以翻页查询更多结果,结果数量受配置项max-table-result-rows以及可用堆内存限制;

进一步体验

  • 前面写了几行SQL,对Flink SQL Client有了最基本的感受,接下来做进一步的体验,内容如下:
  1. 创建CSV文件,这是个最简单的图书信息表,只有三个字段:名字、数量、类目,一共十条记录;
  2. 创建SQL Client用到的环境配置文件,该文件描述了数据源以及对应的表的信息;
  3. 启动SQL Client,执行SQL查询上述CSV文件;
  4. 整个操作步骤如下图所示:

在这里插入图片描述

操作

  • 首先请确保Flink已经启动;
  • 创建名为book-store.csv的文件,内容如下:
name001,1,aaa
name002,2,aaa
name003,3,bbb
name004,4,bbb
name005,5,bbb
name006,6,ccc
name007,7,ccc
name008,8,ccc
name009,9,ccc
name010,10,ccc
  • flink-1.10.0/conf目录下创建名为book-store.yaml的文件,内容如下:
tables:
  - name: BookStore
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/Users/zhaoqin/temp/202004/26/book-store.csv"
    format:
      type: csv
      fields:
        - name: BookName
          type: VARCHAR
        - name: BookAmount
          type: INT
        - name: BookCatalog
          type: VARCHAR
      line-delimiter: "\n"
      comment-prefix: ","
    schema:
      - name: BookName
        type: VARCHAR
      - name: BookAmount
        type: INT
      - name: BookCatalog
        type: VARCHAR
  - name: MyBookView
    type: view
    query: "SELECT BookCatalog, SUM(BookAmount) AS Amount FROM BookStore GROUP BY BookCatalog"


execution:
  planner: blink                    # optional: either 'blink' (default) or 'old'
  type: streaming                   # required: execution mode either 'batch' or 'streaming'
  result-mode: table                # required: either 'table' or 'changelog'
  max-table-result-rows: 1000000    # optional: maximum number of maintained rows in
                                    #   'table' mode (1000000 by default, smaller 1 means unlimited)
  time-characteristic: event-time   # optional: 'processing-time' or 'event-time' (default)
  parallelism: 1                    # optional: Flink's parallelism (1 by default)
  periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
  max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)
  min-idle-state-retention: 0       # optional: table program's minimum idle state time
  max-idle-state-retention: 0       # optional: table program's maximum idle state time

                                    #   (default database of the current catalog by default)
  restart-strategy:                 # optional: restart strategy
    type: fallback                  #   "fallback" to global restart strategy by default

# Configuration options for adjusting and tuning table programs.

# A full list of options and their default values can be found
# on the dedicated "Configuration" page.
configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

# Properties that describe the cluster to which table programs are submitted to.

deployment:
  response-timeout: 5000
  • 对于book-store.yaml文件,有以下几处需要注意:

a. tables.type等于source-table,表明这是数据源的配置信息;
b. tables.connector描述了详细的数据源信息,path是book-store.csv文件的完整路径;
c. tables.format描述了文件内容;
d. tables.schema描述了数据源表的表结构;
ed. type为view表示MyBookView是个视图(参考数据库的视图概念);

  • flink-1.10.0目录执行以下命令,即可启动SQL Client,并指定book-store.yaml为环境配置:
bin/sql-client.sh embedded -d conf/book-store.yaml
  • 查全表:
SELECT * FROM BookStore;

在这里插入图片描述

  • 按照BookCatalog分组统计记录数:
SELECT BookCatalog, COUNT(*) AS BookCount FROM BookStore GROUP BY BookCatalog;

在这里插入图片描述

  • 查询视图:
select * from MyBookView;

在这里插入图片描述

  • 至此,Flink SQL Client的初次体验就完成了,咱们此工具算是有了基本了解,接下来的文章会进一步使用Flink SQL Client做些复杂的操作;

欢迎关注阿里云开发者社区博客:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
11月前
|
存储 人工智能 算法
【AI系统】计算与调度
本文探讨了计算与调度的概念,特别是在神经网络和图像处理中的应用。通过分离算法定义和计算组织,Halide 等工具能够显著提升图像处理程序的性能,同时保持代码的简洁性和可维护性。文章详细介绍了计算与调度的基本概念、调度树的构建与约束,以及如何通过调度变换优化计算性能。此外,还讨论了自动调优方法在大规模调度空间中的应用,展示了如何通过探索和预测找到最优的调度方案。
218 0
|
SQL XML Java
Mybatis中选择语句的使用:<choose>标签、分区排序 Row_num() over ()函数的使用呢
Mybatis中选择语句的使用:<choose>标签、分区排序 Row_num() over ()函数的使用呢
235 0
|
SQL 存储 数据库
Flink + Paimon 数据 CDC 入湖最佳实践
Flink + Paimon 数据 CDC 入湖最佳实践
2692 59
|
6月前
|
SQL 数据可视化 关系型数据库
Doris Manager 24.3 版本正式发布,增强集群巡检能力
Apache Doris 是全球领先的开源实时数据仓库,已被 5000+ 中大型企业广泛应用。为解决传统客户端工具在功能、成本及适配性上的不足,飞轮科技推出专为 Apache Doris 打造的可视化工具 SelectDB Studio。它提供 Desktop 和 Server 两个版本,支持永久免费使用,具备多数据源连接、SQL 编辑器、Profile 分析、日志检索、查询审计和权限管理等核心功能,深度集成 Apache Doris 的联邦查询与湖仓一体分析能力。未来还将新增数据导入与迁移功能,助力用户高效开发与管理数据。
583 27
|
6月前
|
机器学习/深度学习 数据采集 数据可视化
Python数据分析,别再死磕Excel了!
Python数据分析,别再死磕Excel了!
277 2
|
8月前
|
Ubuntu Linux Windows
Debian:apt-get命令汇总
`apt-get` 是 Debian 及其衍生发行版(如 Ubuntu)中的核心包管理工具,用于安装、更新和卸载软件。它通过操作 `.deb` 格式的软件包,实现对系统软件的集中管理。常见命令包括 `install`(安装)、`remove`(卸载)、`update`(更新软件列表)和 `upgrade`(升级软件)。此外,`autoclean` 和 `clean` 可清理无用的包文件以释放空间。掌握 `apt-get` 的使用方法,是 Linux 系统管理的基础技能之一。
352 3
|
12月前
「Mac畅玩鸿蒙与硬件15」鸿蒙UI组件篇5 - Slider和Progress组件
Slider 和 Progress 是鸿蒙系统中的常用 UI 组件。Slider 控制数值输入,如音量调节;Progress 显示任务的完成状态,如下载进度。本文通过代码示例展示如何使用这些组件,并涵盖 进度条类型介绍、节流优化、状态同步 和 定时器动态更新。
314 7
「Mac畅玩鸿蒙与硬件15」鸿蒙UI组件篇5 - Slider和Progress组件
|
SQL 消息中间件 API
Flink---14、Flink SQL(SQL-Client准备、流处理中的表、时间属性、DDL)
Flink---14、Flink SQL(SQL-Client准备、流处理中的表、时间属性、DDL)
|
人工智能 自然语言处理 Serverless
阿里云百炼应用实践系列-让微信公众号成为智能客服
本文主要介绍如何基于阿里云百炼平台快速在10分钟让您的微信公众号(订阅号)变成 AI 智能客服。我们基于阿里云百炼平台的能力,以官方帮助文档为参考,让您的微信公众号(订阅号)成 为AI 智能客服,以便全天候(7x24)回应客户咨询,提升用户体验,介绍了相关技术方案和主要代码,供开发者参考。
1170 9
阿里云百炼应用实践系列-让微信公众号成为智能客服
|
消息中间件 关系型数据库 MySQL
Flink CDC 最佳实践(以 MySQL 为例)
Flink CDC 最佳实践(以 MySQL 为例)
3644 0