- 场景
在建立客户自研的数据门户时,经常需要用到Dataphin中的元数据进行二次开发和分析,如数据开发情况和资产治理情况等,今天我们来讲一下如何通过元数据共享模型来获取Dataphin的元数据
- 解决方案及功能
元数据共享模型是一系列元数据相关的物理表,基于Dataphin的系统元数据和相应计算引擎的元数据加工汇总而成,存在Dataphin元仓租户下的元仓项目中, 数据产出时效为T+1。访问元仓共享模型,可以通过集成任务将元仓中所需的元仓共享模型的数据表从元仓租户的计算源同步到开发使用的业务租户的计算源或数据源中。共享模型表每个分区存储全量数据,建议直接使用最新分区的数据,避免因为历史分区数据不完整影响下游业务。
1、进入元仓租户(需要特定的用户账号密码),找到元仓初始化绑定的计算源(跟元仓租户管理员获取计算源的连接信息,之后在业务租户配置数据源的时候会用到)
2、切换至开发使用的业务租户,将元仓项目计算源配置为当前租户的数据源
3、接下来就可以将元仓共享模型的计算源中的数据通过集成任务同步到到我们自己想要的数据库中,其中周期调度配置上游依赖可以有多种方式:
①为了方便统一调整起调时间,可以给所有相关的集成任务配置一个统一的VIRTUAL(虚拟)任务节点为上游依赖,通过调整VIRTUAL(虚拟)任务节点的定时调度时间(比如说凌晨3:30,通常情况下,计算资源充足时元仓任务可在凌晨2点钟之前运行完成)来控制整体集成任务的运行。最后将创建的集成任务节点依赖于创建的虚拟任务节点
②如果您希望保证使用最新数据并可以及时获取数据产出通知,可以创建检查任务(如pyhive,具体根据您自身业务租户的计算引擎),并直接将检查任务配置为所有的集成任务的上游,检查任务检查到共享模型数据表更新后会触发下游集成任务的调度。
import time from pyhive import hive # Hive 连接详情 HIVE_HOST = '你的 Hive 服务器主机名或 IP 地址' HIVE_PORT = 10000 # 默认 HiveServer2 端口 HIVE_USER = '你的用户名' HIVE_DATABASE = '你的默认数据库' # 通信表可以用来检查最新数据是否产出,每天元仓共享模型产出后,将更新通信表数据;部分共享模型表和是否购买对应增值模块有关,如数据安全相关数据表。如果未购买对应功能模块,相关表将没有数据产出,此时不会产出对应通信表 table_list = [ 'data_share_finish', 'data_security_finish', 'data_service_finish', 'data_quality_finish', 'data_standard_finish' ] # 定义你的业务日期变量 ds = '${bizdate}' # 初始睡眠时间(秒) sleep_time = 15 # 存储准备好分区的表的列表 ready_table_list = [] # 最大重试次数 max_retries = 2000 # 连接到 Hive conn = hive.Connection(host=HIVE_HOST, port=HIVE_PORT, username=HIVE_USER, database=HIVE_DATABASE) # 创建游标 cursor = conn.cursor() # 循环执行最大重试次数 for i in range(1, max_retries + 1): for table in table_list: # 查询检查分区是否存在 query = f"SHOW PARTITIONS {table} LIKE '{ds}'" # 执行查询 cursor.execute(query) # 获取结果 partitions = cursor.fetchall() if len(partitions) > 0: # 分区存在,从列表中移除并标记为已准备好 table_list.remove(table) ready_table_list.append(table) print(f"表 {table} 的分区 {ds} 已准备好。") break else: # 分区尚未存在,打印状态并等待 print(f"等待 {sleep_time} 秒,表 {table} 的分区 {ds} 尚未准备好。") time.sleep(sleep_time) if not table_list: # 所有表都已准备好,退出循环 break # 检查是否所有分区都被找到 if not table_list: print(f"所有元数据表 ({', '.join(ready_table_list)}) 的分区已准备好。") else: raise Exception(f"有些分区尚未准备好: {', '.join(table_list)}")