1.Writing R data frames returned from SparkR:::map

stack overflow原文地址
弱鸡小白在使用SparkR处理大规模的R dataframe时想使用map的方式进行数据操作。数据都是结构化的,并且每个分区都是相同的结构。本想的将这些数据作为parquet这样就可以避免collect的Action操作。现在很担心能不能再程序输出的output list后进行write.df的操作,能否使用worker tasks编写替代指定parquet进行操作?
小白的程序如下:

#! /usr/bin/Rscript
library(SparkR, lib.loc="/opt/spark-1.5.1-bin-without-hadoop/R/lib")
source("jdbc-utils.R")
options(stringsAsFactors = FALSE)

# I dislike having these here but when I move them into main(), it breaks - the sqlContext drops.
assign("sc", sparkR.init(master = "spark://poc-master-1:7077", sparkHome = "/opt/spark-1.5.1-bin-without-hadoop/", appName = "Peter Spark test", list(spark.executor.memory="4G")), envir = .GlobalEnv)
assign("sqlContext", sparkRSQL.init(sc), envir =.GlobalEnv)

#### MAP function ####
run.model <- function(v) {
 x <- v$xs[1]
 y <- v$ys[1] 
startTime <- format(Sys.time(), "%F %T") 
xs <- c(1:x)
endTime <- format(Sys.time(), "%F %T")
hostname <- system("hostname", intern = TRUE) 
xys <- data.frame(xs,y,startTime,endTime,hostname,stringsAsFactors = FALSE) 
return(xys)
}

# HERE BE THE SCRIPT BIT
main <- function() { 
# Make unique identifiers for each run 
xs <- c(1:365) 
ys <- c(1:1) 
xys <- data.frame(xs,ys,stringsAsFactors = FALSE) 
# Convert to Spark dataframe for mapping 
sqlContext <- get("sqlContext", envir = .GlobalEnv)
 xys.sdf <- createDataFrame(sqlContext, xys) 
# Let Spark do what Spark does 
output.list <- SparkR:::map(xys.sdf, run.model) 
# Reduce gives us a single R dataframe, which may not be what we want. 
output.redux <- SparkR:::reduce(output.list, rbind) 
# Or you can have it as a list of data frames. output.col <- collect(output.list) 
return(NULL)
}

小白心里是这样想的,先生成一个名字叫xys的dataframe,两列数据,一列是1:365,另一列是1。通过createDataFrame将其转换成为RDD,然后进行map和reduce的操作。同时编写了一个demo小函数,用来进行map。

程序结果.png

小白同学的心中是充满疑惑的:

  1. 并没有想象中的需要避免绝对的collect使用,而去将结果组合作为Parquet进行存储;
  2. 同时,也并不确信:::map的函数形式真正实现了并行,难道需要一直申明parallelise

对于小白的疑惑,大腿同学是这样解释的:
假设你的数据差不多是下面这个样子的:

rdd <- SparkR:::parallelize(sc, 1:5)
dfs <- SparkR:::map(rdd, function(x) mtcars[(x * 5):((x + 1) * 5), ])

首先给你瞅一眼mtcars的数据:

mtcars.png

瞅一眼程序结果:


程序结果.png

同时大腿也给出了自己的思路:
因为要对所有数据的列进行操作,完全可以把它转换成为row-wise的逐行操作类型;

rows <- SparkR:::flatMap(dfs, function(x) { 
data <- as.list(x) 
args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
do.call(mapply, append(args, data))})
sdf <- createDataFrame(sqlContext, rows)
head(sdf)
结果.png

大腿这里用了append秒rbind一万条街;用flatmap实现了map实现的捉襟见肘的多分区集合,小白深感佩服。
看到小白一脸葱白的样子,大神接着说:
接下来就可以使用简单的write.df / saveDF
小白啊,你的问题主要是一开始使用了一个内部方法map,他被从最初版本移除的一个重要原因是如果直接使用是不健全的,而且也不清楚将来会不会被支持,谁知道呢。
于是小白关注了大腿同学。

上一篇:ASP.NET-GridView之表头设计


下一篇:Hadoop的管理目录