1. Writing R data frames returned from SparkR:::map

Link to the original Stack Overflow question

The asker was using SparkR to process a large R data frame and wanted to do the data manipulation with map. The data is fully structured, and every partition has the same schema. The plan was to write the results out as Parquet so that a collect action could be avoided. The open question was whether write.df could be applied to the output list produced by the program, or whether the worker tasks could be made to write the Parquet files instead.
The asker's script:

#! /usr/bin/Rscript
library(SparkR, lib.loc="/opt/spark-1.5.1-bin-without-hadoop/R/lib")
source("jdbc-utils.R")
options(stringsAsFactors = FALSE)

# I dislike having these here but when I move them into main(), it breaks - the sqlContext drops.
assign("sc", sparkR.init(master = "spark://poc-master-1:7077", sparkHome = "/opt/spark-1.5.1-bin-without-hadoop/", appName = "Peter Spark test", list(spark.executor.memory="4G")), envir = .GlobalEnv)
assign("sqlContext", sparkRSQL.init(sc), envir =.GlobalEnv)

#### MAP function ####
run.model <- function(v) {
  x <- v$xs[1]
  y <- v$ys[1]
  startTime <- format(Sys.time(), "%F %T")
  xs <- c(1:x)
  endTime <- format(Sys.time(), "%F %T")
  hostname <- system("hostname", intern = TRUE)
  xys <- data.frame(xs, y, startTime, endTime, hostname, stringsAsFactors = FALSE)
  return(xys)
}

# HERE BE THE SCRIPT BIT
main <- function() {
  # Make unique identifiers for each run
  xs <- c(1:365)
  ys <- c(1:1)
  xys <- data.frame(xs, ys, stringsAsFactors = FALSE)
  # Convert to Spark dataframe for mapping
  sqlContext <- get("sqlContext", envir = .GlobalEnv)
  xys.sdf <- createDataFrame(sqlContext, xys)
  # Let Spark do what Spark does
  output.list <- SparkR:::map(xys.sdf, run.model)
  # Reduce gives us a single R dataframe, which may not be what we want.
  output.redux <- SparkR:::reduce(output.list, rbind)
  # Or you can have it as a list of data frames.
  output.col <- collect(output.list)
  return(NULL)
}

The asker's reasoning: first build a data frame named xys with two columns, one holding 1:365 and the other a constant 1; convert it with createDataFrame into a Spark DataFrame; then run map and reduce over it. A small demo function, run.model, was written to be passed to map.
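To make the intended per-row output concrete, here is a minimal local sketch (plain R, no Spark) that feeds run.model one hypothetical row; the named list one.row is an illustration only, chosen to mimic the shape SparkR:::map hands to the function:

# Local illustration only: one row of xys as a named list.
one.row <- list(xs = 3, ys = 1)
out <- run.model(one.row)
str(out)  # a data.frame with 3 rows and columns xs, y, startTime, endTime, hostname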

[Figure: program output]

The asker was still left with two doubts:

  1. It was not clear whether it was really necessary, as first imagined, to avoid collect entirely and instead assemble the results for storage as Parquet;
  2. It was also not certain that SparkR:::map in this form actually runs in parallel, or whether parallelize has to be declared explicitly.

The answerer addressed these doubts as follows.
Assume your data looks more or less like this:

rdd <- SparkR:::parallelize(sc, 1:5)
dfs <- SparkR:::map(rdd, function(x) mtcars[(x * 5):((x + 1) * 5), ])

First, a quick look at the mtcars data:

[Figure: head of mtcars]

And the result of the map:

[Figure: program output]
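As a rough local check of what that map produces (plain R, no Spark involved), the same slices can be built with lapply; each list element is a small data frame cut from mtcars, which is what each RDD element holds:

# Local equivalent of the map above: a list of five small data frames sliced from mtcars.
local.dfs <- lapply(1:5, function(x) mtcars[(x * 5):((x + 1) * 5), ])
length(local.dfs)    # 5
dim(local.dfs[[1]])  # 6 rows x 11 columns (rows 5 to 10 of mtcars)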

The answerer then laid out the approach:
since the operation touches all of the columns, the per-partition data frames can simply be converted into a row-wise representation with flatMap;

rows <- SparkR:::flatMap(dfs, function(x) {
  data <- as.list(x)
  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
  do.call(mapply, append(args, data))
})
sdf <- createDataFrame(sqlContext, rows)
head(sdf)
[Figure: head(sdf) output]

Here append(args, data) with mapply beats repeated rbind by a wide margin, and flatMap handles the multi-partition collection that map alone handled poorly, which impressed the asker.
Seeing that, the answerer continued:
From here you can simply use write.df / saveDF to write the result out, for example as Parquet (see the sketch below).
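A minimal sketch of that last step, assuming the sdf built above; the output path and save mode are placeholders, not from the original post:

# Write the row-wise Spark DataFrame out as Parquet; path and mode are illustrative.
write.df(sdf, path = "/tmp/xys.parquet", source = "parquet", mode = "overwrite")
# saveDF behaves the same way:
# saveDF(sdf, path = "/tmp/xys.parquet", source = "parquet", mode = "overwrite")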
The root of the problem is that the script starts from an internal method, map. One important reason it was dropped from the public API early on is that it is not safe to use directly, and whether it will ever be officially supported is anyone's guess.
And so the asker followed the answerer.
