七、Sqoop实战
1 Mysql数据导入HDFS上.
1. 全量导入:
将mysql表中全部数据都导入HDFS,如果HDFS中存在这个目录的话就会报错,默认存储的HDFS目录是 /user/root/XXX.
bin/sqoop import (在sqoop的安装目录内,import表名是导入)
--connect jdbc:mysql://192.168.52.130:3306/userdb (连接:协议:数据库类型://ip地址:端口号/数据库)
--username root (用户名 root)
--password 123456 (密码 123456)
--table emp (表 emp)
--m 1 (--num-mappers:使用几个mapper,写1就可以)
若要导入到HDFS指定目录下,并指定字段之间的分隔符:
使用参数 --target-dir 来指定导出目的地,
使用参数 --delete-target-dir 来判断导出目录是否存在,如果存在就删掉.
使用参数 --fields-terminated-by '\t' 使用''\t''来切割字段,sqoop默认是使用','逗号进行分割的.
bin/sqoop import (在sqoop的安装目录内,import表名是导入)
--connect jdbc:mysql://192.168.52.130:3306/userdb (连接:协议:数据库类型://ip地址:端口号/数据库)
--username root (用户名 root)
--password 123456 (密码 123456)
--table emp (表 emp)
--delete-target-dir (如果指定目录存在就删除它)
--target-dir /sqoop/emp (导入到指定目录)
--fields-terminated-by '\t' (指定字段分割符为\t)
--m 1 (--num-mappers:使用几个mapper,写1就可以)
2.增量导入:
将数据库中某一字段,增加的导入,在HDFS上单独形成一个文件.
注意:增量导入的时候,一定不能加参数--delete-target-dir否则会报错
bin/sqoop import
--connect jdbc:mysql://192.168.52.130:3306/myhive
--username root
--password 123456
--table emp
--incremental append (表明增量导入)
--check-column id (检查哪个字段,这里检查的是mysql数据库表中的id字段)
--last-value 4 (id字段最后一个id是4,那增量导入的话就是从id=5开始往后导入)
--m 1
3.减量导入:
设置where条件,通过条件可以判断减少的数据或增加的数据,控制更加灵活一些,例如可以通过表创建时间来判断数据是哪一天生成的等,每个表均设置3个字段,create_time(表创建时间),update_time(表更新时间),is_delete(是否删除)
注意:where条件的地方需要使用“”双引号引起来,否则where条件失效
bin/sqoop import \ --connect jdbc:mysql://192.168.52.130:3306/userdb \ --username root \ --password admin \ --table emp \ --incremental append \ --where "create_time > '2019-02-14 00:00:00' and is_delete='1' and create_time < '2019-02-14 23:59:59'" \ --target-dir /sqoop/incement \ --check-column id \ --m 1
4.SQL语句查找导入HDFS
我们还可以通过 –query参数来指定我们的sql语句,通过sql语句来过滤我们的数据进行导入
bin/sqoop import --connect jdbc:mysql://192.168.52.130:3306/userdb --username root --password 123456 --delete-target-dir --query 'select phno from emp_conn where 1=1 and $CONDITIONS' --target-dir /sqoop/emp_conn --m 1
注意事项:
使用sql语句来进行查找是不能加参数--table
并且必须要添加where条件,
并且where条件后面必须带一个$CONDITIONS 这个字符串,
并且这个sql语句必须用单引号,不能用双引号.
2. Mysql数据导入Hive上.
1.将我们mysql表当中的数据直接导入到hive表中的话,需要将hive的一个叫做hive-exec-1.1.0-cdh5.14.0.jar的jar包拷贝到sqoop的lib目录下
cp /export/servers/hive-1.1.0-cdh5.14.0/lib/hive-exec-1.1.0-cdh5.14.0.jar /export/servers/sqoop-1.4.6-cdh5.14.0/lib/
2.将我们mysql当中的数据导入到hive表当中来,需要准备hive数据库与表
hive (default)> create database sqooptohive; hive (default)> use sqooptohive; hive (sqooptohive)> create external table emp_hive(id int,name string,deg string,salary int ,dept string) row format delimited fields terminated by '\t';
3.导入语句
bin/sqoop import --connect jdbc:mysql://192.168.52.130:3306/userdb --username root --password 123456 --table emp --fields-terminated-by '\t' (字段之间的分隔符) --hive-import (将数据从mysql数据库中导入到hive表中) --hive-table qooptohive.emp_hive (后面接要创建的hive表,数据库中的某个表,使用"."连接) --hive-overwrite (覆盖掉在hive表中已经存在的数据) --delete-target-dir --m 1
注意:我们还可以导入关系表到hive并自动创建hive表,导入
bin/sqoop import --connect jdbc:mysql://192.168.52.130:3306/userdb --username root --password 123456 --table emp_conn --hive-import --hive-database sqooptohive (--hive-database 后面直接接数据库名) --m 1
3.Sqoop的数据导出
将数据从HDFS把文件导出到mysql数据库,导出前,目标表必须存在于目标数据库中。
数据是在HDFS当中的如下目录/sqoop/emp,数据内容如下
1201,gopal,manager,50000,TP,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1 1202,manisha,Proof reader,50000,TP,2018-06-15 18:54:32.0,2018-06-17 20:26:08.0,1 1203,khalil,php dev,30000,AC,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1 1204,prasanth,php dev,30000,AC,2018-06-17 18:54:32.0,2018-06-17 21:05:52.0,0 1205,kranthi,admin,20000,TP,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1
1.创建mysql表
CREATE TABLE `emp_out` ( `id` INT(11) DEFAULT NULL, `name` VARCHAR(100) DEFAULT NULL, `deg` VARCHAR(100) DEFAULT NULL, `salary` INT(11) DEFAULT NULL, `dept` VARCHAR(10) DEFAULT NULL, `create_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `is_delete` BIGINT(20) DEFAULT '1' ) ENGINE=INNODB DEFAULT CHARSET=utf8;
2.执行导出命令:通过export来实现数据的导出,将hdfs的数据导出到mysql当中去
bin/sqoop export --connect jdbc:mysql://192.168.52.130:3306/userdb --username root --password 123456 --table emp_out --export-dir /sqoop/emp --input-fields-terminated-by ","
3.验证mysql表数据
八、Hbase--分布式列存储NOSQL数据库
1、Hbase数据存储在hdfs,少量存内存
2、hbase适合海量稀疏数据存储
hbase属于nosql数据库,列存储
3、与传统关系型数据库对比:
行存储:传统关系型数据mysql、oracle
优点:保证数据完整性,写入检查
缺点:读的过程会产生冗余信息
列存储:Nosql数据库
优点:读的过程不会产生冗余
缺点:写入效率差,不保证完整性
4、Hbase优点:
(1)存储海量数据
(2)快速随机访问
(3)进行大量的改写操作
Hbase的优点及应用场景:
半结构化或非结构化数据:
对于数据结构字段不够确定或杂乱无章非常难按一个概念去进行抽取的数据适合用HBase,因为HBase支持动态添加列。
记录很稀疏:
RDBMS的行有多少列是固定的。为null的列浪费了存储空间。HBase为null的Column不会被存储,这样既节省了空间又提高了读性能。
多版本号数据:
依据Row key和Column key定位到的Value能够有随意数量的版本号值,因此对于须要存储变动历史记录的数据,用HBase是很方便的。比方某个用户的Address变更,用户的Address变更记录也许也是具有研究意义的。
仅要求最终一致性:
对于数据存储事务的要求不像金融行业和财务系统这么高,只要保证最终一致性就行。(比如HBase+elasticsearch时,可能出现数据不一致)
高可用和海量数据以及很大的瞬间写入量:
WAL解决高可用,支持PB级数据,put性能高
适用于插入比查询操作更频繁的情况。比如,对于历史记录表和日志文件。(HBase的写操作更加高效)
业务场景简单:
不需要太多的关系型数据库特性,列入交叉列,交叉表,事务,连接等。
Hbase的缺点:
单一RowKey固有的局限性决定了它不可能有效地支持多条件查询[2]
不适合于大范围扫描查询
不直接支持 SQL 的语句查询
5、Hbase结构:rowkey -> Column Family -> Column Qualifer列族具体列
rowkey行键
table的主键,table中的记录按照rowkey 的字典序进行排序
Column Family列族
hbase表中的每个列,都归属与某个列族。列族是表的schema的一部分(而列不是),必须在使用表之前定义。
Timestamp时间戳
每次数据操作对应的时间戳,可以看作是数据的version number版本号
Column列
列族下面的具体列
属于某一个ColumnFamily,类似于我们mysql当中创建的具体的列
cell单元格
由{row key, column( = + ), version} 唯一确定的单元
cell中的数据是没有类型的,全部是以字节数组进行存储
6、Hbase逻辑模型:三维有序
Rowkey -> Column Family -> Column Qualifier -> Timestamp
rowkey行(正序, 从小到大)、column列(正序从小到大)、timestamp时间(倒叙从大到小)
面试点:为什么说hbase表的列族不宜超过3个?
a、列族数量决定store, 一个store至少有一个memstore,而memstore占内存
b、如果列族越多的话,造成更多的flush会产生更多IO
flush的最小单位是region, 一个region中的某个列族做flush , 其余的列族也会做flush
频繁的flush产生更多的storeFile,storeFile增多就会产生更多compaction操作
compaction操作和flush都是重IO操作
c、列族过多,split操作会出现数据不均匀的情况
散列原则:
前提:服务器的配置不是很好并且对查询速度要求不是很高
rowkey设计为:random+时间
目的:防止某一个或某几个regionserver成为热点
有序原则:
前提:服务器本身的配置要高一些, 会出现一个或是多个region热点效应
rowkey设计为:时间+random
Hbase shell 基础
list_namespace 查看所有数据,类似于show database;
scan 'hbase:meta' 查看元数据信息
--创建表 'cf1','cf2' 表示列族
create 'badou_20_a','cf1','cf2'
-- 查看表的结构
describe 'badou_20_a'
-- 删除cf1列族
alter 'badou_20_a',{NAME=>'cf1',METHOD=>'delete'}
-- 查看存在哪些表
list
exists 'badou_20_a'
-- 保留两个版本的数据, IN_MEMORY数据保存到内存中
alter 'badou_20_a',{NAME=>'cf2',VERSIONS=>2,IN_MEMORY=>true}
-- 删除表
disable 'badou_20_a' : 将表转换为去激活的状态
drop 'badou_20_a' : 删除表
-- 激活表
enable 'badou_20_a'
-- 插入记录
put 'badou_20','1003','cf2:name','root'
put 'badou_20','1004','cf2:name','scott'
-- 获取记录
scan 'badou_20' 注意 hbase表的数据量特别大的时候, scan 慎用
-- 根据rowkey 查询
get 'badou_20','1001'
-- 根据列族获取
get 'badou_20','1001',{COLUMN=>'cf2:name'}
-- 根据列族和指定的时间戳进行获取
get 'badou_20','1001',{COLUMN=>'cf2:name',TIMESTAMP=>1615465406738}
-- 查询表的记录
count 'badou_20'
-- 强制刷出内存的数据到HDFS
flush 'badou_20'
-- 清除表的数据,保留表的结构
truncate 'order'
9、hbase shell 进阶
-- 修改 badou_20 版本为2
put 'badou_20','1001','cf2:name','max' put 'badou_20','1001','cf2:name','avg' alter 'badou_20',{NAME=>'cf2',VERSIONS=>2}
如何显示两个版本?
scan 'badou_20',{VERSIONS=>2} get 'badou_20','1001',{COLUMN=>'cf2:name',VERSIONS=>2} get 'badou_20','1001',{COLUMN=>'cf2',VERSIONS=>2}
-- 修改表的版本
alter 'badou_20',{NAME=>'cf2',VERSIONS=>3} alter 'badou_20',{NAME=>'cf2',VERSIONS=>4}
-- TTL 按照规定的时间对数据进行超时间设置
TTL => 'FOREVER' 表示数据永不过期 TTL => '60 SECONDS 表示一分钟之前的数据会过期 create 'tt_table',{NAME=>'cf1',TTL=>60} 1616311193758 put 'tt_table','rowkey001','cf1:age','30',1616311993900
九、Flume实战
1、采集目录到HDFS
采集需求:某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去
根据需求,首先定义以下3大要素
采集源,即source——监控文件目录 : spooldir
下沉目标,即sink——HDFS文件系统 : hdfs sink
source和sink之间的传递通道——channel,可用file channel 也可以用内存channel
配置文件编写:
#定义三大组件的名称 agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 # 配置source组件 agent1.sources.source1.type = spooldir agent1.sources.source1.spoolDir = /home/hadoop/logs/ agent1.sources.source1.fileHeader = false #配置拦截器 agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname # 配置sink组件 agent1.sinks.sink1.type = hdfs agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M agent1.sinks.sink1.hdfs.filePrefix = access_log agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text agent1.sinks.sink1.hdfs.rollSize = 102400 agent1.sinks.sink1.hdfs.rollCount = 1000000 agent1.sinks.sink1.hdfs.rollInterval = 60 #agent1.sinks.sink1.hdfs.round = true #agent1.sinks.sink1.hdfs.roundValue = 10 #agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600 # Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1
Channel参数解释:
capacity:默认该通道中最大的可以存储的event数量
trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量
keep-alive:event添加到通道中或者移出的允许时间
2、采集文件到HDFS
采集需求:比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs
根据需求,首先定义以下3大要素
采集源,即source——监控文件内容更新 : exec ‘tail -F file’
下沉目标,即sink——HDFS文件系统 : hdfs sink
Source和sink之间的传递通道——channel,可用file channel 也可以用 内存channel
配置文件编写:
agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 # Describe/configure tail -F source1 agent1.sources.source1.type = exec agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log agent1.sources.source1.channels = channel1 #configure host for source agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname # Describe sink1 agent1.sinks.sink1.type = hdfs #a1.sinks.k1.channel = c1 agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M agent1.sinks.sink1.hdfs.filePrefix = access_log agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text agent1.sinks.sink1.hdfs.rollSize = 102400 agent1.sinks.sink1.hdfs.rollCount = 1000000 agent1.sinks.sink1.hdfs.rollInterval = 60 agent1.sinks.sink1.hdfs.round = true agent1.sinks.sink1.hdfs.roundValue = 10 agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600 # Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1 更多source和sink组件:
Flume支持众多的source和sink类型,详细手册可参考官方文档
Flume 1.9.0 User Guide — Apache Flume
十、Kafka介绍
1.Apache Kafka简介
Kakfa最初由Linkedin公司开发,使用 Scala 编写,拥有高吞吐、可持久化、可水平扩展的基于发布/订阅模式的分布式消息队列,支持分区策略、多副本策略,基于zookeeper协调的分布式消息系统,主要应用于大数据的实时或离线数据处理、日志收集以及实时指标监控等领域。
2.Kafka常用术语
消息:message,消息是kafka的基本数据单元,代表着一条一条的数据,为了提高网络和存储的利用率,生产者会批量发送消息到Kafka,并在发送之前对消息进行压缩。
主题:topic,主题是kafka对消息的分类,是一个逻辑概念,可以看作消息集合,用于接收不同业务的消息。
分区:partition,类似数据库的分区表,通常topic下会多个分区,每个分区内的数据是有序的,同一个topic的多个分区kafka不保证消息的顺序性,一个分区在逻辑上对应一个Log,对应磁盘上的一个文件夹。
偏移量:offset,偏移量是表示消息在分区中的位置,kafka存储的文件是按照offset.log的格式来命名的,便于快速查找。
副本:replicas,副本是针对分区而言的,kafka对消息做了冗余备份,目的就是为了容灾,每个分区可以指定多个副本来冗余数据,分为leader partition和follower partition,只有leader partition对外提供服务,follower partition单纯是从leader partition同步数据,因此会存在多份相同的数据。
生产者:producer,生产者是kafka集群的上游,顾名思义就是往kafka输入数据的客户端。
消费者:comsumer,消费者是kafka集群的下游,与生产者相辅相成,kafka类似一个仓库,生产者负责生产消息往仓库放,自然得有消费者从仓库里拿消息,不然仓库容易爆满。
消费者组:Comsumer Group,简称CG,这个比较容易理解,就是将多个消费者捆绑起来,组团消费消息,一个Consumer只能属于一个Consumer Group,Kafka还通过Consumer Group实现了消费者的水平扩展和故障转移。
节点:broker,一个broker就是一个kafka server实例,多个broker组成kafka集群,主要用于接收生产者发送过来的消息,写入磁盘,同时可以接收消费者和其他broker的请求。
重新负载均衡:rebalance,当消费者组的消费者实例出现变化时,例如新增消费者或者减少消费者,都会触发kafka的Rebalance机制,这个过程比较耗性能,要尽量避免这个过程被触发。
Kafka架构
我们把架构分主从架构和对等架构,主从架构就是分为管理节点和工作节点,职责不同,如HDFS 、spark、flink;对等架构则不区分节点属性,所有的实例职责都是一样的,kafka的架构有点类似于对等架构,但又不完全是。Kafka的设计理念之一就是同时提供离线处理和实时处理。
Kafka ACK消息确认机制有三个值,分别为1,0和-1,默认是1,对应不同的状态:
ack=1,意味着producer要等待leader成功收到数据并得到确认,才发送下一条message,安全性较高但是性能相对较低。
ack=0,意思就是说,我只管发送消息,不用你给我回复,成就成,不成我也不管,这种策略的性能是最高的,但是容易丢失数据。
ack=-1,这种情况下,生产者只有收到所有副本写入成功的通知后,才会认为消息写入成功,安全性最高,但是性能是三者里面最差的。
kafka分区和副本机制
kafka分区机制:
实现kafka高吞吐量的重要手段,实现流量分发和负载均衡,试想一下,如果所有的消息都往同一个数据写,对于服务器来说会造成极高的负载,特别是出现热点数据的时候容易崩溃,对于多个生产者和多个消费者来说,只有一个分区可以用于生产和消费,这显然是非常受限的。
kafka提供了三种分区策略:
轮询策略:Round-robin,轮询策略是Kafka默认的分区策略,根据主题分区数量从头到尾进行轮询,目的就是为了将消息均匀地分布在分区中,保证消息最大限度地被平均分配到所有分区上。
随机策略:Range Strategy,所谓随机就是我们随意地将消息放置到任意一个分区上,随机分发,这样有可能会造成消息分发不均匀,相比之下,轮询策略显得更加合理,旧版本默认是用随机策略,新版本默认用的是轮询策略。
按key分发策略:顾名思义就是根据消息的key指定分区写入,这种方式主观性比较强,相对比较灵活。
除此之外,kafka支持自定义分区器,实现更多复杂的逻辑处理消息。
kafka副本机制:
为了提供数据冗余、数据备份的安全策略,等同于备份,实际上,基本所有的分布式消息队列都会存在副本机制,不光是消息队列,HDFS也是如此。
前面在kafka常用术语中说到,Kafka 是有主题概念的,主题下划分成若干个分区,副本是分区的逻辑概念,分区可以指定多个副本。本质上副本就是一个只能不断追加的日志文件,在实际的生产中,为了保障数据安全,通常会配置多个副本,根据算法分散在不同的broker上,一份数据(leader和副本)不会同时出现在一台服务器上,这样当服务器出现故障时,能够最大程度保证数据不丢失,如下图。
其中leader partition和 follower partition的工作原理如下,正常情况下,只有leader partition对外提供服务,follower partition负责从leader partition拉取数据,当leader发送故障时,follower拥有被选举为新leader的权利。
3.Kafka的优势
支持数据离线和实时处理
能保证消息的可靠性传递
支持消息的持久化存储,并通过多副本分布式的存储方案来保证消息的容错
高吞吐率,每秒处理百万级的消息量
高并发,支持数千个客户端同时读写
支持在线水平扩展
kafka为什么能实现读写都这么快呢?
答:离不开kafka顺序读写机制和零拷贝数据传输,减少了寻址的时间消耗,降低了读写延迟,同时有利于快到定位消息偏移量,零拷贝机制可以提高数据传输的效率,减少IO资源的占用。
十一、Spark
1.Spark介绍
1.1 什么是spark
基于内存的分布式计算框架
只负责算 不负责存
spark 在离线计算 功能上 类似于mapreduce的作用
1.2 为什么用spark
MapReduce的缺点
运行速度慢 (没有充分利用内存)
接口比较简单,仅支持Map Reduce
功能比较单一 只能做离线计算
不适合迭代计算(如机器学习、图计算等等),交互式处理(数据挖掘)
不适合流式处理(点击日志分析)
需要一种灵活的框架可同时进行批处理、流式计算、交互式计算
内存计算引擎,提供cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的IO开销
DAG引擎,减少多次计算之间中间结果写到HDFS的开销
使用多线程模型来减少task启动开销,shuffle过程中避免不必要的sort操作以及减少磁盘IO
spark的缺点是:吃内存,不太稳定
Spark优势
速度快(比mapreduce在内存中快100倍,在磁盘中快10倍)spark中的job中间结果可以不落地,可以存放在内存中。 mapreduce中map和reduce任务都是以进程的方式运行着,而spark中的job是以线程方式运行在进程中。
易用性(可以通过java/scala/python/R开发spark应用程序)
通用性(可以使用spark sql/spark streaming/mlib/Graphx)
兼容性(spark程序可以运行在standalone/yarn/mesos)
2. RDD 的概念
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合.
Dataset:一个数据集,简单的理解为集合,用于存放数据的
Distributed:它的数据是分布式存储,并且可以做分布式的计算
Resilient:弹性的
它表示的是数据可以保存在磁盘,也可以保存在内存中
数据分布式也是弹性的
弹性:并不是指他可以动态扩展,而是容错机制。
RDD会在多个节点上存储,就和hdfs的分布式道理是一样的。hdfs文件被切分为多个block存储在各个节点上,而RDD是被切分为多个partition。不同的partition可能在不同的节点上
spark读取hdfs的场景下,spark把hdfs的block读到内存就会抽象为spark的partition。
spark计算结束,一般会把数据做持久化到hive,hbase,hdfs等等。我们就拿hdfs举例,将RDD持久化到hdfs上,RDD的每个partition就会存成一个文件,如果文件小于128M,就可以理解为一个partition对应hdfs的一个block。反之,如果大于128M,就会被且分为多个block,这样,一个partition就会对应多个block。
所有spark中对数据的操作最终都会转换成RDD的操作
spark sql
spark streaming
spark ml 、spark mllib
RDD是不可变的
父RDD 生成一个子 RDD 父RDD的状态不会变化
从容错的角度去做这样的设计
2.1 RDD的创建
创建RDD之前先要有spark context
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
通过内存中的数据创建RDD
= [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
创建RDD时可以指定 partition的数量(RDD会分成几份)一个partition会对应一个task,根据CPU的内核数来指定partition (1核对应2~4个partition)
从文件创建RDD 可以是HDFS支持的任何一种存储介质
可以从 hdfs、 数据库(mysql) 、本地文件系统、 hbase 这些地方加载数据创建RDD
rdd = sc.textFile(‘file:///root/tmp/test.txt’)
2.2 RDD的三类算子
transformation
所有的transformation 都是延迟执行的,只要不调用action 不会执行,只是记录过程
transformation 这一类算子返回值还是 rdd
rdd.transformation 还会得到新的rdd
map(func) 将func函数作用到数据集的每一个元素上,生成一个新的RDD返回
filter(func) 选出所有func返回值为true的元素,生成一个新的RDD返回
flatMap(func) 会先执行map的操作,再将所有对象合并为一个对象
union() 取并集
intersection() 交集
groupByKey() 以元组中的第0个元素作为key,进行分组,返回一个新的RDD 结果中 value是一个Iterable
reducebykey(func) 将key相同的键值对,按照Function进行计算
sortbykey(ascending=True, numPartitions=None, keyfunc=)Sorts this RDD, which is assumed to consist of (key, value) pairs.按照关键词排序,排完后函数操作
action
会触发之前的rdd所有的transformation
获取最终的结果
collect 所有的结果都会加载到内存中
reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。
fitst 第一个
take(num) 前n个
count()
persist
数据存储,可以存到内存,也可以是磁盘
3. spark-core 实战
详情见spark-core 实战 通过spark实现ip地址查询
4. spark集群架构
Application
用户自己写的Spark应用程序,批处理作业的集合。Application的main方法为应用程序的入口,用户通过Spark的API,定义了RDD和对RDD的操作。
Client:客户端进程,负责提交作业到Master。
Master(类比与ResourceManager)
Standalone模式中主控节点,负责接收Client提交的作业,管理Worker,并命令Worker启动Driver和Executor。
Worker(类比于NodeManager)
Standalone模式中slave节点上的守护进程,负责管理本节点的资源,定期向Master汇报心跳,接收Master的命令,启动Driver和Executor。
Driver(类比于ApplicationMaster)
一个Spark作业运行时包括一个Driver进程,也是作业的主进程,负责作业的解析、生成Stage并调度Task到Executor上。包括DAGScheduler,TaskScheduler。
DAGScheduler: 实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task set放到TaskScheduler中。
TaskScheduler:实现Task分配到Executor上执行。
Stage:一个Spark作业一般包含一到多个Stage。
Task:一个Stage包含一到多个Task,通过多个Task实现并行运行的功能。
Executor(类比于Container):即真正执行作业的地方,一个集群一般包含多个Executor,每个Executor接收Driver的命令Launch Task,一个Executor可以执行一到多个Task。