【DSW Gallery】如何使用DLC进行TensorFlow 1.x 分布式训练

简介: Tensorflow1.x DEMO

直接使用

请打开如何使用DLC进行TensorFlow 1.x 分布式训练,并点击右上角 “ 在DSW中打开” 。

image.png

如何使用DLC进行Tensorflow1.x的分布式训练

基于云原生深度学习训练平台DLC可以进行分布式训练,本文以TensorFlow分布式训练为例,讲述如何使用DLC进行TensorFlow分布式训练。一般来说,分布式TensorFlow的使用者需要关心下面3件事情:

  1. 寻找足够运行训练的资源。在数据并行的情况下,有下列两种模式: a. PS/Worker模式 b. AllReduce模式 两种模式各有优点,PS/Worker模式为例,展示如何使用DLC进行TensorFlow的数据并行模式的分布式训练。
  2. 安装和配置支撑程序运算的软件和应用
  3. 根据分布式TensorFlow的设计,需要配置ClusterSpec。这个json格式的ClusterSpec是用来描述整个分布式训练集群的架构,比如需要使用两个Worker和PS。一般而言,我们需要在模型代码中去手动的配置ClusterSpec中的worker和ps的属性,这样告诉我们的训练框架他们的彼此的位置,然后在训练的过程中实现weight的归集和分发。

针对上面的三个点,DLC都相应的给出了很便捷,基于云原生的解决方案。

  1. DLC是云原生的深度学习训练平台,所以基于k8s的调度能力和云的资源,可以很好的实现CPU/GPU的按需高效调度
  2. 第二点实际上是运行环境,这一点Docker完美契合这个场景
  3. 最后一点, 针对不同版本的TensorFlow,需要有不同的处理方式。对于TensorFlow 1.x版本而言,我们将配置ClusterSpec的工作自动化的完成,用户可以从模型代码中获取TF_CONFIG环境变量,然后按照实际的模型逻辑的需求进行适配.

Tensorflow 1.x的单机训练代码,需要加入如下的逻辑来构建分布式训练

# 从环境变量TF_CONFIG中读取json格式的配置信息
tf_config_json = os.environ.get("TF_CONFIG", "{}")
# 反序列化为python对象
tf_config = json.loads(tf_config_json)
# 拿到ClusterSpec的内容
cluster_spec = tf_config.get("cluster", {})
cluster_spec_object = tf.train.ClusterSpec(cluster_spec)
# 获取角色类型和id, 比如这里的job_name 是 "worker" and task_id 是 0
task = tf_config.get("task", {})
job_name = task["type"]
task_id = task["index"]
# 创建TensorFlow Training Server对象
server_def = tf.train.ServerDef(
    cluster=cluster_spec_object.as_cluster_def(),
    protocol="grpc",
    job_name=job_name,
    task_index=task_id)
server = tf.train.Server(server_def)
# 如果job_name为ps,则调用server.join()
if job_name == 'ps':
    server.join()
