如何通过API构建自动补数据工具

本文涉及的产品
智能数据建设与治理Dataphin,200数据处理单元
简介: 在Dataphin V2.9.2及以上版本,Dataphin提供了一套OpenAPI以满足客户定制化的需求。本文为您社少如何通过Open API的运维模块开发适用于特定业务场景的个性化的补数据或运维工具。

Dataphin版本:2.9.2及以上

需开通OpenAPI模块:Dataphin-OpenAPI(运维)


Dataphin平台提供了补数据功能,当需要补数据时,用户可手动对某个节点及其下游的节点的特定业务日期的补数据。在实际场景中,有些上游的数据到达的时间晚于预期时间,比如门店的数据延迟几天或者一个月的时间才收集上报,或者上游的数据错误需进行更正时,就需要进行补数据操作。此类操作重复度高,且由于补数据的时间较长带来较大的运维成本。通过Open API的运维模块就可以根据特定的业务场景开发适用于特定业务场景的个性化的补数据或运维工具。


以下的例子是创建批量补数据工具的API调用的基本步骤:

  1. 根据需要补数据的节点的特点查询节点:ListNode
  2. 选择需要补数据的节点的下游节点,可利用QueryDagFromPhysicalNode查询下游节点
  3. 选择需要补数据的节点及其下游节点和需要补数据的业务日期,创建补数据工作流:CreatePhysicalNodeSupplement
  4. 查询补数据流下的每个业务日期下对应的DagRun的运行状态:ListSupplementDagrun
  5. 查询补数据工作流下的业务日期的补数据实例并获取实例状态:ListSupplementInstance


请注意,批量补数据时,若不想影响线上正常的周期调度任务,您需要控制补数据实例的生成和运行的频次或避开高峰期以控制系统压力。

以下为样例代码

