TensorFlow分布式实践

简介: 基于单机的建模很难满足企业不断增长的数据量级的需求,开发者需要使用分布式的开发方式,在集群上进行建模。而单机和分布式的开发代码有一定的区别,本文就将为开发者们介绍,基于TensorFlow进行分布式开发的两种方式,帮助开发者在实践的过程中,更好地选择模块的开发方向。

大数据时代,基于单机的建模很难满足企业不断增长的数据量级的需求,开发者需要使用分布式的开发方式,在集群上进行建模。而单机和分布式的开发代码有一定的区别,本文就将为开发者们介绍,基于TensorFlow进行分布式开发的两种方式,帮助开发者在实践的过程中,更好地选择模块的开发方向。

基于TensorFlow原生的分布式开发

分布式开发会涉及到更新梯度的方式,有同步和异步的两个方案,同步更新的方式在模型的表现上能更快地进行收敛,而异步更新时,迭代的速度则会更加快。两种更新方式的图示如下:

01


*

同步更新流程
(图片来源:TensorFlow:Large-Scale Machine Learning on Heterogeneous Distributed Systems)

*

02


*异步更新流程
(图片来源:TensorFlow:Large-Scale Machine Learning on Heterogeneous Distributed Systems)
*


TensorFlow是基于ps、work 两种服务器进行分布式的开发。ps服务器可以只用于参数的汇总更新,让各个work进行梯度的计算。

基于TensorFlow原生的分布式开发的具体流程如下:

首先指定ps 服务器启动参数 –job_name=ps:

python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=ps --task_index=0

接着指定work服务器参数(启动两个work 节点) –job_name=work2:

python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=worker --task_index=0
python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=worker --task_index=1

之后,上述指定的参数 worker_hosts ps_hosts job_name task_index 都需要在py文件中接受使用:

tf.app.flags.DEFINE_string("worker_hosts", "默认值", "描述说明")

接收参数后,需要分别注册ps、work,使他们各司其职:

ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
server = tf.train.Server(cluster,job_name=FLAGS.job_name,task_index=FLAGS.task_index)

issync = FLAGS.issync
if FLAGS.job_name == "ps":
   server.join()
elif FLAGS.job_name == "worker":
   with tf.device(tf.train.replica_device_setter(
                   worker_device="/job:worker/task:%d" % FLAGS.task_index,
                   cluster=cluster)):

继而更新梯度。

(1)同步更新梯度:

rep_op = tf.train.SyncReplicasOptimizer(optimizer,
                                               replicas_to_aggregate=len(worker_hosts),
                                               replica_id=FLAGS.task_index,
                                               total_num_replicas=len(worker_hosts),
                                               use_locking=True)
train_op = rep_op.apply_gradients(grads_and_vars,global_step=global_step)
init_token_op = rep_op.get_init_tokens_op()
chief_queue_runner = rep_op.get_chief_queue_runner()

(2)异步更新梯度:

train_op = optimizer.apply_gradients(grads_and_vars,global_step=global_step)

最后,使用tf.train.Supervisor 进行真的迭代

另外,开发者还要注意,如果是同步更新梯度,则还需要加入如下代码:

sv.start_queue_runners(sess, [chief_queue_runner])
sess.run(init_token_op)

需要注意的是,上述异步的方式需要自行指定集群IP和端口,不过,开发者们也可以借助TensorFlowOnSpark,使用Yarn进行管理。

基于TensorFlowOnSpark的分布式开发

作为个推面向开发者服务的移动APP数据统计分析产品,个数所具有的用户行为预测功能模块,便是基于TensorFlowOnSpark这种分布式来实现的。基于TensorFlowOnSpark的分布式开发使其可以在屏蔽了端口和机器IP的情况下,也能够做到较好的资源申请和分配。而在多个千万级应用同时建模的情况下,集群也有良好的表现,在sparkUI中也能看到相对应的资源和进程的情况。最关键的是,TensorFlowOnSpark可以在单机过度到分布式的情况下,使代码方便修改,且容易部署。

