From 6cf1c37228f84dada7b0a98d59badd837f53d05a Mon Sep 17 00:00:00 2001
From: John Pullokkaran <jpullokk@apache.org>
Date: Wed, 15 Apr 2015 20:18:00 +0000
Subject: [PATCH] HIVE-10343: CBO (Calcite Return Path): Parameterize algorithm
 cost model (Laljo John Pullokkaran)

git-svn-id: https://svn.apache.org/repos/asf/hive/branches/cbo@1673948 13f79535-47bb-0310-9956-ffa450edef68
---
 .../org/apache/hadoop/hive/conf/HiveConf.java | 16 ++++-
 metastore/bin/.gitignore                      |  1 +
 ql/.gitignore                                 |  1 +
 .../HiveDefaultRelMetadataProvider.java       |  6 +-
 .../calcite/cost/HiveAlgorithmsUtil.java      | 71 +++++++++++--------
 .../calcite/cost/HiveDefaultCostModel.java    | 12 +++-
 .../calcite/cost/HiveOnTezCostModel.java      | 34 ++++++---
 7 files changed, 95 insertions(+), 46 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 51dd430355a6..39018f2e1325 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -704,8 +704,20 @@ public static enum ConfVars {
     // CBO related
     HIVE_CBO_ENABLED("hive.cbo.enable", true, "Flag to control enabling Cost Based Optimizations using Calcite framework."),
     HIVE_CBO_RETPATH_HIVEOP("hive.cbo.returnpath.hiveop", false, "Flag to control calcite plan to hive operator conversion"),
-    EXTENDED_COST_MODEL("hive.cbo.costmodel.extended", false, "Flag to control enabling the extended cost model based on"
-            + "CPU, IO and cardinality. Otherwise, the cost model is based on cardinality."),
+    HIVE_CBO_EXTENDED_COST_MODEL("hive.cbo.costmodel.extended", false, "Flag to control enabling the extended cost model based on"
+                                 + " CPU, IO and cardinality. Otherwise, the cost model is based on cardinality."),
+    HIVE_CBO_COST_MODEL_CPU("hive.cbo.costmodel.cpu", "0.000001", "Default cost of a comparison"),
+    HIVE_CBO_COST_MODEL_NET("hive.cbo.costmodel.network", "150.0", "Default cost of transferring a byte over network;"
+                                                                  + " expressed as multiple of CPU cost"),
+    HIVE_CBO_COST_MODEL_LFS_WRITE("hive.cbo.costmodel.local.fs.write", "4.0", "Default cost of writing a byte to local FS;"
+                                                                             + " expressed as multiple of NETWORK cost"),
+    HIVE_CBO_COST_MODEL_LFS_READ("hive.cbo.costmodel.local.fs.read", "4.0", "Default cost of reading a byte from local FS;"
+                                                                           + " expressed as multiple of NETWORK cost"),
+    HIVE_CBO_COST_MODEL_HDFS_WRITE("hive.cbo.costmodel.hdfs.write", "10.0", "Default cost of writing a byte to HDFS;"
+                                                                 + " expressed as multiple of Local FS write cost"),
+    HIVE_CBO_COST_MODEL_HDFS_READ("hive.cbo.costmodel.hdfs.read", "1.5", "Default cost of reading a byte from HDFS;"
+                                                                 + " expressed as multiple of Local FS read cost"),
+
 
     // hive.mapjoin.bucket.cache.size has been replaced by hive.smbjoin.cache.row,
     // need to remove by hive .13. Also, do not change default (see SMB operator)
diff --git a/metastore/bin/.gitignore b/metastore/bin/.gitignore
index 4f00cd9a7367..0e4bba676593 100644
--- a/metastore/bin/.gitignore
+++ b/metastore/bin/.gitignore
@@ -1 +1,2 @@
+/scripts/
 /src/
diff --git a/ql/.gitignore b/ql/.gitignore
index 3285bd9a14ed..877376276dbc 100644
--- a/ql/.gitignore
+++ b/ql/.gitignore
@@ -1,2 +1,3 @@
 dependency-reduced-pom.xml
 /bin/
+/target/
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java
index 0648df850b1b..748c86d94129 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java
@@ -51,10 +51,10 @@ public RelMetadataProvider getMetadataProvider() {
     // Create cost metadata provider
     final HiveCostModel cm;
     if (HiveConf.getVar(this.hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
-            && HiveConf.getBoolVar(this.hiveConf, HiveConf.ConfVars.EXTENDED_COST_MODEL)) {
-      cm = HiveOnTezCostModel.INSTANCE;
+            && HiveConf.getBoolVar(this.hiveConf, HiveConf.ConfVars.HIVE_CBO_EXTENDED_COST_MODEL)) {
+      cm = HiveOnTezCostModel.getCostModel(hiveConf);
     } else {
-      cm = HiveDefaultCostModel.INSTANCE;
+      cm = HiveDefaultCostModel.getCostModel();
     }
 
     // Get max split size for HiveRelMdParallelism
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsUtil.java
index 8b2d3e662c70..1bf41c309533 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsUtil.java
@@ -26,6 +26,7 @@
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Pair;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelCollation;
@@ -39,23 +40,37 @@
 
 public class HiveAlgorithmsUtil {
 
-  private static final double CPU_COST         = 1.0;
-  private static final double NET_COST         = 150.0 * CPU_COST;
-  private static final double LOCAL_WRITE_COST = 4.0 * NET_COST;
-  private static final double LOCAL_READ_COST  = 4.0 * NET_COST;
-  private static final double HDFS_WRITE_COST  = 10.0 * LOCAL_WRITE_COST;
-  private static final double HDFS_READ_COST   = 1.5 * LOCAL_READ_COST;
+  private final double cpuCost;
+  private final double netCost;
+  private final double localFSWrite;
+  private final double localFSRead;
+  private final double hdfsWrite;
+  private final double hdfsRead;
+
+  HiveAlgorithmsUtil(HiveConf conf) {
+    cpuCost = Double.parseDouble(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_CBO_COST_MODEL_CPU));
+    netCost = cpuCost
+        * Double.parseDouble(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_CBO_COST_MODEL_NET));
+    localFSWrite = netCost
+        * Double.parseDouble(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_CBO_COST_MODEL_LFS_WRITE));
+    localFSRead = netCost
+        * Double.parseDouble(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_CBO_COST_MODEL_LFS_READ));
+    hdfsWrite = localFSWrite
+        * Double.parseDouble(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_CBO_COST_MODEL_HDFS_WRITE));
+    hdfsRead = localFSRead
+        * Double.parseDouble(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_CBO_COST_MODEL_HDFS_READ));
+  }
 
   public static RelOptCost computeCardinalityBasedCost(HiveRelNode hr) {
     return new HiveCost(hr.getRows(), 0, 0);
   }
 
