日志报错异常信息:TFJob dlc1qvkyrnwcpbxz is failed because 1 Worker replica(s) failed
1.问题复现过程简单演示
- 操作流程
DataWorks数据准备
- 1.Dataworks中创建数据表
- SQL脚本
create table if not exists bank_datapython
(
age BIGINT comment '年龄',
job STRING comment '工作类型',
marital STRING comment '婚否',
education STRING comment '教育程度',
credit STRING comment '是否有信用卡',
housing STRING comment '是否有房贷'
);
show tables;
INSERT into table bank_datapython values (18,"支持","否","高中","否","有");
select * from bank_datapython;
- 配置读数据表组件
- 配置Python脚本V2组件
- main.py
import os
import argparse
import json
"""
Python V2 组件示例代码
"""
# 当前工作空间下的默认MaxCompute执行环境,包含MaxComputeProject的名称以及Endpoint.
# 需要当前的工作空间下有MaxCompute项目时,作业的执行环境才会注入。
# 示例: {"endpoint": "http://service.cn.maxcompute.aliyun-inc.com/api", "odpsProject": "lq_test_mc_project"}
ENV_JOB_MAX_COMPUTE_EXECUTION = "JOB_MAX_COMPUTE_EXECUTION"
def init_odps():
"""初始化一个ODPS实例,用于读写MaxCompute数据.
具体API请参考PyODPS的文档: https://pyodps.readthedocs.io/
"""
from odps import ODPS
# 当前工作空间的默认MaxCompute项目信息.
mc_execution = json.loads(os.environ[ENV_JOB_MAX_COMPUTE_EXECUTION])
o = ODPS(
access_id="XXXXXXXXX",
secret_access_key="XXXXXXXXX",
# 请根据Project所在的Region选择: https://help.aliyun.com/document_detail/34951.html
endpoint=mc_execution["http://service.cn-shanghai.maxcompute.aliyun.com/api"],
project=mc_execution["anPythonTest"],
)
return o
def parse_odps_url(table_uri):
"""解析输入的MaxCompute Table URI
需要打开的MaxCompute表名,格式为odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/
示例:odps://test/tables/iris/pa=1/pb=1,其中pa=1/pb=1是一个多级partition。
Returns:
返回三元组(ProjectName, TableName, Partition)
"""
from urllib import parse
parsed = parse.urlparse(table_uri)
project_name = parsed.hostname
r = parsed.path.split("/", 2)
table_name = r[2]
if len(r) > 3:
partition = r[3]
else:
partition = None
return project_name, table_name, partition
def parse_args():
"""解析给到脚本的arguments."""
parser = argparse.ArgumentParser(description="PythonV2 component script example.")
# 从上游连线输入当前组件端口的输入,会通过arguments的方式传递给到执行的脚本
# 1. 组件输入
# - OSS的输入:
# 来自上游组件的OSS输入,会被挂载到脚本执行的节点上, 然后挂载后的文件路径,会arguments的形式,传递给到运行的脚本。
# 例如 "python main.py --input1 /ml/input/data/input1 "
# - MaxComputeTable的输入:
# MaxComputeTable的输入不支持挂载,对应的Table信息会以URI的形式,作为arguments传递给到运行脚本
# 例如 "python main.py --input1 odps://some-project-name/tables/table
# 对于ODPS URI形式输入,可以用示例的parse_odps_url函数解析出对应的元信息。
parser.add_argument("--input1", type=str, default=None, help="Component input port 1.")
parser.add_argument("--input2", type=str, default=None, help="Component input port 2.")
parser.add_argument("--input3", type=str, default=None, help="Component input port 3.")
parser.add_argument("--input4", type=str, default=None, help="Component input port 4.")
# 组件输出
# - OSS输出
# 组件的输出端口1和输出端口2是两个OSS输出端口,可以用于下游的使用OSS路径作为输入的组件。
# 配置组件输出任务输出路径,对应的输出目录会被挂载到 /ml/output/ 下。
# 组件的输出端口 "OSS输出-1"和"OSS输出-2",分别对应子目录/ml/output/output1 和 ml/output/output2。
# - MaxComputeTable的输出
# 组件的输出端口3和输出端口4是MaxComputeTable输出.
# 如果当前的工作空间配置了MaxComputeProject项目,则组件传递一个临时表URI给到脚本。
# 例如 python main.py --output3 odps://<some-project-name>/tables/<output-table-name>
# 用户的代码可以构建对应的表,写出数据到对应表,然后通过组件连线将表传递给到下游组件。
parser.add_argument("--output1", type=str, default=None, help="Output OSS port 1.")
parser.add_argument("--output2", type=str, default=None, help="Output OSS port 2.")
parser.add_argument("--output3", type=str, default=None, help="Output MaxComputeTable 1.")
parser.add_argument("--output4", type=str, default=None, help="Output MaxComputeTable 2.")
args, _ = parser.parse_known_args()
return args
def write_table_example(args):
"""示例:复制将PAI提供公共表的数据,作为当前组件的临时表输出:
更多PyODPS请参考PyODPS文档: https://pyodps.readthedocs.io/
"""
output_table_uri = args.output3
o = init_odps()
project_name, table_name, partition = parse_odps_url(output_table_uri)
o.run_sql(f"create table {project_name}.{table_name} as select * from pai_online_project.heart_disease_prediction;")
def write_output1(args):
"""将数据结果写入Mount的OSS路径上(output1子目录),对应的结果可以通过连线传递到下游"""
output_path = args.output1
os.makedirs(output_path, exist_ok=True)
p = os.path.join(output_path, "result.text")
with open(p, "w") as f:
f.write("TestAccuracy=0.88")
if __name__ == "__main__":
args = parse_args()
print("Input1={}".format(args.input1))
print("Output1={}".format(args.output1))
# write_table_example(args)
# write_output1(args)
- 运行测试
2.问题临时解决方案
- 这个一般是配置了相同的OSS Path (代码和任务输出路径)导致的bug,可以将代码和任务输出路径配置成不一样的临时性解决。
- 代码和任务输出路径配置为两个不同路径
- 再次运行测试