Cromwell 工作流引擎支持

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
对象存储 OSS,20GB 3个月
简介: -

Cromwell 是 Broad Institute 开发的工作流管理系统,当前已获得阿里云批量计算服务的支持。通过 Cromwell 可以将 WDL 描述的 workflow 转化为批量计算的作业(Job)运行。用户将为作业运行时实际消耗的计算和存储资源付费,不需要支付资源之外的附加费用。本文将介绍如何使用 Cromwell 在阿里云批量计算服务上运行工作流。

1. 准备工作

A) 开通批量计算服务

要使用批量计算服务,请根据官方文档里面的指导开通批量计算和其依赖的相关服务,如OSS等。

注意:创建 OSS Bucket 的区域,需要和使用批量计算的区域一致。

B) 下载 Cromwell

Cromwell 官方下载

注意:为了确保所有的特性可用,建议下载45及之后的最新版本。

C) 开通 ECS 作为 Cromwell server

当前批量计算提供了 Cromwell server 的 ECS 镜像,用户可以用此镜像开通一台 ESC 作为 server。镜像中提供了 Cromwell 官网要求的基本配置和常用软件。在此镜像中,Cromwell 的工作目录位于/home/cromwell,上一步下载的 Crowwell jar 包可以放置在 /home/cromwell/cromwell 目录下。

注意:用户也可以自己按照 Cromwell 官方的要求自己搭建 Cromwell server, 上面的镜像只是提供了方便的方式,不是强制要求。

2. 使用 Cromwell

配置文件

Cromwell 运行的配置文件,包括:

  • Cromwell 公共配置。
  • 批量计算相关配置,包含了批量计算作为后端需要的存储、计算等资源配置。

关于配置参数的详细介绍请参考 Cromwell 官方文档。如下是一个批量计算配置文件的例子 bcs.conf:

include required(classpath("application"))
database {
   
  profile = "slick.jdbc.MySQLProfile$"
  db {
   
    driver = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://localhost/db_cromwell?rewriteBatchedStatements=true&useSSL=false&allowPublicKeyRetrieval=true"
    user = "user_cromwell"
    #Your mysql password
    password = ""
    connectionTimeout = 5000
  }
}
workflow-options {
   
    workflow-log-dir = "/home/cromwell/cromwell/logs/"
}
call-caching {
   
  # Allows re-use of existing results for jobs you've already run
  # (default: false)
  enabled = false
  # Whether to invalidate a cache result forever if we cannot reuse them. Disable this if you expect some cache copies
  # to fail for external reasons which should not invalidate the cache (e.g. auth differences between users):
  # (default: true)
  invalidate-bad-cache-results = true
}
docker {
   
  hash-lookup {
   
    enabled = false
    # Set this to match your available quota against the Google Container Engine API
    #gcr-api-queries-per-100-seconds = 1000
    # Time in minutes before an entry expires from the docker hashes cache and needs to be fetched again
    #cache-entry-ttl = "20 minutes"
    # Maximum number of elements to be kept in the cache. If the limit is reached, old elements will be removed from the cache
    #cache-size = 200
    # How should docker hashes be looked up. Possible values are "local" and "remote"
    # "local": Lookup hashes on the local docker daemon using the cli
    # "remote": Lookup hashes on docker hub and gcr
    method = "remote"
    #method = "local"
    alibabacloudcr {
   
      num-threads = 5
      #aliyun CR credentials
      auth {
   
    #endpoint = "cr.cn-shanghai.aliyuncs.com"
        access-id = ""
        access-key = ""
      }
    }
  }
}
engine {
   
  filesystems {
   
    oss {
   
      auth {
   
        endpoint = "oss-cn-shanghai.aliyuncs.com"
        access-id = ""
        access-key = ""
      }
    }
  }
}
backend {
   
  default = "BCS"
  providers {
   
    BCS {
   
      actor-factory = "cromwell.backend.impl.bcs.BcsBackendLifecycleActorFactory"
      config {
   
        root = "oss://your-bucket/cromwell_dir"
        region = "cn-shanghai"
        access-id = ""
        access-key = ""
        filesystems {
   
          oss {
   
            auth {
   
              endpoint = "oss-cn-shanghai.aliyuncs.com"
              access-id = ""
              access-key = ""
            }
        caching {
   
              # When a cache hit is found, the following duplication strategy will be followed to use the cached outputs
              # Possible values: "copy", "reference". Defaults to "copy"
              # "copy": Copy the output files
              # "reference": DO NOT copy the output files but point to the original output files instead.
              #              Will still make sure than all the original output files exist and are accessible before
              #              going forward with the cache hit.
              duplication-strategy = "reference"
            }
          }
        }
        default-runtime-attributes {
   
          failOnStderr: false
          continueOnReturnCode: 0
          autoReleaseJob: true
          cluster: "OnDemand ecs.sn1.medium img-ubuntu-vpc"
          #cluster: cls-6kihku8blloidu3s1t0006
          vpc: "192.168.0.0/16"
        } 
      }
    }
  }
}

如果使用前面章节中的镜像开通 ECS 作为 Cromwell server,配置文件位于 /home/cromwell/cromwell/bcs_sample.conf,只需要填写自己的配置即可使用 Cromwell。

*注意:Cromwell 可以在公网环境(如本地服务器、配置了公网 IP 的阿里云 ECS 等)运行,也可以在阿里云 VPC 环境下运行。在 VPC 环境下使用时,有如下几处要修改为 VPC 内网下的配置:

  • OSS 的内网 endpoint :

    • engine.filesystems.oss.auth.endpoint = "oss-cn-shanghai-internal.aliyuncs.com"
    • backend.providers.BCS.config.filesystems.oss.auth.endpoint ="oss-cn-shanghai-internal.aliyuncs.com"
  • 添加批量计算的内网 endpoint:

    • backend.providers.BCS.config.user-defined-region = "cn-shanghai-vpc"
    • backend.providers.BCS.config.user-defined-domain = "batchcompute-vpc.cn-shanghai.aliyuncs.com"
  • 添加容器镜像服务的内网 endpoint:

    • docker.hash-lookup.alibabacloudcr.auth.endpoint = "cr-vpc.cn-shanghai.aliyuncs.com"*

运行模式

Cromwell支持两种模式

  • run 模式
  • server 模式

关于两种模式的详细描述,请参考 Cromwell 官网文档。下面重点介绍这两种模式下如何使用批量计算。

A) run模式

run模式适用于本地运行一个单独的 WDL 文件描述的工作流,命令行如下:java -Dconfig.file=bcs.conf -jar cromwell.jar run echo.wdl --inputs echo.inputs

  • WDL 文件:描述详细的工作流。工作流中每个 task 对应批量计算的一个作业(Job)。
  • inputs文件:是 WDL 中定义的工作流的输入信息inputs 文件是用来描述 WDL 文件中定义的工作流及其 task 的输入文件。如下所示:
    {
         
    "workflow_name.task_name.input1": "xxxxxx"
    }
    
    运行成功后,WDL 文件中描述的工作流中的一个 task 会作为批量计算的一个作业(Job)来提交。此时登录批量计算的控制台就可以看到当前的 Job 状态。

image.png

当 workflow 中所有的 task 对应的作业运行完成后,工作流运行完成。

B) server 模式

启动 server

相比 run 模式一次运行只能处理一个 WDL 文件,server 模式可以并行处理多个 WDL 文件。关于 server 模式的更多信息,请参考 Cromwell 官方文档。可以采用如下命令行启动 server:java -Dconfig.file=bsc.conf -jar cromwell.jar serverserver 启动成功后,就可以接收来自 client 的工作流处理请求。下面分别介绍如何使用 API 和 CLI 的方式向 server 提交工作流。

使用 API 提交工作流

server 启动后,可以通过浏览器访问 Cromwell Server,比如 Server 的 IP 为39.105.xxx.yyy,则在浏览器中输入http://39.105.xxx.yyy:8000,通过如下图所示的界面提交任务:

image.png

更多API接口及用法,请参考 Cromwell 官网文档

使用 CLI 提交工作流[推荐]

除了可以使用 API 提交工作流以外,Cromwell 官方还提供了一个开源的 CLI 命令行工具 widder。可以使用如下的命令提交一个工作流:

python widdler.py run echo.wdl echo.inputs -o bcs_workflow_tag:tagxxx -S localhost

其中-o key:value是用于设置option,批量计算提供了 bcs_workflow_tag:tagxxx选项,用于配置作业输出目录的tag(下一节查看运行结果中会介绍)。

如果使用前面章节中的镜像开通 ECS 作为 Cromwell server,镜像中已经安装了 widdler,位于 /home/cromwell/widdler。可以使用如下的命令提交工作流:

widdler run echo.wdl echo.inputs -o bcs_workflow_tag:tagxxx -S localhost

更多命令用法可使用widdler -h命令查看,或参考官方文档

3. 查看运行结果

工作流运行结束后,输出结果被上传到了配置文件或 WDL 中定义的 OSS 路径下。在OSS路径上面的目录结构如下:

image.png

如上图所示,在配置文件中的config.root目录下有如下输出目录:

  • 第一层:workflowname 工作流的名称
  • 第二层:通过上一节中 CLI 命令的-o设置的目录tag
  • 第三层:workflow id,每次运行会生成一个
  • 第四层:workflow 中每个 task 的运行输出,比如上图中的 workflow 15e45adf-6dc7-4727-850c-89545faf81b0 有两个 task,每个task对应的目录命名是call-taskname,目录中包含三部分内容:

    • 批量计算的日志,包括 bcs-stdout 和 bcs-stderr
    • 当前 task 的输出,比如图中的 output1/output2 等
    • 当前 task 执行的 stdout 和 stderr

4. 使用建议

在使用过程中,关于 BCS 的配置,有如下的建议供参考:

使用集群

批量计算提供了两种使用集群的方式:

  • 自动集群
  • 固定集群

A) 自动集群

在config配置文件中指定默认的资源类型、实例类型以及镜像类型,在提交批量计算 Job 时就会使用这些配置自动创建集群,比如:

default-runtime-attributes {
   
        cluster : "OnDemand ecs.sn1ne.large img-ubuntu-vpc"
      }

如果在某些 workflow 中不使用默认集群配置,也可以通过inputs文件中指定 workflow 中某个 task 的对应的批量计算的集群配置(将 cluster_config 作为 task 的一个输入),比如:

{
   
      "workflow_name.task_name.cluster_config": "OnDemand ecs.sn2ne.8xlarge img-ubuntu-vpc"
}

然后在 task 中重新设置运行配置:

task task_demo {
   
    String cluster_config
    runtime {
   
        cluster: cluster_config
  }
}

就会覆盖默认配置,使用新的配置信息创建集群。
####B) 固定集群
使用自动集群时,需要创建新集群,会有一个等待集群的时间。如果对于启动时间有要求,或者有了大量的作业提交,可以考虑使用固定集群。比如:

default-runtime-attributes {
   
        cluster : "cls-xxxxxxxxxx"
      }

注意:使用固定集群时,如果使用完毕,请及时释放集群,否则集群中的实例会持续收费。

Cromwell Server 配置建议

  • 大压力作业时,建议使用较高配置的机器作为 Cromwell Server,比如ecs.sn1ne.8xlarge等32核64GB的机器。
  • 大压力作业时,修改 Cromwell Server 的最大打开文件数。比如在ubuntu下可以通过修改/etc/security/limits.conf文件,比如修改最大文件数为100万:
    ```js
    root soft nofile 1000000
    root hard nofile 1000000
  • soft nofile 1000000
  • hard nofile 1000000
    ```
  • 确认 Cromwell Server 有配置数据库,防止作业信息丢失。
  • 设置 bcs.conf 里面的并发作业数,比如 system.max-concurrent-workflows = 1000

开通批量计算相关配额

如果有大压力场景,可能需要联系批量计算服务开通对应的配额,比如:

  • 一个用户所有作业的数量(包括完成的、运行的、等待的等多种状态下);
  • 同时运行的作业的集群的数量(包括固定集群和自动集群);

使用 NAS

使用 NAS 时要注意以下几点:

  • NAS 必须在 VPC 内使用,要求添加挂载点时,必须指定 VPC;
  • 所以要求在 runtime 中必须包含:

    • VPC 信息
    • mounts 信息

