【DSW Gallery】如何使用DLC进行TensorFlow 1.x 分布式训练

本文涉及的产品
模型训练 PAI-DLC,100CU*H 3个月
交互式建模 PAI-DSW,每月250计算时 3个月
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
简介: Tensorflow1.x DEMO

直接使用

请打开如何使用DLC进行TensorFlow 1.x 分布式训练,并点击右上角 “ 在DSW中打开” 。

image.png

如何使用DLC进行Tensorflow1.x的分布式训练

基于云原生深度学习训练平台DLC可以进行分布式训练,本文以TensorFlow分布式训练为例,讲述如何使用DLC进行TensorFlow分布式训练。一般来说,分布式TensorFlow的使用者需要关心下面3件事情:

  1. 寻找足够运行训练的资源。在数据并行的情况下,有下列两种模式: a. PS/Worker模式 b. AllReduce模式 两种模式各有优点,PS/Worker模式为例,展示如何使用DLC进行TensorFlow的数据并行模式的分布式训练。
  2. 安装和配置支撑程序运算的软件和应用
  3. 根据分布式TensorFlow的设计,需要配置ClusterSpec。这个json格式的ClusterSpec是用来描述整个分布式训练集群的架构,比如需要使用两个Worker和PS。一般而言,我们需要在模型代码中去手动的配置ClusterSpec中的worker和ps的属性,这样告诉我们的训练框架他们的彼此的位置,然后在训练的过程中实现weight的归集和分发。

针对上面的三个点,DLC都相应的给出了很便捷,基于云原生的解决方案。

  1. DLC是云原生的深度学习训练平台,所以基于k8s的调度能力和云的资源,可以很好的实现CPU/GPU的按需高效调度
  2. 第二点实际上是运行环境,这一点Docker完美契合这个场景
  3. 最后一点, 针对不同版本的TensorFlow,需要有不同的处理方式。对于TensorFlow 1.x版本而言,我们将配置ClusterSpec的工作自动化的完成,用户可以从模型代码中获取TF_CONFIG环境变量,然后按照实际的模型逻辑的需求进行适配.

Tensorflow 1.x的单机训练代码,需要加入如下的逻辑来构建分布式训练

# 从环境变量TF_CONFIG中读取json格式的配置信息
tf_config_json = os.environ.get("TF_CONFIG", "{}")
# 反序列化为python对象
tf_config = json.loads(tf_config_json)
# 拿到ClusterSpec的内容
cluster_spec = tf_config.get("cluster", {})
cluster_spec_object = tf.train.ClusterSpec(cluster_spec)
# 获取角色类型和id, 比如这里的job_name 是 "worker" and task_id 是 0
task = tf_config.get("task", {})
job_name = task["type"]
task_id = task["index"]
# 创建TensorFlow Training Server对象
server_def = tf.train.ServerDef(
    cluster=cluster_spec_object.as_cluster_def(),
    protocol="grpc",
    job_name=job_name,
    task_index=task_id)
server = tf.train.Server(server_def)
# 如果job_name为ps,则调用server.join()
if job_name == 'ps':
    server.join()
