Query optimizations in a relational query engine can be broadly classified as logical query optimizations and physical query optimizations. Logical query optimizations are those that can be derived from relational algebra alone, independent of the physical layer in which the query is executed. Physical query optimizations are cognizant of physical-layer primitives; for Hive, the physical layer means Map-Reduce and Tez primitives.
Currently logical query optimizations in Hive can be broadly categorized as follows:
- Projection Pruning
- Deducing Transitive Predicates
- Predicate Pushdown (see the example after this list)
- Merging of Select-Select and Filter-Filter into a single operator
- Multi-way Join
- Query Rewrite to accommodate Join skew on some column values
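As a minimal sketch of predicate pushdown and transitive predicate deduction (hypothetical tables t1 and t2), consider the following query:
SELECT t1.val, t2.val
FROM t1 JOIN t2 ON (t1.id = t2.id)
WHERE t1.id = 10;
-- The filter t1.id = 10 can be pushed below the join into the scan of t1, and the
-- transitive predicate t2.id = 10 can be deduced from the join condition and pushed
-- into the scan of t2, so both inputs are reduced before the join executes.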
Physical optimizations in Hive can be broadly classified as follows:
- Partition Pruning
- Scan pruning based on partitions and bucketing
- Scan pruning if query is based on sampling
- Apply Group By on the map side in some cases
- Perform Join on the Mapper
- Optimize Union so that union can be performed on map side only
- Decide which table to stream last in a multi-way join, based on a user hint (see the hint example after this list)
- Remove unnecessary reduce sink operators
- For queries with a LIMIT clause, reduce the number of files that need to be scanned for the table
- For queries with a LIMIT clause, restrict the output coming from the mapper by restricting what the Reduce Sink operator generates
- Reduce the number of Tez jobs needed to answer a user-submitted SQL query
- Avoid Map-Reduce jobs for simple fetch queries
- For simple fetch queries with aggregation, perform aggregation without Map-Reduce tasks
- Rewrite Group By Query to use index table instead of original table
- Use index scans when the predicates above a table scan are equality predicates and the predicate columns have indexes on them
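As a minimal sketch of the map-side join and streaming hints mentioned above (hypothetical tables fact, dim, dim1, and dim2; the MAPJOIN hint assumes the dimension table is small enough to fit in memory):
SELECT /*+ MAPJOIN(d) */ f.id, d.name
FROM fact f JOIN dim d ON (f.dim_id = d.id);
-- MAPJOIN asks Hive to load dim into memory and perform the join on the mappers, avoiding the shuffle.

SELECT /*+ STREAMTABLE(f) */ f.id, d1.name, d2.name
FROM fact f
JOIN dim1 d1 ON (f.d1_id = d1.id)
JOIN dim2 d2 ON (f.d2_id = d2.id);
-- STREAMTABLE marks fact as the table to stream through the reducers while the other
-- tables are buffered; by default Hive streams the rightmost table in the join.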
Cost Based Optimization:
History:
Calcite is an open source, Apache Licensed, query planning and execution framework. Relational algebra is at the heart of Calcite. Every query is represented as a tree of relational operators. You can translate from SQL to relational algebra, or you can build the tree directly.
Planner rules transform expression trees using mathematical identities that preserve semantics. For example, it is valid to push a filter into an input of an inner join if the filter does not reference columns from the other input.
Calcite optimizes queries by repeatedly applying planner rules to a relational expression. A cost model guides the process, and the planner engine generates an alternative expression that has the same semantics as the original but a lower cost.
The planning process is extensible. You can add your own relational operators, planner rules, cost model, and statistics.
Enabling the CBO:
The CBO is enabled by default in Hive 0.14 and later. If you need to enable it manually, set the following property (at the session level, as shown, or persistently in hive-site.xml):
SET hive.cbo.enable=true;
So that the optimizer can use column and partition statistics, also set the following properties, which control whether statistics are fetched from the metastore during planning:
SET hive.stats.fetch.column.stats=true;
SET hive.stats.fetch.partition.stats=true;
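The SET commands above apply at the session level. A minimal sketch of the equivalent persistent configuration in hive-site.xml (repeat the pattern for the other properties):
<property>
  <name>hive.cbo.enable</name>
  <value>true</value>
</property>
<property>
  <name>hive.stats.fetch.column.stats</name>
  <value>true</value>
</property>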
Gathering Statistics--Critical to the CBO:
The CBO requires both table-level and column-level statistics for the tables and conditions referenced in a query in order to generate efficient execution plans, ultimately cutting down query execution time and resource utilization.
Table-level statistics:
Table-level statistics should always be collected. Make sure the following property is set as follows in hive-site.xml to collect table-level statistics:
SET hive.stats.autogather=true;
- With this setting, statistics are automatically computed for newly created tables and/or partitions that are populated through the INSERT OVERWRITE command.
- Note that this adds to query execution time, and additional reducer tasks may be created to compute the statistics.
If you have an existing table that does not have statistics collected, you can collect them by running the following query:
ANALYZE TABLE <table_name> COMPUTE STATISTICS;
Column-level statistics (critical):
Column-level statistics are expensive to compute and are not gathered automatically in older releases (see hive.stats.column.autogather in the table below). The recommended process for Hive 0.14 and later is to compute column statistics for all of your existing tables using the following command:
ANALYZE TABLE <table_name> COMPUTE STATISTICS for COLUMNS;
As new partitions are added to the table, their column statistics must be gathered as well. For example, if the table is partitioned on "col1" and a new partition with the value "x" is added, you must also run:
ANALYZE TABLE <table_name> partition (col1="x") COMPUTE STATISTICS for COLUMNS;
Please note: it is recommended to run ANALYZE statements when the cluster is idle (no production load). These queries are heavy; they can consume most of the available YARN resources and acquire locks on the Hive tables.
Quick Overview:
Description | Stored in | Collected by
--- | --- | ---
Number of partitions the dataset consists of | Fictional metastore property: numPartitions | Computed while displaying the properties of a partitioned table
Number of files the dataset consists of | Metastore table property: numFiles | Automatically during Metastore operations
Total size of the dataset as seen at the filesystem level | Metastore table property: totalSize | Automatically during Metastore operations
Uncompressed size of the dataset | Metastore table property: rawDataSize | Computed as part of the basic statistics; calculated automatically when hive.stats.autogather is enabled
Number of rows the dataset consists of | Metastore table property: numRows | Computed as part of the basic statistics; calculated automatically when hive.stats.autogather is enabled
Column-level statistics | Metastore TAB_COL_STATS table | Computed; calculated automatically when hive.stats.column.autogather is enabled
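To verify which of the statistics above have been gathered, you can inspect the table and column metadata (a minimal sketch; substitute your own table and column names, and assuming a non-partitioned table for the column form):
-- Table-level statistics appear under Table Parameters (numFiles, numRows, rawDataSize, totalSize)
DESCRIBE FORMATTED <table_name>;
-- Column-level statistics for a single column
DESCRIBE FORMATTED <table_name> <column_name>;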
Validate the CBO with explain plans:
Hive provides an EXPLAIN command that shows the execution plan for a query.
Example:
EXPLAIN select sum(hash(key)), sum(hash(value)) from src_orc_merge_test_part where ds='2012-01-03' and ts='2012-01-03+14:46:31'
Output:
Plan optimized by CBO.
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)
Stage-0
Fetch Operator
limit:-1
Stage-1
Reducer 2
File Output Operator [FS_8]
compressed:false
Statistics:Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
table:{"serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe","input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"}
Group By Operator [GBY_6]
| aggregations:["sum(VALUE._col0)","sum(VALUE._col1)"]
| outputColumnNames:["_col0","_col1"]
| Statistics:Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
…
EXPLAIN CBO clause (Hive 4.0.0 and later):
The CBO clause outputs the plan generated by the Calcite optimizer. It can optionally include information about the cost of the plan, using either the Calcite default cost model or the cost model used for join reordering.
Available from: Hive release 4.0.0 (HIVE-17503 / HIVE-21184).
EXPLAIN [FORMATTED] CBO [COST|JOINCOST]
The COST option prints the plan and its cost calculated using the Calcite default cost model.
The JOINCOST option prints the plan and its cost calculated using the cost model used for join reordering.
Sample output:
EXPLAIN CBO COST
WITH customer_total_return AS
(SELECT sr_customer_sk AS ctr_customer_sk,
sr_store_sk AS ctr_store_sk,
SUM(SR_FEE) AS ctr_total_return
FROM store_returns, date_dim
WHERE sr_returned_date_sk = d_date_sk
AND d_year =2000
GROUP BY sr_customer_sk, sr_store_sk)
SELECT c_customer_id
FROM customer_total_return ctr1, store, customer
WHERE ctr1.ctr_total_return > (SELECT AVG(ctr_total_return)*1.2
FROM customer_total_return ctr2
WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk)
AND s_store_sk = ctr1.ctr_store_sk
AND s_state = 'NM'
AND ctr1.ctr_customer_sk = c_customer_sk
ORDER BY c_customer_id
LIMIT 100
It will produce a similar plan, but the cost for each operator will be embedded next to the operator descriptors:
CBO PLAN:
HiveSortLimit(sort0=[$0], dir0=[ASC], fetch=[100]): rowcount = 100.0, cumulative cost = {2.395588892021712E26 rows, 1.197794434438787E26 cpu, 0.0 io}, id = 1683
HiveProject(c_customer_id=[$1]): rowcount = 1.1977944344387866E26, cumulative cost = {2.395588892021712E26 rows, 1.197794434438787E26 cpu, 0.0 io}, id = 1681
HiveJoin(condition=[AND(=($3, $7), >($4, $6))], joinType=[inner], algorithm=[none], cost=[not available]): rowcount = 1.1977944344387866E26, cumulative cost = {1.1977944575829254E26 rows, 4.160211553874922E10 cpu, 0.0 io}, id = 1679
HiveJoin(condition=[=($2, $0)], joinType=[inner], algorithm=[none], cost=[not available]): rowcount = 2.3144135067474273E18, cumulative cost = {2.3144137967122499E18 rows, 1.921860676139634E10 cpu, 0.0 io}, id = 1663
HiveProject(c_customer_sk=[$0], c_customer_id=[$1]): rowcount = 7.2E7, cumulative cost = {2.24E8 rows, 3.04000001E8 cpu, 0.0 io}, id = 1640
HiveFilter(condition=[IS NOT NULL($0)]): rowcount = 7.2E7, cumulative cost = {1.52E8 rows, 1.60000001E8 cpu, 0.0 io}, id = 1638
HiveTableScan(table=[[default, customer]], table:alias=[customer]): rowcount = 8.0E7, cumulative cost = {8.0E7 rows, 8.0000001E7 cpu, 0.0 io}, id = 1055
HiveJoin(condition=[=($3, $1)], joinType=[inner], algorithm=[none], cost=[not available]): rowcount = 2.1429754692105807E11, cumulative cost = {2.897408225471977E11 rows, 1.891460676039634E10 cpu, 0.0 io}, id = 1661
HiveProject(sr_customer_sk=[$0], sr_store_sk=[$1], $f2=[$2]): rowcount = 6.210443022113779E9, cumulative cost = {7.544327346205959E10 rows, 1.891460312135634E10 cpu, 0.0 io}, id = 1685
HiveAggregate(group=[{1, 2}], agg#0=[sum($3)]): rowcount = 6.210443022113779E9, cumulative cost = {6.92328304399458E10 rows, 2.8327405501500005E8 cpu, 0.0 io}, id = 1654
HiveJoin(condition=[=($0, $4)], joinType=[inner], algorithm=[none], cost=[not available]): rowcount = 6.2104430221137794E10, cumulative cost = {6.2246082040067795E10 rows, 2.8327405501500005E8 cpu, 0.0 io}, id = 1652
HiveProject(sr_returned_date_sk=[$0], sr_customer_sk=[$3], sr_store_sk=[$7], sr_fee=[$14]): rowcount = 4.198394835000001E7, cumulative cost = {1.4155904670000002E8 rows, 2.8311809440000004E8 cpu, 0.0 io}, id = 1645
HiveFilter(condition=[AND(IS NOT NULL($0), IS NOT NULL($7), IS NOT NULL($3))]): rowcount = 4.198394835000001E7, cumulative cost = {9.957509835000001E7 rows, 1.15182301E8 cpu, 0.0 io}, id = 1643
HiveTableScan(table=[[default, store_returns]], table:alias=[store_returns]): rowcount = 5.759115E7, cumulative cost = {5.759115E7 rows, 5.7591151E7 cpu, 0.0 io}, id = 1040
HiveProject(d_date_sk=[$0]): rowcount = 9861.615, cumulative cost = {92772.23000000001 rows, 155960.615 cpu, 0.0 io}, id = 1650
HiveFilter(condition=[AND(=($6, 2000), IS NOT NULL($0))]): rowcount = 9861.615, cumulative cost = {82910.615 rows, 146099.0 cpu, 0.0 io}, id = 1648
HiveTableScan(table=[[default, date_dim]], table:alias=[date_dim]): rowcount = 73049.0, cumulative cost = {73049.0 rows, 73050.0 cpu, 0.0 io}, id = 1043
HiveProject(s_store_sk=[$0]): rowcount = 230.04000000000002, cumulative cost = {2164.08 rows, 3639.04 cpu, 0.0 io}, id = 1659
HiveFilter(condition=[AND(=($24, _UTF-16LE'NM'), IS NOT NULL($0))]): rowcount = 230.04000000000002, cumulative cost = {1934.04 rows, 3409.0 cpu, 0.0 io}, id = 1657
HiveTableScan(table=[[default, store]], table:alias=[store]): rowcount = 1704.0, cumulative cost = {1704.0 rows, 1705.0 cpu, 0.0 io}, id = 1050
HiveProject(_o__c0=[*(/($1, $2), 1.2)], ctr_store_sk=[$0]): rowcount = 6.900492246793088E8, cumulative cost = {8.537206083312463E10 rows, 2.2383508777352882E10 cpu, 0.0 io}, id = 1677
HiveAggregate(group=[{1}], agg#0=[sum($2)], agg#1=[count($2)]): rowcount = 6.900492246793088E8, cumulative cost = {8.468201160844533E10 rows, 2.1003410327994267E10 cpu, 0.0 io}, id = 1675
HiveProject(sr_customer_sk=[$0], sr_store_sk=[$1], $f2=[$2]): rowcount = 6.900492246793088E9, cumulative cost = {8.381945007759619E10 rows, 2.1003410327994267E10 cpu, 0.0 io}, id = 1686
HiveAggregate(group=[{1, 2}], agg#0=[sum($3)]): rowcount = 6.900492246793088E9, cumulative cost = {7.69189578308031E10 rows, 3.01933587615E8 cpu, 0.0 io}, id = 1673
HiveJoin(condition=[=($0, $4)], joinType=[inner], algorithm=[none], cost=[not available]): rowcount = 6.900492246793088E10, cumulative cost = {6.915590405316087E10 rows, 3.01933587615E8 cpu, 0.0 io}, id = 1671
HiveProject(sr_returned_date_sk=[$0], sr_customer_sk=[$3], sr_store_sk=[$7], sr_fee=[$14]): rowcount = 4.66488315E7, cumulative cost = {1.50888813E8 rows, 3.01777627E8 cpu, 0.0 io}, id = 1667
HiveFilter(condition=[AND(IS NOT NULL($0), IS NOT NULL($7))]): rowcount = 4.66488315E7, cumulative cost = {1.042399815E8 rows, 1.15182301E8 cpu, 0.0 io}, id = 1665
HiveTableScan(table=[[default, store_returns]], table:alias=[store_returns]): rowcount = 5.759115E7, cumulative cost = {5.759115E7 rows, 5.7591151E7 cpu, 0.0 io}, id = 1040
HiveProject(d_date_sk=[$0]): rowcount = 9861.615, cumulative cost = {92772.23000000001 rows, 155960.615 cpu, 0.0 io}, id = 1650
HiveFilter(condition=[AND(=($6, 2000), IS NOT NULL($0))]): rowcount = 9861.615, cumulative cost = {82910.615 rows, 146099.0 cpu, 0.0 io}, id = 1648
HiveTableScan(table=[[default, date_dim]], table:alias=[date_dim]): rowcount = 73049.0, cumulative cost = {73049.0 rows, 73050.0 cpu, 0.0 io}, id = 1043
References:
https://cwiki.apache.org/confluence/display/Hive/Cost-based+optimization+in+Hive
https://cwiki.apache.org/confluence/display/hive/statsdev