下面的例子可供参考:

runtime {
   
    cluster: cluster_config
    mounts: "nas://1f****04-xkv88.cn-beijing.nas.aliyuncs.com:/ /mnt/ true"
    vpc: "192.168.0.0/16 vpc-2zexxxxxxxx1hxirm"
  }

高级特性支持

Glob

Cromwell 支持使用 glob 来指定工作流中多个文件作为 task 的输出,比如:

task globber {
   
  command <<<
    for i in `seq 1 5`
    do
      mkdir out-$i
      echo globbing is my number $i best hobby  out-$i/$i.txt
    done
  >>>
  output {
   
    Array[File] outFiles = glob("out-*/*.txt")
  }
}
workflow test {
   
  call globber
}

当 task 执行结束时,通过 glob 指定的多个文件会作为输出,上传到 OSS 上。

Call Caching

Call Caching 是 Cromwell 提供的高级特性,如果检测到工作流中某个 task (对应一个批量计算的 job )和之前已经执行过的某个 task 具有相同的输入和运行时等条件,则不需要再执行,直接取之前的运行结果,这样可以为客户节省时间和费用。一个常见的场景是如果一个工作流有 n 个 task,当执行到中间某一个 task 时由于某些原因失败了,排除了错误之后,再次提交这个工作流运行后,Cromwell 判断如果满足条件,则已经完成的几个 task 不需要重新执行,只需要从出错的 task 开始继续运行。

配置 Call Caching

要在 BCS 后端情况下使用 Call Caching 特性,需要如下配置项:

database {
   
  profile = "slick.jdbc.MySQLProfile$"
  db {
   
    driver = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://localhost/db_cromwell?rewriteBatchedStatements=true&useSSL=false"
    user = "user_cromwell"
    password = "xxxxx"
    connectionTimeout = 5000
  }
}
call-caching {
   
  # Allows re-use of existing results for jobs you have already run
  # (default: false)
  enabled = true
  # Whether to invalidate a cache result forever if we cannot reuse them. Disable this if you expect some cache copies
  # to fail for external reasons which should not invalidate the cache (e.g. auth differences between users):
  # (default: true)
  invalidate-bad-cache-results = true
}
docker {
   
  hash-lookup {
   
    enabled = true
    # How should docker hashes be looked up. Possible values are local and remote
    # local: Lookup hashes on the local docker daemon using the cli
    # remote: Lookup hashes on alibab cloud Container Registry
    method = remote
    alibabacloudcr {
   
      num-threads = 10
      auth {
   
        access-id = "xxxx"
        access-key = "yyyy"
      }
    }
  }
}
engine {
   
  filesystems {
   
    oss {
   
      auth {
   
        endpoint = "oss-cn-shanghai.aliyuncs.com"
        access-id = "xxxx"
        access-key = "yyyy"
      }
    }
  }
}
backend {
   
  default = "BCS"
  providers {
   
    BCS {
   
      actor-factory = "cromwell.backend.impl.bcs.BcsBackendLifecycleActorFactory"
      config {
   
        #其他配置省略
        filesystems {
   
          oss {
   
            auth {
   
              endpoint = "oss-cn-shanghai.aliyuncs.com"
              access-id = "xxxx"
              access-key = "yyyy"
            }
            caching {
   
              # When a cache hit is found, the following duplication strategy will be followed to use the cached outputs
              # Possible values: copy, reference. Defaults to copy
              # copy: Copy the output files
              # reference: DO NOT copy the output files but point to the original output files instead.
              #              Will still make sure than all the original output files exist and are accessible before
              #              going forward with the cache hit.
              duplication-strategy = "reference"
            }
          }
        }
        default-runtime-attributes {
   
          failOnStderr: false
          continueOnReturnCode: 0
          cluster: "OnDemand ecs.sn1.medium img-ubuntu-vpc"
          vpc: "192.168.0.0/16"
        }
      }
    }
  }
}
  • database 配置:Cromwell 将 workflow 的执行元数据存储在数据库中,所以需要添加数据库配置,详细情况参考Cromwell 官网指导
  • call-caching 配置:Call Caching 的开关配置等;
  • docker.hash-lookup 配置: 设置 Hash 查找开关及阿里云 CR 等信息,用于查找镜像的 Hash 值。
  • backend.providers.BCS.config.filesystems.oss.caching 配置:设置 Call Caching命中后,使用原来输出的方式,批量计算在这里支持 reference 模式,不需要拷贝原有的结果,节省时间和成本。

