Learning Notes TF065: TensorFlowOnSpark

Introduction:

The Hadoop big-data ecosystem is made up of YARN (resource scheduling), HDFS (storage), and the MapReduce compute framework. By analogy, distributed TensorFlow plays the role of the MapReduce compute framework, while Kubernetes plays the role of the YARN scheduler. TensorFlowOnSpark lets Spark handle scheduling and HDFS handle storage, and can use Remote Direct Memory Access (RDMA) for fast tensor communication, bringing deep learning and big data together. TensorFlowOnSpark (TFoS) is an open-source project from Yahoo: https://github.com/yahoo/TensorFlowOnSpark . It supports distributed TensorFlow training and inference on Apache Spark clusters. TensorFlowOnSpark provides a bridge: each Spark executor starts a corresponding TensorFlow process, and the two sides interact via remote procedure calls (RPC).

TensorFlowOnSpark architecture. The TensorFlow training program runs on a Spark cluster, which manages it in four steps. Reservation: reserve a port on each executor for its TensorFlow process and start a data/control-message listener. Startup: launch the TensorFlow main function on the executors. Data ingestion: either TensorFlow Readers and QueueRunners read data files directly from HDFS (Spark never touches the data), or, in Feeding mode, Spark RDD data is sent to the TensorFlow nodes and passed into the TensorFlow graph via the feed_dict mechanism. Shutdown: shut down the TensorFlow worker and parameter-server nodes on the executors. Data path: Spark Driver -> Spark Executor -> parameter server -> TensorFlow Core -> gRPC/RDMA -> HDFS dataset. http://yahoohadoop.tumblr.com/post/157196317141/open-sourcing-tensorflowonspark-distributed-deep
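The reservation step can be illustrated in plain Python: each executor asks the OS for a free port that its TensorFlow process will later bind to. This is a minimal sketch under that assumption; `reserve_port` is a hypothetical helper, not the actual TFoS API.

```python
import socket

def reserve_port():
    """Ask the OS for a free TCP port, keeping the socket open so no
    other process can grab the port before TensorFlow binds to it.
    (Simplified sketch of the TFoS reservation step; not the real API.)"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("127.0.0.1", 0))   # port 0 = let the OS pick a free port
    port = sock.getsockname()[1]
    return sock, port

if __name__ == "__main__":
    sock, port = reserve_port()
    print("reserved port:", port)
    sock.close()
```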

TensorFlowOnSpark MNIST. https://github.com/yahoo/TensorFlowOnSpark/wiki/GetStarted_standalone . A Standalone-mode Spark cluster on a single machine. Install Spark and Hadoop, and deploy the Java 1.8.0 JDK. Download Spark 2.1.0: http://spark.apache.org/downloads.html . Download Hadoop 2.7.3: http://hadoop.apache.org/#Download+Hadoop . TensorFlow 0.12.1 is the best-supported version.
Edit the configuration files, set the environment variables, then start Hadoop: $HADOOP_HOME/sbin/start-all.sh. Check out the TensorFlowOnSpark source code:

git clone --recurse-submodules https://github.com/yahoo/TensorFlowOnSpark.git
cd TensorFlowOnSpark
git submodule init
git submodule update --force
git submodule foreach --recursive git clean -dfx

Package the source code so it can be shipped when submitting jobs:

cd TensorFlowOnSpark/src
zip -r ../tfspark.zip *

Set the TensorFlowOnSpark root directory as an environment variable:

cd TensorFlowOnSpark
export TFoS_HOME=$(pwd)

Start the Spark master node:

${SPARK_HOME}/sbin/start-master.sh

Configure two worker instances, connecting to the master via its spark:// URL:

export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=2
export CORES_PER_WORKER=1
export TOTAL_CORES=$((CORES_PER_WORKER * SPARK_WORKER_INSTANCES))
${SPARK_HOME}/sbin/start-slave.sh -c ${CORES_PER_WORKER} -m 3G ${MASTER}
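As a quick sanity check of the arithmetic expansion used above: `$(( ... ))` multiplies the two variables, so with the settings above TOTAL_CORES comes out as 2.

```shell
CORES_PER_WORKER=1
SPARK_WORKER_INSTANCES=2
TOTAL_CORES=$((CORES_PER_WORKER * SPARK_WORKER_INSTANCES))
echo "$TOTAL_CORES"   # prints 2
```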

Submit the job that converts the MNIST zip files into an HDFS RDD dataset:

${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} --conf spark.ui.port=4048 --verbose \
${TFoS_HOME}/examples/mnist/mnist_data_setup.py \
--output examples/mnist/csv \
--format csv

Inspect the processed dataset:

hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv

Inspect the saved images and label vectors:

hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv/train/labels

The training set and test set are saved as separate RDDs.
https://github.com/yahoo/TensorFlowOnSpark/blob/master/examples/mnist/mnist_data_setup.py

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy
import tensorflow as tf
from array import array
from tensorflow.contrib.learn.python.learn.datasets import mnist
def toTFExample(image, label):
  """Serializes an image/label as a TFExample byte string"""
  example = tf.train.Example(
    features = tf.train.Features(
      feature = {
        'label': tf.train.Feature(int64_list=tf.train.Int64List(value=label.astype("int64"))),
        'image': tf.train.Feature(int64_list=tf.train.Int64List(value=image.astype("int64")))
      }
    )
  )
  return example.SerializeToString()
def fromTFExample(bytestr):
  """Deserializes a TFExample from a byte string"""
  example = tf.train.Example()
  example.ParseFromString(bytestr)
  return example
def toCSV(vec):
  """Converts a vector/array into a CSV string"""
  return ','.join([str(i) for i in vec])
def fromCSV(s):
  """Converts a CSV string to a vector/array"""
  return [float(x) for x in s.split(',') if len(s) > 0]
def writeMNIST(sc, input_images, input_labels, output, format, num_partitions):
  """Writes MNIST image/label vectors into parallelized files on HDFS"""
  # load the MNIST gzip files into memory; the image/label vectors are then written to HDFS
  with open(input_images, 'rb') as f:
    images = numpy.array(mnist.extract_images(f))
  with open(input_labels, 'rb') as f:
    if format == "csv2":
      labels = numpy.array(mnist.extract_labels(f, one_hot=False))
    else:
      labels = numpy.array(mnist.extract_labels(f, one_hot=True))
  shape = images.shape
  print("images.shape: {0}".format(shape))          # 60000 x 28 x 28
  print("labels.shape: {0}".format(labels.shape))   # 60000 x 10
  # create RDDs of vectors
  imageRDD = sc.parallelize(images.reshape(shape[0], shape[1] * shape[2]), num_partitions)
  labelRDD = sc.parallelize(labels, num_partitions)
  output_images = output + "/images"
  output_labels = output + "/labels"
  # save the RDDs in the requested format
  if format == "pickle":
    imageRDD.saveAsPickleFile(output_images)
    labelRDD.saveAsPickleFile(output_labels)
  elif format == "csv":
    imageRDD.map(toCSV).saveAsTextFile(output_images)
    labelRDD.map(toCSV).saveAsTextFile(output_labels)
  elif format == "csv2":
    imageRDD.map(toCSV).zip(labelRDD).map(lambda x: str(x[1]) + "|" + x[0]).saveAsTextFile(output)
  else: # format == "tfr":
    tfRDD = imageRDD.zip(labelRDD).map(lambda x: (bytearray(toTFExample(x[0], x[1])), None))
    # requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
    tfRDD.saveAsNewAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
                                keyClass="org.apache.hadoop.io.BytesWritable",
                                valueClass="org.apache.hadoop.io.NullWritable")
#  Note: this creates TFRecord files w/o requiring a custom Input/Output format
#  else: # format == "tfr":
#    def writeTFRecords(index, iter):
#      output_path = "{0}/part-{1:05d}".format(output, index)
#      writer = tf.python_io.TFRecordWriter(output_path)
#      for example in iter:
#        writer.write(example)
#      return [output_path]
#    tfRDD = imageRDD.zip(labelRDD).map(lambda x: toTFExample(x[0], x[1]))
#    tfRDD.mapPartitionsWithIndex(writeTFRecords).collect()
def readMNIST(sc, output, format):
  """Reads/verifies previously created output"""
  output_images = output + "/images"
  output_labels = output + "/labels"
  imageRDD = None
  labelRDD = None
  if format == "pickle":
    imageRDD = sc.pickleFile(output_images)
    labelRDD = sc.pickleFile(output_labels)
  elif format == "csv":
    imageRDD = sc.textFile(output_images).map(fromCSV)
    labelRDD = sc.textFile(output_labels).map(fromCSV)
  else: # format.startswith("tf"):
    # requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
    tfRDD = sc.newAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                              keyClass="org.apache.hadoop.io.BytesWritable",
                              valueClass="org.apache.hadoop.io.NullWritable")
    imageRDD = tfRDD.map(lambda x: fromTFExample(str(x[0])))
  num_images = imageRDD.count()
  num_labels = labelRDD.count() if labelRDD is not None else num_images
  samples = imageRDD.take(10)
  print("num_images: ", num_images)
  print("num_labels: ", num_labels)
  print("samples: ", samples)
if __name__ == "__main__":
  import argparse
  from pyspark.context import SparkContext
  from pyspark.conf import SparkConf
  parser = argparse.ArgumentParser()
  parser.add_argument("-f", "--format", help="output format", choices=["csv","csv2","pickle","tf","tfr"], default="csv")
  parser.add_argument("-n", "--num-partitions", help="Number of output partitions", type=int, default=10)
  parser.add_argument("-o", "--output", help="HDFS directory to save examples in parallelized format", default="mnist_data")
  parser.add_argument("-r", "--read", help="read previously saved examples", action="store_true")
  parser.add_argument("-v", "--verify", help="verify saved examples after writing", action="store_true")

  args = parser.parse_args()

  print("args:",args)
  sc = SparkContext(conf=SparkConf().setAppName("mnist_parallelize"))
  if not args.read:
    # Note: these files are inside the mnist.zip file
    writeMNIST(sc, "mnist/train-images-idx3-ubyte.gz", "mnist/train-labels-idx1-ubyte.gz", args.output + "/train", args.format, args.num_partitions)
    writeMNIST(sc, "mnist/t10k-images-idx3-ubyte.gz", "mnist/t10k-labels-idx1-ubyte.gz", args.output + "/test", args.format, args.num_partitions)
  if args.read or args.verify:
    readMNIST(sc, args.output + "/train", args.format)
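As a quick standalone check (no Spark needed), the toCSV/fromCSV helpers above round-trip a label vector; the block below restates them so it runs on its own:

```python
def toCSV(vec):
    """Converts a vector/array into a CSV string."""
    return ','.join([str(i) for i in vec])

def fromCSV(s):
    """Converts a CSV string back to a list of floats."""
    return [float(x) for x in s.split(',') if len(s) > 0]

# one-hot label for digit "2", as produced with one_hot=True
label = [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
line = toCSV(label)
print(line)   # 0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
assert fromCSV(line) == label
```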

Submit the training job; training starts and generates mnist_model on HDFS:

${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/train/images \
--labels examples/mnist/csv/train/labels \
--format csv \
--mode train \
--model mnist_model

mnist_dist.py builds the distributed TensorFlow job: it defines the distributed main function, launches the TensorFlow main function map_fun, and ingests data in Feeding mode. Get the TensorFlow cluster and server instances:

cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)

TFNode comes from the TFNode.py file inside tfspark.zip.
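start_cluster_server builds a tf.train.ClusterSpec describing the ps and worker nodes and starts a tf.train.Server for the current executor. How executor addresses might be split into those roles can be sketched in plain Python; `build_cluster_spec` below is a hypothetical illustration, not the real TFNode code (which negotiates addresses over RPC):

```python
def build_cluster_spec(hosts, ports, num_ps):
    """Give the first num_ps (host, port) pairs the 'ps' role and the
    rest the 'worker' role, yielding a TensorFlow-style cluster-spec dict.
    (Illustrative sketch only.)"""
    addrs = ["{0}:{1}".format(h, p) for h, p in zip(hosts, ports)]
    return {"ps": addrs[:num_ps], "worker": addrs[num_ps:]}

spec = build_cluster_spec(["exec1", "exec2"], [2222, 2223], num_ps=1)
print(spec)   # {'ps': ['exec1:2222'], 'worker': ['exec2:2223']}
```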

mnist_spark.py is the main training program. The TensorFlowOnSpark deployment steps:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
import argparse
import os
import numpy
import sys
import tensorflow as tf
import threading
import time
from datetime import datetime
from tensorflowonspark import TFCluster
import mnist_dist
sc = SparkContext(conf=SparkConf().setAppName("mnist_spark"))
executors = sc._conf.get("spark.executor.instances")
num_executors = int(executors) if executors is not None else 1
num_ps = 1
parser = argparse.ArgumentParser()
parser.add_argument("-b", "--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=1)
parser.add_argument("-f", "--format", help="example format: (csv|pickle|tfr)", choices=["csv","pickle","tfr"], default="csv")
parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/inference", default="mnist_model")
parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("-r", "--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("-X", "--mode", help="train|inference", default="train")
parser.add_argument("-c", "--rdma", help="use rdma connection", default=False)
args = parser.parse_args()
print("args:",args)
print("{0} ===== Start".format(datetime.now().isoformat()))
if args.format == "tfr":
  images = sc.newAPIHadoopFile(args.images, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                              keyClass="org.apache.hadoop.io.BytesWritable",
                              valueClass="org.apache.hadoop.io.NullWritable")
  def toNumpy(bytestr):
    example = tf.train.Example()
    example.ParseFromString(bytestr)
    features = example.features.feature
    image = numpy.array(features['image'].int64_list.value)
    label = numpy.array(features['label'].int64_list.value)
    return (image, label)
  dataRDD = images.map(lambda x: toNumpy(str(x[0])))
else:
  if args.format == "csv":
    images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
    labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
  else: # args.format == "pickle":
    images = sc.pickleFile(args.images)
    labels = sc.pickleFile(args.labels)
  print("zipping images and labels")
  dataRDD = images.zip(labels)
# 1. Reserve a port for each TensorFlow process to run on its executor
cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK)
# 2. Launch the TensorFlow main function
cluster.start(mnist_dist.map_fun, args)
if args.mode == "train":
  # 3. Train
  cluster.train(dataRDD, args.epochs)
else:
  # 3. Inference
  labelRDD = cluster.inference(dataRDD)
  labelRDD.saveAsTextFile(args.output)
# 4. Shut down the executors' TensorFlow worker and parameter-server nodes
cluster.shutdown()
print("{0} ===== Stop".format(datetime.now().isoformat()))

Inference command:

${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/test/images \
--labels examples/mnist/csv/test/labels \
--mode inference \
--format csv \
--model mnist_model \
--output predictions

TensorFlowOnSpark can also run on Amazon EC2, or on a Hadoop cluster in YARN mode.

References:
《TensorFlow技术解析与实战》

Referrals for machine-learning positions in Shanghai are welcome. My WeChat: qingxingfengzi
