Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(1)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

前言

      今天一天争取搞完最后这一部分,学完赶紧把 Kafka 和 Flume 学完,就要开始做实时数仓了。据说是应届生得把实时数仓搞个 80%~90% 才能差不多找个工作,太牛马了。

1、常用 Connector 读写

       之前我们已经用过了一些简单的内置连接器,比如 'datagen' 、'print' ,其它的可以查看官网:Overview | Apache Flink

环境准备:

# 1. 先启动 hadoop
myhadoop start
# 2. 不需要启动 flink 只启动yarn-session即可
/opt/module/flink-1.17.0/bin/yarn-session.sh -d
# 3. 启动 flink sql 的环境 sql-client
./sql-client.sh embedded -s yarn-session

1.1、Kafka

1)添加kafka连接器依赖

  • 将flink-sql-connector-kafka-1.17.0.jar上传到flink的lib目录下
  • 重启yarn-session、sql-client

       使用 kafka 连接器,我们需要清楚,我们用 Flink SQL 往连接器为 kafka 的表中插入数据就相当于 Flink 往 Kafka 写入数据,而我们查询 Flink SQL 表中的数据就相当于 从 Kafka 中读取数据。所以当我们建表时就需要初始化读取 Kafka 数据和消费 Kafka 数据的参数。

2)创建 kfaka 的映射表

CREATE TABLE t1( 
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
id int, 
ts bigint , 
vc int )
WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'hadoop102:9092',
  'properties.group.id' = 'lyh',
-- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
  'scan.startup.mode' = 'earliest-offset',
  -- fixed为flink实现的分区器,一个并行度只写往kafka一个分区
'sink.partitioner' = 'fixed',
  'topic' = 'ws1',
  'format' = 'json'
) 

上面有一个参数 'sink.partitioner' 的值是 'fixed' ,我们之前学过 Kafka 的生产者的分区器有默认的 hash分区器和粘性分区器,这种 fixed 分区器是 kafka 为flink实现的 ,一个并行度只写往一个 kafka 分区,我们可以查看一下 FlinkFixedPartition 的源码:

创建好的表格是没有数据的,所以我们再创建一个数据源往 kfaka 里插入数据:

Flink SQL> CREATE TABLE source ( 
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH ( 
>     'connector' = 'datagen', 
>     'rows-per-second'='1', 
>     'fields.id.kind'='random', 
>     'fields.id.min'='1', 
>     'fields.id.max'='10', 
>     'fields.ts.kind'='sequence', 
>     'fields.ts.start'='1', 
>     'fields.ts.end'='1000000', 
>     'fields.vc.kind'='random', 
>     'fields.vc.min'='1', 
>     'fields.vc.max'='100'
> );

插入数据:

insert into t1(id,ts,vc) select * from source;

查询 kafka 表:

select * from t1;

3)upsert-kafka 表

       如果当前表存在更新操作,那么普通的kafka连接器将无法满足(因为普通的连接器不支持更新操作),此时可以使用Upsert Kafka连接器

       Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。

       作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。

       作为 sink,upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。

1)创建upsert-kafka的映射表(必须定义主键)

CREATE TABLE t2( 
    id int , 
    sumVC int ,
    -- 主键必须 not enforced
    primary key (id) NOT ENFORCED 
)
WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'hadoop102:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
)

2)插入 upset-kafka 表

insert into t2 select  id,sum(vc) sumVC  from source group by id;

3)查询 upset-kafka 表

select * from t2;

查询结果:

可以看到,upsert-kafka 表是支持数据更新操作的。

1.2、File

Flink 天生就支持本地系统、HDFS 等。

1)创建 FileSystem 映射表

CREATE TABLE t3( id int, ts bigint , vc int )
WITH (
  'connector' = 'filesystem',
  -- 如果是本地系统就用 file:/// 
  'path' = 'hdfs://hadoop102:8020/data/t3',
  'format' = 'csv'
);