# 检查当前进程是否是master, 如果是master,就需要负责创建session和保存summary。
is_chief = (job_name == 'master')
# 其余的代码就可以按照正常的规范去写了
with tf.compat.v1.train.MonitoredTrainingSession(
    master=server.target,
    ......

Sample代码(PS-Worker模式):

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import os
import sys
import ast
import json
import time
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
FLAGS = None
def train():
  tf_config_json = os.environ.get("TF_CONFIG", "{}")
  tf_config = json.loads(tf_config_json)
  task = tf_config.get("task", {})
  cluster_spec = tf_config.get("cluster", {})
  cluster_spec_object = tf.train.ClusterSpec(cluster_spec)
  job_name = task["type"]
  task_id = task["index"]
  print("job_name",job_name)
  print("task_id: ",task_id)
  server_def = tf.train.ServerDef(
      cluster=cluster_spec_object.as_cluster_def(),
      protocol="grpc",
      job_name=job_name,
      task_index=task_id)
  server = tf.distribute.Server(server_def)
  print("server: ",server.target)
  is_chief = (job_name == 'master')
  if is_chief:
        print("Worker %d: Initializing session..." % task_id)
        tf.reset_default_graph()
  else:
        print("Worker %d: Waiting for session to be initialized..." % task_id)
  batch_size = 5000  #  As big as will fit on my gpu
  learning_rate = 0.016 #  Fast learning
  training_epochs = 5
  n_hidden = 2000
  logs_path = "/tmp/mnist/2"
# load mnist data set
  mnist = input_data.read_data_sets('MNIST_data', one_hot=True)
  if job_name == "ps":
      server.join()
  elif job_name == "worker":
    # Between-graph replication
    with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task_id, cluster=cluster_spec)):
        # count the number of updates
        global_step = tf.get_variable('global_step', [], initializer=tf.constant_initializer(0), trainable=False)
        # input images
        with tf.name_scope('input'):
            # None -> batch size can be any size, 784 -> flattened mnist image
            x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input")
            # target 10 output classes
            y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input")
        # model parameters will change during training so we use tf.Variable
        tf.set_random_seed(1)
        with tf.name_scope("weights"):
            W1 = tf.get_variable('W1',
                                 shape=(784, n_hidden),
                                 initializer=tf.contrib.layers.xavier_initializer())
            W2 = tf.get_variable('W2',
                                 shape=(n_hidden, 10),
                                 initializer=tf.contrib.layers.xavier_initializer())
        # bias
        with tf.name_scope("biases"):
            b1 = tf.Variable(tf.zeros([n_hidden]))
            b2 = tf.Variable(tf.zeros([10]))
        # implement model
        with tf.name_scope("softmax"):
            # y is our prediction
            z2 = tf.add(tf.matmul(x, W1), b1)
            a2 = tf.nn.sigmoid(z2)
            logits = tf.add(tf.matmul(a2, W2), b2)
            dropout_logits = tf.nn.dropout(logits, 0.3)
            softmax_logits = tf.nn.softmax(logits)
        # specify cost function
        with tf.name_scope('cross_entropy'):
            # this is our cost
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=dropout_logits, labels=y_)
            loss = tf.reduce_mean(cross_entropy)
        # specify optimizer
        with tf.name_scope('train'):
            # optimizer is an "operation" which we can execute in a session
            # grad_op = tf.train.AdamOptimizer(learning_rate=learning_rate)
            grad_op = tf.compat.v1.train.AdamOptimizer(learning_rate=learning_rate)
            train_op = grad_op.minimize(loss, global_step=global_step)
        with tf.name_scope('Accuracy'):
            # accuracy
            correct_prediction = tf.equal(tf.argmax(softmax_logits, 1), tf.argmax(y_, 1))
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
        # create a summary for our cost and accuracy
        # tf.scalar_summary("cost", loss)
        # tf.scalar_summary("accuracy", accuracy)
        tf.compat.v1.summary.scalar("cost", loss)
        tf.compat.v1.summary.scalar("accuracy", accuracy)
        # merge all summaries into a single "operation" which we can execute in a session
        # summary_op = tf.merge_all_summaries()
        summary_op = tf.compat.v1.summary.merge_all()
        # init_op = tf.initialize_all_variables()
        print("Variables initialized ...")
    # sv = tf.train.Supervisor(is_chief=(task_id == 0), global_step=global_step, init_op=init_op)
    config = tf.compat.v1.ConfigProto(device_filters=[
        '/job:ps', '/job:worker/task:%d' % task_id])
    begin_time = time.time()
    frequency = 100
    # with sv.prepare_or_wait_for_session(server.target) as sess:
    with tf.compat.v1.train.MonitoredTrainingSession(
        master=server.target,
        config=config,
        is_chief=(task_id == 0 and (
                job_name == 'worker'))) as sess:
        # create log writer object (this will log on every machine)
        # writer = tf.train.SummaryWriter(logs_path, graph=tf.get_default_graph())
        while not sess.should_stop():
            # perform training cycles
            start_time = time.time()
            for epoch in range(training_epochs):
                # number of batches in one epoch
                batch_count = int(mnist.train.num_examples / batch_size)
                count = 0
                for i in range(batch_count):
                    batch_x, batch_y = mnist.train.next_batch(batch_size)
                    # perform the operations we defined earlier on batch
                    _, cost, summary, step, train_accuracy = sess.run([train_op, loss, summary_op, global_step, accuracy],
                        feed_dict={x: batch_x, y_: batch_y})
                    # writer.add_summary(summary, step)
                    print("step: ",step,"--train_accuracy: ",train_accuracy)
                    count += 1
                    if count % frequency == 0 or i + 1 == batch_count:
                        elapsed_time = time.time() - start_time
                        start_time = time.time()
                        print("Step: %d," % (step + 1), " Epoch: %2d," % (epoch + 1),
                            " Batch: %3d of %3d," % (i + 1, batch_count), " Cost: %.4f," % cost,
                            " Train acc %2.2f" % (train_accuracy * 100),
                            " AvgTime: %3.2fms" % float(elapsed_time * 1000 / frequency))
                        count = 0
            print("Test-Accuracy: %2.2f" % (sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}) *100))
            print("Total Time: %3.2fs" % float(time.time() - begin_time))
            print("Final Cost: %.4f" % cost)
            break
    # sess.close()
    print("done")
def main(_):
  train()
if __name__ == '__main__':
  tf.compat.v1.app.run(main=main, argv=[sys.argv[0]])

如何在DLC上面提交一个分布式的训练任务

1. 创建新的工作空间

image.png

2. 进入工作空间,新增代码配置

也可以参考: https://help.aliyun.com/document_detail/202277.html

