0 背景 AttachCluster作业是批量计算最新推出的作业类型。它结合了固定集群作业和AutoCluster作业的优势,既能自动管理集群生命周期,弹性伸缩资源,又能使用分布式缓存节省资源。本文的目的在于介绍在阿里云批量计算服务上运行AttachCluster作业。
1 准备工作 1.1 开通阿里云批量计算服务 要使用批量计算服务,请根据官方文档里面的指导开通批量计算和其依赖的相关服务,如OSS等。
1.2 升级Python SDK 若您未安装批量计算Python SDK,请您参照安装方法安装该SDK。如果您检查已经安装之后,请您参照Python SDK升级方法, 升级批量计算Python SDK至最新版。
2 创建集群 AttachCluster作业首次使用时,需要创建一个集群,创建方法可参考官方文档 。该集群对配置没有特殊需求,实例数可设置为0。以下是创建集群的Python源代码。
import time import random import string import batchcompute from batchcompute import CN_SHENZHEN as REGION from batchcompute import Client, ClientError from batchcompute.resources import ( JobDescription, TaskDescription, DAG, GroupDescription, ClusterDescription, Configs, Networks, VPC, Classic, Mounts, Notification, Topic ) ACCESS_KEY_ID = 'Your Access Key Id' ACCESS_KEY_SECRET = 'Your Access Key Secret' IMAGE_ID = 'img-ubuntu' INSTANCE_TYPE = 'ecs.sn2ne.large' client = Client(REGION, ACCESS_KEY_ID, ACCESS_KEY_SECRET) def create_cluster(idempotent_token=''): try: # Cluster description. cluster_desc = ClusterDescription() cluster_desc.Name = "test-cluster" cluster_desc.Description = "demo" cluster_desc.ImageId = IMAGE_ID cluster_desc.InstanceType = INSTANCE_TYPE #Group description group_desc1 = GroupDescription() group_desc1.DesiredVMCount = 4 group_desc1.InstanceType = 'ecs.sn1ne.large' #user group special instance type group_desc1.ResourceType = 'OnDemand' cluster_desc.add_group('group1', group_desc1) #cluster_desc.add_group('group2', group_desc2) #Configs configs = Configs() #Configs.Disks configs.add_system_disk(50, 'cloud_efficiency') configs.add_data_disk(500, 'cloud_efficiency', '/home/my-data-disk') #Configs.Networks networks = Networks() vpc = VPC() vpc.CidrBlock = '192.168.0.0/16' #vpc.VpcId = 'vpc-xxxxx' networks.VPC = vpc
configs.Networks = networks cluster_desc.Configs = configs print cluster_desc rsp = client.create_cluster(cluster_desc, idempotent_token) # get cluster id for attach cluster job return rsp.Id except ClientError, e: print (e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg()) return "" if name == 'main': #Not Use idempotent token cluster_id = create_cluster() print cluster_id 3 创建作业 在创建作业的时候需要步骤2中的集群Id,填入task的AutoCluster的ClusterId字段中。以下是创建作业的Python源代码。
from batchcompute import Client, ClientError from batchcompute import CN_ZHANGJIAKOU as REGION from batchcompute.resources import ( ClusterDescription, GroupDescription, Configs, Networks, VPC, JobDescription, TaskDescription, DAG,Mounts, AutoCluster,Disks,Notification, ) access_key_id = "" # your access key id access_key_secret = "" # your access key secret image_id = "m-8vbd8lo9xxxx" # the id of a image created before,镜像需要确保已经注册给批量计算 instance_type = "ecs.sn1.medium" # instance type inputOssPath = "oss://xxx/input/" # your input oss path outputOssPath = "oss://xxx/output/" #your output oss path stdoutOssPath = "oss://xxx/log/stdout/" #your stdout oss path stderrOssPath = "oss://xxx/log/stderr/" #your stderr oss path def getAutoClusterDesc(): auto_desc = AutoCluster() # attach cluster这里里填入上一步创建的集群Id auto_desc.ClusterId = cls-xxxxx auto_desc.ECSImageId = image_id auto_desc.ReserveOnFail = False # 实例规格 auto_desc.InstanceType = instance_type #case1 设置上限价格的竞价实例; # auto_desc.ResourceType = "Spot" # auto_desc.SpotStrategy = "SpotWithPriceLimit" # auto_desc.SpotPriceLimit = 0.5 #case2 系统自动出价,最高按量付费价格 # auto_desc.ResourceType = "Spot" # auto_desc.SpotStrategy = "SpotAsPriceGo" #case3 按量 auto_desc.ResourceType = "OnDemand" #Configs configs = Configs() #Configs.Networks networks = Networks() vpc = VPC() #case1 只给CidrBlock vpc.CidrBlock = '192.168.0.0/16' #case2 CidrBlock和VpcId 都传入,必须保证VpcId的CidrBlock 和传入的CidrBlock保持一致 # vpc.CidrBlock = '172.26.0.0/16' # vpc.VpcId = "vpc-8vbfxdyhxxxx" networks.VPC = vpc configs.Networks = networks # 设置系统盘type(cloud_efficiency/cloud_ssd)以及size(单位GB) configs.add_system_disk(size=40, type_='cloud_efficiency') #设置数据盘type(必须和系统盘type保持一致) size(单位GB) 挂载点 # case1 linux环境 # configs.add_data_disk(size=40, type_='cloud_efficiency', mount_point='/path/to/mount/') # case2 windows环境 # configs.add_data_disk(size=40, type_='cloud_efficiency', mount_point='E:') # 设置节点个数 configs.InstanceCount = 1 auto_desc.Configs = configs return auto_desc def getDagJobDesc(clusterId = None): job_desc = JobDescription() dag_desc = DAG() mounts_desc = Mounts() job_desc.Name = "testBatchSdkJob" job_desc.Description = "test job" job_desc.Priority = 1 # 订阅job完成或者失败事件 noti_desc = Notification() noti_desc.Topic['Name'] = "test-topic" noti_desc.Topic['Endpoint'] = "http://[UserId].mns.[Region].aliyuncs.com/" noti_desc.Topic['Events'] = ["OnJobFinished", "OnJobFailed"] # job_desc.Notification = noti_desc job_desc.JobFailOnInstanceFail = False # 作业运行成功后户自动会被立即释放掉 job_desc.AutoRelease = False job_desc.Type = "DAG" echo_task = TaskDescription() # echo_task.InputMapping = {"oss://xxx/input/": "/home/test/input/", # "oss://xxx/test/file": "/home/test/test/file"} echo_task.InputMapping = {inputOssPath: "/home/test/input/"} echo_task.OutputMapping = {"/home/test/output/":outputOssPath} #触发程序运行的命令行 #case1 执行linux命令行 echo_task.Parameters.Command.CommandLine = "/bin/bash -c 'echo BatchcomputeService'" #case2 执行Windows CMD.exe # echo_task.Parameters.Command.CommandLine = "cmd /c 'echo BatchcomputeService'" #case3 输入可执行文件 # PackagePath存放commandLine中的可执行文件或者二进制包 # echo_task.Parameters.Command.PackagePath = "oss://xxx/package/test.sh" # echo_task.Parameters.Command.CommandLine = "sh test.sh" # 设置程序运行过程中相关环境变量信息 echo_task.Parameters.Command.EnvVars["key1"] = "value1" echo_task.Parameters.Command.EnvVars["key2"] = "value2" # 设置程序的标准输出地址,程序中的print打印会实时上传到指定的oss地址 echo_task.Parameters.StdoutRedirectPath = stdoutOssPath # 设置程序的标准错误输出地址,程序抛出的异常错误会实时上传到指定的oss地址 echo_task.Parameters.StderrRedirectPath = stderrOssPath # 设置任务的超时时间 echo_task.Timeout = 600 # 设置任务所需实例个数 # 环境变量BATCH_COMPUTE_INSTANCE_ID为0到InstanceCount-1 # 在执行程序中访问BATCH_COMPUTE_INSTANCE_ID,实现数据访问的切片实现单任务并发执行 echo_task.InstanceCount = 1 # 设置任务失败后重试次数 echo_task.MaxRetryCount = 0 # NAS数据挂载 #采用NAS时必须保证网络和NAS在同一个VPC内 nasMountEntry = { "Source": "nas://xxxx.nas.aliyuncs.com:/", "Destination": "/home/mnt/", "WriteSupport":True, } mounts_desc.add_entry(nasMountEntry) mounts_desc.Locale = "utf-8" mounts_desc.Lock = False # echo_task.Mounts = mounts_desc # attach cluster作业该集群字段设置为空 echo_task.ClusterId = "" echo_task.AutoCluster = getAutoClusterDesc() # 添加任务 dag_desc.add_task('echoTask', echo_task) # 可以设置多个task,每个task可以根据需求进行设置各项参数 # dag_desc.add_task('echoTask2', echo_task) # Dependencies设置多个task之间的依赖关系,echoTask2依赖echoTask;echoTask3依赖echoTask2 # dag_desc.Dependencies = {"echoTask":["echoTask2"], "echoTask2":["echoTask3"]} job_desc.DAG = dag_desc return job_desc if name == "main": client = Client(REGION, access_key_id, access_key_secret) try: job_desc = getDagJobDesc() job_id = client.create_job(job_desc).Id print('job created: %s' % job_id) except ClientError,e: print (e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。