用Java、Python来开发Hive应用

简介: 用Java、Python来开发Hive应用

1 预先配置

hive配置文件:%HIVE_HOME%/conf/hive-site.xml添加

代码语言:javascript

复制

<!-- 禁用 impersonation -->
<property>
    <name>hive.server2.enable.doAs</name>
    <value>false</value> 
</property>

Hadoop 的配置文件中%HADOOP_HOME%/etc/hadoop/下的:core-site.xml 和 hdfs-site.xml添加

代码语言:javascript

复制

<property>
    <name>hadoop.proxyuser.root.groups</name>
    <value>*</value>
</property>
<property>
    <name>hadoop.proxyuser.root.hosts</name>
    <value>*</value>
</property>

确保没有设置限制 root 用户的权限

修改访问数据库表person的权限

代码语言:javascript

复制

#hdfs dfs -chmod -R 775 /user/hive/warehouse/demo.db/person

由于Hive是数据仓库,而不是传统数据库,一般不支持记录级的更新和删除,这里仅介绍如何通过Java来向Hive插入、查询数据。

2 用Java来开发Hive应用

pom.xml

代码语言:javascript

复制

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.jerry</groupId>
  <artifactId>hive</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <description>Java How to connect Hive</description>
    <dependencies>
        <!-- Hive JDBC Driver -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>3.1.2</version>
        </dependency>
        <!-- Hadoop Common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.2</version>
        </dependency>
        <!-- Hadoop Client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.2</version>
        </dependency>
    </dependencies>
</project>

Java文件

代码语言:javascript

复制

package com.jerry;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
public class HiveClient {
    private static final String DRIVER_CLASS = "org.apache.hive.jdbc.HiveDriver";
    private static final String CONNECTION_URL = "jdbc:hive2://192.168.31.184:10000/demo";
    // Shared JDBC handles; this demo client is single-threaded, so static state is tolerated.
    private static PreparedStatement preparedStatement;
    private static Statement statement;
    private static ResultSet resultSet = null;

    /**
     * Opens a connection to HiveServer2 and caches a Statement for later queries.
     *
     * @return an open JDBC connection to the demo database
     * @throws SQLException if the driver class is missing or the connection fails
     */
    private Connection getConnection() throws SQLException {
        try {
            Class.forName(DRIVER_CLASS);
            Connection con = DriverManager.getConnection(CONNECTION_URL);
            statement = con.createStatement();
            return con;
        } catch (ClassNotFoundException e) {
            // Preserve the original cause instead of flattening it into a message string.
            throw new SQLException("Hive JDBC driver not found: " + DRIVER_CLASS, e);
        }
    }

    /**
     * Closes the cached result set and statement (if any) and the connection.
     * Null-guarded so it is safe to call even if no query was ever executed.
     */
    public void disconnect(Connection con) throws SQLException {
        if (resultSet != null) {
            resultSet.close();
        }
        if (statement != null) {
            statement.close();
        }
        con.close();
    }

    /** Executes an unparameterized query; the result is kept in {@code resultSet}. */
    public void query(String query) throws SQLException {
        resultSet = statement.executeQuery(query);
    }

    /**
     * Executes a query with an AND-combined equality predicate for every map entry.
     * Values are bound through PreparedStatement placeholders, never concatenated,
     * to avoid SQL injection; the trailing "1=1" closes the final "AND".
     */
    public void query(Connection con, String query, Map<String, String> condition) throws SQLException {
        StringBuilder where = new StringBuilder(" where ");
        int length = condition.size();
        String[] values = new String[length];
        int i = 0;
        for (Map.Entry<String, String> entry : condition.entrySet()) {
            where.append(entry.getKey()).append(" = ? AND ");
            values[i++] = entry.getValue();
        }
        where.append("1=1");
        PreparedStatement ps = con.prepareStatement(query + where);
        for (int j = 0; j < length; j++) {
            ps.setString(j + 1, values[j]);
        }
        resultSet = ps.executeQuery();
    }

