import time from batchcompute import Cient, ClientError from batchcompute import CN_SHENZHEN as REGION from batchcompute.resources import ( JobDescription, TaskDescription, DAG, GroupDescription, ClusterDescription, )
access_key_id = ... # your access key id access_key_secret = ... # your access key secret image_id = ... # the id of a image created before instance_type = ... # instance type client = Client(REGION, access_key_id, access_key_secret) try: # Create cluster. cluster_desc = ClusterDescription() group_desc = GroupDescription() group_desc.DesiredVMCount = 1 group_desc.InstanceType = instance_type cluster_desc.add_group('group1', group_desc) cluster_desc.Name = "BatchcomputeCluster" cluster_desc.ImageId = image_id cluster_desc.Description = "Python SDK test" cluster_id = client.create_cluster(cluster_desc).Id # Create job. job_desc = JobDescription() echo_task = TaskDescription() # Create map task. echo_task.Parameters.Command.CommandLine = "echo Batchcompute service" echo_task.Parameters.Command.PackagePath = "" echo_task.Parameters.StdoutRedirectPath = "oss://xxx/xxx/" # Better replace this path echo_task.Parameters.StderrRedirectPath = "oss://xxx/xxx/" # Better replace this path echo_task.InstanceCount = 3 echo_task.ClusterId = cluster_id # Create task dag. task_dag = DAG() task_dag.add_task(task_name="Echo", task=echo_task) # Create job description. job_desc.DAG = task_dag job_desc.Priority = 99 # 0-1000 job_desc.Name = "PythonSDKDemo" job_desc.JobFailOnInstanceFail = True job_id = client.create_job(job_desc).Id # Wait for job finished. errs = client.poll(job_id) if errs: print ("Some errors occur: %s" % '\n'.join(errs)) # Delete cluster client.delete_cluster(cluster_id) except ClientError, e: print (e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg()) 2、 类和常量 2.1 接口类型 接口类型提供了BatchCompute服务所有API接口的Python实现以及其他一些有用的辅助接口。
序号 名称 可序列化 描述 1. Client No 与BatchCompute服务交互的客户端类型 2.2 描述类型 描述类型主要作为创建资源时的参数类型或获取资源状态信息时由服务的返回类型。
序号 名称 可序列化 描述 1. JobDescription Yes 描述用户作业的类 2. DAG Yes 描述作业任务以及任务间互相之间依赖关系的类 3. TaskDescription Yes 描述任务的类 4. Parameters Yes 描述任务运行参数的类 5. Command Yes 配置任务命令行执行环境类 6. ClusterDescription Yes 描述用户集群的类 7. GroupDescription Yes 描述用户集群实例配置的类 8. Job Yes 描述给定作业当前状态信息的类 9. Task Yes 描述给定的作业任务的当前状态信息的类 10. Instance Yes 描述给定的任务实例当前状态信息的类 11. Result Yes 描述给定的任务实例运行结果的类 12. InstanceMetrics Yes 描述给定作业或任务实例统计信息的类 13. TaskMetrics Yes 描述给定作业中任务统计信息的类 14. Cluster Yes 描述给定集群状态信息的类 15. Group Yes 描述给定集群中机器组状态信息的类 16. Metrics Yes 描述给定集群作业统计信息的类 关于可序列化 描述类型均为可序列化的类型,SDK中所有可序列化的类均从内部类型 Jsonizable 继承而来。以下是关于 Jsonizable 类型及其子类的描述;
参数说明:
Jsonizable 及其子类对象均可通过字典,Jsonizable 对象或者描述字典的JSON串初始化。注意,在初始化 Jsonizable 对象及其子类时,会丢弃字典或者JSON串中所有不合法的属性描述信息。
参数 类型 描述 properties dict, str, Jsonizable object 属性描述信息 通过字典初始化 Jsonizable 类: e.g.
from batchcompute.resources import JobDescription
properties = { "Name": "PythonSDKDemo", "Description": "Batchcompute" } jsonizable = JobDescription(properties) print (jsonizable.Name) print (jsonizable.Description) 通过JSON字符串初始化 Jsonizable 类 e.g.
from batchcompute.resources import JobDescription
properties = '''{ "Name": "PythonSDKDemo", "Description": "Batchcompute" }''' jsonizable = JobDescription(properties) print (jsonizable.Name) print (jsonizable.Description) 通过相同类的对象初始化 Jsonizable 类。 e.g.
form batchcompute.resources import JobDescription
jsonizable1 = JobDescription() jsonizable1.Name = "PythonSDKDemo" jsonizable1.Description = "Batchcompute" jsonizable2 = JobDescription(jsonizable1) print(jsonizable2.Name) print(jsonizable2.Description) 方法说明:
序号 方法名 描述 1. update 接受一个字典对象,更新类的部分属性,不合法的属性将被丢弃 2. detail 返回一个包含类属性的字典,如果属性为空将不被包含 3. load 接受一个字符串,该字符串是一个JSON化的字典,类的属性均被更新,不合法的属性会被丢弃 4. dump 返回一个字符串,内容JSON化的字典,包含所有类属性信息,如果属性为空将不被包含 5. str 被print调用的内置函数,其内部调用了dump函数 关于类属性 可序列化的类型均具有各种属性。属性均可以通过其名称直接读取,例如,你可以通过如下代码获取作业的ID:另外属性名与Python规范PEP8中类的命名方式保持一致(区别于类方法的命名规则),遵循CamelCase的拼写规则.
job = ... job_id = job.Id print (job_id) 另外,可以通过字典取值的方式获取属性,例如:
job = ... job_id = job["Id"] print (job_id) 对于类 JobDescription, DAG, TaskDescription, 可以通过赋值的方式更改某个属性的值,例如: from batchcompute.resources import JobDescription job_desc = JobDescription() job_desc.Name = "PythonSDKDemo" 对于类 JobDescription, DAG, TaskDescription, 可以通过字典的方式对类的属性进行赋值, 例如: from batchcompute.resources import JobDescription job_desc = JobDescription() job_desc["Id"] = "PythonSDKDemo" 2.3 响应类型 序号 名称 可序列化 描述 1. CreateResponse No 创建资源成功后, Client返回的响应类 2. GetResponse No 获取资源状态信息, Client返回的响应类 3. ActionResponse No 对资源进行各种操作时由Client返回的响应类 4. ListResponse No 列举资源时,由Client返回的响应类 关于响应类 所有的响应类型 (CreateResponse, GetResponse, ActionRespnse, ListResponse)均继承自内部类型RawResponse.以下描述适用于所有RawResponse的子类。
属性说明:
属性 类型 描述 RequestId str Client的所有请求的识别码 StatusCode int Client的所有请求的状态码 e.g.
... response = client.create_job(job_desc) print (response.RequestId) print (response.StatusCode) 2.4 异常类型 非法的参数或者非法请求时会抛出异常
序号 名称 可序列化 描述 1. ClientError No 异常类 2. FieldError No 异常类 3. ValidationError No 异常类 4. JSONError No 异常类 5. ConfigError No 异常类 2.5 常量 序号 名称 可序列化 描述 1. CN_QINGDAO No 常量,BatchCompute的青岛(华北1)endpoint 2. CN_SHENZHEN No 常量,BatchCompute的深圳(华南1)endpoint 3. CN_BEIJING No 常量,BatchCompute的北京(华北2)endpoint 4. CN_HANGZHOU No 常量,BatchCompute的杭州(华东1)endpoint
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。