Dataphin版本:2.9.2及以上
需开通OpenAPI模块:Dataphin-OpenAPI(运维)
Dataphin平台提供了补数据功能,当需要补数据时,用户可手动对某个节点及其下游的节点的特定业务日期的补数据。在实际场景中,有些上游的数据到达的时间晚于预期时间,比如门店的数据延迟几天或者一个月的时间才收集上报,或者上游的数据错误需进行更正时,就需要进行补数据操作。此类操作重复度高,且由于补数据的时间较长带来较大的运维成本。通过Open API的运维模块就可以根据特定的业务场景开发适用于特定业务场景的个性化的补数据或运维工具。
以下的例子是创建批量补数据工具的API调用的基本步骤:
- 根据需要补数据的节点的特点查询节点:ListNode
- 选择需要补数据的节点的下游节点,可利用QueryDagFromPhysicalNode查询下游节点
- 选择需要补数据的节点及其下游节点和需要补数据的业务日期,创建补数据工作流:CreatePhysicalNodeSupplement
- 查询补数据流下的每个业务日期下对应的DagRun的运行状态:ListSupplementDagrun
- 查询补数据工作流下的业务日期的补数据实例并获取实例状态:ListSupplementInstance
请注意,批量补数据时,若不想影响线上正常的周期调度任务,您需要控制补数据实例的生成和运行的频次或避开高峰期以控制系统压力。
以下为样例代码
/* import 省略... *//*** 搜索节点,对节点及下游补数据,并查看补数据实例,搜索时假设不确定是哪一个节点,* 即不指定node_id,对返回的结果取第一个节点补数据(为方便示例,假设该节点存在下游)** @date 2021-05-17 17:22*/publicclassSupplementTest { privatestaticfinalStringENV="PROD"; privatestaticfinalLongtestProjectId=1022928L; // 实际使用请通过AK/endpoint自行创建API clientprivateDataphinAcsClientclient=LocalDataphinAcsClient.getTestEnvClient(); publicvoidsupplementTest() throwsClientException { //Step1: 根据需要补数据的节点的特点查询节点NodeOverviewnodeOverview=getOneNode(); //Step2: 获取需要补数据的下游节点,此处假设获取下两层的下游节点intdownStreamDepth, List<String>downstreamNodes=getDownNode(nodeOverview.getNodeId(), downStreamDepth); //Step3:选择需要补数据的节点及其下游节点和需要补数据的业务日期,创建补数据工作流StringflowId=createSupplementFlow(nodeOverview, downstreamNodes, "2021-05-11", "2021-05-15"); //Step4: 查询补数据流下的每个业务日期下对应的DagRun的运行状态List<ScheduleDagrun>dagRunList=listSupplementDagrun(flowId); //Step5: 查询补数据工作流下的业务日期的补数据实例并获取实例状态//查看 2021-05-11 的 补数据实例List<Instance>instances=Lists.newArrayList(); dagRunList.stream().filter(d->d.getBizDate().equals("2021-05-11")) .findFirst() .ifPresent(d->instances.addAll(listInstanceByDagRun(d.getDagrunId()))); // 查看实例的状态instances.forEach(ins->System.out.println(ins.getStatus())); } /*** 列出 dagRun下面的实例** @param dagRunId dagRunId* @return dagRun下面的实例列表*/privateList<Instance>listInstanceByDagRun(StringdagRunId) { ListSupplementInstanceRequestrequest=newListSupplementInstanceRequest(); request.setDagRunId(dagRunId); request.setEnv(ENV); try { ListSupplementInstanceResponseresponse=client.getAcsResponse(request); if(null==response.getData() ||response.getData().size() ==0) { thrownewRuntimeException("not found any instance by dagrun id "+dagRunId); } returnresponse.getData(); } catch (ClientExceptione) { thrownewRuntimeException(e.getMessage()); } } /*** 列出补数据的 dagrun,当前例子中,正常应该有5个** @param flowId 补数据的工作流的ID* @return dagrun 列表*/privateList<ScheduleDagrun>listSupplementDagrun(StringflowId) throwsClientException { ListSupplementDagrunRequestrequest=newListSupplementDagrunRequest(); request.setEnv(ENV); request.setFlowId(flowId); ListSupplementDagrunResponseresponse=client.getAcsResponse(request); if(response.getData() ==null||response.getData().size() ==0) { thrownewRuntimeException("not found any dagrun by flow id "+flowId); } returnresponse.getData(); } /*** 创建补数据工作流** @param nodeOverview 补数据起始节点* @param downStreamDepth 补数据的层级深度* @return 补数据的工作流ID*/privateStringcreateSupplementFlow(NodeOverviewnodeOverview, List<String>downstreamNodes, StringminPartition, StringmaxPartition) throwsClientException { CreatePhysicalNodeSupplementRequestcreateNodeSupplementRequest=newCreatePhysicalNodeSupplementRequest(); NodeSupplementCommandnodeSupplementCommand=newNodeSupplementCommand(); nodeSupplementCommand.setName("open_api_test_20210508_"+System.currentTimeMillis()); nodeSupplementCommand.setProjectId(Long.toString(testProjectId)); nodeSupplementCommand.setMinPartition(minPartition); nodeSupplementCommand.setMaxPartition(maxPartition); nodeSupplementCommand.setParallelism(1); nodeSupplementCommand.setSupplementNodeId(nodeOverview.getNodeId()); //包含全部下游,或者这里可以选择部分下游nodeSupplementCommand.setIncludedNodeIdList(downstreamNodes); createNodeSupplementRequest.setNodeSupplementCommand(nodeSupplementCommand); createNodeSupplementRequest.setEnv(ENV); CreatePhysicalNodeSupplementResponsecreateNodeSupplementResponse=client.getAcsResponse( createNodeSupplementRequest); returncreateNodeSupplementResponse.getFlowId(); } /*** 查询节点的下游** @param startNodeId 起始节点* @param downStreamDepth 下游层级深度* @return 下游节点的ID列表*/privateList<String>getDownNode(StringstartNodeId, intdownStreamDepth) throwsClientException { QueryDagFromPhysicalNodeRequestqueryNodeDagDownStreamRequest=newQueryDagFromPhysicalNodeRequest(); NodeDagQueryCommandnodeDagQueryDownStreamCommand=newNodeDagQueryCommand(); nodeDagQueryDownStreamCommand.setStartNodeId(startNodeId); //只搜索向下两层的节点nodeDagQueryDownStreamCommand.setDownStreamDepth(downStreamDepth); nodeDagQueryDownStreamCommand.setUpStreamDepth(0); queryNodeDagDownStreamRequest.setNodeDagQueryCommand(nodeDagQueryDownStreamCommand); queryNodeDagDownStreamRequest.setEnv(ENV); QueryDagFromPhysicalNodeResponsequeryNodeDagDownStreamResponse=client.getAcsResponse(queryNodeDagDownStreamRequest); NodeDagInfonodeDagDownStreamInfo=queryNodeDagDownStreamResponse.getNodeDagInfo(); List<LogicalNodeInfo>downNodes=nodeDagDownStreamInfo.getNodes(); if(null==downNodes) { returnLists.newArrayListWithCapacity(0); } returndownNodes.stream().map(n->n.getBasicInfo().getNodeId().getId()).collect(Collectors.toList()); } /*** 列出节点列表*/privateNodeOverviewgetOneNode() throwsClientException { ListNodesRequest.PageParampageParam=newPageParam(); pageParam.setPageNum(1); pageParam.setPageSize(20); ListNodesRequestlistNodesRequest=newListNodesRequest(); ListNodesRequest.NodeQueryCommandnodeQueryCommand=newNodeQueryCommand(); nodeQueryCommand.setNodeBizType("SCRIPT"); nodeQueryCommand.setNodeScheduleType("NORMAL"); listNodesRequest.setNodeQueryCommand(nodeQueryCommand); listNodesRequest.setPageParam(pageParam); listNodesRequest.setProjectId(testProjectId); listNodesRequest.setEnv(ENV); ListNodesResponselistNodesResponse=client.getAcsResponse(listNodesRequest); PagedDataoverviewPagedData=listNodesResponse.getPagedNodes(); List<NodeOverview>nodeOverviewList=overviewPagedData.getData(); if(null==nodeOverviewList||nodeOverviewList.size() ==0) { thrownewRuntimeException("not found any node"); } returnnodeOverviewList.get(0); } }
点击这里查看Dataphin OpenAPI概览。