导入HDFS的数据到Hive-阿里云开发者社区

开发者社区> 嗯哼9925> 正文

导入HDFS的数据到Hive

简介:
+关注继续查看

1. 通过Hive view

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
CREATE EXTERNAL TABLE if not exists finance.json_serde_optd_table (
  retCode string,
  retMsg string,
  data array<struct< secid:string,="" tradedate:date,="" optid:string,="" ticker:string,="" secshortname:string,="" exchangecd:string,="" presettleprice:double,="" precloseprice:double,="" openprice:double,="" highestprice:double,="" lowestprice:double,="" closeprice:double,="" settlprice:double,="" turnovervol:double,="" turnovervalue:double,="" openint:int="">>)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 'hdfs://wdp.xxxxx.cn:8020/nifi/finance1/optd/';
create table if not exists finance.tb_optd
as
SELECT b.data.secID,
        b.data.tradeDate,
        b.data.optID,
        b.data.ticker,
        b.data.secShortName,
        b.data.exchangeCD,
        b.data.preSettlePrice,
        b.data.preClosePrice,
        b.data.openPrice,
        b.data.highestPrice,
        b.data.lowestPrice,
        b.data.closePrice,
        b.data.settlPrice,
        b.data.turnoverVol,
        b.data.turnoverValue,
        b.data.openInt
FROM finance.json_serde_optd_table LATERAL VIEW explode(json_serde_optd_table.data) b AS data;
1
  

2. 通过Zeppelin

 

1
2
%dep
z.load("/usr/hdp/2.4.2.0-258/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar");

 

1
2
3
4
// 定义导入的hive对象集合
 
case class HiveConfig(database: String, modelName: String, hdfsPath: String, schema: String, schema_tb: String);
var hiveConfigList = List[HiveConfig]();
1
  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 创建equd数据结构
// 定义json结构
val schema_json_equd_serde ="""  retCode string,
                              retMsg string,
                              data array<struct< secid="" :="" string,="" tradedate="" date,="" ticker="" secshortname="" exchangecd="" precloseprice="" double,="" actprecloseprice:="" openprice="" highestprice="" lowestprice="" closeprice="" turnovervol="" turnovervalue="" dealamount="" int,="" turnoverrate="" accumadjfactor="" negmarketvalue="" marketvalue="" pe="" pe1="" pb="" isopen="" int="">>""";
var schema_equd ="""b.data.secID,
                    b.data.ticker,
                    b.data.secShortName,
                    b.data.exchangeCD,
                    b.data.tradeDate,
                    b.data.preClosePrice,
                    b.data.actPreClosePrice,
                    b.data.openPrice,
                    b.data.highestPrice,
                    b.data.lowestPrice,
                    b.data.closePrice,
                    b.data.turnoverVol,
                    b.data.turnoverValue,
                    b.data.dealAmount,
                    b.data.turnoverRate,
                    b.data.accumAdjFactor,
                    b.data.negMarketValue,
                    b.data.marketValue,
                    b.data.PE,
                    b.data.PE1,
                    b.data.PB,
                    b.data.isOpen""";
hiveConfigList  = hiveConfigList :+ HiveConfig("finance""equd""hdfs://wdp.xxxxx.cn:8020/nifi/finance1/", schema_json_equd_serde, schema_equd);

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 创建idxd数据结构
// 定义json结构
val schema_json_idxd_serde ="""  retCode string,
                              retMsg string,
                              data array<struct< indexid:string,="" tradedate:date,="" ticker:string,="" porgfullname:string,="" secshortname:string,="" exchangecd:string,="" precloseindex:double,="" openindex:double,="" lowestindex:double,="" highestindex:double,="" closeindex:double,="" turnovervol:double,="" turnovervalue:double,="" chg:double,="" chgpct:double="">>""";
var schema_idxd ="""b.data.indexID,
                    b.data.tradeDate,
                    b.data.ticker,
                    b.data.porgFullName,
                    b.data.secShortName,
                    b.data.exchangeCD,
                    b.data.preCloseIndex,
                    b.data.openIndex,
                    b.data.lowestIndex,
                    b.data.highestIndex,
                    b.data.closeIndex,
                    b.data.turnoverVol,
                    b.data.turnoverValue,
                    b.data.CHG,
                    b.data.CHGPct""";
