如何通过API构建自动补数据工具

本文涉及的产品
智能数据建设与治理Dataphin,200数据处理单元
简介: 在Dataphin V2.9.2及以上版本,Dataphin提供了一套OpenAPI以满足客户定制化的需求。本文为您社少如何通过Open API的运维模块开发适用于特定业务场景的个性化的补数据或运维工具。

Dataphin版本:2.9.2及以上

需开通OpenAPI模块:Dataphin-OpenAPI(运维)


Dataphin平台提供了补数据功能,当需要补数据时,用户可手动对某个节点及其下游的节点的特定业务日期的补数据。在实际场景中,有些上游的数据到达的时间晚于预期时间,比如门店的数据延迟几天或者一个月的时间才收集上报,或者上游的数据错误需进行更正时,就需要进行补数据操作。此类操作重复度高,且由于补数据的时间较长带来较大的运维成本。通过Open API的运维模块就可以根据特定的业务场景开发适用于特定业务场景的个性化的补数据或运维工具。


以下的例子是创建批量补数据工具的API调用的基本步骤:

  1. 根据需要补数据的节点的特点查询节点:ListNode
  2. 选择需要补数据的节点的下游节点,可利用QueryDagFromPhysicalNode查询下游节点
  3. 选择需要补数据的节点及其下游节点和需要补数据的业务日期,创建补数据工作流:CreatePhysicalNodeSupplement
  4. 查询补数据流下的每个业务日期下对应的DagRun的运行状态:ListSupplementDagrun
  5. 查询补数据工作流下的业务日期的补数据实例并获取实例状态:ListSupplementInstance


请注意,批量补数据时,若不想影响线上正常的周期调度任务,您需要控制补数据实例的生成和运行的频次或避开高峰期以控制系统压力。

以下为样例代码

