





















































In this article by Vishal Shukla, author of the book Elasticsearch for Hadoop, we will take a look at how ES-Hadoop can integrate with Pig and Spark with ease.
Elasticsearch is great at providing insights into the indexed data. The Hadoop ecosystem does a great job of making Hadoop easily usable for different users by providing comfortable interfaces; Hive and Pig are two such examples. Apart from these, Hadoop integrates well with other computing engines and platforms, such as Spark and Cascading.
For many use cases, Pig is one of the easiest ways to fiddle around with the data in the Hadoop ecosystem. Pig wins when it comes to ease of use and simple syntax for designing data flow pipelines without getting into complex programming. Assuming that you know Pig, we will cover how to move the data to and from Elasticsearch. If you don't know Pig yet, never mind. You can still carry on with the steps, and by the end of the article, you will at least know how to use Pig to perform data ingestion and reading with Elasticsearch.
Let's start by setting up Apache Pig. At the time of writing this article, the latest Pig version available is 0.15.0. You can use the following steps to set up the same version:
$ sudo wget -O /usr/local/pig.tar.gz http://mirrors.sonic.net/apache/pig/pig-0.15.0/pig-0.15.0.tar.gz
$ cd /usr/local
$ sudo tar -xvf pig.tar.gz
$ sudo mv pig-0.15.0 pig
Next, add the following lines to your ~/.bashrc file:
export PIG_HOME=/usr/local/pig
export PATH=$PATH:$PIG_HOME/bin
Then, reload the environment, start the job history server, and launch the Pig grunt shell:
$ source ~/.bashrc
$ mr-jobhistory-daemon.sh start historyserver
$ pig
grunt>
It's easy to forget to start the job history daemon after you restart your machine or VM. You can either configure this daemon to run at startup or remember to start it manually.
Now, we have Pig up and running. In order to use Pig with Elasticsearch, we must ensure that the ES-Hadoop JAR file is available in the Pig classpath.
Let's download the ES-Hadoop JAR file and import it to HDFS using the following steps:
$ wget http://central.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/2.1.1/elasticsearch-hadoop-2.1.1.jar
Then, create a convenient local directory to keep the downloaded JAR, as follows:
$ sudo mkdir /opt/lib
Now, import the JAR to HDFS:
$ hadoop fs -mkdir /lib
$ hadoop fs -put elasticsearch-hadoop-2.1.1.jar /lib/elasticsearch-hadoop-2.1.1.jar
Throughout this article, we will use a crime dataset that is tailored from the open dataset provided at https://data.cityofchicago.org/. This tailored dataset can be downloaded from http://www.packtpub.com/support, where all the code files required for this article are available.
Once you have downloaded the dataset, import it to HDFS at /ch07/crimes_dataset.csv.
Let's import the crime dataset into Elasticsearch using Pig. ES-Hadoop provides the EsStorage class to be used as the Pig storage. First, register the ES-Hadoop JAR with Pig:
grunt> REGISTER hdfs://localhost:9000/lib/elasticsearch-hadoop-2.1.1.jar;
Then, load the CSV data file as a relation with the following code:
grunt> SOURCE = LOAD '/ch07/crimes_dataset.csv' USING PigStorage(',') AS (id:chararray,
       caseNumber:chararray, date:datetime, block:chararray, iucr:chararray,
       primaryType:chararray, description:chararray, location:chararray,
       arrest:boolean, domestic:boolean, lat:double, lon:double);
This command reads the CSV file and maps each token in the data to the respective field declared in the schema. The resulting relation, SOURCE, is a bag that contains one tuple per input line. Next, generate the fields to be indexed:
grunt> TARGET = foreach SOURCE generate id, caseNumber, date, block, iucr, primaryType,
description, location, arrest, domestic, TOTUPLE(lon, lat) AS geoLocation;
Here, we want a nested object named geoLocation in the target Elasticsearch document. We achieve this by representing the lat and lon fields as a tuple; TOTUPLE() creates this tuple, and we then assign it the geoLocation alias.
grunt> STORE TARGET INTO 'esh_pig/crimes' USING org.elasticsearch.hadoop.pig.EsStorage(
           'es.http.timeout = 5m', 'es.index.auto.create = true',
           'es.mapping.names=arrest:isArrest, domestic:isDomestic', 'es.mapping.id=id');
The 'esh_pig/crimes' argument specifies the target index and type for the indexed documents. The EsStorage class accepts multiple Elasticsearch configurations: es.mapping.names maps Pig field names to the Elasticsearch document's field names, and es.mapping.id uses Pig's id field to assign a custom _id value to each Elasticsearch document. Similarly, you can set the _ttl and _timestamp metadata fields as well.
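For example, here is a minimal sketch of mapping the _timestamp metadata field from the date field; it assumes that _timestamp is enabled in the index mapping and that the date values are in a format Elasticsearch can parse:
grunt> STORE TARGET INTO 'esh_pig/crimes' USING org.elasticsearch.hadoop.pig.EsStorage(
           'es.mapping.id=id',
           'es.mapping.timestamp=date');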
Pig uses just one reducer in the default configuration. It is recommended to change this behavior to have a parallelism that matches the number of shards available, as shown in the following command:
grunt> SET default_parallel 5;
Pig also combines the input splits, irrespective of their size. This makes processing efficient for small files by reducing the number of mappers; however, it can cause performance issues for large files. You can disable this behavior in the Pig script, as shown in the following command:
grunt> SET pig.splitCombination FALSE;
Executing the preceding commands will create the Elasticsearch index and import the crime data documents. If you look at the created documents in Elasticsearch, you can see that the geoLocation value is an array in the [-87.74274476, 41.87404405] format. This is because, by default, ES-Hadoop ignores the tuple field names and simply converts the tuple to an ordered array. If you wish to make your geoLocation field look like a key/value-based object with lat/lon keys, you can do so by including the following configuration in EsStorage:
es.mapping.pig.tuple.use.field.names=true
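For example, the earlier STORE command could include this option as follows; this is a sketch that writes to a hypothetical crimes_geo type so that it does not clash with the existing array-based field mapping:
grunt> STORE TARGET INTO 'esh_pig/crimes_geo' USING org.elasticsearch.hadoop.pig.EsStorage(
           'es.mapping.id=id',
           'es.mapping.pig.tuple.use.field.names=true');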
If you have inputs as a well-formed JSON file, you can avoid conversion and transformations and directly pass the JSON document to Elasticsearch for indexing purposes.
You may have the JSON data in Pig as chararray, bytearray, or in any other form that translates to well-formed JSON by calling the toString() method, as shown in the following code:
grunt> JSON_DATA = LOAD '/ch07/crimes.json' USING PigStorage() AS (json:chararray);
grunt> STORE JSON_DATA INTO 'esh_pig/crimes_json' USING org.elasticsearch.hadoop.pig.EsStorage('es.input.json=true');
Take a look at the type mapping of the esh_pig index in Elasticsearch. You will see that the geoLocation field is mapped as double. This is because Elasticsearch inferred the double type from the field types we declared in Pig. To map geoLocation to geo_point, you must create the Elasticsearch mapping manually before executing the script.
Although Elasticsearch detects data types based on the fields of the incoming documents, it is always good to create the type mapping beforehand in Elasticsearch. This is a one-time activity; after that, you can run the MapReduce, Pig, Hive, Cascading, or Spark jobs any number of times without surprises in type detection.
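For instance, here is a minimal sketch of creating the index with an explicit geo_point mapping using curl, assuming Elasticsearch runs locally on the default port and the index does not exist yet:
$ curl -XPUT 'http://localhost:9200/esh_pig' -d '{
  "mappings": {
    "crimes": {
      "properties": {
        "geoLocation": { "type": "geo_point" }
      }
    }
  }
}'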
For your reference, here is a list of some of the Pig and Elasticsearch field types that map to each other. The table omits the obvious and intuitive type mappings:
Pig type        Elasticsearch type
chararray       string
bytearray       binary
tuple           array (default) or object
bag             array
map             object
bigdecimal      not supported
biginteger      not supported
Reading data from Elasticsearch using Pig is as simple as writing a single command with the Elasticsearch query.
Here is a snippet that prints the tuples for crimes related to theft:
grunt> REGISTER hdfs://localhost:9000/lib/elasticsearch-hadoop-2.1.1.jar;
grunt> ES = LOAD 'esh_pig/crimes' USING org.elasticsearch.hadoop.pig.EsStorage(
           '{"query" : { "term" : { "primaryType" : "theft" } } }');
grunt> dump ES;
Executing the preceding commands will print the matching tuples to the Pig console.
Spark is a distributed computing system that provides a huge performance boost compared to Hadoop MapReduce. It works on the abstraction of RDDs (Resilient Distributed Datasets), which can be created for any data residing in Hadoop. Unsurprisingly, ES-Hadoop provides easy integration with Spark by enabling the creation of RDDs from the data in Elasticsearch.
Spark's growing support for various data sources, such as HDFS, Parquet, Avro, S3, Cassandra, relational databases, and streaming data, makes it special when it comes to data integration. This means that when you use ES-Hadoop with Spark, you can integrate all of these sources with Elasticsearch easily.
In order to set up Apache Spark to execute a job, perform the following steps:
$ sudo wget -O /usr/local/spark.tgz http://www.apache.org/dyn/closer.cgi/spark/spark-1.4.1/spark-1.4.1-bin-hadoop2.4.tgz
$ cd /usr/local
$ sudo tar -xvf spark.tgz
$ sudo mv spark-1.4.1-bin-hadoop2.4 spark
To import the crime dataset to Elasticsearch with Spark, let's see how we can write a Spark job. We will continue using Java to write Spark jobs for consistency. Here are the driver program's snippets:
SparkConf conf = new SparkConf().setAppName("esh-spark").setMaster("local[4]");
conf.set("es.index.auto.create", "true");
JavaSparkContext context = new JavaSparkContext(conf);
Set up the SparkConf object to configure the Spark job. As always, you can also set most of the ES-Hadoop options (such as es.index.auto.create) and the other configurations that we have seen throughout the article here. Using this configuration, we created the JavaSparkContext object. Next, read the crime data CSV file from HDFS:
JavaRDD<String> textFile =
context.textFile("hdfs://localhost:9000/ch07/crimes_dataset.csv");
This reads the crime data as a JavaRDD of String, where each element still represents one raw line. Next, map each line to a Crime object:
JavaRDD<Crime> dataSplits = textFile.map(new Function<String, Crime>() {
    @Override
    public Crime call(String line) throws Exception {
        CSVParser parser = CSVParser.parse(line, CSVFormat.RFC4180);
        Crime c = new Crime();
        CSVRecord record = parser.getRecords().get(0);
        c.setId(record.get(0));
        // ... set the remaining fields from the record ...
        String lat = record.get(10);
        String lon = record.get(11);
        Map<String, Double> geoLocation = new HashMap<>();
        geoLocation.put("lat", StringUtils.isEmpty(lat) ? null : Double.parseDouble(lat));
        geoLocation.put("lon", StringUtils.isEmpty(lon) ? null : Double.parseDouble(lon));
        c.setGeoLocation(geoLocation);
        return c;
    }
});
In the preceding snippet, we called the map() method on the JavaRDD to map each input line to a Crime object. Note that Crime is a simple JavaBean class that implements the Serializable interface and maps to the Elasticsearch document structure. Using CSVParser, we parsed each field into the Crime object. We mapped the nested geoLocation object by embedding a Map in the Crime object and populating it with the lat and lon fields. The map() method returns another JavaRDD that contains the Crime objects, as shown in the following code:
JavaEsSpark.saveToEs(dataSplits, "esh_spark/crimes");
This saves the JavaRDD<Crime> to Elasticsearch using the JavaEsSpark class provided by ES-Hadoop.
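For reference, a minimal sketch of the Crime bean might look like the following; the full field list and getters/setters in the article's source code may differ:
import java.io.Serializable;
import java.util.Map;

public class Crime implements Serializable {
    private String id;
    private String caseNumber;
    // ... other fields, such as block, iucr, primaryType, description ...
    private Map<String, Double> geoLocation;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getCaseNumber() { return caseNumber; }
    public void setCaseNumber(String caseNumber) { this.caseNumber = caseNumber; }
    public Map<String, Double> getGeoLocation() { return geoLocation; }
    public void setGeoLocation(Map<String, Double> geoLocation) { this.geoLocation = geoLocation; }
}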
For all the ES-Hadoop integrations, such as Pig, Hive, Cascading, Apache Storm, and Spark, you can use all the standard ES-Hadoop configurations and techniques. This includes dynamic, multi-resource writes with a pattern such as esh_spark/{primaryType}, as well as passing JSON strings to import the data into Elasticsearch directly.
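As an illustration, here is a hedged sketch of both techniques, assuming the dataSplits RDD from earlier and a hypothetical JavaRDD<String> named jsonRdd that holds well-formed JSON documents:
// Route each document to a type named after its primaryType field
JavaEsSpark.saveToEs(dataSplits, "esh_spark/{primaryType}");

// Index pre-formed JSON strings without any conversion
JavaEsSpark.saveJsonToEs(jsonRdd, "esh_spark/crimes_json");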
To control the Elasticsearch document metadata of the documents being indexed, you can use the saveToEsWithMeta() method of JavaEsSpark. You can pass an instance of JavaPairRDD that contains Tuple2<Metadata, Object> pairs, where the Metadata key is a map holding the key/value pairs of the document metadata fields, such as id, ttl, timestamp, and version.
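Here is a minimal sketch of this approach, assuming the Metadata enum from the org.elasticsearch.spark.rdd package, the dataSplits RDD created earlier, and a hypothetical esh_spark/crimes_meta resource; the TTL value is purely illustrative:
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.elasticsearch.spark.rdd.Metadata;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import scala.Tuple2;
import java.util.HashMap;
import java.util.Map;

JavaPairRDD<Map<Metadata, Object>, Crime> withMeta = dataSplits.mapToPair(
    new PairFunction<Crime, Map<Metadata, Object>, Crime>() {
        @Override
        public Tuple2<Map<Metadata, Object>, Crime> call(Crime crime) throws Exception {
            Map<Metadata, Object> meta = new HashMap<>();
            meta.put(Metadata.ID, crime.getId());   // custom document _id
            meta.put(Metadata.TTL, "30d");          // illustrative _ttl value
            return new Tuple2<Map<Metadata, Object>, Crime>(meta, crime);
        }
    });
JavaEsSpark.saveToEsWithMeta(withMeta, "esh_spark/crimes_meta");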
ES-Hadoop also bridges Elasticsearch with the SparkSQL module. SparkSQL 1.3+ provides the DataFrame abstraction, which represents a collection of Row objects. We will not discuss the details of DataFrame here. ES-Hadoop lets you persist your DataFrame instance to Elasticsearch transparently. Let's see how we can do this with the following code:
SQLContext sqlContext = new SQLContext(context);
DataFrame df = sqlContext.createDataFrame(dataSplits, Crime.class);
Create an SQLContext instance using the JavaSparkContext instance. Using the SQLContext instance, you can create a DataFrame by calling the createDataFrame() method and passing the existing JavaRDD<T> and Class<T>, where T is a JavaBean class that implements the Serializable interface. Note that passing the class instance is required to infer a schema for the DataFrame. If you wish to use a non-JavaBean-based RDD, you can create the schema manually; the article's source code contains implementations of both approaches for your reference. Take a look at the following code:
JavaEsSparkSQL.saveToEs(df, "esh_sparksql/crimes_reflection");
Once you have the DataFrame instance, you can save it to Elasticsearch with the JavaEsSparkSQL class, as shown in the preceding code.
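If you need the manual-schema approach instead, a minimal sketch could look like the following; it assumes a getPrimaryType() getter on Crime and writes to a hypothetical esh_sparksql/crimes_manual resource:
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

// Build rows holding just two columns for illustration
JavaRDD<Row> rowRDD = dataSplits.map(new Function<Crime, Row>() {
    @Override
    public Row call(Crime crime) throws Exception {
        return RowFactory.create(crime.getId(), crime.getPrimaryType());
    }
});

// Declare the matching schema explicitly
StructType schema = DataTypes.createStructType(Arrays.asList(
    DataTypes.createStructField("id", DataTypes.StringType, true),
    DataTypes.createStructField("primaryType", DataTypes.StringType, true)));

DataFrame manualDf = sqlContext.createDataFrame(rowRDD, schema);
JavaEsSparkSQL.saveToEs(manualDf, "esh_sparksql/crimes_manual");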
Here is the snippet of SparkEsReader that finds crimes related to theft:
JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(context, "esh_spark/crimes",
    "{\"query\" : { \"term\" : { \"primaryType\" : \"theft\" } } }").values();
for(Map<String,Object> item: esRDD.collect()){
System.out.println(item);
}
We used the same JavaEsSpark class to create RDD with documents that match the Elasticsearch query.
ES-Hadoop provides an org.elasticsearch.spark.sql data source provider to read the data from Elasticsearch using SparkSQL, as shown in the following code:
Map<String, String> options = new HashMap<>();
options.put("pushdown","true");
options.put("es.nodes","localhost");
DataFrame df = sqlContext.read()
.options(options)
.format("org.elasticsearch.spark.sql")
.load("esh_sparksql/crimes_reflection");
The preceding code snippet uses the org.elasticsearch.spark.sql data source to load data from Elasticsearch. You can set the pushdown option to true to push the query execution down to Elasticsearch. This greatly increases its efficiency as the query execution is collocated where the data resides, as shown in the following code:
df.registerTempTable("crimes");
DataFrame theftCrimes = sqlContext.sql("SELECT * FROM crimes WHERE primaryType='THEFT'");
for(Row row: theftCrimes.javaRDD().collect()){
System.out.println(row);
}
We registered a temporary table for the data frame and executed the SQL query on the SQLContext. Note that we need to collect the final results locally in order to print them in the driver program.
In this article, we looked at various Hadoop ecosystem technologies. We set up Pig with ES-Hadoop and developed a script to interact with Elasticsearch. You also learned how to use ES-Hadoop to integrate Elasticsearch with Spark and empower it with the powerful SparkSQL engine.