Data Warehouse in Practice, Part 2

Let's continue with the data collection module.

Collecting Log Data into Kafka with Flume

First, we use Flume to collect the log data and deliver it to Kafka.

Configuration

For log collection we use Flume, a fairly traditional and mature log collection project.

Flume tails the log files that are generated in real time and ships them into Kafka. The log files follow the naming format app-yyyy-mm-dd.log.

CDH 7.1.1 has removed the Flume component in favor of NiFi, so we download Flume onto the node ourselves. The configuration is as follows.

Create the file file-flume-kafka.conf in the /data0/apache-flume-1.9.0-bin/conf directory:

[root@cdh3 conf]# cat file-flume-kafka.conf 
a1.sources=r1
a1.channels=c1 c2
# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /data0/apache-flume-1.9.0-bin/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors =  i1 i2
a1.sources.r1.interceptors.i1.type = com.soundhearer.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.soundhearer.flume.interceptor.LogTypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2
# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer

com.soundhearer.flume.interceptor.LogETLInterceptor and com.soundhearer.flume.interceptor.LogTypeInterceptor are the fully qualified class names of the custom interceptors; adjust them to match your own interceptor classes, and make sure the jar containing them is on Flume's classpath (typically by placing it in the lib directory of the Flume installation). Also note that this agent writes to Kafka through Kafka channels, so no sinks are defined.

Flume ETL and Log-Type Interceptors

This project defines two custom interceptors: an ETL interceptor and a log-type interceptor.

The ETL interceptor filters out log records whose timestamp is invalid or whose JSON payload is incomplete.

The log-type interceptor separates startup logs from event logs so that they can be sent to different Kafka topics.

The full code is available on my GitHub; a minimal sketch of the idea follows below.
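
As a rough sketch (not the exact code from the repository), the log-type interceptor tags each event with a topic header, which the multiplexing channel selector in file-flume-kafka.conf then uses to route the event to channel c1 or c2. The package and class names below simply mirror the ones referenced in the config; the exact rule for deciding what counts as a startup log is an assumption here.

package com.soundhearer.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of the log-type interceptor: tag each event with a "topic" header
// so the multiplexing channel selector can route it to topic_start or topic_event.
public class LogTypeInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // nothing to set up
    }

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        Map<String, String> headers = event.getHeaders();
        // Assumption: startup logs can be recognized by a "start" marker in the body;
        // everything else is treated as an event log.
        if (body.contains("start")) {
            headers.put("topic", "topic_start");
        } else {
            headers.put("topic", "topic_event");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<>();
        for (Event event : events) {
            out.add(intercept(event));
        }
        return out;
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    // Flume instantiates interceptors through a Builder; this is the
    // $Builder referenced in file-flume-kafka.conf.
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
            // no configuration needed for this sketch
        }
    }
}

The ETL interceptor follows the same skeleton, but its single-event intercept() returns null for records with a malformed timestamp or truncated JSON, which drops them from the flow.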

Creating the Kafka Topics

We create the two topics from the command line:

kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic topic_start --partitions 3 --replication-factor 2
kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic topic_event --partitions 3 --replication-factor 2

Check the topics:

[root@cdh3 conf]# kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --list
20/11/24 18:41:34 INFO zookeeper.ClientCnxn: Session establishment complete on server cdh2.macro.com/192.168.0.207:2181, sessionid = 0x1007949b99a034d, negotiated timeout = 30000
20/11/24 18:41:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Connected.
ATLAS_ENTITIES
ATLAS_HOOK
ATLAS_SPARK_HOOK
__consumer_offsets
fill
topic_event
topic_start
20/11/24 18:41:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Closing.
20/11/24 18:41:34 INFO zookeeper.ZooKeeper: Session: 0x1007949b99a034d closed
20/11/24 18:41:34 INFO zookeeper.ClientCnxn: EventThread shut down for session: 0x1007949b99a034d
20/11/24 18:41:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Closed.

Calculating the Number of Kafka Brokers

Number of Kafka brokers (rule of thumb) = 2 × (peak production rate × replication factor / 100) + 1

Once you know the peak production rate and the replication factor you plan to use, you can estimate how many Kafka brokers to deploy.

For example, suppose the peak production rate is 50 MB/s and the replication factor is 2.

Number of Kafka brokers = 2 × (50 × 2 / 100) + 1 = 3
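
The same arithmetic as a tiny, hypothetical helper (not part of the project code; rounding the inner term up to a whole broker is an assumption here):

public class KafkaSizing {
    // Rule of thumb: 2 * (peak rate in MB/s * replication factor / 100) + 1,
    // rounding the inner term up to a whole broker.
    static int kafkaBrokerCount(double peakMBps, int replicationFactor) {
        return 2 * (int) Math.ceil(peakMBps * replicationFactor / 100.0) + 1;
    }

    public static void main(String[] args) {
        System.out.println(kafkaBrokerCount(50, 2)); // prints 3
    }
}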

Starting the Flume Collection Agent

Run the following command in the /data0/apache-flume-1.9.0-bin/bin directory:

nohup flume-ng agent --name a1 --conf-file ../conf/file-flume-kafka.conf &

Consume the Kafka topics and confirm that data is already flowing:

kafka-console-consumer --bootstrap-server cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092 --from-beginning --topic topic_start
kafka-console-consumer --bootstrap-server cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092 --from-beginning --topic topic_event
1606152976690|{"cm":{"ln":"-90.5","sv":"V2.0.7","os":"8.2.8","g":"ZPWDFI86@gmail.com","mid":"991","nw":"3G","l":"en","vc":"5","hw":"640*960","ar":"MX","uid":"991","t":"1606064013535","la":"-40.9","md":"Huawei-1","vn":"1.3.6","ba":"Huawei","sr":"Q"},"ap":"app","et":[{"ett":"1606139735962","en":"display","kv":{"goodsid":"244","action":"2","extend1":"1","place":"5","category":"81"}},{"ett":"1606060625200","en":"newsdetail","kv":{"entry":"1","goodsid":"245","news_staytime":"18","loading_time":"0","action":"1","showtype":"3","category":"23","type1":"102"}},{"ett":"1606148719063","en":"loading","kv":{"extend2":"","loading_time":"45","action":"2","extend1":"","type":"2","type1":"102","loading_way":"1"}},{"ett":"1606112496011","en":"comment","kv":{"p_comment_id":1,"addtime":"1606069010840","praise_count":692,"other_id":0,"comment_id":1,"reply_count":58,"userid":5,"content":"爹钧异"}},{"ett":"1606138524102","en":"favorites","kv":{"course_id":8,"id":0,"add_time":"1606078090460","userid":2}}]}
1606152976691|{"cm":{"ln":"-58.1","sv":"V2.6.0","os":"8.0.4","g":"R2Q998F1@gmail.com","mid":"995","nw":"3G","l":"en","vc":"2","hw":"640*960","ar":"MX","uid":"995","t":"1606111827871","la":"6.4","md":"Huawei-17","vn":"1.0.5","ba":"Huawei","sr":"I"},"ap":"app","et":[{"ett":"1606129460089","en":"newsdetail","kv":{"entry":"1","goodsid":"245","news_staytime":"42","loading_time":"0","action":"1","showtype":"5","category":"79","type1":"201"}},{"ett":"1606100900686","en":"ad","kv":{"entry":"3","show_style":"3","action":"4","detail":"102","source":"1","behavior":"1","content":"2","newstype":"8"}},{"ett":"1606098687596","en":"active_foreground","kv":{"access":"","push_id":"3"}},{"ett":"1606067052812","en":"active_background","kv":{"active_source":"3"}},{"ett":"1606068620224","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\\n    at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1606076123601","en":"favorites","kv":{"course_id":6,"id":0,"add_time":"1606133566208","userid":2}}]}

Consuming Kafka Data into HDFS with Flume

Next, we use a second Flume agent to consume the Kafka data and write it to HDFS.

Configuration

Deploy another Flume instance on the cdh2 node and create the file kafka-flume-hdfs.conf in the /data0/apache-flume-1.9.0-bin/conf directory:

[root@cdh2 conf]# cat kafka-flume-hdfs.conf 
## components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.sources.r1.kafka.topics=topic_start
## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.sources.r2.kafka.topics=topic_event
## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data0/apache-flume-1.9.0-bin/checkpoint/behavior1
a1.channels.c1.dataDirs = /data0/apache-flume-1.9.0-bin/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /data0/apache-flume-1.9.0-bin/checkpoint/behavior2
a1.channels.c2.dataDirs = /data0/apache-flume-1.9.0-bin/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
## sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second
## avoid generating lots of small files
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
## output file type: compressed stream (LZO)
a1.sinks.k1.hdfs.fileType = CompressedStream 
a1.sinks.k2.hdfs.fileType = CompressedStream 
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop
## wiring: bind sources and sinks to their channels
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
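
Note that the lzop codec relies on the Hadoop LZO libraries (hadoop-lzo, shipped as the GPL Extras parcel on CDH) being installed on the cluster nodes; without them the HDFS sink cannot load the compression codec.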

Starting Flume to Consume Kafka

Create the origin_data directory in HDFS:

hadoop fs -mkdir /origin_data

Run the following command in the /data0/apache-flume-1.9.0-bin/bin directory:

nohup flume-ng agent --name a1 --conf-file ../conf/kafka-flume-hdfs.conf &

You can see that data has already been written under the origin_data directory in HDFS; Flume has successfully consumed the Kafka data into HDFS.

[root@cdh2 bin]# hadoop fs -ls /origin_data/gmall/log
Found 2 items
drwxr-xr-x   - hive hive          0 2020-11-24 02:41 /origin_data/gmall/log/topic_event
drwxr-xr-x   - hive hive          0 2020-11-24 10:19 /origin_data/gmall/log/topic_start
[root@cdh2 bin]# hadoop fs -ls /origin_data/gmall/log/topic_event
Found 1 items
drwxr-xr-x   - hive hive          0 2020-11-24 02:41 /origin_data/gmall/log/topic_event/2020-11-24