创建新的代码配置: 如下图,用户在创建代码配置的时候,需要输入下列信息:

  1. 代码配置的名称
  2. 代码仓库的Git地址
  3. 如果用用户名密码或者Token,也请输入
  4. 本地存储目录,这个参数定义了代码被clone到容器中之后,保存的路径,默认是 /root/code/ 输入上述信息之后,保存代码配置
  5. image.png

3. 创建数据集配置

目前工作空间中支持下列四种数据集(如下图): 推荐使用阿里云存储,这样无论是性能还是可靠性都有保障

image.png

下面以阿里云存储的数据集为例,展示如何创建数据集。 目前支持两种阿里云存储:

NAS:

这里有三个参数:

  1. 数据存储类型:本例选择NAS
  2. 选择要挂载的NAS文件系统的ID,这里会有一个列表,列出当前用户所有的NAS文件系统
  3. 挂载路径:这是指定要挂载的文件系统的目录,本例中挂载文件系统的根目录
  4. image.png
  5. OSS:
  6. 数据存储类型:本例选择OSS
  7. 路径: 要挂载的文件系统的路径,可以点击红色框中的按钮选择当前用户所有的OSS文件系统
  8. image.png

4. 提交JOB

本文以PS/Worker模式的分布式训练为例,展示如何使用DLC创建一个TensorFlow分布式训练任务。

image.png

上面图中的配置是JOB的通用配置,下面选择“进阶模式”,配置分部署训练相关的规格信息:

image.png


相关实践学习
使用PAI-EAS一键部署ChatGLM及LangChain应用
本场景中主要介绍如何使用模型在线服务(PAI-EAS)部署ChatGLM的AI-Web应用以及启动WebUI进行模型推理,并通过LangChain集成自己的业务数据。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
相关文章
|
3月前
|
数据采集 TensorFlow 算法框架/工具
【大作业-03】手把手教你用tensorflow2.3训练自己的分类数据集
本教程详细介绍了如何使用TensorFlow 2.3训练自定义图像分类数据集,涵盖数据集收集、整理、划分及模型训练与测试全过程。提供完整代码示例及图形界面应用开发指导,适合初学者快速上手。[教程链接](https://www.bilibili.com/video/BV1rX4y1A7N8/),配套视频更易理解。
67 0
【大作业-03】手把手教你用tensorflow2.3训练自己的分类数据集
|
3月前
|
机器学习/深度学习 自然语言处理 监控
利用 PyTorch Lightning 搭建一个文本分类模型
利用 PyTorch Lightning 搭建一个文本分类模型
89 8
利用 PyTorch Lightning 搭建一个文本分类模型
|
6月前
|
TensorFlow 算法框架/工具 C++
构建NLP 开发问题之如何将模型导出为 ONNX、TensorRT 或 Tensorflow 格式以便部署
构建NLP 开发问题之如何将模型导出为 ONNX、TensorRT 或 Tensorflow 格式以便部署
|
6月前
|
机器学习/深度学习 PyTorch TensorFlow
PAI DLC与其他深度学习框架如TensorFlow或PyTorch的异同
PAI DLC与其他深度学习框架如TensorFlow或PyTorch的异同
|
8月前
|
机器学习/深度学习 Kubernetes TensorFlow
基于ASK+TFJob快速完成分布式Tensorflow训练任务
本文介绍如何使用TFJob在ASK+ECI场景下,快速完成基于GPU的TensorFlow分布式训练任务。
280 0
基于ASK+TFJob快速完成分布式Tensorflow训练任务
|
存储 机器学习/深度学习 Cloud Native
|
存储 机器学习/深度学习 Kubernetes
【DSW Gallery】如何在DLC中进行Pytorch DDP分布式训练任务
本文基于Pytorch 1.8版本,介绍了如何使用DLC进行Pytorch DDP分布式训练任务.
【DSW Gallery】如何在DLC中进行Pytorch DDP分布式训练任务
|
机器学习/深度学习 算法
【DSW Gallery】如何使用EasyRec训练DeepFM模型
本文基于EasyRec 0.4.7 展示了如何使用EasyRec快速的训练一个DeepFM模型
【DSW Gallery】如何使用EasyRec训练DeepFM模型
|
机器学习/深度学习 人工智能 Kubernetes
PAI-DLC 深度学习训练平台介绍|学习笔记
快速学习 PAI-DLC 深度学习训练平台介绍。
706 0
PAI-DLC 深度学习训练平台介绍|学习笔记
|
机器学习/深度学习 数据采集 数据可视化
【DSW Gallery】Tensorflow 2构建CNN模型
本文基于TensorFlow2版本,构建了一个CNN网络,然后基于Mnist手写体数据集进行手写体的识别。本文从模型的定义,数据的加载,处理,模型的训练到最后的结果的分析以及可视化等方面提供了一个端到端的sample。用户可以基于本文了解使用TensorFlow2进行模型开发的整个流程。
【DSW Gallery】Tensorflow 2构建CNN模型