Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(1)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

前言

      今天一天争取搞完最后这一部分,学完赶紧把 Kafka 和 Flume 学完,就要开始做实时数仓了。据说是应届生得把实时数仓搞个 80%~90% 才能差不多找个工作,太牛马了。

1、常用 Connector 读写

       之前我们已经用过了一些简单的内置连接器,比如 'datagen' 、'print' ,其它的可以查看官网:Overview | Apache Flink

环境准备:

# 1. 先启动 hadoop
myhadoop start
# 2. 不需要启动 flink 只启动yarn-session即可
/opt/module/flink-1.17.0/bin/yarn-session.sh -d
# 3. 启动 flink sql 的环境 sql-client
./sql-client.sh embedded -s yarn-session

1.1、Kafka

1)添加kafka连接器依赖

  • 将flink-sql-connector-kafka-1.17.0.jar上传到flink的lib目录下
  • 重启yarn-session、sql-client

       使用 kafka 连接器,我们需要清楚,我们用 Flink SQL 往连接器为 kafka 的表中插入数据就相当于 Flink 往 Kafka 写入数据,而我们查询 Flink SQL 表中的数据就相当于 从 Kafka 中读取数据。所以当我们建表时就需要初始化读取 Kafka 数据和消费 Kafka 数据的参数。

2)创建 kfaka 的映射表

CREATE TABLE t1( 
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
id int, 
ts bigint , 
vc int )
WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'hadoop102:9092',
  'properties.group.id' = 'lyh',
-- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
  'scan.startup.mode' = 'earliest-offset',
  -- fixed为flink实现的分区器,一个并行度只写往kafka一个分区
'sink.partitioner' = 'fixed',
  'topic' = 'ws1',
  'format' = 'json'
) 

上面有一个参数 'sink.partitioner' 的值是 'fixed' ,我们之前学过 Kafka 的生产者的分区器有默认的 hash分区器和粘性分区器,这种 fixed 分区器是 kafka 为flink实现的 ,一个并行度只写往一个 kafka 分区,我们可以查看一下 FlinkFixedPartition 的源码:

创建好的表格是没有数据的,所以我们再创建一个数据源往 kfaka 里插入数据:

Flink SQL> CREATE TABLE source ( 
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH ( 
>     'connector' = 'datagen', 
>     'rows-per-second'='1', 
>     'fields.id.kind'='random', 
>     'fields.id.min'='1', 
>     'fields.id.max'='10', 
>     'fields.ts.kind'='sequence', 
>     'fields.ts.start'='1', 
>     'fields.ts.end'='1000000', 
>     'fields.vc.kind'='random', 
>     'fields.vc.min'='1', 
>     'fields.vc.max'='100'
> );

插入数据:

insert into t1(id,ts,vc) select * from source;

查询 kafka 表:

select * from t1;

3)upsert-kafka 表

       如果当前表存在更新操作,那么普通的kafka连接器将无法满足(因为普通的连接器不支持更新操作),此时可以使用Upsert Kafka连接器

       Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。

       作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。

       作为 sink,upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。

1)创建upsert-kafka的映射表(必须定义主键)

CREATE TABLE t2( 
    id int , 
    sumVC int ,
    -- 主键必须 not enforced
    primary key (id) NOT ENFORCED 
)
WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'hadoop102:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
)

2)插入 upset-kafka 表

insert into t2 select  id,sum(vc) sumVC  from source group by id;

3)查询 upset-kafka 表

select * from t2;

查询结果:

可以看到,upsert-kafka 表是支持数据更新操作的。

1.2、File

Flink 天生就支持本地系统、HDFS 等。

1)创建 FileSystem 映射表

CREATE TABLE t3( id int, ts bigint , vc int )
WITH (
  'connector' = 'filesystem',
  -- 如果是本地系统就用 file:/// 
  'path' = 'hdfs://hadoop102:8020/data/t3',
  'format' = 'csv'
);

注意:之前我们在 flink 的 lib 目录下放了 hive 的连接器,这个包会和 flink 的依赖产生冲突:java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.DialectFactory 我们需要把这个依赖移除掉或者改名并重启 sqlSession :

# 重命名连接器
mv flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar.del
# yarn web端 kill 掉job
# 重启 yarn-session
bin/yarn-session.sh -d
bin/sql-client.sh embedded -s yarn-session.sh -i sql-client-init.sql

插入数据:

查询插入结果:

除了上面这种方式,我们还可以把 flink 目录下 opt/ 的 flink-table-planner-1.17.0.jar 和 lib/ 下面的 flink-table-planner-loader-1.17.0.jar 替换一下位置,这样我们就不用把 hive 的连接器移除带了。

1.3、JDBC

       Flink在将数据写入外部数据库时使用DDL中定义的主键。如果定义了主键,则连接器以upsert模式操作,否则,连接器以追加模式操作。

       在upsert模式下,Flink会根据主键插入新行或更新现有行,Flink这样可以保证幂等性。为了保证输出结果符合预期,建议为表定义主键,并确保主键是底层数据库表的唯一键集或主键之一。在追加模式下,Flink将所有记录解释为INSERT消息,如果底层数据库中发生了主键或唯一约束违反,则INSERT操作可能会失败。

1)mysql 的 test 库中建表

CREATE TABLE `ws2` (
  `id` int(11) NOT NULL,
  `ts` bigint(20) DEFAULT NULL,
  `vc` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

上传jdbc连接器的jar包和mysql的连接驱动包到flink/lib下:

  • flink-connector-jdbc-1.17-20230109.003314-120.jar
  • mysql-connector-j-5.1.7.jar
CREATE TABLE t4
(
    id INT,
    ts BIGINT,
    vc INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'='jdbc',
    'url' = 'jdbc:mysql://hadoop102:3306/test?useUnicode=true&characterEncoding=UTF-8',
    'username' = 'root',
    'password' = '123456',
    'connection.max-retry-timeout' = '60s',
    'table-name' = 'ws2',
    'sink.buffer-flush.max-rows' = '500',
    'sink.buffer-flush.interval' = '5s',
    'sink.max-retries' = '3',
    'sink.parallelism' = '1'
);

测试插入数据:

insert into t4 values(1,1,1);

查看结果:

这里,因为我们给 mysql 的这张表设置了主键,所以默认当出现和主键字段相同的新数据时,会直接以 upsert 的方式操作:

insert into t4 values(1,2,2);

运行结果:

注意:我们这个表是和 mysql 关联的,所以我们不管对 mysql 操做还是对这张映射表操作都会互相影响,上面我们修改了映射表 t4 之后,同样会修改到 mysql 表 ws2(除了删除表格,删除flink sql 中的表格并不会删除mysql 中的表格

如果我们希望使用追加模式,就必须保证 mysql 表和 Flink SQL 表都是没有主键的。

Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(2)https://developer.aliyun.com/article/1532335

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
存储 API 网络架构
【Azure 存储服务】调用REST API获取Stroage Account Table中所有的Entity计数 -- Count
【Azure 存储服务】调用REST API获取Stroage Account Table中所有的Entity计数 -- Count
|
3月前
|
存储 JSON API
【Azure 存储服务】使用REST API操作Azure Storage Table,删除数据(Delete Entity)
【Azure 存储服务】使用REST API操作Azure Storage Table,删除数据(Delete Entity)
【Azure 存储服务】使用REST API操作Azure Storage Table,删除数据(Delete Entity)
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之使用API调用ODPS SQL时,出现资源被定时任务抢占,该怎么办
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
SQL 分布式计算 测试技术
概述Flink API中的4个层次
【7月更文挑战第14天】Flink的API分为4个层次:核心底层API(如ProcessFunction)、DataStream/DataSet API、Table API和SQL。
|
5月前
|
SQL 关系型数据库 API
实时计算 Flink版产品使用问题之如何使用stream api
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
Kubernetes Oracle 关系型数据库
实时计算 Flink版操作报错合集之用dinky在k8s上提交作业,会报错:Caused by: org.apache.flink.table.api.ValidationException:,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
261 0
|
5月前
|
SQL Apache HIVE
实时计算 Flink版操作报错合集之CTAS(Create Table As Select)目标库为StarRocks时报错,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
消息中间件 SQL Java
Flink自定义Connector
Flink自定义Connector
458 0
|
6月前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版产品使用合集之如果想自定义connector和pipeline要如何入手
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
SQL 存储 NoSQL
Flink SQL 自定义 redis connector
一般情况下,我们不需要创建新的 connector,因为 Flink SQL 已经内置了丰富的 connector 供我们使用,但是在实际生产环境中我们的存储是多种多样的,所以原生的 connector 并不能满足所有用户的需求,这个时候就需要我们自定义 connector,这篇文章的重点就是介绍一下如何实现自定义 Flink SQL connector ? 先来看一下官网的一张 connector 架构图:
Flink SQL 自定义 redis connector