Spark内置图像数据源初探

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 在Apache Spark 2.4中引入了一个新的内置数据源, 图像数据源.用户可以通过DataFrame API加载指定目录的中图像文件,生成一个DataFrame对象.通过该DataFrame对象,用户可以对图像数据进行简单的处理,然后使用MLlib进行特定的训练和分类计算。

作者:林武康,花名知瑕, 阿里巴巴计算平台事业部EMR团队的高级开发工程师,Apache HUE Contributor, 参与了多个开源项目的研发工作,对于分布式系统设计应用有较丰富的经验,目前主要专注于EMR数据开发相关的产品的研发工作。

概述

在Apache Spark 2.4中引入了一个新的内置数据源, 图像数据源.用户可以通过DataFrame API加载指定目录的中图像文件,生成一个DataFrame对象.通过该DataFrame对象,用户可以对图像数据进行简单的处理,然后使用MLlib进行特定的训练和分类计算.
本文将介绍图像数据源的实现细节和使用方法.

简单使用

先通过一个例子来简单的了解下图像数据源使用方法. 本例设定有一组图像文件存放在阿里云的OSS上, 需要对这组图像加水印,并压缩存储到parquet文件中. 废话不说,先上代码:

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]")
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()

    val imageDF = spark.read.format("image").load("oss://<bucket>/path/to/src/dir")
    imageDF.select("image.origin", "image.width", "image.height", "image.nChannels", "image.mode", "image.data")
        .map(row => {
          val origin = row.getAs[String]("origin")
          val width = row.getAs[Int]("width")
          val height = row.getAs[Int]("height")
          val mode = row.getAs[Int]("mode")
          val nChannels = row.getAs[Int]("nChannels")
          val data = row.getAs[Array[Byte]]("data")
          Row(Row(origin, height, width, nChannels, mode,
            markWithText(width, height, BufferedImage.TYPE_3BYTE_BGR, data, "EMR")))
        }).write.format("parquet").save("oss://<bucket>/path/to/dst/dir")
  }

  def markWithText(width: Int, height: Int, imageType: Int, data: Array[Byte], text: String): Array[Byte] = {
    val image = new BufferedImage(width, height, imageType)
    val raster = image.getData.asInstanceOf[WritableRaster]
    val pixels = data.map(_.toInt)
    raster.setPixels(0, 0, width, height, pixels)
    image.setData(raster)
    val buffImg = new BufferedImage(width, height, imageType)
    val g = buffImg.createGraphics
    g.drawImage(image, 0, 0, null)
    g.setColor(Color.red)
    g.setFont(new Font("宋体", Font.BOLD, 30))
    g.drawString(text, width/2, height/2)
    g.dispose()
    val buffer = new ByteArrayOutputStream
    ImageIO.write(buffImg, "JPG", buffer)
    buffer.toByteArray
  }

从生成的parquet文件中抽取一条图像二进制数据,保存为本地jpg,效果如下:
gh_jpeg
图1 左图为原始图像,右图为处理后的图像

你可能注意到两个图像到颜色并不相同,这是因为Spark的图像数据将图像解码为BGR顺序的数据,而示例程序在保存的时候,没有处理这个变换,导致颜色出现了反差.

实现初窥

下面我们深入到spark源码中来看一下实现细节.Apache Spark内置图像数据源的实现代码在spark-mllib这个模块中.主要包括两个类:

  • org.apache.spark.ml.image.ImageSchema
  • org.apache.spark.ml.source.image.ImageFileFormat

其中,ImageSchema定义了图像文件加载为DataFrame的Row的格式和解码方法.ImageFileFormat提供了面向存储层的读写接口.

格式定义

一个图像文件被加载为DataFrame后,对应的如下:

    StructField("origin", StringType, true) ::
    StructField("height", IntegerType, false) ::
    StructField("width", IntegerType, false) ::
    StructField("nChannels", IntegerType, false) ::
    // OpenCV-compatible type: CV_8UC3 in most cases
    StructField("mode", IntegerType, false) ::
    // Bytes in OpenCV-compatible order: row-wise BGR in most cases
    StructField("data", BinaryType, false) :: Nil)

  val imageFields: Array[String] = columnSchema.fieldNames
  val imageSchema = StructType(StructField("image", columnSchema, true) :: Nil)

如果将该DataFrame打印出来,可以得到如下形式的表:

|image.origin        |image.width|image.height|image.nChannels|image.mode|image.data         |
+--------------------+-----------+------------+---------------+----------+-------------------+
|oss://.../dir/1.jpg |600        |343         |3              |16        |55 45 21 56  ...   |
+--------------------+-----------+------------+---------------+----------+-------------------+

其中:

  • origin: 原始图像文件的路径
  • width: 图像的宽度,单位像素
  • height: 图像的高度,单位像素
  • nChannels: 图像的通道数, 如常见的RGB位图为通道数为3
  • mode: 像素矩阵(data)中元素的数值类型和通道顺序, 与OpenCV的类型兼容
  • data: 解码后的像素矩阵

提示: 关于图像的基础支持,可以参考如下文档: Image file reading and writing

加载和解码

图像文件通过ImageFileFormat加载为一个Row对象.

// 为了简化说明起见,代码有删减和改动
private[image] class ImageFileFormat extends FileFormat with DataSourceRegister {
  ......

  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    throw new UnsupportedOperationException("Write is not supported for image data source")
  }

  override protected def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {    
    ......
    (file: PartitionedFile) => {
    ......
        val path = new Path(origin)
        val stream = fs.open(path)
        val bytes = ByteStreams.toByteArray(stream)
        val resultOpt = ImageSchema.decode(origin, bytes) // <-- 解码 
        val filteredResult = Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin)))
     ......
          val converter = RowEncoder(requiredSchema)
          filteredResult.map(row => converter.toRow(row))
     ......
      }
    }
  }
}

