根据《用Java、Python来开发Hive应用》一文,建立了使用Python、来开发Hive应用的方法,产生的代码如下(做了修改):
代码语言:javascript
复制
import pandas as pd from pyhive import hive from sqlalchemy import create_engine from pyhive import hive class Hive: def __init__(self): self.database= 'demo' self.host = '192.168.31.184' self.port = '10000' def getConnect(self): conn = hive.Connection(host=self.host, port=self.port,database=self.database) return conn; def getEngine(self): # 创建 Hive 数据库连接 hive_uri = f"hive://"+self.host+":"+self.port+"/"+self.database return create_engine(hive_uri) def disconnect(self,engine,conn): engine.dispose() conn.close() #执行查询 def query(self,sql,engine,condition=None): try: if condition is None: # 执行 SQL 查询 rs = pd.read_sql(sql, engine) return rs else: values = [] where = " where " for key in condition: where = where+key+" = %s and " values.append(condition[key]) where = where+"1=1" sql = sql + where params = tuple(values) rs = pd.read_sql(sql, engine, params=params) return rs except Exception as e: print("Error occurred:", e) #添加数据 def addDataToHiveTable(self,conn,tableName,data): like_array = f"array({', '.join(map(lambda x: f'\'{x}\'', data['like']))})" # 使用单引号包裹字符串 address_map = f"map('{list(data['address'].keys())[0]}', '{list(data['address'].values())[0]}')" # 创建 MAP 格式 # 创建游标 cursor = conn.cursor() insertSql = "INSERT INTO person SELECT %s,%s,%s,"+like_array+","+address_map # 执行插入操作 try: cursor.execute(insertSql, (data['id'], data['name'], data['age'])) except Exception as e: print(f"Error inserting data: {e}") conn.commit() cursor.close() #将文件中的数据加载到表中 def loadDataForLocal(self,conn,tableName,path): cursor = conn.cursor() query = "LOAD DATA LOCAL INPATH '"+path+"' INTO TABLE "+tableName cursor.execute(query) conn.commit() cursor.close() #清空数据表 def truncateTable(self,conn,tableName): cursor = conn.cursor() query = "truncate table "+tableName; #con.setAutoCommit(true) #确保自动提交 cursor.execute(query) conn.commit() cursor.close()
现在,使用pytest来进行测试。
1)建立全局变量
代码语言:javascript
复制
hive = Hive() tableName = "person" sql = "SELECT * FROM "+tableName conn = None engine = None
2)建立setup_class(self)和teardown_class(self)函数
代码语言:javascript
复制
def setup_class(self): #导入数据路径 path = "/home/jerry/hive/person" #建立连接,conn用于查询相关的SQL self.conn = self.hive.getConnect() #建立引擎,engine用于非查询相关的SQL self.engine = self.hive.getEngine() #导入测试初始化数据 self.hive.loadDataForLocal(self.conn,self.tableName,path) def teardown_class(self): #清空测试数据 self.hive.truncateTable(self.conn,self.tableName) #断开链接 self.hive.disconnect(self.engine,self.conn)
3)测试查询
代码语言:javascript
复制
@allure.feature('Python访问Hive数据库') @allure.story('根据query进行查询') @allure.severity('Critical') #测试根据Query查询 def test_query(self): #建立查询 rs = self.hive.query(self.sql,self.engine) #获得记录个数 row_count = len(rs) #验证记录个数 assert row_count == 6 #遍历记录 for index, row in rs.iterrows(): #验证编号行是不是数字 assert isinstance(row.iloc[0], int) #验证姓名行是不是包含"elite" assert "elite" in row.iloc[1] #验证年龄行是不是数字 assert isinstance(row.iloc[2], int) #验证爱好行是不是包含"basketball" assert "basketball" in row.iloc[3] #验证地址行是不是包含"address" assert "address" in row.iloc[4] @allure.feature('Python访问Hive数据库') @allure.story('带一个的条件查询') @allure.severity('Normal') def test_query_with_one_condition(self): #查询条件 condition={"name":"elite1"} #建立查询 rs = self.hive.query(self.sql,self.engine,condition) #获得记录个数 row_count = len(rs) #验证记录个数 assert row_count == 1 #遍历记录 for index, row in rs.iterrows(): #验证是不是符合查询条件 #验证姓名行是不是包含"elite" assert "elite1" == row.iloc[1] @allure.feature('Python访问Hive数据库') @allure.story('带两个的条件查询') @allure.severity('Normal') def test_query_with_Two_condition(self): #查询条件 condition={"name":"elite1","age":"20"} #建立查询 rs = self.hive.query(self.sql,self.engine,condition) #获得记录个数 row_count = len(rs) #验证记录个数 assert row_count == 1 #遍历记录 for index, row in rs.iterrows(): #验证是不是符合查询条件 #验证姓名行是不是"elite" assert "elite1" == row.iloc[1] #验证年龄行是不是50 assert "50" == row.iloc[2] #验证编号行是不是数字 assert isinstance(row.iloc[0], int) #验证姓名行是不是包含"elite" assert "elite" in row.iloc[1] #验证年龄行是不是数字 assert isinstance(row.iloc[2], int) #验证爱好行是不是包含"basketball" assert "basketball" in row.iloc[3] #验证地址行是不是包含"address" assert "address" in row.iloc[4] @allure.feature('Python访问Hive数据库') @allure.story('带两个的条件查询') @allure.severity('Normal') def test_query_with_Two_condition(self): #查询条件 condition={"name":"elite1","age":"20"} #建立查询 rs = self.hive.query(self.sql,self.engine,condition) #获得记录个数 row_count = len(rs) #验证记录个数 assert row_count == 1 #遍历记录 for index, row in rs.iterrows(): #验证是不是符合查询条件 #验证姓名行是不是"elite" assert "elite1" == row.iloc[1] #验证年龄行是不是20 assert 20 == row.iloc[2] @allure.feature('Python访问Hive数据库') @allure.story('带三个的条件查询') @allure.severity('Normal') def test_query_with_three_condition(self): #查询条件 condition={"id":"1","name":"elite0","age":"10"} #建立查询 rs = self.hive.query(self.sql,self.engine,condition) #获得记录个数 row_count = len(rs) #验证记录个数 assert row_count == 1 #遍历记录 for index, row in rs.iterrows(): #验证是不是符合查询条件 #验证编号行是不是数字 assert 1 == row.iloc[0] #验证姓名行是不是包含"elite" assert "elite0" in row.iloc[1] #验证年龄行是不是数字 assert 10 == row.iloc[2]
4)测试添加数据
代码语言:javascript
复制
@allure.feature('Python访问Hive数据库') @allure.story('插入数据') @allure.severity('Normal') def test_addDataToHiveTable(self): #构造插入数据 data = { 'id': "50", 'name': "Jerry", 'age': "50", 'like': ["basketball", "music", "dance"], 'address': {"address": "xx"} } #插入数据 self.hive.addDataToHiveTable(self.conn,self.tableName,data) #查询插入数据是否可以查询出来 condition = {"name":"Jerry","age":"50"} rs = self.hive.query(self.sql,self.engine,condition) row_count = len(rs) assert row_count == 1 #验证插入数据 for index, row in rs.iterrows(): assert "Jerry" in row.iloc[1] assert "50" in str(row.iloc[2])
主函数改为
代码语言:javascript
复制
if __name__ == '__main__': pytest.main(['-sv', '-q', '--alluredir', './report/xml'])
建立项目文件
代码语言:javascript
复制
environment.properties Project Name=Hive Author = Jerry Gu System Version= Win11 java version "17.0.10" Allure Version= 2.20.1
代码语言:javascript
复制
pytest --alluredir=.\report\xml copy environment.properties .\report\xml allure serve .\report\xml\