用Java、Python来开发Hive应用

简介: 用Java、Python来开发Hive应用

1 预先配置

hive配置文件:%HIVE_HOME%/conf/hive-site.xml添加

代码语言:javascript

复制

<!-- 禁用 impersonation -->
<property>
    <name>hive.server2.enable.doAs</name>
    <value>false</value> 
</property>

Hadoop 的配置文件中%HADOOP_HOME%/etc/hadoo/下的:core-site.xml 和 hdfs-site.xml添加

代码语言:javascript

复制

<property>
    <name>hadoop.proxyuser.root.groups</name>
    <value>*</value>
</property>
<property>
    <name>hadoop.proxyuser.root.hosts</name>
    <value>*</value>
</property>

确保没有设置限制 root 用户的权限

修改访问数据库表person的权限

代码语言:javascript

复制

#hdfs dfs -chmod -R 775 /user/hive/warehouse/demo.db/person

由于Hive是数据仓库,而不是数据库,所以一般不支持增删改查,这里仅介绍如何通过Java来向Hive插入,查询数据。2 用Java来开发Hive应用

pom.xml

代码语言:javascript

复制

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.jerry</groupId>
  <artifactId>hive</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <description>Java How to connect Hivi</description>
    <dependencies>
        <!-- Hive JDBC Driver -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>3.1.2</version>
        </dependency>
        <!-- Hadoop Common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.2</version>
        </dependency>
        <!-- Hadoop Client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.2</version>
        </dependency>
    </dependencies>
</project>

Java文件

代码语言:javascript

复制

package com.jerry;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
public class HiveClient {
    private static final String DRIVER_CLASS = "org.apache.hive.jdbc.HiveDriver";
    private static final String CONNECTION_URL = "jdbc:hive2://192.168.31.184:10000/demo";
    private static PreparedStatement preparedstatement;
    private static Statement statement;
    private static ResultSet resultSet = null;
    //链接
    private Connection getConnection() throws SQLException {
        try {
            Class.forName(DRIVER_CLASS);
            Connection con = DriverManager.getConnection(CONNECTION_URL);
            statement = con.createStatement();
            return con;
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            throw new SQLException(e.getMessage());
        }
    }
    
    //断开链接
    public void disconnect(Connection con) throws SQLException {
      // Close resources
        resultSet.close();
        statement.close();
        con.close();
    }
    
    //执行查询
    public void query(String query) throws SQLException {
      // Execute a query
        resultSet = statement.executeQuery(query);
    }
    
    //带条件执行查询
    public void query(Connection con,String query,Map<String, String> condition) throws SQLException {
      String where = " where ";
      int i = 0;
      int length = condition.size(); 
      String[] valuearray= new String[length];
      for (String key : condition.keySet()) {
         String value = condition.get(key);
         where = where+key+" = ? AND ";
         valuearray[i] = value;
         i++;
      }
      where = where + "1=1";
      query = query + where;
      PreparedStatement preparedStatement = con.prepareStatement(query);
      for(int j=0;j<length;j++) {
        preparedStatement.setString(j+1, valuearray[j]);
      }
      resultSet = preparedStatement.executeQuery();;
    }
    
    //打印查询记录
    public void printQueryResult(ResultSet resultSet) throws SQLException {
      //获取 ResultSet 的元数据
        ResultSetMetaData metaData = resultSet.getMetaData();
        // 获取列数
        int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
          for (int i=1;i<=columnCount;i++) {
            System.out.print(resultSet.getString(i)+",");
          }
          System.out.println("");
        }
    }
    
    //查询并且打印数据
    public void queryAndPrint(String query) throws SQLException {
      query(query);
      printQueryResult(resultSet);
    }
    
    //查询并且打印数据
    public void queryAndPrint(Connection con,String query,Map<String, String> condition) throws SQLException {
      query(con,query,condition);
      printQueryResult(resultSet);
    }
    
    //添加数据
    public void addDataToHiveTable(Connection con,String tableName,String[] newValue,String like,String map) {
        try {
          String insertSql = "INSERT INTO person SELECT ?,?,?,"+like+","+map;
          System.out.println(like);
          preparedstatement = con.prepareStatement(insertSql);
          preparedstatement.setInt(1, Integer.parseInt(newValue[0]));
          preparedstatement.setString(2, newValue[1]);
          preparedstatement.setInt(3, Integer.parseInt(newValue[2]));
          preparedstatement.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    
    //将文件中的数据加载到表中
    public void loadDataForLocal(String tableName,String path) throws SQLException {
      String query = "LOAD DATA LOCAL INPATH '"+path+"' INTO TABLE "+tableName;
      statement.execute(query);
    }
    
    //清空数据表
    public void truncateTable(Connection con,String tableName) throws SQLException {
      String query = "truncate table "+tableName;
      con.setAutoCommit(true); // 确保自动提交
      Statement statement = con.createStatement();
      statement.execute(query);
    }
    
public static void main(String[] args) throws SQLException {
  HiveClient hive = new HiveClient();
  String tableName = "person";
  String like = "array('basketball', 'music', 'dance')";
  String map = "map('address','xxxx')";
  String[] newAddValue = {"10","elite0","50"};
  Connection con = hive.getConnection();
  String query = "SELECT * FROM "+tableName;
  Map<String, String> condition = new HashMap<String, String>();
  condition.put("name","elite0");
  condition.put("age","50");
  String inpath = "/home/jerry/hive/person";
  try {
    System.out.println("全表查询:");
    hive.queryAndPrint(query);
    hive.addDataToHiveTable(con,tableName,newAddValue,like,map);
    System.out.println("插入数据后全表查询:");
    hive.queryAndPrint(query);
    System.out.println("条件查询:");
    hive.queryAndPrint(con,query,condition);
    hive.truncateTable(con,tableName);
    System.out.println("清空表:");
    hive.queryAndPrint(query);
    hive.loadDataForLocal(tableName,inpath);
    System.out.println("从文件中加载:");
    hive.queryAndPrint(query);
    hive.disconnect(con);
  } catch (Exception e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
  }
  }
}

运行结果

代码语言:javascript

复制

全表查询:
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},
array('basketball', 'music', 'dance')
插入数据后全表查询:
10,elite0,50,["basketball","music","dance"],{"address":"xxxx"},
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},
条件查询:
10,elite0,50,["basketball","music","dance"],{"address":"xxxx"},
清空表:
从文件中加载:
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},

