前言
今天来把数仓数据同步解决掉,前面我们已经把日志数据到 Kafka 的通道打通了。
1、实时数仓数据同步
关于实时数仓,我们的 Flink 直接去 Kafka 读取即可,我们在学习 Flink 的时候也知道 Flink 提供了 Kafka Source,所以这里不需要再去添加什么额外的配置。
2、离线数仓数据同步
Flink 可以从 Kafka 中读取数据,可是 Hive 不行啊,Hive 是从 Hadoop HDFS 中读取数据,所以的离线数仓需要进行一些配置。
2.1、用户行为日志数据同步
2.1.1、数据通道选择
用户行为数据由 Flume 从 Kafka 直接同步到 HDFS,由于离线数仓采用 Hive 的分区表按天统计,所以目标路径要包含一层日期。
这里,我们的 hive 分区表需要按天分区,那就需要我们 Flume 从 Kafka 读取到的数据包含 Event Header 信息(hdfs sink 默认就是按照 event header 中的 timestamp 来落盘的),但是我们上游把用户行为日志传输到 Kafka Channel 的时候,我们设置了 parseAsFlumeEvent=false,这就导致存储在 Kafka Channel 中的日志只有 Event Body,没有 Event Header。应该怎么把 Kafka Channel 中的数据读取写入到 HDFS 而且还能够给日志数据增加一个 header,我们有两种选择方案:
1. 如果我们选择了 Kafka Channel 做数据源(我们之前说 Kfaka Channel 一共有 3 种结构:source -> kafka channel 、source -> kafka channel -> sink、kafka channel -> sink),选择了 kafka channel -> sink 结构的话,kafka channel 自己会封装一个 header 发送给 sink,但是这个 header 没有时间信息(timestamp),Event Body 中也可以有时间信息(要求我们日志时产生给每一条日志添加时间信息),但是我们不可以在 kafka channel 和 hdfs sink 之间设置拦截器去提取 body 中的时间信息(因为自定义拦截器只能在 source 和 channel 之间使用),所以这种结构无法实现。
2. 上一种方案如果可以实现的话,我们就省去了 source 读取,可惜上一种结构无法实现,除非把上游的 parseAsFlumeEvent 设置为 true 。所以我们只能再开一个完整的 flume 作业去 kafka 读取,即 kafak source -> file channel -> hdfs sink。
2.1.2 日志消费Flume配置概述
按照规划,该 Flume 需将 Kafka 中 topic_log 的数据发往 HDFS。并且对每天产生的用户行为日志进行区分,将不同天的数据发往HDFS不同天的路径。
这里我们选择Kafka Source、File Channel(数据比较重要的话一般都用 file channel)、HDFS Sink。
2.1.3、Flume 配置文件
# 定义组件 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # 配置sources a1.sources.r1.channels = c1 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.kafka.bootstrap = hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sources.r1.kafka.topics = topic_log a1.sources.r1.kafka.consumer.group.id = topic_log a1.sources.r1.batchSize = 2000 a1.sources.r1.batchDurationMillis = 1000 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.lyh.gmall.interceptor.TimestampInterceptor$Builder # 配置channels a1.channels.c1.type = file a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1 a1.channels.c1.useDualCheckpoints = false a1.channels.c1.dataDirs = /opt/module/flume/checkpoint/behavior1 a1.channels.c1.maxFileSize = 2146435071 a1.channels.c1.capacity = 1000000 a1.channels.c1.keep-alive = 3 # 配置 sinks a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix = log a1.sinks.k1.hdfs.round = false a1.sinks.k1.hdfs.rollInterval = 10 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0 #组装 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
kafka source 配置:
- 我们知道,flume 每发送一次数据需要满足的条件是数据量达到 batchSize 条,或者时间达到 batchDurationMillis 。所以这里 batchDurationMillis 的配置尽量和系统达到 batchSize 的时间相近。比如每 2s 生成 2000 条的数据,那我们这里的 batchDurationMillis 最好就配置为 2000
- kafka.topics 和 kafka.topics.regex 这两个配置虽然都被加粗,但是只需要配置一个即可。
- kafka.consumer.group.id(默认为 flume),这里我们尽量配置 kafka 消费者组和业务名一样,因为我们实际项目中可能会有很多业务,如果这几个业务都需要消费这个 topic,但是如果不配置消费者组id,那么这些业务的消费者就会默认被分配到一个消费者组(flume 组),而一个 topic 的一个分区只能被一个消费者组的一个消费者所消费(我们这里的主题 topic_log 并只有一个分区),这样的话,只有一个业务的消费者可以消费到,而别的业务的消费者消费不到。
file channel 配置:
- checkpointDir:flume 的 file channel 有一个索引机制,它会把读取到的索引保存到内存当中去,但是防止数据丢失,还会再备份一次,这里就是配置备份的路径。
- useDualCheckpoints(默认为 false):表示是否开启二次备份。因为一次备份即使保存在磁盘,还是有出问题的可能,如果配置这个参数为 true 则必须配置参数 backupCheckpointDir
- backupCheckpointDir:这个参数就是配置二次备份的地址。
- dataDirs:flume 的多目录存储,可以把数据存储在服务器的多个磁盘上
- maxFileSize:我们的 file channel 是要写入文件的,这里配置的是这个文件的最大大小
- capacity:file channel 容纳数据条数的限制,默认最多 100w 条
- keep-alive:我们的 file channel 中的数据如果满了的时候,source 是写不进去的,这就需要回滚,还需要 kafka source 再从 kafka 去读一次,这样条浪费性能了。这个参数的作用是等一会,等到 channel 腾出一定空间之后再写进去。
hdfs sink 配置:
- hdfs.path:我们的 hdfs 保存路径中包含 %Y-%m-%d ,这意味这这个文件夹中保存的是一天的数据内容,如果我们有要求保存几个小时的内容,就需要设置 round 参数。
- round(默认是 false):flume 做的是离线的数据传输,我们的日志会每隔一定时间进行落盘。要精确到小时分钟或秒的话,就需要设置 roundValue 和 roundUnit 参数。比如每 6 个小时进行一次落盘的话,我们首先把路径改为 %Y-%m-%d/%h ,然后 roundValue 设置为 6,roundUnit 设置为 hour。
- roundValue:时间值
- roundUnit:时间单位
- rollInterval:hdfs 数据块滚动间隔(默认是 30s,单位是秒),同样我们最好设置这个采纳数的时间刚好差不多生成一个块大小(128MB)
- rollSize:基于文件的大小进行滚动(一般我们配置为 134217728 也就是 128MB)
- rollCount:基于 event 的条数进行滚动(一般设置为 0,因为用数据条数不太好控制文件的大小)
注意:rollInterval、rollSize、rollCount 如果都设置为 0 则代表该配置参数不生效。配置不当很容易造成大量小文件问题(危害:hdfs中一个文件在namenode中占用 150kb、一个文件会生成一个 map task )。
1. 数据漂移问题
当我们有数据比如在 23:59:59 经过 3s 才能发送到 kafka source,这时 kafka source 会在 event header 中封装一个 timestamp 信息,但是这时封装的 timestamp 已经到第二天了。
所以解决的办法就是,利用 flume 的自定义拦截器去把 kafka source 中 event body 的时间信息读取出来,封装到 header 当中去,这样就不会造成落盘错误了:
2. 编写拦截器
package com.lyh.gmall.interceptor; import com.alibaba.fastjson.JSONObject; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; public class TimestampInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { // 1. 获取 header 和 body 中的数据 Map<String, String> headers = event.getHeaders(); String log = new String(event.getBody(), StandardCharsets.UTF_8); // 2. 解析 log(json) 中的 ts 字段 String ts = JSONObject.parseObject(log).getString("ts"); // 3. 把解析出来的 ts 值放到 header 中 headers.put("timestamp",ts); return event; } @Override public List<Event> intercept(List<Event> list) { for (Event event : list) { intercept(event); } return list; } @Override public void close() { } public static class Builder implements Interceptor.Builder { @Override public Interceptor build() { return new TimestampInterceptor(); } @Override public void configure(Context context) { } } }
完了重新打包到 hadoop104 下 flume 的 lib 目录下
2.1.4、日志消费测试
1. 启动 Zookeeper、Kafka
2. 启动hadoop102的用户日志采集脚本
f1.sh start
这样我们的用户行为日志就被 flume 采集到了 kafka
3. 在 hadoop104 从kafka 采集日志到 hdfs
bin/flume-ng agent -n a1 -c conf/ -f job/warehouse/kafka_to_hdfs_log.conf -Dflume.root.logger=INFO,console
4. 模拟数据生成
mklog.sh
5. 测试结果
可以看到,用户行为日志被成功上传到了 hdfs。
2.1.5、日志启停脚本
我们在 hadoop102 编写一个脚本 f2.sh:
#!/bin/bash case $1 in "start") echo " --------启动 hadoop104 日志数据flume-------" ssh hadoop104 "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf -f /opt/module/flume-1.9.0/job/warehouse/kafka_to_hdfs_log.conf >/dev/null 2>&1 &" ;; "stop") echo " --------停止 hadoop104 日志数据flume-------" ssh hadoop104 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill" ;; esac
2.2、业务日志数据同步
对离线数仓来说,业务数据一般都是按天来进行同步的;但对实时数仓来说,来一条业务数据就必须马上同步。所以对于离线数仓,我们可以不使用 MaxWell ,而是通过 DataX 每天全量采集到数仓。
2.2.1 数据同步策略概述
业务数据是数据仓库的重要数据来源,我们需要每日定时从业务数据库中抽取数据,传输到数据仓库中,之后再对数据进行分析统计。
为保证统计结果的正确性,需要保证数据仓库中的数据与业务数据库是同步的,离线数仓的计算周期通常为天,所以数据同步周期也通常为天,即每天同步一次即可。
数据的同步策略有全量同步和增量同步。
全量同步,就是每天都将业务数据库中的全部数据同步一份到数据仓库,这是保证两侧数据同步的最简单的方式。就相当于每天进行一次 select * from xxx;
那我们的历史数据(比如今天全量同步后,今天之前的数据就是历史数据)就没有意义了吗?其实我们并不会立即删除历史数据,因为数据是有价值的,我们既可以分析其中的变化,也可以作为备份以防不测。
增量同步,就是每天只将业务数据中的新增及变化数据同步到数据仓库。采用每日增量同步的表,通常需要在首日先进行一次全量同步。
离线数仓(四)【数仓数据同步策略】(2)https://developer.aliyun.com/article/1532382