注意:之前我们在 flink 的 lib 目录下放了 hive 的连接器,这个包会和 flink 的依赖产生冲突:java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.DialectFactory 我们需要把这个依赖移除掉或者改名并重启 sqlSession :

# 重命名连接器
mv flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar.del
# yarn web端 kill 掉job
# 重启 yarn-session
bin/yarn-session.sh -d
bin/sql-client.sh embedded -s yarn-session.sh -i sql-client-init.sql

插入数据:

查询插入结果:

除了上面这种方式,我们还可以把 flink 目录下 opt/ 的 flink-table-planner-1.17.0.jar 和 lib/ 下面的 flink-table-planner-loader-1.17.0.jar 替换一下位置,这样我们就不用把 hive 的连接器移除带了。

1.3、JDBC

       Flink在将数据写入外部数据库时使用DDL中定义的主键。如果定义了主键,则连接器以upsert模式操作,否则,连接器以追加模式操作。

       在upsert模式下,Flink会根据主键插入新行或更新现有行,Flink这样可以保证幂等性。为了保证输出结果符合预期,建议为表定义主键,并确保主键是底层数据库表的唯一键集或主键之一。在追加模式下,Flink将所有记录解释为INSERT消息,如果底层数据库中发生了主键或唯一约束违反,则INSERT操作可能会失败。

1)mysql 的 test 库中建表

CREATE TABLE `ws2` (
  `id` int(11) NOT NULL,
  `ts` bigint(20) DEFAULT NULL,
  `vc` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

上传jdbc连接器的jar包和mysql的连接驱动包到flink/lib下:

  • flink-connector-jdbc-1.17-20230109.003314-120.jar
  • mysql-connector-j-5.1.7.jar
CREATE TABLE t4
(
    id INT,
    ts BIGINT,
    vc INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'='jdbc',
    'url' = 'jdbc:mysql://hadoop102:3306/test?useUnicode=true&characterEncoding=UTF-8',
    'username' = 'root',
    'password' = '123456',
    'connection.max-retry-timeout' = '60s',
    'table-name' = 'ws2',
    'sink.buffer-flush.max-rows' = '500',
    'sink.buffer-flush.interval' = '5s',
    'sink.max-retries' = '3',
    'sink.parallelism' = '1'
);

测试插入数据:

insert into t4 values(1,1,1);

查看结果:

这里,因为我们给 mysql 的这张表设置了主键,所以默认当出现和主键字段相同的新数据时,会直接以 upsert 的方式操作:

insert into t4 values(1,2,2);

运行结果:

注意:我们这个表是和 mysql 关联的,所以我们不管对 mysql 操做还是对这张映射表操作都会互相影响,上面我们修改了映射表 t4 之后,同样会修改到 mysql 表 ws2(除了删除表格,删除flink sql 中的表格并不会删除mysql 中的表格

如果我们希望使用追加模式,就必须保证 mysql 表和 Flink SQL 表都是没有主键的。

Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(2)https://developer.aliyun.com/article/1532335

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
3月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
206 15
|
10天前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
80 14
|
2月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
60 0
|
3月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
94 2
|
3月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
53 1
|
14天前
|
人工智能 自然语言处理 API
Multimodal Live API:谷歌推出新的 AI 接口,支持多模态交互和低延迟实时互动
谷歌推出的Multimodal Live API是一个支持多模态交互、低延迟实时互动的AI接口,能够处理文本、音频和视频输入,提供自然流畅的对话体验,适用于多种应用场景。
64 3
Multimodal Live API:谷歌推出新的 AI 接口,支持多模态交互和低延迟实时互动
|
2天前
|
JSON 安全 API
淘宝商品详情API接口(item get pro接口概述)
淘宝商品详情API接口旨在帮助开发者获取淘宝商品的详细信息,包括商品标题、描述、价格、库存、销量、评价等。这些信息对于电商企业而言具有极高的价值,可用于商品信息展示、市场分析、价格比较等多种应用场景。
|
10天前
|
前端开发 API 数据库
Next 编写接口api
Next 编写接口api