TensorFlow目前进行数据分布式训练的主流方式是Horovod,AIACC-Training 1.5支持使用Horovod API兼容的方式对TensorFlow分布式训练进行加速。本文为您介绍使用AIACC-Training TensorFlow版的具体操作及可能遇到的问题。
适配Horovod API
本小节介绍如何使用Horovod兼容API进行TensorFlow分布式训练的基本步骤,以下操作为原始训练代码适配到AIACC-Traninig的一般过程。
AIACC-Training for TensorFlow支持Horovod API。适配AIACC-Training的方式与Horovod一致,如果您之前是使用Horovod进行分布式训练,只需替换import模块即可。替换内容后的内容如下所示:
import perseus.tensorflow.horovod as hvd
如果您的训练代码是非分布式代码,可以参考以下操作步骤将训练代码升级为Horovod接口的分布式训练代码。
- 在main函数的开头部分,执行如下命令,初始化Perseus Horovod模块。
说明:请务必在使用其他Perseus API之前进行调用。
hvd.init()
- 将训练数据集切分为子数据集。
每个进程需要使用不同的数据,来达到数据并行训练的目的,支持手动或自动进行数据切分。TensorFlow为tf.data.Dataset类提供了自动切分数据的shard()接口,可结合size()、local_rank()或rank()函数进行自定义的数据切分,命令如下:
# 每个机器使用同一份数据 dataset = tf.dataset.shard(hvd.size(), hvd.rank()) # 已将数据切分到每个机器上,每个机器使用数据的不同子集 dataset = tf.dataset.shard(hvd.size(), hvd.local_rank())
函数说明如下:
- size():返回所有参与训练的GPU卡数,例如双机,每个机器8张卡,则返回值为16。您也可以用该参数来调整学习率、以及checkpoint保存的batch number等。
- local_rank():返回当前GPU在本机器上的编号,例如两台8卡机器,则每台机器上的卡的local_rank()范围是0~7。您也可以用该参数来绑定进程到相应的GPU卡。
- rank():返回当前GPU在所有机器及GPU中的编号,例如四台8卡机器,rank()的范围是0~31。
重要
- 为保证算法的准确性和性能的稳定性,请将shard()函数置于数据集的shuffle()和repeat()操作之前。
- 请勿对evaluation或test数据集进行切分,否则将导致各进程的评估结果不一致。
- 通常情况下,对于训练的step参数和warmup step参数,需要除以总的进程数hvd.size(),学习率需要对应增大hvd.size()倍。
说明:部分模型不需要增大学习率,如BERT模型,具体请根据训练收敛情况作判断。
step = step // hvd.size() learning_rate = learning_rate * hvd.size()
- 重载Optimizer。
将原单机的Optimizer使用此函数进行重载。使用示例如下:
# original optimizer optimizer = tf.train.AdamOptimizer(learning_rate) # AIACC-Training: wrap the Distributed Optimizer. optimizer = hvd.DistributedOptimizer(optimizer)
DistributedOptimizer(TensorFlow_Optimizer):用于重载Optimizer,使其可以进行AIACC-Training分布式训练。输入参数为标准TensorFlow下的Optimizer,输出为被AIACC-Training重载过的Optimizer。
- 将当前process绑定对应的GPU卡。
- 对于TensorFlow 1.x版本:
config = tf.ConfigProto() config.gpu_options.allow_growth= True config.gpu_options.visible_device_list =str(hvd.local_rank())
你还需要根据以下不同情况,将config传入相应的函数中:
- 若使用tf.Session.run(),则需要在初始化tf.Session时传入config。
with tf.Session(config=config) as sess: sess.run(...)
- 若使用Estimator,则需要在初始化Estimator时,以tf.estimator.RunConfig(session_config=config)封装传入。
mnist_classifier = tf.estimator.Estimator( model_fn=cnn_model_fn, model_dir=model_dir, config=tf.estimator.RunConfig(session_config=config))
- 若使用Keras,需要封装后传入。
from tensorflow.keras import backend as K K.set_session(tf.Session(config=config))
- 对于TensorFlow 2.x版本:
gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus: tf.config.experimental.set_memory_growth(gpu, True) if gpus: tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
- 在开始训练之前,需要同步所有训练进程的初始状态,根据不同的使用形式参考以下内容进行适配。
- 对于传统的tf.Session.run()形式,需在训练前先运行tf.Session.run(hvd.broadcast_global_variables(0))。
- 对于Keras,需在训练的callbacks中增加hvd.keras.callbacks.BroadcastGlobalVariablesCallback(0)。
- 对于Estimator,需在session hook中加入hvd.BroadcastGlobalVariablesHook(0)。
- checkpoint或模型只需要在root rank上保存,其它rank设置为none。
说明:此步骤可以防止各个进程因保存模型而发生覆盖冲突,因此务必执行。
checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None
- 通过启动perseusrun来进行训练脚本的分布式训练。以8卡2机的启动方式为例,命令示例如下:
perseusrun -np 16 -H {IP1:8},{IP2:8} python xxx.py
其中,IP为内网IP。
说明:关于启动方式的更多信息,请参见分布式启动方式说明。
示范用例
AIACC-Training软件包路径中为您提供了针对tf.Session.run()、Keras、Estimator、tf2 eager等训练方式的适配示例代码,您可以通过以下操作体验训练过程。
- 进入适配示例代码目录。
cd `echo $(python -c "import perseus; print(perseus)") | cut -d\' -f 4 | sed "s/\_\_init\_\_\.py//"`examples/
- 启动分布式训练。
以Keras形式的MNIST训练模型启动单机2卡为例,启动命令如下:
perseusrun -np 2 -H localhost:2 python tensorflow_keras_mnist.py
常见问题
训练过程中出现OOM(显存不足)报错
您可以识别以下几种可能情况并解决。
- 使用nvidia-smi检测启动过程中,各显卡占用显存是否均衡增加,单个process对应单个GPU,每个process所使用显存应该接近。
- 若存在多个process绑定同一张卡的情况,需要检查config.gpu_options.visible_device_list是否正确设置。
- 若存在显存直接被用尽,您可以尝试增加config.allow_growth = True。
- 若启用了XLA,您可以尝试增加参数config.gpu_options.per_process_gpu_memory_fraction = 0.9或者0.8。
如何快速判断是否是梯度通信带来的性能瓶颈
您可以将适配代码中的DistributedOptimizer(opt)参数注释掉,此时,将不会产生梯度通信,您即可进一步排查数据IO、CPU预处理等可能造成性能瓶颈的原因。
对数据集做shard的注意事项
由于AIACC-Training是由多个进程启动同一份训练代码,因此您需要对数据集做数据集切分为子数据集,使每个进程处理与训练不同的子数据集。TensorFlow为tf.data.Dataset类提供了自动切分数据的shard()接口,您可以结合size()、local_rank()或rank()函数进行自定义的数据切分,例如:
dataset = tf.dataset.shard(hvd.size(), hvd.rank())
重要:为保证算法的准确性,shard使用时必须放在数据集shuffle操作之前,且为了保证性能稳定,shard操作不要放在repeat操作之后。否则会带来额外的预处理负担,将严重影响性能。
好啦!小弹的分享到此为止。我们更欢迎您分享您对阿里云产品的设想、对功能的建议或者各种吐槽,请扫描提交问卷并获得社区积分或精美礼品一份。https://survey.aliyun.com/apps/zhiliao/P4y44bm_8
【扫码填写上方调研问卷】
欢迎每位来到弹性计算的开发者们来反馈问题哦~