如何通过API构建自动补数据工具

本文涉及的产品
智能数据建设与治理Dataphin,200数据处理单元
简介: 在Dataphin V2.9.2及以上版本,Dataphin提供了一套OpenAPI以满足客户定制化的需求。本文为您社少如何通过Open API的运维模块开发适用于特定业务场景的个性化的补数据或运维工具。

Dataphin版本:2.9.2及以上

需开通OpenAPI模块:Dataphin-OpenAPI(运维)


Dataphin平台提供了补数据功能,当需要补数据时,用户可手动对某个节点及其下游的节点的特定业务日期的补数据。在实际场景中,有些上游的数据到达的时间晚于预期时间,比如门店的数据延迟几天或者一个月的时间才收集上报,或者上游的数据错误需进行更正时,就需要进行补数据操作。此类操作重复度高,且由于补数据的时间较长带来较大的运维成本。通过Open API的运维模块就可以根据特定的业务场景开发适用于特定业务场景的个性化的补数据或运维工具。


以下的例子是创建批量补数据工具的API调用的基本步骤:

  1. 根据需要补数据的节点的特点查询节点:ListNode
  2. 选择需要补数据的节点的下游节点,可利用QueryDagFromPhysicalNode查询下游节点
  3. 选择需要补数据的节点及其下游节点和需要补数据的业务日期,创建补数据工作流:CreatePhysicalNodeSupplement
  4. 查询补数据流下的每个业务日期下对应的DagRun的运行状态:ListSupplementDagrun
  5. 查询补数据工作流下的业务日期的补数据实例并获取实例状态:ListSupplementInstance


请注意,批量补数据时,若不想影响线上正常的周期调度任务,您需要控制补数据实例的生成和运行的频次或避开高峰期以控制系统压力。

以下为样例代码

/* import 省略... *//*** 搜索节点,对节点及下游补数据,并查看补数据实例,搜索时假设不确定是哪一个节点,* 即不指定node_id,对返回的结果取第一个节点补数据(为方便示例,假设该节点存在下游)** @date 2021-05-17 17:22*/publicclassSupplementTest {
privatestaticfinalStringENV="PROD";
privatestaticfinalLongtestProjectId=1022928L;
// 实际使用请通过AK/endpoint自行创建API clientprivateDataphinAcsClientclient=LocalDataphinAcsClient.getTestEnvClient();
publicvoidsupplementTest() throwsClientException {
//Step1: 根据需要补数据的节点的特点查询节点NodeOverviewnodeOverview=getOneNode();
//Step2: 获取需要补数据的下游节点,此处假设获取下两层的下游节点intdownStreamDepth,
List<String>downstreamNodes=getDownNode(nodeOverview.getNodeId(), downStreamDepth);
//Step3:选择需要补数据的节点及其下游节点和需要补数据的业务日期,创建补数据工作流StringflowId=createSupplementFlow(nodeOverview, downstreamNodes, "2021-05-11", "2021-05-15");
//Step4: 查询补数据流下的每个业务日期下对应的DagRun的运行状态List<ScheduleDagrun>dagRunList=listSupplementDagrun(flowId);
//Step5: 查询补数据工作流下的业务日期的补数据实例并获取实例状态//查看 2021-05-11 的 补数据实例List<Instance>instances=Lists.newArrayList();
dagRunList.stream().filter(d->d.getBizDate().equals("2021-05-11"))
            .findFirst()
            .ifPresent(d->instances.addAll(listInstanceByDagRun(d.getDagrunId())));
// 查看实例的状态instances.forEach(ins->System.out.println(ins.getStatus()));
    }
/*** 列出 dagRun下面的实例** @param dagRunId dagRunId* @return dagRun下面的实例列表*/privateList<Instance>listInstanceByDagRun(StringdagRunId) {
ListSupplementInstanceRequestrequest=newListSupplementInstanceRequest();
request.setDagRunId(dagRunId);
request.setEnv(ENV);
try {
ListSupplementInstanceResponseresponse=client.getAcsResponse(request);
if(null==response.getData() ||response.getData().size() ==0) {
thrownewRuntimeException("not found any instance by dagrun id "+dagRunId);
            }
returnresponse.getData();
        } catch (ClientExceptione) {
thrownewRuntimeException(e.getMessage());
        }
    }
/*** 列出补数据的 dagrun,当前例子中,正常应该有5个** @param flowId 补数据的工作流的ID* @return dagrun 列表*/privateList<ScheduleDagrun>listSupplementDagrun(StringflowId) throwsClientException {
ListSupplementDagrunRequestrequest=newListSupplementDagrunRequest();
request.setEnv(ENV);
request.setFlowId(flowId);
ListSupplementDagrunResponseresponse=client.getAcsResponse(request);
if(response.getData() ==null||response.getData().size() ==0) {
thrownewRuntimeException("not found any dagrun by flow id "+flowId);
        }
returnresponse.getData();
    }
/*** 创建补数据工作流** @param nodeOverview    补数据起始节点* @param downStreamDepth 补数据的层级深度* @return 补数据的工作流ID*/privateStringcreateSupplementFlow(NodeOverviewnodeOverview, List<String>downstreamNodes, StringminPartition, StringmaxPartition) throwsClientException {
CreatePhysicalNodeSupplementRequestcreateNodeSupplementRequest=newCreatePhysicalNodeSupplementRequest();
NodeSupplementCommandnodeSupplementCommand=newNodeSupplementCommand();
nodeSupplementCommand.setName("open_api_test_20210508_"+System.currentTimeMillis());
nodeSupplementCommand.setProjectId(Long.toString(testProjectId));
nodeSupplementCommand.setMinPartition(minPartition);
nodeSupplementCommand.setMaxPartition(maxPartition);
nodeSupplementCommand.setParallelism(1);
nodeSupplementCommand.setSupplementNodeId(nodeOverview.getNodeId());
//包含全部下游,或者这里可以选择部分下游nodeSupplementCommand.setIncludedNodeIdList(downstreamNodes);
createNodeSupplementRequest.setNodeSupplementCommand(nodeSupplementCommand);
createNodeSupplementRequest.setEnv(ENV);
CreatePhysicalNodeSupplementResponsecreateNodeSupplementResponse=client.getAcsResponse(
createNodeSupplementRequest);
returncreateNodeSupplementResponse.getFlowId();
    }
/*** 查询节点的下游** @param startNodeId     起始节点* @param downStreamDepth 下游层级深度* @return 下游节点的ID列表*/privateList<String>getDownNode(StringstartNodeId, intdownStreamDepth) throwsClientException {
QueryDagFromPhysicalNodeRequestqueryNodeDagDownStreamRequest=newQueryDagFromPhysicalNodeRequest();
NodeDagQueryCommandnodeDagQueryDownStreamCommand=newNodeDagQueryCommand();
nodeDagQueryDownStreamCommand.setStartNodeId(startNodeId);
//只搜索向下两层的节点nodeDagQueryDownStreamCommand.setDownStreamDepth(downStreamDepth);
nodeDagQueryDownStreamCommand.setUpStreamDepth(0);
queryNodeDagDownStreamRequest.setNodeDagQueryCommand(nodeDagQueryDownStreamCommand);
queryNodeDagDownStreamRequest.setEnv(ENV);
QueryDagFromPhysicalNodeResponsequeryNodeDagDownStreamResponse=client.getAcsResponse(queryNodeDagDownStreamRequest);
NodeDagInfonodeDagDownStreamInfo=queryNodeDagDownStreamResponse.getNodeDagInfo();
List<LogicalNodeInfo>downNodes=nodeDagDownStreamInfo.getNodes();
if(null==downNodes) {
returnLists.newArrayListWithCapacity(0);
        }
returndownNodes.stream().map(n->n.getBasicInfo().getNodeId().getId()).collect(Collectors.toList());
    }
/*** 列出节点列表*/privateNodeOverviewgetOneNode() throwsClientException {
ListNodesRequest.PageParampageParam=newPageParam();
pageParam.setPageNum(1);
pageParam.setPageSize(20);
ListNodesRequestlistNodesRequest=newListNodesRequest();
ListNodesRequest.NodeQueryCommandnodeQueryCommand=newNodeQueryCommand();
nodeQueryCommand.setNodeBizType("SCRIPT");
nodeQueryCommand.setNodeScheduleType("NORMAL");
listNodesRequest.setNodeQueryCommand(nodeQueryCommand);
listNodesRequest.setPageParam(pageParam);
listNodesRequest.setProjectId(testProjectId);
listNodesRequest.setEnv(ENV);
ListNodesResponselistNodesResponse=client.getAcsResponse(listNodesRequest);
PagedDataoverviewPagedData=listNodesResponse.getPagedNodes();
List<NodeOverview>nodeOverviewList=overviewPagedData.getData();
if(null==nodeOverviewList||nodeOverviewList.size() ==0) {
thrownewRuntimeException("not found any node");
        }
returnnodeOverviewList.get(0);
    }
}


