1 预先配置
在hive配置文件:%HIVE_HOME%/conf/hive-site.xml添加
代码语言:javascript
复制
<!-- Disable impersonation: queries run as the HiveServer2 process user -->
<property>
    <name>hive.server2.enable.doAs</name>
    <value>false</value>
</property>
在 Hadoop 的配置文件目录 %HADOOP_HOME%/etc/hadoop/ 下的 core-site.xml 和 hdfs-site.xml 中添加
代码语言:javascript
复制
<!-- Allow user "root" to proxy requests from any host and on behalf of any group -->
<property>
    <name>hadoop.proxyuser.root.groups</name>
    <value>*</value>
</property>
<property>
    <name>hadoop.proxyuser.root.hosts</name>
    <value>*</value>
</property>
确保没有设置限制 root 用户的权限
修改访问数据库表person的权限
代码语言:javascript
复制
# (run as root — "#" is the shell prompt) grant group write on the table
# directory so HiveServer2 can insert/truncate/load data into it
#hdfs dfs -chmod -R 775 /user/hive/warehouse/demo.db/person
由于Hive是数据仓库,而不是传统关系型数据库,所以一般不支持更新(UPDATE)和删除(DELETE)操作,这里仅介绍如何通过Java来向Hive插入和查询数据。2 用Java来开发Hive应用
pom.xml
代码语言:javascript
复制
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.jerry</groupId>
    <artifactId>hive</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <!-- Fixed typo: "Hivi" -> "Hive" -->
    <description>Java How to connect Hive</description>
    <dependencies>
        <!-- Hive JDBC Driver -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>3.1.2</version>
        </dependency>
        <!-- Hadoop Common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.2</version>
        </dependency>
        <!-- Hadoop Client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.2</version>
        </dependency>
    </dependencies>
</project>
Java文件
代码语言:javascript
复制
package com.jerry;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal JDBC client for HiveServer2: plain query, conditional query,
 * single-row insert, LOAD DATA LOCAL and TRUNCATE against the "demo" database.
 */
public class HiveClient {

    private static final String DRIVER_CLASS = "org.apache.hive.jdbc.HiveDriver";
    private static final String CONNECTION_URL = "jdbc:hive2://192.168.31.184:10000/demo";

    // NOTE(review): static mutable JDBC state makes this class single-threaded
    // by design; main() drives one instance sequentially, so this is kept as-is.
    private static PreparedStatement preparedstatement;
    private static Statement statement;
    private static ResultSet resultSet = null;

    /**
     * Opens a connection and caches a Statement used by the simple-query helpers.
     *
     * @throws SQLException if the driver is missing or the connection fails
     */
    private Connection getConnection() throws SQLException {
        try {
            Class.forName(DRIVER_CLASS);
            Connection con = DriverManager.getConnection(CONNECTION_URL);
            statement = con.createStatement();
            return con;
        } catch (ClassNotFoundException e) {
            // Preserve the original exception as the cause instead of
            // flattening it to a message string.
            throw new SQLException("Hive JDBC driver not found: " + DRIVER_CLASS, e);
        }
    }

    /** Closes the cached ResultSet/Statement (null-safe) and the connection. */
    public void disconnect(Connection con) throws SQLException {
        // Guard against NPE when disconnect() is called before any query ran.
        if (resultSet != null) {
            resultSet.close();
        }
        if (statement != null) {
            statement.close();
        }
        con.close();
    }

    /** Executes a plain query; the result is kept in the shared resultSet. */
    public void query(String query) throws SQLException {
        resultSet = statement.executeQuery(query);
    }

