fromdli.loggerimportloggerfrompyDLIimportdliimportpandasaspdendpoint="dli.cn-south-1.myhuaweicloud.com"project_id="1f3e574677d54fa893c3bfda48386eff"account="dragon7421"username="chuguoyu"password="sf01394546"ak="9KHRA1FTPVBYNSEJBUIJ"sk="JUxdOhetUKJEOv14ivX5HR8GzvOxcNlA75sBd8qN"queue_name="super_list"database="tpch"defget_connection(auth):
ifauth=='token':
conn=dli.Connection(host="dli://%s/%s?queuename=%s&database=%s"% (endpoint, project_id, queue_name, database),
account=account, username=username, password=password, auth="token")
else:
conn=dli.Connection(host="dli://%s/%s?queuename=%s&database=%s"% (endpoint, project_id, queue_name, database),
ak=ak, sk=sk, auth="aksk")
returnconndefdli_result2dataframe(cursor):
print("job result:")
result_list=cursor.fetchall()
result_list=pd.DataFrame(result_list)
print(result_list)
returnresult_listdefprint_result(cursor):
print("schema info:")
forcol_infoincursor.description:
print("\t%s"% (col_info,))
print("job result:")
result_list=cursor.fetchall()
forrow_datainresult_list:
print("\t%s"%row_data)
print(cursor.rownumber)
defexec_sql(cursor, sql, parameters=None, options=None):
cursor.execute(sql, parameters, options)
returndli_result2dataframe(cursor)
defexec_sql_async(cursor, sql, parameters=None, options=None):
cursor.execute(sql, parameters, options, async_=True)
status=cursor.poll()
whilestatusin ('UNKNOWN', 'LAUNCHING', 'RUNNING'):
logger.info("current job status is: %s"%status)
status=cursor.poll()
print_result(cursor)
defexec_sql_many(cursor, sql, seq_of_parameters):
cursor.executemany(sql, seq_of_parameters)
print_result(cursor)
defmain():
conn=get_connection("aksk")
exec_sql(conn.cursor(), "SELECT * FROM liangfang_car_event_recognition")
exec_sql(conn.cursor(), "INSERT overwrite TABLE liangfang_car_event_recognition SELECT * FROM liangfang_car_event_recognition limit 5")
exec_sql(conn.cursor(), "SELECT * FROM liangfang_car_event_recognition")
if__name__=='__main__':
main()