01 引言
在前面的教程,已经初步了解了Hive
的数据模型、数据类型和操作的命令,有兴趣的同学可以参阅:
- 《Hive教程(01)- 初识Hive》
- 《Hive教程(02)- Hive安装》
- 《Hive教程(03)- Hive数据模型》
- 《Hive教程(04)- Hive数据类型》
- 《Hive教程(05)- Hive命令汇总》
既然我们知道的使用hive
将数据存储到的地方(数据模型)、存的数据类型以及存的方式(命令),那么这里会提出一个疑问,数据最终是会存入一个文件的,但是数据的来源是一个“对象”,这个过程是怎么转换的?本文来讲解下。
02 SerDe
2.1 概念
什么是SerDe
?其实他代表两种含义的缩写:
Serializer
序列化:是对象转换成字节序列的过程Deserializer
反序列化:是字节序列转换成对象的过程
序列化与反序列化的过程如下:
Serializer
序列化:数据行对象(Row object
)—> 序列化 —>OutputFileFormate
—>HDFS
文件Deserializer
反序列化:HDFS
文件 —>InputFileFormate
—> 反序列化 —> 数据行对象(Row object
)
SerDe
可以让Hive
从表中读取数据,然后以任意自定义格式把数据写回 HDFS
,并可以根据具体的数据格式开发自定义的SerDe
实现。
Hive
创建表时,指定数据的序列化和反序列化方式,模板如下:
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name [(col_name data_type [COMMENT col_comment], ...)] [COMMENT table_comment] [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] [CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] [ROW FORMAT row_format] [STORED AS file_format] [LOCATION hdfs_path]
2.2 分类
Hive SerDe分为内置的自定义的,下面来讲讲。
2.2.1 内置 SerDe 类型
Hive 使用下面的 FileFormat 类型来读写 HDFS 文件:
TextInputFormat/HiveIgnoreKeyTextOutputFormat
:该格式用于读写文本文件格式的数据。SequenceFileInputFormat/SequenceFileOutputFormat
:该格式用于读写 Hadoop 的序列文件格式。
内置的SerDe类型分类有:
- Avro
- ORC
- RegEx
- Thrift
- Parquet
- CSV
- JsonSerDe
2.2.1.1 MetadataTypedColumnsetSerDe
这个 SerDe
类型用于读写以某个分隔符分隔的记录。比如使用逗号分隔符的记录(CSV
),tab
键分隔符的记录。
2.2.1.2 LazySimpleSerDe
这个是默认的 SerDe
类型。读取与 MetadataTypedColumnsetSerDe
和 TCTLSeparatedProtocol
相同的数据格式。
可以用这个 Hive SerDe
类型,它是以惰性的方式创建对象的,因此具有更好的性能。
在 Hive 0.14.0
版本以后,在读写数据时它支持指定字符编码。例如:
ALTER TABLE person SET SERDEPROPERTIES (‘serialization.encoding’=’GBK’)
如果把配置属性 hive.lazysimple.extended_boolean_literal
设置为true
(Hive 0.14.0
以后版本),LazySimpleSerDe
可以把‘T’, ‘t’, ‘F’, ‘f’, ‘1’, and ‘0’
视为合法的布尔字面量。而该配置默认是false
的,因此它只会把‘True’
和‘False’
视为合法的布尔字面量。
2.2.1.3 Thrift SerDe
读写Thrift
序列化对象,可以使用这种 Hive SerDe
类型。需要确定的是,对于 Thrift
对象,类文件必须先被加载。
2.2.1.4 动态 SerDe
为了读写 Thrift
序列化对象,我们可以使用这种 SerDe
类型。
它可以理解 Thrift DDL
语句,所以对象的模式可以在运行时被提供。
另外,它支持许多不同的协议,包括 TBinaryProtocol
, TJSONProtocol
, TCTLSeparatedProtocol
。
2.2.2 自定义 SerDe 类型
2.2.2.1 步骤一:自定义SerDe
首先定义一个类, 继承抽象类 AbstractSerDe
, 实现 initialize
和 deserialize
两个方法。
注意:下面代码中,Hive
使用 ObjectInspector
对象分析行对象的内部结构以及列的结构,具体来说,ObjectInspector
给访问复杂的对象提供了一种统一的方式。对象可能以多种格式存储在内存中:
Java
类实例,Thrift
或者 原生Java
- 标准
Java
对象,比如Map
字段,我们使用java.util.List
表示Struct
和Array
,以及使用java.util.Map
- 惰性初始化对象
此外,可以通过 (ObjectInspector, java
对象) 这种结构表示一个复杂的对象。它为我们提供了访问对象内部字段的方法,而不涉及对象结构相关的信息。出了序列化的目的,Hive
建议为自定义 SerDes
创建自定义的 objectinspector
,SerDe
有两个构造器,一个无参构造器,一个常规构造器。
示例代码:
public class MySerDe extends AbstractSerDe { // params private List<String> columnNames = null; private List<TypeInfo> columnTypes = null; private ObjectInspector objectInspector = null; // seperator private String nullString = null; private String lineSep = null; private String kvSep = null; @Override public void initialize(Configuration conf, Properties tbl) throws SerDeException { // Read sep lineSep = "\n"; kvSep = "="; nullString = tbl.getProperty(Constants.SERIALIZATION_NULL_FORMAT, ""); // Read Column Names String columnNameProp = tbl.getProperty(Constants.LIST_COLUMNS); if (columnNameProp != null && columnNameProp.length() > 0) { columnNames = Arrays.asList(columnNameProp.split(",")); } else { columnNames = new ArrayList<String>(); } // Read Column Types String columnTypeProp = tbl.getProperty(Constants.LIST_COLUMN_TYPES); // default all string if (columnTypeProp == null) { String[] types = new String[columnNames.size()]; Arrays.fill(types, 0, types.length, Constants.STRING_TYPE_NAME); columnTypeProp = StringUtils.join(types, ":"); } columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProp); // Check column and types equals if (columnTypes.size() != columnNames.size()) { throw new SerDeException("len(columnNames) != len(columntTypes)"); } // Create ObjectInspectors from the type information for each column List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>(); ObjectInspector oi; for (int c = 0; c < columnNames.size(); c++) { oi = TypeInfoUtils .getStandardJavaObjectInspectorFromTypeInfo(columnTypes .get(c)); columnOIs.add(oi); } objectInspector = ObjectInspectorFactory .getStandardStructObjectInspector(columnNames, columnOIs); } @Override public Object deserialize(Writable wr) throws SerDeException { // Split to kv pair if (wr == null) return null; Map<String, String> kvMap = new HashMap<String, String>(); Text text = (Text) wr; for (String kv : text.toString().split(lineSep)) { String[] pair = kv.split(kvSep); if (pair.length == 2) { kvMap.put(pair[0], pair[1]); } } // Set according to col_names and col_types ArrayList<Object> row = new ArrayList<Object>(); String colName = null; TypeInfo type_info = null; Object obj = null; for (int i = 0; i < columnNames.size(); i++) { colName = columnNames.get(i); type_info = columnTypes.get(i); obj = null; if (type_info.getCategory() == ObjectInspector.Category.PRIMITIVE) { PrimitiveTypeInfo p_type_info = (PrimitiveTypeInfo) type_info; switch (p_type_info.getPrimitiveCategory()) { case STRING: obj = StringUtils.defaultString(kvMap.get(colName), ""); break; case LONG: case INT: try { obj = Long.parseLong(kvMap.get(colName)); } catch (Exception e) { } } } row.add(obj); } return row; } @Override public ObjectInspector getObjectInspector() throws SerDeException { return objectInspector; } @Override public SerDeStats getSerDeStats() { return null; } @Override public Class<? extends Writable> getSerializedClass() { return Text.class; } @Override public Writable serialize(Object arg0, ObjectInspector arg1) throws SerDeException { return null; } }
2.2.2.2 步骤二:hive添加Serde
使用自定义 Serde
类型:
hive > add jar MySerDe.jar
2.2.2.3 步骤三:使用Serde
创建表格时属性 row fromat
指定自定义的 SerDe
类:
CREATE EXTERNAL TABLE IF NOT EXISTS teacher ( id BIGINT, name STRING, age INT) ROW FORMAT SERDE 'com.coder4.hive.MySerDe' STORED AS TEXTFILE LOCATION '/usr/hive/text/'
03 文末
本文主要参考了以下文献:
本文完!