    /**
     * Executes a query with an AND-joined equality condition map,
     * binding values through PreparedStatement parameters.
     */
    public void query(Connection con, String query, Map<String, String> condition) throws SQLException {
        StringBuilder where = new StringBuilder(" where ");
        int length = condition.size();
        String[] valuearray = new String[length];
        int i = 0;
        for (String key : condition.keySet()) {
            where.append(key).append(" = ? AND ");
            valuearray[i] = condition.get(key);
            i++;
        }
        where.append("1=1"); // closes the trailing "AND"
        PreparedStatement preparedStatement = con.prepareStatement(query + where);
        for (int j = 0; j < length; j++) {
            preparedStatement.setString(j + 1, valuearray[j]);
        }
        resultSet = preparedStatement.executeQuery();
    }

    /** Prints every row of the given ResultSet as comma-terminated columns. */
    public void printQueryResult(ResultSet resultSet) throws SQLException {
        ResultSetMetaData metaData = resultSet.getMetaData();
        int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            for (int i = 1; i <= columnCount; i++) {
                System.out.print(resultSet.getString(i) + ",");
            }
            System.out.println("");
        }
    }

    /** Queries and prints the whole result. */
    public void queryAndPrint(String query) throws SQLException {
        query(query);
        printQueryResult(resultSet);
    }

    /** Queries with a condition map and prints the result. */
    public void queryAndPrint(Connection con, String query, Map<String, String> condition) throws SQLException {
        query(con, query, condition);
        printQueryResult(resultSet);
    }

    /**
     * Inserts one row: (id, name, age) are bound as parameters, the array and
     * map columns are passed as Hive literal expressions.
     * Fix: the INSERT now targets the tableName argument instead of the
     * hard-coded table "person".
     */
    public void addDataToHiveTable(Connection con, String tableName, String[] newValue, String like, String map) {
        try {
            String insertSql = "INSERT INTO " + tableName + " SELECT ?,?,?," + like + "," + map;
            preparedstatement = con.prepareStatement(insertSql);
            preparedstatement.setInt(1, Integer.parseInt(newValue[0]));
            preparedstatement.setString(2, newValue[1]);
            preparedstatement.setInt(3, Integer.parseInt(newValue[2]));
            preparedstatement.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /** Loads a file local to the HiveServer2 host into the table. */
    public void loadDataForLocal(String tableName, String path) throws SQLException {
        String query = "LOAD DATA LOCAL INPATH '" + path + "' INTO TABLE " + tableName;
        statement.execute(query);
    }

    /** Removes all rows from the table. */
    public void truncateTable(Connection con, String tableName) throws SQLException {
        String query = "truncate table " + tableName;
        con.setAutoCommit(true); // make sure the statement is committed immediately
        Statement statement = con.createStatement();
        statement.execute(query);
    }

    public static void main(String[] args) throws SQLException {
        HiveClient hive = new HiveClient();
        String tableName = "person";
        String like = "array('basketball', 'music', 'dance')";
        String map = "map('address','xxxx')";
        String[] newAddValue = {"10", "elite0", "50"};
        Connection con = hive.getConnection();
        String query = "SELECT * FROM " + tableName;
        Map<String, String> condition = new HashMap<String, String>();
        condition.put("name", "elite0");
        condition.put("age", "50");
        String inpath = "/home/jerry/hive/person";
        try {
            System.out.println("全表查询:");
            hive.queryAndPrint(query);
            hive.addDataToHiveTable(con, tableName, newAddValue, like, map);
            System.out.println("插入数据后全表查询:");
            hive.queryAndPrint(query);
            System.out.println("条件查询:");
            hive.queryAndPrint(con, query, condition);
            hive.truncateTable(con, tableName);
            System.out.println("清空表:");
            hive.queryAndPrint(query);
            hive.loadDataForLocal(tableName, inpath);
            System.out.println("从文件中加载:");
            hive.queryAndPrint(query);
            hive.disconnect(con);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
运行结果
代码语言:javascript
复制
全表查询: 1,elite0,10,["basketball","music","dance"],{"adderss":"xx"}, 2,elite1,20,["basketball","music","dance"],{"adderss":"xx"}, 3,elite2,10,["basketball","music","dance"],{"adderss":"xx"}, 4,elite3,20,["basketball","music","dance"],{"adderss":"xx"}, 5,elite4,10,["basketball","music","dance"],{"adderss":"xx"}, 6,elite5,20,["basketball","music","dance"],{"adderss":"xx"}, array('basketball', 'music', 'dance') 插入数据后全表查询: 10,elite0,50,["basketball","music","dance"],{"address":"xxxx"}, 1,elite0,10,["basketball","music","dance"],{"adderss":"xx"}, 2,elite1,20,["basketball","music","dance"],{"adderss":"xx"}, 3,elite2,10,["basketball","music","dance"],{"adderss":"xx"}, 4,elite3,20,["basketball","music","dance"],{"adderss":"xx"}, 5,elite4,10,["basketball","music","dance"],{"adderss":"xx"}, 6,elite5,20,["basketball","music","dance"],{"adderss":"xx"}, 条件查询: 10,elite0,50,["basketball","music","dance"],{"address":"xxxx"}, 清空表: 从文件中加载: 1,elite0,10,["basketball","music","dance"],{"adderss":"xx"}, 2,elite1,20,["basketball","music","dance"],{"adderss":"xx"}, 3,elite2,10,["basketball","music","dance"],{"adderss":"xx"}, 4,elite3,20,["basketball","music","dance"],{"adderss":"xx"}, 5,elite4,10,["basketball","music","dance"],{"adderss":"xx"}, 6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},
3 用Python来开发Hive应用
pip3
代码语言:javascript
复制
# Python client dependencies for HiveServer2: PyHive plus the SASL/Thrift
# transport layer it uses to talk to the server
pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive
Python
代码语言:javascript
复制
import pandas as pd
from pyhive import hive
from sqlalchemy import create_engine


class Hive:
    """Small helper around PyHive/SQLAlchemy for the "demo" Hive database.

    Two handles are used on purpose:
      - conn   (PyHive connection)  : INSERT / LOAD DATA / TRUNCATE
      - engine (SQLAlchemy engine)  : SELECT via pandas.read_sql
    """

    def __init__(self):
        # Connection parameters for the HiveServer2 instance.
        self.database = 'demo'
        self.host = '192.168.31.184'
        self.port = '10000'

    def getconnect(self):
        """Return a raw PyHive connection (used for DML/DDL statements)."""
        conn = hive.Connection(host=self.host, port=self.port, database=self.database)
        return conn

    def getEngine(self):
        """Return a SQLAlchemy engine (used by pandas.read_sql for queries)."""
        hive_uri = f"hive://{self.host}:{self.port}/{self.database}"
        return create_engine(hive_uri)

    def disconnect(self, engine, conn):
        """Dispose the engine and close the PyHive connection."""
        engine.dispose()
        conn.close()

    def query(self, sql, engine, condition=None):
        """Run sql (optionally with an AND-joined equality condition dict)
        and print the resulting DataFrame."""
        try:
            if condition is None:
                df = pd.read_sql(sql, engine)
            else:
                values = []
                where = " where "
                for key in condition:
                    where = where + key + " = %s and "
                    values.append(condition[key])
                where = where + "1=1"  # closes the trailing "and"
                df = pd.read_sql(sql + where, engine, params=tuple(values))
            print(df)
        except Exception as e:
            print("Error occurred:", e)

    def addDataToHiveTable(self, conn, tableName, data):
        """Insert one row; the array/map columns are rendered as Hive literals.

        Fixes vs. original: the INSERT targets tableName instead of the
        hard-coded "person", and the array literal is built without nested
        f-string backslash escapes (a SyntaxError on Python < 3.12).
        """
        items = ", ".join("'" + str(x) + "'" for x in data['like'])
        like_array = f"array({items})"
        addr_key, addr_value = next(iter(data['address'].items()))
        address_map = f"map('{addr_key}', '{addr_value}')"
        cursor = conn.cursor()
        insertSql = "INSERT INTO " + tableName + " SELECT %s,%s,%s," + like_array + "," + address_map
        try:
            cursor.execute(insertSql, (data['id'], data['name'], data['age']))
        except Exception as e:
            print(f"Error inserting data: {e}")
        conn.commit()
        cursor.close()

    def loadDataForLocal(self, conn, tableName, path):
        """Load a file local to the HiveServer2 host into the table."""
        cursor = conn.cursor()
        query = "LOAD DATA LOCAL INPATH '" + path + "' INTO TABLE " + tableName
        cursor.execute(query)
        conn.commit()
        cursor.close()

    def truncateTable(self, conn, tableName):
        """Remove all rows from the table."""
        cursor = conn.cursor()
        query = "truncate table " + tableName
        cursor.execute(query)
        conn.commit()
        cursor.close()


if __name__ == "__main__":
    sql = "SELECT * FROM person"
    condition = {"name": "elite1", "age": "20"}
    # Row to insert; 'like' becomes a Hive array literal, 'address' a map literal.
    data = {
        'id': "50",
        'name': "Jerry",
        'age': 50,  # integer, matches the table's age column
        'like': ["basketball", "music", "dance"],
        'address': {"address": "xx"},
    }
    tableName = "person"
    path = "/home/jerry/hive/person"
    myhive = Hive()
    print("建立连接")
    conn = myhive.getconnect()
    engine = myhive.getEngine()
    print("全表查询")
    myhive.query(sql, engine)
    print("条件查询")
    myhive.query(sql, engine, condition)
    print("加数据进入表")
    myhive.addDataToHiveTable(conn, tableName, data)
    myhive.query(sql, engine)
    print("清空表中所有数据")
    myhive.truncateTable(conn, tableName)
    print("从文件中导入数据")
    myhive.loadDataForLocal(conn, tableName, path)
    myhive.query(sql, engine)
    print("断开连接")
    myhive.disconnect(engine, conn)
- conn:用于插入、清空、导入等非查询操作
- engine:用于查询
运行结果
代码语言:javascript
复制
建立连接 全表查询 id name age likes address 0 1 elite0 10 ["basketball","music","dance"] {"adderss":"xx"} 1 2 elite1 20 ["basketball","music","dance"] {"adderss":"xx"} 2 3 elite2 10 ["basketball","music","dance"] {"adderss":"xx"} 3 4 elite3 20 ["basketball","music","dance"] {"adderss":"xx"} 4 5 elite4 10 ["basketball","music","dance"] {"adderss":"xx"} 5 6 elite5 20 ["basketball","music","dance"] {"adderss":"xx"} 条件查询 id name age likes address 0 2 elite1 20 ["basketball","music","dance"] {"adderss":"xx"} 加数据进入表 id name age likes address 0 50 Jerry 50 ["basketball","music","dance"] {"address":"xx"} 1 1 elite0 10 ["basketball","music","dance"] {"adderss":"xx"} 2 2 elite1 20 ["basketball","music","dance"] {"adderss":"xx"} 3 3 elite2 10 ["basketball","music","dance"] {"adderss":"xx"} 4 4 elite3 20 ["basketball","music","dance"] {"adderss":"xx"} 5 5 elite4 10 ["basketball","music","dance"] {"adderss":"xx"} 6 6 elite5 20 ["basketball","music","dance"] {"adderss":"xx"} 清空表中所有数据 从文件中导入数据 id name age likes address 0 1 elite0 10 ["basketball","music","dance"] {"adderss":"xx"} 1 2 elite1 20 ["basketball","music","dance"] {"adderss":"xx"} 2 3 elite2 10 ["basketball","music","dance"] {"adderss":"xx"} 3 4 elite3 20 ["basketball","music","dance"] {"adderss":"xx"} 4 5 elite4 10 ["basketball","music","dance"] {"adderss":"xx"} 5 6 elite5 20 ["basketball","music","dance"] {"adderss":"xx"} 断开连接