前言
今天一天争取搞完最后这一部分,学完赶紧把 Kafka 和 Flume 学完,就要开始做实时数仓了。据说是应届生得把实时数仓搞个 80%~90% 才能差不多找个工作,太牛马了。
1、常用 Connector 读写
之前我们已经用过了一些简单的内置连接器,比如 'datagen' 、'print' ,其它的可以查看官网:Overview | Apache Flink
环境准备:
# 1. 先启动 hadoop myhadoop start # 2. 不需要启动 flink 只启动yarn-session即可 /opt/module/flink-1.17.0/bin/yarn-session.sh -d # 3. 启动 flink sql 的环境 sql-client ./sql-client.sh embedded -s yarn-session
1.1、Kafka
1)添加kafka连接器依赖
- 将flink-sql-connector-kafka-1.17.0.jar上传到flink的lib目录下
- 重启yarn-session、sql-client
使用 kafka 连接器,我们需要清楚,我们用 Flink SQL 往连接器为 kafka 的表中插入数据就相当于 Flink 往 Kafka 写入数据,而我们查询 Flink SQL 表中的数据就相当于 从 Kafka 中读取数据。所以当我们建表时就需要初始化读取 Kafka 数据和消费 Kafka 数据的参数。
2)创建 kfaka 的映射表
CREATE TABLE t1( `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读 `partition` BIGINT METADATA VIRTUAL, `offset` BIGINT METADATA VIRTUAL, id int, ts bigint , vc int ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'hadoop102:9092', 'properties.group.id' = 'lyh', -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets' 'scan.startup.mode' = 'earliest-offset', -- fixed为flink实现的分区器,一个并行度只写往kafka一个分区 'sink.partitioner' = 'fixed', 'topic' = 'ws1', 'format' = 'json' )
上面有一个参数 'sink.partitioner' 的值是 'fixed' ,我们之前学过 Kafka 的生产者的分区器有默认的 hash分区器和粘性分区器,这种 fixed 分区器是 kafka 为flink实现的 ,一个并行度只写往一个 kafka 分区,我们可以查看一下 FlinkFixedPartition 的源码:
创建好的表格是没有数据的,所以我们再创建一个数据源往 kfaka 里插入数据:
Flink SQL> CREATE TABLE source ( > id INT, > ts BIGINT, > vc INT > ) WITH ( > 'connector' = 'datagen', > 'rows-per-second'='1', > 'fields.id.kind'='random', > 'fields.id.min'='1', > 'fields.id.max'='10', > 'fields.ts.kind'='sequence', > 'fields.ts.start'='1', > 'fields.ts.end'='1000000', > 'fields.vc.kind'='random', > 'fields.vc.min'='1', > 'fields.vc.max'='100' > );
插入数据:
insert into t1(id,ts,vc) select * from source;
查询 kafka 表:
select * from t1;
3)upsert-kafka 表
如果当前表存在更新操作,那么普通的kafka连接器将无法满足(因为普通的连接器不支持更新操作),此时可以使用Upsert Kafka连接器。
Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。
作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。
作为 sink,upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。
1)创建upsert-kafka的映射表(必须定义主键)
CREATE TABLE t2( id int , sumVC int , -- 主键必须 not enforced primary key (id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'hadoop102:9092', 'topic' = 'ws2', 'key.format' = 'json', 'value.format' = 'json' )
2)插入 upset-kafka 表
insert into t2 select id,sum(vc) sumVC from source group by id;
3)查询 upset-kafka 表
select * from t2;
查询结果:
可以看到,upsert-kafka 表是支持数据更新操作的。
1.2、File
Flink 天生就支持本地系统、HDFS 等。
1)创建 FileSystem 映射表
CREATE TABLE t3( id int, ts bigint , vc int ) WITH ( 'connector' = 'filesystem', -- 如果是本地系统就用 file:/// 'path' = 'hdfs://hadoop102:8020/data/t3', 'format' = 'csv' );
注意:之前我们在 flink 的 lib 目录下放了 hive 的连接器,这个包会和 flink 的依赖产生冲突:java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.DialectFactory 我们需要把这个依赖移除掉或者改名并重启 sqlSession :
# 重命名连接器 mv flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar.del # yarn web端 kill 掉job # 重启 yarn-session bin/yarn-session.sh -d bin/sql-client.sh embedded -s yarn-session.sh -i sql-client-init.sql
插入数据:
查询插入结果:
除了上面这种方式,我们还可以把 flink 目录下 opt/ 的 flink-table-planner-1.17.0.jar 和 lib/ 下面的 flink-table-planner-loader-1.17.0.jar 替换一下位置,这样我们就不用把 hive 的连接器移除带了。
1.3、JDBC
Flink在将数据写入外部数据库时使用DDL中定义的主键。如果定义了主键,则连接器以upsert模式操作,否则,连接器以追加模式操作。
在upsert模式下,Flink会根据主键插入新行或更新现有行,Flink这样可以保证幂等性。为了保证输出结果符合预期,建议为表定义主键,并确保主键是底层数据库表的唯一键集或主键之一。在追加模式下,Flink将所有记录解释为INSERT消息,如果底层数据库中发生了主键或唯一约束违反,则INSERT操作可能会失败。
1)mysql 的 test 库中建表
CREATE TABLE `ws2` ( `id` int(11) NOT NULL, `ts` bigint(20) DEFAULT NULL, `vc` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
上传jdbc连接器的jar包和mysql的连接驱动包到flink/lib下:
- flink-connector-jdbc-1.17-20230109.003314-120.jar
- mysql-connector-j-5.1.7.jar
CREATE TABLE t4 ( id INT, ts BIGINT, vc INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector'='jdbc', 'url' = 'jdbc:mysql://hadoop102:3306/test?useUnicode=true&characterEncoding=UTF-8', 'username' = 'root', 'password' = '123456', 'connection.max-retry-timeout' = '60s', 'table-name' = 'ws2', 'sink.buffer-flush.max-rows' = '500', 'sink.buffer-flush.interval' = '5s', 'sink.max-retries' = '3', 'sink.parallelism' = '1' );
测试插入数据:
insert into t4 values(1,1,1);
查看结果:
这里,因为我们给 mysql 的这张表设置了主键,所以默认当出现和主键字段相同的新数据时,会直接以 upsert 的方式操作:
insert into t4 values(1,2,2);
运行结果:
注意:我们这个表是和 mysql 关联的,所以我们不管对 mysql 操做还是对这张映射表操作都会互相影响,上面我们修改了映射表 t4 之后,同样会修改到 mysql 表 ws2(除了删除表格,删除flink sql 中的表格并不会删除mysql 中的表格)
如果我们希望使用追加模式,就必须保证 mysql 表和 Flink SQL 表都是没有主键的。
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(2)https://developer.aliyun.com/article/1532335