- Flume拦截器
- 拦截器注意事项
项目中自定义了:ETL拦截器和区分类型拦截器。
采用两个拦截器的优缺点:优点,模块化开发和可移植性;缺点,性能会低一些
- 自定义拦截器步骤
- 实现 Interceptor
- 重写四个方法
- initialize 初始化
- public Event intercept(Event event) 处理单个Event
- public List<Event> intercept(List<Event> events) 处理多个Event,在这个方法中调用Event intercept(Event event)
- close 方法
- 静态内部类,实现Interceptor.Builder
Ganglia监控
加州伯克利大学千禧计划的其中一个开源项目.是一个集群汇总监控用的的软件,和Cacti不同,cacti是详细监控集群中每台服务器的运行状态,而Ganglia是将集群中的服务器数据进行汇总然后监控。有时通过cacti或者zabbix(监控软件)看不出来的集群总体负载问题,却能够在Ganglia中体现。被监控的主机(即client)安装ganglia-gmond并启动该进程。服务器端需要安装gmetad和web程序。大致大构图如下:
参考链接:https://www.cnblogs.com/yinzhengjie/p/9798739.html
不会,Channel存储可以存储在File中,数据传输自身有事务。
开发中在flume-env.sh中设置JVM heap为4G或更高,部署在单独的服务器上(4核8线程16G内存)
-Xmx(设定程序运行期间最大可占用的内存大小)与-Xms(设定程序启动时占用内存大小)最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
官方说明如下:
checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
- HDFS存入大量小文件,有什么影响?
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。
- HDFS小文件处理
官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0,hdfs.roundValue=10,hdfs.roundUnit= second几个参数综合作用,效果如下:
(1)tmp文件在达到128M时会滚动生成正式文件
(2)tmp文件创建超10分时会滚动生成正式文件
举例:在2018-01-01 05:23的时侯sink接收到数据,那会产生如下tmp文件:
/itcast/20180101/itcast.201801010520.tmp
即使文件内容没有达到128M,也会在05:33时滚动生成正式文件
Kafka官方自带压力测试脚本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。
Kafka机器数量=2*(峰值生产速度*副本数/100)+1
7天
每天的数据量*7天
公司自己开发的监控器;
开源的监控器:KafkaManager、KafkaMonitor
分区数并不是越多越好,一般分区数不要超过集群机器数量。分区数越多占用内存越大(ISR等),一个节点集中的分区也就越多,当它宕机的时候,对系统的影响也就越大。
分区数一般设置为:3-10个
一般我们设置成2个或3个,很多企业设置为2个。
通常情况:多少个日志类型就多少个Topic。也有对日志类型进行合并的。
Ack=0,相当于异步发送,消息发送完毕即offset增加,继续生产。
Ack=1,leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。
Ack=-1,leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产。
ISR(In-Sync Replicas),副本同步队列。ISR中包括Leader和Follower。如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的Leader。有replica.lag.max.messages(延迟条数)和replica.lag.time.max.ms(延迟时间)两个参数决定一台服务是否可以加入ISR副本队列,在0.10版本移除了replica.lag.max.messages参数,防止服务频繁的进去队列。
任意一个维度超过阈值都会把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也会先存放在OSR中。
每天总数据量100g,每天产生1亿条日志, 10000万/24/60/60=1150条/每秒钟
平均每秒钟:1150条
低谷每秒钟:400条
高峰每秒钟:1150条*(2-20倍)=2300条->23000条
每条日志大小:0.5k-2k
每秒多少数据量:2.3M-20MB
- Flume记录
- 日志有记录
- 短期没事
- 如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)
- 如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。
- Kafka的再平衡机制
- 什么是再平衡
所谓的再平衡,指的是在kafka consumer所订阅的topic发生变化时发生的一种分区重分配机制。一般有三种情况会触发再平衡:
- consumer group中的新增或删除某个consumer,导致其所消费的分区需要分配到组内其他的consumer上;
- consumer订阅的topic发生变化,比如订阅的topic采用的是正则表达式的形式,如test-*此时如果有一个新建了一个topic test-user,那么这个topic的所有分区也是会自动分配给当前的consumer的,此时就会发生再平衡;
- consumer所订阅的topic发生了新增分区的行为,那么新增的分区就会分配给当前的consumer,此时就会触发再平衡。
Kafka提供的再平衡策略主要有三种:Round Robin,Range和Sticky,默认使用Range。这三种分配策略的主要区别在于:
- Round Robin:会采用轮询的方式将当前所有的分区依次分配给所有的consumer;
- Range:首先会计算每个consumer可以消费的分区个数,然后按照顺序将指定个数范围的分区分配给各个consumer;
- Sticky:这种分区策略是最新版本中新增的一种策略,其主要实现了两个目的:
- 将现有的分区尽可能均衡的分配给各个consumer,存在此目的的原因在于Round Robin和Range分配策略实际上都会导致某几个consumer承载过多的分区,从而导致消费压力不均衡;
- 如果发生再平衡,那么在重新分配前的基础上会尽力保证当前未宕机的consumer所消费的分区不会被分配给其他的consumer上;
本文主要会通过几个示例来对上面讲解的三种分区重分配策略的基本实现原理进行讲解。
- Round Robin
关于Round Robin重分配策略,其主要采用的是一种轮询的方式分配所有的分区,该策略主要实现的步骤如下。这里我们首先假设有三个topic:t0、t1和t2,这三个topic拥有的分区数分别为1、2和3,那么总共有六个分区,这六个分区分别为:t0-0、t1-0、t1-1、t2-0、t2-1和t2-2。这里假设我们有三个consumer:C0、C1和C2,它们订阅情况为:
C0订阅t0,C1订阅t0和t1,C2订阅t0、t1和t2。那么这些分区的分配步骤如下:
- 首先将所有的partition和consumer按照字典序进行排序,所谓的字典序,就是按照其名称的字符串顺序,那么上面的六个分区和三个consumer排序之后分别为:
- 然后依次以按顺序轮询的方式将这六个分区分配给三个consumer,如果当前consumer没有订阅当前分区所在的topic,则轮询的判断下一个consumer:
- 尝试将t0-0分配给C0,由于C0订阅了t0,因而可以分配成功;
- 尝试将t1-0分配给C1,由于C1订阅了t1,因而可以分配成功;
- 尝试将t1-1分配给C2,由于C2订阅了t1,因而可以分配成功;
- 尝试将t2-0分配给C0,由于C0没有订阅t2,因而会轮询下一个consumer;
- 尝试将t2-0分配给C1,由于C1没有订阅t2,因而会轮询下一个consumer;
- 尝试将t2-0分配给C2,由于C2订阅了t2,因而可以分配成功;
- 同理由于t2-1和t2-2所在的topic都没有被C0和C1所订阅,因而都不会分配成功,最终都会分配给C2。
按照上述的步骤将所有的分区都分配完毕之后,最终分区的订阅情况如下:
从上面的步骤分析可以看出,轮询的策略就是简单的将所有的partition和consumer按照字典序进行排序之后,然后依次将partition分配给各个consumer,如果当前的consumer没有订阅当前的partition,那么就会轮询下一个consumer,直至最终将所有的分区都分配完毕。但是从上面的分配结果可以看出,轮询的方式会导致每个consumer所承载的分区数量不一致,从而导致各个consumer压力不均一。
- Range(默认策略)
所谓的Range重分配策略,就是首先会计算各个consumer将会承载的分区数量,然后将指定数量的分区分配给该consumer。这里我们假设有两个consumer:C0和C1,两个topic:t0和t1,这两个topic分别都有三个分区,那么总共的分区有六个:t0-0、t0-1、t0-2、t1-0、t1-1和t1-2。那么Range分配策略将会按照如下步骤进行分区的分配:
- 需要注意的是,Range策略是按照topic依次进行分配的,比如我们以t0进行讲解,其首先会获取t0的所有分区:t0-0、t0-1和t0-2,以及所有订阅了该topic的consumer:C0和C1,并且会将这些分区和consumer按照字典序进行排序;
- 然后按照平均分配的方式计算每个consumer会得到多少个分区,如果没有除尽,则会将多出来的分区依次计算到前面几个consumer。比如这里是三个分区和两个consumer,那么每个consumer至少会得到1个分区,而3除以2后还余1,那么就会将多余的部分依次算到前面几个consumer,也就是这里的1会分配给第一个consumer,总结来说,那么C0将会从第0个分区开始,分配2个分区,而C1将会从第2个分区开始,分配1个分区;
- 同理,按照上面的步骤依次进行后面的topic的分配。
最终上面六个分区的分配情况如下:
可以看到,如果按照Range分区方式进行分配,其本质上是依次遍历每个topic,然后将这些topic的分区按照其所订阅的consumer数量进行平均的范围分配。这种方式从计算原理上就会导致排序在前面的consumer分配到更多的分区,从而导致各个consumer的压力不均衡。
- Sticky
Sticky策略是新版本中新增的策略,顾名思义,这种策略会保证再分配时已经分配过的分区尽量保证其能够继续由当前正在消费的consumer继续消费,当然,前提是每个consumer所分配的分区数量都大致相同,这样能够保证每个consumer消费压力比较均衡。关于这种分配方式的分配策略,我们分两种情况进行讲解,即初始状态的分配和某个consumer宕机时的分配情况。
- 初始分配
初始状态分配的特点是,所有的分区都还未分配到任意一个consumer上。这里我们假设有三个consumer:C0、C1和C2,三个topic:t0、t1和t2,这三个topic分别有1、2和3个分区,那么总共的分区为:t0-0、t1-0、t1-1、t2-0、t2-1和t2-2。关于订阅情况,这里C0订阅了t0,C1订阅了t0和1,C2则订阅了t0、t1和t2。这里的分区分配规则如下:
- 首先将所有的分区进行排序,排序方式为:首先按照当前分区所分配的consumer数量从低到高进行排序,如果consumer数量相同,则按照分区的字典序进行排序。这里六个分区由于所在的topic的订阅情况各不相同,因而其排序结果如下:
- 然后将所有的consumer进行排序,其排序方式为:首先按照当前consumer已经分配的分区数量有小到大排序,如果两个consumer分配的分区数量相同,则会按照其名称的字典序进行排序。由于初始时,这三个consumer都没有分配任何分区,因而其排序结果即为其按照字典序进行排序的结果:
- 然后将各个分区依次遍历分配给各个consumer,首先需要注意的是,这里的遍历并不是C0分配完了再分配给C1,而是每次分配分区的时候都整个的对所有的consumer从头开始遍历分配,如果当前consumer没有订阅当前分区,则会遍历下一个consumer。然后需要注意的是,在整个分配的过程中,各个consumer所分配的分区数是动态变化的,而这种变化是会体现在各个consumer的排序上的,比如初始时C0是排在第一个的,此时如果分配了一个分区给C0,那么C0就会排到最后,因为其拥有的分区数是最多的。上面的六个分区整体的分配流程如下:
- 首先将t2-0尝试分配给C0,由于C0没有订阅t2,因而分配不成功,继续轮询下一个consumer;
- 然后将t2-0尝试分配给C1,由于C1没有订阅t2,因而分配不成功,继续轮询下一个consumer;
- 接着将t2-0尝试分配给C2,由于C2订阅了t2,因而分配成功,此时由于C2分配的分区数发生变化,各个consumer变更后的排序结果为:
- 接下来的t2-1和t2-2,由于也只有C2订阅了t2,因而其最终还是会分配给C2,最终在t2-0、t2-1和t2-2分配完之后,各个consumer的排序以及其分区分配情况如下:
- 接着继续分配t1-0,首先尝试将其分配给C0,由于C0没有订阅t1,因而分配不成功,继续轮询下一个consumer;
- 然后尝试将t1-0分配给C1,由于C1订阅了t1,因而分配成功,此时各个consumer以及其分配的分区情况如下:
- 同理,接下来会分配t1-1,虽然C1和C2都订阅了t1,但是由于C1排在C2前面,因而该分区会分配给C1,即:
- 最后,尝试将t0-0分配给C0,由于C0订阅了t0,因而分配成功,最终的分配结果为:
上面的分配过程中,需要始终注意的是,虽然示例中的consumer顺序始终没有变化,但这是由于各个分区分配之后正好每个consumer所分配的分区数量的排序结果与初始状态一致。这里读者也可以比较一下这种分配方式与前面讲解的Round Robin进行对比,可以很明显的发现,Sticky重分配策略分配得更加均匀一些。
- 模拟consumer宕机
由于前一个示例中最终的分区分配方式模拟宕机的情形比较简单,因而我们使用另一种订阅策略。这里我们的示例的consumer有三个:C0、C1和C2,topic有四个:t0、t1、t2和t3,每个topic都有两个分区,那么总的分区有:t0-0、t0-1、t1-0、t1-1、t2-0、t2-1、t3-0和t3-1。这里的订阅情况为三个consumer订阅所有的主题,那么如果按照Sticky的分区分配策略,初始状态时,分配情况如下,读者可以按照前一示例讲解的方式进行推算:
这里我们假设在消费的过程中,C1发生了宕机,此时就会发生再平衡,而根据Sticky策略,其再分配步骤如下:
- 首先会将宕机之后未分配的分区进行排序,排序方式为:首先按照分区所拥有的consumer数量从低到高进行排序,如果consumer数量相同,则按照分区的字典序进行排序。这里需要注意的是,由于只有C1宕机,因而未分配的分区为:t0-1、t2-0和t3-1,排序之后的结果为:
- 然后将所有的consumer进行排序,排序方式为:首先将consumer按照其所拥有的consumer数量从小到大排序,如果数量相同,则按照consumer名称的字典序进行排序,排序结果如下:
- 接着依次遍历各个分区,将其分配给各个consumer,需要注意的是,在分配的过程中,consumer所分配的分区数量是在变化的,而这种变化是会反应在consumer的排序上的:
- 首先尝试将t0-1分配给C2,由于C2订阅了t0,因而可以分配成功,此时consumer排序和分区分配情况如下,需要注意的是,虽然分配之后,C2和C0的分区数量相同,但是由于按照字典序,C0在C2前面,因而排序情况还是会发生变化:
- 然后尝试将t2-0分配给C0,由于C0订阅了t2,因而分配可以成功,此时consumer排序和分区分配情况如下:
- 最后尝试分配t3-1给C2,由于C2订阅了t3,因而分配可以成功,此时consumer排序与分区分配情况如下:
在上面的分区分配过程中,我们可以看到,由于分区的不断分配,各个consumer所拥有的分区数量也在不断变化,因而其排序情况也在变化,但是最终可以看到,各个分区是均匀的分配到各个consumer的,并且还保证了当前consumer已经消费的分区是不会分配到其他的consumer上的。
Hive 和数据库除了拥有类似的查询语言,再无类似之处。
- 数据存储位置
Hive存储在HDFS。数据库将数据保存在块设备或者本地文件系统中。
- 数据更新
Hive中不建议对数据的改写。而数据库中的数据通常是需要经常进行修改的,
- 执行延迟
Hive执行延迟较高。数据库的执行延迟较低。当然,这个是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。
- 数据规模
Hive支持很大规模的数据计算;数据库可以支持的数据规模较小。
- 内部表:当我们删除一个内部表时,Hive也会删除这个表中数据。内部表不适合和其他工具共享数据。
- 外部表:删除该表并不会删除掉原始数据,删除的是表的元数据
- Sort By:分区内有序;
- Order By:全局排序,只有一个Reducer;
- Distrbute By:类似MR中Partition,进行分区,结合sort by使用。
- Cluster By:当Distribute by和Sorts by字段相同时,可以使用Cluster by方式。Cluster by除了具有Distribute by的功能外还兼具Sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC。
RANK() 排序相同时会重复,总数不会变
DENSE_RANK() 排序相同时会重复,总数会减少
ROW_NUMBER() 会根据顺序计算
- OVER():指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化
- CURRENT ROW:当前行
- n PRECEDING:往前n行数据
- n FOLLOWING:往后n行数据
- UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING表示到后面的终点
- LAG(col,n):往前第n行数据
- LEAD(col,n):往后第n行数据
- NTILE(n):把有序分区中的行分发到指定数据的组中,各个组有编号,编号从1开始,对于每一行,NTILE返回此行所属的组的编号。注意:n必须为int类型。
在项目中是否自定义过UDF、UDTF函数,以及用他们处理了什么问题,及自定义步骤?
- 自定义过。
- 用UDF函数解析公共字段;用UDTF函数解析事件字段。
自定义UDF:继承UDF,重写evaluate方法
自定义UDTF:继承自GenericUDTF,重写3个方法:initialize(自定义输出的列名和类型),process(将结果返回forward(result)),close
为什么要自定义UDF/UDTF,因为自定义函数,可以自己埋点Log打印日志,出错或者数据异常,方便调试.
- MapJoin
如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。
- 行列过滤
列处理:在SELECT中,只拿需要的列,如果有,尽量使用分区过滤,少用SELECT *。
行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在Where后面,那么就会先全表关联,之后再过滤。
- 采用分桶技术
- 采用分区技术
- 合理设置Map数
- 通常情况下,作业会通过input的目录产生一个或者多个map任务。
主要的决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小。
- 是不是map数越多越好?
答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。
- 是不是保证每个map处理接近128m的文件块,就高枕无忧了?
答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。
针对上面的问题2和3,我们需要采取两种方式来解决:即减少map数和增加map数;
- 小文件进行合并
在Map执行前合并小文件,减少Map数:CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。
- 合理设置Reduce数
Reduce个数并不是越多越好
- 过多的启动和初始化Reduce也会消耗时间和资源;
- 另外,有多少个Reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
在设置Reduce个数的时候也需要考虑这两个原则:处理大数据量利用合适的Reduce数;使单个Reduce任务处理数据量大小要合适;
- 常用参数
// 输出合并小文件
SET hive.merge.mapfiles = true; -- 默认true,在map-only任务结束时合并小文件
SET hive.merge.mapredfiles = true; -- 默认false,在map-reduce任务结束时合并小文件
SET hive.merge.size.per.task = 268435456; -- 默认256M
SET hive.merge.smallfiles.avgsize = 16777216; -- 当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
- Client先访问zookeeper,从meta表读取region的位置,然后读取meta表中的数据。meta中又存储了用户表的region信息;
- 根据namespace、表名和rowkey在meta表中找到对应的region信息;
- 找到这个region对应的regionserver;
- 查找对应的region;
- 先从MemStore找数据,如果没有,再到BlockCache里面读;
- BlockCache还没有,再到StoreFile上读(为了读取的效率);
- 如果是从StoreFile里面读取的数据,不是直接返回给客户端,而是先写入BlockCache,再返回给客户端。
图2 HBase写数据流程
- Client向HregionServer发送写请求;
- HRegionServer将数据写到HLog(write ahead log)。为了数据的持久化和恢复;
- HRegionServer将数据写到内存(MemStore);
- 反馈Client写成功。
- 当MemStore数据达到阈值(默认是128M,老版本是64M),将数据刷到硬盘,将内存中的数据删除,同时删除HLog中的历史数据;
- 并将数据存储到HDFS中;
- 在HLog中做标记点。
- 当数据块达到3块,Hmaster触发合并操作,Region将数据块加载到本地,进行合并;
- 当合并的数据超过256M,进行拆分,将拆分后的Region分配给不同的HregionServer管理;
- 当HregionServer宕机后,将HregionServer上的hlog拆分,然后分配给不同的HregionServer加载,修改.META.;
- 注意:HLog会同步到HDFS。
Flush: <!--当memstore的大小超过这个值的时候,会flush到磁盘。128M--> <property> <name>hbase.hregion.memstore.flush.size</name> <value>134217728</value> </property> <!--单个regionserver的全部memstore的最大值。超过这个值总容量(Max Heap=983.4 M)*0.4, 一个新的put插入操作会被挂起,强制执行flush操作。 --> <property> <name>hbase.regionserver.global.memstore.upperLimit</name> <value>0.4</value> </property> <!--当强制执行flush操作的时候,当低于这个值的时候,flush会停止。默认是堆大小的 35% . --> <property> <name>hbase.regionserver.global.memstore.lowerLimit</name> <value>0.35</value> </property>
Compact: <!--当一个HStore含有多于这个值的HStoreFiles(每一个memstore flush产生一个HStoreFile)的时候,会执行一个合并操作,把这HStoreFiles写成一个--> <property> <name>hbase.hstore.compactionThreshold</name> <value>3</value> </property> <!--一个Region中的所有HStoreFile的major compactions的时间间隔。默认是1天。--> <property> <name>hbase.hregion.majorcompaction</name> <value>86400000</value> </property>
Split: <!--最大HStoreFile大小。若某个列族的HStoreFile增长达到这个值,这个Hegion会被切割成两个。 默认: 10G.--> <property> <name>hbase.hregion.max.filesize</name> <value>10737418240</value> </property> |
- rowkey长度原则(Rowkey的长度被很多开发者建议说设计在10~100个字节,不过建议是越短越好,不要超过16个字节)
- rowkey散列原则
- rowkey唯一原则
/opt/module/sqoop/bin/sqoop import \
--connect \
--username \
--password \
--target-dir \
--delete-target-dir \
--num-mappers \
--fields-terminated-by \
Hive中的Null在底层是以“\N”来存储,而MySQL中的Null在底层就是Null,为了保证数据两端的一致性。在导出数据时采用--input-null-string和--input-null-non-string两个参数。导入数据时采用--null-string和--null-non-string。
1)场景1:如Sqoop在导出到Mysql时,使用4个Map任务,过程中有2个任务失败,那此时MySQL中存储了另外两个Map任务导入的数据,此时老板正好看到了这个报表数据。而开发工程师发现任务失败后,会调试问题并最终将全部数据正确的导入MySQL,那后面老板再次看报表数据,发现本次看到的数据与之前的不一致,这在生产环境是不允许的。
官网:http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html
Since Sqoop breaks down export process into multiple transactions, it is possible that a failed export job may result in partial data being committed to the database. This can further lead to subsequent jobs failing due to insert collisions in some cases, or lead to duplicated data in others. You can overcome this problem by specifying a staging table via the --staging-table option which acts as an auxiliary table that is used to stage exported data. The staged data is finally moved to the destination table in a single transaction.
–staging-table方式
2)场景2:设置map数量为1个(不推荐,面试官想要的答案不只这个)
多个Map任务时,采用–staging-table方式,仍然可以解决数据一致性问题。
只有Map阶段,没有Reduce阶段的任务。
Sqoop任务5分钟-2个小时的都有。取决于数据量。
- 元组的创建
val tuple1 = (1, 2, 3, "heiheihei")
println(tuple1)
- 元组数据的访问,注意元组元素的访问有下划线,并且访问下标从1开始,而不是0
val value1 = tuple1._4
println(value1)
- 元组的遍历
方式1:
for (elem <- tuple1.productIterator ) {
print(elem)
}
println()
方式2:
tuple1.productIterator.foreach(i => println(i))
隐式转换函数是以implicit关键字声明的带有单个参数的函数。这种函数将会自动应用,将值从一种类型转换为另一种类型。
1)Scala中函数的地位:一等公民
2) Scala中的匿名函数(函数字面量)
3)Scala中的高阶函数
4)Scala中的闭包
5)Scala中的部分应用函数
6)Scala中的柯里化函数
case class Person(name:String,age:Int)
一般使用在 ds=df.as[Person]
函数编程中,接受多个参数的函数都可以转化为接受单个参数的函数,这个转化过程就叫柯里化,柯里化就是证明了函数只需要一个参数而已。其实我们刚的学习过程中,已经涉及到了柯里化操作,所以这也印证了,柯里化就是以函数为主体这种思想发展的必然产生的结果。
1)柯里化的示例
def mul(x: Int, y: Int) = x * y
println(mul(10, 10))
def mulCurry(x: Int) = (y: Int) => x * y
比较两个字符串在忽略大小写的情况下是否相等,注意,这里是两个任务:
- 全部转大写(或小写)
- 比较是否相等
针对这两个操作,我们用一个函数去处理的思想,其实无意间也变成了两个函数处理的思想。示例如下:
val a = Array("Hello", "World")
val b = Array("hello", "world")
println(a.corresponds(b)(_.equalsIgnoreCase(_)))
其中corresponds函数的源码如下:
def corresponds[B](that: GenSeq[B])(p: (A,B) => Boolean): Boolean = {
val i = this.iterator
val j = that.iterator
while (i.hasNext && j.hasNex t)
if (!p(i.next(), j.next() ))
return fals
!i.hasNext && !j.hasNext
}
尖叫提示:不要设立柯里化存在义这样的命题,柯里化,是面向函数思想的必然产生结果。
一个函数把外部的那些不属于自己的对象也包含(闭合)进来。
案例1:
def minusxy(x: Int) = (y: Int) => x - y
这就是一个闭包:
1) 匿名函数(y: Int) => x -y嵌套在minusxy函数中。
2) 匿名函数(y: Int) => x -y使用了该匿名函数之外的变量x
3) 函数minusxy返回了引用了局部变量的匿名函数
案例2
def minusxy(x: Int) = (y: Int) => x - y
val f1 = minusxy(10)
val f2 = minusxy(10)
println(f1(3) + f2(3))
此处f1,f2这两个函数就叫闭包。
val map = Map("Tom"-> 23)
map("Jack") // 抛出异常 java.util.NoSuchElementException: key not found: Jack
map.get("Jack") // None
map("Tom") // 23
map.get("Tom") // Some(23)
使用模式匹配取出最后结果
val optionAge = map.get("Tom")
val age = optionAge match {
case Some(x) => optionAge.get
case None => 0
- Local:运行在一台机器上,通常是练手或者测试环境。
- Standalone:构建一个基于Master+Slaves的资源调度集群,Spark任务提交给Master运行。是Spark自身的一个调度系统。
- Yarn: Spark客户端直接连接Yarn,不需要额外构建Spark集群。有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。
- Mesos:国内大环境比较少用。
Shell 脚本。
参考答案:
https://blog.csdn.net/gamer_gyt/article/details/79135118
- 在提交任务时的几个重要参数
executor-cores —— 每个executor使用的内核数,默认为1,官方建议2-5个,我们企业是4个
num-executors —— 启动executors的数量,默认为2
executor-memory —— executor内存大小,默认1G
driver-cores —— driver使用内核数,默认为1
driver-memory —— driver内存大小,默认512M
- 给一个提交任务的样式
spark-submit \
--master local[5] \
--driver-cores 2 \
--driver-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--executor-memory 8g \
--class PackageName.ClassName XXXX.jar \
--name "Spark Job Name" \
InputPath \
OutputPath
RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies用来解决数据容错时的高效性以及划分任务时候起到重要作用。
Stage:根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。
Task:Stage是一个TaskSet,将Stage根据分区数划分成一个个的Task。
- map(func):返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成.
- mapPartitions(func):类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。
- reduceByKey(func,[numTask]):在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
- aggregateByKey (zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U: 在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
- combineByKey(createCombiner: V=>C, mergeValue: (C, V) =>C, mergeCombiners: (C, C) =>C):
对相同K,把V合并成一个集合。
- createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值
- mergeValue: 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并
- mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。
…
根据自身情况选择比较熟悉的算子加以介绍。
- reduce:
- collect:
- first:
- take:
- aggregate:
- countByKey:
- foreach:
- saveAsTextFile: