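This topic describes how to use OpenAPI to create a Data Integration sync task that synchronizes data from a source to a destination.
Limits
Through OpenAPI, DataWorks currently supports creating only batch (offline) sync tasks in Data Integration.
When you call the CreateDISyncTask operation to create a Data Integration sync task, you can configure the task content only in script mode. For details, see Configure a task in script mode.
DataWorks does not support creating workflows through OpenAPI. You must create the sync task in an existing workflow.
Configure environment dependencies and account authentication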
Configure Maven dependencies.
Open the pom.xml file of your Maven project and add the aliyun-java-sdk-core dependency:
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-core</artifactId>
    <version>4.5.20</version>
</dependency>
Open the pom.xml file of your Maven project and add the aliyun-java-sdk-dataworks-public dependency:
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
    <version>3.4.2</version>
</dependency>
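To confirm that Maven resolved both dependencies, you can check that one representative class from each artifact can be loaded. The following is a minimal sketch that uses only the JDK; the class SdkClasspathCheck is hypothetical, and the two class names it probes are taken from the sample code in this topic.

```java
public class SdkClasspathCheck {

    /** Returns true if the named class can be located by this class's loader. */
    public static boolean isOnClasspath(String className) {
        try {
            // initialize = false: load the class without running its static initializers.
            Class.forName(className, false, SdkClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] required = {
            "com.aliyuncs.DefaultAcsClient",                                        // aliyun-java-sdk-core
            "com.aliyuncs.dataworks_public.model.v20200518.CreateDISyncTaskRequest" // aliyun-java-sdk-dataworks-public
        };
        for (String name : required) {
            System.out.println(name + ": " + (isOnClasspath(name) ? "found" : "MISSING"));
        }
    }
}
```

Run it with the same classpath as your project; a MISSING line usually means the corresponding dependency was not resolved.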
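Authenticate the client.
Before you use OpenAPI to create a sync task, run the following statements to build a client from the AccessKey pair of the Alibaba Cloud account that you use to log on to DataWorks. If the credentials are valid, subsequent API calls made through this client succeed. If they are not, the calls return an error, and you need to troubleshoot based on the error message.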
DefaultProfile profile = DefaultProfile.getProfile(
        "<yourRegionId>",      // Region of the DataWorks workspace, for example cn-hangzhou.
        "<yourAccessKeyId>",   // AccessKey ID of the Alibaba Cloud account used to log on to the DataWorks workspace.
        "<yourAccessSecret>"); // AccessKey Secret of the same account.
IAcsClient client = new DefaultAcsClient(profile);
To obtain the AccessKey ID and AccessKey Secret, log on to the DataWorks console, hover over your profile picture in the upper-right corner, and click AccessKey Management.
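Hard-coding the AccessKey pair in source code, as in the statements above, makes it easy to leak. A common alternative is to read it from environment variables. The sketch below assumes the variable names ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET; the class AccessKeyFromEnv and its AccessKey holder are hypothetical and use only the JDK.

```java
import java.util.Map;

public class AccessKeyFromEnv {

    // Assumed variable names; change them if your environment uses different ones.
    public static final String ID_VAR = "ALIBABA_CLOUD_ACCESS_KEY_ID";
    public static final String SECRET_VAR = "ALIBABA_CLOUD_ACCESS_KEY_SECRET";

    /** Hypothetical value holder for the AccessKey pair (not an SDK type). */
    public static final class AccessKey {
        public final String id;
        public final String secret;

        AccessKey(String id, String secret) {
            this.id = id;
            this.secret = secret;
        }
    }

    /** Reads both values from the given environment map and fails fast if either is missing or blank. */
    public static AccessKey fromEnv(Map<String, String> env) {
        String id = env.get(ID_VAR);
        String secret = env.get(SECRET_VAR);
        if (id == null || id.isBlank() || secret == null || secret.isBlank()) {
            throw new IllegalStateException("Set " + ID_VAR + " and " + SECRET_VAR + " before creating the client");
        }
        return new AccessKey(id, secret);
    }

    public static void main(String[] args) {
        AccessKey key = fromEnv(System.getenv());
        // Pass key.id and key.secret to DefaultProfile.getProfile(regionId, ...) instead of string literals.
        System.out.println("AccessKey ID loaded, length " + key.id.length());
    }
}
```

Taking a Map instead of calling System.getenv() directly keeps the lookup testable without touching the real environment.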
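Configuration process
Configuration steps
Create a Data Integration task.
Call the CreateDISyncTask operation to create the Data Integration task. The following code configures only some of the parameters. For more information about the parameters, see CreateDISyncTask.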
public void createFile() throws ClientException {
    CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
    request.setProjectId(181565L);
    request.setTaskType("DI_OFFLINE");
    // Task content in script mode: a MySQL reader that writes to a MaxCompute (odps) table.
    request.setTaskContent("{\"type\":\"job\",\"version\":\"2.0\",\"steps\":[{\"stepType\":\"mysql\",\"parameter\":{\"envType\":1,\"datasource\":\"dh_mysql\",\"column\":[\"id\",\"name\"],\"tableComment\":\"same表comment\",\"connection\":[{\"datasource\":\"dh_mysql\",\"table\":[\"same\"]}],\"where\":\"\",\"splitPk\":\"id\",\"encoding\":\"UTF-8\"},\"name\":\"Reader\",\"category\":\"reader\"},{\"stepType\":\"odps\",\"parameter\":{\"partition\":\"pt=${bizdate}\",\"truncate\":true,\"datasource\":\"odps_first\",\"envType\":1,\"column\":[\"id\",\"name\"],\"emptyAsNull\":false,\"tableComment\":\"same表comment\",\"table\":\"same\"},\"name\":\"Writer\",\"category\":\"writer\"}],\"setting\":{\"errorLimit\":{\"record\":\"\"},\"speed\":{\"throttle\":false,\"concurrent\":2}},\"order\":{\"hops\":[{\"from\":\"Reader\",\"to\":\"Writer\"}]}}");
    // FileFolderPath: the workflow folder that holds the task. ResourceGroup: the resource group used by the task.
    request.setTaskParam("{\"FileFolderPath\":\"业务流程/new_biz/数据集成\",\"ResourceGroup\":\"S_res_group_280749521950784_1602767279794\"}");
    request.setTaskName("new_di_task_0607_1416");

    String akId = "XXX";
    String akSecret = "XXXX";
    String regionId = "cn-hangzhou";
    IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
    DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");
    IAcsClient client = new DefaultAcsClient(profile);

    CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
    Gson gson1 = new Gson(); // com.google.gson.Gson
    System.out.println(gson1.toJson(response1));
}
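The TaskParam value is JSON embedded in a Java string literal, so every quote has to be escaped by hand. A small helper can build it instead. This is a sketch under the assumption that TaskParam only needs the FileFolderPath and ResourceGroup keys shown above; TaskParamBuilder is a hypothetical name, and the folder path in main is a placeholder.

```java
public class TaskParamBuilder {

    /** Escapes a value for use inside a JSON string literal. */
    static String jsonEscape(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 8);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use a 4-digit hex escape.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c); // Non-ASCII text, such as Chinese folder names, is valid JSON as-is.
                    }
            }
        }
        return sb.toString();
    }

    /** Builds {"FileFolderPath":"...","ResourceGroup":"..."} for CreateDISyncTaskRequest.setTaskParam. */
    public static String build(String fileFolderPath, String resourceGroup) {
        return "{\"FileFolderPath\":\"" + jsonEscape(fileFolderPath)
                + "\",\"ResourceGroup\":\"" + jsonEscape(resourceGroup) + "\"}";
    }

    public static void main(String[] args) {
        // Placeholder values; use the folder path and resource group of your own workspace.
        System.out.println(build("path/to/folder", "S_res_group_XXX"));
        // Prints: {"FileFolderPath":"path/to/folder","ResourceGroup":"S_res_group_XXX"}
    }
}
```

If your project already uses Gson, serializing a Map with it is an equally valid way to produce the same string.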
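Configure scheduling dependencies for the task.
Call the UpdateFile operation to configure the scheduling dependencies of the Data Integration task.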
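// client: an IAcsClient created as shown in the client authentication step.
public static void updateFile(Long fileId) throws Exception {
    UpdateFileRequest request = new UpdateFileRequest();
    request.setProjectId(2043L);
    request.setFileId(fileId);
    request.setAutoRerunTimes(3);                          // Number of automatic reruns after a failure.
    request.setRerunMode("FAILURE_ALLOWED");               // Allow reruns only after the task fails.
    request.setCronExpress("00 30 05 * * ?");              // Run at 05:30:00 every day.
    request.setCycleType("DAY");                           // Daily scheduling cycle.
    request.setResourceGroupIdentifier("S_res_group_XXX"); // Use an exclusive resource group for scheduling.
    request.setInputList("dataworks_di_autotest_root");    // Output name of the upstream node that this task depends on.
    UpdateFileResponse response1 = client.getAcsResponse(request);
}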
The preceding code configures only some of the parameters. For more information about the parameters, see UpdateFile.
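The CronExpress value "00 30 05 * * ?" is read as six fields: second, minute, hour, day of month, month, and day of week, so it means 05:30:00 every day. The sketch below decodes and builds only this fixed-time daily form; it assumes that field layout and is not a full cron parser. DailyCron is a hypothetical helper.

```java
import java.time.LocalTime;

public class DailyCron {

    /** Converts "ss mm HH * * ?" into the time of day it fires. */
    public static LocalTime dailyRunTime(String cron) {
        String[] f = cron.trim().split("\\s+");
        if (f.length != 6 || !f[3].equals("*") || !f[4].equals("*") || !f[5].equals("?")) {
            throw new IllegalArgumentException("Not a fixed-time daily expression: " + cron);
        }
        int second = Integer.parseInt(f[0]);
        int minute = Integer.parseInt(f[1]);
        int hour = Integer.parseInt(f[2]);
        return LocalTime.of(hour, minute, second); // Also validates ranges, e.g. hour 0-23.
    }

    /** Builds the daily expression for a given time (inverse of dailyRunTime). */
    public static String dailyCron(LocalTime t) {
        return String.format("%02d %02d %02d * * ?", t.getSecond(), t.getMinute(), t.getHour());
    }

    public static void main(String[] args) {
        System.out.println(dailyRunTime("00 30 05 * * ?")); // 05:30
        System.out.println(dailyCron(LocalTime.of(5, 30)));  // 00 30 05 * * ?
    }
}
```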
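Submit the Data Integration task.
Call the SubmitFile operation to submit the Data Integration task to the development environment of the scheduling system.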
public void submitFile() throws ClientException {
    SubmitFileRequest request = new SubmitFileRequest();
    request.setProjectId(78837L);
    request.setProjectIdentifier("zxy_8221431");
    // The file ID returned when the node was created; it corresponds to file_id in the File table.
    request.setFileId(501576542L);
    request.setComment("Remarks");
    SubmitFileResponse acsResponse = client.getAcsResponse(request);
    // The response data is the deployment ID. Pass it to GetDeployment to check the result of this submission.
    Long deploymentId = acsResponse.getData();
    System.out.println("deploymentId = " + deploymentId);
}
The preceding code configures only some of the parameters. For more information about the parameters, see SubmitFile.
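Query the submission status of the sync task.
After the task is submitted, the response returns a deployment ID (deploymentId). Call the GetDeployment operation with this ID to obtain the details of the deployment package. If the Status parameter returned by GetDeployment is 1, the package was submitted successfully. You can deploy the task only after it is submitted successfully. If the submission fails, troubleshoot promptly based on the error message.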
public void getDeployment() throws ClientException {
    GetDeploymentRequest request = new GetDeploymentRequest();
    request.setProjectId(78837L);
    request.setProjectIdentifier("zxy_8221431");
    // The deployment ID returned by SubmitFile or DeployFile.
    request.setDeploymentId(2776067L);
    GetDeploymentResponse acsResponse = client.getAcsResponse(request);
    // A status of 1 indicates success.
    System.out.println(acsResponse.getData().getDeployment().getStatus());
}
The preceding code configures only some of the parameters. For more information about the parameters, see GetDeployment.
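Instead of checking GetDeployment once after a fixed wait, you can poll it until the package leaves the in-progress state. The sketch below assumes the Status values 0 (ready, checks still running), 1 (success), and 2 (failure). The status source is passed in as an IntSupplier so that the loop can be exercised without calling the API; DeploymentPoller is hypothetical.

```java
import java.util.function.IntSupplier;

public class DeploymentPoller {

    public enum Outcome { SUCCESS, FAILURE, TIMEOUT, INTERRUPTED }

    // Assumed GetDeployment status codes.
    static final int STATUS_SUCCESS = 1;
    static final int STATUS_FAILURE = 2;

    /** Polls the status up to maxAttempts times, sleeping intervalMillis between polls. */
    public static Outcome waitForDeployment(IntSupplier status, int maxAttempts, long intervalMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            int s = status.getAsInt();
            if (s == STATUS_SUCCESS) return Outcome.SUCCESS;
            if (s == STATUS_FAILURE) return Outcome.FAILURE;
            // Any other value (for example 0) means the checks are still running.
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // Preserve the interrupt for the caller.
                    return Outcome.INTERRUPTED;
                }
            }
        }
        return Outcome.TIMEOUT;
    }

    public static void main(String[] args) {
        // Simulated status sequence: two in-progress checks, then success.
        int[] fake = {0, 0, 1};
        int[] next = {0};
        Outcome o = waitForDeployment(() -> fake[Math.min(next[0]++, fake.length - 1)], 10, 100);
        System.out.println(o); // SUCCESS
    }
}
```

In real code the supplier could wrap client.getAcsResponse(request).getData().getDeployment().getStatus() from the example above; because getAcsResponse throws a checked ClientException, the lambda would need to wrap it in an unchecked exception.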
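Deploy the sync task to the production environment.
Call the DeployFile operation to deploy the Data Integration sync task to the production environment.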
public void deploy() throws ClientException {
    DeployFileRequest request = new DeployFileRequest();
    request.setProjectIdentifier("zxy_8221431");
    request.setComment("Remarks");
    // Specify either FileId or NodeId. NodeId is the node ID shown in the basic
    // properties of the scheduling configuration.
    request.setFileId(501576542L);
    // request.setNodeId(700004537241L);
    DeployFileResponse acsResponse = client.getAcsResponse(request);
    // The response data is the deployment ID. Pass it to GetDeployment to check the result of this deployment.
    Long deploymentId = acsResponse.getData();
    System.out.println("deploymentId = " + deploymentId);
}
The preceding code configures only some of the parameters. For more information about the parameters, see DeployFile.
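Because DeployFile takes either FileId or NodeId, one way to make that choice explicit in calling code is a small value type that can hold only one of them. DeployTarget below is a hypothetical illustration, not part of the SDK; the IDs in main are the sample values from this topic.

```java
public final class DeployTarget {

    private final Long fileId; // Set when the target is identified by FileId.
    private final Long nodeId; // Set when the target is identified by NodeId.

    private DeployTarget(Long fileId, Long nodeId) {
        this.fileId = fileId;
        this.nodeId = nodeId;
    }

    /** Target identified by the file ID returned by CreateDISyncTask. */
    public static DeployTarget ofFile(long fileId) {
        return new DeployTarget(fileId, null);
    }

    /** Target identified by the node ID shown in the scheduling configuration. */
    public static DeployTarget ofNode(long nodeId) {
        return new DeployTarget(null, nodeId);
    }

    public boolean isFile() {
        return fileId != null;
    }

    /** Name of the DeployFile request parameter to set. */
    public String parameterName() {
        return isFile() ? "FileId" : "NodeId";
    }

    public long id() {
        return isFile() ? fileId : nodeId;
    }

    public static void main(String[] args) {
        DeployTarget t = DeployTarget.ofFile(501576542L);
        // In real code: if (t.isFile()) request.setFileId(t.id()); else request.setNodeId(t.id());
        System.out.println(t.parameterName() + "=" + t.id()); // FileId=501576542
    }
}
```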
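Query the deployment status of the sync task.
After the task is deployed, the response returns a deployment ID (deploymentId). Call the GetDeployment operation with this ID to obtain the details of the deployment package. If the Status parameter returned by GetDeployment is 1, the package was deployed successfully.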
public void getDeployment() throws ClientException {
    GetDeploymentRequest request = new GetDeploymentRequest();
    request.setProjectId(78837L);
    request.setProjectIdentifier("zxy_8221431");
    // The deployment ID returned by SubmitFile or DeployFile.
    request.setDeploymentId(2776067L);
    GetDeploymentResponse acsResponse = client.getAcsResponse(request);
    // A status of 1 indicates success.
    System.out.println(acsResponse.getData().getDeployment().getStatus());
}
The preceding code configures only some of the parameters. For more information about the parameters, see GetDeployment.
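Manage sync tasks
Modify the configuration of a sync task: call the UpdateDISyncTask operation to update the task content, or use the TaskParam parameter to change the exclusive resource group that the task uses. For details, see UpdateDISyncTask.
Backfill data for a sync task: call the RunCycleDagNodes operation to backfill data for the target task in batches. For details, see
Sample Java SDK code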
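RunCycleDagNodes takes the start and end business dates as strings in the form used by the sample below, for example "2021-09-29 00:00:00". The following sketch formats a date range in that layout with java.time; BackfillRange is a hypothetical helper, and the dates in main are illustrative.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

public class BackfillRange {

    // Same layout as the sample values, e.g. "2021-09-29 00:00:00".
    private static final DateTimeFormatter BIZ_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Returns {startBizDate, endBizDate} strings for a range of business dates. */
    public static String[] bizDateRange(LocalDate start, LocalDate end) {
        if (end.isBefore(start)) {
            throw new IllegalArgumentException("end must not be before start");
        }
        return new String[] {
            start.atStartOfDay().format(BIZ_DATE),
            end.atStartOfDay().format(BIZ_DATE)
        };
    }

    /** Number of business dates in the range, counting both ends. */
    public static long dayCount(LocalDate start, LocalDate end) {
        return ChronoUnit.DAYS.between(start, end) + 1;
    }

    public static void main(String[] args) {
        LocalDate start = LocalDate.of(2021, 9, 27);
        LocalDate end = LocalDate.of(2021, 9, 29);
        String[] range = bizDateRange(start, end);
        // Pass range[0] to setStartBizDate and range[1] to setEndBizDate.
        System.out.println(range[0] + " -> " + range[1] + " (" + dayCount(start, end) + " days)");
        // Prints: 2021-09-27 00:00:00 -> 2021-09-29 00:00:00 (3 days)
    }
}
```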
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import java.util.List;
public class CreateOfflineTask {
static Long createTask(String fileName) throws Exception {
Long projectId = 2043L;
String taskType = "DI_OFFLINE";
String taskContent = "{\n" +
" \"type\": \"job\",\n" +
" \"version\": \"2.0\",\n" +
" \"steps\": [\n" +
" {\n" +
" \"stepType\": \"mysql\",\n" +
" \"parameter\": {\n" +
" \"envType\": 0,\n" +
" \"datasource\": \"mysql_autotest_dev\",\n" +
" \"column\": [\n" +
" \"id\",\n" +
" \"name\"\n" +
" ],\n" +
" \"connection\": [\n" +
" {\n" +
" \"datasource\": \"mysql_autotest_dev\",\n" +
" \"table\": [\n" +
" \"user\"\n" +
" ]\n" +
" }\n" +
" ],\n" +
" \"where\": \"\",\n" +
" \"splitPk\": \"\",\n" +
" \"encoding\": \"UTF-8\"\n" +
" },\n" +
" \"name\": \"Reader\",\n" +
" \"category\": \"reader\"\n" +
" },\n" +
" {\n" +
" \"stepType\": \"odps\",\n" +
" \"parameter\": {\n" +
" \"partition\": \"pt=${bizdate}\",\n" +
" \"truncate\": true,\n" +
" \"datasource\": \"odps_first\",\n" +
" \"envType\": 0,\n" +
" \"column\": [\n" +
" \"id\",\n" +
" \"name\"\n" +
" ],\n" +
" \"emptyAsNull\": false,\n" +
" \"tableComment\": \"null\",\n" +
" \"table\": \"user\"\n" +
" },\n" +
" \"name\": \"Writer\",\n" +
" \"category\": \"writer\"\n" +
" }\n" +
" ],\n" +
" \"setting\": {\n" +
" \"executeMode\": null,\n" +
" \"errorLimit\": {\n" +
" \"record\": \"\"\n" +
" },\n" +
" \"speed\": {\n" +
" \"concurrent\": 2,\n" +
" \"throttle\": false\n" +
" }\n" +
" },\n" +
" \"order\": {\n" +
" \"hops\": [\n" +
" {\n" +
" \"from\": \"Reader\",\n" +
" \"to\": \"Writer\"\n" +
" }\n" +
" ]\n" +
" }\n" +
"}";
CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
request.setProjectId(projectId);
request.setTaskType(taskType);
request.setTaskContent(taskContent);
request.setTaskName(fileName);
// Use an exclusive resource group for Data Integration.
request.setTaskParam("{\"FileFolderPath\":\"业务流程/自动化测试空间_勿动/数据集成\",\"ResourceGroup\":\"S_res_group_XXX\"}");
CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
return response1.getData().getFileId();
}
public static void updateFile(Long fileId) throws Exception {
UpdateFileRequest request = new UpdateFileRequest();
request.setProjectId(2043L);
request.setFileId(fileId);
request.setAutoRerunTimes(3);
request.setRerunMode("FAILURE_ALLOWED");
request.setCronExpress("00 30 05 * * ?");
request.setCycleType("DAY");
// Use an exclusive resource group for scheduling.
request.setResourceGroupIdentifier("S_res_group_XXX");
request.setInputList("dataworks_di_autotest_root");
UpdateFileResponse response1 = client.getAcsResponse(request);
}
public static Long submitFile(Long fileId) throws Exception {
SubmitFileRequest request = new SubmitFileRequest();
request.setProjectId(2043L);
request.setFileId(fileId);
SubmitFileResponse acsResponse = client.getAcsResponse(request);
Long deploymentId = acsResponse.getData();
return deploymentId;
}
public static void getDeployment(Long deploymentId) throws Exception {
GetDeploymentRequest request = new GetDeploymentRequest();
request.setProjectId(2043L);
request.setDeploymentId(deploymentId);
GetDeploymentResponse acsResponse = client.getAcsResponse(request);
System.out.println(acsResponse.getData().getDeployment().getStatus());
}
public static Long deploy(Long fileId) throws Exception {
DeployFileRequest request = new DeployFileRequest();
request.setProjectId(2043L);
request.setFileId(fileId);
DeployFileResponse acsResponse = client.getAcsResponse(request);
Long deploymentId = acsResponse.getData();
return deploymentId;
}
public static Long listNode(String nodeName) throws Exception {
ListNodesRequest request = new ListNodesRequest();
request.setProjectId(2043L);
request.setNodeName(nodeName);
request.setProjectEnv("PROD");
ListNodesResponse acsResponse = client.getAcsResponse(request);
List<ListNodesResponse.Data.NodesItem> nodesItemList = acsResponse.getData().getNodes();
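if (nodesItemList == null || nodesItemList.isEmpty()) {
throw new IllegalStateException("No node named " + nodeName + " found in PROD");
}
return nodesItemList.get(0).getNodeId();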
}
public static void RunCycleDagNodes(Long nodeId) throws Exception {
RunCycleDagNodesRequest request = new RunCycleDagNodesRequest();
request.setIncludeNodeIds(nodeId.toString());
request.setName("rerun_job");
request.setParallelism(false);
request.setProjectEnv("PROD");
request.setRootNodeId(nodeId);
// Start and end business dates of the backfill range.
request.setStartBizDate("2021-09-29 00:00:00");
request.setEndBizDate("2021-09-29 00:00:00");
RunCycleDagNodesResponse acsResponse = client.getAcsResponse(request);
}
static IAcsClient client;
public static void main(String[] args) throws Exception {
String akId = "XX";     // AccessKey ID of your own account.
String akSecret = "XX"; // AccessKey Secret of your own account.
String regionId = "cn-chengdu";
IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");
client = new DefaultAcsClient(profile);
String taskName = "offline_job_0930_1648";
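Long fileId = createTask(taskName); // Create the Data Integration task.
updateFile(fileId); // Modify the scheduling properties of the task.
Long deployId = submitFile(fileId); // Submit the task.
getDeployment(deployId); // Query the status of the deployment package.
Thread.sleep(10000); // Wait for the submission checks to pass.
getDeployment(deployId); // Query the status of the deployment package again.
deployId = deploy(fileId); // Deploy the task to production.
getDeployment(deployId); // Query the status of the deployment package.
Thread.sleep(10000); // Wait for the deployment checks to pass.
getDeployment(deployId); // Query the status of the deployment package again.
Long nodeId = listNode(taskName); // Look up the node ID.
RunCycleDagNodes(nodeId); // Backfill data in batches.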
}
}