创建数据同步任务(API)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 本文为您介绍如何使用OpenAPI创建数据集成同步任务,同步来源端数据至去向端。

本文为您介绍如何使用OpenAPI创建数据集成同步任务,同步来源端数据至去向端。

前提条件

使用限制

  • DataWorks当前仅支持使用OpenAPI创建数据集成离线同步任务。

  • 调用CreateDISyncTask接口创建数据集成同步任务,仅支持使用脚本模式配置同步任务内容,详情请参见通过脚本模式配置任务

  • DataWorks暂不支持使用OpenAPI创建业务流程,您需要使用现有的业务流程创建数据同步任务。

配置环境依赖及账号认证

  • 配置Maven依赖。

    • 打开Maven项目下的pom.xml文件,添加aliyun-java-sdk-core

      • <dependency>
          <groupId>com.aliyun</groupId>
          <artifactId>aliyun-java-sdk-core</artifactId>
          <version>4.5.20</version>
        </dependency>
    • 打开Maven项目下的pom.xml文件,添加aliyun-java-sdk-dataworks-public

      • <dependency>
          <groupId>com.aliyun</groupId>
          <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
          <version>3.4.2</version>
        </dependency>
  • 客户端认证。

    • 使用OpenAPI创建数据同步任务前,需要调用如下语句对登录阿里云的账号相关信息进行认证。如果阿里云的账号信息认证通过,则继续执行后续任务,如果认证不通过,则该调用会报错,您需要根据实际报错处理相关问题。

      •  DefaultProfile profile = DefaultProfile.getProfile(
                                   "regionId",            //DataWorks工作空间所在的地域,例如cn-hangzhou。
                                   "<yourAccessKeyId>",   //登录DataWorks工作空间的阿里云账号的AccessKey ID。
                                   "<yourAccessSecret>"); //登录DataWorks工作空间的阿里云账号的AccessKey Secret。
         IAcsClient client = new DefaultAcsClient(profile);
      • 您可以登录DataWorks控制台鼠标悬停至右上角的用户头像,单击AccessKey管理,进入AccessKey管理页面获取AccessKey ID和AccessKey Secret。

配置流程

完成上述 配置环境依赖及账号认证后,您可以通过OpenAPI调用相关接口,创建数据同步任务,同步来源端数据至去向端。配置流程如下:
  1. 创建数据集成任务。

  2. 配置任务的调度依赖。

  3. 提交数据集成任务。

  4. 查询同步任务提交状态。

  5. 发布同步任务至生产环境。

  6. 查询同步任务发布状态。