-  public static HiveCost computeCost(HiveTableScan t) {
+  public HiveCost computeCost(HiveTableScan t) {
     double cardinality = t.getRows();
-    return new HiveCost(cardinality, 0, HDFS_WRITE_COST * cardinality * 0);
+    return new HiveCost(cardinality, 0, hdfsWrite * cardinality * 0);
   }
 
-  public static double computeSortMergeCPUCost(
+  public double computeSortMergeCPUCost(
           ImmutableList<Double> cardinalities,
           ImmutableBitSet sorted) {
     // Sort-merge join
@@ -67,16 +82,16 @@ public static double computeSortMergeCPUCost(
         cpuCost += computeSortCPUCost(cardinality);
       }
       // Merge cost
-      cpuCost += cardinality * CPU_COST;
+      cpuCost += cardinality * this.cpuCost;
     }
     return cpuCost;
   }
 
-  public static double computeSortCPUCost(Double cardinality) {
-    return cardinality * Math.log(cardinality) * CPU_COST;
+  public double computeSortCPUCost(Double cardinality) {
+    return cardinality * Math.log(cardinality) * cpuCost;
   }
 
-  public static double computeSortMergeIOCost(
+  public double computeSortMergeIOCost(
           ImmutableList<Pair<Double, Double>> relationInfos) {
     // Sort-merge join
     double ioCost = 0.0;
@@ -86,17 +101,17 @@ public static double computeSortMergeIOCost(
     return ioCost;
   }
 
-  public static double computeSortIOCost(Pair<Double, Double> relationInfo) {
+  public double computeSortIOCost(Pair<Double, Double> relationInfo) {
     // Sort-merge join
     double ioCost = 0.0;
     double cardinality = relationInfo.left;
     double averageTupleSize = relationInfo.right;
     // Write cost
-    ioCost += cardinality * averageTupleSize * LOCAL_WRITE_COST;
+    ioCost += cardinality * averageTupleSize * localFSWrite;
     // Read cost
-    ioCost += cardinality * averageTupleSize * LOCAL_READ_COST;
+    ioCost += cardinality * averageTupleSize * localFSRead;
     // Net transfer cost
-    ioCost += cardinality * averageTupleSize * NET_COST;
+    ioCost += cardinality * averageTupleSize * netCost;
     return ioCost;
   }
 
@@ -110,12 +125,12 @@ public static double computeMapJoinCPUCost(
       if (!streaming.get(i)) {
         cpuCost += cardinality;
       }
-      cpuCost += cardinality * CPU_COST;
+      cpuCost += cardinality * this.cpuCost;
     }
     return cpuCost;
   }
 
-  public static double computeMapJoinIOCost(
+  public double computeMapJoinIOCost(
           ImmutableList<Pair<Double, Double>> relationInfos,
           ImmutableBitSet streaming, int parallelism) {
     // Hash-join
@@ -124,13 +139,13 @@ public static double computeMapJoinIOCost(
       double cardinality = relationInfos.get(i).left;
       double averageTupleSize = relationInfos.get(i).right;
       if (!streaming.get(i)) {
-        ioCost += cardinality * averageTupleSize * NET_COST * parallelism;
+        ioCost += cardinality * averageTupleSize * netCost * parallelism;
       }
     }
     return ioCost;
   }
 
-  public static double computeBucketMapJoinCPUCost(
+  public double computeBucketMapJoinCPUCost(
           ImmutableList<Double> cardinalities,
           ImmutableBitSet streaming) {
     // Hash-join
@@ -138,14 +153,14 @@ public static double computeBucketMapJoinCPUCost(
     for (int i=0; i<cardinalities.size(); i++) {
       double cardinality = cardinalities.get(i);
       if (!streaming.get(i)) {
-        cpuCost += cardinality * CPU_COST;
+        cpuCost += cardinality * this.cpuCost;
       }
-      cpuCost += cardinality * CPU_COST;
+      cpuCost += cardinality * this.cpuCost;
     }
     return cpuCost;
   }
 
-  public static double computeBucketMapJoinIOCost(
+  public double computeBucketMapJoinIOCost(
           ImmutableList<Pair<Double, Double>> relationInfos,
           ImmutableBitSet streaming, int parallelism) {
     // Hash-join
@@ -154,7 +169,7 @@ public static double computeBucketMapJoinIOCost(
       double cardinality = relationInfos.get(i).left;
       double averageTupleSize = relationInfos.get(i).right;
       if (!streaming.get(i)) {
-        ioCost += cardinality * averageTupleSize * NET_COST * parallelism;
+        ioCost += cardinality * averageTupleSize * netCost * parallelism;
       }
     }
     return ioCost;
@@ -165,12 +180,12 @@ public static double computeSMBMapJoinCPUCost(
     // Hash-join
     double cpuCost = 0.0;
     for (int i=0; i<cardinalities.size(); i++) {
-      cpuCost += cardinalities.get(i) * CPU_COST;
+      cpuCost += cardinalities.get(i) * this.cpuCost;
     }
     return cpuCost;
   }
 
-  public static double computeSMBMapJoinIOCost(
+  public double computeSMBMapJoinIOCost(
           ImmutableList<Pair<Double, Double>> relationInfos,
           ImmutableBitSet streaming, int parallelism) {
     // Hash-join
@@ -179,7 +194,7 @@ public static double computeSMBMapJoinIOCost(
       double cardinality = relationInfos.get(i).left;
       double averageTupleSize = relationInfos.get(i).right;
       if (!streaming.get(i)) {
-        ioCost += cardinality * averageTupleSize * NET_COST * parallelism;
+        ioCost += cardinality * averageTupleSize * netCost * parallelism;
       }
     }
     return ioCost;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java
index 7f71f56371c2..95f9b49b3be4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java
@@ -21,6 +21,7 @@
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
 
@@ -33,8 +34,15 @@
  */
 public class HiveDefaultCostModel extends HiveCostModel {
   
-  public static final HiveDefaultCostModel INSTANCE =
-          new HiveDefaultCostModel();
+  private static HiveDefaultCostModel INSTANCE;
+
+  public static synchronized HiveDefaultCostModel getCostModel() {
+    if (INSTANCE == null) {
+      INSTANCE = new HiveDefaultCostModel();
+    }
+
+    return INSTANCE;
+  }
 
   private HiveDefaultCostModel() {
     super(Sets.newHashSet(DefaultJoinAlgorithm.INSTANCE));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java
index 3df895538c0c..d6b02023d645 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java
@@ -29,6 +29,7 @@
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Pair;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
@@ -42,15 +43,26 @@
  */
 public class HiveOnTezCostModel extends HiveCostModel {
 
-  public static final HiveOnTezCostModel INSTANCE =
-          new HiveOnTezCostModel();
+  private static HiveOnTezCostModel INSTANCE;
 
-  private HiveOnTezCostModel() {
+  private static HiveAlgorithmsUtil algoUtils;
+
+  public static synchronized HiveOnTezCostModel getCostModel(HiveConf conf) {
+    if (INSTANCE == null) {
+      INSTANCE = new HiveOnTezCostModel(conf);
+    }
+
+    return INSTANCE;
+  }
+
+  private HiveOnTezCostModel(HiveConf conf) {
     super(Sets.newHashSet(
             TezCommonJoinAlgorithm.INSTANCE,
             TezMapJoinAlgorithm.INSTANCE,
             TezBucketJoinAlgorithm.INSTANCE,
             TezSMBJoinAlgorithm.INSTANCE));
+
+    algoUtils = new HiveAlgorithmsUtil(conf);
   }
 
   @Override
@@ -69,7 +81,7 @@ public RelOptCost getAggregateCost(HiveAggregate aggregate) {
         return null;
       }
       // 2. CPU cost = sorting cost
-      final double cpuCost = HiveAlgorithmsUtil.computeSortCPUCost(rCount);
+      final double cpuCost = algoUtils.computeSortCPUCost(rCount);
       // 3. IO cost = cost of writing intermediary results to local FS +
       //              cost of reading from local FS for transferring to GBy +
       //              cost of transferring map outputs to GBy operator
@@ -77,7 +89,7 @@ public RelOptCost getAggregateCost(HiveAggregate aggregate) {
       if (rAverageSize == null) {
         return null;
       }
-      final double ioCost = HiveAlgorithmsUtil.computeSortIOCost(new Pair<Double,Double>(rCount,rAverageSize));
+      final double ioCost = algoUtils.computeSortIOCost(new Pair<Double,Double>(rCount,rAverageSize));
       // 4. Result
       return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
     }
@@ -118,7 +130,7 @@ public RelOptCost getCost(HiveJoin join) {
               add(leftRCount).
               add(rightRCount).
               build();
-      final double cpuCost = HiveAlgorithmsUtil.computeSortMergeCPUCost(cardinalities, join.getSortedInputs());
+      final double cpuCost = algoUtils.computeSortMergeCPUCost(cardinalities, join.getSortedInputs());
       // 3. IO cost = cost of writing intermediary results to local FS +
       //              cost of reading from local FS for transferring to join +
       //              cost of transferring map outputs to Join operator
@@ -131,7 +143,7 @@ public RelOptCost getCost(HiveJoin join) {
               add(new Pair<Double,Double>(leftRCount,leftRAverageSize)).
               add(new Pair<Double,Double>(rightRCount,rightRAverageSize)).
               build();
-      final double ioCost = HiveAlgorithmsUtil.computeSortMergeIOCost(relationInfos);
+      final double ioCost = algoUtils.computeSortMergeIOCost(relationInfos);
       // 4. Result
       return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
     }
@@ -242,7 +254,7 @@ public RelOptCost getCost(HiveJoin join) {
               build();
       final int parallelism = RelMetadataQuery.splitCount(join) == null
               ? 1 : RelMetadataQuery.splitCount(join);
-      final double ioCost = HiveAlgorithmsUtil.computeMapJoinIOCost(relationInfos, streaming, parallelism);
+      final double ioCost = algoUtils.computeMapJoinIOCost(relationInfos, streaming, parallelism);
       // 4. Result
       return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
     }
@@ -382,7 +394,7 @@ public RelOptCost getCost(HiveJoin join) {
           return null;
       }
       ImmutableBitSet streaming = streamingBuilder.build();
-      final double cpuCost = HiveAlgorithmsUtil.computeBucketMapJoinCPUCost(cardinalities, streaming);
+      final double cpuCost = algoUtils.computeBucketMapJoinCPUCost(cardinalities, streaming);
       // 3. IO cost = cost of transferring small tables to join node *
       //              degree of parallelism
       final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft());
@@ -396,7 +408,7 @@ public RelOptCost getCost(HiveJoin join) {
               build();
       final int parallelism = RelMetadataQuery.splitCount(join) == null
               ? 1 : RelMetadataQuery.splitCount(join);
-      final double ioCost = HiveAlgorithmsUtil.computeBucketMapJoinIOCost(relationInfos, streaming, parallelism);
+      final double ioCost = algoUtils.computeBucketMapJoinIOCost(relationInfos, streaming, parallelism);
       // 4. Result
       return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
     }
@@ -540,7 +552,7 @@ public RelOptCost getCost(HiveJoin join) {
               build();
       final int parallelism = RelMetadataQuery.splitCount(join) == null
               ? 1 : RelMetadataQuery.splitCount(join);
-      final double ioCost = HiveAlgorithmsUtil.computeSMBMapJoinIOCost(relationInfos, streaming, parallelism);
+      final double ioCost = algoUtils.computeSMBMapJoinIOCost(relationInfos, streaming, parallelism);
       // 4. Result
       return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
     }