如何通过API构建自动补数据工具

本文涉及的产品
智能数据建设与治理Dataphin,200数据处理单元
简介: 在Dataphin V2.9.2及以上版本,Dataphin提供了一套OpenAPI以满足客户定制化的需求。本文为您社少如何通过Open API的运维模块开发适用于特定业务场景的个性化的补数据或运维工具。

Dataphin版本:2.9.2及以上

需开通OpenAPI模块:Dataphin-OpenAPI(运维)


Dataphin平台提供了补数据功能,当需要补数据时,用户可手动对某个节点及其下游的节点的特定业务日期的补数据。在实际场景中,有些上游的数据到达的时间晚于预期时间,比如门店的数据延迟几天或者一个月的时间才收集上报,或者上游的数据错误需进行更正时,就需要进行补数据操作。此类操作重复度高,且由于补数据的时间较长带来较大的运维成本。通过Open API的运维模块就可以根据特定的业务场景开发适用于特定业务场景的个性化的补数据或运维工具。


以下的例子是创建批量补数据工具的API调用的基本步骤:

  1. 根据需要补数据的节点的特点查询节点:ListNode
  2. 选择需要补数据的节点的下游节点,可利用QueryDagFromPhysicalNode查询下游节点
  3. 选择需要补数据的节点及其下游节点和需要补数据的业务日期,创建补数据工作流:CreatePhysicalNodeSupplement
  4. 查询补数据流下的每个业务日期下对应的DagRun的运行状态:ListSupplementDagrun
  5. 查询补数据工作流下的业务日期的补数据实例并获取实例状态:ListSupplementInstance


请注意,批量补数据时,若不想影响线上正常的周期调度任务,您需要控制补数据实例的生成和运行的频次或避开高峰期以控制系统压力。

以下为样例代码

