1、离线数仓同步数据
1.1 用户行为数据同步
1.1.1 数据通道
用户行为数据由Flume从Kafka直接同步到HDFS,由于离线数仓采用Hive的分区表按天统计,所以目标路径要包含一层日期。具体数据流向如下图所示。
1.1.2 日志消费Flume配置概述
按照规划,该Flume需将Kafka中topic_log的数据发往HDFS。并且对每天产生的用户行为日志进行区分,将不同天的数据发往HDFS不同天的路径。
此次选择KafkaSource、FileChannel、HDFSSink
2.1.3 日志消费Flume配置实操
1、创建Flume配置文件
在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_log.conf
vim job/kafka_to_hdfs_log.conf
2、配置内容如下
#定义组件 a1.sources=r1 a1.channels=c1 a1.sinks=k1 #配置source1 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 5000 a1.sources.r1.batchDurationMillis = 2000 a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sources.r1.kafka.topics=topic_log a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = org.zhm.gmall.flume.interceptor.TimestampInterceptor$Builder #配置channel a1.channels.c1.type = file a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/checkpoint/behavior1 a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/data/behavior1 a1.channels.c1.maxFileSize = 2146435071 a1.channels.c1.capacity = 1000000 a1.channels.c1.keep-alive = 6 #配置sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix = log a1.sinks.k1.hdfs.round = false a1.sinks.k1.hdfs.rollInterval = 10 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0 #控制输出文件类型 a1.sinks.k1.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.codeC = gzip #组装 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
注:配置优化
1、FileChannel优化
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
2、HDFS Sink优化
(1)HDFS存入大量小文件,有什么影响?
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。
(2)HDFS小文件处理
官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:
(1)文件在达到128M时会滚动生成新文件
(2)文件创建超3600秒时会滚动生成新文件
3、编写Flume拦截器
(1)数据漂移问题
(2)在com.zhm.gmall.flume.interceptor包下创建TimestampInterceptor类
package org.zhm.gmall.flume.interceptor; /** * @ClassName TimestampInterceptor * @Description TODO * @Author Zouhuiming * @Date 2023/6/19 18:33 * @Version 1.0 */ import com.alibaba.fastjson.JSONObject; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; public class TimestampInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { //1、获取header和body的数据 Map<String, String> headers = event.getHeaders(); String log = new String(event.getBody(), StandardCharsets.UTF_8); //2、将body的数据类型转成jsonObject类型(方便获取数据) JSONObject jsonObject = JSONObject.parseObject(log); //3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题) String ts = jsonObject.getString("ts"); headers.put("timestamp", ts); return event; } @Override public List<Event> intercept(List<Event> list) { for (Event event : list) { intercept(event); } return list; } @Override public void close() { } public static class Builder implements Interceptor.Builder { @Override public Interceptor build() { return new TimestampInterceptor(); } @Override public void configure(Context context) { } } }
(3)重新打包
(4)需要先将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下面
2.1.4 日志消费Flume测试
1、启动Zookeeper、Kafka、hadoop集群
2、启动日志采集Flume
f1.sh start
3、启动hadoop104的日志消费Flume
bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console
4、生成模拟数据
lg.sh
5、观察HDFS是否出现数据
2.1.5 日志消费Flume启停脚本
若上述测试通过,为方便,此处创建一个Flume的启停脚本。
1、在hadoop102节点的/home/atguigu/bin目录下创建脚本f2.sh
vim f2.sh
2、在脚本中填写如下内容
#!/bin/bash case $1 in "start") echo " --------启动 hadoop104 日志数据flume-------" ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &" ;; "stop") echo " --------停止 hadoop104 日志数据flume-------" ssh hadoop104 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill" ;; esac
3、增加脚本权限
chmod 777 f2.sh
2.2 业务数据同步
2.2.1 数据同步策略概述
业务数据是数据仓库的重要数据来源,我们需要每日定时从业务数据库中抽取数据,传输到数据仓库中,之后再对数据进行分析统计。
为保证统计结果的正确性,需要保证数据仓库中的数据与业务数据库是同步的,离线数仓的计算周期通常为天,所以数据同步周期也通常为天,即每天同步一次即可。
数据的同步策略有全量同步和增量同步。
全量同步就是每天都将业务数据库中的全部数据同步到一份到数据仓库,这是保证两侧数据同步的最简单的方式。
增量同步就是每天只将业务数据中的新增变化数据同步到数据仓库。采用每日增量同步的表,通常需要在首日先进行一次全量同步。。
2.2.2 数据同步策略选择
根据上述对比,可以得出以下结论:通常情况,业务表数据量比较大,优先考虑增量,数据量比较小,优先考虑全量
下图为各表同步策略:
2.2.3 数据同步工具概述
数据同步工具种类繁多,大致可分为两类,一类是以DataX、Sqoop为代表的基于Select查询的离线、批量同步工具,另一类是以Maxwell、Canal为代表的基于数据库数据变更日志(例如MySQL的binlog,其会实时记录所有的insert、update以及delete操作)的实时流式同步工具。
全量同步通常使用DataX、Sqoop等基于查询的离线同步工具。而增量同步既可以使用DataX、Sqoop等工具,也可使用Maxwell、Canal等工具,下面对增量同步不同方案进行简要对比。
2.2.5 全量表数据同步
1、数据同步工具DataX部署
2、数据通道
全量表数据由DataX从MySQL业务数据库直接同步到HDFS,具体数据流向如下图所示。
3、DataX配置文件
我们需要为每张全量表编写一个DataX的json配置文件,此处以activity_info为例,配置文件内容如下:
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": [ "id", "activity_name", "activity_type", "activity_desc", "start_time", "end_time", "create_time" ], "connection": [ { "jdbcUrl": [ "jdbc:mysql://hadoop102:3306/gmall" ], "table": [ "activity_info" ] } ], "password": "000000", "splitPk": "", "username": "root" } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ { "name": "id", "type": "bigint" }, { "name": "activity_name", "type": "string" }, { "name": "activity_type", "type": "string" }, { "name": "activity_desc", "type": "string" }, { "name": "start_time", "type": "string" }, { "name": "end_time", "type": "string" }, { "name": "create_time", "type": "string" } ], "compress": "gzip", "defaultFS": "hdfs://hadoop102:8020", "fieldDelimiter": "\t", "fileName": "activity_info", "fileType": "text", "path": "${targetdir}", "writeMode": "append" } } } ], "setting": { "speed": { "channel": 1 } } } }
注:由于目标路径包含一层日期,用于对不同天的数据加以区分,故path参数并未写死,需在提交任务时通过参数动态传入,参数名称为targetdir。
4、 DataX配置文件生成脚本
(1)在~/bin目录下创建gen_import_config.py脚本
# ecoding=utf-8 import json import getopt import os import sys import MySQLdb #MySQL相关配置,需根据实际情况作出修改 mysql_host = "hadoop102" mysql_port = "3306" mysql_user = "root" mysql_passwd = "000000" #HDFS NameNode相关配置,需根据实际情况作出修改 hdfs_nn_host = "hadoop102" hdfs_nn_port = "8020" #生成配置文件的目标路径,可根据实际情况作出修改 output_path = "/opt/module/datax/job/import" def get_connection(): return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd) def get_mysql_meta(database, table): connection = get_connection() cursor = connection.cursor() sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION" cursor.execute(sql, [database, table]) fetchall = cursor.fetchall() cursor.close() connection.close() return fetchall def get_mysql_columns(database, table): return map(lambda x: x[0], get_mysql_meta(database, table)) def get_hive_columns(database, table): def type_mapping(mysql_type): mappings = { "bigint": "bigint", "int": "bigint", "smallint": "bigint", "tinyint": "bigint", "decimal": "string", "double": "double", "float": "float", "binary": "string", "char": "string", "varchar": "string", "datetime": "string", "time": "string", "timestamp": "string", "date": "string", "text": "string" } return mappings[mysql_type] meta = get_mysql_meta(database, table) return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta) def generate_json(source_database, source_table): job = { "job": { "setting": { "speed": { "channel": 3 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [{ "reader": { "name": "mysqlreader", "parameter": { "username": mysql_user, "password": mysql_passwd, "column": get_mysql_columns(source_database, source_table), "splitPk": "", "connection": [{ "table": [source_table], "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database] }] } }, "writer": { "name": "hdfswriter", "parameter": { "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port, "fileType": "text", "path": "${targetdir}", "fileName": source_table, "column": get_hive_columns(source_database, source_table), "writeMode": "append", "fieldDelimiter": "\t", "compress": "gzip" } } }] } } if not os.path.exists(output_path): os.makedirs(output_path) with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f: json.dump(job, f) def main(args): source_database = "" source_table = "" options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl=']) for opt_name, opt_value in options: if opt_name in ('-d', '--sourcedb'): source_database = opt_value if opt_name in ('-t', '--sourcetbl'): source_table = opt_value generate_json(source_database, source_table) if __name__ == '__main__': main(sys.argv[1:])
注:(1)安装Python Mysql驱动
sudo yum install -y MySQL-python
(2)脚本使用说明
python gen_import_config.py -d database -t table ##通过-d传入数据库名,-t传入表名,执行上述命令即可生成该表的DataX同步配置文件。
(2)在~/bin目录下创建gen_import_config.sh脚本
vim ~/bin/gen_import_config.sh