    /** Prints every row of the result set as comma-separated column values. */
    public void printQueryResult(ResultSet resultSet) throws SQLException {
        ResultSetMetaData metaData = resultSet.getMetaData();
        int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            for (int i = 1; i <= columnCount; i++) {
                System.out.print(resultSet.getString(i) + ",");
            }
            System.out.println("");
        }
    }

    /** Runs an unparameterized query and prints the rows. */
    public void queryAndPrint(String query) throws SQLException {
        query(query);
        printQueryResult(resultSet);
    }

    /** Runs a parameterized query (equality conditions) and prints the rows. */
    public void queryAndPrint(Connection con, String query, Map<String, String> condition) throws SQLException {
        query(con, query, condition);
        printQueryResult(resultSet);
    }

    /**
     * Inserts one row via INSERT ... SELECT.  Hive ARRAY/MAP literals cannot be
     * bound as JDBC parameters, so {@code like} and {@code map} are spliced into
     * the SQL text; the scalar columns are bound as placeholders.
     *
     * @param newValue {id, name, age} — id and age must parse as integers
     */
    public void addDataToHiveTable(Connection con, String tableName, String[] newValue, String like, String map) {
        try {
            // Bug fix: use the tableName parameter instead of the hard-coded "person".
            String insertSql = "INSERT INTO " + tableName + " SELECT ?,?,?," + like + "," + map;
            preparedStatement = con.prepareStatement(insertSql);
            preparedStatement.setInt(1, Integer.parseInt(newValue[0]));
            preparedStatement.setString(2, newValue[1]);
            preparedStatement.setInt(3, Integer.parseInt(newValue[2]));
            preparedStatement.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Loads a local file into the table.  The path must exist on the
     * HiveServer2 host (LOAD DATA LOCAL is server-side "local").
     */
    public void loadDataForLocal(String tableName, String path) throws SQLException {
        String query = "LOAD DATA LOCAL INPATH '" + path + "' INTO TABLE " + tableName;
        statement.execute(query);
    }

    /** Removes all rows from the table. */
    public void truncateTable(Connection con, String tableName) throws SQLException {
        String query = "truncate table " + tableName;
        con.setAutoCommit(true); // ensure auto-commit for the DDL statement
        Statement statement = con.createStatement();
        statement.execute(query);
    }

    public static void main(String[] args) throws SQLException {
        HiveClient hive = new HiveClient();
        String tableName = "person";
        String like = "array('basketball', 'music', 'dance')";
        String map = "map('address','xxxx')";
        String[] newAddValue = {"10", "elite0", "50"};
        Connection con = hive.getConnection();
        String query = "SELECT * FROM " + tableName;
        Map<String, String> condition = new HashMap<String, String>();
        condition.put("name", "elite0");
        condition.put("age", "50");
        String inpath = "/home/jerry/hive/person";
        try {
            System.out.println("全表查询:");
            hive.queryAndPrint(query);
            hive.addDataToHiveTable(con, tableName, newAddValue, like, map);
            System.out.println("插入数据后全表查询:");
            hive.queryAndPrint(query);
            System.out.println("条件查询:");
            hive.queryAndPrint(con, query, condition);
            hive.truncateTable(con, tableName);
            System.out.println("清空表:");
            hive.queryAndPrint(query);
            hive.loadDataForLocal(tableName, inpath);
            System.out.println("从文件中加载:");
            hive.queryAndPrint(query);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Bug fix: always release JDBC resources, even when a step fails.
            hive.disconnect(con);
        }
    }
}

运行结果

代码语言:javascript

复制

全表查询:
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},
array('basketball', 'music', 'dance')
插入数据后全表查询:
10,elite0,50,["basketball","music","dance"],{"address":"xxxx"},
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},
条件查询:
10,elite0,50,["basketball","music","dance"],{"address":"xxxx"},
清空表:
从文件中加载:
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},

3 用Python开发Hive应用

pip3

代码语言:javascript

复制

pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive

Python

代码语言:javascript

复制

