业务数据的变化,我们可以通过 FlinkCDC 采集到,但是 FlinkCDC 是把全部数据统一写入一个 Topic 中, 这些数据包括事实数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从 Kafka 的业务数据 ODS 层读取数据,经过处理后,将维度数据保存到 HBase,将事实数据写回 Kafka 作为业务数据的 DWD 层
实现动态分流功能
由于 FlinkCDC 是把全部数据统一写入一个 Topic 中, 这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表。
在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL 等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。
维度数据不放 Redis 的原因:User 用户维度数据量很大,其它维度还行。
为什么不放 MySQL: 并发压力大
这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。
这种可以有两个方案实现
- 一种是用 Zookeeper 存储,通过 Watch 感知数据变化;
- 另一种是用 mysql 数据库存储,周期性的同步;(有配置表,指定哪些表的数据发给哪些主题)
- 另一种是用 mysql 数据库存储,使用广播流。
这里选择第二种方案,主要是 MySQL 对于配置数据初始化和维护管理,使用 FlinkCDC 读取配置信息表,将配置流作为广播流与主流进行连接。
- 获取执行环境
- 消费Kafka ods_base_db 主题数据创建流
- 将每行数据转换为JSON对象并过滤(delete) 主流
- 使用FlinkCDC消费配置表并处理成 广播流
- 连接主流和广播流
- 分流 处理数据 广播流数据,主流数据(根据广播流数据进行处理)
- 提取Kafka流数据和HBase流数据
- 将Kafka数据写入Kafka主题,将HBase数据写入Phoenix表
- 启动任务
table_process
table_process | 主健:sourceTable + type |
sourceTable | 根据表名分流 |
type | 用来区分新增、变更的数据,不同类型的数据放到不同主题表不 |
sinkType | 放Kafka还是其它地方 |
sinkTable | 如果是维度表,就是Phoenix表名,如果是 kafka 就是 主题 |
sinkColumns | 提供字段,为了自动建表 |
pk | Phoenix 建表必须有主健 |
extend | 指定要不要做分区表,等等 |
Demo
sourceTable | type | sinkType | sinkTable |
base_trademark | insert | hbase | dim_xxx(Phoenix 表名) |
order_info | insert | kafka | dwd_xxx(主题名) |
CREATE TABLE `table_process` ( `source_table` varchar(200) NOT NULL COMMENT '来源表', `operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete', `sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka', `sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)', `sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段', `sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段', `sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展', PRIMARY KEY (`source_table`,`operate_type`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
1.读取状态2.过滤数据
3.分流
ODS:
- 数据源:行为数据,业务数据
- 架构分析:
FlinkCDC: DataStream/FlinkSOIFlinkCDC/Maxwell/Canal保持数据原貌,不做任何修改! ods_base_log,ods_base_db
DWD-DIM:
行为数据:DWD(Kafka)
1.过滤脏数据 --> 侧输出流 脏数据率
2.新老用户校验 --> 前台校验不准
3.分流 --> 侧输出流 页面、启动、曝光、动作、错误
4.写入Kafka
业务数据:DWD (Kafka)-DIM(Phoenix)
1.过滤数据-->删除数据
2.读取配置表创建广播流
3.连接主流和广播流并处理
1)广播流数据:
- 解析数据
- Phoenix 建表(HBase)
- 写入状态广播
2)主流数据
- 读取状态
- 过滤字段
- 分流(添加 SinkTable 字段)
4.提取Kafka和 HBase 流,分别对应的位置
5.HBase流:自定义 Sink
6.Kafka流:自定义序列化方式