1.kafka为什么不支持读写分离?
在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。主写从读也就是读写分离,容易产生数据一致性问题,延时问题。
2.hadoop造成宕机的原因以及恢复的方法?
1)如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是8192MB)
2)如果写入文件过快造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。例如,可以调整Flume每批次拉取数据量的大小参数batchsize。。
3.kafka的分区分配策略?+1-----------
在 Kafka内部存在两种默认的分区分配策略:Range和 RoundRobin。
Range是默认分区:假如有10个分区,3个消费者,把分区按照序号排列0,1,2,3,4,5,6,7,8,9;消费者为C1,C2,C3,那么用分区数除以消费者数来决定每个Consumer消费几个Partition,除不尽的前面几个消费者将会多消费一个 。
RoundRobin 轮询分区:把所有的partition和consumer列出来,然后轮询consumer和partition,尽可能的让把partition均匀的分配给consumer
4.hbase如何保证数据的安全可靠性?
hbase写入数据时,为了提升写入效率,先写入缓存memory store,默认为128M,缓存满了会触发flush落盘,写入hfile文件。
但这种方式并不安全,如果缓存未满之前region server挂掉,还未来得及落盘的数据就会丢失。
hbase提供的解决方案是在写入缓存之前写入hlog日志文件。
宕机之后,hbase会找到对应的hlog日志,将oldestUnflushedSequenceId之后的数据恢复出来,写入缓存,数据就得到了恢复。
5.在spark中创建临时表和全局临时表的方法是什么?
DataFrame.createTempView() 创建普通临时表
DataFrame.createGlobalTempView() DataFrame.createOrReplaceTempView() 创建全局临时表
6.sparksql中join操作和leftjoin操作的区别?
join和sql中的inner join操作很相似,返回结果是前面一个集合和后面一个集合中匹配成功的,过滤掉关联不上的。
leftJoin类似于SQL中的左外关联left outer join,返回结果以第一个RDD为主,关联不上的记录为空。
部分场景下可以使用left semi join替代left join:
因为 left semi join 是 in(keySet) 的关系,遇到右表重复记录,左表会跳过,性能更高,而 left join 则会一直遍历。但是left semi join 中最后 select 的结果中只许出现左表中的列名,因为右表只有 join key 参与关联计算了
7.spark中driver的作用是什么?
Spark的驱动器是执行开发程序中的main方法的进程。
1)把用户程序转为作业(JOB)
2)跟踪Executor的运行状况
3)为执行器节点调度任务
4)UI展示应用运行状况
8.hadoop的数据备份与恢复?
9.hdfs读写流程?
读流程
1)客户端通过Distributed FileSystem(分布式文件系统)向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。
2)挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。
3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。
4)客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。
写流程
1)客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在。
2)NameNode返回是否可以上传。
3)客户端请求第一个 Block上传到哪几个DataNode服务器上。
4)NameNode返回3个DataNode节点,分别为dn1、dn2、dn3。
5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。
6)dn1、dn2、dn3逐级应答客户端。
7)客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。
8)当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)。
10两台namenode的作用?+1---------
1.NameNode它是hadoop中的主服务器,管理文件系统名称空间和对集群中存储的文件的访问,保存有metadate。
2.SecondaryNameNode它不是namenode的冗余守护进程,而是提供周期检查点和清理任务。帮助NN合并editslog,减少NN启动时间。
11.两台namenode是怎么做数据同步的?
1) sn向nn请求是否需要同步元数据信息
如果需要同步,则nn会滚动当前正在写的edits.inprogressing
2) s n把所有的edits(日志)文件和fsimage(镜像)文件下载到sn(只有第一次同步会下载nn的fsimage文件,后面同步只需要下载edits文件即可)
3)在sn把edits文件和fsimage文件加载到内存合并成新fsimage文件,重命名为 fsimage.checkpoint文件
4)把fsimage.checkpoint文件上传到nn上,并重命名为fsimage,整个元数据同步完成
12.使用mapreduce怎么处理数据倾斜?
1)提前在map进行combine,减少传输的数据量
在Mapper加上combiner相当于提前进行reduce,即把一个Mapper中的相同key进行了聚合,减少shuffle过程中传输的数据量,以及Reducer端的计算量。
如果导致数据倾斜的key大量分布在不同的mapper的时候,这种方法就不是很有效了。
导致数据倾斜的key 大量分布在不同的mapper
(1)局部聚合加全局聚合。
第一次在map阶段对那些导致了数据倾斜的key 加上1到n的随机前缀,这样本来相同的key 也会被分到多个Reducer中进行局部聚合,数量就会大大降低。
第二次mapreduce,去掉key的随机前缀,进行全局聚合。
思想:二次mr,第一次将key随机散列到不同reducer进行处理达到负载均衡目的。第二次再根据去掉key的随机前缀,按原key进行reduce处理。
这个方法进行两次mapreduce,性能稍差。
(2)增加Reducer,提升并行度
13.hdfs数据安全性是如何保证的?
1)、存储在HDFS系统上的文件,会分割成128M大小的block存储在不同的节点上,block的副本数默认3份,也可配置成更多份;
2)、第一个副本一般放置在与client(客户端)所在的同一节点上(若客户端无datanode,则随机放),第二个副本放置到与第一个副本同一机架的不同节点,第三个副本放到不同机架的datanode节点,当取用时遵循就近原则;
3)、datanode已block为单位,每3s报告心跳状态,做10min内不报告心跳状态则namenode认为block已死掉,namonode会把其上面的数据备份到其他一个datanode节点上,保证数据的副本数量;
4)、datanode会默认每小时把自己节点上的所有块状态信息报告给namenode;
5)、采用safemode模式:datanode会周期性的报告block信息。Namenode会计算block的损坏率,当阀值<0.999f时系统会进入安全模式,HDFS只读不写。 HDFS元数据采用secondaryname备份或者HA备份
14.一条HQL假如我们使用count(distinct)没有groupby 怎么优化?
mapjoin,没有shuffle,没有倾斜。
开启数据倾斜时负载均衡
15.分区和分桶的区别?
1.分桶随机分割数据库,分区是非随机分割数据库。因为分桶是按照列的哈希函数进行分割的,相对比较平均;而分区是按照列的值来进行分割的,容易造成数据倾斜。
2.分桶是对应不同的文件(细粒度),分区是对应不同的文件夹(粗粒度)。桶是更为细粒度的数据范围划分,分桶的比分区获得更高的查询处理效率,使取样更高效。
3.注意:普通表(外部表、内部表)、分区表这三个都是对应HDFS上的目录,桶表对应是目录里的文件
16.partition by和distribute by的区别?
partition by是分区 区内排序用order by
Distribute by 可以理解为分簇 簇内排序用sort by 另外当 distribute by 和 sorts by 后的字段相同时,可以使用 cluster by 方式
partitioned by (分区名 string) 按所分区名分区建表使用
clustered by(列名) 按列分桶建表使用
17.七天内连续三天活跃指标怎么实现的?
1)查询出最近7天的活跃用户,并对用户活跃日期进行排名
2)计算用户活跃日期及排名之间的差值
3)对同用户及差值分组,统计差值个数
4)将差值相同个数大于等于3的数据取出,然后去重(去的是什么重???),即为连续3天及以上活跃的用户
7天连续收藏、点赞、购买、加购、付款、浏览、商品点击、退货
18.sql语句的执行顺序?
(1)from
(2)on
(3)join
(4)where
(5)group by 同时计算聚合函数
(6)having
(7)select
(8)distinct
(9)distribute by /cluster by
(10)sort by 局部排序,每个reduce内部的排序
(11) order by
(12) limit
(13) union /union all
若有开窗函数,执行完非开窗函数后,select等待执行完开窗函数,然后执行完select,开窗函数通过表数据进行分区和排序,跟select查询中的字段是平行关系,不依赖查询字段
19.spark on hive 和 hive on spark区别?+1----------
Hive on Spark:Hive既作为存储元数据又负责SQL的解析优化,语法是HQL语法,执行引擎变成了Spark,Spark负责采用RDD执行。
Spark on Hive : Hive只作为存储元数据,Spark负责SQL解析优化,语法是Spark SQL语法,Spark负责采用RDD执行。
20.大表join小表的优化?+1-------------
1)MapJoin
join就是需要把两份数据关联起来,如果一份数据比较大(t1),一份比较小(t2),可以采用mapjoin,具体实现:把t2的全部数据一次性加载,t1的数据一条条处理,这样就没有shuffle过程,不需要reduce,是相比于reducejoin的效率要高。适合大表和小表进行关联。
如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。默认是打开的,不要关闭。
21.hive有了分区为什么还要分桶?+1-------
分区:随着系统运行时间增长,表的数据量越来越大,使用分区技术可以指定条件,缩小数据扫描的范围,避免hive全表扫描,提升查询效率
分桶:单个分区或者表中的数据量越来越大,当分区不能更细粒的划分数据时,所以会采用分桶技术将数据更细粒度的划分和管理。
单个分区或者表中的数据量越来越大,当分区不能更细粒的划分数据时,所以会采用分桶技术将数据更细粒度的划分和管理。
22.datanode宕机恢复流程?
如果是短暂的宕机,可以实现写好脚本监控,将它启动起来。
如果是长时间宕机了,那么datanode上的数据应该已经被备份到其他机器了,那这台datanode就是一台新的datanode了,删除他的所有数据文件和状态文件,重新启动。
23.kafka重复消费数据怎么解决?+2-----
幂等性+ack-1+事务
Kafka数据重复,可以再下一级:SparkStreaming、redis或者hive中dwd层去重,去重的手段:分组、按照id开窗只取第一个值;
24.hive中map和reduce数是怎么确定的?
MapTask的个数=输入文件总大小/分片尺寸,个人理解就是输出的文件数量
计算reducer数的公式很简单:N=min(参数2,总输入数据量/参数1)
25.hbase的读写流程?
hbase读数据流程:
1)Client先访问zookeeper,获取hbase:meta表位于哪个Region Server。
2)访问对应的Region Server,获取hbase:meta表,根据读请求的namespace:table/rowkey,查询出目标数据位于哪个Region Server中的哪个Region中。并将该table的region信息以及meta表的位置信息缓存在客户端的meta cache,方便下次访问。
3)与目标Region Server进行通讯;
4)分别在MemStore和Store File(HFile)中查询目标数据,并将查到的所有数据进行合并。此处所有数据是指同一条数据的不同版本(time stamp)或者不同的类型(Put/Delete)。
5)将查询到的新的数据块(Block,HFile数据存储单元,默认大小为64KB)缓存到Block Cache。
6)将合并后的最终结果返回给客户端。
hbase写数据流程:
1)Client先访问zookeeper,获取hbase:meta表位于哪个Region Server。
2)访问对应的Region Server,获取hbase:meta表,根据读请求的namespace:table/rowkey,查询出目标数据位于哪个Region Server中的哪个Region中。并将该table的region信息以及meta表的位置信息缓存在客户端的meta cache,方便下次访问。
3)与目标Region Server进行通讯;
4)将数据顺序写入(追加)到WAL;
5)将数据写入对应的MemStore,数据会在MemStore进行排序;
6)向客户端发送ack;
7)等达到MemStore的刷写时机后,将数据刷写到HFile。
26.hadoop中combiner和partition的区别?+1------
combine和partition都是函数,中间的步骤应该只有shuffle!
combine分为map端和reduce端,作用是把同一个key的键值对合并在一起,可以自定义的。
combine函数把一个map函数产生的<key,value>对(多个key,value)合并成一个新的<key2,value2>.将新的<key2,value2>作为输入到reduce函数中
这个value2亦可称之为values,因为有多个。这个合并的目的是为了减少网络传输。
partition是分割map每个节点的结果,按照key分别映射给不同的reduce,也是可以自定义的。这里其实可以理解归类。
我们对于错综复杂的数据归类。比如在动物园里有牛羊鸡鸭鹅,他们都是混在一起的,但是到了晚上他们就各自牛回牛棚,羊回羊圈,鸡回鸡窝。partition的作用就是把这些数据归类。只不过在写程序的时候,mapreduce使用哈希HashPartitioner帮我们归类了。这个我们也可以自定义。
27.hadoop的优化?
1.hdfs
HDFS:做HA高可靠。如果磁盘空间充足,可以增加副本数。还有一些小文件合并。
hdfs的文件存储:比如可以压缩文件等。
2.mapreduce
1)配置层面
调整map数量
当集群的资源(cpu,内存)比较充足。可以适当将切片大小,调小。
这时候会增加map的数量,加快读取处理速度。
当集群的资源(cpu,内存)比较紧缺。可以适当将切片大小,调大。
这时候会减少map的数量,可以一定程度提高效率。
调整reduce数量
reduce的数量默认由分区决定,也可以直接手动设置reduce个数。
数据本地化
适当增加副本数(前提是磁盘空间充足)
2)程序层面
combine
可以减少shuffle过程。提高计算效率。
mapjoin
一个大表关联一个小表的时候。一般是512M以下。将小表广播复制到各个计算节点的内存中,用于匹配大表。
3.yarn
Hadoop2.0后才有的yarn
yarn默认的调度策略是:计算能力调度
默认的资源任务队列:default
通过增加队列的方式进行优化
通过修改配置文件,增加队列
修改配置文件capacity-scheduler.xml
修改mapred-site.xml
将修改后的两个配置文件发送到其他两个节点覆盖
28.有一千万条短信,有重复,以文本文件的形式保存,一行一条数据,请用五分钟时间,找出重复出现最多的前10条
解析: 对于本题来说,某些面试者想用数据库的办法实现,首先将文本导入数据库,再利用select 语句的方法得出前10 个短信。但实际上用数据库是绝对满足不了5分钟解决这个条件的。这是因为1千万条短信即使1秒钟导入1万条(这已经算是很快的数据导入了),5分钟才3 百万条,即便真的能在5分钟内录完1千万条,也必须先建索引,否则SQL语句在5 分钟内肯定得不出结果。但对1千万条记录建索引,在5 分钟内也不能完成。所以用数据库的办法不行。
这种类型的题之所以会出现,这是因为互联网公司每时每刻需要处理由用户产生的海量数据/日志,所以海量数据的题现在很热,互联网公司招聘时基本上都会考。重点考查求职者的数据结构设计与算法基本功。类似题目是如何根据关键词搜索访问最多的前10 个网站。
方法1: 用哈希表的方法。
可以将1千万条短信分成若干组,进行边扫描边建散列表的方法。第一次扫描,取首字节、尾字节、中间任意两字节作为Hash Code,插入到hash table中,并记录其地址、信息长度和重复次数。同hash code 且等长就疑似相同,比较一下。相同记录只加1次进hash table,但将重复次数加1。一次扫描以后,已经记录各自的重复次数,进行第二次hash table 的处理。用线性时间选择可在O(n)的级别上完成前10 条的寻找。分组后每组中的top10 必须保证各不相同,可用hash 来保证,也可直接按hash值的大小来分类。
方法2: 采用从小到大排序的办法。
根据经验,除非是群发的过节短信,否则字数越少的短信,出现重复的概率越高。建议从字数少的短信开始找起,比如一开始搜个字的短信,找出重复出现的top10 并分别记录出现次数,然后搜两个字的,以此类推。对于对相同字数的比较长的短信的搜索,除了hash 之类的算法外,可以选择只抽取头、中和尾等几个位置的字符进行粗判,因为此种判断方式是为了加快查找速度,但未必能得到真正期望的top10,因此,需要做标记,如此搜索一遍后,可以从各次top10结果中找到备选的top10,如果这次top10 中有刚才做过标记的,则对其对应字数的所有短信进行精确搜索,以找到真正的topl0 并再次比较。
方法3: 采用内存映射办法。
首先,1千万条短信按现在的短信长度将不会超过1GB 空间,使用内存映射文件比较合适,可以一次映射(如果有更大的数据量,可以采用分段映射),由于不需要频繁使用文件I/O 和频繁分配小内存,这将大大提高了數据的加载速度。其次,对每条短信的第i (i 从0到70) 个字母按ASCII码进行分组,也就是创建树。i是树的深度,也是短信第i 个字母。
该问题主要是解决两方面的内容,一是内容加载,二是短信内容的比较。采用文件内存映射技术可以解决内容加载的性能问题(不仅仅不需要调用文件I/O 函数,而且也不需要每读出一条短信都要分配一小块内存),而使用树技术可以有效地减少比较的次数。
29.hadoop的缺点?
HDFS的缺点
1)不适合低延时数据访问。
2)无法高效的对大量小文件进行存储
3)不支持并发写入、文件随机修改
mapreduce缺点:不擅长实时计算
不擅长流式计算
不擅长DAG(有向图)计算
Hadoop的缺点 1、Hadoop不适用于低延迟数据访问。 2、Hadoop不能高效存储大量小文件。 3、Hadoop不支持多用户写入并任意修改文件。
30.spark判断shffle的依据?
31.spark中partition有什么决定的?
partition 是指的spark在计算过程中,生成的数据在计算空间内最小单元,同一份数据(RDD)的partition 大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定的,这也是为什么叫“弹性分布式”数据集的原因之一。
32.spark中rdd、dataframe、dataset区别?
1)RDD
优点:
编译时类型安全
编译时就能检查出类型错误
面向对象的编程风格
直接通过类名点的方式来操作数据
缺点:
序列化和反序列化的性能开销
无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化和反序列化。
GC的性能开销,频繁的创建和销毁对象, 势必会增加GC
2)DataFrame
DataFrame引入了schema和off-heap
schema : RDD每一行的数据, 结构都是一样的,这个结构就存储在schema中。 Spark通过schema就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了。
3)DataSet
DataFrame可以理解为是dataSet的一个特例,dataSet,dataSet是强类型的,dataframe是弱类型的.
DataSet结合了RDD和DataFrame的优点,并带来的一个新的概念Encoder。
当序列化数据时,Encoder产生字节码与off-heap进行交互,能够达到按需访问数据的效果,而不用反序列化整个对象。Spark还没有提供自定义Encoder的API,但是未来会加入。
33.spark中创建rdd的方式?
使用程序中的集合创建RDD
使用本地文件创建RDD
使用HDFS文件创建RDD
34.hbase的过滤器有哪些?
HBase 内置过滤器可以分为三类:分别是比较过滤器,专用过滤器和包装过滤器。
35.kafka如何保证数据的一致性?
数据一致性保证
一致性定义:若某条消息对client可见,那么即使Leader挂了,在新Leader上数据依然可以被读到
HW-HighWaterMark: client可以从Leader读到的最大msg offset,即对外可见的最大offset, HW=max(replica.offset)
对于Leader新收到的msg,client不能立刻消费,Leader会等待该消息被所有ISR中的replica同步后,更新HW,此时该消息才能被client消费,这样就保证了如果Leader fail,该消息仍然可以从新选举的Leader中获取。
对于来自内部Broker的读取请求,没有HW的限制。同时,Follower也会维护一份自己的HW,Folloer.HW = min(Leader.HW, Follower.offset)
36.kafka如何保证数据的负载均衡?
分区器:
分区器是生产者层面的负载均衡。Kafka 生产者生产消息时,根据分区器将消息投递到 指定的分区中,所以 Kafka 的负载均衡很大程度上依赖于分区器。
Kafka 默认的分区器是 Kafka 提供的 DefaultPartitioner。它的分区策略是根据 Key 值进 行分区分配的:
如果 key 不为 null:对 Key 值进行 Hash 计算,从所有分区中根据 Key 的 Hash 值计 算出一个分区号;拥有相同 Key 值的消息被写入同一个分区;
如果 key 为 null:消息将以轮询的方式,在所有可用分区中分别写入消息。
如果不想使用 Kafka 默认的分区器,用户可以实现 Partitioner 接口,自行实现分区方 法。
注:
分区器的负载均衡与顺序性有着一定程度上的矛盾。
负载均衡的目的是将消息尽可能平均分配,对于 Kafka 而言,就是尽可能将消息平均分 配给所有分区;
如果使用 Kafka 保证顺序性,则需要利用到 Kafka 的分区顺序性的特性。对于需要保 证顺序性的场景,通常会利用 Key 值实现分区顺序性,那么所有 Key 值相同的消 息就会进入同一个分区。这样的情况下,对于大量拥有相同 Key 值的消息,会涌 入同一个分区,导致一个分区消息过多,其他分区没有消息的情况,即与负载均衡 的思想相悖。
并非分区数量越多,效率越高:
Topic 每个 partition 在 Kafka 路径下都有一个自己的目录,该目录下有两个主要的文 件:base_offset.log 和 base_offset.index。Kafka 服务端的 ReplicaManager 会为每 个 Broker 节点保存每个分区的这两个文件的文件句柄。所以如果分区过多, ReplicaManager 需要保持打开状态的文件句柄数也就会很多。
每个 Producer, Consumer 进程都会为分区缓存消息,如果分区过多,缓存的消息越多, 占用的内存就越大;
n 个分区有 1 个 Leader,(n-1) 个 Follower,如果运行过程中 Leader 挂了,则会从剩 余 (n-1) 个 Followers 中选举新 Leader;如果有成千上万个分区,那么需要很长时 间的选举,消耗较大的性能。
再均衡
再均衡是消费者层面的负载均衡
37.hbase的两种compaction?
1.Minor compaction:只用来做部分文件的合并操作以及包括minVersion=0并且设置ttl的过期版本清理,不做任何删除数据、多版本数据的清理工作。
2.Major compaction:是对Region下的HStore的所有StoreFile执行合并操作,最终的结果是整理合并出一个文件。
38.简述你知道的spark调优?
1.避免创建重复的RDD;
2.尽可能使用同一个RDD
3.对多次使用的RDD进行持久化
4.尽量避免使用shuffle类算子
5.使用map-side预聚合的shuffle操作
6.使用高性能的算子
7.广播大变量
8.使用Kryo优化序列化性能: java Kryo
9.优化数据结构:
对象,字符串,集合都比较占用内存
字符串代替对象
数组代替集合
使用原始类型(比如Int、Long)替代字符串
优化数据结构
10.资源调优
11.spark调优数据倾斜调优:map filter
39.spark和flink的区别?
1.流和微批:
flink一切都是流
Spark Streaming相当于把流处理转换为批处理,spark Streaming 是伪实时处理,处理延迟一定是秒级别的,不能再低了;
所以对于延迟性非常高的场景必须要用Flink。
2.数据模型 :
spark 采用 RDD 模型,spark streaming 的 DStream 实际上也就是一组 组小批数 据 RDD 的集合;
flink 基本数据模型是数据流,以及事件(Event)序列。
3.运行时架构:
spark 是批计算,将 DAG 划分为不同的 stage,一个完成后才可以计算下一个 ;
flink 是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理。
40.HRegionserver的作用?
Region Server为 Region的管理者,其实现类为HRegionServer,主要作用如下:
对于数据的操作:get, put, delete;
对于Region的操作:splitRegion、compactRegion。
41.kafka消费积压怎么处理?
1)如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)
2)如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。
42.hive的元数据是如何进行管理的?
Metastore
元数据包括:表名、表所属的数据库(默认是default)、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等;
默认存储在自带的derby数据库中,推荐使用MySQL存储Metastore
43.hive中小文件是如何进行处理的?
(1)在Map执行前合并小文件,减少Map数
(2)merge 输出合并小文件
(3)开启JVM重用
44.shuffle为什么会影响性能?
由于 Shuffle 阶段涉及磁盘的读写和网络传输,因此 Shuffle 的性能直接影响整个程序的性能和吞吐量。
45.hive的执行流程?
Hive通过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用自己的Driver,结合元数据(MetaStore),将这些指令翻译成MapReduce,提交到Hadoop中执行,最后,将执行返回的结果输出到用户交互接口。
46.yarn的执行流程?
(1)MR程序提交到客户端所在的节点。
(2)YarnRunner向ResourceManager申请一个Application。
(3)RM将该应用程序的资源路径返回给YarnRunner。
(4)该程序将运行所需资源提交到HDFS上。
(5)程序资源提交完毕后,申请运行mrAppMaster。
(6)RM将用户的请求初始化成一个Task。
(7)其中一个NodeManager领取到Task任务。
(8)该NodeManager创建容器Container,并产生MRAppmaster。
(9)Container从HDFS上拷贝资源到本地。
(10)MRAppmaster向RM 申请运行MapTask资源。
(11)RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(12)MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
(13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
(14)ReduceTask向MapTask获取相应分区的数据。
(15)程序运行完毕后,MR会向RM申请注销自己。
47.spark的运行流程?
1)构建 Spark Application 的运行环境(启动 SparkContext),SparkContext 向 Cluster Manager 注册,并申请运行 Executor 资源。
2)Cluster Manager 为 Executor 分配资源并启动 Executor 进程,Executor 运行情况将随着“心跳”发送到 Cluster Manager 上。
图 2 Spark 运行基本流程图
3)SparkContext 构建 DAG 图,将 DAG 图分解成多个 Stage,并把每个 Stage 的 TaskSet(任务集)发送给 Task Scheduler (任务调度器)。Executor 向 SparkContext 申请 Task, Task Scheduler 将 Task 发放给 Executor,同时,SparkContext 将应用程序代码发放给 Executor。
4)Task 在 Executor 上运行,把执行结果反馈给 Task Scheduler,然后再反馈给 DAG Scheduler。运行完毕后写入数据,SparkContext 向 ClusterManager 注销并释放所有资源。
48.kafka如何保证不丢失数据?
Ack=0,相当于异步发送,消息发送完毕即offset增加,继续生产。
Ack=1,leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。
Ack=-1,leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产。
49.hbase的查询效率如何进行提升?
1、使用bloomfilter和mapfile_index_interval
2、 hbase对于内存有特别的嗜好,在硬件允许的情况下配足够多的内存给它。
通过修改hbase-env.sh中的
export HBASE_HEAPSIZE=3000 #这里默认为1000m
3、修改java虚拟机属性
(1)、在环境允许的情况下换64位的
(2)、替换掉默认的垃圾回收器,因为默认的垃圾回收器在多线程环境下会有更多的wait 等待
export HBASE_OPTS="-server -XX:NewSize=6m -XX:MaxNewSize=6m -XX:+UseConcMarkSweepGC -XX:+CMSIncrementalMode"
4、增大RPC数量
通过修改hbase-site.xml中的
hbase.region.handler.count属性,可以适当的放大。默认值为10有点小
5、做程序开发是注意的地方
(1)、需要判断所求的数据行是否存在时,尽量不要用HTable.exists(final byte [] row) 而用 HTable.exists(final byte [] row, final byte[] column)等带列族的方法替代。
(2)、不要使用HTable.get(final byte [] row, final byte [] column) == null来判断所求的数据 存在,而是用HTable.exists(final byte [] row, final byte[] column)替代
(3)、HTable.close()方法少用.因为我遇到过一些很令人费解的错误
6、记住HBase是基于列模式的存储,如果一个列族能搞定就不要把它分开成两个,的那套 在这里很不实用.分成多个列来存储会浪费更多的空间,除非你认为现在的硬盘和白菜一个 价。
7、如果数据量没有达到TB级别或者没有上亿条记录,很难发挥HBase的优势,建议换关系 数据库或别的存储技术。
50.spark部署方式有哪些?
1)Local:运行在一台机器上,通常是练手或者测试环境。
2)Standalone:构建一个基于Mster+Slaves的资源调度集群,Spark任务提交给Master运行。是Spark自身的一个调度系统。
3)Yarn: Spark客户端直接连接Yarn,不需要额外构建Spark集群。有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。
4)Mesos:国内大环境比较少用。
51.spark如何处理数据倾斜?
52.hdfs作用是什么?
hdfs在hadoop中的作用是为海量的数据提供了存储,能提供高吞吐量的数据访问,HDFS有高容错性的特点,并且设计用来部署在低廉的硬件上;而且它提供高吞吐量来访问应用程序的数据,适合那些有着超大数据集的应用程序。
对外部客户机而言,HDFS就像一个传统的分级文件系统。可以创建、删除、移动或重命名文件,等等。但是 HDFS 的架构是基于一组特定的节点构建的(参见图 1),这是由它自身的特点决定的。这些节点包括 NameNode(仅一个),它在 HDFS 内部提供元数据服务;DataNode,它为 HDFS 提供存储块。由于仅存在一个 NameNode,因此这是 HDFS 1.x版本的一个缺点(单点失败)。在Hadoop 2.x版本可以存在两个NameNode,解决了单节点故障问题。
存储在 HDFS 中的文件被分成块,然后将这些块复制到多个计算机中(DataNode)。这与传统的 RAID 架构大不相同。块的大小(1.x版本默认为 64MB,2.x版本默认为128MB)和复制的块数量在创建文件时由客户机决定。NameNode 可以控制所有文件操作。HDFS 内部的所有通信都基于标准的 TCP/IP 协议。
53.spark中cache和checkpoint区别?
54.hql转换为mr的过程?
了解了MapReduce实现SQL基本操作之后,我们来看看Hive是如何将SQL转化为MapReduce任务的,整个编译过程分为六个阶段:
Antlr定义SQL的语法规则,完成SQL词法,语法解析,将SQL转化为抽象语法树AST Tree (做语法解析,语法检查,转化为ast tree)
遍历AST Tree,抽象出查询的基本组成单元QueryBlock
遍历QueryBlock,翻译为执行操作树OperatorTree
4.逻辑层优化器进行OperatorTree变换,合并不必要的ReduceSinkOperator,减少shuffle数据量,把逻辑计划进行优化,得到优化后的逻辑计划
5.遍历OperatorTree,翻译为MapReduce任务,得到物理计划
6.物理层优化器进行MapReduce任务的变换,生成最终的执行计划
55.hive中压缩格式有啥区别?
压缩格式 压缩率 压缩速度 是否可切分
gzip 很高 比较快 否
lzo 比较高 很快 是
snappy 比较高 很快 否
bzip2 最高 慢 是
56.kafka分区和消费者的关系?
每个分区只能由同一个消费组内的一个消费者(consumer)来消费,可以由不同的消费组的消费者来消费,同组的消费者则起到并发的效果。
57.什么是共享变量什么是累加器?
Spark两种共享变量:广播变量与累加器
累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象。
共享变量出现的原因:
通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。Spark 的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。
58.sparkcontext作用?
SparkContext是开发Spark应用的入口,它负责和整个集群的交互,包括创建RDD等。
从本质上来说,SparkContext是Spark的对外接口,负责向调用这提供Spark的各种功能。
59.广播变量的用途?
将通用的数据抽取出来,减少资源的消耗。
如果Executor端需要访问Driver端的某个变量,spark会向Executor端每个task都发送一个此变量的副本。如果此变量很大,就会占用大量的Executor节点的内存。利用广播变量,spark只会给一个Executor节点发送一个变量。可以减少资源的消耗。
60.mapreduce的执行流程?
- 提交作业
客户端向 JobTracker 提交作业。首先,用户需要将所有应该配置的参数根据需求配置好。作业提交之后,就会进入自动化执行。在这个过程中,用户只能监控程序的执行情况和强制中断作业,但是不能对作业的执行过程进行任何干预。提交作业的基本过程如下。
1)客户端通过 Runjob() 方法启动作业提交过程。
2)客户端通过 JobTracker 的 getNewJobId() 请求一个新的作业 ID。
3)客户端检查作业的输出说明,计算作业的输入分片等,如果有问题,就抛出异常,如果正常,就将运行作业所需的资源(如作业 Jar 文件,配置文件,计算所得的输入分片等)复制到一个以作业 ID 命名的目录中。
4)通过调用 JobTracker 的 submitjob() 方法告知作业准备执行。
- 初始化作业
JobTracker 在 JobTracker 端开始初始化工作,包括在其内存里建立一系列数据结构,来记录这个 Job 的运行情况。
1)JobTracker 接收到对其 submitJob() 方法的调用后,就会把这个调用放入一个内部队列中,交由作业调度器进行调度。初始化主要是创建一个表示正在运行作业的对象,以便跟踪任务的状态和进程。
2)为了创建任务运行列表,作业调度器首先从 HDFS 中获取 JobClient 已计算好的输入分片信息,然后为每个分片创建一个 MapTask,并且创建 ReduceTask。
- 分配任务
JobTracker 会向 HDFS 的 NameNode 询问有关数据在哪些文件里面,这些文件分别散落在哪些结点里面。JobTracker 需要按照“就近运行”原则分配任务。
TaskTracker 定期通过“心跳”与 JobTracker 进行通信,主要是告知 JobTracker 自身是否还存活,以及是否已经准备好运行新的任务等。
JobTracker 接收到心跳信息后,如果有待分配的任务,就会为 TaskTracker 分配一个任务,并将分配信息封装在心跳通信的返回值中返回给 TaskTracker。
对于 Map 任务,JobTracker 通常会选取一个距离其输入分片最近的 TaskTracker,对于 Reduce 任务,JobTracker 则无法考虑数据的本地化。
4.执行任务
1)TaskTracker 分配到一个任务后,通过 HDFS 把作业的 Jar 文件复制到 TaskTracker 所在的文件系统,同时,TaskTracker 将应用程序所需要的全部文件从分布式缓存复制到本地磁盘。TaskTracker 为任务新建一个本地工作目录,并把 Jar 文件中的内容解压到这个文件夹中。
2)TaskTracker 启动一个新的 JVM 来运行每个任务(包括 Map 任务和 Reduce 任务),这样,JobClient 的 MapReduce 就不会影响 TaskTracker 守护进程。任务的子进程每隔几秒便告知父进程它的进度,直到任务完成。
- 进程和状态的更新
一个作业和它的每个任务都有一个状态信息,包括作业或任务的运行状态,Map 任务和 Reduce 任务的进度,计数器值,状态消息或描述。任务在运行时,对其进度保持追踪。
这些消息通过一定的时间间隔由 ChildJVM 向 TaskTracker 汇聚,然后再向 JobTracker 汇聚。JobTracker 将产生一个表明所有运行作业及其任务状态的全局视图,用户可以通过 Web UI 进行查看。JobClient 通过每秒查询 JobTracker 来获得最新状态,并且输出到控制台上。
- 作业的完成
当 JobTracker 接收到的这次作业的最后一个任务已经完成时,它会将 Job 的状态改为“successful”。当 JobClient 获取到作业的状态时,就知道该作业已经成功完成,然后 JobClient 打印信息告知用户作业已成功结束,最后从 Runjob() 方法返回。
61.namenode如何对元数据进行管理?
(1)第一次启动NameNode格式化后,创建Fsimage和Edits文件,如果不是第一次启动,直接加载编辑日志和镜像文件到内存
(2)客户端对元数据进行增删改查的请求
(3)NameNode记录操作日志,更新滚动日志
(4)NameNode在内存中对数据进行增删改
62.hbase的切分策略?
Region的分割操作是不可见的,因为Master不会参与其中。RegionServer拆分region的步骤是,先将该region下线,然后拆分,将其子region加入到META元信息中,再将他们加入到原本的RegionServer中,最后汇报Master。
63.hMaster的作用?
为HRegionServer分配HRegion
2.负责HRegionServer的负载均衡
3.发现失效的HRegionServer并重新分配
4.HDFS上的垃圾文件回收
5.处理Schema更新请求
64.sparkstreaming的偏移量怎么维护的?
65.spark作业提交流程?
1.客户端提交作业
2.Driver启动流程
3.Driver申请资源并启动其余Executor(即Container)
4.Executor启动流程
5.作业调度,生成stages与tasks。
6.Task调度到Executor上,Executor启动线程执行Task逻辑
7.Driver管理Task状态
8.Task完成,Stage完成,作业完成
66.spark如何划分stage?
Stage:根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。
Task:Stage是一个TaskSet,将Stage根据分区数划分成一个个的Task。
67.hbase的事务有哪些?
HBase目前只支持行级事务;
可以保证行级数据的原子性、一致性、隔离性以及持久性
68.hive优化?
Mapjoin
行列过滤
列式存储
采用分区技术
合理设置map数
合理设置reduce数
解决小文件产生
开启map端combiner
压缩
采用tez引擎或spark引擎
69.hive在执行哪些查询不会执行mr?
hive 为了执行效率考虑,简单的查询,就是只是select,不带count,sum,group by这样的,都不走map/reduce,直接读取hdfs目录中的文件进行filter过滤。
70.yarn的资源调度策略?
FIFO调度器:支持单队列 、先进先出 生产环境不会用。
容量调度器:支持多队列,保证先进入的任务优先执行。
公平调度器:支持多队列,保证每个任务公平享有队列资源。
71.宽依赖和窄依赖的区别?
宽依赖:父Rdd 的一个partition 对应 多个子Rdd 的partition
窄依赖:父Rdd 的一个partition 对应 一个子Rdd 的partition
72.hadoop出现数据倾斜怎么解决?
1)提前在map进行combine,减少传输的数据量
在Mapper加上combiner相当于提前进行reduce,即把一个Mapper中的相同key进行了聚合,减少shuffle过程中传输的数据量,以及Reducer端的计算量。
如果导致数据倾斜的key大量分布在不同的mapper的时候,这种方法就不是很有效了。
2)导致数据倾斜的key 大量分布在不同的mapper
(1)局部聚合加全局聚合。
第一次在map阶段对那些导致了数据倾斜的key 加上1到n的随机前缀,这样本来相同的key 也会被分到多个Reducer中进行局部聚合,数量就会大大降低。
第二次mapreduce,去掉key的随机前缀,进行全局聚合。
思想:二次mr,第一次将key随机散列到不同reducer进行处理达到负载均衡目的。第二次再根据去掉key的随机前缀,按原key进行reduce处理。
这个方法进行两次mapreduce,性能稍差。
(2)增加Reducer,提升并行度
JobConf.setNumReduceTasks(int)
(3)实现自定义分区
根据数据分布情况,自定义散列函数,将key均匀分配到不同Reducer
73.kafka如何保证不丢失数据?
Ack=0,相当于异步发送,消息发送完毕即offset增加,继续生产。
Ack=1,leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。
Ack=-1,leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产。