





















































In this article by Garry Turkington and Gabriele Modena, the author of the book Learning Hadoop 2. MapReduce is a powerful paradigm that enables complex data processing that can reveal valuable insights. However, it does require a different mindset and some training and experience on the model of breaking processing analytics into a series of map and reduce steps. There are several products that are built atop Hadoop to provide higher-level or more familiar views of the data held within HDFS, and Pig is a very popular one. This article will explore the other most common abstraction implemented atop Hadoop SQL.
(For more resources related to this topic, see here.)
In this article, we will cover the following topics:
Until now, we saw how to write Hadoop programs using the MapReduce APIs and how Pig Latin provides a scripting abstraction and a wrapper for custom business logic by means of UDFs. Pig is a very powerful tool, but its dataflow-based programming model is not familiar to most developers or business analysts. The traditional tool of choice for such people to explore data is SQL.
Back in 2008, Facebook released Hive, the first widely used implementation of SQL on Hadoop.
Instead of providing a way of more quickly developing map and reduce tasks, Hive offers an implementation of HiveQL, a query language based on SQL. Hive takes HiveQL statements and immediately and automatically translates the queries into one or more MapReduce jobs. It then executes the overall MapReduce program and returns the results to the user.
This interface to Hadoop not only reduces the time required to produce results from data analysis, it also significantly widens the net as to who can use Hadoop. Instead of requiring software development skills, anyone who's familiar with SQL can use Hive.
The combination of these attributes is that HiveQL is often used as a tool for business and data analysts to perform ad hoc queries on the data stored on HDFS. With Hive, the data analyst can work on refining queries without the involvement of a software developer. Just as with Pig, Hive also allows HiveQL to be extended by means of user-defined functions, enabling the base SQL dialect to be customized with business-specific functionality.
Though Hive was the first product to introduce and support HiveQL, it is no longer the only one. Later in this article, we will also discuss Impala, released in 2013 and already a very popular tool, particularly for low-latency queries. There are others, but we will mostly discuss Hive and Impala as they have been the most successful.
While introducing the core features and capabilities of SQL on Hadoop, however, we will give examples using Hive; even though Hive and Impala share many SQL features, they also have numerous differences. We don't want to constantly have to caveat each new feature with exactly how it is supported in Hive compared to Impala. We'll generally be looking at aspects of the feature set that is common to both, but if you use both products, it's important to read the latest release notes to understand the differences.
Before diving into specific technologies, let's generate some data that we'll use in the examples throughout this article. We'll create a modified version of a former Pig script as the main functionality for this. The script in this article assumes that the Elephant Bird JARs used previously are available in the /jar directory on HDFS. The full source code is at https://github.com/learninghadoop2/book-examples/ch7/extract_for_hive.pig, but the core of extract_for_hive.pig is as follows:
-- load JSON data tweets = load '$inputDir' using com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad'); -- Tweets tweets_tsv = foreach tweets { generate (chararray)CustomFormatToISO($0#'created_at', 'EEE MMMM d HH:mm:ss Z y') as dt, (chararray)$0#'id_str', (chararray)$0#'text' as text, (chararray)$0#'in_reply_to', (boolean)$0#'retweeted' as is_retweeted, (chararray)$0#'user'#'id_str' as user_id, (chararray)$0#'place'#'id' as place_id; } store tweets_tsv into '$outputDir/tweets' using PigStorage('u0001'); -- Places needed_fields = foreach tweets { generate (chararray)CustomFormatToISO($0#'created_at', 'EEE MMMM d HH:mm:ss Z y') as dt, (chararray)$0#'id_str' as id_str, $0#'place' as place; } place_fields = foreach needed_fields { generate (chararray)place#'id' as place_id, (chararray)place#'country_code' as co, (chararray)place#'country' as country, (chararray)place#'name' as place_name, (chararray)place#'full_name' as place_full_name, (chararray)place#'place_type' as place_type; } filtered_places = filter place_fields by co != ''; unique_places = distinct filtered_places; store unique_places into '$outputDir/places' using PigStorage('u0001'); -- Users users = foreach tweets { generate (chararray)CustomFormatToISO($0#'created_at', 'EEE MMMM d HH:mm:ss Z y') as dt, (chararray)$0#'id_str' as id_str, $0#'user' as user; } user_fields = foreach users { generate (chararray)CustomFormatToISO(user#'created_at', 'EEE MMMM d HH:mm:ss Z y') as dt, (chararray)user#'id_str' as user_id, (chararray)user#'location' as user_location, (chararray)user#'name' as user_name, (chararray)user#'description' as user_description, (int)user#'followers_count' as followers_count, (int)user#'friends_count' as friends_count, (int)user#'favourites_count' as favourites_count, (chararray)user#'screen_name' as screen_name, (int)user#'listed_count' as listed_count; } unique_users = distinct user_fields; store unique_users into '$outputDir/users' using PigStorage('u0001');
Have a look at the following code:
$ pig –f extract_for_hive.pig –param inputDir=<json input> -param outputDir=<output path>
The preceding code writes data into three separate TSV files for the tweet, user, and place information. Notice that in the store command, we pass an argument when calling PigStorage. This single argument changes the default field separator from a tab character to unicode value U0001, or you can also use Ctrl +C + A. This is often used as a separator in Hive tables and will be particularly useful to us as our tweet data could contain tabs in other fields.
We will now show how you can import data into Hive and run a query against the table abstraction Hive provides over the data. In this example, and in the remainder of the article, we will assume that queries are typed into the shell that can be invoked by executing the hive command.
Even though the classic CLI tool for Hive was the tool with the same name, it is specifically called hive (all lowercase); recently a client called Beeline also became available and will likely be the preferred CLI client in the near future.
When importing any new data into Hive, there is generally a three-stage process, as follows:
Most of the HiveQL statements are direct analogues to similarly named statements in standard SQL. We assume only a passing knowledge of SQL throughout this article, but if you need a refresher, there are numerous good online learning resources.
Hive gives a structured query view of our data, and to enable that, we must first define the specification of the table's columns and import the data into the table before we can execute any queries. A table specification is generated using a CREATE statement that specifies the table name, the name and types of its columns, and some metadata about how the table is stored:
CREATE table tweets ( created_at string, tweet_id string, text string, in_reply_to string, retweeted boolean, user_id string, place_id string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY 'u0001' STORED AS TEXTFILE;
The statement creates a new table tweet defined by a list of names for columns in the dataset and their data type. We specify that fields are delimited by a tab character t and that the format used to store data is TEXTFILE.
Data can be imported from a location in HDFS tweets/ into hive using the LOAD DATA statement:
LOAD DATA INPATH 'tweets' OVERWRITE INTO TABLE tweets;
By default, data for Hive tables is stored on HDFS under /user/hive/warehouse. If a LOAD statement is given a path to data on HDFS, it will not simply copy the data into /user/hive/warehouse, but will move it there instead. If you want to analyze data on HDFS that is used by other applications, then either create a copy or use the EXTERNAL mechanism that will be described later.
Once data has been imported into Hive, we can run queries against it. For instance:
SELECT COUNT(*) FROM tweets;
The preceding code will return the total number of tweets present in the dataset. HiveQL, like SQL, is not case sensitive in terms of keywords, columns, or table names. By convention, SQL statements use uppercase for SQL language keywords, and we will generally follow this when using HiveQL within files, as will be shown later. However, when typing interactive commands, we will frequently take the line of least resistance and use lowercase.
If you look closely at the time taken by the various commands in the preceding example, you'll notice that loading data into a table takes about as long as creating the table specification, but even the simple count of all rows takes significantly longer. The output also shows that table creation and the loading of data do not actually cause MapReduce jobs to be executed, which explains the very short execution times.
Although Hive copies the data file into its working directory, it does not actually process the input data into rows at that point.
Neither the CREATE TABLE nor LOAD DATA statements don't truly create concrete table data as such; instead, they produce the metadata that will be used when Hive generates MapReduce jobs to access the data conceptually stored in the table but actually residing on HDFS. Even though the HiveQL statements refer to a specific table structure, it is Hive's responsibility to generate code that correctly maps this to the actual on-disk format in which the data files are stored.
This might seem to suggest that Hive isn't a real database; this is true, it isn't. Whereas a relational database will require a table schema to be defined before data is ingested and then ingest only data that conforms to that specification, Hive is much more flexible. The less concrete nature of Hive tables means that schemas can be defined based on the data as it has already arrived and not on some assumption of how the data should be, which might prove to be wrong. Though changeable data formats are troublesome regardless of technology, the Hive model provides an additional degree of freedom in handling the problem when, not if, it arises.
Until version 2, Hadoop was primarily a batch system. MapReduce jobs tend to have high latency and overhead derived from submission and scheduling. Internally, Hive compiles HiveQL statements into MapReduce jobs. Hive queries have traditionally been characterized by high latency. This has changed with the Stinger initiative and the improvements introduced in Hive 0.13 that we will discuss later.
Hive runs as a client application that processes HiveQL queries, converts them into MapReduce jobs, and submits these to a Hadoop cluster either to native MapReduce in Hadoop 1 or to the MapReduce Application Master running on YARN in Hadoop 2.
Regardless of the model, Hive uses a component called the metastore, in which it holds all its metadata about the tables defined in the system. Ironically, this is stored in a relational database dedicated to Hive's usage. In the earliest versions of Hive, all clients communicated directly with the metastore, but this meant that every user of the Hive CLI tool needed to know the metastore username and password.
HiveServer was created to act as a point of entry for remote clients, which could also act as a single access-control point and which controlled all access to the underlying metastore. Because of limitations in HiveServer, the newest way to access Hive is through the multi-client HiveServer2.
HiveServer2 introduces a number of improvements over its predecessor, including user authentication and support for multiple connections from the same client. More information can be found at https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2.
Instances of HiveServer and HiveServer2 can be manually executed with the hive --service hiveserver and hive --service hiveserver2 commands, respectively.
In the examples we saw before and in the remainder of this article, we implicitly use HiveServer to submit queries via the Hive command-line tool. HiveServer2 comes with Beeline. For compatibility and maturity reasons, Beeline being relatively new, both tools are available on Cloudera and most other major distributions. The Beeline client is part of the core Apache Hive distribution and so is also fully open source. Beeline can be executed in embedded version with the following command:
$ beeline -u jdbc:hive2://
HiveQL supports many of the common data types provided by standard database systems. These include primitive types, such as float, double, int, and string, through to structured collection types that provide the SQL analogues to types such as arrays, structs, and unions (structs with options for some fields). Since Hive is implemented in Java, primitive types will behave like their Java counterparts. We can distinguish Hive data types into the following five broad categories:
HiveQL provides a number of statements to create, delete, and alter databases, tables, and views. The CREATE DATABASE <name> statement creates a new database with the given name. A database represents a namespace where table and view metadata is contained. If multiple databases are present, the USE <database name> statement specifies which one to use to query tables or create new metadata. If no database is explicitly specified, Hive will run all statements against the default database. The following SHOW [DATABASES, TABLES, VIEWS] statement displays the databases currently available within a data warehouse and which table and view metadata is present within the database currently in use:
CREATE DATABASE twitter; SHOW databases; USE twitter; SHOW TABLES;
The CREATE TABLE [IF NOT EXISTS] <name> statement creates a table with the given name. As alluded to earlier, what is really created is the metadata representing the table and its mapping to files on HDFS as well as a directory in which to store the data files. If a table or view with the same name already exists, Hive will raise an exception.
Both table and column names are case insensitive. In older versions of Hive (0.12 and earlier), only alphanumeric and underscore characters were allowed in table and column names. As of Hive 0.13, the system supports unicode characters in column names. Reserved words, such as load and create, need to be escaped by backticks (the ` character) to be treated literally.
The EXTERNAL keyword specifies that the table exists in resources out of Hive's control, which can be a useful mechanism to extract data from another source at the beginning of a Hadoop-based Extract-Transform-Load (ETL) pipeline. The LOCATION clause specifies where the source file (or directory) is to be found. The EXTERNAL keyword and LOCATION clause have been used in the following code:
CREATE EXTERNAL TABLE tweets ( created_at string, tweet_id string, text string, in_reply_to string, retweeted boolean, user_id string, place_id string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY 'u0001' STORED AS TEXTFILE LOCATION '${input}/tweets';
This table will be created in metastore, but the data will not be copied into the /user/hive/warehouse directory.
Note that Hive has no concept of primary key or unique identifier. Uniqueness and data normalization are aspects to be addressed before loading data into the data warehouse.
The CREATE VIEW <view name> … AS SELECT statement creates a view with the given name. For example, we might want to create a view to isolate retweets from other messages, as follows:
CREATE VIEW retweets COMMENT 'Tweets that have been retweeted' AS SELECT * FROM tweets WHERE retweeted = true;
Unless otherwise specified, column names are derived from the defining SELECT statement. Hive does not currently support materialized views.
The DROP TABLE and DROP VIEW statements remove both metadata and data for a given table or view. When dropping an EXTERNAL table or a view, only metadata will be removed and the actual data files will not be affected.
Hive allows table metadata to be altered via the ALTER TABLE statement, which can be used to change a column type, name, position, and comment or to add and replace columns.
When adding columns, it is important to remember that only metadata will be changed and not the dataset itself. This means that if we were to add a column in the middle of the table, which didn't exist in older files, then, while selecting from older data, we might get wrong values in the wrong columns. This is because we would be looking at old files with a new format.
Similarly, ALTER VIEW <view name> AS <select statement> changes the definition of an existing view.
The data files underlying a Hive table are no different from any other file on HDFS. Users can directly read the HDFS files in the Hive tables using other tools. They can also use other tools to write to HDFS files that can be loaded into Hive through CREATE EXTERNAL TABLE or through LOAD DATA INPATH.
Hive uses the Serializer and Deserializer classes, SerDe, as well as FileFormat to read and write table rows. A native SerDe is used if ROW FORMAT is not specified or ROW FORMAT DELIMITED is specified in a CREATE TABLE statement. The DELIMITED clause instructs the system to read delimited files. Delimiter characters can be escaped using the ESCAPED BY clause.
Hive currently uses the following FileFormat classes to read and write HDFS files:
Additionally, the following SerDe classes can be used to serialize and deserialize data:
As of version 0.13, Hive ships with the native org.apache.hive.hcatalog.data.JsonSerDe JSON SerDe. For older versions of Hive, Hive-JSON-Serde (found at https://github.com/rcongiu/Hive-JSON-Serde) is arguably one of the most feature-rich JSON serialization/deserialization modules.
We can use either module to load JSON tweets without any need for preprocessing and just define a Hive schema that matches the content of a JSON document. In the following example, we use Hive-JSON-Serde.
As with any third-party module, we load the SerDe JARS into Hive with the following code:
ADD JAR JAR json-serde-1.3-jar-with-dependencies.jar;
Then, we issue the usual create statement, as follows:
CREATE EXTERNAL TABLE tweets ( contributors string, coordinates struct < coordinates: array <float>, type: string>, created_at string, entities struct < hashtags: array <struct < indices: array <tinyint>, text: string>>, … ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE LOCATION 'tweets';
With this SerDe, we can map nested documents (such as entities or users) to the struct or map types. We tell Hive that the data stored at LOCATION 'tweets' is text (STORED AS TEXTFILE) and that each row is a JSON object (ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'). In Hive 0.13 and later, we can express this property as ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'.
Manually specifying the schema for complex documents can be a tedious and error-prone process. The hive-json module (found at https://github.com/hortonworks/hive-json) is a handy utility to analyze large documents and generate an appropriate Hive schema. Depending on the document collection, further refinement might be necessary.
In our example, we used a schema generated with hive-json that maps the tweets JSON to a number of struct data types. This allows us to query the data using a handy dot notation. For instance, we can extract the screen name and description fields of a user object with the following code:
SELECT user.screen_name, user.description FROM tweets_json LIMIT 10;
AvroSerde (https://cwiki.apache.org/confluence/display/Hive/AvroSerDe) allows us to read and write data in Avro format. Starting from 0.14, Avro-backed tables can be created using the STORED AS AVRO statement, and Hive will take care of creating an appropriate Avro schema for the table. Prior versions of Hive are a bit more verbose.
This dataset was created using Pig's AvroStorage class, which generated the following schema:
{ "type":"record", "name":"record", "fields": [ {"name":"topic","type":["null","int"]}, {"name":"source","type":["null","int"]}, {"name":"rank","type":["null","float"]} ] }
The structure is quite self-explanatory. The table structure is captured in an Avro record, which contains header information (a name and optional namespace to qualify the name) and an array of the fields. Each field is specified with its name and type as well as an optional documentation string.
For a few of the fields, the type is not a single value, but instead a pair of values, one of which is null. This is an Avro union, and this is the idiomatic way of handling columns that might have a null value. Avro specifies null as a concrete type, and any location where another type might have a null value needs to be specified in this way. This will be handled transparently for us when we use the following schema.
With this definition, we can now create a Hive table that uses this schema for its table specification, as follows:
CREATE EXTERNAL TABLE tweets_pagerank ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' WITH SERDEPROPERTIES ('avro.schema.literal'='{ "type":"record", "name":"record", "fields": [ {"name":"topic","type":["null","int"]}, {"name":"source","type":["null","int"]}, {"name":"rank","type":["null","float"]} ] }') STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' LOCATION '${data}/ch5-pagerank';
Then, look at the following table definition from within Hive (note also that HCatalog):
DESCRIBE tweets_pagerank; OK topic int from deserializer source int from deserializer rank float from deserializer
In the ddl, we told Hive that data is stored in the Avro format using AvroContainerInputFormat and AvroContainerOutputFormat. Each row needs to be serialized and deserialized using org.apache.hadoop.hive.serde2.avro.AvroSerDe. The table schema is inferred by Hive from the Avro embedded in avro.schema.literal.
Alternatively, we can store a schema on HDFS and have Hive read it to determine the table structure. Create the preceding schema in a file called pagerank.avsc—this is the standard file extension for Avro schemas. Then place it on HDFS; we want to have a common location for schema files such as /schema/avro. Finally, define the table using the avro.schema.url SerDe property WITH SERDEPROPERTIES ('avro.schema.url'='hdfs://<namenode>/schema/avro/pagerank.avsc').
If Avro dependencies are not present in the classpath, we need to add the Avro MapReduce JAR to our environment before accessing individual fields. Within Hive, for example, on the Cloudera CDH5 VM:
ADD JAR /opt/cloudera/parcels/CDH/lib/avro/avro-mapred-hadoop2.jar;
We can also use this table like any other. For instance, we can query the data to select the user and topic pairs with a high PageRank:
SELECT source, topic from tweets_pagerank WHERE rank >= 0.9;
We will see how Avro and avro.schema.url play an instrumental role in enabling schema migrations.
Hive can also take advantage of columnar storage via the ORC (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC) and Parquet (https://cwiki.apache.org/confluence/display/Hive/Parquet) formats.
If a table is defined with very many columns, it is not unusual for any given query to only process a small subset of these columns. But even in SequenceFile, each full row and all its columns will be read from the disk, decompressed, and processed. This consumes a lot of system resources for data that we know in advance is not of interest.
Traditional relational databases also store data on a row basis, and a type of database called columnarchanged this to be column-focused. In the simplest model, instead of one file for each table, there would be one file for each column in the table. If a query only needed to access five columns in a table with 100 columns in total, then only the files for those five columns will be read. Both ORC and Parquet use this principle as well as other optimizations to enable much faster queries.
Tables can be queried using the familiar SELECT … FROM statement. The WHERE statement allows the specification of filtering conditions, GROUP BY aggregates records, ORDER BY specifies sorting criteria, and LIMIT specifies the number of records to retrieve. Aggregate functions, such as count and sum, can be applied to aggregated records. For instance, the following code returns the top 10 most prolific users in the dataset:
SELECT user_id, COUNT(*) AS cnt FROM tweets GROUP BY user_id ORDER BY cnt DESC LIMIT 10
The following are the top 10 most prolific users in the dataset:
NULL 7091 1332188053 4 959468857 3 1367752118 3 362562944 3 58646041 3 2375296688 3 1468188529 3 37114209 3 2385040940 3
This allows us to identify the number of tweets, 7,091, with no user object.
We can improve the readability of the hive output by setting the following code:
SET hive.cli.print.header=true;
This will instruct hive, though not beeline, to print column names as part of the output.
You can add the command to the .hiverc file usually found in the root of the executing user's home directory to have it apply to all hive CLI sessions.
HiveQL implements a JOIN operator that enables us to combine tables together. In the Prerequisites section, we generated separate datasets for the user and place objects. Let's now load them into hive using external tables.
We first create a user table to store user data, as follows:
CREATE EXTERNAL TABLE user ( created_at string, user_id string, `location` string, name string, description string, followers_count bigint, friends_count bigint, favourites_count bigint, screen_name string, listed_count bigint ) ROW FORMAT DELIMITED FIELDS TERMINATED BY 'u0001' STORED AS TEXTFILE LOCATION '${input}/users';
We then create a place table to store location data, as follows:
CREATE EXTERNAL TABLE place ( place_id string, country_code string, country string, `name` string, full_name string, place_type string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY 'u0001' STORED AS TEXTFILE LOCATION '${input}/places';
We can use the JOIN operator to display the names of the 10 most prolific users, as follows:
SELECT tweets.user_id, user.name, COUNT(tweets.user_id) AS cnt FROM tweets JOIN user ON user.user_id = tweets.user_id GROUP BY tweets.user_id, user.user_id, user.name ORDER BY cnt DESC LIMIT 10;
Only equality, outer, and left (semi) joins are supported in Hive.
Notice that there might be multiple entries with a given user ID but different values for the followers_count, friends_count, and favourites_count columns. To avoid duplicate entries, we count only user_id from the tweets tables.
Alternatively, we can rewrite the previous query as follows:
SELECT tweets.user_id, u.name, COUNT(*) AS cnt FROM tweets join (SELECT user_id, name FROM user GROUP BY user_id, name) u ON u.user_id = tweets.user_id GROUP BY tweets.user_id, u.name ORDER BY cnt DESC LIMIT 10;
Instead of directly joining the user table, we execute a subquery, as follows:
SELECT user_id, name FROM user GROUP BY user_id, name;
The subquery extracts unique user IDs and names. Note that Hive has limited support for subqueries. Historically, only permitting a subquery in the FROM clause of a SELECT statement. Hive 0.13 has added limited support for subqueries within the WHERE clause also.
HiveQL is an ever-evolving rich language, a full exposition of which is beyond the scope of this article. A description of its query and ddl capabilities can be found at https://cwiki.apache.org/confluence/display/Hive/LanguageManual.
Often Hive isn't used in isolation, instead tables are created with particular workloads in mind or with needs invoked in ways that are suitable for inclusion in automated processes. We'll now explore some of these scenarios.
With columnar file formats, we explained the benefits of excluding unneeded data as early as possible when processing a query. A similar concept has been used in SQL for some time: table partitioning.
When creating a partitioned table, a column is specified as the partition key. All values with that key are then stored together. In Hive's case, different subdirectories for each partition key are created under the table directory in the warehouse location on HDFS.
It's important to understand the cardinality of the partition column. With too few distinct values, the benefits are reduced as the files are still very large. If there are too many values, then queries might need a large number of files to be scanned to access all the required data. Perhaps, the most common partition key is one based on date. We could, for example, partition our user table from earlier based on the created_at column, that is, the date the user was first registered. Note that since partitioning a table by definition affects its file structure, we create this table now as a non-external one, as follows:
CREATE TABLE partitioned_user ( created_at string, user_id string, `location` string, name string, description string, followers_count bigint, friends_count bigint, favourites_count bigint, screen_name string, listed_count bigint ) PARTITIONED BY (created_at_date string) ROW FORMAT DELIMITED FIELDS TERMINATED BY 'u0001' STORED AS TEXTFILE;
To load data into a partition, we can explicitly give a value for the partition in which to insert the data, as follows:
INSERT INTO TABLE partitioned_user PARTITION( created_at_date = '2014-01-01') SELECT created_at, user_id, location, name, description, followers_count, friends_count, favourites_count, screen_name, listed_count FROM user;
This is at best verbose, as we need a statement for each partition key value; if a single LOAD or INSERT statement contains data for multiple partitions, it just won't work. Hive also has a feature called dynamic partitioning, which can help us here. We set the following three variables:
SET hive.exec.dynamic.partition = true; SET hive.exec.dynamic.partition.mode = nonstrict; SET hive.exec.max.dynamic.partitions.pernode=5000;
The first two statements enable all partitions (nonstrict option) to be dynamic. The third one allows 5,000 distinct partitions to be created on each mapper and reducer node.
We can then simply use the name of the column to be used as the partition key, and Hive will insert data into partitions depending on the value of the key for a given row:
INSERT INTO TABLE partitioned_user PARTITION( created_at_date ) SELECT created_at, user_id, location, name, description, followers_count, friends_count, favourites_count, screen_name, listed_count, to_date(created_at) as created_at_date FROM user;
Even though we use only a single partition column, here we can partition a table by multiple column keys; just have them as a comma-separated list in the PARTITIONED BY clause.
Note that the partition key columns need to be included as the last columns in any statement being used to insert into a partitioned table as in the preceding code. We use Hive's to_date function to convert the created_at timestamp to a YYYY-MM-DD formatted string.
Partitioned data is stored in HDFS as /path/to/warehouse/<database>/<table>/key=<value>. In our example, the partitioned_user table structure will look like /user/hive/warehouse/default/partitioned_user/created_at=2014-04-01.
If data is added directly to the filesystem, for instance, by some third-party processing tool or by hadoop fs -put, the metastore won't automatically detect the new partitions. The user will need to manually run an ALTER TABLE statement such as the following for each newly added partition:
ALTER TABLE <table_name> ADD PARTITION <location>;
Using the MSCK REPAIR TABLE <table_name>; statement, all metadata for all partitions not currently present in the metastore will be added. On EMR, this is equivalent to executing the following code:
ALTER TABLE <table_name> RECOVER PARTITIONS;
Notice that both statements will work also with EXTERNAL tables. In the following article, we will see how this pattern can be exploited to create flexible and interoperable pipelines.
Partitioning is also useful when we need to update a portion of a table. Normally a statement of the following form will replace all the data for the destination table:
INSERT OVERWRITE INTO <table>…
If OVERWRITE is omitted, then each INSERT statement will add additional data to the table. Sometimes, this is desirable, but often, the source data being ingested into a Hive table is intended to fully update a subset of the data and keep the rest untouched.
If we perform an INSERT OVERWRITE statement (or a LOAD OVERWRITE statement) into a partition of a table, then only the specified partition will be affected. Thus, if we were inserting user data and only wanted to affect the partitions with data in the source file, we could achieve this by adding the OVERWRITE keyword to our previous INSERT statement.
We can also add caveats to the SELECT statement. Say, for example, we only wanted to update data for a certain month:
INSERT INTO TABLE partitioned_user PARTITION (created_at_date) SELECT created_at , user_id, location, name, description, followers_count, friends_count, favourites_count, screen_name, listed_count, to_date(created_at) as created_at_date FROM user WHERE to_date(created_at) BETWEEN '2014-03-01' and '2014-03-31';
Partitioning a table is a construct that you take explicit advantage of by using the partition column (or columns) in the WHERE clause of queries against the tables. There is another mechanism called bucketing that can further segment how a table is stored and does so in a way that allows Hive itself to optimize its internal query plans to take advantage of the structure.
Let's create bucketed versions of our tweets and user tables; note the following additional CLUSTER BY and SORT BY statements in the CREATE TABLE statements:
CREATE table bucketed_tweets ( tweet_id string, text string, in_reply_to string, retweeted boolean, user_id string, place_id string ) PARTITIONED BY (created_at string) CLUSTERED BY(user_ID) into 64 BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED BY 'u0001' STORED AS TEXTFILE; CREATE TABLE bucketed_user ( user_id string, `location` string, name string, description string, followers_count bigint, friends_count bigint, favourites_count bigint, screen_name string, listed_count bigint ) PARTITIONED BY (created_at string) CLUSTERED BY(user_ID) SORTED BY(name) into 64 BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED BY 'u0001' STORED AS TEXTFILE;
Note that we changed the tweets table to also be partitioned; you can only bucket a table that is partitioned.
Just as we need to specify a partition column when inserting into a partitioned table, we must also take care to ensure that data inserted into a bucketed table is correctly clustered. We do this by setting the following flag before inserting the data into the table:
SET hive.enforce.bucketing=true;
Just as with partitioned tables, you cannot apply the bucketing function when using the LOAD DATA statement; if you wish to load external data into a bucketed table, first insert it into a temporary table, and then use the INSERT…SELECT… syntax to populate the bucketed table.
When data is inserted into a bucketed table, rows are allocated to a bucket based on the result of a hash function applied to the column specified in the CLUSTERED BY clause.
One of the greatest advantages of bucketing a table comes when we need to join two tables that are similarly bucketed, as in the previous example. So, for example, any query of the following form would be vastly improved:
SET hive.optimize.bucketmapjoin=true; SELECT … FROM bucketed_user u JOIN bucketed_tweet t ON u.user_id = t.user_id;
With the join being performed on the column used to bucket the table, Hive can optimize the amount of processing as it knows that each bucket contains the same set of user_id columns in both tables. While determining which rows against which to match, only those in the bucket need to be compared against, and not the whole table. This does require that the tables are both clustered on the same column and that the bucket numbers are either identical or one is a multiple of the other. In the latter case, with say one table clustered into 32 buckets and another into 64, the nature of the default hash function used to allocate data to a bucket means that the IDs in bucket 3 in the first table will cover those in both buckets 3 and 35 in the second.
Bucketing a table can also help while using Hive's ability to sample data in a table. Sampling allows a query to gather only a specified subset of the overall rows in the table. This is useful when you have an extremely large table with moderately consistent data patterns. In such a case, applying a query to a small fraction of the data will be much faster and will still give a broadly representative result. Note, of course, that this only applies to queries where you are looking to determine table characteristics, such as pattern ranges in the data; if you are trying to count anything, then the result needs to be scaled to the full table size.
For a non-bucketed table, you can sample in a mechanism similar to what we saw earlier by specifying that the query should only be applied to a certain subset of the table:
SELECT max(friends_count) FROM user TABLESAMPLE(BUCKET 2 OUT OF 64 ON name);
In this query, Hive will effectively hash the rows in the table into 64 buckets based on the name column. It will then only use the second bucket for the query. Multiple buckets can be specified, and if RAND() is given as the ON clause, then the entire row is used by the bucketing function.
Though successful, this is highly inefficient as the full table needs to be scanned to generate the required subset of data. If we sample on a bucketed table and ensure the number of buckets sampled is equal to or a multiple of the buckets in the table, then Hive will only read the buckets in question. The following code is representative of this case:
SELECT MAX(friends_count) FROM bucketed_user TABLESAMPLE(BUCKET 2 OUT OF 32 on user_id);
In the preceding query against the bucketed_user table, which is created with 64 buckets on the user_id column, the sampling, since it is using the same column, will only read the required buckets. In this case, these will be buckets 2 and 34 from each partition.
A final form of sampling is block sampling. In this case, we can specify the required amount of the table to be sampled, and Hive will use an approximation of this by only reading enough source data blocks on HDFS to meet the required size. Currently, the data size can be specified as either a percentage of the table, as an absolute data size, or as a number of rows (in each block). The syntax for TABLESAMPLE is as follows, which will sample 0.5 percent of the table, 1 GB of data or 100 rows per split, respectively:
TABLESAMPLE(0.5 PERCENT) TABLESAMPLE(1G) TABLESAMPLE(100 ROWS)
If these latter forms of sampling are of interest, then consult the documentation, as there are some specific limitations on the input format and file formats that are supported.
We can place Hive commands in a file and run them with the -f option in the hive CLI utility:
$ cat show_tables.hql show tables; $ hive -f show_tables.hql
We can parameterize HiveQL statements by means of the hiveconf mechanism. This allows us to specify an environment variable name at the point it is used rather than at the point of invocation. For example:
$ cat show_tables2.hql show tables like '${hiveconf:TABLENAME}'; $ hive -hiveconf TABLENAME=user -f show_tables2.hql
The variable can also be set within the Hive script or an interactive session:
SET TABLE_NAME='user';
The preceding hiveconf argument will add any new variables in the same namespace as the Hive configuration options. As of Hive 0.8, there is a similar option called hivevar that adds any user variables into a distinct namespace. Using hivevar, the preceding command would be as follows:
$ cat show_tables3.hql show tables like '${hivevar:TABLENAME}'; $ hive -hivevar TABLENAME=user –f show_tables3.hql
Or we can write the command interactively:
SET hivevar_TABLE_NAME='user';
With ElasticMapReduce as the AWS Hadoop-on-demand service, it is of course possible to run Hive on an EMR cluster. But it is also possible to use Amazon storage services, particularly S3, from any Hadoop cluster, be it within EMR or your own local cluster.
It is possible to specify a default filesystem other than HDFS for Hadoop and S3 is one option. But, it doesn't have to be an all-or-nothing thing; it is possible to have specific tables stored in S3. The data for these tables will be retrieved into the cluster to be processed, and any resulting data can either be written to a different S3 location (the same table cannot be the source and destination of a single query) or onto HDFS.
We can take a file of our tweet data and place it onto a location in S3 with a command such as the following:
$ aws s3 put tweets.tsv s3://<bucket-name>/tweets/
We firstly need to specify the access key and secret access key that can access the bucket. This can be done in three ways:
Then we can create a table referencing this data, as follows:
CREATE table remote_tweets ( created_at string, tweet_id string, text string, in_reply_to string, retweeted boolean, user_id string, place_id string ) CLUSTERED BY(user_ID) into 64 BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED BY 't' LOCATION 's3n://<bucket-name>/tweets'
This can be an incredibly effective way of pulling S3 data into a local Hadoop cluster for processing.
In order to use AWS credentials in the URI of an S3 location regardless of how the parameters are passed, the secret and access keys must not contain /, +, =, or characters. If necessary, a new set of credentials can be generated from the IAM console at https://console.aws.amazon.com/iam/.
In theory, you can just leave the data in the external table and refer to it when needed to avoid WAN data transfer latencies (and costs), even though it often makes sense to pull the data into a local table and do future processing from there. If the table is partitioned, then you might find yourself retrieving a new partition each day, for example.
On one level, using Hive within Amazon ElasticMapReduce is just the same as everything discussed in this article. You can create a persistent cluster, log in to the master node, and use the Hive CLI to create tables and submit queries. Doing all this will use the local storage on the EC2 instances for the table data.
Not surprisingly, jobs on EMR clusters can also refer to tables whose data is stored on S3 (or DynamoDB). And, not surprisingly, Amazon has made extensions to its version of Hive to make all this very seamless. It is quite simple from within an EMR job to pull data from a table stored in S3, process it, write any intermediate data to the EMR local storage, and then write the output results into S3, DynamoDB, or one of a growing list of other AWS services.
The pattern mentioned earlier where new data is added to a new partition directory for a table each day has proved very effective in S3; it is often the storage location of choice for large and incrementally growing datasets. There is a syntax difference when using EMR; instead of the MSCK command mentioned earlier, the command to update a Hive table with new data added to a partition directory is as follows:
ALTER TABLE <table-name> RECOVER PARTITIONS;
Consult the EMR documentation for the latest enhancements at http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-hive-additional-features.html. Also, consult the broader EMR documentation. In particular, the integration points with other AWS services is an area of rapid growth.
The HiveQL language can be extended by means of plugins and third-party functions. In Hive, there are three types of functions characterized by the number of rows they take as input and produce as output:
These APIs are provided only in Java. For other languages, it is possible to stream data through a user-defined script using the TRANSFORM, MAP, and REDUCE clauses that act as a frontend to Hadoop's streaming capabilities.
Two APIs are available to write UDFs. A simple API org.apache.hadoop.hive.ql.exec.UDF can be used for functions that take read and return basic writable types. A richer API, which provides support for data types other than writable is available in the org.apache.hadoop.hive.ql.udf.generic.GenericUDF package. We'll now illustrate how org.apache.hadoop.hive.ql.exec.UDF can be used to implement a string to ID function similar to the one we used in Iterative Computation with Spark, to map hashtags to integers in Pig. Building a UDF with this API only requires extending the UDF class and writing an evaluate() method, as follows:
public class StringToInt extends UDF { public Integer evaluate(Text input) { if (input == null) return null; String str = input.toString(); return str.hashCode(); } }
The function takes a Text object as input and maps it to an integer value with the hashCode() method. The source code of this function can be found at https://github.com/learninghadoop2/book-examples/ch7/udf/ com/learninghadoop2/hive/udf/StringToInt.java.
A more robust hash function should be used in production.
We compile the class and archive it into a JAR file, as follows:
$ javac -classpath $(hadoop classpath):/opt/cloudera/parcels/CDH/lib/hive/lib/* com/learninghadoop2/hive/udf/StringToInt.java $ jar cvf myudfs-hive.jar com/learninghadoop2/hive/udf/StringToInt.class
Before being able to use it, a UDF must be registered in Hive with the following commands:
ADD JAR myudfs-hive.jar; CREATE TEMPORARY FUNCTION string_to_int AS 'com.learninghadoop2.hive.udf.StringToInt';
The ADD JAR statement adds a JAR file to the distributed cache. The CREATE TEMPORARY FUNCTION <function> AS <class> statement registers as a function in Hive that implements a given Java class. The function will be dropped once the Hive session is closed. As of Hive 0.13, it is possible to create permanent functions whose definition is kept in the metastore using CREATE FUNCTION … .
Once registered, StringToInt can be used in a Hive query just as any other function. In the following example, we first extract a list of hashtags from the tweet's text by applying the regexp_extract function. Then, we use string_to_int to map each tag to a numerical ID:
SELECT unique_hashtags.hashtag, string_to_int(unique_hashtags.hashtag) AS tag_id FROM ( SELECT regexp_extract(text, '(?:\s|\A|^)[##]+([A-Za-z0-9-_]+)') as hashtag FROM tweets GROUP BY regexp_extract(text, '(?:\s|\A|^)[##]+([A-Za-z0-9-_]+)') ) unique_hashtags GROUP BY unique_hashtags.hashtag, string_to_int(unique_hashtags.hashtag);
We can use the preceding query to create a lookup table, as follows:
CREATE TABLE lookuptable (tag string, tag_id bigint); INSERT OVERWRITE TABLE lookuptable SELECT unique_hashtags.hashtag, string_to_int(unique_hashtags.hashtag) as tag_id FROM ( SELECT regexp_extract(text, '(?:\s|\A|^)[##]+([A-Za-z0-9-_]+)') AS hashtag FROM tweets GROUP BY regexp_extract(text, '(?:\s|\A|^)[##]+([A-Za-z0-9-_]+)') ) unique_hashtags GROUP BY unique_hashtags.hashtag, string_to_int(unique_hashtags.hashtag);
In addition to the hive and beeline command-line tools, it is possible to submit HiveQL queries to the system via the JDBC and Thrift programmatic interfaces. Support for odbc was bundled in older versions of Hive, but as of Hive 0.12, it needs to be built from scratch. More information on this process can be found at https://cwiki.apache.org/confluence/display/Hive/HiveODBC.
A Hive client written using JDBC APIs looks exactly the same as a client program written for other database systems (for example MySQL). The following is a sample Hive client program using JDBC APIs. The source code for this example can be found at https://github.com/learninghadoop2/book-examples/ch7/clients/ com/learninghadoop2/hive/client/HiveJdbcClient.java.
public class HiveJdbcClient { private static String driverName = " org.apache.hive.jdbc.HiveDriver"; // connection string public static String URL = "jdbc:hive2://localhost:10000"; // Show all tables in the default database public static String QUERY = "show tables"; public static void main(String[] args) throws SQLException { try { Class.forName (driverName); } catch (ClassNotFoundException e) { e.printStackTrace(); System.exit(1); } Connection con = DriverManager.getConnection (URL); Statement stmt = con.createStatement(); ResultSet resultSet = stmt.executeQuery(QUERY); while (resultSet.next()) { System.out.println(resultSet.getString(1)); } } }
The URL part is the JDBC URI that describes the connection end point. The format for establishing a remote connection is jdbc:hive2:<host>:<port>/<database>. Connections in embedded mode can be established by not specifying a host or port jdbc:hive2://.
The hive and hive2 part are the drivers to be used when connecting to HiveServer and HiveServer2. The QUERY statement contains the HiveQL query to be executed.
Hive's JDBC interface exposes only the default database. In order to access other databases, you need to reference them explicitly in the underlying queries using the <database>.<table> notation.
First we load the HiveServer2 JDBC driver org.apache.hive.jdbc.HiveDriver.
Use org.apache.hadoop.hive.jdbc.HiveDriver to connect to HiveServer.
Then, like with any other JDBC program, we establish a connection to URL and use it to instantiate a Statement class. We execute QUERY, with no authentication, and store the output dataset into the ResultSet object. Finally, we scan resultSet and print its content to the command line.
Compile and execute the example with the following commands:
$ javac HiveJdbcClient.java $ java -cp $(hadoop classpath):/opt/cloudera/parcels/CDH/lib/hive/lib/*:/opt/cloudera/parcels/CDH
/lib/hive/lib/hive-jdbc.jar: com.learninghadoop2.hive.client.HiveJdbcClient
Thrift provides lower-level access to Hive and has a number of advantages over the JDBC implementation of HiveServer. Primarily, it allows multiple connections from the same client, and it allows programming languages other than Java to be used with ease. With HiveServer2, it is a less commonly used option but still worth mentioning for compatibility. A sample Thrift client implemented using the Java API can be found at https://github.com/learninghadoop2/book-examples/ch7/clients/ com/learninghadoop2/hive/client/HiveThriftClient.java. This client can be used to connect to HiveServer, but due to protocol differences, the client won't work with HiveServer2.
In the example, we define a getClient() method that takes as input the host and port of a HiveServer service and returns an instance of org.apache.hadoop.hive.service.ThriftHive.Client.
A client is obtained by first instantiating a socket connection, org.apache.thrift.transport.TSocket, to the HiveServer service, and by specifying a protocol, org.apache.thrift.protocol.TBinaryProtocol, to serialize and transmit data, as follows:
TSocket transport = new TSocket(host, port); transport.setTimeout(TIMEOUT); transport.open(); TBinaryProtocol protocol = new TBinaryProtocol(transport); client = new ThriftHive.Client(protocol);
Finally, we call getClient() from the main method and use the client to execute a query against an instance of HiveServer running on localhost on port 11111, as follows:
public static void main(String[] args) throws Exception { Client client = getClient("localhost", 11111); client.execute("show tables"); List<String> results = client.fetchAll(); for (String result : results) { System.out.println(result); } }
Make sure that HiveServer is running on port 11111, and if not, start an instance with the following command:
$ sudo hive --service hiveserver -p 11111
Compile and execute the HiveThriftClient.java example with the following command:
$ javac $(hadoop classpath):/opt/cloudera/parcels/CDH/lib/hive/lib/* com/learninghadoop2/hive/
client/HiveThriftClient.java $ java -cp $(hadoop classpath):/opt/cloudera/parcels/CDH/lib/hive/lib/*: com.learninghadoop2.hive.
client.HiveThriftClient
Hive has remained very successful and capable since its earliest releases, particularly in its ability to provide SQL-like processing on enormous datasets. But other technologies did not stand still, and Hive acquired a reputation of being relatively slow, particularly in regard to lengthy startup times on large jobs and its inability to give quick responses to conceptually simple queries.
These perceived limitations were less due to Hive itself and more a consequence of how translation of SQL queries into the MapReduce model has much built-in inefficiency when compared to other ways of implementing a SQL query. Particularly, in regard to very large datasets, MapReduce saw lots of I/OI/O (and consequently time) spent writing out the results of one MapReduce job just to have them read by another. Processing - MapReduce and Beyond, this is a major driver in the design of Tez, which can schedule tasks on a Hadoop cluster as a graph of tasks that does not require inefficient writes and reads between tasks in the graph.
The following is a query on the MapReduce framework versus Tez:
SELECT a.country, COUNT(b.place_id) FROM place a JOIN tweets b ON (a. place_id = b.place_id) GROUP BY a.country;
The following figure contrasts the execution plan for the preceding query on the MapReduce framework versus Tez:
Hive on MapReduce versus Tez
In plain MapReduce, two jobs are created for the GROUP BY and JOIN clauses. The first job is composed of a set of MapReduce tasks that read data from the disk to carry out grouping. The reducers write intermediate results to the disk so that output can be synchronized. The mappers in the second job read the intermediate results from the disk as well as data from table b. The combined dataset is then passed to the reducer where shared keys are joined. Were we to execute an ORDER BY statement, this would have resulted in a third job and further MapReduce passes. The same query is executed on Tez as a single job by a single set of Map tasks that read data from the disk. I/O grouping and joining are pipelined across reducers.
Alongside these architectural limitations, there were quite a few areas around SQL language support that could also provide better efficiency, and in early 2013, the Stinger initiative was launched with an explicit goal of making Hive over 100 times as fast and with much richer SQL support. Hive 0.13 has all the features of the three phases of Stinger, resulting in a much more complete SQL dialect. Also, Tez is offered as an execution framework in addition to a more efficient MapReduce-based implementation atop YARN.
With Tez as the execution engine, Hive is no longer limited to a series of linear MapReduce jobs and can instead build a processing graph where any given step can, for example, stream results to multiple sub-steps.
To take advantage of the Tez framework, there is a new Hive variable setting, as follows:
set hive.execution.engine=tez;
This setting relies on Tez being installed on the cluster; it is available in source form from http://tez.incubator.apache.org or in several distributions, though at the time of writing, not Cloudera, due to its support of Impala.
The alternative value is mr, which uses the classic MapReduce model (atop YARN), so it is possible in a single installation to compare the performance of Hive using Tez.
Hive is not the only product providing the SQL-on-Hadoop capability. The second most widely used is likely Impala, announced in late 2012 and released in spring 2013. Though originally developed internally within Cloudera, its source code is periodically pushed to an open source Git repository (https://github.com/cloudera/impala).
Impala was created out of the same perception of Hive's weaknesses that led to the Stinger initiative.
Impala also took some inspiration from Google Dremel (http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36632.pdf) which was first openly described by a paper published in 2009. Dremel was built at Google to address the gap between the need for very fast queries on very large datasets and the high latency inherent in the existing MapReduce model underpinning Hive at the time. Dremel was a sophisticated approach to this problem that, rather than building mitigations atop MapReduce such as implemented by Hive, instead created a new service that accessed the same data stored in HDFS. Dremel also benefited from significant work to optimize the storage format of the data in a way that made it more amenable to very fast analytic queries.
The basic architecture has three main components; the Impala daemons, the state store, and the clients. Recent versions have added additional components that improve the service, but we'll focus on the high-level architecture.
The Impala daemon (impalad) should be run on each host where a DataNode process is managing HDFS data. Note that impalad does not access the filesystem blocks through the full HDFS FileSystem API; instead, it uses a feature called short-circuit reads to make data access more efficient.
When a client submits a query, it can do so to any of the running impalad processes, and this one will become the coordinator for the execution of that query. The key aspect of Impala's performance is that for each query, it generates custom native code, which is then pushed to and executed by all the impalad processes on the system. This highly optimized code performs the query on the local data, and each impalad then returns its subset of the result set to the coordinator node, which performs the final data consolidation to produce the final result. This type of architecture should be familiar to anyone who has worked with any of the (usually commercial and expensive) Massively Parallel Processing (MPP) (the term used for this type of shared scale-out architecture) data warehouse solutions available today. As the cluster runs the state, the store ensures that each impalad process is aware of all the others and provides a view of the overall cluster health.
Impala, as a newer product, tends to have a more restricted set of SQL data types and supports a more constrained dialect of SQL than Hive. It is, however, expanding this support with each new release. Refer to the Impala documentation (http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/Impala/impala.html) to get an overview of the current level of support.
Impala supports the Hive metastore mechanism used by Hive as a store of the metadata surrounding its table structure and storage. This means that on a cluster with an existing Hive setup, it should be immediately possible to use Impala as it will access the same metastore and therefore provide access to the same tables available in Hive.
But be warned that the differences in SQL dialect and data types might cause unexpected results when working in a combined Hive and Impala environment. Some queries might work on one but not the other, they might show very different performance characteristics (more on this later), or they might actually give different results. This last point might become apparent when using data types such as float and double that are simply treated differently in the underlying systems (Hive is implemented on Java while Impala is written in C++).
As of version 1.2, it supports UDFs written both in C++ and Java, although C++ is strongly recommended as a much faster solution. Keep this in mind if you are looking to share custom functions between Hive and Impala.
When Impala was first released, its greatest benefit was in how it truly enabled what is often called speed of thought analysis. Queries could be returned sufficiently fast that an analyst could explore a thread of analysis in a completely interactive fashion without having to wait for minutes at a time for each query to complete. It's fair to say that most adopters of Impala were at times stunned by its performance, especially when compared to the version of Hive shipping at the time.
The Impala focus has remained mostly on these shorter queries, and this does impose some limitations on the system. Impala tends to be quite memory-heavy as it relies on in-memory processing to achieve much of its performance. If a query requires a dataset to be held in memory rather than being available on the executing node, then that query will simply fail in versions of Impala before 2.0.
Comparing the work on Stinger to Impala, it could be argued that Impala has a much stronger focus on excelling in the shorter (and arguably more common) queries that support interactive data analysis. Many business intelligence tools and services are now certified to directly run on Impala. The Stinger initiative has put less effort into making Hive just as fast in the area where Impala excels but has instead improved Hive (to varying degrees) for all workloads. Impala is still developing at a fast pace and Stinger has put additional momentum into Hive, so it is most likely wise to consider both products and determine which best meets the performance and functionality requirements of your projects and workflows.
It should also be kept in mind that there are competitive commercial pressures shaping the direction of Impala and Hive. Impala was created and is still driven by Cloudera, the most popular vendor of Hadoop distributions. The Stinger initiative, though contributed to by many companies as diverse as Microsoft (yes, really!) and Intel, was lead by Hortonworks, probably the second largest vendor of Hadoop distributions. The fact is that if you are using the Cloudera distribution of Hadoop, then some of the core features of Hive might be slower to arrive, whereas Impala will always be up-to-date. Conversely, if you use another distribution, you might get the latest Hive release, but that might either have an older Impala or, as is currently the case, you might have to download and install it yourself.
A similar situation has arisen with the Parquet and ORC file formats mentioned earlier. Parquet is preferred by Impala and developed by a group of companies led by Cloudera, while ORC is preferred by Hive and is championed by Hortonworks.
Unfortunately, the reality is that Parquet support is often very quick to arrive in the Cloudera distribution but less so in say the Hortonworks distribution, where the ORC file format is preferred.
These themes are a little concerning since, although competition in this space is a good thing, and arguably the announcement of Impala helped energize the Hive community, there is a greater risk that your choice of distribution might have a larger impact on the tools and file formats that will be fully supported, unlike in the past. Hopefully, the current situation is just an artifact of where we are in the development cycles of all these new and improved technologies, but do consider your choice of distribution carefully in relation to your SQL-on-Hadoop needs.
You should also consider that SQL on Hadoop no longer only refers to Hive or Impala. Apache Drill (http://drill.incubator.apache.org) is a fuller implementation of the Dremel model first described by Google. Although Impala implements the Dremel architecture across HDFS data, Drill looks to provide similar functionality across multiple data sources. It is still in its early stages, but if your needs are broader than what Hive or Impala provides, it might be worth considering.
Tajo (http://tajo.apache.org) is another Apache project that seeks to be a full data warehouse system on Hadoop data. With an architecture similar to that of Impala, it offers a much richer system with components such as multiple optimizers and ETL tools that are commonplace in traditional data warehouses but less frequently bundled in the Hadoop world. It has a much smaller user base but has been used by certain companies very successfully for a significant length of time, and might be worth considering if you need a fuller data warehousing solution.
Other products are also emerging in this space, and it's a good idea to do some research. Hive and Impala are awesome tools, but if you find that they don't meet your needs, then look around—something else might.
In its early days, Hadoop was sometimes erroneously seen as the latest supposed relational database killer. Over time, it has become more apparent that the more sensible approach is to view it as a complement to RDBMS technologies and that, in fact, the RDBMS community has developed tools such as SQL that are also valuable in the Hadoop world.
HiveQL is an implementation of SQL on Hadoop and was the primary focus of this article. In regard to HiveQL and its implementations, we covered the following topics:
With Pig and Hive, we've introduced alternative models to process MapReduce data, but so far we've not looked at another question: what approaches and tools are required to actually allow this massive dataset being collected in Hadoop to remain useful and manageable over time? In the next article, we'll take a slight step up the abstraction hierarchy and look at how to manage the life cycle of this enormous data asset.
Further resources on this subject: