diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index b8c04140dc74..7f41132a5a3b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -841,7 +841,9 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
 
       // server returns STRING as default dataType for all columns in (some) scenarios where no rows are returned
       // this is an attempt to return more faithful information based on other sources
-      ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query);
+      if (brokerResponse.getNumRowsResultSet() == 0) {
+        ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query);
+      }
 
       // Set total query processing time
       long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis();
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index 2fb2aef48a2e..7751d4dc5688 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -419,10 +419,6 @@ public boolean isWholeNumberArray() {
       return INTEGRAL_ARRAY_TYPES.contains(this);
     }
 
-    public boolean isUnknown() {
-      return UNKNOWN.equals(this);
-    }
-
     public boolean isCompatible(ColumnDataType anotherColumnDataType) {
       // All numbers are compatible with each other
       return this == anotherColumnDataType || (this.isNumber() && anotherColumnDataType.isNumber()) || (
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java
index d49d2555abfc..5c0c3454a928 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java
@@ -41,7 +41,6 @@
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -55,6 +54,15 @@
 public class EmptyResponseIntegrationTest extends BaseClusterIntegrationTestSet {
   private static final int NUM_BROKERS = 1;
   private static final int NUM_SERVERS = 1;
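+  // Expected column data types for the SELECT * queries, in the order of the response data schema.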
+  private static final String[] SELECT_STAR_TYPES = new String[]{
+      "INT", "INT", "LONG", "INT", "FLOAT", "DOUBLE", "INT", "STRING", "INT", "INT", "INT", "INT", "STRING", "INT",
+      "STRING", "INT", "INT", "INT", "INT", "INT", "DOUBLE", "FLOAT", "INT", "STRING", "INT", "STRING", "INT", "INT",
+      "INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT_ARRAY", "INT", "INT_ARRAY",
+      "STRING_ARRAY", "INT", "INT", "FLOAT_ARRAY", "INT", "STRING_ARRAY", "LONG_ARRAY", "INT_ARRAY", "INT_ARRAY",
+      "INT", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "INT", "STRING", "INT", "INT", "INT", "STRING",
+      "STRING", "INT", "STRING", "INT", "INT", "STRING_ARRAY", "INT", "STRING", "INT", "INT", "INT", "STRING", "INT",
+      "INT", "INT", "INT"
+  };
 
   private final List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbacks =
       new ArrayList<>(getNumBrokers() + getNumServers());
@@ -134,11 +142,6 @@ public void setUp()
     waitForAllDocsLoaded(600_000L);
   }
 
-  @BeforeMethod
-  public void resetMultiStage() {
-    setUseMultiStageQueryEngine(false);
-  }
-
   protected void startBrokers()
       throws Exception {
     startBrokers(getNumBrokers());
@@ -184,21 +187,17 @@ public void testInstancesStarted() {
   }
 
   @Test
-  public void testCompiledByV2StarField() throws Exception {
+  public void testStarField()
+      throws Exception {
     String sqlQuery = "SELECT * FROM myTable WHERE AirlineID = 0 LIMIT 1";
     JsonNode response = postQuery(sqlQuery);
     assertNoRowsReturned(response);
-    assertDataTypes(response, "INT", "INT", "LONG", "INT", "FLOAT", "DOUBLE", "INT", "STRING", "INT", "INT", "INT",
-        "INT", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "DOUBLE", "FLOAT", "INT", "STRING", "INT",
-        "STRING", "INT", "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT_ARRAY",
-        "INT", "INT_ARRAY", "STRING_ARRAY", "INT", "INT", "FLOAT_ARRAY", "INT", "STRING_ARRAY", "LONG_ARRAY",
-        "INT_ARRAY", "INT_ARRAY", "INT", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "INT", "STRING", "INT",
-        "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "STRING_ARRAY", "INT", "STRING", "INT", "INT",
-        "INT", "STRING", "INT", "INT", "INT", "INT");
+    assertDataTypes(response, SELECT_STAR_TYPES);
   }
 
   @Test
-  public void testCompiledByV2SelectionFields() throws Exception {
+  public void testSelectionFields()
+      throws Exception {
     String sqlQuery = "SELECT AirlineID, ArrTime, ArrTimeBlk FROM myTable WHERE AirlineID = 0 LIMIT 1";
     JsonNode response = postQuery(sqlQuery);
     assertNoRowsReturned(response);
@@ -206,7 +205,8 @@ public void testCompiledByV2SelectionFields() throws Exception {
   }
 
   @Test
-  public void testCompiledByV2TrasformedFields() throws Exception {
+  public void testTransformedFields()
+      throws Exception {
     String sqlQuery = "SELECT AirlineID, ArrTime, ArrTime+1 FROM myTable WHERE AirlineID = 0 LIMIT 1";
     JsonNode response = postQuery(sqlQuery);
     assertNoRowsReturned(response);
@@ -214,7 +214,8 @@ public void testCompiledByV2TrasformedFields() throws Exception {
   }
 
   @Test
-  public void testCompiledByV2AggregatedFields() throws Exception {
+  public void testAggregatedFields()
+      throws Exception {
     String sqlQuery = "SELECT AirlineID, avg(ArrTime) FROM myTable WHERE AirlineID = 0 GROUP BY AirlineID LIMIT 1";
     JsonNode response = postQuery(sqlQuery);
     assertNoRowsReturned(response);
@@ -222,29 +223,27 @@ public void testCompiledByV2AggregatedFields() throws Exception {
   }
 
   @Test
-  public void testSchemaFallbackStarField() throws Exception {
+  public void testSchemaFallbackStarField()
+      throws Exception {
     String sqlQuery = "SELECT * FROM myTable WHERE AirlineID = 0 AND add(AirTime, AirTime, ArrTime) > 0 LIMIT 1";
     JsonNode response = postQuery(sqlQuery);
     assertNoRowsReturned(response);
-    assertDataTypes(response, "INT", "INT", "LONG", "INT", "FLOAT", "DOUBLE", "INT", "STRING", "INT", "INT", "INT",
-        "INT", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "DOUBLE", "FLOAT", "INT", "STRING", "INT",
-        "STRING", "INT", "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "INT",
-        "INT", "STRING", "INT", "INT", "FLOAT", "INT", "STRING", "LONG", "INT", "INT", "INT", "INT", "STRING", "INT",
-        "INT", "INT", "INT", "INT", "INT", "STRING", "INT", "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT",
-        "INT", "STRING", "INT", "STRING", "INT", "INT", "INT", "STRING", "INT", "INT", "INT", "INT");
+    assertDataTypes(response, SELECT_STAR_TYPES);
   }
 
   @Test
-  public void testSchemaFallbackSelectionFields() throws Exception {
+  public void testSchemaFallbackSelectionFields()
+      throws Exception {
     String sqlQuery = "SELECT AirlineID, ArrTime, ArrTimeBlk FROM myTable"
-      + " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 LIMIT 1";
+        + " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 LIMIT 1";
     JsonNode response = postQuery(sqlQuery);
     assertNoRowsReturned(response);
     assertDataTypes(response, "LONG", "INT", "STRING");
   }
 
   @Test
-  public void testSchemaFallbackTransformedFields() throws Exception {
+  public void testSchemaFallbackTransformedFields()
+      throws Exception {
     String sqlQuery = "SELECT AirlineID, ArrTime, ArrTime+1 FROM myTable"
         + " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 LIMIT 1";
     JsonNode response = postQuery(sqlQuery);
@@ -253,7 +252,8 @@ public void testSchemaFallbackTransformedFields() throws Exception {
   }
 
   @Test
-  public void testSchemaFallbackAggregatedFields() throws Exception {
+  public void testSchemaFallbackAggregatedFields()
+      throws Exception {
     String sqlQuery = "SELECT AirlineID, avg(ArrTime) FROM myTable"
         + " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 GROUP BY AirlineID LIMIT 1";
     JsonNode response = postQuery(sqlQuery);
@@ -262,24 +262,13 @@ public void testSchemaFallbackAggregatedFields() throws Exception {
   }
 
   @Test
-  public void testDataSchemaForBrokerPrunedEmptyResults() throws Exception {
+  public void testDataSchemaForBrokerPrunedEmptyResults()
+      throws Exception {
     TableConfig tableConfig = getOfflineTableConfig();
     tableConfig.setRoutingConfig(
         new RoutingConfig(null, Collections.singletonList(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE), null, null));
     updateTableConfig(tableConfig);
 
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        TableConfig cfg = getOfflineTableConfig();
-        if (cfg.getRoutingConfig() == null || cfg.getRoutingConfig().getSegmentPrunerTypes().isEmpty()) {
-          return false;
-        }
-        return true;
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 600_000L, "Failed to update table config");
-
     String query =
         "Select DestAirportID, Carrier from myTable WHERE DaysSinceEpoch < -1231231 and FlightNum > 121231231231";
 
@@ -293,31 +282,19 @@ public void testDataSchemaForBrokerPrunedEmptyResults() throws Exception {
     assertNoRowsReturned(queryResponse);
     assertDataTypes(queryResponse, "INT", "STRING");
 
-    // Reset and remove the Time Segment Pruner
-    tableConfig = getOfflineTableConfig();
-    tableConfig.setRoutingConfig(new RoutingConfig(null, Collections.emptyList(), null, null));
+    // Reset the routing config
+    tableConfig.setRoutingConfig(null);
     updateTableConfig(tableConfig);
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        TableConfig cfg = getOfflineTableConfig();
-        if (cfg.getRoutingConfig() == null || cfg.getRoutingConfig().getSegmentPrunerTypes().size() > 0) {
-          return false;
-        }
-        return true;
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 600_000L, "Failed to update table config");
   }
 
-
   private void assertNoRowsReturned(JsonNode response) {
     assertNotNull(response.get("resultTable"));
     assertNotNull(response.get("resultTable").get("rows"));
     assertEquals(response.get("resultTable").get("rows").size(), 0);
   }
 
-  private void assertDataTypes(JsonNode response, String... types) throws Exception {
+  private void assertDataTypes(JsonNode response, String... types)
+      throws Exception {
     assertNotNull(response.get("resultTable"));
     assertNotNull(response.get("resultTable").get("dataSchema"));
     assertNotNull(response.get("resultTable").get("dataSchema").get("columnDataTypes"));
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 3322c6bcd24e..f73852db5bd0 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -89,7 +89,6 @@
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.apache.pinot.common.function.scalar.StringFunctions.*;
@@ -267,11 +266,6 @@ private void reloadAllSegments(String testQuery, boolean forceDownload, long num
     }, 600_000L, "Failed to reload table with force download");
   }
 
-  @BeforeMethod
-  public void resetMultiStage() {
-    setUseMultiStageQueryEngine(false);
-  }
-
   protected void startBrokers()
       throws Exception {
     startBrokers(getNumBrokers());
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
index 27f2842ed738..ec58dd296c70 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.parser.utils;
 
+import com.google.common.base.Preconditions;
 import java.util.List;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -25,6 +26,7 @@
 import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.planner.logical.RelToPlanNodeConverter;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -53,95 +55,87 @@ public static boolean canCompileWithMultiStageEngine(String query, String databa
   }
 
   /**
-   * Tries to fill an empty or not properly filled schema when no rows have been returned.
+   * Tries to fill an empty or not properly filled {@link DataSchema} when no row has been returned.
+   *
+   * Response data schema can be inaccurate or incomplete in several forms:
+   * 1. No result table at all (when all segments have been pruned on broker).
+   * 2. Data schema has all columns set to default type (STRING) (when all segments have been pruned on server).
    *
    * Priority is:
-   * - Types in schema provided by V2 validation for the given query.
-   * - Types in schema provided by V1 for the given table (only appliable to selection fields).
-   * - Types in response provided by V1 server (no action).
+   * - Types from the multi-stage engine validation for the given query.
+   * - Types from the table schema (only applicable to selection fields).
+   * - Types from the single-stage engine response (no action).
+   *
+   * The multi-stage engine schema will be available only if the query compiles.
    */
-  public static void fillEmptyResponseSchema(
-      BrokerResponse response, TableCache tableCache, Schema schema, String database, String query
-  ) {
-    if (response == null || response.getNumRowsResultSet() > 0) {
-      return;
-    }
+  public static void fillEmptyResponseSchema(BrokerResponse response, TableCache tableCache, Schema schema,
+      String database, String query) {
+    Preconditions.checkState(response.getNumRowsResultSet() == 0, "Cannot fill schema for non-empty response");
 
-    QueryEnvironment queryEnvironment = new QueryEnvironment(database, tableCache, null);
-    RelRoot node = queryEnvironment.getRelRootIfCanCompile(query);
-    DataSchema.ColumnDataType resolved;
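+    // Result table (and thus its data schema) can be null when all segments have been pruned on the broker.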
+    DataSchema dataSchema = response.getResultTable() != null ? response.getResultTable().getDataSchema() : null;
 
-    // V1 schema info for the response can be inaccurate or incomplete in several forms:
-    // 1) No schema provided at all (when no segments have been even pruned).
-    // 2) Schema provided but all columns set to default type (STRING) (when no segments have been matched).
-    // V2 schema will be available only if query compiles.
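+    // Best effort to compile the query with the multi-stage engine to get the validated row type.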
+    List<RelDataTypeField> dataTypeFields = null;
+    try {
+      QueryEnvironment queryEnvironment = new QueryEnvironment(database, tableCache, null);
+      RelRoot node = queryEnvironment.getRelRootIfCanCompile(query);
+      if (node != null && node.validatedRowType != null) {
+        dataTypeFields = node.validatedRowType.getFieldList();
+      }
+    } catch (Exception ignored) {
+      // Ignore exceptions from the multi-stage engine and fall back to other sources
+    }
 
-    boolean hasV1Schema = response.getResultTable() != null;
-    boolean hasV2Schema = node != null && node.validatedRowType != null;
+    if (dataSchema == null && dataTypeFields == null) {
+      // No schema available, nothing we can do
+      return;
+    }
 
-    if (hasV1Schema && hasV2Schema) {
-      // match v1 column types with v2 column types using column names
-      // if no match, rely on v1 schema based on column name
-      // if no match either, just leave it as it is
-      DataSchema responseSchema = response.getResultTable().getDataSchema();
-      List<RelDataTypeField> fields = node.validatedRowType.getFieldList();
-      for (int i = 0; i < responseSchema.size(); i++) {
-        resolved = RelToPlanNodeConverter.convertToColumnDataType(fields.get(i).getType());
-        if (resolved == null || resolved.isUnknown()) {
-          FieldSpec spec = schema.getFieldSpecFor(responseSchema.getColumnName(i));
-          try {
-            resolved = DataSchema.ColumnDataType.fromDataType(spec.getDataType(), false);
-          } catch (Exception e) {
-            try {
-              resolved = DataSchema.ColumnDataType.fromDataType(spec.getDataType(), true);
-            } catch (Exception e2) {
-              resolved = DataSchema.ColumnDataType.UNKNOWN;
-            }
-          }
-        }
-        if (resolved == null || resolved.isUnknown()) {
-          resolved = responseSchema.getColumnDataType(i);
+    if (dataSchema == null || (dataTypeFields != null && dataSchema.size() != dataTypeFields.size())) {
+      // If data schema is not available or has different number of columns than the validated row type, we use the
+      // validated row type to populate the schema.
+      int numColumns = dataTypeFields.size();
+      String[] columnNames = new String[numColumns];
+      ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
+      for (int i = 0; i < numColumns; i++) {
+        RelDataTypeField dataTypeField = dataTypeFields.get(i);
+        columnNames[i] = dataTypeField.getName();
+        ColumnDataType columnDataType;
+        try {
+          columnDataType = RelToPlanNodeConverter.convertToColumnDataType(dataTypeField.getType());
+        } catch (Exception ignored) {
+          columnDataType = ColumnDataType.UNKNOWN;
         }
-        responseSchema.getColumnDataTypes()[i] = resolved;
+        columnDataTypes[i] = columnDataType;
       }
-    } else if (hasV1Schema) {
-      // match v1 column types with v1 schema columns using column names
-      // if no match, just leave it as it is
-      DataSchema responseSchema = response.getResultTable().getDataSchema();
-      for (int i = 0; i < responseSchema.size(); i++) {
-        FieldSpec spec = schema.getFieldSpecFor(responseSchema.getColumnName(i));
+      response.setResultTable(new ResultTable(new DataSchema(columnNames, columnDataTypes), List.of()));
+      return;
+    }
+
+    // When data schema is available, try to fix the data types within it.
+    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+    int numColumns = columnDataTypes.length;
+    if (dataTypeFields != null) {
+      // Fill data type with the validated row type when it is available.
+      for (int i = 0; i < numColumns; i++) {
         try {
-          // try single value first
-          resolved = DataSchema.ColumnDataType.fromDataType(spec.getDataType(), true);
-        } catch (Exception e) {
-          try {
-            // fallback to multi value
-            resolved = DataSchema.ColumnDataType.fromDataType(spec.getDataType(), false);
-          } catch (Exception e2) {
-            resolved = DataSchema.ColumnDataType.UNKNOWN;
-          }
-        }
-        if (resolved == null || resolved.isUnknown()) {
-          resolved = responseSchema.getColumnDataType(i);
+          columnDataTypes[i] = RelToPlanNodeConverter.convertToColumnDataType(dataTypeFields.get(i).getType());
+        } catch (Exception ignored) {
+          // Ignore exception and keep the type from the response
         }
-        responseSchema.getColumnDataTypes()[i] = resolved;
       }
-    } else if (hasV2Schema) {
-      // trust v2 column types blindly
-      // if a type cannot be resolved, leave it as UNKNOWN
-      List<RelDataTypeField> fields = node.validatedRowType.getFieldList();
-      String[] columnNames = new String[fields.size()];
-      DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[fields.size()];
-      for (int i = 0; i < fields.size(); i++) {
-        columnNames[i] = fields.get(i).getName();
-        resolved = RelToPlanNodeConverter.convertToColumnDataType(fields.get(i).getType());
-        if (resolved == null) {
-          resolved = DataSchema.ColumnDataType.UNKNOWN;
+    } else {
+      // Fill data type with the schema when validated row type is not available.
+      String[] columnNames = dataSchema.getColumnNames();
+      for (int i = 0; i < numColumns; i++) {
+        FieldSpec fieldSpec = schema.getFieldSpecFor(columnNames[i]);
+        if (fieldSpec != null) {
+          try {
+            columnDataTypes[i] = ColumnDataType.fromDataType(fieldSpec.getDataType(), fieldSpec.isSingleValueField());
+          } catch (Exception ignored) {
+            // Ignore exception and keep the type from the response
+          }
         }
-        columnDataTypes[i] = resolved;
       }
-      response.setResultTable(new ResultTable(new DataSchema(columnNames, columnDataTypes), List.of()));
     }
-    // else { /* nothing else we can do */ }
   }
 }