Flink SQL Client初探

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 体验Flink SQL Client

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码): https://github.com/zq2599/blog_demos

关于Flink SQL Client

  • Flink Table & SQL的API实现了通过SQL语言处理实时技术算业务,但还是要编写部分Java代码(或Scala),并且还要编译构建才能提交到Flink运行环境,这对于不熟悉Java或Scala的开发者就略有些不友好了;

SQL Client的目标就是解决上述问题(官方原话with a build tool before being submitted to a cluster.

局限性

  • 遗憾的是,在Flink-1.10.0版本中,SQL Client只是个Beta版本(不适合用于生产环境),并且只能连接到本地Flink,不能像mysql、cassandra等客户端工具那样远程连接server,这些在将来的版本会解决:

在这里插入图片描述

环境信息

  • 接下来采用实战的方式对Flink SQL Client做初步尝试,环境信息如下:
  • 电脑:MacBook Pro2018 13寸,macOS Catalina 10.15.3
  • Flink:1.10.0
  • JDK:1.8.0_211

本地启动flink

  • 下载flink包,地址:http: //ftp.kddilabs.jp/infosystems/apache/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz
  • 解压:tar -zxvf flink-1.10.0-bin-scala_2.11.tgz
  • 进目录flink-1.10.0/bin/,执行命令./start-cluster.sh启动本地flink;
  • 访问该机器的8081端口,可见本地flink启动成功:

5

启动SQL Client CLI

  • 在目录flink-1.10.0/bin/执行./sql-client.sh即可启动SQL Client CLI,如下图所示,红框中的BETA提醒着在生产环境如果要用此工具:

在这里插入图片描述

  • 第一个要掌握的是HELP命令:

在这里插入图片描述

  • 从hello world开始把,执行命令select ‘Hello world!’;,控制台输出如下图所示,输入Q可退出:

在这里插入图片描述

两种展示模式

  • 第一种是table mode,效果像是对普通数据表的查询,设置该模式的命令:
SET execution.result-mode=table;
  • 第二种是changelog mode,效果像是打印每一次数据变更的日志,设置该模式的命令:
SET execution.result-mode=changelog;
  • 设置table mode后,执行以下命令作一次简单的分组查询:
SELECT name, 
  COUNT(*) AS cnt 
  FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) 
  AS NameTable(name) 
  GROUP BY name;
  • 为了便于对比,下图同时贴上两种模式的查询结果,注意绿框中显示了该行记录是增加还是删除:

在这里插入图片描述

  • 不论是哪种模式,查询结构都保存在SQL Client CLI进程的堆内存中;
  • 在chenglog模式下,为了保证控制台可以正常输入输出,查询结果只展示最近1000条;
  • table模式下,可以翻页查询更多结果,结果数量受配置项max-table-result-rows以及可用堆内存限制;

进一步体验

  • 前面写了几行SQL,对Flink SQL Client有了最基本的感受,接下来做进一步的体验,内容如下:
  • 创建CSV文件,这是个最简单的图书信息表,只有三个字段:名字、数量、类目,一共十条记录;
  • 创建SQL Client用到的环境配置文件,该文件描述了数据源以及对应的表的信息;
  • 启动SQL Client,执行SQL查询上述CSV文件;
  • 整个操作步骤如下图所示:

在这里插入图片描述

操作

  • 首先请确保Flink已经启动;
  • 创建名为book-store.csv的文件,内容如下:
name001,1,aaa
name002,2,aaa
name003,3,bbb
name004,4,bbb
name005,5,bbb
name006,6,ccc
name007,7,ccc
name008,8,ccc
name009,9,ccc
name010,10,ccc
  • flink-1.10.0/conf目录下创建名为book-store.yaml的文件,内容如下:
tables:
  - name: BookStore
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/Users/zhaoqin/temp/202004/26/book-store.csv"
    format:
      type: csv
      fields:
        - name: BookName
          type: VARCHAR
        - name: BookAmount
          type: INT
        - name: BookCatalog
          type: VARCHAR
      line-delimiter: "\n"
      comment-prefix: ","
    schema:
      - name: BookName
        type: VARCHAR
      - name: BookAmount
        type: INT
      - name: BookCatalog
        type: VARCHAR
  - name: MyBookView
    type: view
    query: "SELECT BookCatalog, SUM(BookAmount) AS Amount FROM BookStore GROUP BY BookCatalog"


execution:
  planner: blink                    # optional: either 'blink' (default) or 'old'
  type: streaming                   # required: execution mode either 'batch' or 'streaming'
  result-mode: table                # required: either 'table' or 'changelog'
  max-table-result-rows: 1000000    # optional: maximum number of maintained rows in
                                    #   'table' mode (1000000 by default, smaller 1 means unlimited)
  time-characteristic: event-time   # optional: 'processing-time' or 'event-time' (default)
  parallelism: 1                    # optional: Flink's parallelism (1 by default)
  periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
  max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)
  min-idle-state-retention: 0       # optional: table program's minimum idle state time
  max-idle-state-retention: 0       # optional: table program's maximum idle state time

                                    #   (default database of the current catalog by default)
  restart-strategy:                 # optional: restart strategy
    type: fallback                  #   "fallback" to global restart strategy by default

# Configuration options for adjusting and tuning table programs.

# A full list of options and their default values can be found
# on the dedicated "Configuration" page.
configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

# Properties that describe the cluster to which table programs are submitted to.

deployment:
  response-timeout: 5000
  • 对于book-store.yaml文件,有以下几处需要注意:

a. tables.type等于source-table,表明这是数据源的配置信息;
b. tables.connector描述了详细的数据源信息,path是book-store.csv文件的完整路径;
c. tables.format描述了文件内容;
d. tables.schema描述了数据源表的表结构;
ed. type为view表示MyBookView是个视图(参考数据库的视图概念);

  • flink-1.10.0目录执行以下命令,即可启动SQL Client,并指定book-store.yaml为环境配置:
bin/sql-client.sh embedded -d conf/book-store.yaml
  • 查全表:
SELECT * FROM BookStore;

在这里插入图片描述

  • 按照BookCatalog分组统计记录数:
SELECT BookCatalog, COUNT(*) AS BookCount FROM BookStore GROUP BY BookCatalog;

在这里插入图片描述

  • 查询视图:
select * from MyBookView;

在这里插入图片描述

  • 至此,Flink SQL Client的初次体验就完成了,咱们此工具算是有了基本了解,接下来的文章会进一步使用Flink SQL Client做些复杂的操作;

欢迎关注阿里云开发者社区博客:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
218 15
|
27天前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
109 14
|
3月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
65 0
|
4月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
101 2
|
4月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
57 1
|
4月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
6月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
148 13
|
6月前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
|
6月前
|
SQL 存储 网络安全
关系数据库SQLserver 安装 SQL Server
【7月更文挑战第26天】
84 6
|
6月前
|
存储 SQL C++
对比 SQL Server中的VARCHAR(max) 与VARCHAR(n) 数据类型
【7月更文挑战7天】SQL Server 中的 VARCHAR(max) vs VARCHAR(n): - VARCHAR(n) 存储最多 n 个字符(1-8000),适合短文本。 - VARCHAR(max) 可存储约 21 亿个字符,适合大量文本。 - VARCHAR(n) 在处理小数据时性能更好,空间固定。 - VARCHAR(max) 对于大文本更合适,但可能影响性能。 - 选择取决于数据长度预期和业务需求。
510 1