/* import 省略... *//*** 搜索节点,对节点及下游补数据,并查看补数据实例,搜索时假设不确定是哪一个节点,* 即不指定node_id,对返回的结果取第一个节点补数据(为方便示例,假设该节点存在下游)** @date 2021-05-17 17:22*/publicclassSupplementTest {
privatestaticfinalStringENV="PROD";
privatestaticfinalLongtestProjectId=1022928L;
// 实际使用请通过AK/endpoint自行创建API clientprivateDataphinAcsClientclient=LocalDataphinAcsClient.getTestEnvClient();
publicvoidsupplementTest() throwsClientException {
//Step1: 根据需要补数据的节点的特点查询节点NodeOverviewnodeOverview=getOneNode();
//Step2: 获取需要补数据的下游节点,此处假设获取下两层的下游节点intdownStreamDepth,
List<String>downstreamNodes=getDownNode(nodeOverview.getNodeId(), downStreamDepth);
//Step3:选择需要补数据的节点及其下游节点和需要补数据的业务日期,创建补数据工作流StringflowId=createSupplementFlow(nodeOverview, downstreamNodes, "2021-05-11", "2021-05-15");
//Step4: 查询补数据流下的每个业务日期下对应的DagRun的运行状态List<ScheduleDagrun>dagRunList=listSupplementDagrun(flowId);
//Step5: 查询补数据工作流下的业务日期的补数据实例并获取实例状态//查看 2021-05-11 的 补数据实例List<Instance>instances=Lists.newArrayList();
dagRunList.stream().filter(d->d.getBizDate().equals("2021-05-11"))
            .findFirst()
            .ifPresent(d->instances.addAll(listInstanceByDagRun(d.getDagrunId())));
// 查看实例的状态instances.forEach(ins->System.out.println(ins.getStatus()));
    }
/*** 列出 dagRun下面的实例** @param dagRunId dagRunId* @return dagRun下面的实例列表*/privateList<Instance>listInstanceByDagRun(StringdagRunId) {
ListSupplementInstanceRequestrequest=newListSupplementInstanceRequest();
request.setDagRunId(dagRunId);
request.setEnv(ENV);
try {
ListSupplementInstanceResponseresponse=client.getAcsResponse(request);
if(null==response.getData() ||response.getData().size() ==0) {
thrownewRuntimeException("not found any instance by dagrun id "+dagRunId);
            }
returnresponse.getData();
        } catch (ClientExceptione) {
thrownewRuntimeException(e.getMessage());
        }
    }
/*** 列出补数据的 dagrun,当前例子中,正常应该有5个** @param flowId 补数据的工作流的ID* @return dagrun 列表*/privateList<ScheduleDagrun>listSupplementDagrun(StringflowId) throwsClientException {
ListSupplementDagrunRequestrequest=newListSupplementDagrunRequest();
request.setEnv(ENV);
request.setFlowId(flowId);
ListSupplementDagrunResponseresponse=client.getAcsResponse(request);
if(response.getData() ==null||response.getData().size() ==0) {
thrownewRuntimeException("not found any dagrun by flow id "+flowId);
        }
returnresponse.getData();
    }
/*** 创建补数据工作流** @param nodeOverview    补数据起始节点* @param downStreamDepth 补数据的层级深度* @return 补数据的工作流ID*/privateStringcreateSupplementFlow(NodeOverviewnodeOverview, List<String>downstreamNodes, StringminPartition, StringmaxPartition) throwsClientException {
CreatePhysicalNodeSupplementRequestcreateNodeSupplementRequest=newCreatePhysicalNodeSupplementRequest();
NodeSupplementCommandnodeSupplementCommand=newNodeSupplementCommand();
nodeSupplementCommand.setName("open_api_test_20210508_"+System.currentTimeMillis());
nodeSupplementCommand.setProjectId(Long.toString(testProjectId));
nodeSupplementCommand.setMinPartition(minPartition);
nodeSupplementCommand.setMaxPartition(maxPartition);
nodeSupplementCommand.setParallelism(1);
nodeSupplementCommand.setSupplementNodeId(nodeOverview.getNodeId());
//包含全部下游,或者这里可以选择部分下游nodeSupplementCommand.setIncludedNodeIdList(downstreamNodes);
createNodeSupplementRequest.setNodeSupplementCommand(nodeSupplementCommand);
createNodeSupplementRequest.setEnv(ENV);
CreatePhysicalNodeSupplementResponsecreateNodeSupplementResponse=client.getAcsResponse(
createNodeSupplementRequest);
returncreateNodeSupplementResponse.getFlowId();
    }
/*** 查询节点的下游** @param startNodeId     起始节点* @param downStreamDepth 下游层级深度* @return 下游节点的ID列表*/privateList<String>getDownNode(StringstartNodeId, intdownStreamDepth) throwsClientException {
QueryDagFromPhysicalNodeRequestqueryNodeDagDownStreamRequest=newQueryDagFromPhysicalNodeRequest();
NodeDagQueryCommandnodeDagQueryDownStreamCommand=newNodeDagQueryCommand();
nodeDagQueryDownStreamCommand.setStartNodeId(startNodeId);
//只搜索向下两层的节点nodeDagQueryDownStreamCommand.setDownStreamDepth(downStreamDepth);
nodeDagQueryDownStreamCommand.setUpStreamDepth(0);
queryNodeDagDownStreamRequest.setNodeDagQueryCommand(nodeDagQueryDownStreamCommand);
queryNodeDagDownStreamRequest.setEnv(ENV);
QueryDagFromPhysicalNodeResponsequeryNodeDagDownStreamResponse=client.getAcsResponse(queryNodeDagDownStreamRequest);
NodeDagInfonodeDagDownStreamInfo=queryNodeDagDownStreamResponse.getNodeDagInfo();
List<LogicalNodeInfo>downNodes=nodeDagDownStreamInfo.getNodes();
if(null==downNodes) {
returnLists.newArrayListWithCapacity(0);
        }
returndownNodes.stream().map(n->n.getBasicInfo().getNodeId().getId()).collect(Collectors.toList());
    }
/*** 列出节点列表*/privateNodeOverviewgetOneNode() throwsClientException {
ListNodesRequest.PageParampageParam=newPageParam();
pageParam.setPageNum(1);
pageParam.setPageSize(20);
ListNodesRequestlistNodesRequest=newListNodesRequest();
ListNodesRequest.NodeQueryCommandnodeQueryCommand=newNodeQueryCommand();
nodeQueryCommand.setNodeBizType("SCRIPT");
nodeQueryCommand.setNodeScheduleType("NORMAL");
listNodesRequest.setNodeQueryCommand(nodeQueryCommand);
listNodesRequest.setPageParam(pageParam);
listNodesRequest.setProjectId(testProjectId);
listNodesRequest.setEnv(ENV);
ListNodesResponselistNodesResponse=client.getAcsResponse(listNodesRequest);
PagedDataoverviewPagedData=listNodesResponse.getPagedNodes();
List<NodeOverview>nodeOverviewList=overviewPagedData.getData();
if(null==nodeOverviewList||nodeOverviewList.size() ==0) {
thrownewRuntimeException("not found any node");
        }
returnnodeOverviewList.get(0);
    }
}


点击这里查看Dataphin OpenAPI概览。

