Integrating the DolphinScheduler scheduler into a Spring Boot project: drag-and-drop Spark task management

For notes on connecting to the DolphinScheduler scheduler and related information, see my previous blog post; the address is here ->

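I. Feature list

[Image: feature list]

II. Drag-and-drop Spark task management

Note: task management here means managing the nodes of various types inside the single workflow that each user owns within a DolphinScheduler project. In the code below that workflow is named userId + "-dragSparkTask", and each task becomes one SPARK node in it.

Shared dependency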

<!--httpclient-->
<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>

Shared configuration file

dolphinscheduler.token=xxx
dolphinscheduler.address=http://IP:12345

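Shared code (besides the two keys above, the fields below also read spark.main.class and drag.task.state, so both must be defined in the configuration as well)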

@Autowired
private RestTemplate restTemplate;
@Value("${dolphinscheduler.token}")
String token;
@Value("${dolphinscheduler.address}")
String address;
public static final int ZERO = 0;
public static final int SUCCESS = 200;
@Autowired
private DragSparkTaskService dragSparkTaskService;
@Value("${spark.main.class}")
String mainClass;
public static final String CREATE = "create";
public static final String UPDATE = "update";
public static final String ADD = "add";
public static final String DELETE = "delete";
public static final String ONLINE = "ONLINE";
public static final String OFFLINE = "OFFLINE";
public static final int ONE_THOUSAND_AND_FIVE_HUNDRED = 1500;
public static final int SIX = 6;
public static final int EIGHTY = 80;
public static final int THREE = 3;
@Autowired
private StringRedisTemplate redisTemplate;
@Value("${drag.task.state}")
String dragTaskState;
@Autowired
private DragSparkTaskMapper dragSparkTaskMapper;

1. Create a drag-and-drop model Spark task

Example request parameters:

{
  "name": "Test task model 1",
  "describeInfo": "Test task model 1",
  "projectName": "spark线性回归模型",
  "task": {
    "edges": [
      {
        "id": "13889c99",
        "index": 2,
        "source": "356bc2be",
        "target": "33838a0d",
        "sourceAnchor": 2,
        "targetAnchor": 0
      }
    ],
    "nodes": [
      {
        "x": 482.671875,
        "y": 89.125,
        "id": "356bc2be",
        "size": "72*72",
        "type": "node",
        "color": "#1890FF",
        "index": 0,
        "label": "Data source",
        "shape": "flow-circle",
        "config": {
          "sourceType": "mysql",
          "targetTable": "machine_learning_house_info2"
        },
        "inputs": [],
        "outputs": [],
        "modelType": "dataSource"
      },
      {
        "x": 502.171875,
        "y": 269.125,
        "id": "33838a0d",
        "size": "110*42",
        "type": "node",
        "color": "#66C35D",
        "index": 1,
        "label": "Full-table statistics",
        "shape": "flow-capsule",
        "config": {
          "selectColumns": "*"
        },
        "inputs": [],
        "outputs": [],
        "modelType": "fullTableStatistics"
      }
    ]
  }
}
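
The listing further down passes the task object to a helper named parseLineAndNode, which this post does not show, so its real behaviour is unknown. As an illustration only of what the edges and nodes arrays make possible, the sketch below orders node ids so that every edge source comes before its target (Kahn's algorithm). The class NodeOrderSketch and its method are hypothetical names, and edges are reduced to {source, target} pairs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not part of the original project. */
final class NodeOrderSketch {

    /**
     * Orders node ids so that each edge source precedes its target.
     * Every element of edges is a two-element array: {sourceId, targetId}.
     */
    static List<String> order(List<String> nodeIds, List<String[]> edges) {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        Map<String, List<String>> successors = new LinkedHashMap<>();
        for (String id : nodeIds) {
            inDegree.put(id, 0);
            successors.put(id, new ArrayList<>());
        }
        for (String[] edge : edges) {
            if (!inDegree.containsKey(edge[0]) || !inDegree.containsKey(edge[1])) {
                throw new IllegalArgumentException("edge references an unknown node");
            }
            successors.get(edge[0]).add(edge[1]);
            inDegree.merge(edge[1], 1, Integer::sum);
        }
        // Start with the nodes that nothing points to (for example the data source).
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }
        List<String> ordered = new ArrayList<>();
        while (!ready.isEmpty()) {
            String id = ready.poll();
            ordered.add(id);
            for (String target : successors.get(id)) {
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (ordered.size() != nodeIds.size()) {
            throw new IllegalArgumentException("the graph contains a cycle");
        }
        return ordered;
    }
}
```

With the example above, the single edge from 356bc2be to 33838a0d puts the data source node ahead of the full-table statistics node regardless of the order in which the nodes array lists them.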

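Code (the helpers createProcess, parseLineAndNode, getSparkResourceJarId and JsonTools.getJsonStringMaxKey are called below but are not listed in this post):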

    /**
     * Create a task. If the user's single workflow does not exist yet it is created;
     * otherwise the task is added to it as a parallel node.
     * @param request request
     * @param vo task parameters
     * @author liudz
     * @date 2021/5/8
     * @return execution result
     **/
    @PostMapping("/project/process")
    @Transactional(rollbackFor = Exception.class)
    public Response operatorDragSparkTask(HttpServletRequest request, @RequestBody DragSparkTaskVo vo) {
        // Validate the body before dereferencing it.
        if (vo == null || org.apache.commons.lang3.StringUtils.isBlank(vo.getName())
            || vo.getDescribeInfo() == null) {
            log.error("--DragSparkTaskController--addDragSparkTask--PARAM_ERROR!--");
            return Response.error(Msg.PARAM_ERROR);
        }
        Long userId = Long.valueOf(request.getUserPrincipal().getName());
        vo.setCreateId(userId);
        vo.setUpdateId(userId);
        String processName = userId + "-dragSparkTask";
        Response<DragSparkTaskVo> dragSparkTaskResponse = dragSparkTaskService.addDragSparkTask(vo);
        if (dragSparkTaskResponse.getCode() == SUCCESS) {
            log.info("--(1)addDragSparkTask--success");
            Boolean verifyResult = verifyProcessExist(processName, vo.getProjectName());
            log.info("--(2)verifyProcessExist--success:{}", verifyResult);
            if (!verifyResult) {
                ProcessDto processDto = packageProcessParam(CREATE, processName, vo, null);
                log.info("--(3)packageProcessParam--success");
                dragSparkTaskResponse = createProcess(vo, processDto);
            } else {
                // Look up the user's single workflow.
                DolphinschedulerResponse processInfoList = getUserProcess(vo.getProjectName());
                JSONObject processJson = new JSONObject();
                log.info("--(3)getUserProcess--success:{}", processInfoList);
                List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
                for (Map<String, Object> map : list) {
                    if (processName.equals(map.get("name"))) {
                        processJson.fluentPutAll(map);
                    }
                }
                ProcessDto processDto = packageProcessParam(ADD, processName, vo, processJson);
                processDto.setId(processJson.getInteger("id"));
                log.info("--(4)packageProcessParam--success");
                // A workflow that is online is taken offline before it is updated.
                if (ONLINE.equals(processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()))) {
                    releaseProcessDefinition(vo.getProjectName(), processName, processDto.getId(), 0);
                    log.info("--(5)releaseProcessDefinition--OFFLINE--success");
                }
                dragSparkTaskResponse = updateProcess(vo, processDto);
            }
        }
        return dragSparkTaskResponse;
    }
    /**
     * Check whether a workflow exists.
     *
     * @param processName workflow name
     * @param projectName project name
     * @author liudz
     * @date 2021/5/8
     * @return true if a workflow with this name already exists
     **/
    public Boolean verifyProcessExist(String processName, String projectName) {
        HttpHeaders headers = new HttpHeaders();
        headers.set("token", token);
        headers.set("Content-Type", "application/json");
        HttpEntity requestEntity = new HttpEntity(headers);
        ResponseEntity<DolphinschedulerResponse> response =
            restTemplate.exchange(address + "/dolphinscheduler/projects/" + projectName
                            + "/process/verify-name?name=" + processName,
                HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
        // Code 0 means the name check passed, i.e. no workflow with this name exists yet.
        return response.getBody().getCode() != ZERO;
    }
    /**
     * Fetch the workflow list of a project on DolphinScheduler; callers pick the user's single workflow from it.
     * @param projectName project name
     * @author liudz
     * @date 2021/5/8
     * @return the DolphinScheduler response whose data holds the workflow list
     **/
    public DolphinschedulerResponse getUserProcess(String projectName) {
        HttpHeaders headers = new HttpHeaders();
        headers.set("token", token);
        headers.set("Content-Type", "application/json");
        HttpEntity requestEntity = new HttpEntity(headers);
        ResponseEntity<DolphinschedulerResponse> response =
            restTemplate.exchange(address + "/dolphinscheduler/projects/" + projectName + "/process/list",
                HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
        return response.getBody();
    }
    /**
     * Build the workflow parameters.
     * @param type operation type
     * @param processName the user's workflow name
     * @param vo task parameters
     * @param processJson workflow JSON
     * @author liudz
     * @date 2021/5/13
     * @return ProcessDto
     **/
    public ProcessDto packageProcessParam(String type, String processName, DragSparkTaskVo vo, JSONObject processJson) {
        ProcessDto processDto = new ProcessDto();
        processDto.setConnects("[]");
        processDto.setName(processName);
        JSONObject locationsOne = new JSONObject();
        JSONObject locationsTwo = new JSONObject();
        locationsTwo.fluentPut("name", "spark-" + vo.getId()).fluentPut("targetarr", "").fluentPut("nodenumber", "0");
        locationsTwo.fluentPut("x", 0).fluentPut("y", 0);
        locationsOne.put("tasks-" + vo.getId(), locationsTwo);

        if (CREATE.equals(type)) {
            // Create the workflow.
            processDto = packageProcessParamOfCreate(processDto, vo, locationsOne);
        } else if (ADD.equals(type)) {
            // Add a node to the existing workflow.
            processDto = packageProcessParamOfAdd(processDto, vo, processJson, locationsOne, locationsTwo);
        } else if (UPDATE.equals(type)) {
            // Update the workflow: only the tasks entry of processDefinitionJson changes.
            processDto = packageProcessParamOfUpdate(processDto, processJson, vo);
        } else if (DELETE.equals(type)) {
            // Update or delete the workflow: an update removes the task from the tasks entry of processDefinitionJson.
            processDto = packageProcessParamOfDelete(processDto, processJson, vo);
        }
        return processDto;
    }
    /**
     * Build the parameters for creating the workflow with its first SPARK node.
     * @param processDto workflow parameters
     * @param vo task parameters
     * @param locationsOne locations object holding the new node's position
     * @author liudz
     * @date 2021/5/7
     * @return ProcessDto
     **/
    public ProcessDto packageProcessParamOfCreate(ProcessDto processDto, DragSparkTaskVo vo, JSONObject locationsOne) {
        processDto.setLocations(locationsOne.toString());
        JSONObject processOne = new JSONObject();
        processOne.fluentPut("globalParams", new ArrayList<>()).fluentPut("tenantId", THREE).fluentPut("timeout", 0);
        JSONObject processTwo = new JSONObject();
        processTwo.fluentPut("type", "SPARK").fluentPut("id", "tasks-" + vo.getId());
        processTwo.fluentPut("name", "spark-" + vo.getId()).fluentPut("description", "");
        JSONArray nodesArray = parseLineAndNode(vo.getTask());
        JSONObject json = new JSONObject();
        json.put("id", vo.getId());
        json.put("name", vo.getName());
        json.put("describeInfo", vo.getDescribeInfo());
        json.put("nodes", nodesArray);
        String taskJsonString = json.toString().replace("}}", "} }").replace("{{", "{ {");
        processTwo.put("params", JSONObject.parseObject("{\"mainClass\":\"" + mainClass + "\","
               + "\"mainJar\":{\"id\":" + getSparkResourceJarId() + "},\"deployMode\":\"cluster\","
               + "\"resourceList\":[],\"localParams\":[],\"driverCores\":1,\"driverMemory\":\"1G\","
               + "\"numExecutors\":\"1\",\"executorMemory\":\"1G\",\"executorCores\":\"1\",\"mainArgs\":\"\\\""
               + taskJsonString.replace("\"", "\\\\\\\"") + "\\\"\",\"others\":\"\","
               + "\"programType\":\"JAVA\",\"sparkVersion\":\"SPARK2\"}"));
        JSONObject jsonTimeout = new JSONObject();
        jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);
        processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");
        JSONObject processTree = new JSONObject();
        processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>());
        JSONObject jsonconditionResult = new JSONObject();
        jsonconditionResult.put("successNode", new ArrayList<>());
        jsonconditionResult.put("failedNode", new ArrayList<>());
        processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject());
        processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");
        processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");
        processTwo.fluentPut("preTasks", new ArrayList<>());
        JSONArray processTaskArray = new JSONArray();
        processTaskArray.add(processTwo);
        processOne.put("tasks", processTaskArray);
        processDto.setProcessDefinitionJson(processOne.toString());
        return processDto;
    }
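
The mainArgs value built above is the task JSON turned into a single quoted argument: adjacent braces are padded with a space, inner double quotes are backslash-escaped, and the result is wrapped in double quotes. The post does not say why the braces are padded. The sketch below isolates just that string transformation with plain JDK calls; MainArgsSketch and its method names are hypothetical and not part of the project.

```java
/** Hypothetical helper that mirrors the string handling in the listing. */
final class MainArgsSketch {

    /** Pads adjacent braces with a space, as the listing does before embedding the JSON. */
    static String padBraces(String json) {
        return json.replace("}}", "} }").replace("{{", "{ {");
    }

    /** Escapes inner double quotes and wraps the JSON in double quotes so it travels as one argument. */
    static String toMainArgs(String json) {
        return "\"" + padBraces(json).replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        System.out.println(toMainArgs("{\"id\":1,\"cfg\":{\"a\":\"b\"}}"));
    }
}
```

One thing to be aware of: String.replace works on non-overlapping matches from left to right, so three closing braces in a row become "} }}" and still contain one unpadded pair. If the padding matters for deeply nested JSON, the replacement would have to be repeated until nothing changes.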
    /**
     * Build the parameters for adding a SPARK node to the existing workflow.
     * @param processDto workflow parameters
     * @param locationsOne locations object of the workflow
     * @param locationsTwo position entry of the new node
     * @param vo task parameters
     * @param processJson workflow JSON
     * @author liudz
     * @date 2021/5/7
     * @return ProcessDto
     **/
    public ProcessDto packageProcessParamOfAdd(ProcessDto processDto, DragSparkTaskVo vo, JSONObject processJson,
                                               JSONObject locationsOne, JSONObject locationsTwo) {
        String maxTaskKey = JsonTools.getJsonStringMaxKey(processJson.getString("locations"));
        Integer x = processJson.getJSONObject("locations").getJSONObject(maxTaskKey).getInteger("x");
        Integer y = processJson.getJSONObject("locations").getJSONObject(maxTaskKey).getInteger("y");
        // Place the new node 80 units to the right of the last one; start a new row once x has reached 1500.
        if (x < ONE_THOUSAND_AND_FIVE_HUNDRED) {
            locationsTwo.fluentPut("x", x + EIGHTY).fluentPut("y", y);
        } else {
            locationsTwo.fluentPut("y", y + EIGHTY).fluentPut("x", 0);
        }
        locationsOne = processJson.getJSONObject("locations").fluentPut("tasks-" + vo.getId(), locationsTwo);
        processDto.setLocations(locationsOne.toString());
        processDto.setId(processJson.getInteger("id"));
        JSONObject processTwo = new JSONObject();
        processTwo.fluentPut("type", "SPARK").fluentPut("id", "tasks-" + vo.getId());
        processTwo.fluentPut("name", "spark-" + vo.getId()).fluentPut("description", "");
        JSONArray nodesArray = parseLineAndNode(vo.getTask());
        JSONObject json = new JSONObject();
        json.put("id", vo.getId());
        json.put("name", vo.getName());
        json.put("describeInfo", vo.getDescribeInfo());
        json.put("nodes", nodesArray);
        String taskJsonString = json.toString().replace("}}", "} }").replace("{{", "{ {");
        processTwo.put("params", JSONObject.parseObject("{\"mainClass\":\"" + mainClass + "\","
               + "\"mainJar\":{\"id\":" + getSparkResourceJarId() + "},\"deployMode\":\"cluster\","
               + "\"resourceList\":[],\"localParams\":[],\"driverCores\":1,\"driverMemory\":\"1G\","
               + "\"numExecutors\":\"1\",\"executorMemory\":\"1G\",\"executorCores\":\"1\",\"mainArgs\":\"\\\""
               + taskJsonString.replace("\"", "\\\\\\\"") + "\\\"\",\"others\":\"\","
               + "\"programType\":\"JAVA\",\"sparkVersion\":\"SPARK2\"}"));
        JSONObject jsonTimeout = new JSONObject();
        jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);
        processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");
        JSONObject processTree = new JSONObject();
        processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>());
        JSONObject jsonconditionResult = new JSONObject();
        jsonconditionResult.put("successNode", new ArrayList<>());
        jsonconditionResult.put("failedNode", new ArrayList<>());
        processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject());
        processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");
        processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");
        processTwo.fluentPut("preTasks", new ArrayList<>());
        JSONObject jsonNew = processJson.getJSONObject("processDefinitionJson");
        JSONArray jsonArray = jsonNew.getJSONArray("tasks");
        jsonArray.add(processTwo);
        jsonNew.put("tasks", jsonArray);
        processDto.setProcessDefinitionJson(jsonNew.toString());
        return processDto;
    }
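
The placement rule in packageProcessParamOfAdd can be read on its own: take the position of the node with the largest key, move 80 to the right while x is below 1500, otherwise wrap to x = 0 and move 80 down. A minimal sketch of just that rule, with a hypothetical class name and plain ints instead of the JSON objects:

```java
/** Hypothetical helper that isolates the node placement rule from the listing. */
final class NodePlacementSketch {
    static final int ROW_LIMIT = 1500; // ONE_THOUSAND_AND_FIVE_HUNDRED in the listing
    static final int STEP = 80;        // EIGHTY in the listing

    /** Returns {x, y} for the next node, given the position of the last placed node. */
    static int[] next(int lastX, int lastY) {
        if (lastX < ROW_LIMIT) {
            return new int[] {lastX + STEP, lastY};
        }
        return new int[] {0, lastY + STEP};
    }
}
```

Because the check uses the previous node's x, a node can still land at x = 1520 (from 1440) before the row wraps.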
    /**
     * Bring a workflow online or take it offline.
     * @param projectName project name
     * @param processName the user's workflow name
     * @param processId workflow ID
     * @param releaseState target state (0: offline, 1: online)
     * @author liudz
     * @date 2021/5/7
     * @return execution result
     **/
    public Response releaseProcessDefinition(String projectName, String processName, Integer processId,
                  Integer releaseState) {
        PostMethod postMethod = null;
        try {
            String postURL = address + "/dolphinscheduler/projects/"
                   + URLEncoder.encode(projectName, "utf-8") + "/process/release";
            postMethod = new PostMethod(postURL);
            postMethod.setRequestHeader("Content-Type",
                    "application/x-www-form-urlencoded;charset=utf-8");
            postMethod.setRequestHeader("token", token);
            // Parameter values must not be null; pass an empty string instead.
            NameValuePair[] data = {
                    new NameValuePair("name", processName),
                    new NameValuePair("processId", processId.toString()),
                    new NameValuePair("releaseState", releaseState.toString())};
            postMethod.setRequestBody(data);
            org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
            httpClient.executeMethod(postMethod);
            JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
            if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
                return Response.error(result.getInteger("code"), result.getString("msg"));
            }
        } catch (Exception e) {
            log.error("Request failed", e);
            // Report the failure instead of falling through to success; 500 is an arbitrary error code.
            return Response.error(500, e.getMessage());
        } finally {
            if (postMethod != null) {
                postMethod.releaseConnection();
            }
        }
        return Response.success();
    }
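
The release call sends its three fields as an application/x-www-form-urlencoded body through commons-httpclient. To make visible what ends up on the wire, here is a sketch that builds the same kind of body with JDK classes only. FormBodySketch and its methods are hypothetical; mapping null to an empty string follows the comment in the listing that null values must not be passed.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of the original project. */
final class FormBodySketch {

    /** Encodes fields as application/x-www-form-urlencoded; null values become empty strings. */
    static String encode(Map<String, String> fields) {
        StringBuilder body = new StringBuilder();
        try {
            for (Map.Entry<String, String> field : fields.entrySet()) {
                if (body.length() > 0) {
                    body.append('&');
                }
                String value = field.getValue() == null ? "" : field.getValue();
                body.append(URLEncoder.encode(field.getKey(), "UTF-8"))
                    .append('=')
                    .append(URLEncoder.encode(value, "UTF-8"));
            }
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
        return body.toString();
    }

    /** The three fields that the release call in the listing sends. */
    static Map<String, String> releaseFields(String processName, int processId, int releaseState) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("name", processName);
        fields.put("processId", String.valueOf(processId));
        fields.put("releaseState", String.valueOf(releaseState));
        return fields;
    }
}
```

For example, taking workflow 12 of user 7 offline would produce a body made of name, processId and releaseState pairs joined with ampersands, with releaseState set to 0.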

2. Update a drag-and-drop model Spark task

Example request parameters:

{
  "name": "Test task model 1",
  "describeInfo": "Test task model 1",
  "projectName": "spark线性回归模型",
  "id": 111,
  "task": {
    "edges": [
      {
        "id": "13889c99",
        "index": 2,
        "source": "356bc2be",
        "target": "33838a0d",
        "sourceAnchor": 2,
        "targetAnchor": 0
      }
    ],
    "nodes": [
      {
        "x": 482.671875,
        "y": 89.125,
        "id": "356bc2be",
        "size": "72*72",
        "type": "node",
        "color": "#1890FF",
        "index": 0,
        "label": "Data source",
        "shape": "flow-circle",
        "config": {
          "sourceType": "mysql",
          "targetTable": "machine_learning_house_info2"
        },
        "inputs": [],
        "outputs": [],
        "modelType": "dataSource"
      },
      {
        "x": 502.171875,
        "y": 269.125,
        "id": "33838a0d",
        "size": "110*42",
        "type": "node",
        "color": "#66C35D",
        "index": 1,
        "label": "Full-table statistics",
        "shape": "flow-capsule",
        "config": {
          "selectColumns": "*"
        },
        "inputs": [],
        "outputs": [],
        "modelType": "fullTableStatistics"
      }
    ]
  }
}

Code:

    /**
     * Update a task.
     * @param request request
     * @param vo task parameters
     * @author liudz
     * @date 2021/5/8
     * @return execution result
     **/
    @PutMapping("/project/process")
    @Transactional(rollbackFor = Exception.class)
    public Response updateDragSparkTask(HttpServletRequest request, @RequestBody DragSparkTaskVo vo) {
        // Validate the body before dereferencing it.
        if (vo == null || org.apache.commons.lang3.StringUtils.isBlank(vo.getName())
            || vo.getDescribeInfo() == null) {
            log.error("--DragSparkTaskController--updateDragSparkTask--PARAM_ERROR!--");
            return Response.error(Msg.PARAM_ERROR);
        }
        Long userId = Long.valueOf(request.getUserPrincipal().getName());
        vo.setCreateId(userId);
        vo.setUpdateId(userId);
        String processName = userId + "-dragSparkTask";
        Response<DragSparkTaskVo> dragSparkTaskResponse = dragSparkTaskService.updateDragSparkTask(vo);
        if (dragSparkTaskResponse.getCode() == SUCCESS) {
            log.info("--(1)updateDragSparkTask--mysql--success");
            // Look up the user's single workflow.
            DolphinschedulerResponse processInfoList = getUserProcess(vo.getProjectName());
            JSONObject processJson = new JSONObject();
            log.info("--(2)getUserProcess--success:{}", processInfoList);
            List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
            for (Map<String, Object> map : list) {
                if (processName.equals(map.get("name"))) {
                    processJson.fluentPutAll(map);
                }
            }
            ProcessDto processDto = packageProcessParam(UPDATE, processName, vo, processJson);
            processDto.setId(processJson.getInteger("id"));
            log.info("--(3)packageProcessParam--success");
            // A workflow that is online is taken offline before it is updated.
            if (ONLINE.equals(processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()))) {
                releaseProcessDefinition(vo.getProjectName(), processName, processDto.getId(), 0);
                log.info("--(4)releaseProcessDefinition--OFFLINE--success");
            }
            return updateProcess(vo, processDto);
        }
        return dragSparkTaskResponse;
    }
    /**
     * Build the parameters for updating one task: only the mainArgs of the matching SPARK node changes.
     * @param processDto workflow parameters
     * @param vo task parameters
     * @param processJson workflow JSON
     * @author liudz
     * @date 2021/5/7
     * @return ProcessDto
     **/
    public ProcessDto packageProcessParamOfUpdate(ProcessDto processDto, JSONObject processJson, DragSparkTaskVo vo) {
        processDto.setLocations(processJson.getString("locations"));
        processDto.setId(processJson.getInteger("id"));
        JSONObject processDefinitionJson = processJson.getJSONObject("processDefinitionJson");
        JSONArray jsonTasksArray = processDefinitionJson.getJSONArray("tasks");
        JSONArray nodesArray = parseLineAndNode(vo.getTask());
        JSONObject json = new JSONObject();
        json.put("id", vo.getId());
        json.put("name", vo.getName());
        json.put("describeInfo", vo.getDescribeInfo());
        json.put("nodes", nodesArray);
        String taskJsonString = json.toString().replace("}}", "} }").replace("{{", "{ {");
        String mainArgs = "\"" + taskJsonString.replace("\"", "\\\"") + "\"";
        for (int i = 0; i < jsonTasksArray.size(); i++) {
            JSONObject task = jsonTasksArray.getJSONObject(i);
            // Node ids look like "tasks-<id>". Boxed Long values are compared with equals, not ==.
            if (Long.valueOf(task.getString("id").substring(SIX)).equals(vo.getId())) {
                task.getJSONObject("params").put("mainArgs", mainArgs);
            }
        }
        // If no node matches, the definition is sent back unchanged.
        processDto.setProcessDefinitionJson(processDefinitionJson.toString());
        return processDto;
    }
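
Finding the node to update relies on the naming convention "tasks-" + task id: the listing strips the six-character prefix and compares the rest with the task id. When both sides are boxed Long objects, equals is the safe comparison, because Long.valueOf is only guaranteed to return cached instances for values from -128 to 127 and == compares references. A small sketch with hypothetical names:

```java
/** Hypothetical helper for matching "tasks-<id>" node ids against a task id. */
final class TaskIdSketch {
    static final String PREFIX = "tasks-";

    /** Extracts the numeric id from a node id such as "tasks-111". */
    static Long parseTaskId(String nodeId) {
        if (nodeId == null || !nodeId.startsWith(PREFIX)) {
            throw new IllegalArgumentException("not a task node id: " + nodeId);
        }
        return Long.valueOf(nodeId.substring(PREFIX.length()));
    }

    /** Compares by value, so it also works for ids outside the Long cache range. */
    static boolean matches(String nodeId, Long taskId) {
        return parseTaskId(nodeId).equals(taskId);
    }
}
```

With an id such as 111 a reference comparison happens to work, which is why this kind of bug tends to surface only once ids grow past 127.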
    /**
     * Update the workflow.
     * @param vo task parameters
     * @param processDto workflow parameters
     * @author liudz
     * @date 2021/5/7
     * @return execution result
     **/
    public Response updateProcess(DragSparkTaskVo vo, ProcessDto processDto) {
        PostMethod postMethod = null;
        try {
            String postURL = address + "/dolphinscheduler/projects/"
                   + URLEncoder.encode(vo.getProjectName(), "utf-8") + "/process/update";
            postMethod = new PostMethod(postURL);
            postMethod.setRequestHeader("Content-Type",
                    "application/x-www-form-urlencoded;charset=utf-8");
            postMethod.setRequestHeader("token", token);
            // Parameter values must not be null; pass an empty string instead.
            NameValuePair[] data = {
                new NameValuePair("connects", processDto.getConnects()),
                new NameValuePair("name", processDto.getName()),
                new NameValuePair("locations", processDto.getLocations()),
                new NameValuePair("id", processDto.getId().toString()),
                new NameValuePair("processDefinitionJson", processDto.getProcessDefinitionJson())};
            postMethod.setRequestBody(data);
            org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
            httpClient.executeMethod(postMethod);
            JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
            log.info("--(5)httpUpdateProcess:{}", result);
            if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
                return Response.error(result.getInteger("code"), result.getString("msg"));
            }
        } catch (Exception e) {
            log.error("Request failed", e);
            // Report the failure instead of falling through to success; 500 is an arbitrary error code.
            return Response.error(500, e.getMessage());
        } finally {
            if (postMethod != null) {
                postMethod.releaseConnection();
            }
        }
        return Response.success();
    }

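3. Delete a drag-and-drop model Spark task

Code (packageProcessParamOfDelete is called through packageProcessParam but is not listed in this post):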

    /**
     * Delete a task.
     * @param request request
     * @param projectName project name
     * @param id task ID
     * @author liudz
     * @date 2021/5/8
     * @return execution result
     **/
    @DeleteMapping("/project/process/{projectName}/{id}")
    @Transactional(rollbackFor = Exception.class)
    public Response deleteDragSparkTask(HttpServletRequest request, @PathVariable("projectName") String projectName,
                                        @PathVariable("id") Long id) {
        Long userId = Long.valueOf(request.getUserPrincipal().getName());
        DragSparkTaskVo vo = new DragSparkTaskVo();
        vo.setId(id);
        vo.setCreateId(userId);
        vo.setProjectName(projectName);
        String processName = userId + "-dragSparkTask";
        Response<DragSparkTaskVo> dragSparkTaskResponse = dragSparkTaskService.deleteDragSparkTask(vo);
        if (dragSparkTaskResponse.getCode() == SUCCESS) {
            log.info("--(1)deleteDragSparkTask--mysql--success");
            // Look up the user's single workflow.
            DolphinschedulerResponse processInfoList = getUserProcess(vo.getProjectName());
            JSONObject processJson = new JSONObject();
            log.info("--(2)getUserProcess--success:{}", processInfoList);
            List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
            for (Map<String, Object> map : list) {
                if (processName.equals(map.get("name"))) {
                    processJson.fluentPutAll(map);
                }
            }
            ProcessDto processDto = packageProcessParam(DELETE, processName, vo, processJson);
            processDto.setId(processJson.getInteger("id"));
            log.info("--(3)packageProcessParam--success");
            // A workflow that is online is taken offline before it is changed.
            if (ONLINE.equals(processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()))) {
                releaseProcessDefinition(vo.getProjectName(), processName, processDto.getId(), 0);
                log.info("--(4)releaseProcessDefinition--OFFLINE--success");
            }
            if (JSONObject.parseObject(processDto.getLocations()).isEmpty()) {
                // No nodes are left: delete the workflow.
                deleteProcess(vo, processDto);
            } else {
                // Other nodes remain: update the workflow.
                updateProcess(vo, processDto);
            }
        }
        return dragSparkTaskResponse;
    }
    /**
     * Delete the workflow.
     * @param vo task parameters
     * @param processDto workflow parameters
     * @author liudz
     * @date 2021/5/7
     * @return the DolphinScheduler response
     **/
    public DolphinschedulerResponse deleteProcess(DragSparkTaskVo vo, ProcessDto processDto) {
        HttpHeaders headers = new HttpHeaders();
        headers.set("token", token);
        headers.set("Content-Type", "application/json");
        HttpEntity requestEntity = new HttpEntity(headers);
        ResponseEntity<DolphinschedulerResponse> response =
                restTemplate.exchange(address + "/dolphinscheduler/projects/" + vo.getProjectName()
                               + "/process/delete?processDefinitionId=" + processDto.getId(),
                        HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
        log.info("--(5)httpDeleteProcess:{}", response);
        return response.getBody();
    }
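
The delete endpoint chooses between deleting the whole workflow and updating it by checking whether any node is left in locations. The helper that removes the node, packageProcessParamOfDelete, is not shown in this post, so the sketch below only assumes that it drops the "tasks-" + id entry. All names are hypothetical and a plain Map stands in for the locations JSON.

```java
import java.util.Map;

/** Hypothetical illustration of the delete-or-update decision. */
final class DeleteDecisionSketch {

    enum Action { DELETE_WORKFLOW, UPDATE_WORKFLOW }

    /**
     * Removes the node of the given task from locations (assumed behaviour of the
     * unlisted helper) and reports what should happen to the workflow afterwards.
     */
    static Action removeNode(Map<String, ?> locations, long taskId) {
        locations.remove("tasks-" + taskId);
        return locations.isEmpty() ? Action.DELETE_WORKFLOW : Action.UPDATE_WORKFLOW;
    }
}
```

Deleting the last remaining task therefore removes the user's workflow entirely, and the next create request starts again from the workflow creation branch.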

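III. Links to my other related articles

1. Integrating the DolphinScheduler scheduler into a Spring Boot project: drag-and-drop Spark task management:
https://blog.csdn.net/a924382407/article/details/117119831

2. Integrating the DolphinScheduler scheduler into a Spring Boot project: implementing DataX data synchronization tasks:
https://blog.csdn.net/a924382407/article/details/120951230

3. Integrating the DolphinScheduler scheduler into a Spring Boot project: project management:
https://blog.csdn.net/a924382407/article/details/117118931

4. Integrating the third-party big data scheduler DolphinScheduler into a Spring Boot project: running/stopping tasks:
https://blog.csdn.net/a924382407/article/details/117121181

5. Integrating the third-party big data scheduler DolphinScheduler into a Spring Boot project:
https://blog.csdn.net/a924382407/article/details/117113848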

