Configuring a multi-node instance of Spark
Setting up a multi-node Spark cluster requires quite a few more steps than a single-machine installation. In this recipe, we will go step-by-step through the script that will help you with this process; the script needs to run on the driver node and all the executors to set up the environment.
Getting ready
In this recipe, we are solely focusing on a Linux environment (we are using Ubuntu Server 16.04 LTS). The following prerequisites are required before you can follow the rest of the recipe:
- A clean installation of a Linux distribution; in our case, we have installed Ubuntu Server 16.04 LTS on each machine in our cluster of three Dell R710s.
- Each machine needs to be connected to the internet and accessible from your local machine. You will need the machines' IPs and their hostnames; on Linux, you can check the IP by issuing the ifconfig command and reading the inet addr value. To check your hostname, type cat /etc/hostname.
- On each server, we added a user group called hadoop. Following this, we created a user called hduser and added it to the hadoop group. Also, make sure that the hduser has sudo rights. If you do not know how to do this, check the See also section of this recipe, or the short sketch after the following note.
- Make sure you can reach your servers via SSH. If you cannot, run sudo apt-get install openssh-server openssh-client on each server to install the necessary packages.
- If you want to read and write to Hadoop and Hive, you need to have these two environments installed and configured on your cluster. Check https://data-flair.training/blogs/install-hadoop-2-x-on-ubuntu/ for Hadoop installation and configuration and http://www.bogotobogo.com/Hadoop/BigData_hadoop_Hive_Install_On_Ubuntu_16_04.php for Hive.
Note
If you already have these two environments set up, some of the steps in our script will be redundant. However, we will present all of the steps, assuming you only want the Spark environment.
No other prerequisites are required.
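If you still need to create the hadoop group and the hduser account mentioned above, the following minimal sketch (our own suggestion for a typical Ubuntu setup, not part of the book's script) covers it; run it on every server and adapt as needed:

sudo addgroup hadoop                    # create the hadoop group
sudo adduser --ingroup hadoop hduser    # create hduser and place it in the hadoop group
sudo usermod -aG sudo hduser            # give hduser sudo rights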
For the purpose of automating the deployment of the Spark environment in a cluster setup, you will also have to:
- Create a hosts.txt file. Each entry in the file is the IP address of one of the servers, followed by two spaces and a hostname. Do not delete the driver: nor the executors: lines. Also, note that we only allow one driver in our cluster (some clusters support redundant drivers). An example of the content of this file is as follows:

driver:
192.168.1.160  pathfinder
executors:
192.168.1.161  discovery1
192.168.1.162  discovery2
- On your local machine, add the IPs and hostnames to your /etc/hosts file so you can access the servers via hostnames instead of IPs (once again, we are assuming you are running a Unix-like system such as macOS or Linux). For example, the following command will add pathfinder to our /etc/hosts file: echo "192.168.1.160  pathfinder" | sudo tee -a /etc/hosts (note that a plain sudo echo ... >> /etc/hosts would fail, because the redirection runs in your unprivileged shell). Repeat this for every machine in your cluster.
- Copy the hosts.txt file to each machine in your cluster; we assume the file will be placed in the hduser's home folder. You can do this easily with the scp hosts.txt hduser@<your-server-name>:~ command, where <your-server-name> is the hostname of the machine.
- To run the installOnRemote.sh script (see the Chapter01/installOnRemote.sh file) from your local machine, do the following: ssh -tq hduser@<your-server-name> "echo $(base64 -i installOnRemote.sh) | base64 -d | sudo bash". We will go through these steps in detail when we walk through the installOnRemote.sh script in the next section.
- Follow the prompts on the screen to finalize the installation and configuration steps. Repeat the previous two steps for each machine in your cluster; the sketch after this list shows one way to automate the whole copy-and-run loop.
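The following loop is a minimal sketch of how you could automate the last steps for every machine listed in hosts.txt. It is our own illustration (not part of the Chapter01 code), assumes hosts.txt and installOnRemote.sh sit in your current directory, and uses GNU base64 (-w0); on macOS, use base64 -i instead:

#!/bin/bash
# copy hosts.txt and run installOnRemote.sh on every machine listed in hosts.txt
for host in $(grep -vE '^(driver:|executors:)$|^$' hosts.txt | awk '{print $2}'); do
    echo "=== deploying to $host ==="
    scp hosts.txt "hduser@${host}:~"
    ssh -tq "hduser@${host}" "echo $(base64 -w0 installOnRemote.sh) | base64 -d | sudo bash"
done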
How to do it...
The installOnRemote.sh script for this recipe can be found in the Chapter01 folder in the GitHub repository: http://bit.ly/2ArlBck. Some portions of the script are very similar to the ones we have outlined in the previous recipes, so we will skip those; you can refer to previous recipes for more information (especially the Installing Spark requirements and the Installing Spark from binaries recipes).
The top-level structure of the script is as follows:
#!/bin/bash
# Shell script for installing Spark from binaries
# on remote servers
#
# PySpark Cookbook
# Author: Tomasz Drabas, Denny Lee
# Version: 0.1
# Date: 12/9/2017

_spark_binary="http://mirrors.ocf.berkeley.edu/apache/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz"
_spark_archive=$( echo "$_spark_binary" | awk -F '/' '{print $NF}' )
_spark_dir=$( echo "${_spark_archive%.*}" )
_spark_destination="/opt/spark"
_java_destination="/usr/lib/jvm/java-8-oracle"

_python_binary="https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh"
_python_archive=$( echo "$_python_binary" | awk -F '/' '{print $NF}' )
_python_destination="/opt/python"

_machine=$(cat /etc/hostname)
_today=$( date +%Y-%m-%d )
_current_dir=$(pwd) # store current working directory

...

printHeader
readIPs
checkJava
installScala
installPython
updateHosts
configureSSH
downloadThePackage
unpack
moveTheBinaries
setSparkEnvironmentVariables
updateSparkConfig
cleanUp
We have highlighted the portions of the script that are more relevant to this recipe in bold font.
How it works...
As with the previous recipes, we will first specify where we are going to download the Spark binaries from and create all the relevant global variables we are going to use later.
Next, we read in the hosts.txt file:
function readIPs() {
    input="./hosts.txt"

    driver=0
    executors=0
    _executors=""
    IFS=''

    while read line
    do
        if [[ "$driver" = "1" ]]; then
            _driverNode="$line"
            driver=0
        fi

        if [[ "$executors" = "1" ]]; then
            _executors=$_executors"$line\n"
        fi

        if [[ "$line" = "driver:" ]]; then
            driver=1
            executors=0
        fi

        if [[ "$line" = "executors:" ]]; then
            executors=1
            driver=0
        fi

        if [[ -z "${line}" ]]; then
            continue
        fi
    done < "$input"
}
We store the path to the file in the input variable. The driver and the executors variables are flags we use to skip the "driver:" and the "executors:" lines from the input file. The _executors empty string will store the list of executors, delimited by the newline "\n" character.
IFS stands for internal field separator. Whenever bash reads a line from a file, it will split it on that character. Here, we will set it to an empty character '' so that we preserve the double spaces between the IP address and the hostname.
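As a small, self-contained illustration of ours (not part of the script), the following shows how the IFS value affects what read keeps of a line:

printf '  192.168.1.160  pathfinder  \n' > demo.txt

IFS=''  read -r line < demo.txt   # IFS set only for this read
echo "[$line]"                    # -> [  192.168.1.160  pathfinder  ]  (line kept verbatim)

IFS=' ' read -r line < demo.txt
echo "[$line]"                    # -> [192.168.1.160  pathfinder]  (leading/trailing spaces stripped)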
Next, we start reading the file, line-by-line. Let's see how the logic works inside the loop; we'll start a bit out of order so that the logic is easier to understand:
- If the line we just read equals "driver:" (the if [[ "$line" = "driver:" ]]; conditional), we set the driver flag to 1, so that when the next line is read, we store it as the _driverNode (this is done inside the if [[ "$driver" = "1" ]]; conditional). In the "driver:" conditional, we also reset the executors flag to 0; the latter is done in case you start with the executors first, followed by a single driver, in the hosts.txt. Once the line with the driver node information is read, we reset the driver flag to 0.
- On the other hand, if the line we just read equals "executors:" (the if [[ "$line" = "executors:" ]]; conditional), we set the executors flag to 1 (and reset the driver flag to 0). This guarantees that every following line read will be appended to the _executors string, separated by the "\n" newline character (this happens inside the if [[ "$executors" = "1" ]]; conditional). Note that we do not reset the executors flag to 0, as we allow for more than one executor.
- If we encounter an empty line (which we can check for in bash with the if [[ -z "${line}" ]]; conditional), we skip it.
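For instance, with the example hosts.txt from the Getting ready section, the loop leaves the two variables holding the following values (shown as plain assignments for illustration; the \n sequences are stored literally and only expanded later by printf):

_driverNode="192.168.1.160  pathfinder"
_executors="192.168.1.161  discovery1\n192.168.1.162  discovery2\n"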
You might notice that we use the "<" redirection pipe to read in the data (indicated here by the input variable).
Note
You can read more about the redirection pipes here: http://www.tldp.org/LDP/abs/html/io-redirection.html.
Since Spark requires Java and Scala to work, we next have to check whether Java is installed, and we will install Scala (as Scala normally isn't present, while Java might be). This is achieved with the following functions:
function checkJava() {
    if type -p java; then
        echo "Java executable found in PATH"
        _java=java
    elif [[ -n "$JAVA_HOME" ]] && [[ -x "$JAVA_HOME/bin/java" ]]; then
        echo "Found Java executable in JAVA_HOME"
        _java="$JAVA_HOME/bin/java"
    else
        echo "No Java found. Install Java version $_java_required or higher first or specify JAVA_HOME variable that will point to your Java binaries."
        installJava
    fi
}

function installJava() {
    sudo apt-get install python-software-properties
    sudo add-apt-repository ppa:webupd8team/java
    sudo apt-get update
    sudo apt-get install oracle-java8-installer
}

function installScala() {
    sudo apt-get install scala
}

function installPython() {
    curl -O "$_python_binary"
    chmod 0755 ./"$_python_archive"
    sudo bash ./"$_python_archive" -b -u -p "$_python_destination"
}
The logic here doesn't differ much from what we presented in the Installing Spark requirements recipe. The only notable difference in the checkJava function is that if we do not find Java on the PATH variable or inside the JAVA_HOME folder, we do not exit but run installJava, instead.

There are many ways to install Java; we have already presented you with one of them earlier in this book: check the Installing Java section in the Installing Spark requirements recipe. Here, we used the built-in apt-get tool.
Note
The apt-get tool is a convenient, fast, and efficient utility for installing packages on your Linux machine. APT stands for Advanced Packaging Tool.
First, we install python-software-properties. This set of tools provides an abstraction of the apt repositories used and enables easy management of distribution as well as independent software vendor software sources. We need it because, in the next line, we call add-apt-repository to add a new repository, as we want the Oracle Java distribution. The sudo apt-get update command refreshes the contents of the repositories and, in our current case, fetches all the packages available in ppa:webupd8team/java. Finally, we install the Java package: just follow the prompts on the screen. We will install Scala the same way.
Note
The default location where the package should install is /usr/lib/jvm/java-8-oracle. If this is not the case, or you want to install it in a different folder, you will have to alter the _java_destination variable inside the script to reflect the new destination.
The advantage of using this tool is this: if there are already Java and Scala environments installed on a machine, using apt-get will either skip the installation (if the environment is up-to-date with the one available on the server) or ask you to update to the newest version.
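Once apt-get finishes, a quick sanity check (our own suggestion; both commands simply report the installed versions) is:

java -version     # should report a 1.8.x Oracle JVM
scala -version    # should report the Scala version pulled in by apt-get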
We will also install the Anaconda distribution of Python (as mentioned many times previously, we highly recommend this distribution). To achieve this goal, we must download the Anaconda3-5.0.1-Linux-x86_64.sh script first and then follow the prompts on the screen. The -b parameter to the script will not update the .bashrc file (we will do that later), the -u switch will update the Python environment in case the destination folder (/opt/python in our script) already exists, and -p will force the installation into that folder.
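After the installer completes, you can confirm that the interpreter landed in the expected destination (a quick check of ours, assuming the default _python_destination of /opt/python):

/opt/python/bin/python --version    # should report the Python 3.6.x build shipped with Anaconda 5.0.1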
Having passed the required installation steps, we will now update the /etc/hosts files on the remote machines:
function updateHosts() {
    _hostsFile="/etc/hosts"

    # make a copy (if one already doesn't exist)
    if ! [ -f "/etc/hosts.old" ]; then
        sudo cp "$_hostsFile" /etc/hosts.old
    fi

    t="###################################################\n"
    t=$t"#\n"
    t=$t"# IPs of the Spark cluster machines\n"
    t=$t"#\n"
    t=$t"# Script: installOnRemote.sh\n"
    t=$t"# Added on: $_today\n"
    t=$t"#\n"
    t=$t"$_driverNode\n"
    t=$t"$_executors\n"

    sudo printf "$t" >> $_hostsFile
}
This is a simple function that, first, creates a copy of the /etc/hosts file, and then appends the IPs and hostnames of the machines in our cluster. Note that the format required by the /etc/hosts file is the same as in the hosts.txt file we use: per row, an IP address of the machine, followed by two spaces, followed by the hostname.
Note
We use two spaces for readability purposes; one space separating an IP and the hostname would also work.
Also, note that we do not use the echo command here, but printf; the reason behind this is that the printf command prints out a formatted version of the string, properly handling the newline "\n" characters.
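The difference is easy to see in isolation (a small illustration of ours; bash's built-in echo does not expand backslash escapes unless you pass -e):

t="192.168.1.161  discovery1\n192.168.1.162  discovery2\n"

echo "$t"      # prints the \n sequences literally, all on one line
printf "$t"    # expands the \n sequences, printing one machine per line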
Next, we configure the passwordless SSH sessions (check the following See also subsection) to aid communication between the driver node and the executors:
function configureSSH() {
    # check if driver node
    IFS=" " read -ra temp <<< "$_driverNode"
    _driver_machine=( ${temp[1]} )
    _all_machines="$_driver_machine\n"

    if [ "$_driver_machine" = "$_machine" ]; then
        # generate key pairs (passwordless)
        sudo -u hduser rm -f ~/.ssh/id_rsa
        sudo -u hduser ssh-keygen -t rsa -P "" -f ~/.ssh/id_rsa

        IFS="\n" read -ra temp <<< "$_executors"

        for executor in ${temp[@]}; do
            # skip if empty line
            if [[ -z "${executor}" ]]; then
                continue
            fi

            # split on space
            IFS=" " read -ra temp_inner <<< "$executor"

            echo
            echo "Trying to connect to ${temp_inner[1]}"

            cat ~/.ssh/id_rsa.pub | ssh "hduser"@"${temp_inner[1]}" 'mkdir -p .ssh && cat >> .ssh/authorized_keys'

            _all_machines=$_all_machines"${temp_inner[1]}\n"
        done
    fi

    echo "Finishing up the SSH configuration"
}
Inside this function, we first check if we are on the driver node, as defined in the hosts.txt file, as we only need to perform these tasks on the driver. The read -ra temp <<< "$_driverNode" command reads the _driverNode (in our case, it is 192.168.1.160  pathfinder) and splits it at the space character (remember what IFS stands for?). The -a switch instructs the read method to store the split _driverNode string in the temp array, and the -r parameter makes sure that the backslash does not act as an escape character. We store the name of the driver in the _driver_machine variable and append it to the _all_machines string (we will use this later).
If we are executing this script on the driver machine, the first thing we must do is remove the old SSH key (using the rm command with the -f, force, switch) and create a new one. The sudo -u hduser prefix allows us to perform these actions as the hduser (instead of the root user).
Note
When we submit the script to run from our local machine, we start an SSH session as root on the remote machine. You will see how this is done shortly, so take our word on that for now.
We will use the ssh-keygen method to create the SSH key pair. The -t switch allows us to select the encryption algorithm (we are using RSA encryption), the -P switch determines the password to use (we want this passwordless, so we choose ""), and the -f parameter specifies the filename for storing the keys.
Next, we loop through all the executors: we need to append the contents of ~/.ssh/id_rsa.pub to their ~/.ssh/authorized_keys files. We split the _executors at the "\n" character and loop through all of them. To deliver the contents of the id_rsa.pub file to the executors, we use the cat tool to print out the contents of the id_rsa.pub file and then pipe it to the ssh tool. The first parameter we pass to ssh is the username and the hostname we want to connect to. Next, we pass the commands we want to execute on the remote machine. First, we attempt to create the .ssh folder if one does not exist. This is followed by appending the id_rsa.pub contents to .ssh/authorized_keys.
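Once the keys have been distributed, a quick check from the driver confirms that the passwordless setup works (our own verification step; the hostname comes from the example hosts.txt):

ssh hduser@discovery1 hostname    # should print "discovery1" without prompting for a password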
Following the SSH session's configurations on the cluster, we download the Spark binaries, unpack them, and move them to _spark_destination.
Note
We have outlined these steps in the Installing Spark from sources and Installing Spark from binaries sections, so we recommend that you check them out.
Finally, we need to set two Spark configuration files: the spark-env.sh and the slaves files:
function updateSparkConfig() {
    cd $_spark_destination/conf

    sudo -u hduser cp spark-env.sh.template spark-env.sh
    echo "export JAVA_HOME=$_java_destination" >> spark-env.sh
    echo "export SPARK_WORKER_CORES=12" >> spark-env.sh

    sudo -u hduser cp slaves.template slaves
    printf "$_all_machines" >> slaves
}
We need to append the JAVA_HOME variable to spark-env.sh so that Spark can find the necessary libraries. We must also specify the number of cores per worker to be 12; this goal is attained by setting the SPARK_WORKER_CORES variable.
Note
You might want to tune the SPARK_WORKER_CORES value to your needs. Check this spreadsheet for help: http://c2fo.io/img/apache-spark-config-cheatsheet/C2FO-Spark-Config-Cheatsheet.xlsx (which is available from here: http://c2fo.io/c2fo/spark/aws/emr/2016/07/06/apache-spark-config-cheatsheet/).
Next, we have to output the hostnames of all the machines in our cluster to the slaves file.
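With our example cluster, the lines appended by this function would look roughly as follows (an illustration based on the variables defined earlier, not output captured from a real run):

# appended to /opt/spark/conf/spark-env.sh
export JAVA_HOME=/usr/lib/jvm/java-8-oracle
export SPARK_WORKER_CORES=12

# appended to /opt/spark/conf/slaves
pathfinder
discovery1
discovery2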
In order to execute the script on the remote machine, and since we need to run it in an elevated mode (as root, using sudo), we need to encode the script before we send it over the wire. An example of how this is done is as follows (from macOS to remote Linux):
ssh -tq hduser@pathfinder "echo $(base64 -i installOnRemote.sh) | base64 -d | sudo bash"
Or from Linux to remote Linux:
ssh -tq hduser@pathfinder "echo $(base64 -w0 installOnRemote.sh) | base64 -d | sudo bash"
The preceding command uses the base64 encoding tool to encode the installOnRemote.sh script before pushing it over to the remote machine. Once on the remote, we use base64 again to decode the script (the -d switch) and run it as root (via sudo). Note that in order to run this type of script, we also pass the -tq switch to the ssh tool; the -t option forces a pseudo-terminal allocation so that we can execute arbitrary screen-based scripts on the remote machine, and the -q option quiets all the messages but those from our script.
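If you want to convince yourself that the payload survives the round trip intact before running it on a remote machine, you can decode it locally first (a small check of ours; use base64 -i instead of -w0 on macOS):

echo $(base64 -w0 installOnRemote.sh) | base64 -d | head -n 3    # should print the first few lines of the script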
Assuming all goes well, once the script finishes executing on all your machines, Spark has been successfully installed and configured on your cluster. However, before you can use Spark, you need either to close the connection to your driver and SSH to it again, or type:
source ~/.bashrc
This is so that the newly created environment variables are available, and your PATH is updated.
To start your cluster, you can type:
start-all.sh
All the machines in the cluster should then come to life and be recognized by Spark.
In order to check if everything started properly, type:
jps
And it should return something similar to the following (in our case, we had three machines in our cluster):
40334 Master
41297 Worker
41058 Worker
See also
Here's a list of useful links that might help you go through this recipe:
- If you do not know how to add a user group, check this link: https://www.techonthenet.com/linux/sysadmin/ubuntu/create_group_14_04.php
- To add a sudo user, check this link: https://www.digitalocean.com/community/tutorials/how-to-add-and-delete-users-on-ubuntu-16-04
- Here are step-by-step manual instructions on how to install Spark: https://data-flair.training/blogs/install-apache-spark-multi-node-cluster/
- Here is how to set up passwordless SSH communication between machines: https://www.tecmint.com/ssh-passwordless-login-using-ssh-keygen-in-5-easy-steps/