import pandas as pd
from pyhive import hive
from sqlalchemy import create_engine
from pyhive import hive
class Hive:
    def __init__(self):
        self.database= 'demo'
        self.host = '192.168.31.184'
        self.port = '10000'
        
    def getconnect(self):
        conn = hive.Connection(host=self.host, port=self.port,database=self.database)
        return conn;
        
    def getEngine(self):
        # 创建 Hive 数据库连接
        hive_uri = f"hive://"+self.host+":"+self.port+"/"+self.database
        return create_engine(hive_uri)
    def disconnect(self,engine,conn):
        engine.dispose()
        conn.close()
    #执行查询
    def query(self,sql,engine,condition=None):
        try:
            if condition is None:
            # 执行 SQL 查询
                df = pd.read_sql(sql, engine)
                print(df)
            else:
                values = []
                where = " where "
                for key in condition:
                    where = where+key+" = %s and "
                    values.append(condition[key])
                where = where+"1=1"
                sql = sql + where
                params = tuple(values)
                df = pd.read_sql(sql, engine, params=params)
                print(df)
        except Exception as e:
            print("Error occurred:", e)
    #添加数据
    def addDataToHiveTable(self,conn,tableName,data):
        like_array = f"array({', '.join(map(lambda x: f'\'{x}\'', data['like']))})"  # 使用单引号包裹字符串
        address_map = f"map('{list(data['address'].keys())[0]}', '{list(data['address'].values())[0]}')"  # 创建 MAP 格式
        # 创建游标
        cursor = conn.cursor()
        insertSql = "INSERT INTO person SELECT %s,%s,%s,"+like_array+","+address_map
        # 执行插入操作
        try:
            cursor.execute(insertSql, (data['id'], data['name'], data['age']))
        except Exception as e:
            print(f"Error inserting data: {e}")
        conn.commit()
        cursor.close()
    #将文件中的数据加载到表中
    def loadDataForLocal(self,conn,tableName,path):
        cursor = conn.cursor()
        query = "LOAD DATA LOCAL INPATH '"+path+"' INTO TABLE "+tableName
        cursor.execute(query)
        conn.commit()
        cursor.close()
    
    #清空数据表
    def truncateTable(self,conn,tableName):
        cursor = conn.cursor()
        query = "truncate table "+tableName;
        #con.setAutoCommit(true) #确保自动提交
        cursor.execute(query)
        conn.commit()
        cursor.close()
        
def _run_demo():
    """Walk the Hive helper through query, insert, truncate and file load."""
    sql = "SELECT * FROM person"
    condition = {"name": "elite1", "age": "20"}
    # Row to insert ('age' deliberately an int, the rest strings).
    data = {
        'id': "50",
        'name': "Jerry",
        'age': 50,
        'like': ["basketball", "music", "dance"],
        'address': {"address": "xx"},
    }
    table_name = "person"
    path = "/home/jerry/hive/person"
    my_hive = Hive()
    print("建立连接")
    conn = my_hive.getconnect()
    engine = my_hive.getEngine()
    print("全表查询")
    my_hive.query(sql, engine)
    print("条件查询")
    my_hive.query(sql, engine, condition)
    print("加数据进入表")
    my_hive.addDataToHiveTable(conn, table_name, data)
    my_hive.query(sql, engine)
    print("清空表中所有数据")
    my_hive.truncateTable(conn, table_name)
    print("从文件中导入数据")
    my_hive.loadDataForLocal(conn, table_name, path)
    my_hive.query(sql, engine)
    print("断开连接")
    my_hive.disconnect(engine, conn)


if __name__ == "__main__":
    _run_demo()
  • conn(PyHive 连接):用于插入、清空、加载文件等操作
  • engine(SQLAlchemy 引擎):用于查询

运行结果

代码语言:javascript

复制

建立连接
全表查询
   id    name  age                           likes           address
0   1  elite0   10  ["basketball","music","dance"]  {"adderss":"xx"}
1   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
2   3  elite2   10  ["basketball","music","dance"]  {"adderss":"xx"}
3   4  elite3   20  ["basketball","music","dance"]  {"adderss":"xx"}
4   5  elite4   10  ["basketball","music","dance"]  {"adderss":"xx"}
5   6  elite5   20  ["basketball","music","dance"]  {"adderss":"xx"}
条件查询
   id    name  age                           likes           address
0   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
加数据进入表
   id    name  age                           likes           address
0  50   Jerry   50  ["basketball","music","dance"]  {"address":"xx"}
1   1  elite0   10  ["basketball","music","dance"]  {"adderss":"xx"}
2   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
3   3  elite2   10  ["basketball","music","dance"]  {"adderss":"xx"}
4   4  elite3   20  ["basketball","music","dance"]  {"adderss":"xx"}
5   5  elite4   10  ["basketball","music","dance"]  {"adderss":"xx"}
6   6  elite5   20  ["basketball","music","dance"]  {"adderss":"xx"}
清空表中所有数据
从文件中导入数据
   id    name  age                           likes           address