/* import 省略... *//*** 搜索节点,对节点及下游补数据,并查看补数据实例,搜索时假设不确定是哪一个节点,* 即不指定node_id,对返回的结果取第一个节点补数据(为方便示例,假设该节点存在下游)** @date 2021-05-17 17:22*/publicclassSupplementTest {
privatestaticfinalStringENV="PROD";
privatestaticfinalLongtestProjectId=1022928L;
// 实际使用请通过AK/endpoint自行创建API clientprivateDataphinAcsClientclient=LocalDataphinAcsClient.getTestEnvClient();
publicvoidsupplementTest() throwsClientException {
//Step1: 根据需要补数据的节点的特点查询节点NodeOverviewnodeOverview=getOneNode();
//Step2: 获取需要补数据的下游节点,此处假设获取下两层的下游节点intdownStreamDepth,
List<String>downstreamNodes=getDownNode(nodeOverview.getNodeId(), downStreamDepth);
//Step3:选择需要补数据的节点及其下游节点和需要补数据的业务日期,创建补数据工作流StringflowId=createSupplementFlow(nodeOverview, downstreamNodes, "2021-05-11", "2021-05-15");
//Step4: 查询补数据流下的每个业务日期下对应的DagRun的运行状态List<ScheduleDagrun>dagRunList=listSupplementDagrun(flowId);
//Step5: 查询补数据工作流下的业务日期的补数据实例并获取实例状态//查看 2021-05-11 的 补数据实例List<Instance>instances=Lists.newArrayList();
dagRunList.stream().filter(d->d.getBizDate().equals("2021-05-11"))
            .findFirst()
            .ifPresent(d->instances.addAll(listInstanceByDagRun(d.getDagrunId())));
// 查看实例的状态instances.forEach(ins->System.out.println(ins.getStatus()));
    }
/*** 列出 dagRun下面的实例** @param dagRunId dagRunId* @return dagRun下面的实例列表*/privateList<Instance>listInstanceByDagRun(StringdagRunId) {
ListSupplementInstanceRequestrequest=newListSupplementInstanceRequest();
request.setDagRunId(dagRunId);
request.setEnv(ENV);
try {
ListSupplementInstanceResponseresponse=client.getAcsResponse(request);
if(null==response.getData() ||response.getData().size() ==0) {
thrownewRuntimeException("not found any instance by dagrun id "+dagRunId);
            }
returnresponse.getData();
        } catch (ClientExceptione) {
thrownewRuntimeException(e.getMessage());
        }
    }
/*** 列出补数据的 dagrun,当前例子中,正常应该有5个** @param flowId 补数据的工作流的ID* @return dagrun 列表*/privateList<ScheduleDagrun>listSupplementDagrun(StringflowId) throwsClientException {
ListSupplementDagrunRequestrequest=newListSupplementDagrunRequest();
request.setEnv(ENV);
request.setFlowId(flowId);
ListSupplementDagrunResponseresponse=client.getAcsResponse(request);
if(response.getData() ==null||response.getData().size() ==0) {
thrownewRuntimeException("not found any dagrun by flow id "+flowId);
        }
returnresponse.getData();
    }
/*** 创建补数据工作流** @param nodeOverview    补数据起始节点* @param downStreamDepth 补数据的层级深度* @return 补数据的工作流ID*/privateStringcreateSupplementFlow(NodeOverviewnodeOverview, List<String>downstreamNodes, StringminPartition, StringmaxPartition) throwsClientException {
CreatePhysicalNodeSupplementRequestcreateNodeSupplementRequest=newCreatePhysicalNodeSupplementRequest();
NodeSupplementCommandnodeSupplementCommand=newNodeSupplementCommand();
nodeSupplementCommand.setName("open_api_test_20210508_"+System.currentTimeMillis());
nodeSupplementCommand.setProjectId(Long.toString(testProjectId));
nodeSupplementCommand.setMinPartition(minPartition);
nodeSupplementCommand.setMaxPartition(maxPartition);
nodeSupplementCommand.setParallelism(1);
nodeSupplementCommand.setSupplementNodeId(nodeOverview.getNodeId());
//包含全部下游,或者这里可以选择部分下游nodeSupplementCommand.setIncludedNodeIdList(downstreamNodes);
createNodeSupplementRequest.setNodeSupplementCommand(nodeSupplementCommand);
createNodeSupplementRequest.setEnv(ENV);
CreatePhysicalNodeSupplementResponsecreateNodeSupplementResponse=client.getAcsResponse(
createNodeSupplementRequest);
returncreateNodeSupplementResponse.getFlowId();
    }
/*** 查询节点的下游** @param startNodeId     起始节点* @param downStreamDepth 下游层级深度* @return 下游节点的ID列表*/privateList<String>getDownNode(StringstartNodeId, intdownStreamDepth) throwsClientException {
QueryDagFromPhysicalNodeRequestqueryNodeDagDownStreamRequest=newQueryDagFromPhysicalNodeRequest();
NodeDagQueryCommandnodeDagQueryDownStreamCommand=newNodeDagQueryCommand();
nodeDagQueryDownStreamCommand.setStartNodeId(startNodeId);
//只搜索向下两层的节点nodeDagQueryDownStreamCommand.setDownStreamDepth(downStreamDepth);
nodeDagQueryDownStreamCommand.setUpStreamDepth(0);
queryNodeDagDownStreamRequest.setNodeDagQueryCommand(nodeDagQueryDownStreamCommand);
queryNodeDagDownStreamRequest.setEnv(ENV);
QueryDagFromPhysicalNodeResponsequeryNodeDagDownStreamResponse=client.getAcsResponse(queryNodeDagDownStreamRequest);
NodeDagInfonodeDagDownStreamInfo=queryNodeDagDownStreamResponse.getNodeDagInfo();
List<LogicalNodeInfo>downNodes=nodeDagDownStreamInfo.getNodes();
if(null==downNodes) {
returnLists.newArrayListWithCapacity(0);
        }
returndownNodes.stream().map(n->n.getBasicInfo().getNodeId().getId()).collect(Collectors.toList());
    }
/*** 列出节点列表*/privateNodeOverviewgetOneNode() throwsClientException {
ListNodesRequest.PageParampageParam=newPageParam();
pageParam.setPageNum(1);
pageParam.setPageSize(20);
ListNodesRequestlistNodesRequest=newListNodesRequest();
ListNodesRequest.NodeQueryCommandnodeQueryCommand=newNodeQueryCommand();
nodeQueryCommand.setNodeBizType("SCRIPT");
nodeQueryCommand.setNodeScheduleType("NORMAL");
listNodesRequest.setNodeQueryCommand(nodeQueryCommand);
listNodesRequest.setPageParam(pageParam);
listNodesRequest.setProjectId(testProjectId);
listNodesRequest.setEnv(ENV);
ListNodesResponselistNodesResponse=client.getAcsResponse(listNodesRequest);
PagedDataoverviewPagedData=listNodesResponse.getPagedNodes();
List<NodeOverview>nodeOverviewList=overviewPagedData.getData();
if(null==nodeOverviewList||nodeOverviewList.size() ==0) {
thrownewRuntimeException("not found any node");
        }
returnnodeOverviewList.get(0);
    }
}


点击这里查看Dataphin OpenAPI概览。