3 用Python开发Hive应用

pip3

代码语言:javascript

复制

pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive

Python

代码语言:javascript

复制

import pandas as pd
from pyhive import hive
from sqlalchemy import create_engine
from pyhive import hive
class Hive:
    def __init__(self):
        self.database= 'demo'
        self.host = '192.168.31.184'
        self.port = '10000'
        
    def getconnect(self):
        conn = hive.Connection(host=self.host, port=self.port,database=self.database)
        return conn;
        
    def getEngine(self):
        # 创建 Hive 数据库连接
        hive_uri = f"hive://"+self.host+":"+self.port+"/"+self.database
        return create_engine(hive_uri)
    def disconnect(self,engine,conn):
        engine.dispose()
        conn.close()
    #执行查询
    def query(self,sql,engine,condition=None):
        try:
            if condition is None:
            # 执行 SQL 查询
                df = pd.read_sql(sql, engine)
                print(df)
            else:
                values = []
                where = " where "
                for key in condition:
                    where = where+key+" = %s and "
                    values.append(condition[key])
                where = where+"1=1"
                sql = sql + where
                params = tuple(values)
                df = pd.read_sql(sql, engine, params=params)
                print(df)
        except Exception as e:
            print("Error occurred:", e)
    #添加数据
    def addDataToHiveTable(self,conn,tableName,data):
        like_array = f"array({', '.join(map(lambda x: f'\'{x}\'', data['like']))})"  # 使用单引号包裹字符串
        address_map = f"map('{list(data['address'].keys())[0]}', '{list(data['address'].values())[0]}')"  # 创建 MAP 格式
        # 创建游标
        cursor = conn.cursor()
        insertSql = "INSERT INTO person SELECT %s,%s,%s,"+like_array+","+address_map
        # 执行插入操作
        try:
            cursor.execute(insertSql, (data['id'], data['name'], data['age']))
        except Exception as e:
            print(f"Error inserting data: {e}")
        conn.commit()
        cursor.close()
    #将文件中的数据加载到表中
    def loadDataForLocal(self,conn,tableName,path):
        cursor = conn.cursor()
        query = "LOAD DATA LOCAL INPATH '"+path+"' INTO TABLE "+tableName
        cursor.execute(query)
        conn.commit()
        cursor.close()
    
    #清空数据表
    def truncateTable(self,conn,tableName):
        cursor = conn.cursor()
        query = "truncate table "+tableName;
        #con.setAutoCommit(true) #确保自动提交
        cursor.execute(query)
        conn.commit()
        cursor.close()
        
if __name__ == "__main__":
    sql = "SELECT * FROM person"
    condition={"name":"elite1","age":"20"}
    # 准备要插入的数据
    data = {
        'id': "50",
        'name': "Jerry",
        'age': 50,  # 确保这里是整数
        'like': ["basketball", "music", "dance"],
        'address': {"address": "xx"}
    }
    tableName = "person"
    path = "/home/jerry/hive/person"
    myhive = Hive()
    print("建立连接")
    conn = myhive.getconnect()
    engine = myhive.getEngine()
    print("全表查询")
    myhive.query(sql,engine)
    print("条件查询")
    myhive.query(sql,engine,condition)
    print("加数据进入表")
    myhive.addDataToHiveTable(conn,tableName,data)
    myhive.query(sql,engine)
    print("清空表中所有数据")
    myhive.truncateTable(conn,tableName)
    print("从文件中导入数据")
    myhive.loadDataForLocal(conn,tableName,path)
    myhive.query(sql,engine)
    print("断开连接")
    myhive.disconnect(engine,conn)
  • connect:用于其他操作
  • engine:用于查询

