hive 使用技巧笔记

简介:

 

例子:

INSERT OVERWRITE TABLE prices_collected_${hiveconf:wid_version}

select 

pc.collect_id as product_id ,

regexp_extract(pc.price,'(\\d*\\.?\\d+)',1) as price ,

pc.region,

'' as location_area_code,

'' as city_code,

from_unixtime(unix_timestamp() , 'yyyy-MM-dd hh:mm:ss') as created_at,

from_unixtime(unix_timestamp() , 'yyyy-MM-dd hh:mm:ss') as updated_at

from products_compared_${hiveconf:wid_version} as pc

 

1.根据hive执行的参数来动态的设置表名称 prices_collected_${hiveconf:wid_version}

hive -hiveconf wid_version='4' 

则可以通过${hiveconft:wid_version}来接收参数,生成prices_collected_4这张表

 

2. 使用正则表达式获取需要的信息,如:获取一段字符串中的数字

regexp_extract(pc.price,'(\\d*\\.?\\d+)',1) as price

注意hive中需要使用双斜杠来处理正则表达式

 

3. 获取系统时间

from_unixtime(unix_timestamp() , 'yyyy-MM-dd hh:mm:ss') as created_a

使用from_unixtime(unix_timestamp() , 'yyyy-MM-dd hh:mm:ss') 获取系统时间,格式可以根据需要调整

 

4. 多个表进行join的时候,可能会报错

使用set hive.auto.convert.join=false;解决

 

5. 创建表

create table if not exists brands (

  name string,

  created_at string,

  updated_at string

)

ROW FORMAT DELIMITED

FIELDS TERMINATED BY '\t'

ESCAPED BY '\\'

STORED AS TEXTFILE;

以文本方式进行存储,"\\"进行转义,"\t"作为换行符

 

6.到处hive中的某个表中的数据到本地,执行hive命令如下:

hive 

-hiveconf local_path=/home/hive/hive_data/products_24_1 

-hiveconf hive_table=products_24_1 

-hiveconf columnstr=' name , created_at,  updated_at,   "released" as status ' 

-f /home/hive/export_hive_table_to_local.sql

 

需要执行的参数依次是

1.导出到本地的位置local_path

2.导出hive中的哪个表 hive_table

3. 导出products_24_1 表中的哪些字段 colunmstr

4. 根据上面的参数,在本地创建products_24_1 表,使用-f来指定调用的文件

 

/home/hive/export_hive_table_to_local.sql 文件内容如下:

insert overwrite local directory '${hiveconf:local_path}' 

ROW FORMAT DELIMITED

FIELDS TERMINATED BY '\t'

ESCAPED BY '\\'

STORED AS TEXTFILE

select ${hiveconf:columnstr}

from ${hiveconf:hive_table};

 

7.将本地文件导入到psql数据库中, hive对pg的支持不好,不能用sqoop来进行数据的导入,可以先将hive中的数据读到本地,在使用python脚本来进行文件的写入

Python代码  收藏代码

  1. def insert_to_pg(conn , table_name , file_path , insert_columns=None):  

  2.   conn = psycopg2.connect(conn)  

  3.   cursor = conn.cursor()  

  4.   if os.path.isfile( file_path ):  

  5.     datafile=ReadFileProgress(file_path)  

  6.     cursor.copy_from(file=datafile, table=table_name, sep='\t', null='\\N', size=81920, columns=insert_columns)  

  7.     datafile.close()  

 

