Next, let's look at the data collection module.
Collecting Log Data into Kafka with Flume
First, we use Flume to collect the log data into Kafka.
Configuration
For log collection we use Flume, a traditional and mature log collection tool.
Flume tails the log files that are generated in real time and ships them to Kafka. The log files are named app-yyyy-mm-dd.log.
CDH 7.1.1 removed the Flume component and replaced it with NiFi, so we download Flume directly onto the node. The configuration is as follows.
Create a file-flume-kafka.conf file in the /data0/apache-flume-1.9.0-bin/conf directory:
[root@cdh3 conf]# cat file-flume-kafka.conf
a1.sources=r1
a1.channels=c1 c2

# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /data0/apache-flume-1.9.0-bin/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

# interceptors
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.soundhearer.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.soundhearer.flume.interceptor.LogTypeInterceptor$Builder

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# configure channels
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
com.soundhearer.flume.interceptor.LogETLInterceptor and com.soundhearer.flume.interceptor.LogTypeInterceptor are the fully qualified class names of the custom interceptors. Change them to match your own interceptor classes if they differ.
Flume ETL and Log-Type Interceptors
This project defines two custom interceptors: an ETL interceptor and a log-type interceptor.
The ETL interceptor filters out log lines whose timestamp is invalid or whose JSON body is incomplete.
The log-type interceptor separates start logs from event logs so that they can be sent to different Kafka topics.
The full code is available on my GitHub.
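For reference, here is a minimal sketch of what the ETL interceptor might look like, assuming Flume 1.9.0's org.apache.flume.interceptor.Interceptor API and the "timestamp|JSON" line format shown in the consumer output later in this post; the actual implementation on GitHub may handle the two log types differently.

// Minimal sketch, not the project's actual implementation (see GitHub for the real code).
package com.soundhearer.flume.interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class LogETLInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    // Keep only events whose body looks valid. Assumption: start logs are a plain JSON
    // object, event logs look like "<13-digit timestamp>|<JSON>". Returning null drops the event.
    @Override
    public Event intercept(Event event) {
        String log = new String(event.getBody(), StandardCharsets.UTF_8).trim();
        if (log.startsWith("{") && log.endsWith("}")) {
            return event; // start log: JSON body looks complete
        }
        String[] parts = log.split("\\|", 2);
        if (parts.length == 2
                && parts[0].matches("\\d{13}")
                && parts[1].trim().startsWith("{") && parts[1].trim().endsWith("}")) {
            return event; // event log: timestamp and JSON body both look valid
        }
        return null; // invalid timestamp or incomplete JSON: filter it out
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> kept = new ArrayList<>();
        for (Event event : events) {
            Event out = intercept(event);
            if (out != null) {
                kept.add(out);
            }
        }
        return kept;
    }

    @Override
    public void close() {
    }

    // Flume instantiates the interceptor through this nested Builder,
    // which is why file-flume-kafka.conf references ...LogETLInterceptor$Builder.
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

The LogTypeInterceptor follows the same pattern: it inspects the event body, decides whether the line is a start log or an event log, and puts a "topic" header of topic_start or topic_event on the event; the multiplexing selector in file-flume-kafka.conf then routes the event to channel c1 or c2 based on that header. The compiled interceptor jar must be on the agent's classpath (for example, dropped into the Flume installation's lib directory) so the $Builder classes referenced in the configuration can be loaded.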
Creating the Kafka Topics
We create the two topics from the command line:
kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic topic_start --partitions 3 --replication-factor 2
kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic topic_event --partitions 3 --replication-factor 2
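As an alternative sketch (not part of the original walkthrough), the same two topics could be created programmatically with Kafka's AdminClient API; the broker addresses below match the ones used elsewhere in this setup.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 2, matching the CLI commands above
            admin.createTopics(Arrays.asList(
                    new NewTopic("topic_start", 3, (short) 2),
                    new NewTopic("topic_event", 3, (short) 2)
            )).all().get();
        }
    }
}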
View the topics:
[root@cdh3 conf]# kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --list
20/11/24 18:41:34 INFO zookeeper.ClientCnxn: Session establishment complete on server cdh2.macro.com/192.168.0.207:2181, sessionid = 0x1007949b99a034d, negotiated timeout = 30000
20/11/24 18:41:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Connected.
ATLAS_ENTITIES
ATLAS_HOOK
ATLAS_SPARK_HOOK
__consumer_offsets
fill
topic_event
topic_start
20/11/24 18:41:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Closing.
20/11/24 18:41:34 INFO zookeeper.ZooKeeper: Session: 0x1007949b99a034d closed
20/11/24 18:41:34 INFO zookeeper.ClientCnxn: EventThread shut down for session: 0x1007949b99a034d
20/11/24 18:41:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Closed.
Estimating the Number of Kafka Machines
Number of Kafka machines (rule of thumb) = 2 × (peak production rate in MB/s × replication factor / 100) + 1
Once you know the peak production rate and the replication factor you plan to use, you can estimate how many Kafka brokers to deploy.
For example, our peak production rate is 50 MB/s and the replication factor is 2.
Number of Kafka machines = 2 × (50 × 2 / 100) + 1 = 3.
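To make the rule of thumb concrete, here is a tiny helper; the class and method names are made up purely for illustration.

public class KafkaSizing {
    // brokers = 2 * (peakRateMBps * replicationFactor / 100) + 1, rounded up
    static int brokerCount(double peakRateMBps, int replicationFactor) {
        return (int) Math.ceil(2 * peakRateMBps * replicationFactor / 100.0) + 1;
    }

    public static void main(String[] args) {
        System.out.println(brokerCount(50, 2)); // prints 3, matching the example above
    }
}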
Starting the Flume Collection Agent
Run the following command from the /data0/apache-flume-1.9.0-bin/bin directory:
nohup flume-ng agent --name a1 --conf-file ../conf/file-flume-kafka.conf &
Consume the Kafka topics and confirm that data has arrived:
kafka-console-consumer --bootstrap-server cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092 --from-beginning --topic topic_start
kafka-console-consumer --bootstrap-server cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092 --from-beginning --topic topic_event
1606152976690|{"cm":{"ln":"-90.5","sv":"V2.0.7","os":"8.2.8","g":"ZPWDFI86@gmail.com","mid":"991","nw":"3G","l":"en","vc":"5","hw":"640*960","ar":"MX","uid":"991","t":"1606064013535","la":"-40.9","md":"Huawei-1","vn":"1.3.6","ba":"Huawei","sr":"Q"},"ap":"app","et":[{"ett":"1606139735962","en":"display","kv":{"goodsid":"244","action":"2","extend1":"1","place":"5","category":"81"}},{"ett":"1606060625200","en":"newsdetail","kv":{"entry":"1","goodsid":"245","news_staytime":"18","loading_time":"0","action":"1","showtype":"3","category":"23","type1":"102"}},{"ett":"1606148719063","en":"loading","kv":{"extend2":"","loading_time":"45","action":"2","extend1":"","type":"2","type1":"102","loading_way":"1"}},{"ett":"1606112496011","en":"comment","kv":{"p_comment_id":1,"addtime":"1606069010840","praise_count":692,"other_id":0,"comment_id":1,"reply_count":58,"userid":5,"content":"爹钧异"}},{"ett":"1606138524102","en":"favorites","kv":{"course_id":8,"id":0,"add_time":"1606078090460","userid":2}}]}
1606152976691|{"cm":{"ln":"-58.1","sv":"V2.6.0","os":"8.0.4","g":"R2Q998F1@gmail.com","mid":"995","nw":"3G","l":"en","vc":"2","hw":"640*960","ar":"MX","uid":"995","t":"1606111827871","la":"6.4","md":"Huawei-17","vn":"1.0.5","ba":"Huawei","sr":"I"},"ap":"app","et":[{"ett":"1606129460089","en":"newsdetail","kv":{"entry":"1","goodsid":"245","news_staytime":"42","loading_time":"0","action":"1","showtype":"5","category":"79","type1":"201"}},{"ett":"1606100900686","en":"ad","kv":{"entry":"3","show_style":"3","action":"4","detail":"102","source":"1","behavior":"1","content":"2","newstype":"8"}},{"ett":"1606098687596","en":"active_foreground","kv":{"access":"","push_id":"3"}},{"ett":"1606067052812","en":"active_background","kv":{"active_source":"3"}},{"ett":"1606068620224","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\\n at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1606076123601","en":"favorites","kv":{"course_id":6,"id":0,"add_time":"1606133566208","userid":2}}]}
Consuming Kafka Data into HDFS with Flume
Next, we use Flume to consume the data from Kafka and write it to HDFS.
Configuration
Deploy another Flume agent on the cdh2 node and create a kafka-flume-hdfs.conf file in the /data0/apache-flume-1.9.0-bin/conf directory:
[root@cdh2 conf]# cat kafka-flume-hdfs.conf
## components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.sources.r1.kafka.topics=topic_start

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.sources.r2.kafka.topics=topic_event

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data0/apache-flume-1.9.0-bin/checkpoint/behavior1
a1.channels.c1.dataDirs = /data0/apache-flume-1.9.0-bin/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /data0/apache-flume-1.9.0-bin/checkpoint/behavior2
a1.channels.c2.dataDirs = /data0/apache-flume-1.9.0-bin/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second

## sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second

## avoid generating lots of small files
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## write the output files as LZO-compressed streams
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## wire sources and sinks to channels
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
Starting Flume to Consume Kafka
Create the origin_data directory in HDFS:
hadoop fs -mkdir /origin_data
Then run the following command from the /data0/apache-flume-1.9.0-bin/bin directory:
nohup flume-ng agent --name a1 --conf-file ../conf/kafka-flume-hdfs.conf &
You can see that data has been written under the origin_data directory in HDFS: Flume has successfully consumed the Kafka data into HDFS.
[root@cdh2 bin]# hadoop fs -ls /origin_data/gmall/log
Found 2 items
drwxr-xr-x   - hive hive          0 2020-11-24 02:41 /origin_data/gmall/log/topic_event
drwxr-xr-x   - hive hive          0 2020-11-24 10:19 /origin_data/gmall/log/topic_start
[root@cdh2 bin]# hadoop fs -ls /origin_data/gmall/log/topic_event
Found 1 items
drwxr-xr-x   - hive hive          0 2020-11-24 02:41 /origin_data/gmall/log/topic_event/2020-11-24