开发者社区> 问答> 正文

批量计算如何实现Cromwell 工作流管理系统?

批量计算如何实现Cromwell 工作流管理系统?

展开
收起
小天使爱美 2020-03-28 20:37:56 1513 0
1 条回答
写回答
取消 提交回答
  • 背景 Cromwell 的 Call Caching 功能如何开启和关闭? 在一些场景下,提交工作流时不想使用 Call Caching,需要无条件执行,该如何设置? 工作流重新提交后,有一些 task 预期不需要重新执行,但依然执行了,Call Caching 疑似没有生效,怎么查看原因? 本篇文档将对 Call Caching 的使用做一个详细的介绍,包括功能的开启和关闭、如何通过查看元数据的方式,确认 Call Caching 未生效的原因等。

    Call Caching 设置 配置文件中设置全局 Call Caching 开关状态 如果要使用 Cromwell 的 Call Caching 功能,需要在 Server 的配置文件中设置:

    call-caching { # Allows re-use of existing results for jobs you have already run # (default: false) enabled = true # Whether to invalidate a cache result forever if we cannot reuse them. Disable this if you expect some cache copies # to fail for external reasons which should not invalidate the cache (e.g. auth differences between users): # (default: true) invalidate-bad-cache-results = true } call-caching.enabled 是 Call Caching 功能的开关,可以按照自己的需求开启和关闭。

    在 Option 中设置单个 Workflow 是否使用 Call Caching 在 Call Caching 功能全局开启的状态下,提交工作流时,可以通过携带如下两个 option 选项设置本次执行是否使用 Call Caching:

    { "write_to_cache": true, "read_from_cache": true } write_to_cache: 表示本次 workflow 执行结果是否写入 Cache,实际上就是是否给后面的工作流复用。默认是 true。 read_from_cache: 表示本次 workflow 执行是否从 Cache 中读取之前的结果,也就是是否复用以前的结果,默认是 true,如果设置为 false,表示本次执行不使用 Call Caching,强制执行。 查看元数据 工作流执行时,每一个 task 的每一个 call(对应批量计算的一个作业)都会有 metadata,记录了这个步骤的运行过程,当然也包括 Call Caching 的详细信息,通过下面的命令可以查询一个工作流的 metadata:

    widdler query -m [WorkflowId] 在元数据信息中找到对应的 task 的详细信息,比如:

    { "callRoot": "oss://gene-test/cromwell_test/GATK4_VariantDiscovery_pipeline_hg38/53cfd3fc-e9d5-4431-83ec-be6c51ab9365/call-HaplotypeCaller/shard-10", "inputs": { "gatk_path": "/gatk/gatk", "ref_fasta": "oss://genomics-public-data-shanghai/broad-references/hg38/v0/Homo_sapiens_assembly38.fasta", "cluster_config": "OnDemand ecs.sn2ne.xlarge img-ubuntu-vpc", "input_bam_index": "oss://gene-test/cromwell_test/GATK4_VariantDiscovery_pipeline_hg38/cf55a2d1-572c-4490-8edf-07656802a79b/call-GatherBamFiles/NA12878.hg38.ready.bam.bai", "output_filename": "NA12878.hg38.vcf.gz", "contamination": null, "ref_fasta_index": "oss://genomics-public-data-shanghai/broad-references/hg38/v0/Homo_sapiens_assembly38.fasta.fai", "ref_dict": "oss://genomics-public-data-shanghai/broad-references/hg38/v0/Homo_sapiens_assembly38.dict", "interval_list": "/home/data/GATK_human_genome_resource_bundle/hg38_from_GCP/hg38_wgs_scattered_calling_intervals/temp_0047_of_50/scattered.interval_list", "input_bam": "oss://gene-test/cromwell_test/GATK4_VariantDiscovery_pipeline_hg38/cf55a2d1-572c-4490-8edf-07656802a79b/call-GatherBamFiles/NA12878.hg38.ready.bam.bam", "docker_image": "registry.cn-shanghai.aliyuncs.com/wgs_poc/poc:4.0.10.1" }, "returnCode": 0, "callCaching": { "allowResultReuse": true, "hashes": { "output expression": { "File output_vcf_index": "A162250CB6F52CC32CB75F5C5793E8BB", "File output_vcf": "7FD061EEA1D3C63912D7B5FB1F3C5218" }, "runtime attribute": { "userData": "N/A", "docker": "F323AFFA030FBB5B352C60BD7D615255", "failOnStderr": "68934A3E9455FA72420237EB05902327", "imageId": "N/A", "continueOnReturnCode": "CFCD208495D565EF66E7DFF9F98764DA" }, "output count": "C81E728D9D4C2F636F067F89CC14862C", "input count": "D3D9446802A44259755D38E6D163E820", "command template": "9104DF40289AB292A52C2A753FBF58D2", "input": { "File interval_list": "04dc2cb895d13a40657d5e2aa7d31e8c", "String output_filename": "2B77B986117FC94D088273AD4D592964", "File ref_fasta": "9A513FB0533F04ED87AE9CB6281DC19B-400", "File input_bam_index": "D7CA83047E1B6B8269DF095F637621FE-1", "String gatk_path": "EB83BBB666B0660B076106408FFC0A9B", "String docker_image": "0981A914F6271269D58AA49FD18A6C13", "String cluster_config": "B4563EC1789E5EB82B3076D362E6D88F", "File ref_dict": "3884C62EB0E53FA92459ED9BFF133AE6", "File input_bam": "9C0AC9A52F5640AA06A0EBCE6A97DF51-301", "File ref_fasta_index": "F76371B113734A56CDE236BC0372DE0A" }, "backend name": "AE9178757DD2A29CF80C1F5B9F34882E" }, "effectiveCallCachingMode": "ReadAndWriteCache", "hit": false, "result": "Cache Miss" }, "stderr": "oss://gene-test/cromwell_test/GATK4_VariantDiscovery_pipeline_hg38/53cfd3fc-e9d5-4431-83ec-be6c51ab9365/call-HaplotypeCaller/shard-10/stderr", "shardIndex": 10, "stdout": "oss://gene-test/cromwell_test/GATK4_VariantDiscovery_pipeline_hg38/53cfd3fc-e9d5-4431-83ec-be6c51ab9365/call-HaplotypeCaller/shard-10/stdout", "outputs": { "output_vcf": "oss://gene-test/cromwell_test/GATK4_VariantDiscovery_pipeline_hg38/53cfd3fc-e9d5-4431-83ec-be6c51ab9365/call-HaplotypeCaller/shard-10/NA12878.hg38.vcf.gz", "output_vcf_index": "oss://gene-test/cromwell_test/GATK4_VariantDiscovery_pipeline_hg38/53cfd3fc-e9d5-4431-83ec-be6c51ab9365/call-HaplotypeCaller/shard-10/NA12878.hg38.vcf.gz.tbi" }, "commandLine": "set -e\n\n /gatk/gatk --java-options "-Xmx4g -Xmx4g" \\n HaplotypeCaller \\n -R /cromwell_inputs/73a7571e/Homo_sapiens_assembly38.fasta \\n -I /cromwell_inputs/02f1b5ca/NA12878.hg38.ready.bam.bam \\n -L /home/data/GATK_human_genome_resource_bundle/hg38_from_GCP/hg38_wgs_scattered_calling_intervals/temp_0047_of_50/scattered.interval_list \\n -O NA12878.hg38.vcf.gz \\n -contamination 0", "attempt": 1, "jobId": "job-000000005DB051A800006F970001CAC8", "start": "2019-10-25T02:38:03.522Z", "backendStatus": "Finished", "runtimeAttributes": { "cluster": "Right(AutoClusterConfiguration(OnDemand,ecs.sn2ne.xlarge,img-ubuntu-vpc,None,None,None))", "continueOnReturnCode": "0", "failOnStderr": "false", "vpc": "BcsVpcConfiguration(Some(10.20.200.0/24),Some(vpc-uf61zj30k0ebuen0xi7ci))", "mounts": "BcsInputMount(Right(nas://10.20.66.4:/data/ali_yun_test/),Left(/home/data),true)", "docker": "BcsDockerWithoutPath(registry.cn-shanghai.aliyuncs.com/wgs_poc/poc:4.0.10.1)", "autoReleaseJob": "false", "maxRetries": "0" }, "executionStatus": "Done", "end": "2019-10-25T03:22:23.481Z", "executionEvents": [ { "endTime": "2019-10-25T03:22:21.626Z", "description": "RunningJob", "startTime": "2019-10-25T02:38:03.645Z" }, { "endTime": "2019-10-25T03:22:22.481Z", "description": "UpdatingCallCache", "startTime": "2019-10-25T03:22:21.626Z" }, { "endTime": "2019-10-25T02:38:03.645Z", "description": "CallCacheReading", "startTime": "2019-10-25T02:38:03.643Z" }, { "endTime": "2019-10-25T02:38:03.522Z", "description": "Pending", "startTime": "2019-10-25T02:38:03.522Z" }, { "endTime": "2019-10-25T02:38:03.542Z", "description": "WaitingForValueStore", "startTime": "2019-10-25T02:38:03.542Z" }, { "endTime": "2019-10-25T03:22:23.481Z", "description": "UpdatingJobStore", "startTime": "2019-10-25T03:22:22.481Z" }, { "endTime": "2019-10-25T02:38:03.643Z", "description": "PreparingJob", "startTime": "2019-10-25T02:38:03.542Z" }, { "endTime": "2019-10-25T02:38:03.542Z", "description": "RequestingExecutionToken", "startTime": "2019-10-25T02:38:03.522Z" } ], "backend": "BCS" } 在上面的元数据中,有一项 callCaching,主要记录了如下信息:

    allowResultReuse:是否允许其他工作流复用。 如果当前工作流设置了不允许写入 Cache,则不可以复用 如果当前工作流设置了允许写入 Cache,则只有任务执行成功,才允许复用 hashes:当前任务的输入、输出、运行时等参数的 hash 记录,用于比对两次运行条件是否一样。 effectiveCallCachingMode:Call Caching 的模式,比如是否从 Cache 中读取,或者是否写入 Cache 等。 hit:当前任务在 Cache 是否命中。 result:当前任务在 Cache 中命中的详情,比如哪个工作流的哪个 task 的哪个 shard。 综合上面的解释,我们看到实例中的这个 call, 是 GATK4_VariantDiscovery_pipeline_hg38 这个工作流的 HaplotypeCaller 这个 task 的10号 shard,Call Cache 情况如下:

    未在 Cache 中命中,完整的执行了一次 执行成功,可以允许后的流程复用 Call Caching 未生效问题排查 如果遇到不符合预期的 task,可以通过如下步骤排查原因:

    查看当前 workflow 重新执行的 task 的 Call Caching 元数据 如果当前 task 的 Call Caching 的模式是不使用Cache(可能是提交作业时设置了不使用 Call Caching 的选项),则不会去利用之前的结果,确实会强制重新执行,是符合预期的 如果当前 task 未命中 Cache,则需要查看之前的 workflow, 进一步确认未命中的原因 查看之前的 workflow 的 task 的 CalCaching 元数据,确认之前的 task 是否执行成功,是否可以复用 如果之前的 task 的 不允许复用,可能是执行失败了,或者虽然执行成功,但 Cache 模式设置的不写入 Cache,即不允许复用 如果之前的 task 允许复用,但未命中,则需要比较两次的 hash 记录,可能是由于 Call Caching 相关的参数变化引起的

    背景 Cromwell server 的启动需要以下组件配合:

    启动 Mysql 的 docker 容器作为 Crowmell 的持久化数据库,包括配置用户名,密码等 填写 Cromwell 配置文件,包括 BCS 后端配置及数据库等配置 使用 Cromwell 的 jar 包,启动 server 实际上 Cromwell 除了发布 jar 包,也会发布对应的 docker 镜像,我们可以考虑使用 docker-compose来简化以上步骤。docker-compose 是 Docker 官方的开源项目,其定位是定义和运行多个 Docker 容器的应用(Defining and running multi-container Docker applications)。

    使用docker-compose 可以将容器化的 Cromwell 和 Mysql 两个 service 拉起,作为一个应用来运行。再配合脚本来简化配置,可以将 Cromwell 的服务做成一键启停。

    开通 ECS 作为 Crowmell server 首先使用 Cromwell server 镜像开通一台 ECS,ssh 登入机器后,可以运行目录下的cromwell-server.sh,进行Cromwell Server的管理:

    ./cromwell-server.sh

    cromwell-server.sh - init cromwell config and start/stop service Usage: cromwell-server.sh [options...] [init/start/stop/status] Options: --id=STRING Access id --key=STRING Access key --root=STRING Oss root for cromwell, e.g: oss://my-bucket/cromwell/ --instance=STRING default runtime: instance type [ecs.sn1.medium] --image=STRING default runtime: image id [img-ubuntu-vpc] 第一次配置与启动服务 初次使用,需要做一些初始配置,可以使用下面的命令完成一键初始化与启动:

    ./cromwell-server.sh init --id=LTAI8xxxxx --key=vVGZVE8qUNjxxxxxxxx --root=oss://gtx-wgs-demo/cromwell/ 上面的命令完成了以下配置:

    --id: 批量计算的 Access Id --key: 批量计算的 Access Key --root: Crowmell 运行时在 OSS 上的工作根目录 --instance: Cromwell 默认运行时参数,实例类型 --image: Cromwell 默认运行时参数,镜像ID执行完以上命令后,会根据 Crowmell 配置文件模板生成配置文件,并通过 docker-compose 启动 Cromwell server,并在后台运行。 服务启动后,就可以通过镜像中的命令行工具 widdler 执行工作流的提交:

    cd /home/cromwell/cromwell/ widdler run echo.wdl inputs.json -o bcs_workflow_tag:test_echo 停止服务 使用下面的命令可以一键停止服务:

    ./cromwell-server stop 再次启动服务 在已经完成配置的情况下,使用下面的命令,可以完成服务启动:

    ./cromwell-server start 重新配置并启动服务 如果需要修改配置,在服务停止的情况下,再次使用 init 命令可以完成新配置重新启动:

    ./cromwell-server.sh init --id=LTAI8xxxxx --key=vVGZVE8qUNjxxxxxxxx --root=oss://gtx-wgs-demo/cromwell/ 使用option文件设置默认运行时参数 在 Crowmell 的配置文件中,可以设置每个 backend 的默认运行时参数 default-runtime-attibutes,也可以在提交工作流时通过 option 覆盖原有设置。

    所以如果您在提交工作流时用到了数据盘、NAS等,都可以在 option 文件中设置:

    { "default_runtime_attributes": { "vpc": "192.168.0.0/24", "autoReleaseJob": true, "mounts": "nas://1f****04-xkv88.cn-beijing.nas.aliyuncs.com:/ /mnt/ true", "dataDisk": "cloud_ssd 250 /home/mount/" }, "bcs_workflow_tag": "Tagxxx", "read_from_cache": true } 使用 widdler 命令行的 -O (大写的O)参数提交 option 文件:

    widdler run echo.wdl inputs.json -O options.json

    在 Cromwell Server 配置完成后,如何快捷的进行提交、停止工作流、流程失败后如何快速定位问题以及工作流完成后如何如何快速查看运行日志、查看工作流的 Metrics 信息、工作流产生的费用等手段,这些问题就变成了 server 运维工作的基本诉求。

    Cromwell 以 server 的方式运行后是支持通过 API 接口获取以上信息的,但是有二次开发的工作量;而 widdler 是针对 Cromwell Server API 接口开发的命令行工具,通过 widdler 命令行工具减少运维成本。

    本文主要介绍阿里云对 widdler 工具的扩展,通过 widdler 工具查询指定工作流的作业运行状态、后端引擎的运行时信息、费用查询、问题定位调查以及子任务日志查看等功能。

    1. widdler 安装 widdler 默认在阿里云批量计算提供的 Cromwell server 镜像中安装。可以直接使用,无需做安装操作。

    2. 配置 widdler 由于涉及到个人阿里云运行数据的查询,需要在使用之前设置对应账号的 AK 信息、以及后端执行引擎所部署的region信息。

    config

    命令格式:

    widdler config -i id -k key -r cn-zhangjiakou 3. 校验 WDL 提交工作流之前对 WDL 做语法校验,排除部分低级问题;减少后续提交工作流后由于低级问题导致的流程失败。

    validate

    命令格式:

    widdler validate echo.wdl inputs.json 4. 提交工作流 run

    命令格式:

    widdler run echo.wdl inputs.json -l test 其中:test 为 label,可以根据样本进行打标签,后续可以按 label 做过滤。

    1. 终止工作流 命令格式:

    widdler abort workflowId abort

    1. 获取工作流 6.1 获取工作流列表 命令格式:

    widdler query query

    其中:默认获取当前 user 7 天内的工作流信息;可以根据 user label等信息来筛选工作流信息;其他使用方法参考 help 信息。

    6.2 获取工作流 Meta 命令格式:

    widdler query workflowId 7. 获取工作流运行状态 命令格式:

    widdler describe workflowId describe

    其中:”stepName” 表示工作流的某个自步骤的名称;”status” 表示当前步骤的运行状态”成功、失败、运行中”;”progress” 表示当前步骤的进度,如”4/4” 表示当前步骤存在4个子任务全部执行完成; 若是”2/4” 则表示当前步骤存在4个任务,已经完成2个。”elapse”表示当前步骤的所有子任务执行总耗时时间(不包括机器的启动时间); “coretime”表示当前步骤所有子任务消耗的核时时间(不包括机器的启动时间)。

    命令格式:

    widdler describe workflowId -t stepName subdescribe

    shardIndex 是 Cromwell 将每个 task 的子任务按 shardIndex 做索引,对应的是批量计算的一个作业。通过该命令可以看到指定 task 对应的统计信息。

    1. 获取工作流统计信息 命令格式:

    widdler stat workflowId stat

    其中:”cpuCore” 表示当前步骤中使用对应实例的 CPU 核数,”cpuUsage” 表示当前步骤所有任务从开始到当前(若当前任务结束状态则表示从开始到结束)的 CPU 平均利用率;”memSize” 表示当前步骤中使用对应实例的内存大小,”memUsage” 表示当前步骤中所有任务从开始到当前的MEM平均利用率;”sysDisk” 表示当前步骤实例的系统盘大小(默认 40GB),”sysDiskUsage” 表示当前步骤的所有任务在当前时间点的磁盘平均利用率;”dataDisk” 表示实例的数据盘大小(默认没有),”dataDiskUsage” 表示当前步骤的所有任务在当前时间点的磁盘平均利用率。

    命令格式:

    widdler stat workflowId -t stepName substat

    查询某个步骤对应的 Metrics 信息;可能某个步骤存在多个 scatter,那么每个 scatter 运行情况如何,则可以通过本命令获取到。

    1. 获取工作流费用 命令格式:

    widdler billing workflowId newbill

    1. 获取工作流运行日志 命令格式:

    widdler log workflowId 通过 log 查询命令,可以查看工作流的实际运行情况,执行过程是否符合预期可以通过该命令做到一键查看。stdout 以及 stderr 日志小于 1MB 的,会直接在屏幕上显示出来;超过 1MB 的需要借助 OSS 工具查看。

    log

    1. 工作流问题定位 命令格式:

    widdler explain workflowId 通过该命令可以一键查询工作流失败的原因,展示出现问题的步骤,输出该步骤的对应失败任务的 stdout 以及 stderr 信息,快速排查问题。

    explain

    更多其他功能请参考 widdler 的帮助信息

    2020-03-28 21:40:48
    赞同 展开评论 打赏
问答分类:
问答标签:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
如何使用Tair增强数据结构构建丰富在线实时场景 立即下载
实战-如何基于HBase构建图片视频数据的统一存储检索方案 立即下载
DataWorks高级功能场景化案例分享 立即下载