Python代码  收藏代码

  1. #!/usr/bin/python  

  2. # #_*_ coding: utf-8 _*_  

  3.   

  4. import os , sys  

  5. import psycopg2  

  6.   

  7. class ReadFileProgress:  

  8.   

  9.   def __init__(self, filename):  

  10.     self.datafile = open(filename)  

  11.     self.totalRecords = 0  

  12.     self.totalBytes = os.stat(filename).st_size  

  13.     self.readBytes = 0  

  14.     self.datafile.readline()  

  15.     i = 0  

  16.     for i, l in enumerate(self.datafile):  

  17.       pass  

  18.     self.totalRecords = i + 1  

  19.     sys.stderr.write("Number of records: %d\n" % (self.totalRecords))  

  20.     self.datafile.seek(0)  

  21.     self.datafile.readline()  

  22.     self.perc5 = self.totalBytes / 20.0  

  23.     self.perc5count = 0  

  24.     self.lastPerc5 = 0  

  25.     sys.stderr.write("Writing records: 0%")  

  26.   

  27.   def countBytes(self, size=0):  

  28.     self.readBytes += size  

  29.     if (self.readBytes - self.lastPerc5 >= self.perc5):  

  30.       self.lastPerc5 = self.readBytes  

  31.       if (int(self.readBytes / self.perc5) == 5):  

  32.         sys.stderr.write("25%")  

  33.       elif (int(self.readBytes / self.perc5) == 10):  

  34.         sys.stderr.write("50%")  

  35.       elif (int(self.readBytes / self.perc5) == 15):  

  36.         sys.stderr.write("75%")  

  37.       else:  

  38.         sys.stderr.write(".")  

  39.   

  40.       sys.stderr.flush()  

  41.   

  42.   def readline(self, size=None):  

  43.     countBytes(size)  

  44.     return self.datafile.readline(size)  

  45.    

  46.   def read(self, size=None):  

  47.     self.countBytes(size)  

  48.     return self.datafile.read(size)  

  49.   

  50.   def close(self):  

  51.     sys.stderr.write("100%\n")  

  52.     self.datafile.close()  

 

8. 从pg上导出指定表

Python代码  收藏代码

  1. def do_export(conn , table_name , file_path , columns=None):  

  2.   conn = psycopg2.connect(conn)  

  3.   cursor = conn.cursor()  

  4.   cursor.copy_to(file=file(file_path , 'w'), table=table_name, sep='\t', null='\\N', columns=columns)  

  5.   cursor.close()  

  6.   conn.commit()  

  7.   sys.stdout.write("Transaction finished successfully.\n")  

 

9. 则select语句中也可以通过hiveconf来传递参数,执行hive命令

hive -hiveconf name='hello hive'

INSERT OVERWRITE TABLE companies

select 

  '${hiveconf:name}' as name 

from companies_old



本文转自 SimplePoint 51CTO博客,原文链接:http://blog.51cto.com/2226894115/1898261,如需转载请自行联系原作者

相关文章
|
7月前
|
SQL 关系型数据库 HIVE
sqoop笔记——一次从Hive到PostgreSql的数据迁移
sqoop笔记——一次从Hive到PostgreSql的数据迁移
318 0
|
SQL 存储 大数据
大数据开发笔记(四):Hive分区详解
在Hive Select查询中一般会扫描整个表内容,会消耗很多时间做没必要的工作。有时候只需要扫描表中关心的一部分数据,因此建表时引入了partition概念。
317 0
大数据开发笔记(四):Hive分区详解
|
SQL 存储 分布式计算
大数据开发笔记(四):Hive数据仓库
Hive主要解决海量结构化日志的数据统计分析,它是hadoop上的一种数据仓库工具,可以将结构化的数据文件映射成一张表,并提供类似于SQL的查询方式,本质上来说是将Hive转化成MR程序。
186 0
大数据开发笔记(四):Hive数据仓库
|
SQL 缓存 负载均衡
|
存储 SQL 数据库
Hive笔记
此文是学习《Hive编程指南》时所作的笔记,用于加深理解。
2470 0
|
SQL API HIVE
|
SQL 分布式计算 Hadoop
Hive数据压缩笔记
Hive数据压缩 本文介绍Hadoop系统中Hive数据压缩方案的比较结果及具体压缩方法。
879 0
|
8月前
|
SQL 数据采集 数据挖掘
大数据行业应用之Hive数据分析航班线路相关的各项指标
大数据行业应用之Hive数据分析航班线路相关的各项指标
220 1
|
3月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
92 0
|
6月前
|
SQL 分布式计算 大数据
大数据处理平台Hive详解
【7月更文挑战第15天】Hive作为基于Hadoop的数据仓库工具,在大数据处理和分析领域发挥着重要作用。通过提供类SQL的查询语言,Hive降低了数据处理的门槛,使得具有SQL背景的开发者可以轻松地处理大规模数据。然而,Hive也存在查询延迟高、表达能力有限等缺点,需要在实际应用中根据具体场景和需求进行选择和优化。

热门文章

最新文章