从上可以看出:

  • 当前的图像数据源实现并不支持保存操作;
  • 图像数据的解码工作在ImageSchema中完成.

下面来看一下具体的解码过程:

// 为了简化说明起见,代码有删减和改动
private[spark] def decode(origin: String, bytes: Array[Byte]): Option[Row] = {
        // 使用ImageIO加载原始图像数据
    val img = ImageIO.read(new ByteArrayInputStream(bytes))
    if (img != null) {
      // 获取图像的基本属性
      val isGray = img.getColorModel.getColorSpace.getType == ColorSpace.TYPE_GRAY
      val hasAlpha = img.getColorModel.hasAlpha
      val height = img.getHeight
      val width = img.getWidth
      // ImageIO::ImageType -> OpenCV Type
      val (nChannels, mode) = if (isGray) {
        (1, ocvTypes("CV_8UC1"))
      } else if (hasAlpha) {
        (4, ocvTypes("CV_8UC4"))
      } else {
        (3, ocvTypes("CV_8UC3"))
      }
            // 解码
      val imageSize = height * width * nChannels
      // 用于存储解码后的像素矩阵
      val decoded = Array.ofDim[Byte](imageSize)
      if (isGray) {
        // 处理单通道图像
        ...
      } else {
        // 处理多通道图像
        var offset = 0
        for (h <- 0 until height) {
          for (w <- 0 until width) {
            val color = new Color(img.getRGB(w, h), hasAlpha)
            // 解码后的通道顺序为BGR(A)
            decoded(offset) = color.getBlue.toByte
            decoded(offset + 1) = color.getGreen.toByte
            decoded(offset + 2) = color.getRed.toByte
            if (hasAlpha) {
              decoded(offset + 3) = color.getAlpha.toByte
            }
            offset += nChannels
          }
        }
      }
      // 转换为一行数据
      Some(Row(Row(origin, height, width, nChannels, mode, decoded)))
    }
  }

从上可以看出:

  • 本数据源在实现上使用javax的ImageIO库实现各类格式的图像文件的解码.ImageIO虽然是一个十分强大和专业的java图像处理库,但是和更专业的CV库(如OpenCV)比起来,性能上和功能上差距还是非常大的;
  • 解码后的图像通道顺序和像素数值类型是固定的, 顺序固定为BGR(A), 像素数值类型为8U;
  • 最多支持4个通道,因此像多光谱遥感图像这类可能包含数十个波段信息的图像就无法支持了;
  • 解码后输出的信息仅包含基本的长宽、通道数和模式等字段,如果需要获取更为详细元数据,如exif,GPS坐标等就爱莫能助了;
  • 数据源在生成DataFrame时执行了图像的解码操作,并且解码后的数据存储在Java堆内内存中.这在实际项目应该是一个比较粗放的实现方式,会占用大量的资源,包括内存和带宽(如果发生shuffle的话,可以考虑参考同一个图像文件保存为BMP和JPG的大小差别).

编码和存储

从上分析可以看出,当前图像数据源并不支持对处理后的像素矩阵进行编码并保存为指定格式的图像文件.

图像处理能力

当前版本Apache Spark并没有提供面向图像数据的UDF,图像数据的处理需要借助ImageIO库或其他更专业的CV库.

小结

当前Apache Spark的内置图像数据源可以较为方便的加载图像文件进行分析.不过,当前的实现还十分简陋,性能和资源消耗应该都不会太乐观.并且,当前版本仅提供了图像数据的加载能力,并没有提供常用处理算法的封装和实现,也不能很好的支持更为专业的CV垂直领域的分析业务.当然,这和图像数据源在Spark中的定位有关(将图像数据作为输入用于训练DL模型,这类任务对图像的处理本身要求并不多).如果希望使用Spark框架完成更实际的图像处理任务,还有很多工作要做,比如:

  • 支持更加丰富的元数据模型
  • 使用更专业的编解码库和更灵活编解码流程控制
  • 封装面向CV的算子和函数
  • 更高效的内存管理
  • 支持GPU

等等诸如此类的工作,限于篇幅,这里就不展开了.
好了,再多说一句,现在Spark已经支持处理图像数据了(虽然支持有限),那么,视频流数据还会远吗?

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
分布式计算 关系型数据库 分布式数据库
大数据Spark外部数据源
大数据Spark外部数据源
101 0
|
3月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
72 0
|
8月前
|
分布式计算 数据处理 Apache
Spark Streaming与数据源连接:Kinesis、Flume等
Spark Streaming与数据源连接:Kinesis、Flume等
|
8月前
|
分布式计算 监控 分布式数据库
Spark Day05:Spark Core之Sougou日志分析、外部数据源和共享变量
Spark Day05:Spark Core之Sougou日志分析、外部数据源和共享变量
178 0
|
8月前
|
SQL 分布式计算 Java
Note_Spark_Day08:Spark SQL(Dataset是什么、外部数据源、UDF定义和分布式SQL引擎)
Note_Spark_Day08:Spark SQL(Dataset是什么、外部数据源、UDF定义和分布式SQL引擎)
89 0
|
消息中间件 存储 分布式计算
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(二)
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(二)
|
消息中间件 分布式计算 Kafka
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
|
SQL JSON 分布式计算
spark2 sql读取数据源编程学习样例2:函数实现详解
spark2 sql读取数据源编程学习样例2:函数实现详解
97 0
spark2 sql读取数据源编程学习样例2:函数实现详解
|
SQL 分布式计算 Java
spark2 sql读取数据源编程学习样例1
spark2 sql读取数据源编程学习样例1
76 0
spark2 sql读取数据源编程学习样例1