目录
相关文章
|
3天前
|
存储 供应链 监控
1688商品数据实战:API搜索接口开发与供应链分析应用
本文详细介绍了如何通过1688开放API实现商品数据的获取与应用,涵盖接入准备、签名流程、数据解析存储及商业化场景。开发者可完成智能选品、价格监控和供应商评级等功能,同时提供代码示例与问题解决方案,确保法律合规与数据安全。适合企业开发者快速构建供应链管理系统。
|
6天前
|
网络协议 API 开发者
深入解密 :Postman、Apipost和Apifox API 协议与工具选择
作为全栈开发者,每天与API打交道是常态。本文总结了多年经验,深入解析常见API协议(HTTP(s)、SSE、gRPC、WebSocket、Socket.IO)及其适用场景,并对比三款主流调试工具(Postman、Apipost、ApiFox)。从基础特性到高级应用,帮助开发者根据需求选择最优方案,提升效率,让开发更顺畅!
|
9天前
|
数据可视化 测试技术 API
前后端分离开发:如何高效调试API?有工具 vs 无工具全解析
在前后端分离开发中,API调试至关重要。本文探讨有无调试工具时如何高效调试API,重点分析Postman、Swagger等工具优势及无工具代码调试方法。通过实际场景如用户登录接口,对比两者特性。同时介绍Apipost-Hepler(IDEA插件),将可视化与代码调试结合,提供全局请求头配置、历史记录保存等功能,优化团队协作与开发效率,助力API调试进入全新阶段。
|
22天前
|
JSON API 数据格式
Python 请求微店商品详情数据 API 接口
微店开放平台允许开发者通过API获取商品详情数据。使用Python请求微店商品详情API的主要步骤包括:1. 注册并申请API权限,获得app_key和app_secret;2. 确定API接口地址与请求参数,如商品ID;3. 生成签名确保请求安全合法;4. 使用requests库发送HTTP请求获取数据;5. 处理返回的JSON格式响应数据。开发时需严格遵循微店API文档要求。
|
22天前
|
缓存 监控 API
微店商品详情API接口实战指南:从零实现商品数据自动化获取
本文介绍了微店商品详情API接口的应用,涵盖申请与鉴权、签名加密、数据解析等内容。通过Python实战演示了5步获取商品数据的流程,并提供了多平台同步、价格监控等典型应用场景。开发者可利用此接口实现自动化操作,提升电商运营效率,降低人工成本。文中还总结了频率限制、数据缓存等避坑指南,助力开发者高效使用API。
|
10天前
|
前端开发 Cloud Native Java
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
|
12天前
|
测试技术 API 数据安全/隐私保护
API 调试与管理工具选型思考:Apifox vs Apipost,企业究竟该如何选择
在企业级API调试与管理中,Apifox和Apipost是两款备受关注的工具。Apifox主打轻量级、易用性和团队协作,适合中小型团队的基础调试需求;而Apipost功能更全面,涵盖API调试、文档生成、Mock服务、自动化测试等,尤其适合复杂业务场景及大中型企业。本文从功能对比、用户体验、企业适配度和性价比等多个维度进行详细解析,帮助企业做出明智选择。
|
18天前
|
机器学习/深度学习 设计模式 API
Python 高级编程与实战:构建 RESTful API
本文深入探讨了使用 Python 构建 RESTful API 的方法,涵盖 Flask、Django REST Framework 和 FastAPI 三个主流框架。通过实战项目示例,详细讲解了如何处理 GET、POST 请求,并返回相应数据。学习这些技术将帮助你掌握构建高效、可靠的 Web API。
|
12天前
|
数据采集 消息中间件 API
微店API开发全攻略:解锁电商数据与业务自动化的核心能力
微店开放平台提供覆盖商品、订单、用户、营销、物流五大核心模块的API接口,支持企业快速构建电商中台系统。其API体系具备模块化设计、双重认证机制、高并发支持和数据隔离等特性。文档详细解析了商品管理、订单处理、营销工具等核心接口功能,并提供实战代码示例。同时,介绍了企业级整合方案设计,如订单全链路自动化和商品数据中台架构,以及性能优化与稳定性保障措施。最后,针对高频问题提供了排查指南,帮助开发者高效利用API实现电商数智化转型。适合中高级开发者阅读。
|
25天前
|
测试技术 API 数据安全/隐私保护
API 调试与管理工具选型思考:Apifox vs Apipost,企业究竟该如何抉择?
API开发管理工具选型建议:Apifox:适合个人开发者或小团队;系统需求侧重“调试”阶段;Apipost :适合需要实现 API 的全生命周期管理的各类大中型企业。
64 15