Hands-On Big Data Project: E-Commerce Data Warehouse (Part 1): https://developer.aliyun.com/article/1535211
Building the Flume data collection pipeline
First-layer Flume collection channel
Create the Flume configuration file f1.conf
# a1 is the agent name; a1 defines a source named r1. Multiple components are separated by spaces.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Format: <component name>.<property name> = <property value>
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
# Tail /tmp/logs/app-yyyy-mm-dd.log. In the regex, ^ anchors the start, $ anchors the end,
# . matches any character, and + means one or more of the preceding element.
a1.sources.r1.filegroups.f1 = /tmp/logs/^app.+.log$
# JSON file that records the read position of each tailed file
a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json

# Custom interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.dw.flume.MyInterceptor$Builder

# Sink
a1.sinks.k1.type = logger

# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

# Wire the components together: one source can feed multiple channels,
# but a sink can only pull data from a single channel.
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
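A quick smoke test of the TAILDIR source: append a line to a file whose name matches the ^app.+.log$ pattern and check that the position file gets updated once the agent has read it. The file name app-2024-01-01.log is only an example; the paths come from the configuration above.

# Append a dummy record to a log file that matches the TAILDIR pattern
echo '{"test":"hello flume"}' >> /tmp/logs/app-2024-01-01.log

# After the agent has consumed the line, the position file records the current offset
cat /opt/module/flume/test/log_position.json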
Start/stop script f1 for the first-layer Flume channel
#!/bin/bash
if (($# != 1))
then
    echo "Please enter start or stop!"
    exit
fi

if [ $1 = start ]
then
    cmd="nohup flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/myagents/f1.conf -Dflume.root.logger=DEBUG,console > /home/atguigu/f1.log 2>&1 &"
elif [ $1 = stop ]
then
    cmd="ps -ef | grep f1.conf | grep -v grep | awk -F ' ' '{print \$2}' | xargs kill -9"
else
    echo "Please enter start or stop!"
    exit
fi

# Start/stop the first-layer agents on hadoop102 and hadoop103
for i in hadoop102 hadoop103
do
    ssh $i $cmd
done
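A typical invocation, assuming the script has been saved as f1 in a directory on the PATH (for example ~/bin) and made executable:

chmod +x ~/bin/f1
# Start the first-layer agents on hadoop102 and hadoop103
f1 start
# The Flume agents show up in jps as "Application"
xcall jps
# Stop them again
f1 stop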
Second-layer Flume collection channel
Create the Flume configuration file f2.conf
# Second-layer agent: two Kafka sources, two file channels, two HDFS sinks
a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2

# Sources
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_start
a1.sources.r1.kafka.consumer.auto.offset.reset = earliest
a1.sources.r1.kafka.consumer.group.id = CG_Start

a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics = topic_event
a1.sources.r2.kafka.consumer.auto.offset.reset = earliest
a1.sources.r2.kafka.consumer.group.id = CG_Event

# Channels
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/c1/checkpoint
# Enable the backup checkpoint
a1.channels.c1.useDualCheckpoints = true
a1.channels.c1.backupCheckpointDir = /opt/module/flume/c1/backupcheckpoint
# Directory where the events are stored
a1.channels.c1.dataDirs = /opt/module/flume/c1/datas

a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume/c2/checkpoint
a1.channels.c2.useDualCheckpoints = true
a1.channels.c2.backupCheckpointDir = /opt/module/flume/c2/backupcheckpoint
a1.channels.c2.dataDirs = /opt/module/flume/c2/datas

# Sinks
a1.sinks.k1.type = hdfs
# If the path contains time-based escape sequences, every event header must carry a
# timestamp; if it does not, set hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.path = hdfs://hadoop102:9000/origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.batchSize = 1000
# File rolling
# Roll a new file every 30 seconds
a1.sinks.k1.hdfs.rollInterval = 30
# Roll when a file reaches roughly 128 MB
a1.sinks.k1.hdfs.rollSize = 134217700
# Disable rolling based on the number of events
a1.sinks.k1.hdfs.rollCount = 0
# Write the files as an LZO-compressed stream
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
#a1.sinks.k1.hdfs.round = true
#a1.sinks.k1.hdfs.roundValue = 10
#a1.sinks.k1.hdfs.roundUnit = second

a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://hadoop102:9000/origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.batchSize = 1000
a1.sinks.k2.hdfs.rollInterval = 30
a1.sinks.k2.hdfs.rollSize = 134217700
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.codeC = lzop
#a1.sinks.k2.hdfs.round = true
#a1.sinks.k2.hdfs.roundValue = 10
#a1.sinks.k2.hdfs.roundUnit = second

# Wire the components together
a1.sources.r1.channels = c1
a1.sources.r2.channels = c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
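The two Kafka sources assume that the topics topic_start and topic_event already exist and are receiving data. A quick check from any broker node; note that on the Kafka version used here kafka-topics.sh takes --zookeeper, while newer releases use --bootstrap-server instead:

# List the existing topics
kafka-topics.sh --zookeeper hadoop102:2181 --list

# Optionally consume a few records to confirm data is arriving
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_start --from-beginning --max-messages 5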
Before Flume can write LZO-compressed files to HDFS, the LZO codecs must be registered in core-site.xml:
<property>
    <name>io.compression.codecs</name>
    <value>
        com.hadoop.compression.lzo.LzoCodec,
        com.hadoop.compression.lzo.LzopCodec
    </value>
</property>
<property>
    <name>io.compression.codec.lzo.class</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
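This assumes the hadoop-lzo jar is already installed on every node; the jar path and version below are illustrative and depend on your build. Once data has landed, large .lzo files can be made splittable for downstream MapReduce jobs by building an LZO index, for example:

# Example only: index an LZO output directory so MapReduce can split the files
# (jar path/version and the date directory are placeholders)
hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar \
  com.hadoop.compression.lzo.DistributedLzoIndexer \
  /origin_data/gmall/log/topic_start/2024-01-01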
Start/stop script f2 for the second-layer Flume channel
#!/bin/bash
if (($# != 1))
then
    echo "Please enter start or stop!"
    exit
fi

if [ $1 = start ]
then
    ssh hadoop104 "nohup flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/myagents/f2.conf -Dflume.root.logger=INFO,console > /home/atguigu/f2.log 2>&1 &"
elif [ $1 = stop ]
then
    ssh hadoop104 "ps -ef | grep f2.conf | grep -v grep | awk -F ' ' '{print \$2}' | xargs kill -9"
else
    echo "Please enter start or stop!"
fi
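Once the second layer is running and logs are flowing, the HDFS sinks create date-partitioned directories under the paths configured in f2.conf; a quick check (the date in the path is whatever day the events carry):

hadoop fs -ls /origin_data/gmall/log/topic_start/$(date +%F)
hadoop fs -ls /origin_data/gmall/log/topic_event/$(date +%F)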
One-click start/stop script for the data collection cluster
#!/bin/bash
if (($# != 1))
then
    echo "Please enter start or stop!"
    exit
fi

# Returns (via its exit status) the number of Kafka brokers currently running in the cluster
function countKafkaBrokers()
{
    count=0
    for ((i = 102; i <= 104; i++))
    do
        result=$(ssh hadoop$i "jps | grep Kafka | wc -l")
        count=$((result + count))
    done
    # A function's return value is the exit status of its last command
    # (0 means success, non-zero normally means failure); here it carries the broker count.
    return $count
}

if [ $1 = start ]
then
    zk start
    hd start
    kf start
    # The f1/f2 channels may only be started once the whole Kafka cluster is up
    while [ 1 ]
    do
        countKafkaBrokers
        if (($? == 3))
        then
            break
        fi
        sleep 2s
    done
    f1 start
    f2 start
    # Show the processes that are now running
    xcall jps
elif [ $1 = stop ]
then
    f1 stop
    f2 stop
    kf stop
    # ZooKeeper must not be stopped until every Kafka broker has exited
    while [ 1 ]
    do
        countKafkaBrokers
        if (($? == 0))
        then
            break
        fi
        sleep 2s
    done
    zk stop
    hd stop
    # Show which processes are still running
    xcall jps
else
    echo "Please enter start or stop!"
fi
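A usage sketch, assuming the script above is saved as (for example) onekey in ~/bin alongside the zk, hd, kf, f1 and f2 helper scripts it calls; the name onekey is arbitrary:

chmod +x ~/bin/onekey
# Bring up ZooKeeper, Hadoop, Kafka and both Flume layers in order
onekey start
# Tear everything down in the reverse order
onekey stop

Note that carrying the broker count through return only works for values below 256, which is more than enough for a three-broker cluster.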
HDFS HA configuration
(1) Configure the nameservice in hdfs-site.xml
vim hdfs-site.xml
<!-- Logical name of the nameservice -->
<property>
    <name>dfs.nameservices</name>
    <value>mycluster</value>
</property>
<property>
    <name>dfs.ha.namenodes.mycluster</name>
    <value>nn1,nn2</value>
</property>
<property>
    <name>dfs.namenode.rpc-address.mycluster.nn1</name>
    <value>hadoop101:8020</value>
</property>
<property>
    <name>dfs.namenode.http-address.mycluster.nn1</name>
    <value>hadoop101:50070</value>
</property>
<property>
    <name>dfs.namenode.http-address.mycluster.nn2</name>
    <value>hadoop102:50070</value>
</property>
<property>
    <name>dfs.namenode.rpc-address.mycluster.nn2</name>
    <value>hadoop102:8020</value>
</property>
<property>
    <name>dfs.namenode.shared.edits.dir</name>
    <value>qjournal://hadoop101:8485;hadoop102:8485;hadoop103:8485/mycluster</value>
</property>
<property>
    <name>dfs.journalnode.edits.dir</name>
    <value>/opt/module/hadoop-2.7.2/data/data</value>
</property>
<!-- Fencing: make sure only one NameNode responds to clients at any given time -->
<property>
    <name>dfs.ha.fencing.methods</name>
    <value>sshfence</value>
</property>
<!-- sshfence requires passwordless SSH login -->
<property>
    <name>dfs.ha.fencing.ssh.private-key-files</name>
    <value>/home/atguigu/.ssh/id_rsa</value>
</property>
<property>
    <name>dfs.client.failover.proxy.provider.mycluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>hadoop103:50090</value>
</property>
<!-- Enable automatic failover -->
<property>
    <name>dfs.ha.automatic-failover.enabled</name>
    <value>true</value>
</property>
(2) Edit core-site.xml
<!-- Point the default file system at the HDFS nameservice -->
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://mycluster</value>
</property>
<!-- Directory for files Hadoop generates at runtime -->
<property>
    <name>hadoop.tmp.dir</name>
    <value>/opt/module/hadoop-2.7.2/data/tmp</value>
</property>
<!-- ZooKeeper quorum used for automatic failover -->
<property>
    <name>ha.zookeeper.quorum</name>
    <value>hadoop101:2181,hadoop102:2181,hadoop103:2181</value>
</property>
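Both modified files must be identical on every node before anything is started. A minimal sketch using the xsync distribution script from this series (the Hadoop install path matches the configuration above):

xsync /opt/module/hadoop-2.7.2/etc/hadoop/hdfs-site.xml
xsync /opt/module/hadoop-2.7.2/etc/hadoop/core-site.xml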
Start the JournalNodes
xcall hadoop-daemon.sh start journalnode
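Before formatting anything, it is worth confirming that the JournalNodes are actually up; a quick check:

xcall jps
# hadoop101, hadoop102 and hadoop103 (the hosts listed in dfs.namenode.shared.edits.dir)
# should each show a JournalNode process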
Format the NameNode on nn1:

hadoop namenode -format

Copy the NameNode metadata to nn2 by running the following on nn2 (the NameNode on nn1 must be running for this to succeed; it can be started with hadoop-daemon.sh start namenode):

hdfs namenode -bootstrapStandby

Then format the failover controller's znode in ZooKeeper (ZooKeeper must already be running) and restart HDFS:

stop-all.sh
hdfs zkfc -formatZK
start-dfs.sh
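After start-dfs.sh, both NameNodes and the ZKFC processes should be up, and the HA state can be queried with hdfs haadmin; a sketch of the check:

# One NameNode should report "active", the other "standby"
hdfs haadmin -getServiceState nn1
hdfs haadmin -getServiceState nn2

# Optional failover test: kill the active NameNode and watch the standby take over,
# then restart it with: hadoop-daemon.sh start namenode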
ResourceManager HA configuration
(1) Edit yarn-site.xml
<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
</property>
<property>
    <name>yarn.resourcemanager.cluster-id</name>
    <value>cluster1</value>
</property>
<property>
    <name>yarn.resourcemanager.ha.rm-ids</name>
    <value>rm1,rm2</value>
</property>
<property>
    <name>yarn.resourcemanager.hostname.rm1</name>
    <value>hadoop101</value>
</property>
<property>
    <name>yarn.resourcemanager.hostname.rm2</name>
    <value>hadoop102</value>
</property>
<property>
    <name>yarn.resourcemanager.webapp.address.rm1</name>
    <value>hadoop101:8088</value>
</property>
<property>
    <name>yarn.resourcemanager.webapp.address.rm2</name>
    <value>hadoop102:8088</value>
</property>
<property>
    <name>yarn.resourcemanager.zk-address</name>
    <value>hadoop101:2181,hadoop102:2181,hadoop103:2181</value>
</property>
<!-- Enable automatic recovery -->
<property>
    <name>yarn.resourcemanager.recovery.enabled</name>
    <value>true</value>
</property>
<!-- Store the ResourceManager state in the ZooKeeper cluster -->
<property>
    <name>yarn.resourcemanager.store.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
<!-- Enable ResourceManager HA -->
<property>
    <name>yarn.resourcemanager.ha.enabled</name>
    <value>true</value>
</property>
<!-- Enable log aggregation -->
<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>
<!-- Keep aggregated logs for 7 days -->
<property>
    <name>yarn.log-aggregation.retain-seconds</name>
    <value>604800</value>
</property>
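On Hadoop 2.x, start-yarn.sh only starts the ResourceManager on the node where it is run, so the standby ResourceManager is usually started by hand; afterwards the HA state can be queried with yarn rmadmin. A sketch, assuming rm1 lives on hadoop101 and rm2 on hadoop102 as configured above:

# On hadoop101
start-yarn.sh
# Start the second ResourceManager manually on hadoop102
ssh hadoop102 "yarn-daemon.sh start resourcemanager"

# Check which ResourceManager is active
yarn rmadmin -getServiceState rm1
yarn rmadmin -getServiceState rm2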