
Performance Considerations

  • 2015-03-03 00:00:00


In this article by Dayong Du, the author of Apache Hive Essentials, we will look at the different performance considerations when using Hive. Although Hive is built to deal with big data, we still cannot ignore the importance of performance. Most of the time, a well-written Hive query can rely on the query optimizer to find the best execution strategy and on the default settings that vendor packages ship as best practice. However, as experienced users, we should learn more about the theory and practice of performance tuning in Hive, especially when working in a performance-sensitive project or environment. We will start with the utilities available in Hive for finding potential causes of poor performance. Then, we will introduce best practices for performance in the areas of queries and jobs.


Performance utilities

Hive provides the EXPLAIN and ANALYZE statements that can be used as utilities to check and identify the performance of queries.

The EXPLAIN statement

Hive provides an EXPLAIN command to return a query execution plan without running the query. We can use an EXPLAIN command for queries if we have a doubt or a concern about performance. The EXPLAIN command will help to see the difference between two or more queries for the same purpose. The syntax for EXPLAIN is as follows:

EXPLAIN [EXTENDED|DEPENDENCY|AUTHORIZATION] hive_query

The following keywords can be used:

  • EXTENDED: This provides additional information for the operators in the plan, such as file pathname and abstract syntax tree.
  • DEPENDENCY: This provides a JSON format output that contains a list of tables and partitions that the query depends on. It is available since HIVE 0.10.0.
  • AUTHORIZATION: This lists all entities needed to be authorized including input and output to run the Hive query and authorization failures, if any. It is available since HIVE 0.14.0.
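As a minimal sketch of the three keywords, the following statements reuse the employee_partitioned query that serves as the example in this article. Only the statements are shown; the output depends on the Hive version and on the table, so it is not reproduced here.

```sql
-- Adds operator details such as file path names to the plan
EXPLAIN EXTENDED
SELECT sex_age.sex, count(*)
FROM employee_partitioned
WHERE year=2014 GROUP BY sex_age.sex LIMIT 2;

-- Returns the tables and partitions the query depends on, in JSON format (Hive 0.10.0 or later)
EXPLAIN DEPENDENCY
SELECT sex_age.sex, count(*)
FROM employee_partitioned
WHERE year=2014 GROUP BY sex_age.sex LIMIT 2;

-- Lists the entities that need to be authorized to run the query (Hive 0.14.0 or later)
EXPLAIN AUTHORIZATION
SELECT sex_age.sex, count(*)
FROM employee_partitioned
WHERE year=2014 GROUP BY sex_age.sex LIMIT 2;
```

Running the same query with and without a keyword is a quick way to see which extra sections each keyword adds to the plan.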

A typical query plan contains the following three sections. We will also have a look at an example later:

  • Abstract syntax tree (AST): Hive uses a parser generator called ANTLR (see http://www.antlr.org/) to automatically generate a syntax tree for HQL. We can usually ignore this section.
  • Stage dependencies: This lists all dependencies and number of stages used to run the query.
  • Stage plans: It contains important information, such as operators and sort orders, for running the job.

The following is what a typical query plan looks like. In this example, the AST section is not shown since the EXTENDED keyword is not used with EXPLAIN. In the STAGE DEPENDENCIES section, both Stage-0 and Stage-1 are independent root stages. In the STAGE PLANS section, Stage-1 has one map phase and one reduce phase, described by Map Operator Tree and Reduce Operator Tree. Inside each operator tree, all operators corresponding to Hive query keywords, as well as expressions and aggregations, are listed. Stage-0 does not have map and reduce phases. It is just a Fetch operation.

jdbc:hive2://> EXPLAIN SELECT sex_age.sex, count(*)
. . . . . . .> FROM employee_partitioned
. . . . . . .> WHERE year=2014 GROUP BY sex_age.sex LIMIT 2;
+-----------------------------------------------------------------------------+
| Explain |
+-----------------------------------------------------------------------------+
| STAGE DEPENDENCIES: |
| Stage-1 is a root stage |
| Stage-0 is a root stage |
| |
| STAGE PLANS: |
| Stage: Stage-1 |
| Map Reduce |
| Map Operator Tree: |
| TableScan |
| alias: employee_partitioned |
| Statistics: Num rows: 0 Data size: 227 Basic stats:PARTIAL |
| Column stats: NONE |
| Select Operator |
| expressions: sex_age (type: struct<sex:string,age:int>) |
| outputColumnNames: sex_age |
| Statistics: Num rows: 0 Data size: 227 Basic stats:PARTIAL |
| Column stats: NONE |
| Group By Operator |
| aggregations: count() |
| keys: sex_age.sex (type: string) |
| mode: hash |
| outputColumnNames: _col0, _col1 |
| Statistics: Num rows: 0 Data size: 227 Basic stats:PARTIAL |
| Column stats: NONE |
| Reduce Output Operator |
| key expressions: _col0 (type: string) |
| sort order: + |
| Map-reduce partition columns: _col0 (type: string) |
| Statistics: Num rows: 0 Data size: 227 Basic stats:PARTIAL|
| Column stats: NONE |
| value expressions: _col1 (type: bigint) |
| Reduce Operator Tree: |
| Group By Operator |
| aggregations: count(VALUE._col0) |
| keys: KEY._col0 (type: string) |
| mode: mergepartial |
| outputColumnNames: _col0, _col1 |
| Statistics: Num rows: 0 Data size: 0 Basic stats: NONE |
| Column stats: NONE |
| Select Operator |
| expressions: _col0 (type: string), _col1 (type: bigint) |
| outputColumnNames: _col0, _col1 |
| Statistics: Num rows: 0 Data size: 0 Basic stats: NONE |
| Column stats: NONE |
| Limit |
| Number of rows: 2 |
| Statistics: Num rows: 0 Data size: 0 Basic stats: NONE |
| Column stats: NONE |
| File Output Operator |
| compressed: false |
| Statistics: Num rows: 0 Data size: 0 Basic stats: NONE |
| Column stats: NONE |
| table: |
| input format: org.apache.hadoop.mapred.TextInputFormat |
| output format:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat|
| serde:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe|
| |
| Stage: Stage-0 |
| Fetch Operator |
| limit: 2 |
+-----------------------------------------------------------------------------+
53 rows selected (0.26 seconds)

The ANALYZE statement

Hive statistics are a collection of data that describes the objects in the Hive database in more detail, such as the number of rows, number of files, and raw data size. Statistics are metadata about Hive data. Hive supports statistics at the table, partition, and column level. These statistics serve as input to the Hive Cost-Based Optimizer (CBO), which picks the query plan with the lowest cost in terms of the system resources required to complete the query.

Since Hive 0.10.0, statistics are gathered through the ANALYZE statement on tables, partitions, and columns, as shown in the following examples:

jdbc:hive2://> ANALYZE TABLE employee COMPUTE STATISTICS;
No rows affected (27.979 seconds)
jdbc:hive2://> ANALYZE TABLE employee_partitioned
. . . . . . .> PARTITION(year=2014, month=12) COMPUTE STATISTICS;
No rows affected (45.054 seconds)
jdbc:hive2://> ANALYZE TABLE employee_id COMPUTE STATISTICS
. . . . . . .> FOR COLUMNS employee_id;
No rows affected (41.074 seconds)
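
Two variations of the preceding statements may be handy. They are a sketch based on the ANALYZE syntax in the Hive language manual and use the same tables as above. Naming the partition columns without values asks Hive to gather statistics for all partitions, and NOSCAN limits the work to file-level statistics, such as the number and size of files, without reading the rows.

```sql
-- Gather statistics for every partition of the table
ANALYZE TABLE employee_partitioned
PARTITION(year, month) COMPUTE STATISTICS;

-- Gather only file-level statistics without scanning the data
ANALYZE TABLE employee COMPUTE STATISTICS NOSCAN;
```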

Once the statistics are built, we can check them with the DESCRIBE EXTENDED/FORMATTED statement. In the table/partition output, the statistics appear inside the parameters, such as parameters:{numFiles=1, COLUMN_STATS_ACCURATE=true, transient_lastDdlTime=1417726247, numRows=4, totalSize=227, rawDataSize=223}. The following is an example:

jdbc:hive2://> DESCRIBE EXTENDED employee_partitioned
. . . . . . .> PARTITION(year=2014, month=12);
jdbc:hive2://> DESCRIBE EXTENDED employee;

parameters:{numFiles=1, COLUMN_STATS_ACCURATE=true, transient_lastDdlTime=1417726247, numRows=4, totalSize=227, rawDataSize=223}
jdbc:hive2://> DESCRIBE FORMATTED employee.name;
+--------+---------+---+---+---------+--------------+-----------+-----------+
|col_name|data_type|min|max|num_nulls|distinct_count|avg_col_len|max_col_len|
+--------+---------+---+---+---------+--------------+-----------+-----------+
| name | string | | | 0 | 5 | 5.6 | 7 |
+--------+---------+---+---+---------+--------------+-----------+-----------+
+---------+----------+-----------------+
|num_trues|num_falses| comment |
+---------+----------+-----------------+
| | |from deserializer|
+---------+----------+-----------------+
3 rows selected (0.116 seconds)

Hive statistics are persisted in the metastore to avoid computing them every time. For newly created tables and/or partitions, statistics are computed automatically if we enable the following setting:

jdbc:hive2://> SET hive.stats.autogather=true;

Hive logs

Logs provide useful information to find out how a Hive query/job runs. By checking the Hive logs, we can identify runtime problems and issues that may cause bad performance. There are two types of logs available in Hive: system log and job log.

The system log contains the Hive running status and issues. It is configured in {HIVE_HOME}/conf/hive-log4j.properties, where the following three lines control the Hive log:

hive.root.logger=WARN,DRFA
hive.log.dir=/tmp/${user.name}
hive.log.file=hive.log

To change the logging level, we can either modify the preceding lines in hive-log4j.properties (applies to all users) or set it from the Hive CLI (applies only to the current user and current session) as follows:

hive --hiveconf hive.root.logger=DEBUG,console

The job log contains Hive query information and is saved at the same place, /tmp/${user.name}, by default as one file for each Hive user session. We can override it in hive-site.xml with the hive.querylog.location property. If a Hive query generates MapReduce jobs, those logs can also be viewed through the Hadoop JobTracker Web UI.

Job and query optimization

Job and query optimization covers techniques for improving performance in the areas of job-running mode, JVM reuse, parallel job execution, and JOIN query optimization.

Local mode

Hadoop can run in standalone, pseudo-distributed, and fully distributed mode. Most of the time, we need to configure Hadoop to run in fully distributed mode. When the data to process is small, starting distributed data processing is pure overhead, since launching the job in fully distributed mode takes longer than processing the data. Since Hive 0.7.0, Hive supports automatic conversion of a job to run in local mode with the following settings:

jdbc:hive2://> SET hive.exec.mode.local.auto=true; --default false
jdbc:hive2://> SET hive.exec.mode.local.auto.inputbytes.max=50000000;
jdbc:hive2://> SET hive.exec.mode.local.auto.input.files.max=5;
--default 4

A job must satisfy the following conditions to run in the local mode:

  • The total input size of the job is lower than hive.exec.mode.local.auto.inputbytes.max
  • The total number of map tasks is less than hive.exec.mode.local.auto.input.files.max
  • The total number of reduce tasks required is 1 or 0
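
To try these conditions in a session, a minimal sketch could look like the following. It assumes that the employee table from the earlier examples is small enough to stay under the thresholds. Issuing SET with a property name and no value prints the value currently in effect.

```sql
-- Let Hive decide per job whether local mode is possible
SET hive.exec.mode.local.auto=true;

-- Print the thresholds currently in effect
SET hive.exec.mode.local.auto.inputbytes.max;
SET hive.exec.mode.local.auto.input.files.max;

-- A plain count needs at most one reducer, so it is a candidate for local mode
SELECT count(*) FROM employee;
```

If any one of the three conditions is not met, the job is submitted to the cluster as usual.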

JVM reuse

By default, Hadoop launches a new JVM for each map or reduce task and runs the tasks in parallel. When a map or reduce task is lightweight and runs for only a few seconds, the JVM startup can be a significant overhead. The MapReduce framework (version 1 only, not YARN) has an option to reuse JVMs by running mappers/reducers serially in a shared JVM instead of in parallel. JVM reuse applies to map or reduce tasks in the same job. Tasks from different jobs always run in separate JVMs. To enable the reuse, we set the maximum number of tasks of a single job that can share a JVM using the mapred.job.reuse.jvm.num.tasks property. Its default value is 1:

jdbc:hive2://> SET mapred.job.reuse.jvm.num.tasks=5;

We can also set the value to -1 to indicate that all the tasks for a job will run in the same JVM.

Parallel execution

Hive queries are commonly translated into a number of stages that are executed sequentially by default. These stages are not always dependent on each other, so independent stages can run in parallel to reduce the overall job running time. We can enable this feature with the following settings:

jdbc:hive2://> SET hive.exec.parallel=true; -- default false
jdbc:hive2://> SET hive.exec.parallel.thread.number=16;
-- default 8, it defines the max number for running in parallel

Parallel execution will increase the cluster utilization. If the utilization of a cluster is already very high, parallel execution will not help much in terms of overall performance.
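
A typical candidate for parallel execution is a query whose branches do not depend on each other. The following sketch assumes the employee and employee_id tables from the earlier examples; the two branches of the UNION ALL read different tables, so neither has to wait for the other. EXPLAIN is used here so that the STAGE DEPENDENCIES section can be inspected before running the query.

```sql
SET hive.exec.parallel=true;

-- Each branch of the UNION ALL aggregates a different table
EXPLAIN
SELECT u.src, u.cnt
FROM (
  SELECT 'employee' AS src, count(*) AS cnt FROM employee
  UNION ALL
  SELECT 'employee_id' AS src, count(*) AS cnt FROM employee_id
) u;
```

Stages that STAGE DEPENDENCIES reports as root stages, or that depend on different parents, are the ones that can overlap when parallel execution is enabled.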

Join optimization

Here, we'll briefly review the key settings for join improvement.

Common join

The common join is also called the reduce-side join. It is the basic join in Hive and works most of the time. For common joins, we need to make sure that the big table is on the right-most side or is specified by a hint, as follows:

/*+ STREAMTABLE(stream_table_name) */
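
In a full query, the hint goes right after the SELECT keyword and names the table (or its alias) to be streamed. The following is a sketch only: it assumes that employee is the big table and that employee_id also has a name column to join on, neither of which is stated in this article.

```sql
-- Stream the big table (alias e) through the reducers; the other table is buffered
SELECT /*+ STREAMTABLE(e) */ e.name, i.employee_id
FROM employee e
JOIN employee_id i ON e.name = i.name;
```

With the hint in place, the position of the big table in the FROM clause no longer matters.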

Map join

Map join is used when one of the join tables is small enough to fit in memory, so it is very fast but limited by the table size. Since Hive 0.7.0, Hive can convert a join to a map join automatically with the following settings:

jdbc:hive2://> SET hive.auto.convert.join=true; --default false
jdbc:hive2://> SET hive.mapjoin.smalltable.filesize=600000000;
--default 25M
jdbc:hive2://> SET hive.auto.convert.join.noconditionaltask=true;
--default false. Set to true so that map join hint is not needed
jdbc:hive2://> SET hive.auto.convert.join.noconditionaltask.size=10000000;
--This is the default value. It controls the size of table to fit in memory

Once autoconvert is enabled, Hive automatically checks the file size of the smaller table against the value specified by hive.mapjoin.smalltable.filesize. If the file size is bigger than this threshold, Hive uses a common join. If the file size is smaller than this threshold, Hive tries to convert the common join into a map join. Once autoconvert join is enabled, there is no need to provide map join hints in the query.
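
One way to check which join Hive picked is to look at the plan. The sketch below assumes the employee and employee_id tables from the earlier examples and a shared name column, which is an assumption made for illustration.

```sql
SET hive.auto.convert.join=true;

-- Inspect the plan instead of running the join
EXPLAIN
SELECT e.name, i.employee_id
FROM employee e
JOIN employee_id i ON e.name = i.name;
```

When the conversion happens, the plan contains a Map Join Operator inside a map operator tree. A common join shows a Join Operator inside the Reduce Operator Tree instead.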

Bucket map join

Bucket map join is a special type of map join applied on the bucket tables. To enable bucket map join, we need to enable the following settings:

jdbc:hive2://> SET hive.auto.convert.join=true; --default false
jdbc:hive2://> SET hive.optimize.bucketmapjoin=true; --default false

In a bucket map join, all the join tables must be bucket tables and must join on the bucket columns. In addition, the number of buckets in the bigger table must be a multiple of the number of buckets in the small tables.
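
The bucket requirements can be sketched with two table definitions. The table and column names below are hypothetical and only serve to illustrate the rule: both tables are bucketed on the join column, and 8 is a multiple of 4.

```sql
-- Hypothetical bigger table: 8 buckets on the join column
CREATE TABLE employee_big_bucket (name string, employee_id int)
CLUSTERED BY (employee_id) INTO 8 BUCKETS;

-- Hypothetical smaller table: 4 buckets on the same column
CREATE TABLE employee_small_bucket (name string, employee_id int)
CLUSTERED BY (employee_id) INTO 4 BUCKETS;

SET hive.auto.convert.join=true;
SET hive.optimize.bucketmapjoin=true;

-- The join condition uses the bucket column of both tables
SELECT b.name, s.employee_id
FROM employee_big_bucket b
JOIN employee_small_bucket s ON b.employee_id = s.employee_id;
```

In Hive releases before 2.0, hive.enforce.bucketing also needs to be set to true when such tables are populated with INSERT, so that the data is actually written into the declared number of buckets.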

Sort merge bucket (SMB) join

SMB is the join performed on bucket tables that have the same sort, bucket, and join condition columns. It reads data from both bucket tables and performs common joins (map and reduce triggered) on the bucket tables. We need to enable the following properties to use SMB:

jdbc:hive2://> SET hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
jdbc:hive2://> SET hive.auto.convert.sortmerge.join=true;
jdbc:hive2://> SET hive.optimize.bucketmapjoin=true;
jdbc:hive2://> SET hive.optimize.bucketmapjoin.sortedmerge=true;
jdbc:hive2://> SET hive.auto.convert.sortmerge.join.noconditionaltask=true;

Sort merge bucket map (SMBM) join

SMBM join is a special bucket join but triggers map-side join only. It can avoid caching all rows in the memory like map join does. To perform SMBM joins, the join tables must have the same bucket, sort, and join condition columns. To enable such joins, we need to enable the following settings:

jdbc:hive2://> SET hive.auto.convert.join=true;
jdbc:hive2://> SET hive.auto.convert.sortmerge.join=true;
jdbc:hive2://> SET hive.optimize.bucketmapjoin=true;
jdbc:hive2://> SET hive.optimize.bucketmapjoin.sortedmerge=true;
jdbc:hive2://> SET hive.auto.convert.sortmerge.join.noconditionaltask=true;
jdbc:hive2://> SET hive.auto.convert.sortmerge.join.bigtable.selection.policy=org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ;
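
Both sort merge bucket joins depend on how the tables are declared. As a sketch with hypothetical table and column names, the following definitions give two tables the same bucket and sort column, which is also the join column.

```sql
-- Hypothetical tables: bucketed and sorted on the join column
CREATE TABLE employee_smb_a (name string, employee_id int)
CLUSTERED BY (employee_id) SORTED BY (employee_id ASC) INTO 4 BUCKETS;

CREATE TABLE employee_smb_b (name string, employee_id int)
CLUSTERED BY (employee_id) SORTED BY (employee_id ASC) INTO 4 BUCKETS;

-- The join condition matches the bucket and sort column
SELECT a.name, b.name
FROM employee_smb_a a
JOIN employee_smb_b b ON a.employee_id = b.employee_id;
```

Because the rows in matching buckets are already sorted on the join key, they can be merged while being read instead of being cached in memory.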

Skew join

When working with data that has a highly uneven distribution, the data skew could happen in such a way that a small number of compute nodes must handle the bulk of the computation. The following setting informs Hive to optimize properly if data skew happens:

jdbc:hive2://> SET hive.optimize.skewjoin=true;
--If there is data skew in join, set it to true. Default is false.
jdbc:hive2://> SET hive.skewjoin.key=100000;
--This is the default value. If the number of rows with the same key is
--bigger than this, the new rows of that key are sent to the other unused reducers.

Data skew can happen in GROUP BY data too. To optimize it, we need the following setting, which enables skew data optimization for the GROUP BY result:

SET hive.groupby.skewindata=true;

Once configured, Hive first triggers an additional MapReduce job whose map output is randomly distributed to the reducers to avoid data skew.
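
The additional job can be observed in the query plan. The following sketch reuses the employee_partitioned table from the EXPLAIN example; whether its data is actually skewed is not known, so the query only illustrates the setting.

```sql
SET hive.groupby.skewindata=true;

-- Compare this plan with the plan produced when the setting is false
EXPLAIN
SELECT sex_age.sex, count(*)
FROM employee_partitioned
GROUP BY sex_age.sex;
```

With the setting enabled, the STAGE DEPENDENCIES section should list one more MapReduce stage than the plan for the same query with the setting disabled.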

For more information about Hive join optimization, please refer to the Apache Hive wiki available at https://cwiki.apache.org/confluence/display/Hive/LanguageManual+JoinOptimization and https://cwiki.apache.org/confluence/display/Hive/Skewed+Join+Optimization.

Summary

In this article, we first covered how to identify performance bottlenecks using the EXPLAIN and ANALYZE statements. Then, we discussed job and query optimization in Hive.
