HIVE-10343: CBO (Calcite Return Path): Parameterize algorithm cost model (Laljo John Pullokkaran)

git-svn-id: https://svn.apache.org/repos/asf/hive/branches/cbo@1673948 13f79535-47bb-0310-9956-ffa450edef68
jpullokkaran committed Apr 15, 2015
1 parent 6bf6bbe commit 6cf1c37
Showing 7 changed files with 95 additions and 46 deletions.
16 changes: 14 additions & 2 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -704,8 +704,20 @@ public static enum ConfVars {
// CBO related
HIVE_CBO_ENABLED("hive.cbo.enable", true, "Flag to control enabling Cost Based Optimizations using Calcite framework."),
HIVE_CBO_RETPATH_HIVEOP("hive.cbo.returnpath.hiveop", false, "Flag to control calcite plan to hive operator conversion"),
EXTENDED_COST_MODEL("hive.cbo.costmodel.extended", false, "Flag to control enabling the extended cost model based on"
+ "CPU, IO and cardinality. Otherwise, the cost model is based on cardinality."),
HIVE_CBO_EXTENDED_COST_MODEL("hive.cbo.costmodel.extended", false, "Flag to control enabling the extended cost model based on"
+ " CPU, IO and cardinality. Otherwise, the cost model is based on cardinality."),
HIVE_CBO_COST_MODEL_CPU("hive.cbo.costmodel.cpu", "0.000001", "Default cost of a comparison"),
HIVE_CBO_COST_MODEL_NET("hive.cbo.costmodel.network", "150.0", "Default cost of transferring a byte over network;"
+ " expressed as multiple of CPU cost"),
HIVE_CBO_COST_MODEL_LFS_WRITE("hive.cbo.costmodel.local.fs.write", "4.0", "Default cost of writing a byte to local FS;"
+ " expressed as multiple of NETWORK cost"),
HIVE_CBO_COST_MODEL_LFS_READ("hive.cbo.costmodel.local.fs.read", "4.0", "Default cost of reading a byte from local FS;"
+ " expressed as multiple of NETWORK cost"),
HIVE_CBO_COST_MODEL_HDFS_WRITE("hive.cbo.costmodel.hdfs.write", "10.0", "Default cost of writing a byte to HDFS;"
+ " expressed as multiple of Local FS write cost"),
HIVE_CBO_COST_MODEL_HDFS_READ("hive.cbo.costmodel.hdfs.read", "1.5", "Default cost of reading a byte from HDFS;"
+ " expressed as multiple of Local FS read cost"),


// hive.mapjoin.bucket.cache.size has been replaced by hive.smbjoin.cache.row,
// need to remove by hive .13. Also, do not change default (see SMB operator)
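The six new knobs form a multiplier chain rather than independent absolute values: network cost is a multiple of CPU cost, local FS costs are multiples of network cost, and HDFS costs are multiples of the local FS costs. A minimal sketch (not part of the commit; the class name is invented for illustration) of how the shipped defaults resolve to absolute per-byte costs:

    // Illustrative only: chains the default multipliers the same way
    // HiveAlgorithmsUtil's constructor does further down in this diff.
    public class CostModelDefaultsDemo {
      public static void main(String[] args) {
        double cpu       = 0.000001;        // hive.cbo.costmodel.cpu
        double net       = 150.0 * cpu;     // hive.cbo.costmodel.network        -> 1.5e-4
        double lfsWrite  = 4.0 * net;       // hive.cbo.costmodel.local.fs.write -> 6.0e-4
        double lfsRead   = 4.0 * net;       // hive.cbo.costmodel.local.fs.read  -> 6.0e-4
        double hdfsWrite = 10.0 * lfsWrite; // hive.cbo.costmodel.hdfs.write     -> 6.0e-3
        double hdfsRead  = 1.5 * lfsRead;   // hive.cbo.costmodel.hdfs.read      -> 9.0e-4
        System.out.printf("net=%.1e lfsWrite=%.1e lfsRead=%.1e hdfsWrite=%.1e hdfsRead=%.1e%n",
            net, lfsWrite, lfsRead, hdfsWrite, hdfsRead);
      }
    }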
1 change: 1 addition & 0 deletions metastore/bin/.gitignore
@@ -1 +1,2 @@
/scripts/
/src/
1 change: 1 addition & 0 deletions ql/.gitignore
@@ -1,2 +1,3 @@
dependency-reduced-pom.xml
/bin/
/target/
@@ -51,10 +51,10 @@ public RelMetadataProvider getMetadataProvider() {
// Create cost metadata provider
final HiveCostModel cm;
if (HiveConf.getVar(this.hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
&& HiveConf.getBoolVar(this.hiveConf, HiveConf.ConfVars.EXTENDED_COST_MODEL)) {
cm = HiveOnTezCostModel.INSTANCE;
&& HiveConf.getBoolVar(this.hiveConf, HiveConf.ConfVars.HIVE_CBO_EXTENDED_COST_MODEL)) {
cm = HiveOnTezCostModel.getCostModel(hiveConf);
} else {
cm = HiveDefaultCostModel.INSTANCE;
cm = HiveDefaultCostModel.getCostModel();
}

// Get max split size for HiveRelMdParallelism
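The extended model is used only when hive.execution.engine is "tez" and hive.cbo.costmodel.extended is true; any other combination falls back to the cardinality-only default model. A minimal sketch of the selection as a caller would now write it (illustrative, assuming the imports already present in this file):

    // Sketch: pick the cost model the way getMetadataProvider() now does.
    HiveConf conf = new HiveConf();
    boolean extendedOnTez =
        HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
            && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CBO_EXTENDED_COST_MODEL);
    HiveCostModel cm = extendedOnTez
        ? HiveOnTezCostModel.getCostModel(conf)   // parameterized by the session conf
        : HiveDefaultCostModel.getCostModel();    // cardinality-only fallback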
@@ -26,6 +26,7 @@
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Pair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelCollation;
@@ -39,23 +40,37 @@

public class HiveAlgorithmsUtil {

private static final double CPU_COST = 1.0;
private static final double NET_COST = 150.0 * CPU_COST;
private static final double LOCAL_WRITE_COST = 4.0 * NET_COST;
private static final double LOCAL_READ_COST = 4.0 * NET_COST;
private static final double HDFS_WRITE_COST = 10.0 * LOCAL_WRITE_COST;
private static final double HDFS_READ_COST = 1.5 * LOCAL_READ_COST;
private final double cpuCost;
private final double netCost;
private final double localFSWrite;
private final double localFSRead;
private final double hdfsWrite;
private final double hdfsRead;

HiveAlgorithmsUtil(HiveConf conf) {
cpuCost = Double.valueOf(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_CBO_COST_MODEL_CPU));
netCost = cpuCost
* Double.valueOf(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_CBO_COST_MODEL_NET));
localFSWrite = netCost
* Double.valueOf(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_CBO_COST_MODEL_LFS_WRITE));
localFSRead = netCost
* Double.valueOf(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_CBO_COST_MODEL_LFS_READ));
hdfsWrite = localFSWrite
* Double.valueOf(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_CBO_COST_MODEL_HDFS_WRITE));
hdfsRead = localFSRead
* Double.valueOf(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_CBO_COST_MODEL_HDFS_READ));
}

public static RelOptCost computeCardinalityBasedCost(HiveRelNode hr) {
return new HiveCost(hr.getRows(), 0, 0);
}

public static HiveCost computeCost(HiveTableScan t) {
public HiveCost computeCost(HiveTableScan t) {
double cardinality = t.getRows();
return new HiveCost(cardinality, 0, HDFS_WRITE_COST * cardinality * 0);
return new HiveCost(cardinality, 0, hdfsWrite * cardinality * 0);
}

public static double computeSortMergeCPUCost(
public double computeSortMergeCPUCost(
ImmutableList<Double> cardinalities,
ImmutableBitSet sorted) {
// Sort-merge join
@@ -67,16 +82,16 @@ public static double computeSortMergeCPUCost(
cpuCost += computeSortCPUCost(cardinality);
}
// Merge cost
cpuCost += cardinality * CPU_COST;
cpuCost += cardinality * cpuCost;
}
return cpuCost;
}

public static double computeSortCPUCost(Double cardinality) {
return cardinality * Math.log(cardinality) * CPU_COST;
public double computeSortCPUCost(Double cardinality) {
return cardinality * Math.log(cardinality) * cpuCost;
}
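With the parameterized model, sorting n rows is charged n * ln(n) * cpuCost (Math.log is the natural logarithm). Under the default cpu cost of 1e-6, sorting one million rows costs about 1,000,000 * 13.8 * 1e-6 ≈ 13.8 cost units.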

public static double computeSortMergeIOCost(
public double computeSortMergeIOCost(
ImmutableList<Pair<Double, Double>> relationInfos) {
// Sort-merge join
double ioCost = 0.0;
@@ -86,17 +101,17 @@ public static double computeSortMergeIOCost(
return ioCost;
}

public static double computeSortIOCost(Pair<Double, Double> relationInfo) {
public double computeSortIOCost(Pair<Double, Double> relationInfo) {
// Sort-merge join
double ioCost = 0.0;
double cardinality = relationInfo.left;
double averageTupleSize = relationInfo.right;
// Write cost
ioCost += cardinality * averageTupleSize * LOCAL_WRITE_COST;
ioCost += cardinality * averageTupleSize * localFSWrite;
// Read cost
ioCost += cardinality * averageTupleSize * LOCAL_READ_COST;
ioCost += cardinality * averageTupleSize * localFSRead;
// Net transfer cost
ioCost += cardinality * averageTupleSize * NET_COST;
ioCost += cardinality * averageTupleSize * netCost;
return ioCost;
}
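Each sorted relation is thus charged cardinality * averageTupleSize * (localFSWrite + localFSRead + netCost) for IO: every byte is spilled to local FS, read back, and shipped once over the network. With the default multipliers that works out to roughly 1.35e-3 cost units per byte.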

@@ -110,12 +125,12 @@ public static double computeMapJoinCPUCost(
if (!streaming.get(i)) {
cpuCost += cardinality;
}
cpuCost += cardinality * CPU_COST;
cpuCost += cardinality * cpuCost;
}
return cpuCost;
}

public static double computeMapJoinIOCost(
public double computeMapJoinIOCost(
ImmutableList<Pair<Double, Double>> relationInfos,
ImmutableBitSet streaming, int parallelism) {
// Hash-join
@@ -124,28 +139,28 @@ public static double computeMapJoinIOCost(
double cardinality = relationInfos.get(i).left;
double averageTupleSize = relationInfos.get(i).right;
if (!streaming.get(i)) {
ioCost += cardinality * averageTupleSize * NET_COST * parallelism;
ioCost += cardinality * averageTupleSize * netCost * parallelism;
}
}
return ioCost;
}

public static double computeBucketMapJoinCPUCost(
public double computeBucketMapJoinCPUCost(
ImmutableList<Double> cardinalities,
ImmutableBitSet streaming) {
// Hash-join
double cpuCost = 0.0;
for (int i=0; i<cardinalities.size(); i++) {
double cardinality = cardinalities.get(i);
if (!streaming.get(i)) {
cpuCost += cardinality * CPU_COST;
cpuCost += cardinality * cpuCost;
}
cpuCost += cardinality * CPU_COST;
cpuCost += cardinality * cpuCost;
}
return cpuCost;
}

public static double computeBucketMapJoinIOCost(
public double computeBucketMapJoinIOCost(
ImmutableList<Pair<Double, Double>> relationInfos,
ImmutableBitSet streaming, int parallelism) {
// Hash-join
@@ -154,7 +169,7 @@ public static double computeBucketMapJoinIOCost(
double cardinality = relationInfos.get(i).left;
double averageTupleSize = relationInfos.get(i).right;
if (!streaming.get(i)) {
ioCost += cardinality * averageTupleSize * NET_COST * parallelism;
ioCost += cardinality * averageTupleSize * netCost * parallelism;
}
}
return ioCost;
@@ -165,12 +180,12 @@ public static double computeSMBMapJoinCPUCost(
// Hash-join
double cpuCost = 0.0;
for (int i=0; i<cardinalities.size(); i++) {
cpuCost += cardinalities.get(i) * CPU_COST;
cpuCost += cardinalities.get(i) * cpuCost;
}
return cpuCost;
}

public static double computeSMBMapJoinIOCost(
public double computeSMBMapJoinIOCost(
ImmutableList<Pair<Double, Double>> relationInfos,
ImmutableBitSet streaming, int parallelism) {
// Hash-join
@@ -179,7 +194,7 @@ public static double computeSMBMapJoinIOCost(
double cardinality = relationInfos.get(i).left;
double averageTupleSize = relationInfos.get(i).right;
if (!streaming.get(i)) {
ioCost += cardinality * averageTupleSize * NET_COST * parallelism;
ioCost += cardinality * averageTupleSize * netCost * parallelism;
}
}
return ioCost;
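Note that computeMapJoinIOCost, computeBucketMapJoinIOCost, and computeSMBMapJoinIOCost all charge the same IO term: for each non-streaming input, cardinality * averageTupleSize * netCost * parallelism, reflecting that the small side must be shipped to every task. The join variants differ only in their CPU formulas.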
@@ -21,6 +21,7 @@
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;

@@ -33,8 +34,15 @@
*/
public class HiveDefaultCostModel extends HiveCostModel {

public static final HiveDefaultCostModel INSTANCE =
new HiveDefaultCostModel();
private static HiveDefaultCostModel INSTANCE;

synchronized public static HiveDefaultCostModel getCostModel() {
if (INSTANCE == null) {
INSTANCE = new HiveDefaultCostModel();
}

return INSTANCE;
}

private HiveDefaultCostModel() {
super(Sets.newHashSet(DefaultJoinAlgorithm.INSTANCE));
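Nothing forces lazy initialization here, since the default model's constructor still takes no arguments; the synchronized getter appears to mirror HiveOnTezCostModel below, which can no longer be an eagerly built constant because it now requires a HiveConf.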
@@ -29,6 +29,7 @@
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Pair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
@@ -42,15 +43,26 @@
*/
public class HiveOnTezCostModel extends HiveCostModel {

public static final HiveOnTezCostModel INSTANCE =
new HiveOnTezCostModel();
private static HiveOnTezCostModel INSTANCE;

private HiveOnTezCostModel() {
private static HiveAlgorithmsUtil algoUtils;

synchronized public static HiveOnTezCostModel getCostModel(HiveConf conf) {
if (INSTANCE == null) {
INSTANCE = new HiveOnTezCostModel(conf);
}

return INSTANCE;
}

private HiveOnTezCostModel(HiveConf conf) {
super(Sets.newHashSet(
TezCommonJoinAlgorithm.INSTANCE,
TezMapJoinAlgorithm.INSTANCE,
TezBucketJoinAlgorithm.INSTANCE,
TezSMBJoinAlgorithm.INSTANCE));

algoUtils = new HiveAlgorithmsUtil(conf);
}
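Because the instance is cached, the HiveConf passed to the first getCostModel call fixes the cost multipliers for the lifetime of the process; later calls with a different conf get the already-built model. The algoUtils field is likewise shared, as it is static but assigned from the instance constructor.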

@Override
@@ -69,15 +81,15 @@ public RelOptCost getAggregateCost(HiveAggregate aggregate) {
return null;
}
// 2. CPU cost = sorting cost
final double cpuCost = HiveAlgorithmsUtil.computeSortCPUCost(rCount);
final double cpuCost = algoUtils.computeSortCPUCost(rCount);
// 3. IO cost = cost of writing intermediary results to local FS +
// cost of reading from local FS for transferring to GBy +
// cost of transferring map outputs to GBy operator
final Double rAverageSize = RelMetadataQuery.getAverageRowSize(aggregate.getInput());
if (rAverageSize == null) {
return null;
}
final double ioCost = HiveAlgorithmsUtil.computeSortIOCost(new Pair<Double,Double>(rCount,rAverageSize));
final double ioCost = algoUtils.computeSortIOCost(new Pair<Double,Double>(rCount,rAverageSize));
// 4. Result
return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
}
@@ -118,7 +130,7 @@ public RelOptCost getCost(HiveJoin join) {
add(leftRCount).
add(rightRCount).
build();
final double cpuCost = HiveAlgorithmsUtil.computeSortMergeCPUCost(cardinalities, join.getSortedInputs());
final double cpuCost = algoUtils.computeSortMergeCPUCost(cardinalities, join.getSortedInputs());
// 3. IO cost = cost of writing intermediary results to local FS +
// cost of reading from local FS for transferring to join +
// cost of transferring map outputs to Join operator
Expand All @@ -131,7 +143,7 @@ public RelOptCost getCost(HiveJoin join) {
add(new Pair<Double,Double>(leftRCount,leftRAverageSize)).
add(new Pair<Double,Double>(rightRCount,rightRAverageSize)).
build();
final double ioCost = HiveAlgorithmsUtil.computeSortMergeIOCost(relationInfos);
final double ioCost = algoUtils.computeSortMergeIOCost(relationInfos);
// 4. Result
return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
}
@@ -242,7 +254,7 @@ public RelOptCost getCost(HiveJoin join) {
build();
final int parallelism = RelMetadataQuery.splitCount(join) == null
? 1 : RelMetadataQuery.splitCount(join);
final double ioCost = HiveAlgorithmsUtil.computeMapJoinIOCost(relationInfos, streaming, parallelism);
final double ioCost = algoUtils.computeMapJoinIOCost(relationInfos, streaming, parallelism);
// 4. Result
return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
}
@@ -382,7 +394,7 @@ public RelOptCost getCost(HiveJoin join) {
return null;
}
ImmutableBitSet streaming = streamingBuilder.build();
final double cpuCost = HiveAlgorithmsUtil.computeBucketMapJoinCPUCost(cardinalities, streaming);
final double cpuCost = algoUtils.computeBucketMapJoinCPUCost(cardinalities, streaming);
// 3. IO cost = cost of transferring small tables to join node *
// degree of parallelism
final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft());
Expand All @@ -396,7 +408,7 @@ public RelOptCost getCost(HiveJoin join) {
build();
final int parallelism = RelMetadataQuery.splitCount(join) == null
? 1 : RelMetadataQuery.splitCount(join);
final double ioCost = HiveAlgorithmsUtil.computeBucketMapJoinIOCost(relationInfos, streaming, parallelism);
final double ioCost = algoUtils.computeBucketMapJoinIOCost(relationInfos, streaming, parallelism);
// 4. Result
return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
}
@@ -540,7 +552,7 @@ public RelOptCost getCost(HiveJoin join) {
build();
final int parallelism = RelMetadataQuery.splitCount(join) == null
? 1 : RelMetadataQuery.splitCount(join);
final double ioCost = HiveAlgorithmsUtil.computeSMBMapJoinIOCost(relationInfos, streaming, parallelism);
final double ioCost = algoUtils.computeSMBMapJoinIOCost(relationInfos, streaming, parallelism);
// 4. Result
return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
}
