- 快排
代码实现:
/** * 快排 * 时间复杂度:平均时间复杂度为O(nlogn) * 空间复杂度:O(logn),因为递归栈空间的使用问题 */ public class QuickSort { public static void main(String[] args) { int [] a = {1,6,8,7,3,5,16,4,8,36,13,44}; QKSourt(a,0,a.length-1); for (int i:a) { System.out.print(i + " "); } } private static void QKSourt(int[] a, int start, int end) { if (a.length < 0){ return ; } if (start >= end){ return ; } int left = start; int right = end; int temp = a[left]; while (left < right){ while (left < right && a[right] > temp){ right -- ; } a[left] = a[right]; while (left < right && a[left] < temp){ left ++ ; } a[right] = a[left]; } a[left] = temp; System.out.println(Arrays.toString(a)); QKSourt(a, start, left -1); QKSourt(a,left+1,end); } } |
- 归并
核心思想:不断的将大的数组分成两个小数组,直到不能拆分为止,即形成了单个值。此时使用合并的排序思想对已经有序的数组进行合并,合并为一个大的数据,不断重复此过程,直到最终所有数据合并到一个数组为止。
代码实现:
/** * 快排 * 时间复杂度:O(nlogn) * 空间复杂度:O(n) */ def merge(left: List[Int], right: List[Int]): List[Int] = (left, right) match { case (Nil, _) => right case (_, Nil) => left case (x :: xTail, y :: yTail) => if (x <= y) x :: merge(xTail, right) else y :: merge(left, yTail) } |
- 二叉树之Scala实现
- 树执行查找、删除、插入的时间复杂度都是O(logN)
- 遍历二叉树的方法包括前序、中序、后序
- 非平衡树指的是根的左右两边的子节点的数量不一致
- 在非空二叉树中,第i层的结点总数不超过 , i>=1;
- 深度为h的二叉树最多有个结点(h>=1),最少有h个结点;
- 对于任意一棵二叉树,如果其叶结点数为N0,而度数为2的结点总数为N2,则N0=N2+1;
定义节点以及前序、中序、后序遍历
class TreeNode(treeNo:Int){ val no = treeNo var left:TreeNode = null var right:TreeNode = null
//后序遍历 def postOrder():Unit={ //向左递归输出左子树 if(this.left != null){ this.left.postOrder } //向右递归输出右子树 if (this.right != null) { this.right.postOrder }
//输出当前节点值 printf("节点信息 no=%d \n",no) }
//中序遍历 def infixOrder():Unit={ //向左递归输出左子树 if(this.left != null){ this.left.infixOrder() }
//输出当前节点值 printf("节点信息 no=%d \n",no)
//向右递归输出右子树 if (this.right != null) { this.right.infixOrder() } }
//前序遍历 def preOrder():Unit={ //输出当前节点值 printf("节点信息 no=%d \n",no)
//向左递归输出左子树 if(this.left != null){ this.left.postOrder() }
//向右递归输出右子树 if (this.right != null) { this.right.preOrder() } }
//后序遍历查找 def postOrderSearch(no:Int): TreeNode = { //向左递归输出左子树 var resNode:TreeNode = null if (this.left != null) { resNode = this.left.postOrderSearch(no) } if (resNode != null) { return resNode } if (this.right != null) { resNode = this.right.postOrderSearch(no) } if (resNode != null) { return resNode } println("ttt~~") if (this.no == no) { return this } resNode }
//中序遍历查找 def infixOrderSearch(no:Int): TreeNode = {
var resNode : TreeNode = null //先向左递归查找 if (this.left != null) { resNode = this.left.infixOrderSearch(no) } if (resNode != null) { return resNode } println("yyy~~") if (no == this.no) { return this } //向右递归查找 if (this.right != null) { resNode = this.right.infixOrderSearch(no) } return resNode
}
//前序查找 def preOrderSearch(no:Int): TreeNode = { if (no == this.no) { return this } //向左递归查找 var resNode : TreeNode = null if (this.left != null) { resNode = this.left.preOrderSearch(no) } if (resNode != null){ return resNode } //向右边递归查找 if (this.right != null) { resNode = this.right.preOrderSearch(no) }
return resNode }
//删除节点 //删除节点规则 //1如果删除的节点是叶子节点,则删除该节点 //2如果删除的节点是非叶子节点,则删除该子树
def delNode(no:Int): Unit = { //首先比较当前节点的左子节点是否为要删除的节点 if (this.left != null && this.left.no == no) { this.left = null return } //比较当前节点的右子节点是否为要删除的节点 if (this.right != null && this.right.no == no) { this.right = null return } //向左递归删除 if (this.left != null) { this.left.delNode(no) } //向右递归删除 if (this.right != null) { this.right.delNode(no) } } |
定义二叉树,前序、中序、后序遍历,前序、中序、后序查找,删除节点
class BinaryTree { var root: TreeNode = null
//后序遍历 def postOrder(): Unit = { if (root != null) { root.postOrder() } else { println("当前二叉树为空,不能遍历") } }
//中序遍历 def infixOrder(): Unit = { if (root != null) { root.infixOrder() } else { println("当前二叉树为空,不能遍历") } }
//前序遍历 def preOrder(): Unit = { if (root != null) { root.preOrder() } else { println("当前二叉树为空,不能遍历") } }
//后序遍历查找 def postOrderSearch(no: Int): TreeNode = { if (root != null) { root.postOrderSearch(no) } else { null } }
//中序遍历查找 def infixOrderSeacher(no: Int): TreeNode = { if (root != null) { return root.infixOrderSearch(no) } else { return null } }
//前序查找 def preOrderSearch(no: Int): TreeNode = {
if (root != null) { return root.preOrderSearch(no) } else { //println("当前二叉树为空,不能查找") return null } }
//删除节点 def delNode(no: Int): Unit = { if (root != null) { //先处理一下root是不是要删除的 if (root.no == no) { root = null } else { root.delNode(no) } }
} } |
- 手写Spark-WordCount
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
sc.textFile("/input") .flatMap(_.split(" ")) .map((_, 1)) .reduceByKey(_ + _) .saveAsTextFile("/output")
sc.stop() |
- 项目架构
数据仓库的输入数据源和输出系统分别是什么?
输入系统:埋点产生的用户行为数据、JavaEE后台产生的业务数据。
输出系统:报表系统、用户画像系统、推荐系统
- Apache:运维麻烦,组件间兼容性需要自己调研。(一般大厂使用,技术实力雄厚,有专业的运维人员)
- CDH:国内使用最多的版本,但 CM不开源,但其实对中、小公司使用来说没有影响(建议使用)
- HDP:开源,可以进行二次开发,但是没有CDH稳定,国内使用较少
- 服务器选型
服务器使用物理机还是云主机?
- 机器成本考虑:
- 物理机:以128G内存,20核物理CPU,40线程,8THDD和2TSSD硬盘,单台报价4W出头,需考虑托管服务器费用。一般物理机寿命5年左右
- 云主机,以阿里云为例,差不多相同配置,每年5W
- 运维成本考虑:
属于研发部,技术总监下面有各个项目组,我们属于数据组,其他还有后端项目组,基础平台等。总监上面就是副总等级别了。其他的还有产品运营部等。
职级就分初级,中级,高级。晋升规则不一定,看公司效益和职位空缺。
小型公司(3人左右):组长1人,剩余组员无明确分工,并且可能兼顾javaEE和前端。
中小型公司(3~6人左右):组长1人,离线2人左右,实时1人左右(离线一般多于实时),JavaEE 1人(有或者没有人单独负责JavaEE,有时是有组员大数据和JavaEE一起做,或者大数据和前端一起做)。
中型公司(5~10人左右):组长1人,离线3~5人左右(离线处理、数仓),实时2人左右,JavaEE 1人左右(负责对接JavaEE业务),前端1人左右(有或者没有人单独负责前端)。
中大型公司(5~20人左右):组长1人,离线5~10人(离线处理、数仓),实时5人左右,JavaEE2人左右(负责对接JavaEE业务),前端1人(有或者没有人单独负责前端)。(发展比较良好的中大型公司可能大数据部门已经细化拆分,分成多个大数据组,分别负责不同业务)
上面只是参考配置,因为公司之间差异很大,例如ofo大数据部门只有5个人左右,因此根据所选公司规模确定一个合理范围,在面试前必须将这个人员配置考虑清楚,回答时要非常确定。
序号 |
命令 |
命令解释 |
1 |
top |
查看内存 |
2 |
df -h |
查看磁盘存储情况 |
3 |
iotop |
查看磁盘IO读写(yum install iotop安装) |
4 |
iotop -o |
直接查看比较高的磁盘读写程序 |
5 |
netstat -tunlp | grep 端口号 |
查看端口占用情况 |
6 |
uptime |
查看报告系统运行时长及平均负载 |
7 |
ps aux |
查看进程 |
- Shell常用工具
awk、sed、cut、sort
Hadoop 2.x 版本:
- dfs.namenode.http-address:50070
- dfs.datanode.http-address:50075
- SecondaryNameNode辅助名称节点端口号:50090
- dfs.datanode.address:50010
- fs.defaultFS:8020 或者9000
- yarn.resourcemanager.webapp.address:8088
- 历史服务器web访问端口:19888
Hadoop 3.x版本:(仅列出相对于2.x版本有更改的端口,箭头前为2.x,箭头后为3.x)
NameNode 的端口:
50070 --> 9870
8020 --> 9820,
50470 --> 9871
Secondary NameNode的端口:
50091 --> 9869
50090 --> 9868
DataNode 的端口:
50020 --> 9867
50010 --> 9866
50475 --> 9865
50075 --> 9864
Hadoop KMS 的端口:
16000 --> 9600
- 配置文件:
core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml
hadoop-env.sh、yarn-env.sh、mapred-env.sh、slaves
- 简单的集群搭建过程:
JDK安装
配置SSH免密登录
配置hadoop核心文件:
格式化namenode
HDFS读数据:
HDFS写数据:
MapReduce的详细工作流程:
一、Shuffle机制
- Map方法之后Reduce方法之前这段处理过程叫Shuffle
- Map方法之后,数据首先进入到分区方法,把数据标记好分区,然后把数据发送到环形缓冲区;环形缓冲区默认大小100m,环形缓冲区达到80%时,进行溢写;溢写前对数据进行排序,排序按照对key的索引进行字典顺序排序,排序的手段快排;溢写产生大量溢写文件,需要对溢写文件进行归并排序;对溢写的文件也可以进行Combiner操作,前提是汇总操作,求平均值不行。最后将文件按照分区存储到磁盘,等待Reduce端拉取。
- 每个Reduce拉取Map端对应分区的数据。拉取数据后先存储到内存中,内存不够了,再存储到磁盘。拉取完所有数据后,采用归并排序将内存和磁盘中的数据都进行排序。在进入Reduce方法前,可以对数据进行分组操作。
二、Hadoop优化
- HDFS小文件影响
- 影响NameNode的寿命,因为文件元数据存储在NameNode的内存中
- 影响计算引擎的任务数量,比如每个小的文件都会生成一个Map任务
- 数据输入小文件处理:
- 合并小文件:对小文件进行归档(Har)、自定义Inputformat将小文件存储成SequenceFile文件。
- 采用ConbinFileInputFormat来作为输入,解决输入端大量小文件场景。
- 对于大量小文件Job,可以开启JVM重用。
- Map阶段
- 增大环形缓冲区大小。由100m扩大到200m
- 增大环形缓冲区溢写的比例。由80%扩大到90%
- 减少对溢写文件的merge次数。
- 不影响实际业务的前提下,采用Combiner提前合并,减少 I/O。
- Reduce阶段
- 合理设置Map和Reduce数:两个都不能设置太少,也不能设置太多。太少,会导致Task等待,延长处理时间;太多,会导致 Map、Reduce任务间竞争资源,造成处理超时等错误。
- 设置Map、Reduce共存:调整slowstart.completedmaps参数,使Map运行到一定程度后,Reduce也开始运行,减少Reduce的等待时间。
- 规避使用Reduce,因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。
- 增加每个Reduce去Map中拿数据的并行数
- 集群性能可以的前提下,增大Reduce端存储数据内存的大小。
- IO传输
- 采用数据压缩的方式,减少网络IO的的时间。安装Snappy和LZOP压缩编码器。
- 使用SequenceFile二进制文件
- 整体
- MapTask默认内存大小为1G,可以增加MapTask内存大小为4-5g
- ReduceTask默认内存大小为1G,可以增加ReduceTask内存大小为4-5g
- 可以增加MapTask的cpu核数,增加ReduceTask的CPU核数
- 增加每个Container的CPU核数和内存大小
- 调整每个Map Task和Reduce Task最大重试次数
三、压缩
压缩格式 |
Hadoop自带? |
算法 |
文件扩展名 |
支持切分 |
换成压缩格式后,原来的程序是否需要修改 |
DEFLATE |
是,直接使用 |
DEFLATE |
.deflate |
否 |
和文本处理一样,不需要修改 |
Gzip |
是,直接使用 |
DEFLATE |
.gz |
否 |
和文本处理一样,不需要修改 |
bzip2 |
是,直接使用 |
bzip2 |
.bz2 |
是 |
和文本处理一样,不需要修改 |
LZO |
否,需要安装 |
LZO |
.lzo |
是 |
需要建索引,还需要指定输入格式 |
Snappy |
否,需要安装 |
Snappy |
.snappy |
否 |
和文本处理一样,不需要修改 |
提示:如果面试过程问起,我们一般回答压缩方式为Snappy,特点速度快,缺点无法切分(可以回答在链式MR中,Reduce端输出使用bzip2压缩,以便后续的map任务对数据进行split)
四、切片机制
- 简单地按照文件的内容长度进行切片
- 切片大小,默认等于Block大小
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
提示:切片大小公式:max(0,min(Long_max,blockSize))
- Hadoop调度器重要分为三类:
FIFO(先进先出调度器)、Capacity Scheduler(容量调度器)和Fair Sceduler(公平调度器)。
Hadoop2.7.2默认的资源调度器是 容量调度器
- 区别:
FIFO调度器:先进先出,同一时间队列中只有一个任务在执行。
容量调度器:多队列;每个队列内部先进先出,同一时间队列中只有一个任务在执行。队列的并行度为队列的个数。
公平调度器:多队列;每个队列内部按照缺额大小分配资源启动任务,同一时间队列中有多个任务执行。队列的并行度大于等于队列的个数。
- 一定要强调生产环境中不是使用的FifoScheduler,面试的时侯会发现候选人大概了解这几种调度器的区别,但是问在生产环境用哪种,却说使用的FifoScheduler(企业生产环境一定不会用这个调度的)
启用lzo的压缩方式对于小规模集群是很有用处,压缩比率大概能降到原始日志大小的1/3。同时解压缩的速度也比较快。
Hadoop默认不支持LZO压缩,如果需要支持LZO压缩,需要添加jar包,并在hadoop的cores-site.xml文件中添加相关压缩配置。
- 环境准备
maven(下载安装,配置环境变量,修改sitting.xml加阿里云镜像)
gcc-c++
zlib-devel
autoconf
automake
libtool
通过yum安装即可
yum -y install gcc-c++ lzo-devel zlib-devel autoconf automake libtool
- 下载、安装并编译LZO
wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz
tar -zxvf lzo-2.10.tar.gz
cd lzo-2.10
./configure -prefix=/usr/local/hadoop/lzo/
make
make install
- 编译hadoop-lzo源码
- 下载hadoop-lzo的源码
下载地址:https://github.com/twitter/hadoop-lzo/archive/master.zip
- 解压之后,修改pom.xml
<hadoop.current.version>2.7.2</hadoop.current.version>
- 声明两个临时环境变量
export C_INCLUDE_PATH=/usr/local/hadoop/lzo/include
export LIBRARY_PATH=/usr/local/hadoop/lzo/lib
- 进入hadoop-lzo-master,执行maven编译命令
mvn package -Dmaven.test.skip=true
- 进入target,将hadoop-lzo-0.4.21-SNAPSHOT.jar放到hadoop的classpath下
如${HADOOP_HOME}/share/hadoop/common
- 修改core-site.xml增加配置支持LZO压缩
<configuration>
<property>
<name>io.compression.codecs</name>
<value>
org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec,
com.hadoop.compression.lzo.LzoCodec,
com.hadoop.compression.lzo.LzopCodec
</value>
</property>
<property>
<name>io.compression.codec.lzo.class</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
</configuration>
注意:
LZO本身是不支持分片的,但是我们给LZO压缩的文件加上索引,就支持分片了
- 在hdfs-site.xml文件中配置多目录,最好提前配置好,否则更改目录需要重新启动集群
- NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。
dfs.namenode.handler.count=20 * log2(Cluster Size),比如集群规模为10台时,此参数设置为60
- 编辑日志存储路径dfs.namenode.edits.dir设置与镜像文件存储路径dfs.namenode.name.dir尽量分开,达到最低写入延迟
- 服务器节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。yarn.nodemanager.resource.memory-mb
- 单个任务可申请的最多物理内存量,默认是8192(MB)。yarn.scheduler.maximum-allocation-mb
Hadoop 带有一些基准测试程序,可以最少的准备成本轻松运行。基准测试被打包在测试程序JAR文件中,通过无参调用JAR文件可以得到其列表
- 查看信息:
- 写入速度测试
测试写入10个10M文件
- 测试读文件速度
- 命令查看
[root@node1 hadoop-2.6.0-cdh5.14.0]# cat TestDFSIO_results.log
----- TestDFSIO ----- : write
Date & time: Fri Nov 08 10:43:20 CST 2019
Number of files: 10
Total MBytes processed: 100.0
Throughput mb/sec: 4.551039912620034
Average IO rate mb/sec: 5.2242536544799805
IO rate std deviation: 2.1074511594328245
Test exec time sec: 52.394
----- TestDFSIO ----- : read
Date & time: Fri Nov 08 10:45:28 CST 2019
Number of files: 10
Total MBytes processed: 100.0
Throughput mb/sec: 73.85524372230428
Average IO rate mb/sec: 135.5804901123047
IO rate std deviation: 111.20953898062095
Test exec time sec: 28.231
- 如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是8192MB)
- 如果写入文件过量造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。高峰期的时候用Kafka进行缓存,高峰期过去数据同步会自动跟上。
半数机制
三个核心选举原则:
- Zookeeper集群中只有超过半数以上的服务器启动,集群才能正常工作;
- 在集群正常工作之前,myid小的服务器给myid大的服务器投票,直到集群正常工作,选出Leader;
- 选出Leader之后,之前的服务器状态由Looking改变为Following,以后的服务器都是Follower
比如:
集群半数以上存活,集群可用
假设有五台服务器组成的zookeeper集群,它们的id从1-5,同时它们都是最新启动的
- 1启动,选自己
- 2启动,选自己(比1大,12选2)
- 3启动,选自己(123都选3,超过半数)当选leader
- 4启动,已有leader3
- 5启动,已有leader3
ls、get、create
- flume组成,Put事务,Take事务
Taildir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
File Channel:数据存储在磁盘,宕机数据可以保存。但是传输速率慢。适合对数据传输可靠性要求高的场景,比如,金融行业。
Memory Channel:数据存储在内存中,宕机数据丢失。传输速率快。适合对数据传输可靠性要求不高的场景,比如,普通的日志数据。
Kafka Channel:减少了Flume的Sink阶段,提高了传输效率。
Source到Channel是Put事务
Channel到Sink是Take事务