Hive Performance Tuning Guide

Using Hive to build an offline data warehouse is a very common approach in enterprises. Because Hive is designed for batch processing of big data, its workloads are usually not sensitive to processing time. When resources are limited, however, Hive performance tuning matters for producing data quickly. Hive performance tuning is also a common interview topic, so mastering a few tuning methods both improves day-to-day efficiency and helps you stand out in interviews. This article introduces Hive performance tuning in four areas:

√ tools for performance tuning

√ design optimization

√ data storage optimization

√ job optimization

Tools for performance tuning

HQL provides two tools for viewing query performance: explain and analyze. In addition, Hive's log also provides very detailed information for viewing execution performance and troubleshooting errors.

Make good use of explain statements

The explain statement is the tool most often used to view the execution plan of a query. The syntax is as follows:

EXPLAIN [FORMATTED|EXTENDED|DEPENDENCY|AUTHORIZATION] hql_query

In the statement above, there are four optional keywords, with the following meanings:

  • FORMATTED: formats the execution plan and returns the execution plan in JSON format

  • EXTENDED: provides some additional information, such as file path information

  • DEPENDENCY: returns the list of tables and partitions that the query depends on, in JSON format; available since Hive 0.10

  • AUTHORIZATION: lists the entities that need to be authorized, including inputs and outputs; available since Hive 0.14
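
As a quick illustration of the keywords listed above, the following sketch runs several variants against the employee_partitioned table used in this article. The query itself is only an illustrative assumption, and the exact output depends on the Hive version.

```sql
-- Default plan: stage dependencies and stage plans
EXPLAIN
SELECT name FROM employee_partitioned WHERE year = 2020;

-- Adds extra detail such as file paths
EXPLAIN EXTENDED
SELECT name FROM employee_partitioned WHERE year = 2020;

-- Lists the input tables and partitions as JSON (Hive 0.10 and later)
EXPLAIN DEPENDENCY
SELECT name FROM employee_partitioned WHERE year = 2020;

-- Lists the inputs and outputs that need authorization (Hive 0.14 and later)
EXPLAIN AUTHORIZATION
SELECT name FROM employee_partitioned WHERE year = 2020;
```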

A typical query execution plan mainly includes three parts, as follows:

  • Abstract Syntax Tree (AST): Hive uses a parser generator called ANTLR to automatically turn HQL into an abstract syntax tree

  • Stage Dependencies: lists the stages of the query and the dependencies between them

  • Stage Plans: contains important information such as the operators and sort orders used when running the job

For example

Suppose there is a table:

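CREATE TABLE employee_partitioned (
  name string,
  work_place ARRAY<string>,
  gender_age STRUCT<gender:string,age:int>,
  skills_score MAP<string,int>,
  depart_title MAP<STRING,ARRAY<STRING>>
)
-- Partition columns as used by the partition examples in this article; INT is an assumed type
PARTITIONED BY (year INT, month INT);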

View execution plan:

EXPLAIN
SELECT gender_age.gender, count(*)
FROM employee_partitioned
WHERE year = 2020
GROUP BY gender_age.gender
LIMIT 2;

Overview of the execution plan:

The Map/Reduce operator trees correspond to the abstract syntax tree part of the plan. STAGE DEPENDENCIES lists three stages: Stage-0, Stage-1 and Stage-2. Stage-1 is the root stage, Stage-2 depends on Stage-1, and Stage-0 depends on Stage-2. In STAGE PLANS, both Stage-1 and Stage-2 contain a Map Operator Tree and a Reduce Operator Tree. Stage-0 contains no map or reduce work, only a fetch operation.

Execution plan details:

  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
            alias: employee_partitioned
            filterExpr: (year = 2020) (type: boolean)
            Statistics: Num rows: 1 Data size: 227 Basic stats: PARTIAL Column stats: NONE
            Select Operator
              expressions: gender_age (type: struct<gender:string,age:int>)
              outputColumnNames: gender_age
              Statistics: Num rows: 1 Data size: 227 Basic stats: PARTIAL Column stats: NONE
              Reduce Output Operator
                key expressions: gender_age.gender (type: string)
                sort order: +
                Map-reduce partition columns: rand() (type: double)
                Statistics: Num rows: 1 Data size: 227 Basic stats: PARTIAL Column stats: NONE
      Reduce Operator Tree:
        Group By Operator
          aggregations: count()
          keys: KEY._col0 (type: string)
          mode: partial1
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 1 Data size: 227 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format:
                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
            Reduce Output Operator
              key expressions: _col0 (type: string)
              sort order: +
              Map-reduce partition columns: _col0 (type: string)
              Statistics: Num rows: 1 Data size: 227 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col1 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          keys: KEY._col0 (type: string)
          mode: final
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 1 Data size: 227 Basic stats: COMPLETE Column stats: NONE
            Number of rows: 2
            Statistics: Num rows: 1 Data size: 227 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              compressed: false
              Statistics: Num rows: 1 Data size: 227 Basic stats: COMPLETE Column stats: NONE
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format:
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: 2
      Processor Tree:

Make good use of the analyze statement

The analyze statement collects detailed statistics, such as the number of rows in a table, the number of files and the data size. These statistics are stored as metadata in Hive's metastore. Hive supports statistics at the table, partition and column level (similar to Impala). They serve as input to Hive's cost-based optimizer (CBO), whose main job is to select the query plan that consumes the fewest system resources. As of Hive 3.2.0, these statistics can be collected automatically. You can also collect statistics for tables, partitions or columns manually with the analyze statement, as follows:

  • 1. Collect statistics for a (non-partitioned) table. When the NOSCAN keyword is specified, the file contents are not scanned and only the number and size of files are collected, which is faster

-- Without the NOSCAN keyword
hive> ANALYZE TABLE user_behavior COMPUTE STATISTICS;
Table default.user_behavior stats: [numFiles=1, numRows=10, totalSize=229, rawDataSize=219]
Time taken: 23.504 seconds
-- With the NOSCAN keyword
hive> ANALYZE TABLE user_behavior COMPUTE STATISTICS NOSCAN;
Table default.user_behavior stats: [numFiles=1, numRows=10, totalSize=229, rawDataSize=219]
Time taken: 0.309 seconds

  • 2. Collect statistics for a partitioned table

-- Collect statistics for specific partitions
hive> ANALYZE TABLE employee_partitioned PARTITION(year=2020, month=06) COMPUTE STATISTICS;
Partition default.employee_partitioned{year=2020, month=06} stats: [numFiles=1, numRows=0, totalSize=227, rawDataSize=0]
Time taken: 19.283 seconds

-- Collect statistics for all partitions
hive> ANALYZE TABLE employee_partitioned PARTITION(year, month) COMPUTE STATISTICS;
Partition default.employee_partitioned{year=2020, month=06} stats: [numFiles=1, numRows=0, totalSize=227, rawDataSize=0]
Time taken: 17.528 seconds

  • 3. Collect statistics for a column in the table

hive> ANALYZE TABLE user_behavior COMPUTE STATISTICS FOR COLUMNS user_id;

Tip:

You can enable automatic statistics collection with SET hive.stats.autogather=true. Statistics are then collected automatically for tables or partitions written by INSERT OVERWRITE/INTO. Note that the LOAD operation does not collect statistics automatically.
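
The difference between INSERT and LOAD described in the tip can be sketched as follows. This is a minimal sketch: user_behavior_staging and the input path are hypothetical names introduced only for illustration.

```sql
-- Let Hive gather basic statistics while writing data
SET hive.stats.autogather=true;

-- Statistics for the target table are collected as part of this job.
-- user_behavior_staging is a hypothetical source table.
INSERT OVERWRITE TABLE user_behavior
SELECT * FROM user_behavior_staging;

-- LOAD only moves files, so statistics are collected explicitly afterwards.
-- The input path is a placeholder.
LOAD DATA INPATH '/tmp/userbehavior.csv' INTO TABLE user_behavior;
ANALYZE TABLE user_behavior COMPUTE STATISTICS;
```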

Once these statistics are collected, you can query the statistics through the DESCRIBE EXTENDED/FORMATTED statement. The specific usage is as follows:

-- View statistics of a partition
hive> DESCRIBE FORMATTED employee_partitioned PARTITION(year=2020, month=06);
Partition Parameters:            
        COLUMN_STATS_ACCURATE   true                
        numFiles                1                   
        numRows                 0                   
        rawDataSize             0                   
        totalSize               227                 
        transient_lastDdlTime   1591437967 
-- View statistics of a table
hive> DESCRIBE FORMATTED employee_partitioned;
Table Parameters:                
        numPartitions           1                   
        transient_lastDdlTime   1591431482 
-- View statistics for a column
hive> DESCRIBE FORMATTED  user_behavior.user_id;

Common log analysis

Logs provide detailed information about job operation. By viewing the log information, you can analyze the problems that lead to job execution bottlenecks, mainly including two types of logs: system logs and job logs.

The system log contains information such as Hive's running status. It is configured through {HIVE_HOME}/conf/hive-log4j.properties. The main configuration options are:

hive.root.logger=WARN,DRFA ##Log level
hive.log.dir=/tmp/${} ##Log path
hive.log.file=hive.log ##Log name

You can also set the log level from the Hive CLI: $ hive --hiveconf hive.root.logger=DEBUG,console. This setting only takes effect in the current session.

Job logs contain job information and are usually managed by YARN. You can view them with the yarn logs -applicationId <application_id> command.

Design optimization

Partition table

For a relatively large table, designing it as a partitioned table can improve query performance. A query against a specific partition only loads the files under that partition's path, so it runs faster. Note that the choice of partition columns strongly affects query performance. Avoid deeply nested partitions, which create too many subdirectories. Common partition columns include:

  • Date or time

For example, year, month, day or hour can be used when there are time or date fields in the table.

  • Geographical location

Such as country, province, city, etc

  • Business logic

Such as department, sales area, customer, etc
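
A minimal sketch of a date-partitioned table follows. The table sales_partitioned and its columns are hypothetical and serve only to show how a partition filter limits the data that is read.

```sql
-- Hypothetical table partitioned by date
CREATE TABLE sales_partitioned (
  order_id BIGINT,
  amount   DOUBLE
)
PARTITIONED BY (year INT, month INT);

-- Only the files under the year=2020/month=6 partition path are read
SELECT sum(amount)
FROM sales_partitioned
WHERE year = 2020 AND month = 6;
```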

Bucket table

Similar to partitioning, bucketing organizes a table's data on HDFS into multiple files. Bucketing can speed up data sampling and improve join performance when the join column is the bucketing column. Because bucketing guarantees that all rows for a given key land in one specific bucket (file), a well-chosen bucketing column can greatly improve join performance. In general, choose a column that is frequently used in filters or joins.
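
The sketch below shows one way to define a bucketed table and sample from it. The table user_behavior_bucketed, the STRING type of behavior and the bucket count of 8 are assumptions made for illustration.

```sql
-- Hypothetical bucketed copy of user_behavior, bucketed on the join/filter column
CREATE TABLE user_behavior_bucketed (
  user_id  INT,
  behavior STRING
)
CLUSTERED BY (user_id) INTO 8 BUCKETS;

-- On Hive versions before 2.0, also run: SET hive.enforce.bucketing=true;
INSERT OVERWRITE TABLE user_behavior_bucketed
SELECT user_id, behavior FROM user_behavior;

-- Sample one bucket out of eight instead of scanning the whole table
SELECT *
FROM user_behavior_bucketed TABLESAMPLE(BUCKET 1 OUT OF 8 ON user_id);
```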


Index

Index creation is a common performance tuning method in relational databases, and Hive is no exception. Hive has supported indexes since version 0.7 (index support was later removed in Hive 3.0). Reading an index is relatively cheap compared with a full table scan. An index is created in Hive as follows:

CREATE INDEX idx_user_id_user_behavior
ON TABLE user_behavior (user_id)
AS 'COMPACT'
WITH DEFERRED REBUILD;

The COMPACT index created above stores the pair of the index column and its corresponding block id. In addition to this index, Hive also supports bitmap index (BITMAP), which is used as follows:

CREATE INDEX idx_behavior_user_behavior
ON TABLE user_behavior (behavior)
AS 'BITMAP'
WITH DEFERRED REBUILD;

The indexes above are created with the WITH DEFERRED REBUILD option, which prevents the index from being built immediately. To build the index, use the ALTER INDEX ... REBUILD command (see the following example). Note that whenever the base table (the indexed table) changes, the command must be run again to bring the index up to date.

ALTER INDEX idx_user_id_user_behavior ON user_behavior REBUILD;

Once the index is built, an index table is generated, named in the format <database name>__<table name>_<index name>__. You can list it with the following command:

hive> SHOW TABLES '*idx*';
Time taken: 0.044 seconds, Fetched: 1 row(s)

The index table contains the index column, the file URI of HDFS and the offset of each row, which can be viewed through the following command:

-- View index table structure
hive> DESC default__user_behavior_idx_user_id_user_behavior__;
user_id                 int                                         
_bucketname             string                                      
_offsets                array<bigint>                               
Time taken: 0.109 seconds, Fetched: 3 row(s)
-- View index table contents
hive> SELECT * FROM default__user_behavior_idx_user_id_user_behavior__;
9       hdfs://cdh03:8020/user/hive/warehouse/user_behavior/userbehavior.csv    [181]
7       hdfs://cdh03:8020/user/hive/warehouse/user_behavior/userbehavior.csv    [136]
1       hdfs://cdh03:8020/user/hive/warehouse/user_behavior/userbehavior.csv    [0]
6       hdfs://cdh03:8020/user/hive/warehouse/user_behavior/userbehavior.csv    [113]
5       hdfs://cdh03:8020/user/hive/warehouse/user_behavior/userbehavior.csv    [90]
10      hdfs://cdh03:8020/user/hive/warehouse/user_behavior/userbehavior.csv    [205]
4       hdfs://cdh03:8020/user/hive/warehouse/user_behavior/userbehavior.csv    [66]
8       hdfs://cdh03:8020/user/hive/warehouse/user_behavior/userbehavior.csv    [158]
3       hdfs://cdh03:8020/user/hive/warehouse/user_behavior/userbehavior.csv    [44]
2       hdfs://cdh03:8020/user/hive/warehouse/user_behavior/userbehavior.csv    [22]
Time taken: 0.28 seconds, Fetched: 10 row(s)

If you want to delete an index, you can use the DROP INDEX command as follows:

DROP INDEX idx_user_id_user_behavior ON user_behavior;

Using skewed / temporary tables

Besides internal, external, partitioned and bucketed tables, Hive also offers skewed and temporary tables, which can improve performance to some extent.

Hive has supported skewed tables since version 0.10 to alleviate data skew. A skewed table improves performance because the values that cause skew are automatically split into separate files or directories. For example:

CREATE TABLE sample_skewed_table (
dept_no int,
dept_name string
)
SKEWED BY (dept_no) ON (1000, 2000); -- Specify data skew field

In addition, a temporary table can be used to hold a data set shared by several queries in a session. Temporary tables can also store their data on SSD or in memory, which can improve performance.
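
A minimal sketch of a temporary table kept in memory follows. The table name tmp_active_users and the aggregation are hypothetical; hive.exec.temporary.table.storage accepts memory, ssd or default.

```sql
-- Keep temporary table data in memory for this session
SET hive.exec.temporary.table.storage=memory;

-- Hypothetical shared intermediate result, visible only in this session
CREATE TEMPORARY TABLE tmp_active_users AS
SELECT user_id, count(*) AS cnt
FROM user_behavior
GROUP BY user_id;

-- Reuse the temporary table in later queries of the same session
SELECT * FROM tmp_active_users WHERE cnt > 1;
```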

Data storage optimization

File format

Hive supports TEXTFILE, SEQUENCEFILE, AVRO, RCFILE, ORC, and PARQUET file formats. You can specify the file format of the table in two ways:

  • CREATE TABLE ... STORED AS <file_format>: specifies the file format when creating a table. The default is TEXTFILE

  • ALTER TABLE ... [PARTITION partition_spec] SET FILEFORMAT <file_format>: changes the file format of an existing table or partition

Once a table stored as text has been created, a text file can be loaded into it directly with the LOAD command. To get data into another file format, first load it into a text-format table, then use INSERT OVERWRITE/INTO TABLE ... SELECT to write it into a table that uses the other format.
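
The load-then-insert flow described above can be sketched as follows. The table names, columns, field delimiter and input path are all hypothetical.

```sql
-- Staging table in text format (hypothetical names and columns)
CREATE TABLE user_behavior_text (
  user_id  INT,
  behavior STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

-- LOAD moves the file as-is, so it must already be a text file
LOAD DATA INPATH '/tmp/userbehavior.csv' INTO TABLE user_behavior_text;

-- Target table in a columnar format
CREATE TABLE user_behavior_orc (
  user_id  INT,
  behavior STRING
)
STORED AS ORC;

-- Hive rewrites the rows into ORC during the insert
INSERT OVERWRITE TABLE user_behavior_orc
SELECT user_id, behavior FROM user_behavior_text;
```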

Tip:

To change the default file format for new tables, set hive.default.fileformat=<file_format>, which applies to all tables. Alternatively, set hive.default.fileformat.managed=<file_format>, which applies only to internal (managed) tables.

TEXTFILE, SEQUENCEFILE and AVRO are row-oriented storage formats. They are not the best choice, because a query that needs only one column still has to read every complete row. Column-oriented formats (RCFILE, ORC, PARQUET) solve this problem. Each file format is described below:


  • TEXTFILE

The default file format when creating a table; data is stored as plain text. Text files can be split and processed in parallel, and can be compressed with codecs such as GZip, LZO or Snappy. However, most compressed files cannot be split for parallel processing, so a job ends up with a single mapper processing the data. When using compressed text files, keep each file from growing too large; a size close to two HDFS blocks is generally recommended.


  • SEQUENCEFILE

A binary storage format for key/value pairs. Its advantage over text is better compression. Sequence files support record-level and block-level compression, and block-level compression gives a good compression ratio. To use block compression, apply the following configuration: SET hive.exec.compress.output=true; SET io.seqfile.compression.type=BLOCK;

  • AVRO

Avro is a binary file format as well as a framework for serialization and deserialization. Avro provides a schema that describes the data.


  • RCFILE

Short for Record Columnar File. The table is first divided into row groups, and within each row group the data is stored column by column. In other words, the data is partitioned horizontally first and then vertically.

  • ORC

Short for Optimized Row Columnar, available since Hive 0.11. ORC is an optimized version of the RCFILE format and provides a larger default block size (256MB).


  • PARQUET

Another columnar file format, very similar to ORC. Compared with ORC, Parquet is supported by a wider ecosystem; for example, older versions of Impala do not support ORC.


Compression

Compression reduces the amount of data transferred between map and reduce tasks, which improves query performance. Compression can be configured on the Hive command line or in the hive-site.xml file:

SET hive.exec.compress.intermediate=true;

After compression is enabled, you can select the following compression format:

The compression codec can be configured in mapred-site.xml, in hive-site.xml, or on the command line, for example:

-- Intermediate result compression
-- Output result compression
SET hive.exec.compress.output=true;
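
A fuller sketch of the codec settings follows. Snappy is an assumed choice used only for illustration; any codec installed on the cluster can be substituted.

```sql
-- Compress intermediate data passed between map and reduce (Snappy is an assumed choice)
SET hive.exec.compress.intermediate=true;
SET hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;

-- Compress the final job output
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
```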

Storage optimization

Frequently accessed data is called hot data, and query performance can be improved specifically for it. For example, increasing the replication factor of hot data raises the chance of data-local reads, which improves query performance. Of course, this has to be weighed against storage capacity.

$ hdfs dfs -setrep -R -w 4 /user/hive/warehouse/employee

Note that a large number of small files or redundant replicas consumes NameNode memory, especially when many files are smaller than the HDFS block size. HDFS itself offers several ways to deal with small files:

  • Hadoop Archive/HAR: package small files into large files

  • Sequence file format: compress small files into large files

  • CombineFileInputFormat: combine small files before map and reduce processing

  • HDFS Federation: the HDFS Federation uses multiple namenode nodes to manage files

For Hive, you can use the following configuration to merge the files of query results, so as to avoid small files:

  • hive.merge.mapfiles: merges the final result files of a map-only job. The default is true

  • hive.merge.mapredfiles: merges the small result files of a MapReduce job. The default is false; it can be set to true

  • hive.merge.size.per.task: the target size of the merged files. The default is 256000000 (256MB)

  • hive.merge.smallfiles.avgsize: the average file size threshold that triggers merging. The default is 16000000 (16MB)

When the average size of a job's output files is below the hive.merge.smallfiles.avgsize threshold, and hive.merge.mapfiles and hive.merge.mapredfiles are set to true, Hive starts an additional MR job to merge the small output files into larger ones.
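
Put together, the merge settings described above could be applied in a session as in the following sketch. The size values shown are simply the defaults mentioned in the list.

```sql
SET hive.merge.mapfiles=true;               -- merge outputs of map-only jobs
SET hive.merge.mapredfiles=true;            -- merge outputs of MapReduce jobs
SET hive.merge.size.per.task=256000000;     -- target size of the merged files
SET hive.merge.smallfiles.avgsize=16000000; -- merge when the average output file is smaller than this
```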

Job optimization

Local mode

When Hive processes a small amount of data, starting a distributed job is wasteful, because the startup time may exceed the processing time. Since version 0.7, Hive can automatically convert jobs to run in local mode. The following configuration is required:

SET hive.exec.mode.local.auto=true; -- default false
SET hive.exec.mode.local.auto.input.files.max=4; -- default 4

As long as a job meets the following conditions, the local mode will be enabled

  • The total input size is smaller than the value of hive.exec.mode.local.auto.inputbytes.max

  • The number of map tasks is less than the value of hive.exec.mode.local.auto.input.files.max

  • The number of reduce tasks is 1 or 0

JVM reuse

By default, Hadoop starts a new JVM for each map or reduce task, so that tasks can run in parallel. When a task is lightweight and runs for only a few seconds, starting the JVM can take longer than the task itself. Hadoop can reuse JVMs, running several map or reduce tasks serially in a shared JVM instead of in parallel. JVM reuse applies only to map and reduce tasks of the same job; tasks of different jobs cannot share a JVM. To enable JVM reuse, configure the maximum number of tasks of a job that one JVM may run. The default is 1; a value of -1 means no limit:

SET mapreduce.job.jvm.numtasks=5;

The drawback is that with JVM reuse enabled, the reserved task slots stay occupied and are not released until the whole job finishes. If a few reduce tasks in an "unbalanced" job take much longer than the others, the reserved slots sit idle yet cannot be used by other jobs until all tasks are done.

Parallel execution

A Hive query is usually converted into a series of stages. Stages that do not depend on each other can be executed in parallel, which is configured as follows:

SET hive.exec.parallel=true; -- default false
SET hive.exec.parallel.thread.number=16; -- Default 8

Parallel execution can increase the utilization of cluster resources. If the utilization of cluster resources is already very high, the effect of parallel execution will not be obvious.

Fetch mode

Fetch mode means that certain queries in Hive can be answered without running MapReduce: Hive simply reads the files in the table's storage directory and outputs the results to the console. With fetch mode enabled, MapReduce is not started for full-table SELECT * queries, queries on specific columns, or LIMIT queries. It is configured as follows:

SET hive.fetch.task.conversion=more; -- default more since Hive 0.14


JOIN optimization

Common join

The common join, also known as the reduce side join, is the most basic join and takes a relatively long time. In older versions, when joining a large table with a small table, the large table had to be placed on the right, that is, small table JOIN large table. Newer versions of Hive optimize both orders, so it no longer makes a noticeable difference whether the small table is on the left or the right.

map side join

Map side join applies when one of the tables is small enough to be loaded into memory. Hive has supported automatic conversion to map side join since version 0.7. The configuration is as follows:

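SET hive.auto.convert.join=true; -- default true since Hive 0.11.0
SET hive.mapjoin.smalltable.filesize=600000000; -- default 25MB
SET hive.auto.convert.join.noconditionaltask=true; -- default true, so no map join hint is needed
SET hive.auto.convert.join.noconditionaltask.size=10000000; -- controls the size of the tables loaded into memory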

Once map side join is enabled, Hive automatically checks whether the small table is larger than the size configured in hive.mapjoin.smalltable.filesize. If it is larger, the join runs as a common join; if it is smaller, it is converted to a map side join.

The map side join works as follows:

First, Task A (a task executed locally by the client) reads small table a, converts it into a hash table, writes it to a local file and then loads it into the distributed cache.

Then, Task B starts map tasks to read large table b. In the map phase, each record is joined against the hash table for table a in the distributed cache, and the result is output.

Note: a map side join has no reduce task, so the map tasks write the results directly; there are as many result files as there are map tasks.
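
One way to check whether the automatic conversion took effect is to inspect the plan with EXPLAIN. The sketch below assumes two hypothetical tables, orders (large) and dept (small); when the conversion applies, the plan typically shows a Map Join Operator instead of a reduce side Join Operator.

```sql
SET hive.auto.convert.join=true;

-- orders and dept are hypothetical tables used only for illustration
EXPLAIN
SELECT o.order_id, d.dept_name
FROM orders o
JOIN dept d ON o.dept_no = d.dept_no;
```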

Bucket map join

Bucket map join is a special map side join; the main difference is that it applies to bucketed tables. To enable it, turn on the following configuration:

SET hive.optimize.bucketmapjoin=true; -- default false

In a bucket map join, all tables participating in the join must be bucketed, and the join column must be the bucketing column (specified by CLUSTERED BY). In addition, the number of buckets in the large table must be a multiple of the number of buckets in the small table.

Compared with a common join, a bucket map join reads only the buckets it needs and does not require a full table scan.
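
The bucketing requirements above can be sketched with two hypothetical tables. The names, columns and bucket counts are assumptions; what matters is that both tables are bucketed on the join column and that 16 is a multiple of 4.

```sql
-- Large table: 16 buckets on the join column (hypothetical)
CREATE TABLE orders_bucketed (order_id BIGINT, user_id INT)
CLUSTERED BY (user_id) INTO 16 BUCKETS;

-- Small table: 4 buckets on the same column (hypothetical)
CREATE TABLE users_bucketed (user_id INT, user_name STRING)
CLUSTERED BY (user_id) INTO 4 BUCKETS;

SET hive.optimize.bucketmapjoin=true;

-- The join column is the bucketing column of both tables
SELECT o.order_id, u.user_name
FROM orders_bucketed o
JOIN users_bucketed u ON o.user_id = u.user_id;
```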

Sort merge bucket (SMB) join

SMB join applies to bucketed tables. If the two tables participating in the join are sorted and bucketed on the same columns, a sort merge join can be used. Its advantage is that the small table does not have to be loaded completely into memory; only the matching buckets of the two tables are read. The configuration for the ordinary join (with both map and reduce) is as follows:

SET hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
SET hive.optimize.bucketmapjoin=true;
SET hive.optimize.bucketmapjoin.sortedmerge=true;

Sort merge bucket map (SMBM) join

SMBM join is a special bucket map join. Unlike the map side join, it does not need to load all rows of the small table into memory. To use SMBM join, the tables participating in the join must be sorted and bucketed on the same column, and the join column must be that bucketing column. The configuration is as follows:

SET hive.optimize.bucketmapjoin=true;
SET hive.optimize.bucketmapjoin.sortedmerge=true;

Skew join

When the distribution of processed data is extremely uneven, it will cause the phenomenon of data skew. Hive can optimize data skew through the following configurations:

-- Default false; set to true if the data is skewed
SET hive.optimize.skewjoin=true;
-- Default 100000; if a key has more rows than this value, the rows beyond it are sent to other reduce tasks
SET hive.skewjoin.key=100000;

Tip:

Data skew also occurs with GROUP BY. Setting hive.groupby.skewindata=true optimizes skew in GROUP BY: once enabled, an additional MR job is triggered, in which the map output is distributed randomly across the reduce tasks to avoid data skew.
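
A minimal sketch of the GROUP BY setting follows, using the user_behavior table from earlier in this article. Whether the behavior column is actually skewed is an assumption made for illustration.

```sql
-- Spread skewed GROUP BY keys over the reducers with an extra job
SET hive.groupby.skewindata=true;

-- The first job pre-aggregates randomly distributed rows, the second job produces the final groups
SELECT behavior, count(*) AS cnt
FROM user_behavior
GROUP BY behavior;
```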

Execution engine

Hive supports a variety of execution engines, such as spark and tez. The selection of execution engine will affect the overall query performance. The configuration used is as follows:

SET hive.execution.engine=<engine>; -- <engine> = mr|tez|spark

  • mr: the default execution engine, marked as deprecated since Hive 2.0

  • tez: can convert multiple dependent jobs into a single job, so HDFS is written only once and there are fewer intermediate steps, which greatly improves job performance

  • spark: a general-purpose big data computing framework based on in-memory computing, which makes it fast


Optimizer

Similar to relational databases, Hive generates and optimizes logical and physical execution plans before actually running a query. Hive has two optimizers: the vectorization optimizer and the cost-based optimizer (CBO).

Vectorization optimizer

The vectorization optimizer processes a batch of rows at a time instead of one row at a time. To use vectorized execution, the table must be stored in ORC format. The configuration is as follows:

SET hive.vectorized.execution.enabled=true; -- default false

Cost-based optimizer

Hive's CBO is based on Apache Calcite. It produces an efficient execution plan by estimating query cost (using the statistics collected by analyze), which ultimately reduces execution time and resource usage. The CBO is configured as follows:

SET hive.cbo.enable=true; --from v0.14.0 default true
SET hive.compute.query.using.stats=true; -- default false
SET hive.stats.fetch.column.stats=true; -- default false
SET hive.stats.fetch.partition.stats=true; -- default true
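
Because the CBO relies on statistics, a typical session collects them before inspecting the plan. The sketch below uses the user_behavior table from earlier in this article; the query is only an illustrative assumption.

```sql
-- Collect table-level and column-level statistics for the optimizer
ANALYZE TABLE user_behavior COMPUTE STATISTICS;
ANALYZE TABLE user_behavior COMPUTE STATISTICS FOR COLUMNS user_id, behavior;

SET hive.cbo.enable=true;
SET hive.stats.fetch.column.stats=true;

-- Inspect the plan that the optimizer produces with these statistics
EXPLAIN
SELECT behavior, count(*) AS cnt
FROM user_behavior
GROUP BY behavior;
```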


Summary

This article introduces the basic ideas of Hive tuning in four parts. First, it covers the basic tuning tools (explain and analyze). Next, it covers optimization strategies at the table design level (partitions, buckets and indexes). Then it covers data storage optimization (file formats, compression and storage optimization). Finally, it covers job-level optimization techniques (local mode, JVM reuse, parallel execution, fetch mode, join optimization, execution engines and optimizers). The article offers ideas for Hive performance tuning; specific problems still have to be analyzed case by case in practice. In short, as the saying goes, the heavy sword has no edge: allocating reasonable resources to a job covers most situations. The best solution is the one that fits, and there is no need to chase flashy tricks. Focus on the business problem, because the value of a tool lies in solving business problems, and we must not put the cart before the horse.

Tags: Big Data hive

Posted by Greaser9780 on Wed, 11 May 2022 06:27:37 +0300