运行结果

代码语言:javascript

复制

建立连接
全表查询
   id    name  age                           likes           address
0   1  elite0   10  ["basketball","music","dance"]  {"adderss":"xx"}
1   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
2   3  elite2   10  ["basketball","music","dance"]  {"adderss":"xx"}
3   4  elite3   20  ["basketball","music","dance"]  {"adderss":"xx"}
4   5  elite4   10  ["basketball","music","dance"]  {"adderss":"xx"}
5   6  elite5   20  ["basketball","music","dance"]  {"adderss":"xx"}
条件查询
   id    name  age                           likes           address
0   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
加数据进入表
   id    name  age                           likes           address
0  50   Jerry   50  ["basketball","music","dance"]  {"address":"xx"}
1   1  elite0   10  ["basketball","music","dance"]  {"adderss":"xx"}
2   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
3   3  elite2   10  ["basketball","music","dance"]  {"adderss":"xx"}
4   4  elite3   20  ["basketball","music","dance"]  {"adderss":"xx"}
5   5  elite4   10  ["basketball","music","dance"]  {"adderss":"xx"}
6   6  elite5   20  ["basketball","music","dance"]  {"adderss":"xx"}
清空表中所有数据
从文件中导入数据
   id    name  age                           likes           address
0   1  elite0   10  ["basketball","music","dance"]  {"adderss":"xx"}
1   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
2   3  elite2   10  ["basketball","music","dance"]  {"adderss":"xx"}
3   4  elite3   20  ["basketball","music","dance"]  {"adderss":"xx"}
4   5  elite4   10  ["basketball","music","dance"]  {"adderss":"xx"}
5   6  elite5   20  ["basketball","music","dance"]  {"adderss":"xx"}
断开连接
目录
相关文章
|
7月前
|
人工智能 算法 Java
Java与AI驱动区块链:构建智能合约与去中心化AI应用
区块链技术和人工智能的融合正在开创去中心化智能应用的新纪元。本文深入探讨如何使用Java构建AI驱动的区块链应用,涵盖智能合约开发、去中心化AI模型训练与推理、数据隐私保护以及通证经济激励等核心主题。我们将完整展示从区块链基础集成、智能合约编写、AI模型上链到去中心化应用(DApp)开发的全流程,为构建下一代可信、透明的智能去中心化系统提供完整技术方案。
467 3
|
7月前
|
消息中间件 缓存 Java
Spring框架优化:提高Java应用的性能与适应性
以上方法均旨在综合考虑Java Spring 应该程序设计原则, 数据库交互, 编码实践和系统架构布局等多角度因素, 旨在达到高效稳定运转目标同时也易于未来扩展.
616 8
|
8月前
|
设计模式 人工智能 API
AI智能体开发实战:17种核心架构模式详解与Python代码实现
本文系统解析17种智能体架构设计模式,涵盖多智能体协作、思维树、反思优化与工具调用等核心范式,结合LangChain与LangGraph实现代码工作流,并通过真实案例验证效果,助力构建高效AI系统。
914 7
|
8月前
|
人工智能 Java API
Java与大模型集成实战:构建智能Java应用的新范式
随着大型语言模型(LLM)的API化,将其强大的自然语言处理能力集成到现有Java应用中已成为提升应用智能水平的关键路径。本文旨在为Java开发者提供一份实用的集成指南。我们将深入探讨如何使用Spring Boot 3框架,通过HTTP客户端与OpenAI GPT(或兼容API)进行高效、安全的交互。内容涵盖项目依赖配置、异步非阻塞的API调用、请求与响应的结构化处理、异常管理以及一些面向生产环境的最佳实践,并附带完整的代码示例,助您快速将AI能力融入Java生态。
1339 12
|
8月前
|
监控 数据可视化 数据挖掘
Python Rich库使用指南:打造更美观的命令行应用
Rich库是Python的终端美化利器,支持彩色文本、智能表格、动态进度条和语法高亮,大幅提升命令行应用的可视化效果与用户体验。
751 0
|
8月前
|
jenkins Java Shell
Java、Python、C++支持jenkins和SonarQube(全集)
Jenkins 是一个开源的持续集成(CI)和持续交付(CD)工具,用于自动化构建、测试和部署软件项目。它基于 Java 开发,支持跨平台运行,并拥有丰富的插件生态系统,可以灵活地扩展功能
719 1
|
8月前
|
jenkins Shell 测试技术
|
8月前
|
jenkins Java 持续交付
Java、Python、C++支持Jenkins和SonarQube(三)
Python与Jenkins和SonarQube
402 1
|
8月前
|
jenkins Java 测试技术
|
8月前
|
机器学习/深度学习 JSON Java
Java调用Python的5种实用方案:从简单到进阶的全场景解析
在机器学习与大数据融合背景下,Java与Python协同开发成为企业常见需求。本文通过真实案例解析5种主流调用方案,涵盖脚本调用到微服务架构,助力开发者根据业务场景选择最优方案,提升开发效率与系统性能。
1907 0

推荐镜像

更多