0   1  elite0   10  ["basketball","music","dance"]  {"adderss":"xx"}
1   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
2   3  elite2   10  ["basketball","music","dance"]  {"adderss":"xx"}
3   4  elite3   20  ["basketball","music","dance"]  {"adderss":"xx"}
4   5  elite4   10  ["basketball","music","dance"]  {"adderss":"xx"}
5   6  elite5   20  ["basketball","music","dance"]  {"adderss":"xx"}
断开连接
目录
相关文章
|
6天前
|
JSON 安全 API
如何使用Python开发API接口?
在现代软件开发中,API(应用程序编程接口)用于不同软件组件之间的通信和数据交换,实现系统互操作性。Python因其简单易用和强大功能,成为开发API的热门选择。本文详细介绍了Python开发API的基础知识、优势、实现方式(如Flask和Django框架)、实战示例及注意事项,帮助读者掌握高效、安全的API开发技巧。
29 3
如何使用Python开发API接口?
|
11天前
|
人工智能 前端开发 Java
基于开源框架Spring AI Alibaba快速构建Java应用
本文旨在帮助开发者快速掌握并应用 Spring AI Alibaba,提升基于 Java 的大模型应用开发效率和安全性。
基于开源框架Spring AI Alibaba快速构建Java应用
|
2天前
|
JSON Java Apache
非常实用的Http应用框架,杜绝Java Http 接口对接繁琐编程
UniHttp 是一个声明式的 HTTP 接口对接框架,帮助开发者快速对接第三方 HTTP 接口。通过 @HttpApi 注解定义接口,使用 @GetHttpInterface 和 @PostHttpInterface 等注解配置请求方法和参数。支持自定义代理逻辑、全局请求参数、错误处理和连接池配置,提高代码的内聚性和可读性。
|
4天前
|
SQL 安全 Java
安全问题已经成为软件开发中不可忽视的重要议题。对于使用Java语言开发的应用程序来说,安全性更是至关重要
在当今网络环境下,Java应用的安全性至关重要。本文深入探讨了Java安全编程的最佳实践,包括代码审查、输入验证、输出编码、访问控制和加密技术等,帮助开发者构建安全可靠的应用。通过掌握相关技术和工具,开发者可以有效防范安全威胁,确保应用的安全性。
14 4
|
4天前
|
SQL Java 数据库连接
从理论到实践:Hibernate与JPA在Java项目中的实际应用
本文介绍了Java持久层框架Hibernate和JPA的基本概念及其在具体项目中的应用。通过一个在线书店系统的实例,展示了如何使用@Entity注解定义实体类、通过Spring Data JPA定义仓库接口、在服务层调用方法进行数据库操作,以及使用JPQL编写自定义查询和管理事务。这些技术不仅简化了数据库操作,还显著提升了开发效率。
13 3
|
6天前
|
缓存 监控 Java
如何运用JAVA开发API接口?
本文详细介绍了如何使用Java开发API接口,涵盖创建、实现、测试和部署接口的关键步骤。同时,讨论了接口的安全性设计和设计原则,帮助开发者构建高效、安全、易于维护的API接口。
24 4
|
5天前
|
数据采集 存储 JSON
Python爬虫开发中的分析与方案制定
Python爬虫开发中的分析与方案制定
|
10天前
|
算法 测试技术 开发者
性能优化与代码审查:提升Python开发效率
性能优化与代码审查:提升Python开发效率
22 1
|
5天前
|
安全 Java 测试技术
Java开发必读,谈谈对Spring IOC与AOP的理解
Spring的IOC和AOP机制通过依赖注入和横切关注点的分离,大大提高了代码的模块化和可维护性。IOC使得对象的创建和管理变得灵活可控,降低了对象之间的耦合度;AOP则通过动态代理机制实现了横切关注点的集中管理,减少了重复代码。理解和掌握这两个核心概念,是高效使用Spring框架的关键。希望本文对你深入理解Spring的IOC和AOP有所帮助。
10 0
|
6天前
|
Java API Android开发
kotlin和java开发优缺点
kotlin和java开发优缺点
17 0