如何快速把hdfs数据动态导入到hive表

简介:

1. hdfs 文件

 
1
{ "retCode" : 1 , "retMsg" : "Success" , "data" :[{ "secID" : "000001.XSHE" , "ticker" : "000001" , "secShortName" : "深发展A" , "exchangeCD" : "XSHE" , "tradeDate" : "1991-10-21" , "preClosePrice" : 24 , "actPreClosePrice" : 24 , "openPrice" : 24 , "highestPrice" : 24.4 , "lowestPrice" : 23.85 , "closePrice" : 23.9 , "turnoverVol" : 355700 , "turnoverValue" : 8582250 , "turnoverRate" : 0.0058 , "accumAdjFactor" : 0.0117201563 , "negMarketValue" : 1462295257.8 , "marketValue" : 2145064267.7 , "PB" : 2.2666 , "isOpen" : 1 },{ "secID" : "000002.XSHE" , "ticker" : "000002" , "secShortName" : "深万科A" , "exchangeCD" : "XSHE" , "tradeDate" : "1991-10-21" , "preClosePrice" : 8 , "actPreClosePrice" : 8 , "openPrice" : 8 , "highestPrice" : 8 , "lowestPrice" : 7.7 , "closePrice" : 7.9 , "turnoverVol" : 375000 , "turnoverValue" : 2944200 , "turnoverRate" : 0.0066 , "accumAdjFactor" : 0.0117337592 , "negMarketValue" : 451011000 , "marketValue" : 615927450 , "PB" : 1.0001 , "isOpen" : 1 },{ "secID" : "000004.XSHE" , "ticker" : "000004" , "secShortName" : "深安达A" , "exchangeCD" : "XSHE" , "tradeDate" : "1991-10-21" , "preClosePrice" : 7.25 , "actPreClosePrice" : 7.25 , "openPrice" : 7.25 , "highestPrice" : 7.25 , "lowestPrice" : 7.2 , "closePrice" : 7.2 , "turnoverVol" : 92000 , "turnoverValue" : 665125 , "turnoverRate" : 0.0078 , "accumAdjFactor" : 0.2649084628 , "negMarketValue" : 84977100 , "marketValue" : 175500000 , "PB" : 7.4199 , "isOpen" : 1 },{ "secID" : "000005.XSHE" , "ticker" : "000005" , "secShortName" : "深原野A" , "exchangeCD" : "XSHE" , "tradeDate" : "1991-10-21" , "preClosePrice" : 6.46 , "actPreClosePrice" : 6.46 , "openPrice" : 6.49 , "highestPrice" : 6.49 , "lowestPrice" : 6.49 , "closePrice" : 6.49 , "turnoverVol" : 94500 , "turnoverValue" : 613305 , "turnoverRate" : 0.0021 , "accumAdjFactor" : 0.1016459912 , "negMarketValue" : 287756865 , "marketValue" : 584100000 , "PB" : 9.1783 , "isOpen" : 1 },{ "secID" : "000009.XSHE" , "ticker" : "000009" , "secShortName" : 
"深宝安A" , "exchangeCD" : "XSHE" , "tradeDate" : "1991-10-21" , "preClosePrice" : 5.75 , "actPreClosePrice" : 5.75 , "openPrice" : 5.7 , "highestPrice" : 5.8 , "lowestPrice" : 5.65 , "closePrice" : 5.75 , "turnoverVol" : 767500 , "turnoverValue" : 4382245 , "turnoverRate" : 0.0084 , "accumAdjFactor" : 0.1026538759 , "negMarketValue" : 524745000 , "marketValue" : 1293922500 , "PB" : 2.4503 , "isOpen" : 1 },{ "secID" : "600601.XSHG" , "ticker" : "600601" , "secShortName" : "延中实业" , "exchangeCD" : "XSHG" , "tradeDate" : "1991-10-21" , "preClosePrice" : 65.7 , "actPreClosePrice" : 65.7 , "openPrice" : 66.4 , "highestPrice" : 66.4 , "lowestPrice" : 66.4 , "closePrice" : 66.4 , "turnoverVol" : 5333 , "turnoverValue" : 354111 , "dealAmount" : 81 , "turnoverRate" : 0.0053 , "accumAdjFactor" : 0.0010592167 , "negMarketValue" : 66400000 , "marketValue" : 66400000 , "PB" : 40.7703 , "isOpen" : 1 },{ "secID" : "600602.XSHG" , "ticker" : "600602" , "secShortName" : "真空电子" , "exchangeCD" : "XSHG" , "tradeDate" : "1991-10-21" , "preClosePrice" : 640.6 , "actPreClosePrice" : 640.6 , "openPrice" : 647 , "highestPrice" : 647 , "lowestPrice" : 647 , "closePrice" : 647 , "turnoverVol" : 2589 , "turnoverValue" : 1675083 , "dealAmount" : 227 , "turnoverRate" : 0.0051 , "accumAdjFactor" : 0.0019640692 , "negMarketValue" : 330552300 , "marketValue" : 1294000000 , "PB" : 287.6707 , "isOpen" : 1 },{ "secID" : "600651.XSHG" , "ticker" : "600651" , "secShortName" : "飞乐音响" , "exchangeCD" : "XSHG" , "tradeDate" : "1991-10-21" , "preClosePrice" : 119.6 , "actPreClosePrice" : 119.6 , "openPrice" : 120.8 , "highestPrice" : 120.8 , "lowestPrice" : 120.8 , "closePrice" : 120.8 , "turnoverVol" : 1102 , "turnoverValue" : 133122 , "dealAmount" : 14 , "turnoverRate" : 0.0022 , "accumAdjFactor" : 0.0008192464 , "negMarketValue" : 60400000 , "marketValue" : 60400000 , "PB" : 39.6397 , "isOpen" : 1 },{ "secID" : "600652.XSHG" , "ticker" : "600652" , "secShortName" : "爱使电子" , "exchangeCD" : "XSHG" , 
"tradeDate" : "1991-10-21" , "preClosePrice" : 83.2 , "actPreClosePrice" : 83.2 , "openPrice" : 0 , "highestPrice" : 0 , "lowestPrice" : 0 , "closePrice" : 83.2 , "turnoverVol" : 0 , "turnoverValue" : 0 , "dealAmount" : 0 , "turnoverRate" : 0 , "accumAdjFactor" : 0.0006920481 , "negMarketValue" : 22464000 , "marketValue" : 22464000 , "PB" : 33.8019 , "isOpen" : 0 },{ "secID" : "600653.XSHG" , "ticker" : "600653" , "secShortName" : "申华电工" , "exchangeCD" : "XSHG" , "tradeDate" : "1991-10-21" , "preClosePrice" : 103.4 , "actPreClosePrice" : 103.4 , "openPrice" : 104.4 , "highestPrice" : 104.4 , "lowestPrice" : 104.4 , "closePrice" : 104.4 , "turnoverVol" : 240 , "turnoverValue" : 25056 , "dealAmount" : 4 , "turnoverRate" : 0.0005 , "accumAdjFactor" : 0.0009289199 , "negMarketValue" : 52200000 , "marketValue" : 52200000 , "PB" : 97.279 , "isOpen" : 1 },{ "secID" : "600654.XSHG" , "ticker" : "600654" , "secShortName" : "飞乐股份" , "exchangeCD" : "XSHG" , "tradeDate" : "1991-10-21" , "preClosePrice" : 633.2 , "actPreClosePrice" : 633.2 , "openPrice" : 639.5 , "highestPrice" : 639.5 , "lowestPrice" : 639.5 , "closePrice" : 639.5 , "turnoverVol" : 101 , "turnoverValue" : 64590 , "dealAmount" : 26 , "turnoverRate" : 0.0048 , "accumAdjFactor" : 0.000663586 , "negMarketValue" : 13429500 , "marketValue" : 134358950 , "PB" : 282.9834 , "isOpen" : 1 },{ "secID" : "600656.XSHG" , "ticker" : "600656" , "secShortName" : "浙江凤凰" , "exchangeCD" : "XSHG" , "tradeDate" : "1991-10-21" , "preClosePrice" : 1242.9 , "actPreClosePrice" : 1242.9 , "openPrice" : 1255.3 , "highestPrice" : 1255.3 , "lowestPrice" : 1255.3 , "closePrice" : 1255.3 , "turnoverVol" : 140 , "turnoverValue" : 175742 , "dealAmount" : 7 , "turnoverRate" : 0.0031 , "accumAdjFactor" : 0.0007136096 , "negMarketValue" : 56502308.3 , "marketValue" : 321798665.6 , "PB" : -604.4303 , "isOpen" : 1 }]}
1
  