配置步骤

  1. 创建数据集成任务。

    1. 调用CreateDISyncTask接口,创建数据集成任务。如下代码仅示例部分参数的配置,更多参数详情请参见CreateDISyncTask

    2. public void createFile() throws ClientException{
              CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
              request.setProjectId(181565L);
              request.setTaskType("DI_OFFLINE");
              request.setTaskContent("{\"type\":\"job\",\"version\":\"2.0\",\"steps\":[{\"stepType\":\"mysql\",\"parameter\":{\"envType\":1,\"datasource\":\"dh_mysql\",\"column\":[\"id\",\"name\"],\"tableComment\":\"same表comment\",\"connection\":[{\"datasource\":\"dh_mysql\",\"table\":[\"same\"]}],\"where\":\"\",\"splitPk\":\"id\",\"encoding\":\"UTF-8\"},\"name\":\"Reader\",\"category\":\"reader\"},{\"stepType\":\"odps\",\"parameter\":{\"partition\":\"pt=${bizdate}\",\"truncate\":true,\"datasource\":\"odps_first\",\"envType\":1,\"column\":[\"id\",\"name\"],\"emptyAsNull\":false,\"tableComment\":\"same表comment\",\"table\":\"same\"},\"name\":\"Writer\",\"category\":\"writer\"}],\"setting\":{\"errorLimit\":{\"record\":\"\"},\"speed\":{\"throttle\":false,\"concurrent\":2}},\"order\":{\"hops\":[{\"from\":\"Reader\",\"to\":\"Writer\"}]}}");
              request.setTaskParam("{\"FileFolderPath\":\"业务流程/new_biz/数据集成\",\"ResourceGroup\":\"S_res_group_280749521950784_1602767279794\"}");
              request.setTaskName("new_di_task_0607_1416");
      
      
              String akId = "XXX";
              String akSecret = "XXXX";
              String regionId = "cn-hangzhou";
              IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
              DefaultProfile.addEndpoint("cn-hangzhou","dataworks-public","dataworks.cn-hangzhou.aliyuncs.com");
              IAcsClient client;
      
              client = new DefaultAcsClient(profile);
      
              CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
              Gson gson1 = new Gson();
      
              System.out.println(gson1.toJson(response1));
      }
  2. 设置任务的调度依赖。

    1. 调用UpdateFile接口,设置数据集成任务的调度依赖。

    2. public static void updateFile(Long fileId) throws Exception {
              UpdateFileRequest request = new UpdateFileRequest();
              request.setProjectId(2043L);
              request.setFileId(fileId);
              request.setAutoRerunTimes(3);
              request.setRerunMode("FAILURE_ALLOWED");
              request.setCronExpress("00 30 05 * * ?");
              request.setCycleType("DAY");
              request.setResourceGroupIdentifier("S_res_group_XXX");
              // 使用调度独享资源组
              request.setInputList("dataworks_di_autotest_root");
              UpdateFileResponse response1 = client.getAcsResponse(request);
          }
    3. 上述代码仅示例部分参数的配置,更多参数详情请参见UpdateFile

  3. 提交数据集成任务。

    1. 调用SubmitFile接口,提交数据集成任务至调度系统的开发环境。

    2.  public void submitFile() throws ClientException{
              SubmitFileRequest request = new SubmitFileRequest();
              request.setProjectId(78837L);
              request.setProjectIdentifier("zxy_8221431");
            // 此节点ID为创建节点时返回的ID,对应数据库File表的file_id。
              request.setFileId(501576542L);
              request.setComment("备注");
              SubmitFileResponse acsResponse = client.getAcsResponse(request);
            //调用GetDeployment接口,获取本次发布的具体情况。
            Long deploymentId = acsResponse.getData();
              log.info(acsResponse.toString());
      }
    3. 上述代码仅示例部分参数的配置,更多参数详情请参见SubmitFile

  4. 查询同步任务的提交状态。

    1. 任务提交后,Response会返回deploymentId,您可以调用GetDeployment接口,通过deploymentId获取本次发布包的详细信息。当GetDeployment接口的返回参数Status取值为1时,则表示发布包此次提交成功。任务提交成功后才可进行后续的发布操作,如果任务提交失败,则请根据报错尽快排查处理。

    2. public void getDeployment() throws ClientException{
              GetDeploymentRequest request = new GetDeploymentRequest();
              request.setProjectId(78837L);
              request.setProjectIdentifier("zxy_8221431");
            //DeploymentId为提交或发布的返回值。
              request.setDeploymentId(2776067L);
              GetDeploymentResponse acsResponse = client.getAcsResponse(request);
              log.info(acsResponse.getData().toString());
          }
    3. 上述代码仅示例部分参数的配置,更多参数详情请参见GetDeployment

  5. 发布同步任务到生产环境。

    1. 调用DeployFile接口,发布数据集成同步任务至生产环境。

    2. public void deploy() throws ClientException{
              DeployFileRequest request = new DeployFileRequest();
              request.setProjectIdentifier("zxy_8221431");
              request.setFileId(501576542L);
              request.setComment("备注");
              //NodeId和file_id二选一。NodeId的值为调度配置中基础属性的节点ID。
              request.setNodeId(700004537241L);
              DeployFileResponse acsResponse = client.getAcsResponse(request);
               //调用GetDeployment接口,获取本次发布的具体情况。
              Long deploymentId = acsResponse.getData();
              log.info(acsResponse.getData().toString());
          }
    3. 上述代码仅示例部分参数的配置,更多参数详情请参见DeployFile

  6. 查询同步任务的发布状态。

    1. 任务发布后,Response会返回deploymentId,您可以调用GetDeployment接口,通过deploymentId获取本次发布包的详细信息。当GetDeployment接口的返回参数Status取值为1时,则表示发布包此次发布成功。

    2. public void getDeployment() throws ClientException{
              GetDeploymentRequest request = new GetDeploymentRequest();
              request.setProjectId(78837L);
              request.setProjectIdentifier("zxy_8221431");
            //DeploymentId为提交或发布的返回值。
              request.setDeploymentId(2776067L);
              GetDeploymentResponse acsResponse = client.getAcsResponse(request);
              log.info(acsResponse.getData().toString());
          }
    3. 上述代码仅示例部分参数的配置,更多参数详情请参见GetDeployment

管理同步任务

成功创建数据集成同步任务后,您可以对同步任务执行如下管理操作:
  • 修改同步任务的相关配置:调用UpdateDISyncTask接口更新任务的Content,或通过TaskParam参数来更新使用的独享资源组,详情请参见UpdateDISyncTask

  • 为同步任务批量补数据:调用RunCycleDagNodes接口,为目标任务执行批量补数据操作,详情请参见

    RunCycleDagNodes

Java SDK调用代码示例

创建数据集成离线同步任务的完整代码示例如下。
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import java.util.List;

