Developing Hive Applications in Java and Python

Summary: How to develop Hive applications in Java and Python.

1 Preliminary configuration

Add the following to the Hive configuration file %HIVE_HOME%/conf/hive-site.xml:

<!-- Disable impersonation -->
<property>
    <name>hive.server2.enable.doAs</name>
    <value>false</value> 
</property>

Add the following to Hadoop's configuration files core-site.xml and hdfs-site.xml under %HADOOP_HOME%/etc/hadoop/:

<property>
    <name>hadoop.proxyuser.root.groups</name>
    <value>*</value>
</property>
<property>
    <name>hadoop.proxyuser.root.hosts</name>
    <value>*</value>
</property>

These settings let the root user proxy requests from any host and any group; make sure nothing else restricts the root user's permissions. Restart HDFS and HiveServer2 after changing these files.
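
Once the services pick up the new configuration, a quick connectivity check can be run with beeline (a sketch assuming HiveServer2 listens on its default port 10000 and the demo database used below already exists):

beeline -u "jdbc:hive2://192.168.31.184:10000/demo" -e "SHOW TABLES;"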

Change the permissions on the HDFS directory backing the table person:

#hdfs dfs -chmod -R 775 /user/hive/warehouse/demo.db/person
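
To confirm the change took effect, list the warehouse directory and check the permission bits:

hdfs dfs -ls /user/hive/warehouse/demo.db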

Because Hive is a data warehouse rather than a transactional database, it generally does not support row-level updates and deletes. This article therefore only covers inserting data into Hive and querying it.

2 Developing a Hive application in Java
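
All examples operate on a table person in database demo. The original DDL is not shown in this article; the following is a plausible reconstruction inferred from the query output (the field and collection delimiters are assumptions and must match your data file):

CREATE TABLE person (
    id INT,
    name STRING,
    age INT,
    likes ARRAY<STRING>,
    address MAP<STRING, STRING>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '-'
MAP KEYS TERMINATED BY ':';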

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.jerry</groupId>
  <artifactId>hive</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <description>How to connect to Hive from Java</description>
    <dependencies>
        <!-- Hive JDBC Driver -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>3.1.2</version>
        </dependency>
        <!-- Hadoop Common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.2</version>
        </dependency>
        <!-- Hadoop Client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.2</version>
        </dependency>
    </dependencies>
</project>
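
The hive-jdbc and hadoop-* versions above should match what is running on your cluster; with the CLIs on your PATH you can check with:

hive --version
hadoop version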

Java file

package com.jerry;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
public class HiveClient {
    private static final String DRIVER_CLASS = "org.apache.hive.jdbc.HiveDriver";
    private static final String CONNECTION_URL = "jdbc:hive2://192.168.31.184:10000/demo";
    private static PreparedStatement preparedstatement;
    private static Statement statement;
    private static ResultSet resultSet = null;
    // Connect to HiveServer2
    private Connection getConnection() throws SQLException {
        try {
            Class.forName(DRIVER_CLASS);
            Connection con = DriverManager.getConnection(CONNECTION_URL);
            statement = con.createStatement();
            return con;
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            throw new SQLException(e.getMessage());
        }
    }
    
    // Disconnect and release resources
    public void disconnect(Connection con) throws SQLException {
        if (resultSet != null) {
            resultSet.close();
        }
        statement.close();
        con.close();
    }
    
    // Execute a query
    public void query(String query) throws SQLException {
      // Execute a query
        resultSet = statement.executeQuery(query);
    }
    
    // Execute a query with WHERE conditions
    public void query(Connection con,String query,Map<String, String> condition) throws SQLException {
      String where = " where ";
      int i = 0;
      int length = condition.size(); 
      String[] valuearray= new String[length];
      for (String key : condition.keySet()) {
         String value = condition.get(key);
         where = where+key+" = ? AND ";
         valuearray[i] = value;
         i++;
      }
      where = where + "1=1";
      query = query + where;
      PreparedStatement preparedStatement = con.prepareStatement(query);
      for(int j=0;j<length;j++) {
        preparedStatement.setString(j+1, valuearray[j]);
      }
      resultSet = preparedStatement.executeQuery();
    }
    
    // Print the query results
    public void printQueryResult(ResultSet resultSet) throws SQLException {
        // Get the ResultSet metadata
        ResultSetMetaData metaData = resultSet.getMetaData();
        // Get the column count
        int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
          for (int i=1;i<=columnCount;i++) {
            System.out.print(resultSet.getString(i)+",");
          }
          System.out.println("");
        }
    }
    
    // Query and print the results
    public void queryAndPrint(String query) throws SQLException {
      query(query);
      printQueryResult(resultSet);
    }
    
    // Query with conditions and print the results
    public void queryAndPrint(Connection con,String query,Map<String, String> condition) throws SQLException {
      query(con,query,condition);
      printQueryResult(resultSet);
    }
    
    // Insert a row; like and map are HiveQL literals for the ARRAY and MAP columns
    public void addDataToHiveTable(Connection con,String tableName,String[] newValue,String like,String map) {
        try {
          String insertSql = "INSERT INTO "+tableName+" SELECT ?,?,?,"+like+","+map;
          preparedstatement = con.prepareStatement(insertSql);
          preparedstatement.setInt(1, Integer.parseInt(newValue[0]));
          preparedstatement.setString(2, newValue[1]);
          preparedstatement.setInt(3, Integer.parseInt(newValue[2]));
          preparedstatement.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    
    // Load data from a local file into the table
    public void loadDataForLocal(String tableName,String path) throws SQLException {
      String query = "LOAD DATA LOCAL INPATH '"+path+"' INTO TABLE "+tableName;
      statement.execute(query);
    }
    
    // Truncate the table
    public void truncateTable(Connection con,String tableName) throws SQLException {
      String query = "truncate table "+tableName;
      con.setAutoCommit(true); // ensure auto-commit
      Statement statement = con.createStatement();
      statement.execute(query);
    }
    
public static void main(String[] args) throws SQLException {
  HiveClient hive = new HiveClient();
  String tableName = "person";
  String like = "array('basketball', 'music', 'dance')";
  String map = "map('address','xxxx')";
  String[] newAddValue = {"10","elite0","50"};
  Connection con = hive.getConnection();
  String query = "SELECT * FROM "+tableName;
  Map<String, String> condition = new HashMap<String, String>();
  condition.put("name","elite0");
  condition.put("age","50");
  String inpath = "/home/jerry/hive/person";
  try {
    System.out.println("全表查询:");
    hive.queryAndPrint(query);
    hive.addDataToHiveTable(con,tableName,newAddValue,like,map);
    System.out.println("插入数据后全表查询:");
    hive.queryAndPrint(query);
    System.out.println("条件查询:");
    hive.queryAndPrint(con,query,condition);
    hive.truncateTable(con,tableName);
    System.out.println("清空表:");
    hive.queryAndPrint(query);
    hive.loadDataForLocal(tableName,inpath);
    System.out.println("从文件中加载:");
    hive.queryAndPrint(query);
    hive.disconnect(con);
  } catch (Exception e) {
    e.printStackTrace();
  }
  }
}
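
One way to compile and run the client (a sketch assuming the exec-maven-plugin is available; adjust the main class if your package differs):

mvn compile exec:java -Dexec.mainClass="com.jerry.HiveClient"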

Output

Full table query:
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},
Full table query after insert:
10,elite0,50,["basketball","music","dance"],{"address":"xxxx"},
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},
Conditional query:
10,elite0,50,["basketball","music","dance"],{"address":"xxxx"},
After truncating the table:
Loaded from local file:
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},

3 Developing a Hive application in Python

pip3

pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive
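
Note: sasl builds a native extension, so the SASL development headers must be present; on Debian/Ubuntu that typically means installing libsasl2-dev first (an assumption, adjust for your distribution):

sudo apt-get install libsasl2-dev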

Python file

import pandas as pd
from pyhive import hive
from sqlalchemy import create_engine
class Hive:
    def __init__(self):
        self.database= 'demo'
        self.host = '192.168.31.184'
        self.port = '10000'
        
    def getconnect(self):
        conn = hive.Connection(host=self.host, port=self.port, database=self.database)
        return conn
        
    def getEngine(self):
        # Build a SQLAlchemy engine for Hive
        hive_uri = f"hive://{self.host}:{self.port}/{self.database}"
        return create_engine(hive_uri)
    def disconnect(self,engine,conn):
        engine.dispose()
        conn.close()
    # Execute a query
    def query(self,sql,engine,condition=None):
        try:
            if condition is None:
                # Run the SQL query as-is
                df = pd.read_sql(sql, engine)
                print(df)
            else:
                values = []
                where = " where "
                for key in condition:
                    where = where+key+" = %s and "
                    values.append(condition[key])
                where = where+"1=1"
                sql = sql + where
                params = tuple(values)
                df = pd.read_sql(sql, engine, params=params)
                print(df)
        except Exception as e:
            print("Error occurred:", e)
    # Insert data
    def addDataToHiveTable(self,conn,tableName,data):
        like_array = f"array({', '.join(map(lambda x: f'\'{x}\'', data['like']))})"  # 使用单引号包裹字符串
        address_map = f"map('{list(data['address'].keys())[0]}', '{list(data['address'].values())[0]}')"  # 创建 MAP 格式
        # 创建游标
        cursor = conn.cursor()
        insertSql = "INSERT INTO person SELECT %s,%s,%s,"+like_array+","+address_map
        # Execute the insert
        try:
            cursor.execute(insertSql, (data['id'], data['name'], data['age']))
        except Exception as e:
            print(f"Error inserting data: {e}")
        conn.commit()
        cursor.close()
    # Load data from a local file into the table
    def loadDataForLocal(self,conn,tableName,path):
        cursor = conn.cursor()
        query = "LOAD DATA LOCAL INPATH '"+path+"' INTO TABLE "+tableName
        cursor.execute(query)
        conn.commit()
        cursor.close()
    
    # Truncate the table
    def truncateTable(self,conn,tableName):
        cursor = conn.cursor()
        query = "truncate table "+tableName
        cursor.execute(query)
        conn.commit()
        cursor.close()
        
if __name__ == "__main__":
    sql = "SELECT * FROM person"
    condition={"name":"elite1","age":"20"}
    # Data to insert
    data = {
        'id': "50",
        'name': "Jerry",
        'age': 50,  # make sure this is an integer
        'like': ["basketball", "music", "dance"],
        'address': {"address": "xx"}
    }
    tableName = "person"
    path = "/home/jerry/hive/person"
    myhive = Hive()
    print("建立连接")
    conn = myhive.getconnect()
    engine = myhive.getEngine()
    print("全表查询")
    myhive.query(sql,engine)
    print("条件查询")
    myhive.query(sql,engine,condition)
    print("加数据进入表")
    myhive.addDataToHiveTable(conn,tableName,data)
    myhive.query(sql,engine)
    print("清空表中所有数据")
    myhive.truncateTable(conn,tableName)
    print("从文件中导入数据")
    myhive.loadDataForLocal(conn,tableName,path)
    myhive.query(sql,engine)
    print("断开连接")
    myhive.disconnect(engine,conn)
  • conn: used for the non-query operations (insert, load, truncate)
  • engine: used for queries

Output

Connecting
Full table query
   id    name  age                           likes           address
0   1  elite0   10  ["basketball","music","dance"]  {"adderss":"xx"}
1   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
2   3  elite2   10  ["basketball","music","dance"]  {"adderss":"xx"}
3   4  elite3   20  ["basketball","music","dance"]  {"adderss":"xx"}
4   5  elite4   10  ["basketball","music","dance"]  {"adderss":"xx"}
5   6  elite5   20  ["basketball","music","dance"]  {"adderss":"xx"}
Conditional query
   id    name  age                           likes           address
0   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
Inserting data into the table
   id    name  age                           likes           address
0  50   Jerry   50  ["basketball","music","dance"]  {"address":"xx"}
1   1  elite0   10  ["basketball","music","dance"]  {"adderss":"xx"}
2   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
3   3  elite2   10  ["basketball","music","dance"]  {"adderss":"xx"}
4   4  elite3   20  ["basketball","music","dance"]  {"adderss":"xx"}
5   5  elite4   10  ["basketball","music","dance"]  {"adderss":"xx"}
6   6  elite5   20  ["basketball","music","dance"]  {"adderss":"xx"}
Truncating the table
Loading data from file
   id    name  age                           likes           address
0   1  elite0   10  ["basketball","music","dance"]  {"adderss":"xx"}
1   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
2   3  elite2   10  ["basketball","music","dance"]  {"adderss":"xx"}
3   4  elite3   20  ["basketball","music","dance"]  {"adderss":"xx"}
4   5  elite4   10  ["basketball","music","dance"]  {"adderss":"xx"}
5   6  elite5   20  ["basketball","music","dance"]  {"adderss":"xx"}
Disconnecting