目录
相关文章
|
17天前
|
人工智能 关系型数据库 MySQL
数据魔力,一触即发 —— Dataphin数据服务API,百炼插件新星降临!
本文通过一个利用百炼大模型平台和Dataphin数据服务API构建一个客户360智能应用的案例,介绍如何使用Dataphin数据服务API在百炼平台创建一个自定义插件,用于智能应用的开发,提升企业智能化应用水平。
数据魔力,一触即发 —— Dataphin数据服务API,百炼插件新星降临!
|
3天前
|
监控 安全 API
深入浅出:构建高效RESTful API的最佳实践
在数字化时代,API已成为连接不同软件和服务的桥梁。本文将带你深入了解如何设计和维护一个高效、可扩展且安全的RESTful API。我们将从基础概念出发,逐步深入到高级技巧,让你能够掌握创建优质API的关键要素。无论你是初学者还是有经验的开发者,这篇文章都将为你提供实用的指导和启示。让我们一起探索API设计的奥秘,打造出色的后端服务吧!
|
15天前
|
SQL 缓存 测试技术
构建高性能RESTful API:最佳实践与避坑指南###
—— 本文深入探讨了构建高性能RESTful API的关键技术要点,从设计原则、状态码使用、版本控制到安全性考虑,旨在为开发者提供一套全面的最佳实践框架。通过避免常见的设计陷阱,本文将指导你如何优化API性能,提升用户体验,确保系统的稳定性和可扩展性。 ###
53 12
|
8天前
|
缓存 API 开发者
构建高效后端服务:RESTful API设计原则与实践
【10月更文挑战第43天】在数字化时代的浪潮中,后端服务的稳定性和效率成为企业竞争力的关键。本文将深入探讨如何构建高效的后端服务,重点介绍RESTful API的设计原则和实践技巧,帮助开发者提升服务的可用性、可扩展性和安全性。通过实际代码示例,我们将展示如何将这些原则应用到日常开发工作中,以确保后端服务能够支撑起现代Web和移动应用的需求。
|
15天前
|
存储 SQL API
探索后端开发:构建高效API与数据库交互
【10月更文挑战第36天】在数字化时代,后端开发是连接用户界面和数据存储的桥梁。本文深入探讨如何设计高效的API以及如何实现API与数据库之间的无缝交互,确保数据的一致性和高性能。我们将从基础概念出发,逐步深入到实战技巧,为读者提供一个清晰的后端开发路线图。
|
9天前
|
JSON API 开发者
构建高效API:后端开发中的RESTful最佳实践####
在数字化时代,API作为不同系统间通信的桥梁,其重要性日益凸显。本文将深入探讨RESTful API的设计原则与最佳实践,通过实际案例分析,揭示如何构建高效、可维护且易于使用的API接口,助力后端开发者提升项目质量与用户体验。 ####
|
10天前
|
缓存 JavaScript API
探索后端开发:构建高效API的艺术
【10月更文挑战第40天】本文深入探讨了后端开发的核心—构建高效且可维护的API。通过分析设计原则、技术选型、性能优化以及安全性考量,文章旨在为读者提供一套完整的方法论,以实现高质量API的开发。从理论到实践,我们不仅讨论了RESTful架构和GraphQL等技术的优劣,还涵盖了缓存策略、数据库优化等性能提升技巧。同时,安全性章节将引导读者如何保护API免受常见攻击。最后,通过一个实际的代码示例,展示了如何将这些概念应用于创建一个简洁而强大的API。
|
12天前
|
JSON JavaScript API
深入浅出Node.js:从零开始构建RESTful API
【10月更文挑战第39天】 在数字化时代的浪潮中,API(应用程序编程接口)已成为连接不同软件应用的桥梁。本文将带领读者从零基础出发,逐步深入Node.js的世界,最终实现一个功能完备的RESTful API。通过实践,我们将探索如何利用Node.js的异步特性和强大的生态系统来构建高效、可扩展的服务。准备好迎接代码和概念的碰撞,一起解锁后端开发的新篇章。
|
14天前
|
存储 前端开发 搜索推荐
淘宝 1688 API 接口助力构建高效淘宝代购集运系统
在全球化商业背景下,淘宝代购集运业务蓬勃发展,满足了海外消费者对中国商品的需求。掌握淘宝1688 API接口是构建成功代购系统的關鍵。本文详细介绍如何利用API接口进行系统架构设计、商品数据同步、订单处理与物流集成,以及用户管理和客户服务,帮助你打造一个高效便捷的代购集运系统,实现商业价值与用户满意度的双赢。
|
14天前
|
XML 数据可视化 API
商品详情数据实战案例,API接口系列
淘宝商品详情数据在电商领域具有广泛的应用价值,而淘宝商品详情API接口则为开发者提供了获取这些数据的重要途径。通过合理利用这些接口和数据,可以提升业务效率、优化用户体验,为电商行业的发展注入新的活力。
下一篇
无影云桌面