基于TensorFlowOnSpark的分布式开发的具体流程如下:

首先,需要使用spark-submit来提交任务,同时指定spark需要运行的参数(–num-executors 6等)、模型代码、模型超参等,同样需要接受外部参数:

parser = argparse.ArgumentParser()
parser.add_argument("-i", "--tracks", help="数据集路径")  
args = parser.parse_args()

之后,准备好参数和训练数据(DataFrame),调用模型的API进行启动。

其中,soft_dist.map_fun是要调起的方法,后面均是模型训练的参数。

estimator = TFEstimator(soft_dist.map_fun, args) \
     .setInputMapping({'tracks': 'tracks', 'label': 'label'}) \
     .setModelDir(args.model) \
     .setExportDir(args.serving) \
     .setClusterSize(args.cluster_size) \
     .setNumPS(num_ps) \
     .setEpochs(args.epochs) \
     .setBatchSize(args.batch_size) \
     .setSteps(args.max_steps)
   model = estimator.fit(df)

接下来是soft_dist定义一个 map_fun(args, ctx)的方法:

def map_fun(args, ctx):
...
worker_num = ctx.worker_num  # worker数量
job_name = ctx.job_name  # job名
task_index = ctx.task_index  # 任务索引
if job_name == "ps":  # ps节点(主节点)
  time.sleep((worker_num + 1) * 5)
  cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)
  num_workers = len(cluster.as_dict()['worker'])
  if job_name == "ps":
       server.join()
  elif job_name == "worker":
       with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):

之后,可以使用tf.train.MonitoredTrainingSession高级API,进行模型训练和预测。

总结

基于TensorFlow的分布式开发大致就是本文中介绍的两种情况,第二种方式可以用于实际的生产环境,稳定性会更高。

在运行结束的时候,开发者们也可通过设置邮件的通知,及时地了解到模型运行的情况。

同时,如果开发者使用SessionRunHook来保存最后输出的模型,也需要了解到,框架代码中的一个BUG,即它只能在规定的时间内保存,超出规定时间,即使运行没有结束,程序也会被强制结束。如果开发者使用的版本是未修复BUG的版本,则要自行处理,放宽运行时间。