hiveConfigList = hiveConfigList :+ HiveConfig("finance""idxd""hdfs://wdp.xxxxx.cn:8020/nifi/finance1/", schema_json_idxd_serde, schema_idxd);

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 循环加载数据中
  def loadDataToHive(args:HiveConfig){
    val loadPath = args.hdfsPath + args.modelName;
    val tb_json_serde = "json_serde_" + args.modelName +"_table";
    val tb= "tb_" + args.modelName;
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    if(args.database != "" && args.schema != "") {
        print("正在创建项目..." + args.modelName)
        hiveContext.sql("CREATE DATABASE IF NOT EXISTS " + args.database);
        print("正在构造扩展模型...");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb_json_serde + "(" + args.schema + ") row format serde 'org.apache.hive.hcatalog.data.JsonSerDe' LOCATION " "'" + loadPath + "/'");
        println("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb + " as select " + args.schema_tb + " from " + args.database + "." + tb_json_serde + " LATERAL VIEW explode(" + tb_json_serde + ".data) b AS data");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb + " as select " + args.schema_tb + " from " + args.database + "." + tb_json_serde + " LATERAL VIEW explode(" + tb_json_serde + ".data) b AS data");
        println(args.modelName + " 扩展模型加载已完成!");
    }
  }
  hiveConfigList.size;
  hiveConfigList.foreach { x => loadDataToHive(x) };

 

 3. 第二种取法

由于data是json数据里的一个数组,所以上面的转换复杂了一点。下面这种方法是先把json里data数组取出来放到hdfs,然后直接用下面的语句放到hive:

用splitjson 来提取、分隔 data 数组

NewImage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
CREATE EXTERNAL TABLE if not exists finance.awen_optd (
  secid string,
  tradedate date,
  optid string,
  ticker string,
  secshortname string,
  exchangecd string,
  presettleprice double,
  precloseprice double,
  openprice double,
  highestprice double,
  lowestprice double,
  closeprice double,
  settlprice double,
  turnovervol double,
  turnovervalue double,
  openint int)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 'hdfs://wdp.xxxx.cn:8020/nifi/finance2/optd/';

 

 本文转自疯吻IT博客园博客,原文链接:http://www.cnblogs.com/fengwenit/p/5631455.html,如需转载请自行联系原作者


版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
关于hive数据导入方式的总结
从本地导入数据到hive: load data local inpath '/home/hive/tb_dw_cu_three_type_list_dtal/*.dat' overwrite into table csap.tb_dw_cu_three_type_list_dtal  partition(statis_date=20160121); 从HDFS直接导入数据到hive:
1979 0
android 向SD卡写入数据
原文: android 向SD卡写入数据 1.代码: /** * 向sdcard中写入文件 * @param filename 文件名 * @param content 文件内容 */ public ...
702 0
自建Hive数据仓库跨版本迁移到阿里云Databricks数据洞察
客户在IDC或者公有云环境自建Hadoop集群构建数据仓库和分析系统,购买阿里云Databricks数据洞察集群之后,涉及到数仓数据和元数据的迁移以及Hive版本的订正更新。
137 0
hive数据导入云hbase
网络环境 专线:用户需要把hbase集群的VPC相关网络信息配置到专线里面,可直通hbase环境 公有云虚拟机VPC环境:选择和hbase通VPC 其他:需要开hbase公网 注意:默认导入hbase数据,依赖的hbase-common、hbase-client、hbase-server、hbase-protocol使用社区的包即可。
5108 0
使用 www_fdw 插件向PG/PPAS导入数据
www_fdw 插件支持通过http协议把文本数据导入到PG/PPAS中 RDS PG/PPAS 新版本已经支持通过 www_fdw 下面简单介绍下使用方式 启动http服务 导入数据前,我们需要在文件服务器上启动一个http服务,再把要导入的文本数据放在对应目录 创建 www_fdw 插
3138 0
+关注
4716
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载