/* import 省略... *//*** 搜索节点,对节点及下游补数据,并查看补数据实例,搜索时假设不确定是哪一个节点,* 即不指定node_id,对返回的结果取第一个节点补数据(为方便示例,假设该节点存在下游)** @date 2021-05-17 17:22*/publicclassSupplementTest {
privatestaticfinalStringENV="PROD";
privatestaticfinalLongtestProjectId=1022928L;
// 实际使用请通过AK/endpoint自行创建API clientprivateDataphinAcsClientclient=LocalDataphinAcsClient.getTestEnvClient();
publicvoidsupplementTest() throwsClientException {
//Step1: 根据需要补数据的节点的特点查询节点NodeOverviewnodeOverview=getOneNode();
//Step2: 获取需要补数据的下游节点,此处假设获取下两层的下游节点intdownStreamDepth,
List<String>downstreamNodes=getDownNode(nodeOverview.getNodeId(), downStreamDepth);
//Step3:选择需要补数据的节点及其下游节点和需要补数据的业务日期,创建补数据工作流StringflowId=createSupplementFlow(nodeOverview, downstreamNodes, "2021-05-11", "2021-05-15");
//Step4: 查询补数据流下的每个业务日期下对应的DagRun的运行状态List<ScheduleDagrun>dagRunList=listSupplementDagrun(flowId);
//Step5: 查询补数据工作流下的业务日期的补数据实例并获取实例状态//查看 2021-05-11 的 补数据实例List<Instance>instances=Lists.newArrayList();
dagRunList.stream().filter(d->d.getBizDate().equals("2021-05-11"))
            .findFirst()
            .ifPresent(d->instances.addAll(listInstanceByDagRun(d.getDagrunId())));
// 查看实例的状态instances.forEach(ins->System.out.println(ins.getStatus()));
    }
/*** 列出 dagRun下面的实例** @param dagRunId dagRunId* @return dagRun下面的实例列表*/privateList<Instance>listInstanceByDagRun(StringdagRunId) {
ListSupplementInstanceRequestrequest=newListSupplementInstanceRequest();
request.setDagRunId(dagRunId);
request.setEnv(ENV);
try {
ListSupplementInstanceResponseresponse=client.getAcsResponse(request);
if(null==response.getData() ||response.getData().size() ==0) {
thrownewRuntimeException("not found any instance by dagrun id "+dagRunId);
            }
returnresponse.getData();
        } catch (ClientExceptione) {
thrownewRuntimeException(e.getMessage());
        }
    }
/*** 列出补数据的 dagrun,当前例子中,正常应该有5个** @param flowId 补数据的工作流的ID* @return dagrun 列表*/privateList<ScheduleDagrun>listSupplementDagrun(StringflowId) throwsClientException {
ListSupplementDagrunRequestrequest=newListSupplementDagrunRequest();
request.setEnv(ENV);
request.setFlowId(flowId);
ListSupplementDagrunResponseresponse=client.getAcsResponse(request);
if(response.getData() ==null||response.getData().size() ==0) {
thrownewRuntimeException("not found any dagrun by flow id "+flowId);
        }
returnresponse.getData();
    }
/*** 创建补数据工作流** @param nodeOverview    补数据起始节点* @param downStreamDepth 补数据的层级深度* @return 补数据的工作流ID*/privateStringcreateSupplementFlow(NodeOverviewnodeOverview, List<String>downstreamNodes, StringminPartition, StringmaxPartition) throwsClientException {
CreatePhysicalNodeSupplementRequestcreateNodeSupplementRequest=newCreatePhysicalNodeSupplementRequest();
NodeSupplementCommandnodeSupplementCommand=newNodeSupplementCommand();
nodeSupplementCommand.setName("open_api_test_20210508_"+System.currentTimeMillis());
nodeSupplementCommand.setProjectId(Long.toString(testProjectId));
nodeSupplementCommand.setMinPartition(minPartition);
nodeSupplementCommand.setMaxPartition(maxPartition);
nodeSupplementCommand.setParallelism(1);
nodeSupplementCommand.setSupplementNodeId(nodeOverview.getNodeId());
//包含全部下游,或者这里可以选择部分下游nodeSupplementCommand.setIncludedNodeIdList(downstreamNodes);
createNodeSupplementRequest.setNodeSupplementCommand(nodeSupplementCommand);
createNodeSupplementRequest.setEnv(ENV);
CreatePhysicalNodeSupplementResponsecreateNodeSupplementResponse=client.getAcsResponse(
createNodeSupplementRequest);
returncreateNodeSupplementResponse.getFlowId();
    }
/*** 查询节点的下游** @param startNodeId     起始节点* @param downStreamDepth 下游层级深度* @return 下游节点的ID列表*/privateList<String>getDownNode(StringstartNodeId, intdownStreamDepth) throwsClientException {
QueryDagFromPhysicalNodeRequestqueryNodeDagDownStreamRequest=newQueryDagFromPhysicalNodeRequest();
NodeDagQueryCommandnodeDagQueryDownStreamCommand=newNodeDagQueryCommand();
nodeDagQueryDownStreamCommand.setStartNodeId(startNodeId);
//只搜索向下两层的节点nodeDagQueryDownStreamCommand.setDownStreamDepth(downStreamDepth);
nodeDagQueryDownStreamCommand.setUpStreamDepth(0);
queryNodeDagDownStreamRequest.setNodeDagQueryCommand(nodeDagQueryDownStreamCommand);
queryNodeDagDownStreamRequest.setEnv(ENV);
QueryDagFromPhysicalNodeResponsequeryNodeDagDownStreamResponse=client.getAcsResponse(queryNodeDagDownStreamRequest);
NodeDagInfonodeDagDownStreamInfo=queryNodeDagDownStreamResponse.getNodeDagInfo();
List<LogicalNodeInfo>downNodes=nodeDagDownStreamInfo.getNodes();
if(null==downNodes) {
returnLists.newArrayListWithCapacity(0);
        }
returndownNodes.stream().map(n->n.getBasicInfo().getNodeId().getId()).collect(Collectors.toList());
    }
/*** 列出节点列表*/privateNodeOverviewgetOneNode() throwsClientException {
ListNodesRequest.PageParampageParam=newPageParam();
pageParam.setPageNum(1);
pageParam.setPageSize(20);
ListNodesRequestlistNodesRequest=newListNodesRequest();
ListNodesRequest.NodeQueryCommandnodeQueryCommand=newNodeQueryCommand();
nodeQueryCommand.setNodeBizType("SCRIPT");
nodeQueryCommand.setNodeScheduleType("NORMAL");
listNodesRequest.setNodeQueryCommand(nodeQueryCommand);
listNodesRequest.setPageParam(pageParam);
listNodesRequest.setProjectId(testProjectId);
listNodesRequest.setEnv(ENV);
ListNodesResponselistNodesResponse=client.getAcsResponse(listNodesRequest);
PagedDataoverviewPagedData=listNodesResponse.getPagedNodes();
List<NodeOverview>nodeOverviewList=overviewPagedData.getData();
if(null==nodeOverviewList||nodeOverviewList.size() ==0) {
thrownewRuntimeException("not found any node");
        }
returnnodeOverviewList.get(0);
    }
}


点击这里查看Dataphin OpenAPI概览。