public class createofflineTask {
    static Long createTask(String fileName) throws  Exception {
        Long projectId = 2043L;
        String taskType = "DI_OFFLINE";
        String taskContent = "{\n" +
                "    \"type\": \"job\",\n" +
                "    \"version\": \"2.0\",\n" +
                "    \"steps\": [\n" +
                "        {\n" +
                "            \"stepType\": \"mysql\",\n" +
                "            \"parameter\": {\n" +
                "                \"envType\": 0,\n" +
                "                \"datasource\": \"mysql_autotest_dev\",\n" +
                "                \"column\": [\n" +
                "                    \"id\",\n" +
                "                    \"name\"\n" +
                "                ],\n" +
                "                \"connection\": [\n" +
                "                    {\n" +
                "                        \"datasource\": \"mysql_autotest_dev\",\n" +
                "                        \"table\": [\n" +
                "                            \"user\"\n" +
                "                        ]\n" +
                "                    }\n" +
                "                ],\n" +
                "                \"where\": \"\",\n" +
                "                \"splitPk\": \"\",\n" +
                "                \"encoding\": \"UTF-8\"\n" +
                "            },\n" +
                "            \"name\": \"Reader\",\n" +
                "            \"category\": \"reader\"\n" +
                "        },\n" +
                "        {\n" +
                "            \"stepType\": \"odps\",\n" +
                "            \"parameter\": {\n" +
                "                \"partition\": \"pt=${bizdate}\",\n" +
                "                \"truncate\": true,\n" +
                "                \"datasource\": \"odps_first\",\n" +
                "                \"envType\": 0,\n" +
                "                \"column\": [\n" +
                "                    \"id\",\n" +
                "                    \"name\"\n" +
                "                ],\n" +
                "                \"emptyAsNull\": false,\n" +
                "                \"tableComment\": \"null\",\n" +
                "                \"table\": \"user\"\n" +
                "            },\n" +
                "            \"name\": \"Writer\",\n" +
                "            \"category\": \"writer\"\n" +
                "        }\n" +
                "    ],\n" +
                "    \"setting\": {\n" +
                "        \"executeMode\": null,\n" +
                "        \"errorLimit\": {\n" +
                "            \"record\": \"\"\n" +
                "        },\n" +
                "        \"speed\": {\n" +
                "            \"concurrent\": 2,\n" +
                "            \"throttle\": false\n" +
                "        }\n" +
                "    },\n" +
                "    \"order\": {\n" +
                "        \"hops\": [\n" +
                "            {\n" +
                "                \"from\": \"Reader\",\n" +
                "                \"to\": \"Writer\"\n" +
                "            }\n" +
                "        ]\n" +
                "    }\n" +
                "}";
        CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
        request.setProjectId(projectId);
        request.setTaskType(taskType);
        request.setTaskContent(taskContent);
        request.setTaskName(fileName);
        request.setTaskParam("{\"FileFolderPath\":\"业务流程/自动化测试空间_勿动/数据集成\",\"ResourceGroup\":\"S_res_group_XXX\"}");
        // 使用数据集成独享资源组,
        CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
        return response1.getData().getFileId();
    }

    public static void updateFile(Long fileId) throws Exception {
        UpdateFileRequest request = new UpdateFileRequest();
        request.setProjectId(2043L);
        request.setFileId(fileId);
        request.setAutoRerunTimes(3);
        request.setRerunMode("FAILURE_ALLOWED");
        request.setCronExpress("00 30 05 * * ?");
        request.setCycleType("DAY");
        request.setResourceGroupIdentifier("S_res_group_XXX");
        // 使用调度独享资源组
        request.setInputList("dataworks_di_autotest_root");
        UpdateFileResponse response1 = client.getAcsResponse(request);
    }

    public static  Long submitFile(Long fileId) throws  Exception {
        SubmitFileRequest request = new SubmitFileRequest();
        request.setProjectId(2043L);
        request.setFileId(fileId);
        SubmitFileResponse acsResponse = client.getAcsResponse(request);
        Long deploymentId = acsResponse.getData();
        return deploymentId;
    }

    public static  void getDeployment(Long deploymentId) throws Exception {
        GetDeploymentRequest request = new GetDeploymentRequest();
        request.setProjectId(2043L);
        request.setDeploymentId(deploymentId);
        GetDeploymentResponse acsResponse = client.getAcsResponse(request);
        System.out.println(acsResponse.getData().getDeployment().getStatus());
    }

    public static  Long deploy(Long fileId) throws Exception {
        DeployFileRequest request = new DeployFileRequest();
        request.setProjectId(2043L);
        request.setFileId(fileId);
        DeployFileResponse acsResponse = client.getAcsResponse(request);
        Long deploymentId = acsResponse.getData();
        return deploymentId;
    }

