导入HDFS的数据到Hive

简介:

1. 通过Hive view

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
CREATE EXTERNAL TABLE  if  not exists finance.json_serde_optd_table (
   retCode string,
   retMsg string,
   data array<struct< secid:string,= ""  tradedate:date,= ""  optid:string,= ""  ticker:string,= ""  secshortname:string,= ""  exchangecd:string,= ""  presettleprice: double ,= ""  precloseprice: double ,= ""  openprice: double ,= ""  highestprice: double ,= ""  lowestprice: double ,= ""  closeprice: double ,= ""  settlprice: double ,= ""  turnovervol: double ,= ""  turnovervalue: double ,= ""  openint: int = "" >>)
ROW FORMAT SERDE  'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION  'hdfs://wdp.xxxxx.cn:8020/nifi/finance1/optd/' ;
create table  if  not exists finance.tb_optd
as
SELECT b.data.secID,
         b.data.tradeDate,
         b.data.optID,
         b.data.ticker,
         b.data.secShortName,
         b.data.exchangeCD,
         b.data.preSettlePrice,
         b.data.preClosePrice,
         b.data.openPrice,
         b.data.highestPrice,
         b.data.lowestPrice,
         b.data.closePrice,
         b.data.settlPrice,
         b.data.turnoverVol,
         b.data.turnoverValue,
         b.data.openInt
FROM finance.json_serde_optd_table LATERAL VIEW explode(json_serde_optd_table.data) b AS data;
1
  

2. 通过Zeppelin

 

1
2
%dep
z.load( "/usr/hdp/2.4.2.0-258/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar" );

 

1
2
3
4
// 定义导入的hive对象集合
 
case  class  HiveConfig(database: String, modelName: String, hdfsPath: String, schema: String, schema_tb: String);
var hiveConfigList = List[HiveConfig]();
1
  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 创建equd数据结构
// 定义json结构
val schema_json_equd_serde = "" "  retCode string,
                               retMsg string,
                               data array<struct< secid= ""  := ""  string,= ""  tradedate= ""  date,= ""  ticker= ""  secshortname= ""  exchangecd= ""  precloseprice= ""  double ,= ""  actprecloseprice:= ""  openprice= ""  highestprice= ""  lowestprice= ""  closeprice= ""  turnovervol= ""  turnovervalue= ""  dealamount= ""  int ,= ""  turnoverrate= ""  accumadjfactor= ""  negmarketvalue= ""  marketvalue= ""  pe= ""  pe1= ""  pb= ""  isopen= ""  int = "" >> "" ";
var schema_equd = "" "b.data.secID,
                     b.data.ticker,
                     b.data.secShortName,
                     b.data.exchangeCD,
                     b.data.tradeDate,
                     b.data.preClosePrice,
                     b.data.actPreClosePrice,
                     b.data.openPrice,
                     b.data.highestPrice,
                     b.data.lowestPrice,
                     b.data.closePrice,
                     b.data.turnoverVol,
                     b.data.turnoverValue,
                     b.data.dealAmount,
                     b.data.turnoverRate,
                     b.data.accumAdjFactor,
                     b.data.negMarketValue,
                     b.data.marketValue,
                     b.data.PE,
                     b.data.PE1,
                     b.data.PB,
                     b.data.isOpen "" ";
hiveConfigList  = hiveConfigList :+ HiveConfig( "finance" "equd" "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/" , schema_json_equd_serde, schema_equd);

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 创建idxd数据结构
// 定义json结构
val schema_json_idxd_serde = "" "  retCode string,
                               retMsg string,
                               data array<struct< indexid:string,= ""  tradedate:date,= ""  ticker:string,= ""  porgfullname:string,= ""  secshortname:string,= ""  exchangecd:string,= ""  precloseindex: double ,= ""  openindex: double ,= ""  lowestindex: double ,= ""  highestindex: double ,= ""  closeindex: double ,= ""  turnovervol: double ,= ""  turnovervalue: double ,= ""  chg: double ,= ""  chgpct: double = "" >> "" ";
var schema_idxd = "" "b.data.indexID,
                     b.data.tradeDate,
                     b.data.ticker,
                     b.data.porgFullName,
                     b.data.secShortName,
                     b.data.exchangeCD,
                     b.data.preCloseIndex,
                     b.data.openIndex,
                     b.data.lowestIndex,
                     b.data.highestIndex,
                     b.data.closeIndex,
                     b.data.turnoverVol,
                     b.data.turnoverValue,
                     b.data.CHG,
                     b.data.CHGPct "" ";