命中条件

使用批量计算作为后端时,Cromwell 通过如下条件判断一个 task 是否需要重新执行:

条件

解释
inputs task 的输入,比如 OSS 上的样本文件
continueOnReturnCode 公共运行时参数,可以继续执行的返回码
docker 公共运行时参数,后端的Docker配置
failOnStderr 公共运行时参数,stderr非空时是否失败
imageId 批量计算后端运行时参数,标识作业运行的 ECS 镜像,如果使用的官方镜像如img-ubuntu-vpc可不用填写此项
userData 批量计算后端,用户自定义数据

如果一个 task 的上述参数未发生改变,Cromwell 会判定为不需要执行的 task,直接获取上次执行的结果,并继续工作流的执行。

本文作者:阿里云批量计算团队

目录
相关文章
|
XML 数据可视化 Java
非常轻量、高性能、可集成、可扩展的流程引擎compileflow
compileflow Process引擎是淘宝工作流TBBPM引擎之一,是专注于纯内存执行,无状态的流程引擎,通过将流程文件转换生成java代码编译执行,简洁高效。当前是阿里业务中台交易等多个核心系统的流程引擎。
|
7月前
|
XML JavaScript 前端开发
基于jeecgboot的flowable流程支持服务任务的功能
基于jeecgboot的flowable流程支持服务任务的功能
194 0
|
7月前
|
人工智能 供应链 监控
推荐一款TinyEngine低代码引擎!支持自定义DSL 生成定制的源码、跨技术栈!
推荐一款TinyEngine低代码引擎!支持自定义DSL 生成定制的源码、跨技术栈!
141 0
|
监控 数据可视化 Java
灵活轻巧的工作流引擎:探索 Flowable
在现代的企业环境中,业务流程的自动化和优化变得愈发重要。Flowable,作为一款开源、灵活、轻巧的工作流引擎,为企业提供了一种高效的方式来管理和执行各种业务流程。本文将为您详细介绍 Flowable 的核心概念、特性以及在业务流程管理中的应用。
500 0
|
监控 数据可视化 Java
高效流程引擎:深入探索 Activiti 工作流引擎
在现代的企业环境中,业务流程的自动化和优化变得越来越重要。Activiti,作为一款轻量级、可嵌入的工作流引擎,为企业提供了一种高效的方式来管理和执行各种业务流程。本文将为您详细介绍 Activiti 的核心概念、特性以及在业务流程管理中的应用。
575 0
|
存储 数据可视化 Java
Kstry流程编排框架
Kstry是流程编排框架、组件化框架、并发框架、微服务整合框架
1253 1
Kstry流程编排框架
|
SQL 存储 消息中间件
基于 EventBridge 构建数据库应用集成
本文重点介绍 EventBridge 的新特性:数据库 Sink 事件目标。
250 0
基于 EventBridge  构建数据库应用集成
|
XML 存储 供应链
[微服务]BPMN和微服务编排,流程语言,引擎和永恒模式(第1部分)
[微服务]BPMN和微服务编排,流程语言,引擎和永恒模式(第1部分)
|
XML 存储 物联网
OneCode低代码引擎-流程引擎白皮书
在低代码应用中,应用比例非常高的一种应用便是以流程+表单驱动为模型的各种审批类引用。但流程在低代码平台中的应用绝不是简简单单的流程+表单的模型。而是站在更高的层次上在自然时间轴为基础的维度上,将事件、数据、响应、人工交互等因素进行特定场景下的编排逻辑处理。
OneCode低代码引擎-流程引擎白皮书
|
新零售 人工智能 应用服务中间件
flowable工作流选型对比
flowable工作流选型对比
700 0
flowable工作流选型对比