    public static Long listNode(String nodeName) throws Exception {
        ListNodesRequest request = new ListNodesRequest();
        request.setProjectId(2043L);
        request.setNodeName(nodeName);
        request.setProjectEnv("PROD");
        ListNodesResponse acsResponse = client.getAcsResponse(request);
        List<ListNodesResponse.Data.NodesItem> nodesItemList = acsResponse.getData().getNodes();
        return nodesItemList.get(0).getNodeId();
    }

    public static void RunCycleDagNodes(Long nodeId) throws Exception {
        RunCycleDagNodesRequest request = new RunCycleDagNodesRequest();
        request.setIncludeNodeIds(nodeId.toString());
        request.setName("rerun_job");
        request.setParallelism(false);
        request.setProjectEnv("PROD");
        request.setRootNodeId(nodeId);
        request.setStartBizDate("2021-09-29 00:00:00");
        request.setEndBizDate("2021-09-29 00:00:00");
        request.setProjectEnv("PROD");
        RunCycleDagNodesResponse acsResponse = client.getAcsResponse(request);
    }

    static IAcsClient client;

    public static void main(String[] args) throws Exception {
        String akId = "XX";
        String akSecret = "XX"; // 使用个人的ak
        String regionId = "cn-chengdu";
        IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
        DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");

        client = new DefaultAcsClient(profile);
        String taskName = "offline_job_0930_1648";
        Long fileId = createTask(taskName); //创建数据集成任务。
        updateFile(fileId);   //修改数据集成任务的调度属性。
        Long deployId = submitFile(fileId); //提交数据集成任务。
        getDeployment(deployId);  //查询发布包状态。
        Thread.sleep(10000); //等待提交检查成功。
        getDeployment(deployId);  //查询发布包状态。
        deployId = deploy(fileId);  //发布数据集成任务。
        getDeployment(deployId);    //查询发布包状态。
        Thread.sleep(10000);        //等待发布检查成功。
        getDeployment(deployId);    //查询发布包状态。
        Long nodeId = listNode(taskName);  //查找节点ID。
        RunCycleDagNodes(nodeId);   //批量补数据。
    }
}

相关文章
|
7月前
|
存储 算法 关系型数据库
实时计算 Flink版产品使用合集之在Flink Stream API中,可以在任务启动时初始化一些静态的参数并将其存储在内存中吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
132 4
|
3月前
|
API Python
4. salt-api请求salt-minion执行任务 tornado超时报错
4. salt-api请求salt-minion执行任务 tornado超时报错
|
5月前
|
监控 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在进行数据同步时,重新创建了一个新的任务,但发现无法删除旧任务同步的历史数据,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
自然语言处理 PyTorch API
`transformers`库是Hugging Face提供的一个开源库,它包含了大量的预训练模型和方便的API,用于自然语言处理(NLP)任务。在文本生成任务中,`transformers`库提供了许多预训练的生成模型,如GPT系列、T5、BART等。这些模型可以通过`pipeline()`函数方便地加载和使用,而`generate()`函数则是用于生成文本的核心函数。
`transformers`库是Hugging Face提供的一个开源库,它包含了大量的预训练模型和方便的API,用于自然语言处理(NLP)任务。在文本生成任务中,`transformers`库提供了许多预训练的生成模型,如GPT系列、T5、BART等。这些模型可以通过`pipeline()`函数方便地加载和使用,而`generate()`函数则是用于生成文本的核心函数。
|
6月前
|
分布式计算 DataWorks 关系型数据库
DataWorks操作报错合集之离线同步任务中,把表数据同步到POLARDB,显示所有数据都是脏数据,报错信息:ERROR JobContainer - 运行scheduler 模式[local]出错.是什么原因
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
DataWorks 安全 API
DataWorks产品使用合集之有api或者是sdk可以获取到 dataworks 的任务运行的结果吗
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
42 1
|
7月前
|
NoSQL MongoDB 数据库
实时计算 Flink版操作报错之在使用Flink CDC进行数据同步时遇到了全量同步不完全的问题,同时有任务偶尔报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
Java 关系型数据库 流计算
实时计算 Flink版操作报错合集之配置cats进行从MySQL到StarRocks的数据同步任务时遇到报错,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
359 0
|
6月前
|
SQL 监控 Java
实时计算 Flink版产品使用问题之在进行数据同步时,修改了YAML文件以增加新的同步表并取消了之前的同步任务,如何从之前的检查点继续执行
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
数据采集 SQL DataWorks
DataWorks操作报错合集之在阿里云DataWorks的数据同步任务中,过滤条件不生效,如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
128 2