目录
相关文章
|
1月前
|
API 数据库 决策智能
基于百炼平台qwen-max的api 打造一套 检索增强 图谱增强 智能工具调用决策的智能体
本文介绍了一种基于阿里云百炼平台的`qwen-max` API构建的智能体方案,该方案集成了检索增强、图谱增强及智能工具调用决策三大模块,旨在通过结合外部数据源、知识图谱和自动化决策提高智能回答的准确性和丰富度。通过具体代码示例展示了如何实现这些功能,最终形成一个能灵活应对多种查询需求的智能系统。
184 11
|
1月前
|
自然语言处理 NoSQL API
基于百炼平台qwen-max的api 打造一套 检索增强 图谱增强 基于指令的智能工具调用决策 智能体
基于百炼平台的 `qwen-max` API,设计了一套融合检索增强、图谱增强及指令驱动的智能工具调用决策系统。该系统通过解析用户指令,智能选择调用检索、图谱推理或模型生成等工具,以提高问题回答的准确性和丰富性。系统设计包括指令解析、工具调用决策、检索增强、图谱增强等模块,旨在通过多种技术手段综合提升智能体的能力。
199 5
|
14天前
|
存储 数据挖掘 BI
API数据源:轻松接入各类业务系统数据
在数字化转型中,企业面临多样化的数据需求。Quick BI推出API数据源功能,支持广泛的数据接入,包括实时天气、电商交易及内部业务数据,极大丰富了可分析数据范围。该功能提供灵活的连接方式(抽取和直连模式)、多元授权机制(基础认证、前置请求)和自动化数据解析,降低了操作门槛,提升了配置效率。通过动态Token获取等最佳实践,确保数据安全与实时性,满足企业具体业务需求。了解更多,请访问Quick BI官方文档或瓴羊官网。
142 77
|
21天前
|
人工智能 前端开发 API
Gemini Coder:基于 Google Gemini API 的开源 Web 应用生成工具,支持实时编辑和预览
Gemini Coder 是一款基于 Google Gemini API 的 AI 应用生成工具,支持通过文本描述快速生成代码,并提供实时代码编辑和预览功能,简化开发流程。
111 38
Gemini Coder:基于 Google Gemini API 的开源 Web 应用生成工具,支持实时编辑和预览
|
16天前
|
JSON 前端开发 搜索推荐
关于商品详情 API 接口 JSON 格式返回数据解析的示例
本文介绍商品详情API接口返回的JSON数据解析。最外层为`product`对象,包含商品基本信息(如id、name、price)、分类信息(category)、图片(images)、属性(attributes)、用户评价(reviews)、库存(stock)和卖家信息(seller)。每个字段详细描述了商品的不同方面,帮助开发者准确提取和展示数据。具体结构和字段含义需结合实际业务需求和API文档理解。
|
3天前
|
搜索推荐 数据挖掘 API
怎么利用商品详情 API 接口实现数据获取与应用?
在电商蓬勃发展的时代,数据成为驱动业务增长的关键。商品详情API接口为电商从业者、开发者和数据分析爱好者提供了获取海量商品数据的途径,助力精准营销、优化用户体验和提升运营效率。本文深入探讨如何利用商品详情API接口进行数据获取与应用,涵盖接口概念、工作原理、不同平台特点、准备工作、数据获取及处理、错误处理,并通过代码示例展示其在电商平台展示、数据分析、竞品分析和个性化推荐等场景中的应用。
22 12
|
9天前
|
JSON 缓存 API
解析电商商品详情API接口系列,json数据示例参考
电商商品详情API接口是电商平台的重要组成部分,提供了商品的详细信息,支持用户进行商品浏览和购买决策。通过合理的API设计和优化,可以提升系统性能和用户体验。希望本文的解析和示例能够为开发者提供参考,帮助构建高效、可靠的电商系统。
26 12
|
2天前
|
搜索推荐 API 开发者
深度解析:利用商品详情 API 接口实现数据获取与应用
在电商蓬勃发展的今天,数据成为驱动业务增长的核心。商品详情API接口作为连接海量商品数据的桥梁,帮助运营者、商家和开发者获取精准的商品信息(如价格、描述、图片、评价等),优化策略、提升用户体验。通过理解API概念、工作原理及不同平台特点,掌握获取权限、构建请求、处理响应和错误的方法,可以将数据应用于商品展示、数据分析、竞品分析和个性化推荐等场景,助力电商创新与发展。未来,随着技术进步,API接口将与人工智能、大数据深度融合,带来更多变革。
15 3
|
14天前
|
供应链 API 开发者
解锁电商数据的无限可能:探秘京东商品SKU信息API接口
京东商品SKU信息API接口是电商开发与运营中的重要工具,帮助开发者获取商品的详细属性,如库存、价格、规格等。通过该接口,电商平台可以丰富商品展示页面,提升用户体验;商家能实时掌握库存动态,优化销售策略;数据分析人员可深入洞察市场趋势,实现精准营销。使用前需注册京东开放平台账号、创建应用并获取API权限,同时仔细阅读API文档以确保正确调用。代码示例展示了如何用Python调用该接口,并处理返回数据。未来,该接口将在个性化推荐、智能库存管理和数据分析等领域发挥更大作用,助力电商业务创新与发展。
69 14
|
5天前
|
缓存 监控 API
如何查看商品销量 API 接口的性能指标数据
在电商蓬勃发展的时代,数据驱动业务决策至关重要。商品销量作为核心指标,依赖高效稳定的API接口获取。本文探讨如何查看和优化商品销量API的性能指标,包括响应时间、吞吐量、错误率和并发用户数,通过专业工具、日志分析及自定义代码实现监控与优化,确保业务稳定运行和用户体验提升。
20 2