The first step to a Spark program in R
SparkR is an R package that provides a frontend for using Apache Spark from R. As of Spark 1.6.0, SparkR provides a distributed DataFrame implementation for working with large datasets. SparkR also supports distributed machine learning using MLlib; this is something you should try out while reading the machine learning chapters later in this book.
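To give a flavor of the MLlib integration, the following is a minimal sketch (not part of this chapter's example application) that fits a linear model over a DataFrame created from R's built-in iris dataset. It assumes that sc and sqlContext have been initialized as shown in the script later in this section, and the exact signatures of these functions vary slightly between Spark releases:
# minimal sketch of SparkR's MLlib integration (assumes sc and sqlContext
# are already initialized; signatures differ slightly across Spark versions)
irisDF <- createDataFrame(sqlContext, iris)  # '.' in column names becomes '_'
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = irisDF,
             family = "gaussian")
summary(model)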
SparkR DataFrames
A DataFrame is a distributed collection of data organized into named columns. This concept is very similar to a table in a relational database or a data frame in R, but with much richer optimizations under the hood. The sources of these DataFrames can be CSV or TSV files, Hive tables, local R data frames, and so on.
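For instance, assuming a sqlContext has been initialized (as in the script later in this section), a DataFrame can be created from a local R data frame as in the following sketch; the exact call differs slightly across Spark versions:
# minimal sketch: building a SparkR DataFrame from a local R data frame
# (assumes sqlContext is initialized; in Spark 2.x, createDataFrame takes
# the data frame alone)
df <- createDataFrame(sqlContext, faithful)
head(df)        # peek at the first rows
printSchema(df) # inspect the inferred column types
count(df)       # number of rows, computed on the cluster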
SparkR can be run from the Spark distribution using the ./bin/sparkR shell.
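For example, a local SparkR shell can be launched from the Spark home directory as shown below; the --packages coordinates here mirror the CSV reader used later in this section and may need adjusting for your Spark and Scala versions:
$ ./bin/sparkR --master local[2] --packages com.databricks:spark-csv_2.10:1.3.0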
Following on from the preceding examples, we will now write an R version. We assume that you have R (version 3.0.2 (2013-09-25), "Frisbee Sailing", or higher) and RStudio installed on your system.
The example program is included in the sample code for this chapter, in the directory named r-spark-app, which also contains the CSV data file under the data subdirectory. The project contains a script, r-script-01.R, which is shown in the following listing. Make sure you change PATH to an appropriate value for your environment.
Sys.setenv(SPARK_HOME = "/PATH/spark-2.0.0-bin-hadoop2.7")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
# load the SparkR library
library(SparkR)
# initialize the Spark context and SQL context
sc <- sparkR.init(master = "local", sparkPackages = "com.databricks:spark-csv_2.10:1.3.0")
sqlContext <- sparkRSQL.init(sc)
# load the purchase history CSV file as a DataFrame
user.purchase.history <- "/PATH/ml-resources/spark-ml/Chapter_01/r-spark-app/data/UserPurchaseHistory.csv"
data <- read.df(sqlContext, user.purchase.history, "com.databricks.spark.csv", header = "false")
head(data)
count(data)
# parse each record into a (name, product, price) list
parseFields <- function(record) {
  Sys.setlocale("LC_ALL", "C") # necessary for strsplit() to work correctly
  parts <- strsplit(as.character(record), ",")
  list(name = parts[1], product = parts[2], price = parts[3])
}
parsedRDD <- SparkR:::lapply(data, parseFields)
cache(parsedRDD)
# count the total number of purchases
numPurchases <- count(parsedRDD)
sprintf("Number of Purchases : %d", numPurchases)
# count the number of unique users who made purchases
getName <- function(record) { record[1] }
getPrice <- function(record) { record[3] }
nameRDD <- SparkR:::lapply(parsedRDD, getName)
nameRDD <- collect(nameRDD)
head(nameRDD)
uniqueUsers <- unique(nameRDD)
head(uniqueUsers)
# sum up the total revenue
priceRDD <- SparkR:::lapply(parsedRDD, function(x) { as.numeric(x$price[1]) })
take(priceRDD, 3)
totalRevenueNum <- SparkR:::reduce(priceRDD, "+")
sprintf("Total Revenue : %.2f", totalRevenueNum)
# find the most popular product
products <- SparkR:::lapply(parsedRDD, function(x) { list(toString(x$product[1]), 1) })
take(products, 5)
productCount <- SparkR:::reduceByKey(products, "+", 2L)
productsCountAsKey <- SparkR:::lapply(productCount, function(x) { list(as.integer(x[2][1]), x[1][1]) })
productCount <- count(productsCountAsKey)
mostPopular <- toString(collect(productsCountAsKey)[[productCount]][[2]])
sprintf("Most Popular Product : %s", mostPopular)
Run the script with the following command in a bash terminal:
$ Rscript r-script-01.R
Your output will be similar to the following listing:
> sprintf("Number of Purchases : %d", numPurchases) [1] "Number of Purchases : 5" > uniqueUsers <- unique(nameRDD) > head(uniqueUsers) [[1]] [[1]]$name [[1]]$name[[1]] [1] "John" [[2]] [[2]]$name [[2]]$name[[1]] [1] "Jack" [[3]] [[3]]$name [[3]]$name[[1]] [1] "Jill" [[4]] [[4]]$name [[4]]$name[[1]] [1] "Bob" > sprintf("Total Revenue : %.2f", totalRevenueNum) [1] "Total Revenue : 39.91" > sprintf("Most Popular Product : %s", mostPopular) [1] "Most Popular Product : iPad Cover"