Developing Hive Applications in Java and Python

Summary: how to develop Hive applications in Java and Python.

1 Prerequisite Configuration

Add the following to the Hive configuration file %HIVE_HOME%/conf/hive-site.xml:

<!-- Disable impersonation -->
<property>
    <name>hive.server2.enable.doAs</name>
    <value>false</value>
</property>
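With impersonation disabled, HiveServer2 executes queries as the user running the HiveServer2 process itself rather than as the connecting client user, which keeps the permission setup for this walkthrough simple.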

Add the following to both core-site.xml and hdfs-site.xml under %HADOOP_HOME%/etc/hadoop/ in the Hadoop configuration:

<property>
    <name>hadoop.proxyuser.root.groups</name>
    <value>*</value>
</property>
<property>
    <name>hadoop.proxyuser.root.hosts</name>
    <value>*</value>
</property>
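After editing these files, restart HiveServer2 and the Hadoop daemons (or refresh the proxy-user settings on a running cluster with hdfs dfsadmin -refreshSuperUserGroupsConfiguration) so the changes take effect.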

Make sure nothing else restricts the root user's permissions.

Then grant access permissions on the warehouse directory of the person table:

# hdfs dfs -chmod -R 775 /user/hive/warehouse/demo.db/person

Because Hive is a data warehouse rather than a transactional database, it generally does not support row-level update and delete operations. This article therefore covers only inserting data into Hive and querying it.

2 Developing a Hive Application in Java
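The examples in this and the next section assume that the demo database and the person table already exist. The original DDL is not shown, so the following is only a minimal sketch: the column types are inferred from the run results further down, and the text-file delimiters are assumptions made so that the LOAD DATA step can parse a delimited file; adjust both to your environment.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateDemoTable {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // Same HiveServer2 address as in the examples below
        try (Connection con = DriverManager.getConnection("jdbc:hive2://192.168.31.184:10000/default");
             Statement st = con.createStatement()) {
            st.execute("CREATE DATABASE IF NOT EXISTS demo");
            // Columns inferred from the query output: id, name, age, likes (array), address (map)
            st.execute("CREATE TABLE IF NOT EXISTS demo.person ("
                    + " id INT, name STRING, age INT,"
                    + " likes ARRAY<STRING>, address MAP<STRING,STRING>)"
                    + " ROW FORMAT DELIMITED"
                    + " FIELDS TERMINATED BY ','"           // assumed field delimiter
                    + " COLLECTION ITEMS TERMINATED BY '_'" // assumed array/map item delimiter
                    + " MAP KEYS TERMINATED BY ':'");       // assumed key:value delimiter
        }
    }
}

With the table in place, set up the Maven project: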

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.jerry</groupId>
  <artifactId>hive</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <description>How to connect to Hive from Java</description>
    <dependencies>
        <!-- Hive JDBC Driver -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>3.1.2</version>
        </dependency>
        <!-- Hadoop Common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.2</version>
        </dependency>
        <!-- Hadoop Client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.2</version>
        </dependency>
    </dependencies>
</project>

The Java source:

package com.jerry;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
public class HiveClient {
    private static final String DRIVER_CLASS = "org.apache.hive.jdbc.HiveDriver";
    private static final String CONNECTION_URL = "jdbc:hive2://192.168.31.184:10000/demo";
    private static PreparedStatement preparedstatement;
    private static Statement statement;
    private static ResultSet resultSet = null;
    // Connect to Hive
    private Connection getConnection() throws SQLException {
        try {
            Class.forName(DRIVER_CLASS);
            Connection con = DriverManager.getConnection(CONNECTION_URL);
            statement = con.createStatement();
            return con;
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            throw new SQLException(e.getMessage());
        }
    }
    
    // Disconnect and release resources
    public void disconnect(Connection con) throws SQLException {
        if (resultSet != null) {
            resultSet.close();
        }
        statement.close();
        con.close();
    }
    
    // Execute a query
    public void query(String query) throws SQLException {
      // Execute a query
        resultSet = statement.executeQuery(query);
    }
    
    // Execute a parameterized query with WHERE conditions
    public void query(Connection con,String query,Map<String, String> condition) throws SQLException {
      String where = " where ";
      int i = 0;
      int length = condition.size(); 
      String[] valuearray= new String[length];
      for (String key : condition.keySet()) {
         String value = condition.get(key);
         where = where+key+" = ? AND ";
         valuearray[i] = value;
         i++;
      }
      where = where + "1=1";
      query = query + where;
      PreparedStatement preparedStatement = con.prepareStatement(query);
      for(int j=0;j<length;j++) {
        preparedStatement.setString(j+1, valuearray[j]);
      }
      resultSet = preparedStatement.executeQuery();
    }
    
    // Print the query results
    public void printQueryResult(ResultSet resultSet) throws SQLException {
        // Get the ResultSet metadata
        ResultSetMetaData metaData = resultSet.getMetaData();
        // Get the column count
        int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
          for (int i=1;i<=columnCount;i++) {
            System.out.print(resultSet.getString(i)+",");
          }
          System.out.println("");
        }
    }
    
    // Query and print the results
    public void queryAndPrint(String query) throws SQLException {
      query(query);
      printQueryResult(resultSet);
    }
    
    // Query with conditions and print the results
    public void queryAndPrint(Connection con,String query,Map<String, String> condition) throws SQLException {
      query(con,query,condition);
      printQueryResult(resultSet);
    }
    
    // Insert a row; complex-type columns are passed in as array()/map() literal strings
    public void addDataToHiveTable(Connection con,String tableName,String[] newValue,String like,String map) {
        try {
            String insertSql = "INSERT INTO "+tableName+" SELECT ?,?,?,"+like+","+map;
            preparedstatement = con.prepareStatement(insertSql);
            preparedstatement.setInt(1, Integer.parseInt(newValue[0]));
            preparedstatement.setString(2, newValue[1]);
            preparedstatement.setInt(3, Integer.parseInt(newValue[2]));
            preparedstatement.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    
    // Load data from a local file into the table
    public void loadDataForLocal(String tableName,String path) throws SQLException {
      String query = "LOAD DATA LOCAL INPATH '"+path+"' INTO TABLE "+tableName;
      statement.execute(query);
    }
    
    // Truncate the table
    public void truncateTable(Connection con,String tableName) throws SQLException {
      String query = "truncate table "+tableName;
      con.setAutoCommit(true); // ensure auto-commit
      Statement statement = con.createStatement();
      statement.execute(query);
    }
    
public static void main(String[] args) throws SQLException {
  HiveClient hive = new HiveClient();
  String tableName = "person";
  String like = "array('basketball', 'music', 'dance')";
  String map = "map('address','xxxx')";
  String[] newAddValue = {"10","elite0","50"};
  Connection con = hive.getConnection();
  String query = "SELECT * FROM "+tableName;
  Map<String, String> condition = new HashMap<String, String>();
  condition.put("name","elite0");
  condition.put("age","50");
  String inpath = "/home/jerry/hive/person";
  try {
    System.out.println("Full table query:");
    hive.queryAndPrint(query);
    hive.addDataToHiveTable(con,tableName,newAddValue,like,map);
    System.out.println("Full table query after insert:");
    hive.queryAndPrint(query);
    System.out.println("Conditional query:");
    hive.queryAndPrint(con,query,condition);
    hive.truncateTable(con,tableName);
    System.out.println("After truncating the table:");
    hive.queryAndPrint(query);
    hive.loadDataForLocal(tableName,inpath);
    System.out.println("After loading from file:");
    hive.queryAndPrint(query);
    hive.disconnect(con);
  } catch (Exception e) {
    e.printStackTrace();
  }
  }
}
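Note the INSERT INTO ... SELECT pattern in addDataToHiveTable: Hive's INSERT ... VALUES does not accept complex types such as ARRAY and MAP, and the Hive JDBC driver cannot bind them as PreparedStatement parameters, so the array(...) and map(...) literals are concatenated into the statement text while only the scalar columns are bound with ? placeholders.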

Run results

Full table query:
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},
Full table query after insert:
10,elite0,50,["basketball","music","dance"],{"address":"xxxx"},
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},
Conditional query:
10,elite0,50,["basketball","music","dance"],{"address":"xxxx"},
After truncating the table:
After loading from file:
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},

3 Developing a Hive Application in Python

pip3

pip3 install sasl
pip3 install thrift
pip3 install thrift-sasl
pip3 install PyHive
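Note: the sasl package compiles a C extension, and its build can fail if the SASL development headers are missing; in that case, installing PyHive with its extras (for example pip3 install 'pyhive[hive]') is a common alternative way to pull in working dependencies.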

Python

import pandas as pd
from pyhive import hive
from sqlalchemy import create_engine

class Hive:
    def __init__(self):
        self.database = 'demo'
        self.host = '192.168.31.184'
        self.port = '10000'
        
    def getconnect(self):
        conn = hive.Connection(host=self.host, port=self.port,database=self.database)
        return conn
        
    def getEngine(self):
        # Create a SQLAlchemy engine for Hive
        hive_uri = f"hive://{self.host}:{self.port}/{self.database}"
        return create_engine(hive_uri)
    def disconnect(self,engine,conn):
        engine.dispose()
        conn.close()
    # Execute a query, optionally with WHERE conditions
    def query(self,sql,engine,condition=None):
        try:
            if condition is None:
                # Run the SQL query directly
                df = pd.read_sql(sql, engine)
                print(df)
            else:
                values = []
                where = " where "
                for key in condition:
                    where = where+key+" = %s and "
                    values.append(condition[key])
                where = where+"1=1"
                sql = sql + where
                params = tuple(values)
                df = pd.read_sql(sql, engine, params=params)
                print(df)
        except Exception as e:
            print("Error occurred:", e)
    # Insert a row
    def addDataToHiveTable(self,conn,tableName,data):
        # Build array(...) and map(...) literals; complex types cannot be bound as parameters
        likes = ", ".join(f"'{x}'" for x in data['like'])
        like_array = f"array({likes})"
        first_key, first_value = next(iter(data['address'].items()))
        address_map = f"map('{first_key}', '{first_value}')"
        # Create a cursor
        cursor = conn.cursor()
        insertSql = "INSERT INTO "+tableName+" SELECT %s,%s,%s,"+like_array+","+address_map
        # Execute the insert
        try:
            cursor.execute(insertSql, (data['id'], data['name'], data['age']))
        except Exception as e:
            print(f"Error inserting data: {e}")
        conn.commit()
        cursor.close()
    # Load data from a local file into the table
    def loadDataForLocal(self,conn,tableName,path):
        cursor = conn.cursor()
        query = "LOAD DATA LOCAL INPATH '"+path+"' INTO TABLE "+tableName
        cursor.execute(query)
        conn.commit()
        cursor.close()
    
    # Truncate the table
    def truncateTable(self,conn,tableName):
        cursor = conn.cursor()
        query = "truncate table "+tableName
        cursor.execute(query)
        conn.commit()
        cursor.close()
        
if __name__ == "__main__":
    sql = "SELECT * FROM person"
    condition={"name":"elite1","age":"20"}
    # Data to insert
    data = {
        'id': "50",
        'name': "Jerry",
        'age': 50,  # make sure this is an integer
        'like': ["basketball", "music", "dance"],
        'address': {"address": "xx"}
    }
    tableName = "person"
    path = "/home/jerry/hive/person"
    myhive = Hive()
    print("建立连接")
    conn = myhive.getconnect()
    engine = myhive.getEngine()
    print("全表查询")
    myhive.query(sql,engine)
    print("条件查询")
    myhive.query(sql,engine,condition)
    print("加数据进入表")
    myhive.addDataToHiveTable(conn,tableName,data)
    myhive.query(sql,engine)
    print("清空表中所有数据")
    myhive.truncateTable(conn,tableName)
    print("从文件中导入数据")
    myhive.loadDataForLocal(conn,tableName,path)
    myhive.query(sql,engine)
    print("断开连接")
    myhive.disconnect(engine,conn)
  • conn: used for all other operations (inserts, loads, truncates)
  • engine: used for queries
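As in the Java version, the array(...) and map(...) literals in addDataToHiveTable are concatenated into the INSERT statement because complex types cannot be bound as query parameters; only the scalar columns go through %s placeholders.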

Run results

Connecting
Full table query
   id    name  age                           likes           address
0   1  elite0   10  ["basketball","music","dance"]  {"adderss":"xx"}
1   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
2   3  elite2   10  ["basketball","music","dance"]  {"adderss":"xx"}
3   4  elite3   20  ["basketball","music","dance"]  {"adderss":"xx"}
4   5  elite4   10  ["basketball","music","dance"]  {"adderss":"xx"}
5   6  elite5   20  ["basketball","music","dance"]  {"adderss":"xx"}
Conditional query
   id    name  age                           likes           address
0   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
Inserting data into the table
   id    name  age                           likes           address
0  50   Jerry   50  ["basketball","music","dance"]  {"address":"xx"}
1   1  elite0   10  ["basketball","music","dance"]  {"adderss":"xx"}
2   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
3   3  elite2   10  ["basketball","music","dance"]  {"adderss":"xx"}
4   4  elite3   20  ["basketball","music","dance"]  {"adderss":"xx"}
5   5  elite4   10  ["basketball","music","dance"]  {"adderss":"xx"}
6   6  elite5   20  ["basketball","music","dance"]  {"adderss":"xx"}
Truncating the table
Loading data from file
   id    name  age                           likes           address
0   1  elite0   10  ["basketball","music","dance"]  {"adderss":"xx"}
1   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
2   3  elite2   10  ["basketball","music","dance"]  {"adderss":"xx"}
3   4  elite3   20  ["basketball","music","dance"]  {"adderss":"xx"}
4   5  elite4   10  ["basketball","music","dance"]  {"adderss":"xx"}
5   6  elite5   20  ["basketball","music","dance"]  {"adderss":"xx"}
Disconnecting