Dataphin功能Tips系列(24)-如何通过元数据共享模型获取Dataphin的元数据

本文涉及的产品
智能数据建设与治理Dataphin,200数据处理单元
简介: 如何通过元数据共享模型获取Dataphin的元数据
  • 场景

在建立客户自研的数据门户时,经常需要用到Dataphin中的元数据进行二次开发和分析,如数据开发情况和资产治理情况等,今天我们来讲一下如何通过元数据共享模型来获取Dataphin的元数据

  • 解决方案及功能

元数据共享模型是一系列元数据相关的物理表,基于Dataphin的系统元数据和相应计算引擎的元数据加工汇总而成,存在Dataphin元仓租户下的元仓项目中, 数据产出时效为T+1。访问元仓共享模型,可以通过集成任务将元仓中所需的元仓共享模型的数据表从元仓租户的计算源同步到开发使用的业务租户的计算源数据源中。共享模型表每个分区存储全量数据,建议直接使用最新分区的数据,避免因为历史分区数据不完整影响下游业务。

1、进入元仓租户(需要特定的用户账号密码),找到元仓初始化绑定的计算源(跟元仓租户管理员获取计算源的连接信息,之后在业务租户配置数据源的时候会用到)

2、切换至开发使用的业务租户,将元仓项目计算源配置为当前租户的数据源

3、接下来就可以将元仓共享模型的计算源中的数据通过集成任务同步到到我们自己想要的数据库中,其中周期调度配置上游依赖可以有多种方式:

①为了方便统一调整起调时间,可以给所有相关的集成任务配置一个统一的VIRTUAL(虚拟)任务节点为上游依赖,通过调整VIRTUAL(虚拟)任务节点的定时调度时间(比如说凌晨3:30,通常情况下,计算资源充足时元仓任务可在凌晨2点钟之前运行完成)来控制整体集成任务的运行。最后将创建的集成任务节点依赖于创建的虚拟任务节点

②如果您希望保证使用最新数据并可以及时获取数据产出通知,可以创建检查任务(如pyhive,具体根据您自身业务租户的计算引擎),并直接将检查任务配置为所有的集成任务的上游,检查任务检查到共享模型数据表更新后会触发下游集成任务的调度。

import time
from pyhive import hive
# Hive 连接详情
HIVE_HOST = '你的 Hive 服务器主机名或 IP 地址'
HIVE_PORT = 10000  # 默认 HiveServer2 端口
HIVE_USER = '你的用户名'
HIVE_DATABASE = '你的默认数据库'
# 通信表可以用来检查最新数据是否产出,每天元仓共享模型产出后,将更新通信表数据;部分共享模型表和是否购买对应增值模块有关,如数据安全相关数据表。如果未购买对应功能模块,相关表将没有数据产出,此时不会产出对应通信表
table_list = [
    'data_share_finish',
    'data_security_finish',
    'data_service_finish',
    'data_quality_finish',
    'data_standard_finish'
]
# 定义你的业务日期变量
ds = '${bizdate}'
# 初始睡眠时间(秒)
sleep_time = 15
# 存储准备好分区的表的列表
ready_table_list = []
# 最大重试次数
max_retries = 2000
# 连接到 Hive
conn = hive.Connection(host=HIVE_HOST, port=HIVE_PORT, username=HIVE_USER, database=HIVE_DATABASE)
# 创建游标
cursor = conn.cursor()
# 循环执行最大重试次数
for i in range(1, max_retries + 1):
    for table in table_list:
        # 查询检查分区是否存在
        query = f"SHOW PARTITIONS {table} LIKE '{ds}'"
        
        # 执行查询
        cursor.execute(query)
        
        # 获取结果
        partitions = cursor.fetchall()
        
        if len(partitions) > 0:
            # 分区存在,从列表中移除并标记为已准备好
            table_list.remove(table)
            ready_table_list.append(table)
            print(f"表 {table} 的分区 {ds} 已准备好。")
            break
        else:
            # 分区尚未存在,打印状态并等待
            print(f"等待 {sleep_time} 秒,表 {table} 的分区 {ds} 尚未准备好。")
            time.sleep(sleep_time)
    
    if not table_list:
        # 所有表都已准备好,退出循环
        break
# 检查是否所有分区都被找到
if not table_list:
    print(f"所有元数据表 ({', '.join(ready_table_list)}) 的分区已准备好。")
else:
    raise Exception(f"有些分区尚未准备好: {', '.join(table_list)}")
相关文章
|
1月前
|
运维 数据处理 调度
Dataphin功能Tips系列(30)-限流配置
某大型电商平台在每天的凌晨时段需要进行大量的数据处理任务,比如订单处理、库存同步、用户行为分析等。此外,平台还需要定期进行历史数据的补数据工作,以确保数据完整性和一致性。在进行补数据时,如果需要补的历史时间周期比较长,这些批处理任务会消耗大量的计算资源,导致批处理任务(如订单处理、库存同步)响应变慢甚至超时失败,这是我们应该怎么保障每天的批处理任务(订单处理、库存同步)的按时产出?
|
1月前
Dataphin功能Tips系列(27)-排他编辑锁
在实际开发中,为了避免多人同时编辑同一份代码而导致的问题,通常会采用锁机制来保护代码。然而,普通的锁机制有时并不能完全阻止其他开发人员在编辑时抢占锁,这使得用户可互相覆盖锁定状态,在dataphin中如何解决这一问题?
Dataphin功能Tips系列(27)-排他编辑锁
|
2月前
|
消息中间件 Kafka 搜索推荐
|
2月前
|
API 开发工具
|
1月前
|
数据处理 调度
Dataphin功能Tips系列(31)-自定义资源组
某零售企业最近在做促销活动,希望保证某些数据处理任务(订单处理、库存更新)任务能够快速按时完成,如何保证这些高优任务的调度资源不被其他任务占用,能按时执行?
|
1月前
Dataphin功能Tips系列(29)-计算任务版本对比/版本回滚
开发人员小张先前编写的一个脚本,在进行了修订之后,发现逻辑出现了偏差,但他已经不记得前一版本的具体内容了。在这种情况下,应该怎样通过版本对比来看出两版脚本之间的差别,并且回滚到之前的版本呢?
|
1月前
|
调度 Python
Dataphin功能Tips系列(28)-跨节点参数
某经销零售企业,需要每天定时查询供应商的某个服务,以确认产品目录是否有变更,如果有变更,则全量拉取最新目录数据(数据量比较大,拉取一次成本很高),如果无变更则继续沿用上一次拉取的数据,在dataphin如何实现?
|
1月前
|
数据处理 调度
Dataphin功能Tips系列(26)-事实逻辑表配置数据延迟
零售行业中,订单数据是每天晚上由pos系统同步至数据中台,但门店人员经常会没有及时将订单信息录入pos,也许隔天或是隔几天才录入,这会导致指标的不准确性,数据中台的开发人员往往需要进行批量补历史分区的数据,这时怎么才能减轻开发人员的工作,让系统能够自动补前几天分区中的事实逻辑表中的数据呢?
|
6月前
|
SQL DataWorks 监控
Dataphin常见问题之数据怎么都补不过去如何解决
Dataphin是阿里云提供的一站式数据处理服务,旨在帮助企业构建一体化的智能数据处理平台。Dataphin整合了数据建模、数据处理、数据开发、数据服务等多个功能,支持企业更高效地进行数据治理和分析。

热门文章

最新文章