hiveConfigList = hiveConfigList :+ HiveConfig( "finance" "idxd" "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/" , schema_json_idxd_serde, schema_idxd);

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 循环加载数据中
   def loadDataToHive(args:HiveConfig){
     val loadPath = args.hdfsPath + args.modelName;
     val tb_json_serde =  "json_serde_"  + args.modelName + "_table" ;
     val tb=  "tb_"  + args.modelName;
     val hiveContext =  new  org.apache.spark.sql.hive.HiveContext(sc)
     if (args.database !=  ""  && args.schema !=  "" ) {
         print( "正在创建项目..."  + args.modelName)
         hiveContext.sql( "CREATE DATABASE IF NOT EXISTS "  + args.database);
         print( "正在构造扩展模型..." );
         hiveContext.sql( "CREATE TABLE IF NOT EXISTS "  + args.database +  "."  + tb_json_serde +  "("  + args.schema +  ") row format serde 'org.apache.hive.hcatalog.data.JsonSerDe' LOCATION "  "'"  + loadPath +  "/'" );
         println( "CREATE TABLE IF NOT EXISTS "  + args.database +  "."  + tb +  " as select "  + args.schema_tb +  " from "  + args.database +  "."  + tb_json_serde +  " LATERAL VIEW explode("  + tb_json_serde +  ".data) b AS data" );
         hiveContext.sql( "CREATE TABLE IF NOT EXISTS "  + args.database +  "."  + tb +  " as select "  + args.schema_tb +  " from "  + args.database +  "."  + tb_json_serde +  " LATERAL VIEW explode("  + tb_json_serde +  ".data) b AS data" );
         println(args.modelName +  " 扩展模型加载已完成!" );
     }
   }
   hiveConfigList.size;
   hiveConfigList.foreach { x => loadDataToHive(x) };

 

 3. 第二种取法

由于data是json数据里的一个数组,所以上面的转换复杂了一点。下面这种方法是先把json里data数组取出来放到hdfs,然后直接用下面的语句放到hive:

用splitjson 来提取、分隔 data 数组

NewImage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
CREATE EXTERNAL TABLE  if  not exists finance.awen_optd (
   secid string,
   tradedate date,
   optid string,
   ticker string,
   secshortname string,
   exchangecd string,
   presettleprice  double ,
   precloseprice  double ,
   openprice  double ,
   highestprice  double ,
   lowestprice  double ,
   closeprice  double ,
   settlprice  double ,
   turnovervol  double ,
   turnovervalue  double ,
   openint  int )
ROW FORMAT SERDE  'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION  'hdfs://wdp.xxxx.cn:8020/nifi/finance2/optd/' ;

 

 本文转自疯吻IT博客园博客,原文链接:http://www.cnblogs.com/fengwenit/p/5631455.html,如需转载请自行联系原作者


目录
相关文章
|
2月前
|
SQL 存储 分布式计算
HDFS数据(跨集群)迁移
HDFS数据(跨集群)迁移
|
2月前
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 分布式计算 数据处理
实时计算 Flink版产品使用问题之怎么将数据从Hive表中读取并写入到另一个Hive表中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 存储 分布式计算
|
2月前
|
SQL 存储 监控
Hive 插入大量数据
【8月更文挑战第15天】
|
5月前
|
SQL 数据采集 数据挖掘
大数据行业应用之Hive数据分析航班线路相关的各项指标
大数据行业应用之Hive数据分析航班线路相关的各项指标
175 1
|
5月前
|
SQL 存储 大数据
【大数据技术Hadoop+Spark】Hive基础SQL语法DDL、DML、DQL讲解及演示(附SQL语句)
【大数据技术Hadoop+Spark】Hive基础SQL语法DDL、DML、DQL讲解及演示(附SQL语句)
235 0
|
5月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
203 0
|
3月前
|
SQL 分布式计算 大数据
大数据处理平台Hive详解
【7月更文挑战第15天】Hive作为基于Hadoop的数据仓库工具,在大数据处理和分析领域发挥着重要作用。通过提供类SQL的查询语言,Hive降低了数据处理的门槛,使得具有SQL背景的开发者可以轻松地处理大规模数据。然而,Hive也存在查询延迟高、表达能力有限等缺点,需要在实际应用中根据具体场景和需求进行选择和优化。