目录
相关文章
|
2月前
|
运维 Kubernetes 调度
阿里云容器服务 ACK One 分布式云容器企业落地实践
3年前的云栖大会,我们发布分布式云容器平台ACK One,随着3年的发展,很高兴看到ACK One在混合云,分布式云领域帮助到越来越多的客户,今天给大家汇报下ACK One 3年来的发展演进,以及如何帮助客户解决分布式领域多云多集群管理的挑战。
阿里云容器服务 ACK One 分布式云容器企业落地实践
|
3月前
|
自然语言处理 C# 开发者
Uno Platform多语言开发秘籍大公开:轻松驾驭全球用户,一键切换语言,让你的应用成为跨文化交流的桥梁!
【8月更文挑战第31天】Uno Platform 是一个强大的开源框架,允许使用 C# 和 XAML 构建跨平台的原生移动、Web 和桌面应用程序。本文详细介绍如何通过 Uno Platform 创建多语言应用,包括准备工作、设置多语言资源、XAML 中引用资源、C# 中加载资源以及处理语言更改。通过简单的步骤和示例代码,帮助开发者轻松实现应用的国际化。
41 1
|
3月前
|
存储 分布式计算 Hadoop
【揭秘Hadoop背后的秘密!】HDFS读写流程大曝光:从理论到实践,带你深入了解Hadoop分布式文件系统!
【8月更文挑战第24天】Hadoop分布式文件系统(HDFS)是Hadoop生态系统的关键组件,专为大规模数据集提供高效率存储及访问。本文深入解析HDFS数据读写流程并附带示例代码。HDFS采用NameNode和DataNode架构,前者负责元数据管理,后者承担数据块存储任务。文章通过Java示例演示了如何利用Hadoop API实现数据的写入与读取,有助于理解HDFS的工作原理及其在大数据处理中的应用价值。
102 1
|
3月前
|
机器学习/深度学习 人工智能 负载均衡
【AI大模型】分布式训练:深入探索与实践优化
在人工智能的浩瀚宇宙中,AI大模型以其惊人的性能和广泛的应用前景,正引领着技术创新的浪潮。然而,随着模型参数的指数级增长,传统的单机训练方式已难以满足需求。分布式训练作为应对这一挑战的关键技术,正逐渐成为AI研发中的标配。
201 5
|
3月前
|
存储 Kubernetes 监控
深入浅出分布式事务:理论与实践
在数字化时代的浪潮中,分布式系统如同星辰大海般浩瀚而深邃。本文将带你航行于这片星辰大海,探索分布式事务的奥秘。我们将从事务的基本概念出发,逐步深入到分布式事务的核心机制,最后通过一个实战案例,让你亲自体验分布式事务的魅力。让我们一起揭开分布式事务的神秘面纱,领略其背后的科学与艺术。
85 1
|
3月前
|
Go API 数据库
[go 面试] 分布式事务框架选择与实践
[go 面试] 分布式事务框架选择与实践
|
3月前
|
持续交付 测试技术 jenkins
JSF 邂逅持续集成,紧跟技术热点潮流,开启高效开发之旅,引发开发者强烈情感共鸣
【8月更文挑战第31天】在快速发展的软件开发领域,JavaServer Faces(JSF)这一强大的Java Web应用框架与持续集成(CI)结合,可显著提升开发效率及软件质量。持续集成通过频繁的代码集成及自动化构建测试,实现快速反馈、高质量代码、加强团队协作及简化部署流程。以Jenkins为例,配合Maven或Gradle,可轻松搭建JSF项目的CI环境,通过JUnit和Selenium编写自动化测试,确保每次构建的稳定性和正确性。
62 0
|
3月前
|
UED 存储 数据管理
深度解析 Uno Platform 离线状态处理技巧:从网络检测到本地存储同步,全方位提升跨平台应用在无网环境下的用户体验与数据管理策略
【8月更文挑战第31天】处理离线状态下的用户体验是现代应用开发的关键。本文通过在线笔记应用案例,介绍如何使用 Uno Platform 优雅地应对离线状态。首先,利用 `NetworkInformation` 类检测网络状态;其次,使用 SQLite 实现离线存储;然后,在网络恢复时同步数据;最后,通过 UI 反馈提升用户体验。
94 0
|
3月前
|
机器学习/深度学习 TensorFlow 数据处理
分布式训练在TensorFlow中的全面应用指南:掌握多机多卡配置与实践技巧,让大规模数据集训练变得轻而易举,大幅提升模型训练效率与性能
【8月更文挑战第31天】本文详细介绍了如何在Tensorflow中实现多机多卡的分布式训练,涵盖环境配置、模型定义、数据处理及训练执行等关键环节。通过具体示例代码,展示了使用`MultiWorkerMirroredStrategy`进行分布式训练的过程,帮助读者更好地应对大规模数据集与复杂模型带来的挑战,提升训练效率。
90 0
|
3月前
|
存储 负载均衡 中间件
构建可扩展的分布式数据库:技术策略与实践
【8月更文挑战第3天】构建可扩展的分布式数据库是一个复杂而具有挑战性的任务。通过采用数据分片、复制与一致性模型、分布式事务管理和负载均衡与自动扩展等关键技术策略,并合理设计节点、架构模式和网络拓扑等关键组件,可以构建出高可用性、高性能和可扩展的分布式数据库系统。然而,在实际应用中还需要注意解决数据一致性、故障恢复与容错性以及分布式事务的复杂性等挑战。随着技术的不断发展和创新,相信分布式数据库系统将在未来发挥更加重要的作用。
下一篇
无影云桌面