2. 创建 hive 临时表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
-- Raw staging table over the API dumps stored under the HDFS LOCATION below.
-- Each JSON record carries a status (retCode / retMsg) plus a "data" array
-- holding one struct per security per trading day; queries that read this
-- table flatten that array with LATERAL VIEW explode(data).
-- EXTERNAL: dropping the table removes only the metastore entry, not the files.
-- The HCatalog JsonSerDe matches JSON keys to struct fields by name; keys a
-- record does not contain (e.g. PE / PE1 in the sample, or dealAmount on the
-- XSHE rows) are read as NULL.
CREATE EXTERNAL TABLE IF NOT EXISTS sensitop.equd_json_tmp (
    retCode STRING,
    retMsg  STRING,
    data    ARRAY<STRUCT<
        secID:            STRING,
        tradeDate:        DATE,
        ticker:           STRING,
        secShortName:     STRING,
        exchangeCD:       STRING,
        preClosePrice:    DOUBLE,
        actPreClosePrice: DOUBLE,
        openPrice:        DOUBLE,
        highestPrice:     DOUBLE,
        lowestPrice:      DOUBLE,
        closePrice:       DOUBLE,
        turnoverVol:      DOUBLE,
        turnoverValue:    DOUBLE,
        dealAmount:       INT,
        turnoverRate:     DOUBLE,
        accumAdjFactor:   DOUBLE,
        negMarketValue:   DOUBLE,
        marketValue:      DOUBLE,
        PE:               DOUBLE,
        PE1:              DOUBLE,
        PB:               DOUBLE,
        isOpen:           INT
    >>
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 'hdfs://hdfs1.wdp:8020/sensitop/finance/equd';
1
  

3. 创建 hive 表

1
需要把上面表里数组里的数据一条一条放入这个表:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
-- Intermediate table: one row per element of equd_json_tmp.data, split into
-- partitions by the string column `year` (e.g. '2016').
-- Column order matters: INSERT ... SELECT into this table maps the select list
-- to these columns by position, not by name.
-- Stored as JSON through the same HCatalog JsonSerDe as the staging table.
-- The statement is terminated with ';' so it can run as part of a script.
CREATE TABLE IF NOT EXISTS sensitop.equd_h (
    secID            STRING,
    ticker           STRING,
    secShortName     STRING,
    exchangeCD       STRING,
    tradeDate        DATE,
    preClosePrice    DOUBLE,
    actPreClosePrice DOUBLE,
    openPrice        DOUBLE,
    highestPrice     DOUBLE,
    lowestPrice      DOUBLE,
    closePrice       DOUBLE,
    turnoverVol      DOUBLE,
    turnoverValue    DOUBLE,
    dealAmount       INT,
    turnoverRate     DOUBLE,
    accumAdjFactor   DOUBLE,
    negMarketValue   DOUBLE,
    marketValue      DOUBLE,
    PE               DOUBLE,
    PE1              DOUBLE,
    PB               DOUBLE,
    isOpen           INT
)
PARTITIONED BY (year STRING)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
1
  
1
然后新建一个最终表:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
-- Final table: same columns, in the same order, as the intermediate table
-- sensitop.equd_h, partitioned by the string column `year`.
-- No ROW FORMAT / STORED AS clause, so Hive uses the cluster's default storage
-- format (TEXTFILE unless hive.default.fileformat is overridden — confirm).
-- Column order matters: INSERT ... SELECT maps columns by position.
-- The statement is terminated with ';' so it can run as part of a script.
CREATE TABLE IF NOT EXISTS sensitop.equd (
    secID            STRING,
    ticker           STRING,
    secShortName     STRING,
    exchangeCD       STRING,
    tradeDate        DATE,
    preClosePrice    DOUBLE,
    actPreClosePrice DOUBLE,
    openPrice        DOUBLE,
    highestPrice     DOUBLE,
    lowestPrice      DOUBLE,
    closePrice       DOUBLE,
    turnoverVol      DOUBLE,
    turnoverValue    DOUBLE,
    dealAmount       INT,
    turnoverRate     DOUBLE,
    accumAdjFactor   DOUBLE,
    negMarketValue   DOUBLE,
    marketValue      DOUBLE,
    PE               DOUBLE,
    PE1              DOUBLE,
    PB               DOUBLE,
    isOpen           INT
)
PARTITIONED BY (year STRING);
1
  
1
<strong>注意:这里的字段顺序和上面临时表的顺序要一致。</strong>
1
  

4. 用 Partition 更新数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
-- Flatten the 2016 quotes from the raw JSON staging table into the
-- intermediate table sensitop.equd_h (the table created in step 3; there is
-- no table named equd_tmp).
-- LATERAL VIEW explode() emits one row per struct in the data array: b is the
-- lateral view's table alias and dt its column alias, so b.dt.<field> reads
-- one struct field.
-- The WHERE keeps only trade dates inside 2016, so every row written matches
-- the static partition value year = '2016'.
-- INSERT OVERWRITE replaces whatever that partition held before.
-- The select list follows equd_h's column order (positional mapping).
INSERT OVERWRITE TABLE sensitop.equd_h
PARTITION (year = '2016')
SELECT
    b.dt.secID,
    b.dt.ticker,
    b.dt.secShortName,
    b.dt.exchangeCD,
    b.dt.tradeDate,
    b.dt.preClosePrice,
    b.dt.actPreClosePrice,
    b.dt.openPrice,
    b.dt.highestPrice,
    b.dt.lowestPrice,
    b.dt.closePrice,
    b.dt.turnoverVol,
    b.dt.turnoverValue,
    b.dt.dealAmount,
    b.dt.turnoverRate,
    b.dt.accumAdjFactor,
    b.dt.negMarketValue,
    b.dt.marketValue,
    b.dt.PE,
    b.dt.PE1,
    b.dt.PB,
    b.dt.isOpen
FROM sensitop.equd_json_tmp
    LATERAL VIEW explode(equd_json_tmp.data) b AS dt
WHERE dt.tradedate >= '2016-01-01'
  AND dt.tradedate <= '2016-12-31';
1
  
1
  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
-- Copy the 2016 partition of the intermediate table sensitop.equd_h (the table
-- created in step 3; there is no table named equd_tmp) into the final table
-- sensitop.equd.
-- The WHERE on the partition column restricts the read to that one partition.
-- INSERT OVERWRITE replaces whatever equd's 2016 partition held before.
-- The select list follows equd's column order (positional mapping).
INSERT OVERWRITE TABLE sensitop.equd
PARTITION (year = '2016')
SELECT
    h.secID,
    h.ticker,
    h.secShortName,
    h.exchangeCD,
    h.tradeDate,
    h.preClosePrice,
    h.actPreClosePrice,
    h.openPrice,
    h.highestPrice,
    h.lowestPrice,
    h.closePrice,
    h.turnoverVol,
    h.turnoverValue,
    h.dealAmount,
    h.turnoverRate,
    h.accumAdjFactor,
    h.negMarketValue,
    h.marketValue,
    h.PE,
    h.PE1,
    h.PB,
    h.isOpen
FROM sensitop.equd_h h
WHERE h.year = '2016';
1
  

5. 用nifi实现动态插入数据

NewImage
1
  
1
这里有两个分支:左边的分支每天 20:00 更新当年的 partition;右边的分支更新 1990 到 2015 年的历史数据,只需要执行一次。
1
  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
-- NiFi-parameterised version of the flatten step: ${year} is an expression
-- placeholder filled in before execution (presumably a flowfile attribute set
-- upstream in the flow — confirm there).
-- Explodes equd_json_tmp.data into one row per struct (lateral-view table
-- alias b, column alias dt) and overwrites the matching year partition of the
-- intermediate table sensitop.equd_h with the rows whose trade date falls in
-- that calendar year. The select list follows equd_h's column order.
-- No trailing semicolon: this is submitted as a single statement.
INSERT OVERWRITE TABLE sensitop.equd_h
PARTITION (year = '${year}')
SELECT
    b.dt.secID,
    b.dt.ticker,
    b.dt.secShortName,
    b.dt.exchangeCD,
    b.dt.tradeDate,
    b.dt.preClosePrice,
    b.dt.actPreClosePrice,
    b.dt.openPrice,
    b.dt.highestPrice,
    b.dt.lowestPrice,
    b.dt.closePrice,
    b.dt.turnoverVol,
    b.dt.turnoverValue,
    b.dt.dealAmount,
    b.dt.turnoverRate,
    b.dt.accumAdjFactor,
    b.dt.negMarketValue,
    b.dt.marketValue,
    b.dt.PE,
    b.dt.PE1,
    b.dt.PB,
    b.dt.isOpen
FROM sensitop.equd_json_tmp
    LATERAL VIEW explode(equd_json_tmp.data) b AS dt
WHERE dt.tradedate >= '${year}-01-01'
  AND dt.tradedate <= '${year}-12-31'
1
  
1
  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
-- NiFi-parameterised copy step: ${year} is an expression placeholder filled in
-- before execution (presumably a flowfile attribute set upstream — confirm).
-- Reads the ${year} partition of the intermediate table sensitop.equd_h (the
-- table the preceding NiFi statement writes; there is no table named
-- equd_tmp) and overwrites the same partition of the final table sensitop.equd.
-- The select list follows equd's column order (positional mapping).
-- No trailing semicolon: this is submitted as a single statement.
INSERT OVERWRITE TABLE sensitop.equd
PARTITION (year = '${year}')
SELECT
    h.secID,
    h.ticker,
    h.secShortName,
    h.exchangeCD,
    h.tradeDate,
    h.preClosePrice,
    h.actPreClosePrice,
    h.openPrice,
    h.highestPrice,
    h.lowestPrice,
    h.closePrice,
    h.turnoverVol,
    h.turnoverValue,
    h.dealAmount,
    h.turnoverRate,
    h.accumAdjFactor,
    h.negMarketValue,
    h.marketValue,
    h.PE,
    h.PE1,
    h.PB,
    h.isOpen
FROM sensitop.equd_h h
WHERE h.year = '${year}'
1
  

 

 本文转自疯吻IT博客园博客,原文链接:http://www.cnblogs.com/fengwenit/p/6022599.html,如需转载请自行联系原作者


目录
相关文章
|
2月前
|
SQL 存储 分布式计算
HDFS数据(跨集群)迁移
HDFS数据(跨集群)迁移
|
2月前
|
SQL 物联网 数据处理
"颠覆传统,Hive SQL与Flink激情碰撞!解锁流批一体数据处理新纪元,让数据决策力瞬间爆表,你准备好了吗?"
【8月更文挑战第9天】数据时代,实时性和准确性至关重要。传统上,批处理与流处理各司其职,但Apache Flink打破了这一界限,尤其Flink与Hive SQL的结合,开创了流批一体的数据处理新时代。这不仅简化了数据处理流程,还极大提升了效率和灵活性。例如,通过Flink SQL,可以轻松实现流数据与批数据的融合分析,无需在两者间切换。这种融合不仅降低了技术门槛,还为企业提供了更强大的数据支持,无论是在金融、电商还是物联网领域,都将发挥巨大作用。
45 6
|
2月前
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 分布式计算 数据处理
实时计算 Flink版产品使用问题之怎么将数据从Hive表中读取并写入到另一个Hive表中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
分布式计算 Hadoop
|
2月前
|
SQL 存储 分布式计算
|
2月前
|
SQL 存储 监控
Hive 插入大量数据
【8月更文挑战第15天】
|
5月前
|
SQL 数据采集 数据挖掘
大数据行业应用之Hive数据分析航班线路相关的各项指标
大数据行业应用之Hive数据分析航班线路相关的各项指标
175 1
|
3月前
|
SQL 分布式计算 大数据
大数据处理平台Hive详解
【7月更文挑战第15天】Hive作为基于Hadoop的数据仓库工具,在大数据处理和分析领域发挥着重要作用。通过提供类SQL的查询语言,Hive降低了数据处理的门槛,使得具有SQL背景的开发者可以轻松地处理大规模数据。然而,Hive也存在查询延迟高、表达能力有限等缺点,需要在实际应用中根据具体场景和需求进行选择和优化。

热门文章

最新文章