Stage1:
Mapper - <TableScanOperator,ProjectionOperator,ParitialAggregationOperator>
Reducer - <FinalAggregationOperator, ReducerSinkOperator>
写到这里发现选取的例子并没有复杂表达式,比如我其实要计算大家统一加薪10%之后的成本,那我其实会写Salary * 1.1。如果是这样的话,Salary * 1.1这样的表达式会被封装成一个可以求值的求值器。很多类似Hive的SQL On Hadoop系统选择进行代码生成,比如Spark,Impala或者Presto(其实基本Hive之后的都进行了代码生成,而Hive应该还没有进行代码生成,这个我并不确定)。
不管Hive现在有没有代码生成,就算有,也和很多人想象中的代码生成并不同。Hive并不会直接产生一个MapReduce作业的全部代码。Hive会将刚才说的Operator信息进行封装,产生一个可以序列化传输的包,这个包里包涵了各个Operator求值所需的信息,然后提交作业分发给Mapper和Reducer。Hive使用的Mapper和Reducer是两个特定的Hive类,它们的一部分初始化信息来自于Job阶段根据Operator的信息进行设定(比如TableScan相关的信息一部分在Job生成的时候就已经设置好),另一部分会在每个Task启动的时候装载刚才序列化的Operator信息并产生一个可以求值的求值器,当map和reduce函数被调用的时候,求值器也被调用,每个输入数据都被求值一遍。
这样一次Mapper和Reducer走完,SQL就计算完毕了。
具体Hive使用的MapReduce类是
org.apache.hadoop.hive.ql.exec.mr.ExecMapper,ExecReducer
好奇的同学可以去翻翻看,到底逻辑是如何的。
实际上上面的SQL基本上可以说是最最简单的场景了,如果有诸如Join,窗口函数,子查询等,整个执行计划会一下子变的复杂。另外元数据管理,权限,HiveServer2和JDBC等等,就没有那么令人感兴趣了,在此略过不提,用过Hive的人多少可以自己想得出这几个部分如何工作。
5.3HIVE数据存储
Hive的数据存储
1、Hive中所有的数据都存储在 HDFS 中,没有专门的数据存储格式(可支持Text,SequenceFile,ParquetFile,RCFILE等)
2、只需要在创建表的时候告诉 Hive 数据中的列分隔符和行分隔符,Hive 就可以解析数据。
3、Hive 中包含以下数据模型:DB、Table,External Table,Partition,Bucket。
db:在hdfs中表现为${hive.metastore.warehouse.dir}目录下一个文件夹 table:在hdfs中表现所属db目录下一个文件夹 external table:外部表, 与table类似,不过其数据存放位置可以在任意指定路径 普通表: 删除表后, hdfs上的文件都删了 External外部表删除后, hdfs上的文件没有删除, 只是把文件删除了 partition:在hdfs中表现为table目录下的子目录 bucket:桶, 在hdfs中表现为同一个表目录下根据hash散列之后的多个文件, 会根据不同的文件把数据放到不同的文件中
5.4HIVE常用语句
常用语句
创建数据库 >create database db_name; >create database if not exists db_name;//创建一个不存在的数据库final 查看数据库 >show databases; 选择性查看数据库 >show databases like 'f.*'; 查看某一个数据库的详细信息 >describe database db_name; 删除非空数据库 >drop database db_name CASCADE; 创建数据库时,指定数据库位置 >create database db_name location '/home/database/' 创建数据库的时候希望能够给数据库增加一些描述性东西 >create database db_name comment 'helloworld'; 创建数据库的时候,需要为数据库增加属性信息,可以使用with dbproperties信息 >create database db_name with dbproperties<'createor'='hello','date'='2018-3-3'); 如果要使用自己已经存在的数据库 >use db_name; 修改数据库的属性信息 >alter database db_name set dbproperties('edited-by'='hello'); 创建表 >create table tab_name(id int,name string) row format delimited fields terminated by '\t'; 创建一个表,该表和已有的某一个表的结构一样(复制表结构) >create table if not exists emp like employeel; 查看当前数据库下的所有表 >show tables; 删除一个已经存在的表 >drop table employee; 修改一个表明,重命名 >alter table old_name rename to new_name; 将hdfs上面的文件信息导入到hive表中(/home/bigdata代表文件在在HDFS上位置)使用改命令时一定要注意数据与数据之间在txt文件编辑的时候一定要Tab间隔 >load data local inpath '/home/bigdata' into table hive.dep; 修改某一个表的某一列的信息 >alter table tab_name change column key key_1 int comment 'h' after value; 给某一个表增加某一列的信息 >alter table tab_name add columns(value1 string,value2 string); 如果想替换表中的某一个列 >alter table tab_name replace columns(values string,value11 string); 修改表中某一列的属性 >alter table tab_name set tblproperties('value'='hello'); hive成批向某一表插入数据 >insert overwrite table tab_name select * from tab_name2; 将 查询结果保留到一个新表中去 >create table tab_name as select * from t_name2; 将查询结果保存到指定的文件目录(可以是本地,也可以HDFS) >insert overwrite local directory '/home/hadoop/test' select *from t_name; >insert overwrite directory '/aaa/bbb/' select *from t_p; 两表内连 >select *from dual a join dual b on a.key=b.key; 将hive查询结果输出到本地特定目录 insert overwrite local directory '/home/bigdata1/mydir' select *from test; 将hive查询结果输出到HDFS特定目录 insert overwrite directory '/home/mydir' select *from test;
案例:EMP DEPT表
(1)命令行窗口通过输入hive,连接到hive,查看数据库 hive show databases; 通过location指定数据库路径,本例中数据库路径为当前用户桌面上目录 create database hadoop_hive location '/home/ubuntu/Desktop/hadoop_hive'; 使用创建好的数据库 use hadoop_hive; (2)创建员工表(emp+学号,如:emp001) create table emp001(empno int,ename string,job string,mgr int,hiredate string,sal int,comm int,deptno int) row format delimited fields terminated by ','; (3)创建部门表(dept+学号,如:dept001) create table dept001(deptno int,dname string,loc string) row format delimited fields terminated by ','; (4)导入数据 load data inpath '/001/hive/emp.csv' into table emp001; load data inpath '/001/hive/dept.csv' into table dept001; (5)根据员工的部门号创建分区,表名emp_part+学号,如:emp_part001 create table emp_part001(empno int,ename string,job string,mgr int,hiredate string,sal int,comm int)partitioned by (deptno int)row format delimited fields terminated by ','; 往分区表中插入数据:指明导入的数据的分区(通过子查询导入数据) insert into table emp_part001 partition(deptno=10) select empno,ename,job,mgr,hiredate,sal,comm from emp001 where deptno=10; insert into table emp_part001 partition(deptno=20) select empno,ename,job,mgr,hiredate,sal,comm from emp001 where deptno=20; insert into table emp_part001 partition(deptno=30) select empno,ename,job,mgr,hiredate,sal,comm from emp001 where deptno=30; select * from emp_part001; (6)创建一个桶表,表名emp_bucket+学号,如:emp_bucket001,根据员工的职位(job)进行分桶: create table emp_bucket001(empno int,ename string,job string,mgr int,hiredate string,sal int,comm int,deptno int)clustered by (job) into 4 buckets row format delimited fields terminated by ','; (7)通过子查询插入数据: insert into emp_bucket001 select * from emp001; (8)查询员工信息:员工号 姓名 薪水 select empno,ename,sal from emp001; (9)多表查询 select dept001.dname,emp001.ename from emp001,dept001 where emp001.deptno=dept001.deptno; (10)做报表,根据职位给员工涨工资,把涨前、涨后的薪水显示出来 select empno,ename,job,sal,case job when 'PRESIDENT' then sal+1000 when 'MANAGER' then sal+800 else sal+400 end from emp001;
6.Zookeeper专题
6.1 Zookeeper概念
1.Zookeeper作用:
Zookeeper是针对大型分布式系统的高可靠的协调系统。 由这个定义我们知道 zookeeper是个协调系统,作用的对象是分布式系统。 zookeeper主要是文件系统和通知机制 文件系统主要是用来存储数据 通知机制主要是服务器或者客户端进行通知,并且监督
2.Zookeeper文件系统节点类型
有4种类型的znode 1、PERSISTENT--持久化目录节点 客户端与zookeeper断开连接后,该节点依旧存在 2、PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点 客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号 3、EPHEMERAL-临时目录节点 客户端与zookeeper断开连接后,该节点被删除 4、EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点 客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号
3.Zookeeper特点
一个leader,多个follower的集群 集群只要有半数以上包括半数就可正常服务,一般安装奇数台服务器 全局数据一致,每个服务器都保存同样的数据,实时更新 更新的请求顺序保持顺序(来自同一个服务器) 数据更新的原子性,数据要么成功要么失败 数据实时更新性很快
6.2 Zookeeper应用场景
1.服务动态上下线通知 2.分布式锁 3.数据配置 4.集群管理
6.3 Zookeeper原理
PAXOS算法:
参考:https://www.cnblogs.com/linbingdong/p/6253479.html
ZAB算法:
参考:https://blog.csdn.net/liuchang19950703/article/details/111406622
6.4 Zookeeper实现服务动态上下线通知原理
如图:
6.5 Zookeeper实现分布式锁原理
如图:
6.6 Zookeeper常用命令
1、zk服务命令
启动ZK服务: bin/zkServer.sh start
查看ZK服务状态: bin/zkServer.sh status
停止ZK服务: bin/zkServer.sh stop
重启ZK服务: bin/zkServer.sh restart
连接服务器: zkCli.sh -server 127.0.0.1:2181
2、连接zk
启动ZooKeeper服务之后,我们可以使用如下命令连接到 ZooKeeper 服务:
eg、zookeeper-3.4.8\bin>zkCli.cmd -server 127.0.0.1:2181
Linux环境下:
eg、zkCli.sh -server 127.0.0.1:2181
连接成功后,系统会输出 ZooKeeper 的相关环境以及配置信息,如下:
3、zk客户端命令
我们可以使用 help命令来查看帮助:
命令行工具的一些常用操作命令如下:
1.ls – 查看某个目录包含的所有文件,例如:
[zk: 127.0.0.1:2181(CONNECTED) 1] ls /
2.ls2 – 查看某个目录包含的所有文件,与ls不同的是它查看到time、version等信息,例如:
[zk: 127.0.0.1:2181(CONNECTED) 1] ls2 /
3.create – 创建znode,并设置初始内容,例如:
[zk: 127.0.0.1:2181(CONNECTED) 1] create /test “test”
Created /test
创建一个新的 znode节点“ test ”以及与它关联的字符串
4.get – 获取znode的数据,如下:
[zk: 127.0.0.1:2181(CONNECTED) 1] get /test
5.set – 修改znode内容,例如:
[zk: 127.0.0.1:2181(CONNECTED) 1] set /test “ricky”
6.delete – 删除znode,例如:
[zk: 127.0.0.1:2181(CONNECTED) 1] delete /test
7.quit – 退出客户端
8.help – 帮助命令
7 Flume专题
7.1Flume简介
Flume是一种分布式的、可靠的、可用的服务,用于高效地收集、聚合和移动大量的日志数据。它具有基于流数据流的简单而灵活的架构。它具有可调的可靠性机制和许多故障转移和恢复机制,具有健壮性和容错能力。它使用一个简单的可扩展数据模型,允许在线分析应用程序。
Flume中的数据传递被称为event事件,event就是数据流单元。Flume中的agent被称为代理,agent的本质是一个(JVM)进程,每个agent中包含了source,channel,sink这几个组件,这些组件会把数据从一个地方(source)采集到目的地(sink)中(被称为一个hop,跳)。
7.2 Sources,Channels,Sinks配置
Flume的source配置,见http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sources
Flume的channel配置,见
http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-channels
Flume的sink配置,见
http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sinks
7.3 可靠性
在每个agent中,event都会暂存在channel中。然后将event传递给下一个agent或是终端存储库中(如sink的类型为HDFS时)。这些event在存储到下一个agent的channel中或是存储到终端存储中(如HDFS)中后,才会在当前agent的channel中将event删除。这样只有在将事件存储到下一个代理的通道或终端存储库中之后,它们才会从通道中删除。这种方式提供了Flume在消息传递时的端到端可靠性。
7.4 可恢复性
当消息传递失败时,event由于已经暂存在channel中,可以从channel中恢复。Flume支持持久化channel(比如采用本地文件系统作为channel),如果追求性能,也可采用memory作为channel,但这样有可能存在数据丢失无法恢复的情况。
7.5 Flume配置文件与用法举例
# example.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 6666 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
上面的配置文件定义了一个agent的name为a1,a1的source监听6666端口,并且读取6666端口传过来的数据, a1的channel 采用内存作为缓存,a1的sink 类型为logs,具体含义可以参考官网,或是留言。
在flume的安装目录下执行如下命令,即可使用flume采集数据:
$ bin/flume-ng agent -n a1 -c conf -f conf/netcat2logger.conf -Dflume.root.logger=INFO,console
flume-ng agent :表示flume的启动一个agent,ng是表示这是new的版本命令
-n a1:-n 表示name ,a1表示agent的名字为a1 对应配置文件中的a1
-c conf :表示flume的配置文件目录所在位置
-f conf/netcat2logger.conf: 表示自定义的数据采集配置文件位置。
-Dflume.root.logger=INFO,console:表示我们制定flume的日志格式,并且输出到控制台。