The Spark programming model
Before we delve into a high-level overview of Spark's design, we will introduce the SparkContext object as well as the Spark shell, which we will use to interactively explore the basics of the Spark programming model.
While this section provides a brief overview and examples of using Spark, we recommend that you read the following documentation to get a detailed understanding:
Note
Refer to the following URLs:
- For the Spark Quick Start, refer to http://spark.apache.org/docs/latest/quick-start
- For the Spark Programming Guide, which covers Scala, Java, Python, and R, refer to http://spark.apache.org/docs/latest/programming-guide.html
SparkContext and SparkConf
The starting point of writing any Spark program is SparkContext (or JavaSparkContext in Java). SparkContext is initialized with an instance of a SparkConf object, which contains various Spark cluster-configuration settings (for example, the URL of the master node).
SparkContext is the main entry point for Spark functionality. It represents a connection to a Spark cluster and can be used to create RDDs, accumulators, and broadcast variables on that cluster.
Only one SparkContext can be active per JVM. You must call stop() on the active SparkContext before creating a new one.
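For example, if a context is already active (as is the case in the Spark shell, described later in this section), it must be stopped before a new one can be created. A minimal sketch, assuming an existing context sc; the names newConf and newSc are illustrative:
// stop the currently active context before creating a new one
sc.stop()
val newConf = new SparkConf().setAppName("Another Spark App").setMaster("local[2]")
val newSc = new SparkContext(newConf)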
Once initialized, we will use the various methods found in the SparkContext object to create and manipulate distributed datasets and shared variables. The Spark shell (available in both Scala and Python, but unfortunately not in Java) takes care of this context initialization for us, but the following lines of code show an example of creating a context running in local mode in Scala:
val conf = new SparkConf()
  .setAppName("Test Spark App")
  .setMaster("local[4]")
val sc = new SparkContext(conf)
This creates a context running in local mode with four threads, with the name of the application set to Test Spark App. If we wish to use the default configuration values, we could also call the following simple constructor for our SparkContext object, which works in exactly the same way:
val sc = new SparkContext("local[4]", "Test Spark App")
Note
Downloading the example code
You can download the example code files for all Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book from any other source, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.
SparkSession
SparkSession allows programming with the DataFrame and Dataset APIs. It is a single point of entry for these APIs.
First, we need to create an instance of the SparkConf class and use it to create the SparkSession instance. Consider the following example:
val spConfig = (new SparkConf).setMaster("local").setAppName("SparkApp")
val spark = SparkSession
  .builder()
  .appName("SparkUserData")
  .config(spConfig)
  .getOrCreate()
Next, we can use the spark object to create a DataFrame:
val user_df = spark.read.format("com.databricks.spark.csv")
  .option("delimiter", "|")
  .schema(customSchema)
  .load("/home/ubuntu/work/ml-resources/spark-ml/data/ml-100k/u.user")
val first = user_df.first()
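The customSchema object referenced above is assumed to be defined beforehand. A minimal sketch of what such a schema might look like for the pipe-delimited u.user file; the field names are illustrative assumptions:
import org.apache.spark.sql.types._

// hypothetical schema for the ml-100k u.user file (user id | age | gender | occupation | zip code)
val customSchema = StructType(Array(
  StructField("no", IntegerType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("occupation", StringType, nullable = true),
  StructField("zipCode", StringType, nullable = true)
))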
The Spark shell
Spark supports writing programs interactively using the Scala, Python, or R REPL (that is, the Read-Eval-Print-Loop, or interactive shell). The shell provides instant feedback as we enter code, as this code is immediately evaluated. In the Scala shell, the return result and its type are also displayed after a piece of code is run.
To use the Spark shell with Scala, simply run ./bin/spark-shell from the Spark base directory. This will launch the Scala shell and initialize SparkContext, which is available to us as the Scala value sc. With Spark 2.0, a SparkSession instance is also available in the console as the spark variable.
Your console output should look similar to the following:
$ ~/work/spark-2.0.0-bin-hadoop2.7/bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/08/06 22:14:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/08/06 22:14:25 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.22.180 instead (on interface eth1)
16/08/06 22:14:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/08/06 22:14:26 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
16/08/06 22:14:27 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://192.168.22.180:4041
Spark context available as 'sc' (master = local[*], app id = local-1470546866779).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_60)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
To use the Python shell with Spark, simply run the ./bin/pyspark command. Like the Scala shell, the Python SparkContext object should be available as the Python variable sc. Your output should be similar to this:
~/work/spark-2.0.0-bin-hadoop2.7/bin/pyspark
Python 2.7.6 (default, Jun 22 2015, 17:58:13)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/08/06 22:16:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/08/06 22:16:15 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.22.180 instead (on interface eth1)
16/08/06 22:16:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/08/06 22:16:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/

Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
SparkSession available as 'spark'.
>>>
R is a language and runtime environment for statistical computing and graphics, developed as a GNU project. It is a different implementation of S, a language developed at Bell Labs.
R provides statistical techniques (linear and nonlinear modeling, classical statistical tests, time-series analysis, classification, and clustering) as well as graphical techniques, and it is considered to be highly extensible.
To use Spark from R, run the following command to open the SparkR shell:
$ ~/work/spark-2.0.0-bin-hadoop2.7/bin/sparkR

R version 3.0.2 (2013-09-25) -- "Frisbee Sailing"
Copyright (C) 2013 The R Foundation for Statistical Computing
Platform: x86_64-pc-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

Launching java with spark-submit command /home/ubuntu/work/spark-2.0.0-bin-hadoop2.7/bin/spark-submit "sparkr-shell" /tmp/RtmppzWD8S/backend_porta6366144af4f
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/08/06 22:26:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/08/06 22:26:22 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.22.186 instead (on interface eth1)
16/08/06 22:26:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/08/06 22:26:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.

 Welcome to
    ____              __
   / __/__  ___ _____/ /__
  _\ \/ _ \/ _ `/ __/  '_/
 /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
    /_/

SparkSession available as 'spark'.
During startup - Warning message:
package 'SparkR' was built under R version 3.1.1
>
Resilient Distributed Datasets
The core of Spark is a concept called the Resilient Distributed Dataset (RDD). An RDD is a collection of records (strictly speaking, objects of some type) that are distributed or partitioned across many nodes in a cluster (for the purposes of the Spark local mode, the single multithreaded process can be thought of in the same way). An RDD in Spark is fault-tolerant; this means that if a given node or task fails (for some reason other than erroneous user code, such as hardware failure, loss of communication, and so on), the RDD can be reconstructed automatically on the remaining nodes and the job will still be completed.
Creating RDDs
RDDs can be created from existing collections, for example, in the Scala Spark shell that you launched earlier:
val collection = List("a", "b", "c", "d", "e")
val rddFromCollection = sc.parallelize(collection)
RDDs can also be created from Hadoop-based input sources, including the local filesystem, HDFS, and Amazon S3. A Hadoop-based RDD can utilize any input format that implements the Hadoop InputFormat interface, including text files, other standard Hadoop formats, HBase, Cassandra, Tachyon, and many more.
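As an illustration, a custom Hadoop InputFormat can be used through the newAPIHadoopFile method on SparkContext. A minimal sketch, using the standard TextInputFormat and a hypothetical HDFS path:
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// read a text file through the Hadoop InputFormat API; records are (byte offset, line) pairs
val rddFromInputFormat = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
  "hdfs://input/data.txt")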
The following code is an example of creating an RDD from a text file located on the local filesystem:
val rddFromTextFile = sc.textFile("LICENSE")
The preceding textFile method returns an RDD where each record is a String object that represents one line of the text file. The output of the preceding command is as follows:
rddFromTextFile: org.apache.spark.rdd.RDD[String] = LICENSE
MapPartitionsRDD[1] at textFile at <console>:24
The following code is an example of how to create an RDD from a text file located on HDFS using the hdfs:// protocol:
val rddFromTextFileHDFS = sc.textFile("hdfs://input/LICENSE")
The following code is an example of how to create an RDD from a text file located on Amazon S3 using the s3n:// protocol:
val rddFromTextFileS3 = sc.textFile("s3n://input/LICENSE")
Spark operations
Once we have created an RDD, we have a distributed collection of records that we can manipulate. In Spark's programming model, operations are split into transformations and actions. Generally speaking, a transformation operation applies some function to all the records in the dataset, changing the records in some way. An action typically runs some computation or aggregation operation and returns the result to the driver program where SparkContext is running.
Spark operations are functional in style. For programmers familiar with functional programming in Scala, Python, or Lambda expressions in Java 8, these operations should seem natural. For those without experience in functional programming, don't worry; the Spark API is relatively easy to learn.
One of the most common transformations that you will use in Spark programs is the map operator. This applies a function to each record of an RDD, thus mapping the input to some new output. For example, the following code fragment takes the RDD we created from a local text file and applies the size function to each record in the RDD. Remember that we created an RDD of Strings. Using map, we can transform each string to an integer, thus returning an RDD of Ints:
val intsFromStringsRDD = rddFromTextFile.map(line => line.size)
You should see output similar to the following line in your shell; this indicates the type of the RDD:
intsFromStringsRDD: org.apache.spark.rdd.RDD[Int] =
MapPartitionsRDD[2] at map at <console>:26
In the preceding code, we saw the use of the => syntax. This is the Scala syntax for an anonymous function, which is a function that is not a named method (that is, one defined using the def keyword in Scala or Python, for example).
Note
While a detailed treatment of anonymous functions is beyond the scope of this book, they are used extensively in Spark code in Scala and Python, as well as in Java 8 (both in examples and real-world applications), so it is useful to cover a few practicalities. The line => line.size syntax means that we are applying a function where => is the operator: the input is what appears to the left of the operator, and the output is the result of the code to the right of it. In this case, the input is line, and the output is the result of calling line.size. In Scala, such a function that maps a string to an integer is expressed as String => Int. This syntax saves us from having to separately define functions every time we use methods such as map; this is useful when the function is simple and will only be used once, as in this example.
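For comparison, here is a sketch of the same map using a separately defined, named method rather than an anonymous function (lineLength and intsFromNamedFunction are illustrative names):
// a named method equivalent to the anonymous function line => line.size
def lineLength(line: String): Int = line.size

val intsFromNamedFunction = rddFromTextFile.map(lineLength)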
Now, we can apply a common action operation, count, to return the number of records in our RDD:
intsFromStringsRDD.count
The result should look something like the following console output:
res0: Long = 299
Perhaps we want to find the average length of each line in this text file. We can first use the sum function to add up the lengths of all the records and then divide the sum by the number of records:
val sumOfRecords = intsFromStringsRDD.sum
val numRecords = intsFromStringsRDD.count
val aveLengthOfRecord = sumOfRecords / numRecords
The result will be as follows:
scala> intsFromStringsRDD.count
res0: Long = 299

scala> val sumOfRecords = intsFromStringsRDD.sum
sumOfRecords: Double = 17512.0

scala> val numRecords = intsFromStringsRDD.count
numRecords: Long = 299

scala> val aveLengthOfRecord = sumOfRecords / numRecords
aveLengthOfRecord: Double = 58.5685618729097
Spark operations, in most cases, return a new RDD, with the exception of most actions, which return the result of a computation (such as Long for count and Double for sum in the preceding example). This means that we can naturally chain operations together to make our program flow more concise and expressive. For example, the same result as the one in the preceding code can be achieved using the following code:
val aveLengthOfRecordChained = rddFromTextFile.map(line => line.size).sum / rddFromTextFile.count
An important point to note is that Spark transformations are lazy. That is, invoking a transformation on an RDD does not immediately trigger a computation. Instead, transformations are chained together and are effectively only computed when an action is called. This allows Spark to be more efficient by only returning results to the driver when necessary so that the majority of operations are performed in parallel on the cluster.
This means that if your Spark program never uses an action operation, it will never trigger an actual computation, and you will not get any results. For example, the following code will simply return a new RDD that represents the chain of transformations:
val transformedRDD = rddFromTextFile.map(line => line.size).filter(size => size > 10).map(size => size * 2)
This returns the following result in the console:
transformedRDD: org.apache.spark.rdd.RDD[Int] =
MapPartitionsRDD[6] at map at <console>:26
Notice that no actual computation happens and no result is returned. If we now call an action, such as sum, on the resulting RDD, the computation will be triggered:
val computation = transformedRDD.sum
You will now see that a Spark job is run, and it results in the following console output:
computation: Double = 35006.0
Note
The complete list of transformations and actions possible on RDDs, as well as a set of more detailed examples, is available in the Spark Programming Guide (located at http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations), and the Scala API documentation is located at http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD.
Caching RDDs
One of the most powerful features of Spark is the ability to cache data in memory across a cluster. This is achieved through the use of the cache method on an RDD:
rddFromTextFile.cache
res0: rddFromTextFile.type = MapPartitionsRDD[1] at textFile at <console>:27
Calling cache on an RDD tells Spark that the RDD should be kept in memory. The first time an action is called on the RDD that initiates a computation, the data is read from its source and put into memory. Hence, the first time such an operation is called, the time it takes to run the task is partly dependent on the time it takes to read the data from the input source. However, when the data is accessed the next time (for example, in subsequent queries in analytics or iterations in a machine learning model), the data can be read directly from memory, thus avoiding expensive I/O operations and speeding up the computation, in many cases, by a significant factor.
If we now call the count or sum function on our cached RDD, the RDD is loaded into memory:
val aveLengthOfRecordChained = rddFromTextFile.map(line => line.size).sum / rddFromTextFile.count
Note
Spark also allows more fine-grained control over caching behavior. You can use the persist method to specify which approach Spark uses to cache data. More information on RDD caching can be found here: http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
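As an illustration, a minimal sketch of using persist with an explicit storage level instead of the memory-only default used by cache:
import org.apache.spark.storage.StorageLevel

// keep partitions in memory and spill to disk if they do not fit
rddFromTextFile.persist(StorageLevel.MEMORY_AND_DISK)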
Broadcast variables and accumulators
Another core feature of Spark is the ability to create two special types of variables: broadcast variables and accumulators.
A broadcast variable is a read-only variable that is created from the driver program and made available to the nodes that will execute the computation. This is very useful in applications that need to make the same data available to the worker nodes in an efficient manner. Spark makes creating broadcast variables as simple as calling a method on SparkContext, as follows:
val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))
A broadcast variable can be accessed from nodes other than the driver program that created it (that is, the worker nodes) by calling value on the variable:
sc.parallelize(List("1", "2", "3")).map(x => broadcastAList.value ++ x).collect
This code creates a new RDD with three records from a collection (in this case, a Scala List) of ("1", "2", "3"). In the map function, it returns a new collection with the relevant record from our new RDD appended to the broadcastAList that is our broadcast variable:
... res1: Array[List[Any]] = Array(List(a, b, c, d, e, 1), List(a, b, c, d, e, 2), List(a, b, c, d, e, 3))
Notice the collect method in the preceding code. This is a Spark action that returns the entire RDD to the driver as a Scala (or Python or Java) collection. We will often use collect when we wish to apply further processing to our results locally within the driver program.
Note
Note that collect should generally only be used in cases where we really want to return the full result set to the driver and perform further processing. If we try to call collect on a very large dataset, we might run out of memory on the driver and crash our program. It is preferable to perform as much heavy-duty processing on our Spark cluster as possible, preventing the driver from becoming a bottleneck. In many cases, however, such as during iterations in many machine learning models, collecting results to the driver is necessary.
On inspecting the result, we will see that for each of the three records in our new RDD, we now have a record that is our original broadcasted List, with the new element appended to it (that is, there is now "1", "2", or "3" at the end).
An accumulator is also a variable that is broadcast to the worker nodes. The key difference between a broadcast variable and an accumulator is that while the broadcast variable is read-only, the accumulator can be added to. There are limitations to this: in particular, the addition must be an associative operation so that the global accumulated value can be correctly computed in parallel and returned to the driver program. Each worker node can only access and add to its own local accumulator value, and only the driver program can access the global value. Accumulators are also accessed within Spark code using the value method.
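A minimal sketch, counting blank lines with the longAccumulator method available on SparkContext in Spark 2.0 (the accumulator name and the blank-line counting logic are illustrative):
// create a named accumulator on the driver
val blankLines = sc.longAccumulator("Blank Lines")

// workers add to their local copies as a side effect of the action
rddFromTextFile.foreach { line =>
  if (line.isEmpty) blankLines.add(1)
}

// only the driver can read the merged global value
println(blankLines.value)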
Note
For more details on broadcast variables and accumulators, refer to the Shared Variables section of the Spark Programming Guide at http://spark.apache.org/docs/latest/programming-guide.html#shared-variables.