3.4 服务器和JDK准备
3.4.1 服务器准备
第1章 Hadoop运行环境搭建 1.1 模板虚拟机环境准备 0)安装模板虚拟机,IP地址192.168.10.100、主机名称hadoop100、内存4G、硬盘50G 1)hadoop100虚拟机配置要求如下(本文Linux系统全部以CentOS-7.5-x86-1804为例) (1)使用yum安装需要虚拟机可以正常上网,yum安装前可以先测试下虚拟机联网情况 [root@hadoop100 ~]# ping www.baidu.com PING www.baidu.com (14.215.177.39) 56(84) bytes of data. 64 bytes from 14.215.177.39 (14.215.177.39): icmp_seq=1 ttl=128 time=8.60 ms 64 bytes from 14.215.177.39 (14.215.177.39): icmp_seq=2 ttl=128 time=7.72 ms (2)安装epel-release 注:Extra Packages for Enterprise Linux是为“红帽系”的操作系统提供额外的软件包,适用于RHEL、CentOS和Scientific Linux。相当于是一个软件仓库,大多数rpm包在官方 repository 中是找不到的) [root@hadoop100 ~]# yum install -y epel-release (3)注意:如果Linux安装的是最小系统版,还需要安装如下工具;如果安装的是Linux桌面标准版,不需要执行如下操作 net-tool:工具包集合,包含ifconfig等命令 [root@hadoop100 ~]# yum install -y net-tools vim:编辑器 [root@hadoop100 ~]# yum install -y vim 2)关闭防火墙,关闭防火墙开机自启 [root@hadoop100 ~]# systemctl stop firewalld [root@hadoop100 ~]# systemctl disable firewalld.service 注意:在企业开发时,通常单个服务器的防火墙时关闭的。公司整体对外会设置非常安全的防火墙 3)创建atguigu用户,并修改atguigu用户的密码 [root@hadoop100 ~]# useradd atguigu [root@hadoop100 ~]# passwd atguigu 4)配置atguigu用户具有root权限,方便后期加sudo执行root权限的命令 [root@hadoop100 ~]# vim /etc/sudoers 修改/etc/sudoers文件,在%wheel这行下面添加一行,如下所示: ## Allow root to run any commands anywhere root ALL=(ALL) ALL ## Allows people in group wheel to run all commands %wheel ALL=(ALL) ALL atguigu ALL=(ALL) NOPASSWD:ALL 注意:atguigu这一行不要直接放到root行下面,因为所有用户都属于wheel组,你先配置了atguigu具有免密功能,但是程序执行到%wheel行时,该功能又被覆盖回需要密码。所以atguigu要放到%wheel这行下面。 5)在/opt目录下创建文件夹,并修改所属主和所属组 (1)在/opt目录下创建module、software文件夹 [root@hadoop100 ~]# mkdir /opt/module [root@hadoop100 ~]# mkdir /opt/software (2)修改module、software文件夹的所有者和所属组均为atguigu用户 [root@hadoop100 ~]# chown atguigu:atguigu /opt/module [root@hadoop100 ~]# chown atguigu:atguigu /opt/software (3)查看module、software文件夹的所有者和所属组 [root@hadoop100 ~]# cd /opt/ [root@hadoop100 opt]# ll 总用量 12 drwxr-xr-x. 2 atguigu atguigu 4096 5月 28 17:18 module drwxr-xr-x. 2 root root 4096 9月 7 2017 rh drwxr-xr-x. 2 atguigu atguigu 4096 5月 28 17:18 software 6)卸载虚拟机自带的JDK 注意:如果你的虚拟机是最小化安装不需要执行这一步。 [root@hadoop100 ~]# rpm -qa | grep -i java | xargs -n1 rpm -e --nodeps rpm -qa:查询所安装的所有rpm软件包 grep -i:忽略大小写 xargs -n1:表示每次只传递一个参数 rpm -e –nodeps:强制卸载软件 7)重启虚拟机 [root@hadoop100 ~]# reboot 1.2 克隆虚拟机 1)利用模板机hadoop100,克隆三台虚拟机:hadoop102、hadoop103、hadoop104 注意:克隆时,要先关闭hadoop100 2)修改克隆机IP,以下以hadoop102举例说明 (1)修改克隆虚拟机的静态IP [atguigu@hadoop100 ~]# sudo vim /etc/sysconfig/network-scripts/ifcfg-ens33 改成 DEVICE=ens33 TYPE=Ethernet ONBOOT=yes BOOTPROTO=static NAME="ens33" IPADDR=192.168.10.102 PREFIX=24 GATEWAY=192.168.10.2 DNS1=192.168.10.2 (2)查看Linux虚拟机的虚拟网络编辑器,编辑->虚拟网络编辑器->VMnet8 (3)查看Windows系统适配器VMware Network Adapter VMnet8的IP地址 (4)保证Linux系统ifcfg-ens33文件中IP地址、虚拟网络编辑器地址和Windows系统VM8网络IP地址相同。 3)修改克隆机主机名,以下以hadoop102举例说明 (1)修改主机名称 [atguigu@hadoop100 ~]# sudo vim /etc/hostname hadoop102 (2)配置Linux克隆机主机名称映射hosts文件,打开/etc/hosts [atguigu@hadoop100 ~]# sudo vim /etc/hosts 添加如下内容 192.168.10.100 hadoop100 192.168.10.101 hadoop101 192.168.10.102 hadoop102 192.168.10.103 hadoop103 192.168.10.104 hadoop104 192.168.10.105 hadoop105 192.168.10.106 hadoop106 192.168.10.107 hadoop107 192.168.10.108 hadoop108 4)重启克隆机hadoop102 [atguigu@hadoop100 ~]# sudo reboot 5)修改windows的主机映射文件(hosts文件) (1)如果操作系统是window7,可以直接修改 ①进入C:\Windows\System32\drivers\etc路径 ②打开hosts文件并添加如下内容,然后保存 192.168.10.100 hadoop100 192.168.10.101 hadoop101 192.168.10.102 hadoop102 192.168.10.103 hadoop103 192.168.10.104 hadoop104 192.168.10.105 hadoop105 192.168.10.106 hadoop106 192.168.10.107 hadoop107 192.168.10.108 hadoop108 (2)如果操作系统是window10,先拷贝出来,修改保存以后,再覆盖即可 ①进入C:\Windows\System32\drivers\etc路径 ②拷贝hosts文件到桌面 ③打开桌面hosts文件并添加如下内容 192.168.10.100 hadoop100 192.168.10.101 hadoop101 192.168.10.102 hadoop102 192.168.10.103 hadoop103 192.168.10.104 hadoop104 192.168.10.105 hadoop105 192.168.10.106 hadoop106 192.168.10.107 hadoop107 192.168.10.108 hadoop108 ④将桌面hosts文件覆盖C:\Windows\System32\drivers\etc路径hosts文件
3.4.2 编写集群分发脚本xsync
1、xsync集群分发脚本
(1)需求:循环复制文件到所有节点的相同目录下
(2)需求分析
①rsync命令原始拷贝:
rsync -av /opt/module root@hadoop103:/opt/
②期望脚本:
xsync要同步的文件名称
③说明:在/home/zhm/bin这个目录下存放的脚本,zhm用户可以在系统任何地方直接执行。
(3)脚本实现
①在用的家目录/home/zhm下创建bin文件夹
②在/home/atguigu/bin目录下创建xsync文件,以便全局调用
在该文件中编写如下代码
#!/bin/bash #1. 判断参数个数 if [ $# -lt 1 ] then echo Not Enough Arguement! exit; fi #2. 遍历集群所有机器 for host in hadoop102 hadoop103 hadoop104 do echo ==================== $host ==================== #3. 遍历所有目录,挨个发送 for file in $@ do #4 判断文件是否存在 if [ -e $file ] then #5. 获取父目录 pdir=$(cd -P $(dirname $file); pwd) #6. 获取当前文件的名称 fname=$(basename $file) ssh $host "mkdir -p $pdir" rsync -av $pdir/$fname $host:$pdir else echo $file does not exists! fi done done
③修改脚本xsync具有执行权限
3.4.3 SSH无密登录配置
说明:这里面只配置了hadoop102、hadoop103到其他主机的无密登录;因为hadoop102未外配置的是NameNode,hadoop103配置的是ResourceManager,都要求对其他节点无密访问。
1、hadoop102上生成公钥和私钥:
ssh-keygen -t rsa (要cd ~/.ssh)
然后敲(三个回车),就会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)
2、将hadoop102公钥拷贝到要免密登录的目标机器上(在.ssh目录下,下面一样)
ssh-copy-id hadoop102 ssh-copy-id hadoop103 ssh-copy-id hadoop104
3、hadoop103上生成公钥和私钥:
ssh-keygen -t rsa
4、将hadoop103公钥拷贝到要免密登录的目标机器上
ssh-copy-id hadoop102 ssh-copy-id hadoop103 ssh-copy-id hadoop104
3.4.4 JDK准备
1、卸载现有JDK(3台节点)
sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps
(1)rpm -qa:表示查询所有已经安装的软件包
(2)grep -i:表示过滤时不区分大小写
(3)xargs -n1:表示一次获取上次执行结果的一个值
(4)rpm -e --nodeps:表示卸载软件
2、用XShell工具将JDK导入到hadoop102的/opt/software文件夹下面
3、解压JDK到/opt/module目录下
tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/
4、配置JDK环境变量
(1)新建/etc/profile.d/my_env.sh文件
添加如下内容,然后保存(:wq)退出
#JAVA_HOME export JAVA_HOME=/opt/module/jdk export PATH=$PATH:$JAVA_HOME/bin
(2)让环境变量生效
source /etc/profile.d/my_env.sh
5、测试JDK是否安装成功
java -version
6、分发JDK
xsync /opt/module/jdk
7、分发环境变量配置文件
sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh
8、分别在hadoop103、hadoop104上执行source
3.5 模拟数据
3.5.1 使用说明
1、将application.yml、gmall2020-mock-log-2021-10-10.jar、path.json、logback.xml上传到hadoop102的/opt/module/applog目录下
资源获取
链接:https://pan.baidu.com/s/1d_sumGVy5OxYGHT1HbB7xg
提取码:zhm6
(1)创建applog路径
(2)上传文件到/opt/module/applog目录
2、配置文件
(1)application.yml文件
可以根据需求生成对应日期的用户行为日志。
vim application.yml
修改如下内容
# 外部配置打开 logging.config: "./logback.xml" #业务日期 注意:并不是Linux系统生成日志的日期,而是生成数据中的时间 mock.date: "2020-06-14" #模拟数据发送模式 #mock.type: "http" #mock.type: "kafka" mock.type: "log" #http模式下,发送的地址 mock.url: "http://hdp1/applog" #kafka模式下,发送的地址 mock: kafka-server: "hdp1:9092,hdp2:9092,hdp3:9092" kafka-topic: "ODS_BASE_LOG" #启动次数 mock.startup.count: 200 #设备最大值 mock.max.mid: 500000 #会员最大值 mock.max.uid: 100 #商品最大值 mock.max.sku-id: 35 #页面平均访问时间 mock.page.during-time-ms: 20000 #错误概率 百分比 mock.error.rate: 3 #每条日志发送延迟 ms mock.log.sleep: 10 #商品详情来源 用户查询,商品推广,智能推荐, 促销活动 mock.detail.source-type-rate: "40:25:15:20" #领取购物券概率 mock.if_get_coupon_rate: 75 #购物券最大id mock.max.coupon-id: 3 #搜索关键词 mock.search.keyword: "图书,小米,iphone11,电视,口红,ps5,苹果手机,小米盒子"
(2)path.json,该文件用来配置访问路径
(3)logback配置文件
可配置日志生成路径,修改内容如下
<?xml version="1.0" encoding="UTF-8"?> <configuration> <property name="LOG_HOME" value="/opt/module/applog/log" /> <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%msg%n</pattern> </encoder> </appender> <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender"> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern> </rollingPolicy> <encoder> <pattern>%msg%n</pattern> </encoder> </appender> <!-- 将某一个包下日志单独打印日志 --> <logger name="com.atgugu.gmall2020.mock.log.util.LogUtil" level="INFO" additivity="false"> <appender-ref ref="rollingFile" /> <appender-ref ref="console" /> </logger> <root level="error" > <appender-ref ref="console" /> </root> </configuration>
(4)生成日志
- 进入到/opt/module/applog路径,执行以下命令
java -jar gmall2020-mock-log-2021-10-10.jar
- 在/opt/module/applog/log目录下查看生成日志
3.5.2 集群日志生成脚本
在hadoop102的/home/atguigu目录下创建bin目录,这样脚本可以在服务器的任何目录执行。
1、在/home/atguigu/bin目录下创建脚本lg.sh
2、在脚本中编写如下内容
#!/bin/bash for i in hadoop102 hadoop103; do echo "========== $i ==========" ssh $i "cd /opt/module/applog/; java -jar gmall2020-mock-log-2021-10-10.jar >/dev/null 2>&1 &" done
3、修改脚本执行权限
chmod 777 lg.sh
4、将jar包及配置文件上传至hadoop103的/opt/module/applog/路径
5、启动脚本
lg.sh
6、分别在hadoop102、hadoop103的/opt/module/applog/log目录上查看生成的数据
4、用户行为数据采集模块
4.1 数据通道
用户行为日志数据通道
4.2 环境准备
4.2.1 集群所有进程查看脚本
1、在/home/atguigu/bin目录下创建脚本jpsall
vim jpsall
2、在脚本中编写如下内容
#! /bin/bash for i in hadoop102 hadoop103 hadoop104 do echo --------- $i ---------- ssh $i jps done
3、修改脚本执行权限
chmod 777 jpsall
4、启动脚本
xcall
4.2.2 Hadoop安装
4.2.3 Zookeeper安装
4.2.4 Kafka安装
4.2.5 Flume安装
4.3 日志采集Flume
4.3.1 日志采集Flume概述
按照规划,需要采集的用户行为日志文件分布在hadoop102,hadoop103两台日志服务器,故需要在hadoop102,hadoop103两台节点配置日志采集Flume。日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。
此处可选择TaildirSource和KafkaChannel,并配置日志校验拦截器。
选择TailDirSource和KafkaChannel的原因如下:
1、TailDirSource
TailDirSource相比ExecSource、SpoolingDirectorySource的优势
TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
SpoolingDirectorySource监控目录,支持断点续传。
2、KafkaChannel
采用Kafka Channel,省去了Sink,提高了效率。
日志采集Flume关键配置如下:
4.3.2 日志采集Flume配置实操
1、创建Flume配置文件
在hadoop102节点的Flume的job目录下创建file_to_kafka.conf
@hadoop104 flume]$ mkdir job @hadoop104 flume]$ vim job/file_to_kafka.conf
2、配置文件内容如下
#定义组件 a1.sources = r1 a1.channels = c1 #配置source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.* a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = org.zhm.gmall.flume.interceptor.ETLInterceptor$Builder #配置channel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092 a1.channels.c1.kafka.topic = topic_log a1.channels.c1.parseAsFlumeEvent = false #组装 a1.sources.r1.channels = c1
3、编写Flume拦截器
(1)创建Maven工程flume-interceptor
(2)创建包:com.atguigu.gmall.flume.interceptor
(3)在pom.xml文件中添加如下配置
<dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
(4)在org.zhm.gmall.flume.utils包下创建JSONUtil类
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONException; public class JSONUtil { /* * 通过异常判断是否是json字符串 * 是:返回true 不是:返回false * */ public static boolean isJSONValidate(String log){ try { JSONObject.parseObject(log); return true; }catch (JSONException e){ return false; } } }
(5)在org.zhm.gmall.flume.interceptor包下创建ETLInterceptor类
import com.atguigu.gmall.flume.utils.JSONUtil; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List; public class ETLInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { //1、获取body当中的数据并转成字符串 byte[] body = event.getBody(); String log = new String(body, StandardCharsets.UTF_8); //2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回null if (JSONUtil.isJSONValidate(log)) { return event; } else { return null; } } @Override public List<Event> intercept(List<Event> list) { Iterator<Event> iterator = list.iterator(); while (iterator.hasNext()){ Event next = iterator.next(); if(intercept(next)==null){ iterator.remove(); } } return list; } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new ETLInterceptor(); } @Override public void configure(Context context) { } } @Override public void close() { } }
(6)打包
(7)需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。
4.3.3 日志采集Flume测试
1、启动Zookeeper、Kafka集群
2、启动hadoop102的日志采集Flume
bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
3、启动一个Kafka的Console-Consumer
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log
4、生成模拟数据
lg.sh
5、观察Kafka消费者是否能消费到数据
4.3.4 日志·采集Flume启停脚本
1、分发日志采集Flume配置文件和拦截器
若上述测试通过,需将hadoop102节点的Flume的配置文件和拦截器jar包,向另一台日志服务器发送一份。
scp -r job hadoop103:/opt/module/flume/ [atguigu@hadoop102 flume]$ scp lib/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar hadoop103:/opt/module/flume/lib/
2、方便起见,此处编写一个日志采集Flume进程的启停脚本
在hadoop102节点的/home/atguigu/bin目录下创建脚本f1.sh
vim f1.sh
在脚本中填写如下内容
#!/bin/bash case $1 in "start"){ for i in hadoop102 hadoop103 do echo " --------启动 $i 采集flume-------" ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &" done };; "stop"){ for i in hadoop102 hadoop103 do echo " --------停止 $i 采集flume-------" ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 " done };; esac
3、增加脚本执行权限
chmod 777 f1.sh
4、f1启动
f1.sh start
5、f1停止
f1.sh stop