# 检查当前进程是否是master, 如果是master,就需要负责创建session和保存summary。
is_chief = (job_name == 'master')
# 其余的代码就可以按照正常的规范去写了
with tf.compat.v1.train.MonitoredTrainingSession(
    master=server.target,
    ......

Sample代码(PS-Worker模式):

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import os
import sys
import ast
import json
import time
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
FLAGS = None
def train():
  tf_config_json = os.environ.get("TF_CONFIG", "{}")
  tf_config = json.loads(tf_config_json)
  task = tf_config.get("task", {})
  cluster_spec = tf_config.get("cluster", {})
  cluster_spec_object = tf.train.ClusterSpec(cluster_spec)
  job_name = task["type"]
  task_id = task["index"]
  print("job_name",job_name)
  print("task_id: ",task_id)
  server_def = tf.train.ServerDef(
      cluster=cluster_spec_object.as_cluster_def(),
      protocol="grpc",
      job_name=job_name,
      task_index=task_id)
  server = tf.distribute.Server(server_def)
  print("server: ",server.target)
  is_chief = (job_name == 'master')
  if is_chief:
        print("Worker %d: Initializing session..." % task_id)
        tf.reset_default_graph()
  else:
        print("Worker %d: Waiting for session to be initialized..." % task_id)
  batch_size = 5000  #  As big as will fit on my gpu
  learning_rate = 0.016 #  Fast learning
  training_epochs = 5
  n_hidden = 2000
  logs_path = "/tmp/mnist/2"
# load mnist data set
  mnist = input_data.read_data_sets('MNIST_data', one_hot=True)
  if job_name == "ps":
      server.join()
  elif job_name == "worker":
    # Between-graph replication
    with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task_id, cluster=cluster_spec)):
        # count the number of updates
        global_step = tf.get_variable('global_step', [], initializer=tf.constant_initializer(0), trainable=False)
        # input images
        with tf.name_scope('input'):
            # None -> batch size can be any size, 784 -> flattened mnist image
            x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input")
            # target 10 output classes
            y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input")
        # model parameters will change during training so we use tf.Variable
        tf.set_random_seed(1)
        with tf.name_scope("weights"):
            W1 = tf.get_variable('W1',
                                 shape=(784, n_hidden),
                                 initializer=tf.contrib.layers.xavier_initializer())
            W2 = tf.get_variable('W2',
                                 shape=(n_hidden, 10),
                                 initializer=tf.contrib.layers.xavier_initializer())
        # bias
        with tf.name_scope("biases"):
            b1 = tf.Variable(tf.zeros([n_hidden]))
            b2 = tf.Variable(tf.zeros([10]))
        # implement model
        with tf.name_scope("softmax"):
            # y is our prediction
            z2 = tf.add(tf.matmul(x, W1), b1)
            a2 = tf.nn.sigmoid(z2)
            logits = tf.add(tf.matmul(a2, W2), b2)
            dropout_logits = tf.nn.dropout(logits, 0.3)
            softmax_logits = tf.nn.softmax(logits)
        # specify cost function
        with tf.name_scope('cross_entropy'):
            # this is our cost
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=dropout_logits, labels=y_)
            loss = tf.reduce_mean(cross_entropy)
        # specify optimizer
        with tf.name_scope('train'):
            # optimizer is an "operation" which we can execute in a session
            # grad_op = tf.train.AdamOptimizer(learning_rate=learning_rate)
            grad_op = tf.compat.v1.train.AdamOptimizer(learning_rate=learning_rate)
            train_op = grad_op.minimize(loss, global_step=global_step)
        with tf.name_scope('Accuracy'):
            # accuracy
            correct_prediction = tf.equal(tf.argmax(softmax_logits, 1), tf.argmax(y_, 1))
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
        # create a summary for our cost and accuracy
        # tf.scalar_summary("cost", loss)
        # tf.scalar_summary("accuracy", accuracy)
        tf.compat.v1.summary.scalar("cost", loss)
        tf.compat.v1.summary.scalar("accuracy", accuracy)
        # merge all summaries into a single "operation" which we can execute in a session
        # summary_op = tf.merge_all_summaries()
        summary_op = tf.compat.v1.summary.merge_all()
        # init_op = tf.initialize_all_variables()
        print("Variables initialized ...")
    # sv = tf.train.Supervisor(is_chief=(task_id == 0), global_step=global_step, init_op=init_op)
    config = tf.compat.v1.ConfigProto(device_filters=[
        '/job:ps', '/job:worker/task:%d' % task_id])
    begin_time = time.time()
    frequency = 100
    # with sv.prepare_or_wait_for_session(server.target) as sess:
    with tf.compat.v1.train.MonitoredTrainingSession(
        master=server.target,
        config=config,
        is_chief=(task_id == 0 and (
                job_name == 'worker'))) as sess:
        # create log writer object (this will log on every machine)
        # writer = tf.train.SummaryWriter(logs_path, graph=tf.get_default_graph())
        while not sess.should_stop():
            # perform training cycles
            start_time = time.time()
            for epoch in range(training_epochs):
                # number of batches in one epoch
                batch_count = int(mnist.train.num_examples / batch_size)
                count = 0
                for i in range(batch_count):
                    batch_x, batch_y = mnist.train.next_batch(batch_size)
                    # perform the operations we defined earlier on batch
                    _, cost, summary, step, train_accuracy = sess.run([train_op, loss, summary_op, global_step, accuracy],
                        feed_dict={x: batch_x, y_: batch_y})
                    # writer.add_summary(summary, step)
                    print("step: ",step,"--train_accuracy: ",train_accuracy)
                    count += 1
                    if count % frequency == 0 or i + 1 == batch_count:
                        elapsed_time = time.time() - start_time
                        start_time = time.time()
                        print("Step: %d," % (step + 1), " Epoch: %2d," % (epoch + 1),
                            " Batch: %3d of %3d," % (i + 1, batch_count), " Cost: %.4f," % cost,
                            " Train acc %2.2f" % (train_accuracy * 100),
                            " AvgTime: %3.2fms" % float(elapsed_time * 1000 / frequency))
                        count = 0
            print("Test-Accuracy: %2.2f" % (sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}) *100))
            print("Total Time: %3.2fs" % float(time.time() - begin_time))
            print("Final Cost: %.4f" % cost)
            break
    # sess.close()
    print("done")
def main(_):
  train()
if __name__ == '__main__':
  tf.compat.v1.app.run(main=main, argv=[sys.argv[0]])

如何在DLC上面提交一个分布式的训练任务

1. 创建新的工作空间

image.png

2. 进入工作空间,新增代码配置

也可以参考: https://help.aliyun.com/document_detail/202277.html

创建新的代码配置: 如下图,用户在创建代码配置的时候,需要输入下列信息:

  1. 代码配置的名称
  2. 代码仓库的Git地址
  3. 如果用用户名密码或者Token,也请输入
  4. 本地存储目录,这个参数定义了代码被clone到容器中之后,保存的路径,默认是 /root/code/ 输入上述信息之后,保存代码配置
  5. image.png

3. 创建数据集配置

目前工作空间中支持下列四种数据集(如下图): 推荐使用阿里云存储,这样无论是性能还是可靠性都有保障

image.png

下面以阿里云存储的数据集为例,展示如何创建数据集。 目前支持两种阿里云存储:

NAS:

这里有三个参数:

  1. 数据存储类型:本例选择NAS
  2. 选择要挂载的NAS文件系统的ID,这里会有一个列表,列出当前用户所有的NAS文件系统
  3. 挂载路径:这是指定要挂载的文件系统的目录,本例中挂载文件系统的根目录
  4. image.png
  5. OSS:
  6. 数据存储类型:本例选择OSS
  7. 路径: 要挂载的文件系统的路径,可以点击红色框中的按钮选择当前用户所有的OSS文件系统
  8. image.png

4. 提交JOB

本文以PS/Worker模式的分布式训练为例,展示如何使用DLC创建一个TensorFlow分布式训练任务。

image.png

上面图中的配置是JOB的通用配置,下面选择“进阶模式”,配置分部署训练相关的规格信息:

image.png


相关实践学习
使用PAI+LLaMA Factory微调Qwen2-VL模型,搭建文旅领域知识问答机器人
使用PAI和LLaMA Factory框架,基于全参方法微调 Qwen2-VL模型,使其能够进行文旅领域知识问答,同时通过人工测试验证了微调的效果。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
相关文章
|
人工智能 算法 PyTorch
TorchAcc:基于 TorchXLA 的分布式训练框架
阿里云研究员、阿里云人工智能平台 PAI 技术负责人--林伟在GTC 2024 大会 China AI Day 线上中文演讲专场上介绍了TorchAcc,这是一个基于 PyTorch/XLA 的大模型分布式训练框架。
|
存储 机器学习/深度学习 人工智能
【DSW Gallery】DSW基础使用介绍
PAI-DSW是一款云端机器学习开发IDE,为您提供交互式编程环境,适用于不同水平的开发者。本文为您介绍PAI-DSW的功能特点以及界面的基础使用。
【DSW Gallery】DSW基础使用介绍
|
Linux 数据安全/隐私保护 Windows
更换(Pypi)pip源到国内镜像
pip国内的一些镜像 阿里云 http://mirrors.aliyun.com/pypi/simple/ 中国科技大学 https://pypi.mirrors.
248043 2
|
7月前
|
Docker 容器
Docker run命令-p参数详解
本文介绍Docker端口映射的基础用法。通过`docker run -p <宿主机端口>:<容器端口>`实现端口映射,例如`-p 5000:80`将宿主机5000端口映射到容器80端口,外部访问宿主机5000端口时流量会转发至容器内部的80端口。示例命令中,`-d`用于后台运行,`--restart=always`确保容器自动重启,`--name`指定容器名称。部署完成后可通过`http://服务器IP地址:5000`验证服务是否正常运行。
|
机器学习/深度学习 分布式计算 调度
机器学习分布式框架Ray
Ray是UC Berkeley RISELab推出的一个高性能分布式执行框架,它比Spark更具计算优势,部署简单,支持机器学习和深度学习的分布式训练。Ray包括节点(head和worker)、本地调度器、object store、全局调度器(GCS),用于处理各种分布式计算任务。它支持超参数调优(Ray Tune)、梯度下降(Ray SGD)、推理服务(Ray SERVE)等。安装简单,可通过`pip install ray`。使用时,利用`@ray.remote`装饰器将函数转换为分布式任务,通过`.remote`提交并用`ray.get`获取结果。5月更文挑战第15天
2859 7
|
开发工具
Vim如何清空文件
这样,你就清空了你的文件。
1051 1
|
机器学习/深度学习 数据可视化 PyTorch
TensorFlow与PyTorch框架的深入对比:特性、优势与应用场景
【5月更文挑战第4天】本文对比了深度学习主流框架TensorFlow和PyTorch的特性、优势及应用场景。TensorFlow以其静态计算图、高性能及TensorBoard可视化工具适合大规模数据处理和复杂模型,但学习曲线较陡峭。PyTorch则以动态计算图、易用性和灵活性见长,便于研究和原型开发,但在性能和部署上有局限。选择框架应根据具体需求和场景。
1943 4
|
Docker 容器
docker: Error response from daemon: driver failed programming external connectivity on endpoint mysq
docker: Error response from daemon: driver failed programming external connectivity on endpoint mysq
|
JavaScript 前端开发 IDE
TypeScript在大型前端项目中的价值与实践策略
【4月更文挑战第6天】本文探讨了TypeScript在大型前端项目中的价值和实践策略。 TypeScript通过静态类型检查、代码提示、接口与泛型提高代码质量和开发效率。它支持最新JS语法,拥有广泛社区支持。实践策略包括逐步迁移、制定类型规范、利用IDE、维护类型定义文件以及集成自动化测试。通过培训和知识分享,团队能更好地应用TypeScript,打造高质量、可维护的前端项目。
202 1
|
数据管理 Java
Spigot开发中的事件与监听器的关系
在Spigot插件开发中,监听器(Listener)是一个非常重要的概念。它们允许你捕捉和处理各种游戏事件,使你的插件能够对玩家的行为、游戏环境的变化等做出响应。本文将详细介绍监听器是什么、它们的用途,并通过一个代码示例展示如何使用监听器。
202 0