点击这里查看Dataphin OpenAPI概览。

相关文章
|
30天前
|
JSON API 数据格式
淘宝/天猫图片搜索API接口,json返回数据。
淘宝/天猫平台虽未开放直接的图片搜索API,但可通过阿里妈妈淘宝联盟或天猫开放平台接口实现类似功能。本文提供基于淘宝联盟的图片关联商品搜索Curl示例及JSON响应说明,适用于已获权限的开发者。如需更高精度搜索,可选用阿里云视觉智能API。
|
28天前
|
JSON API 数据安全/隐私保护
深度分析淘宝卖家订单详情API接口,用json返回数据
淘宝卖家订单详情API(taobao.trade.fullinfo.get)是淘宝开放平台提供的重要接口,用于获取单个订单的完整信息,包括订单状态、买家信息、商品明细、支付与物流信息等,支撑订单管理、ERP对接及售后处理。需通过appkey、appsecret和session认证,并遵守调用频率与数据权限限制。本文详解其使用方法并附Python调用示例。
|
28天前
|
缓存 API 网络架构
淘宝item_search_similar - 搜索相似的商品API接口,用python返回数据
淘宝联盟开放平台中,可通过“物料优选接口”(taobao.tbk.dg.optimus.material)实现“搜索相似商品”功能。该接口支持根据商品 ID 获取相似推荐商品,并返回商品信息、价格、优惠等数据,适用于商品推荐、比价等场景。本文提供基于 Python 的实现示例,包含接口调用、数据解析及结果展示。使用时需配置淘宝联盟的 appkey、appsecret 和 adzone_id,并注意接口调用频率限制和使用规范。
|
1月前
|
XML 缓存 API
eBay 商品详情 API 深度解析:从基础信息到变体数据获取全方案
本文详解如何通过 eBay 的 GetItem 和 GetMultipleItems 接口获取商品详情数据,涵盖基础属性、价格、变体、卖家信息等,并提供可复用的 Python 代码。内容包括 API 核心参数、响应结构、代码实现、实战注意事项及扩展方向,助力跨境电商开发。
|
30天前
|
JSON API 开发者
淘宝店铺的所有商品API接口,Curl返回数据
淘宝平台未开放获取全店商品的公共API,开发者可通过阿里妈妈的淘宝联盟API获取参与推广的商品。需成为联盟开发者、创建应用,并通过adzone_id关联店铺。使用taobao.tbk.shop.get和taobao.tbk.item.info.get接口,可获取商品列表及详情,但需注意签名生成、调用频率限制及合规要求。未参与推广的商品无法通过该方式获取。
|
1月前
|
JSON 缓存 API
淘宝店铺所有商品API,json数据返回
淘宝店铺所有商品API的JSON数据返回通常包含商品的基本信息、动态数据以及分页信息等。以下是一个详细的JSON数据返回示例,以及相关字段的说明
|
1月前
|
JSON API 数据格式
1688店铺订单列表订单详情订单物流API响应数据解析
1688平台作为阿里巴巴旗下的B2B电商利器,提供高效订单管理API,支持订单查询、状态变更与物流同步,助力企业提升运营效率。本文附Python请求示例代码,实现便捷对接与数据获取。
|
1月前
|
JSON 缓存 供应链
1688图片搜索API秘籍!轻松获取相似商品数据
1688图片搜索API基于图像识别技术,支持通过上传商品图片搜索同款或相似商品,适用于电商选品、供应链管理等场景。提供多种搜索模式与结果过滤条件,支持Python等开发语言,提升采购效率。
|
1月前
|
设计模式 JSON Unix
微店商品详情API接口,json数据返回
微店商品详情API接口的典型JSON返回数据结构说明,基于公开的微店开放平台API文档和常见电商API设计模式整理。实际使用时请以微店官方最新文档为准
|
1月前
|
JSON 监控 API
抖音视频列表API秘籍!轻松获取视频列表数据
抖音视频列表API是抖音开放平台提供的核心接口,支持按关键词、分类、排序方式筛选视频,适用于内容推荐、趋势分析等场景。接口返回含视频ID、标题、播放量等50+字段,支持分页获取,通过HTTP GET请求调用,返回JSON格式数据,便于开发者快速集成与处理。需注册平台账号获取访问权限。
389 56