clusterInstances = _helixAdmin.getInstancesInCluster(_helixManager.getClusterName());
+ _numBrokers = Math.max(1, (int) clusterInstances.stream()
+ .filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE))
+ .count());
+ _numServers = Math.max(1, (int) clusterInstances.stream()
+ .filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE))
+ .count());
+
+ if (_maxConcurrentQueries > 0) {
+ _semaphore = new AdjustableSemaphore(Math.max(1, _maxConcurrentQueries * _numServers / _numBrokers), true);
+ }
+ }
+
+ /**
+ * Returns true if the query can be executed (waiting until it can be executed if necessary), false otherwise.
+ *
+ * {@link #release()} should be called after the query is done executing. It is the responsibility of the caller to
+ * ensure that {@link #release()} is called exactly once for each call to this method.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ public boolean tryAcquire(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ if (_maxConcurrentQueries <= 0) {
+ return true;
+ }
+ return _semaphore.tryAcquire(timeout, unit);
+ }
+
+ /**
+ * Returns a permit to the semaphore once the query is done executing. The caller is responsible for invoking this
+ * method exactly once for each call to {@link #tryAcquire(long, TimeUnit)}. This is a no-op when the configured
+ * limit is not positive, i.e. when throttling is disabled.
+ */
+ public void release() {
+ if (_maxConcurrentQueries <= 0) {
+ return;
+ }
+ _semaphore.release();
+ }
+
+ /**
+ * Reacts to a Helix cluster change by recomputing the number of permits available on this broker.
+ *
+ * For {@code EXTERNAL_VIEW} changes the broker and server instances in the cluster are re-counted and, if either
+ * count changed, the semaphore is resized (only when throttling is enabled). For {@code CLUSTER_CONFIG} changes the
+ * maximum number of concurrent multi-stage queries is re-read from the cluster config and the semaphore is resized.
+ * Switching between enabled (positive limit) and disabled (non-positive limit) is ignored with a warning because it
+ * requires a broker restart.
+ *
+ * @param changeType the type of change; must be {@code EXTERNAL_VIEW} or {@code CLUSTER_CONFIG}
+ * @throws IllegalArgumentException if any other change type is passed
+ * @throws NumberFormatException if the cluster config value cannot be parsed as an integer
+ */
+ @Override
+ public void processClusterChange(HelixConstants.ChangeType changeType) {
+ Preconditions.checkArgument(
+ changeType == HelixConstants.ChangeType.EXTERNAL_VIEW || changeType == HelixConstants.ChangeType.CLUSTER_CONFIG,
+ "MultiStageQueryThrottler can only handle EXTERNAL_VIEW and CLUSTER_CONFIG changes");
+
+ if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) {
+ List<String> clusterInstances = _helixAdmin.getInstancesInCluster(_helixManager.getClusterName());
+ // Both counts are floored at 1; for the broker count this guarantees the permit computation never divides by 0.
+ int numBrokers = Math.max(1, (int) clusterInstances.stream()
+ .filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE))
+ .count());
+ int numServers = Math.max(1, (int) clusterInstances.stream()
+ .filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE))
+ .count());
+
+ if (numBrokers != _numBrokers || numServers != _numServers) {
+ _numBrokers = numBrokers;
+ _numServers = numServers;
+ if (_maxConcurrentQueries > 0) {
+ _semaphore.setPermits(computePermits(_maxConcurrentQueries, _numServers, _numBrokers));
+ }
+ }
+ } else {
+ int maxConcurrentQueries = Integer.parseInt(
+ _helixAdmin.getConfig(_helixConfigScope,
+ Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))
+ .getOrDefault(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES,
+ CommonConstants.Helix.DEFAULT_MAX_CONCURRENT_MULTI_STAGE_QUERIES));
+
+ if (_maxConcurrentQueries == maxConcurrentQueries) {
+ return;
+ }
+
+ if (_maxConcurrentQueries <= 0 && maxConcurrentQueries > 0
+ || _maxConcurrentQueries > 0 && maxConcurrentQueries <= 0) {
+ // This operation isn't safe to do while queries are running so we require a restart of the broker for this
+ // change to take effect.
+ LOGGER.warn("Enabling or disabling limitation of the maximum number of multi-stage queries running "
+ + "concurrently requires a restart of the broker to take effect");
+ return;
+ }
+
+ if (maxConcurrentQueries > 0) {
+ _semaphore.setPermits(computePermits(maxConcurrentQueries, _numServers, _numBrokers));
+ }
+ _maxConcurrentQueries = maxConcurrentQueries;
+ }
+ }
+
+ /**
+ * Computes this broker's share of permits as {@code maxConcurrentQueries * numServers / numBrokers}, never less
+ * than 1. The product is evaluated as a {@code long} and the result is clamped to {@code Integer.MAX_VALUE} so that
+ * a very large configured limit cannot overflow into a negative value and collapse to a single permit.
+ */
+ private static int computePermits(int maxConcurrentQueries, int numServers, int numBrokers) {
+ long permits = (long) maxConcurrentQueries * numServers / numBrokers;
+ return (int) Math.max(1L, Math.min(Integer.MAX_VALUE, permits));
+ }
+
+ /**
+ * Returns the value reported by the underlying semaphore's {@code availablePermits()}. Intended for tests only.
+ *
+ * NOTE(review): {@code _semaphore} is dereferenced unconditionally here, while the other accessors guard on
+ * {@code _maxConcurrentQueries > 0} — presumably this must only be called when throttling is enabled; confirm.
+ */
+ @VisibleForTesting
+ int availablePermits() {
+ return _semaphore.availablePermits();
+ }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index 52cf63f562e0..d14f2860138a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -53,6 +53,7 @@
import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +71,7 @@ public TimeSeriesRequestHandler(PinotConfiguration config, String brokerId, Brok
_queryEnvironment = new TimeSeriesQueryEnvironment(config, routingManager, tableCache);
_queryEnvironment.init(config);
_queryDispatcher = queryDispatcher;
+ TimeSeriesBuilderFactoryProvider.init(config);
}
@Override
@@ -117,6 +119,10 @@ public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String
if (timeSeriesResponse == null
|| timeSeriesResponse.getStatus().equals(PinotBrokerTimeSeriesResponse.ERROR_STATUS)) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES_FAILED, 1);
+ final String errorMessage = timeSeriesResponse == null ? "null time-series response"
+ : timeSeriesResponse.getError();
+ // TODO(timeseries): Remove logging for failed queries.
+ LOGGER.warn("time-series query failed with error: {}", errorMessage);
}
}
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java
new file mode 100644
index 000000000000..fe2a5a124006
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@link MultiStageQueryThrottler} backed by mocked {@link HelixManager} and {@link HelixAdmin}.
+ *
+ * The default fixture configures a maximum of 4 concurrent multi-stage queries in a cluster with two brokers and two
+ * servers, which the tests expect to yield 4 permits on this broker. Individual tests override the mocked config or
+ * instance list and call {@code processClusterChange} to verify that the permits are adjusted.
+ */
+public class MultiStageQueryThrottlerTest {
+
+ private AutoCloseable _mocks;
+ @Mock
+ private HelixManager _helixManager;
+ @Mock
+ private HelixAdmin _helixAdmin;
+ private MultiStageQueryThrottler _multiStageQueryThrottler;
+
+ @BeforeMethod
+ public void setUp() {
+ _mocks = MockitoAnnotations.openMocks(this);
+ when(_helixManager.getClusterManagmentTool()).thenReturn(_helixAdmin);
+ when(_helixManager.getClusterName()).thenReturn("testCluster");
+ when(_helixAdmin.getConfig(any(), any())).thenReturn(
+ Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "4"));
+ when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn(
+ List.of("Broker_0", "Broker_1", "Server_0", "Server_1"));
+ }
+
+ @AfterMethod
+ public void tearDown()
+ throws Exception {
+ _mocks.close();
+ }
+
+ @Test
+ public void testBasicAcquireRelease()
+ throws Exception {
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 3);
+ _multiStageQueryThrottler.release();
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
+ }
+
+ @Test
+ public void testAcquireTimeout()
+ throws Exception {
+ when(_helixAdmin.getConfig(any(),
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))).thenReturn(
+ Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "2"));
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 1);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testDisabledThrottling()
+ throws Exception {
+ when(_helixAdmin.getConfig(any(), any())).thenReturn(
+ Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "-1"));
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ // If maxConcurrentQueries is <= 0, the throttling mechanism should be "disabled" and any attempt to acquire should
+ // succeed
+ for (int i = 0; i < 100; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ }
+
+ @Test
+ public void testIncreaseNumBrokers()
+ throws Exception {
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+
+ // Increase the number of brokers
+ when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn(
+ List.of("Broker_0", "Broker_1", "Broker_2", "Broker_3", "Server_0", "Server_1"));
+ _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.EXTERNAL_VIEW);
+
+ // Verify that the number of permits on this broker has been reduced to account for the new brokers
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), -2);
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+
+ for (int i = 0; i < 4; i++) {
+ _multiStageQueryThrottler.release();
+ }
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 2);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testDecreaseNumBrokers()
+ throws Exception {
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+
+ // Decrease the number of brokers
+ when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn(List.of("Broker_0", "Server_0", "Server_1"));
+ _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.EXTERNAL_VIEW);
+
+ // Ensure that the permits from the removed broker are added to this one.
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 3);
+ }
+
+ @Test
+ public void testIncreaseNumServers()
+ throws Exception {
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+
+ // Increase the number of servers
+ when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn(
+ List.of("Broker_0", "Broker_1", "Server_0", "Server_1", "Server_2"));
+ _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.EXTERNAL_VIEW);
+
+ // Ensure that the permits on this broker are increased to account for the new server
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 2);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 1);
+ }
+
+ @Test
+ public void testDecreaseNumServers()
+ throws Exception {
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+
+ // Decrease the number of servers
+ when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn(List.of("Broker_0", "Broker_1", "Server_0"));
+ _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.EXTERNAL_VIEW);
+
+ // Verify that the number of permits on this broker has been reduced to account for the removed server
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), -2);
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+
+ for (int i = 0; i < 4; i++) {
+ _multiStageQueryThrottler.release();
+ }
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 2);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testIncreaseMaxConcurrentQueries()
+ throws Exception {
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+
+ // Increase the value of cluster config maxConcurrentQueries
+ when(_helixAdmin.getConfig(any(),
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))))
+ .thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "8"));
+ _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG);
+
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testDecreaseMaxConcurrentQueries()
+ throws Exception {
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+
+ // Decrease the value of cluster config maxConcurrentQueries
+ when(_helixAdmin.getConfig(any(),
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))
+ ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "3"));
+ _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG);
+
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), -1);
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+
+ for (int i = 0; i < 4; i++) {
+ _multiStageQueryThrottler.release();
+ }
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 3);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testEnabledToDisabledTransitionDisallowed()
+ throws Exception {
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
+
+ // Disable the throttling mechanism via cluster config change
+ when(_helixAdmin.getConfig(any(),
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))
+ ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "-1"));
+ _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG);
+
+ // Should not be allowed to disable the throttling mechanism if it is enabled during startup
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testDisabledToEnabledTransitionDisallowed()
+ throws Exception {
+ when(_helixAdmin.getConfig(any(),
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))
+ ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "-1"));
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ // If maxConcurrentQueries is <= 0, the throttling mechanism should be "disabled" and any attempt to acquire should
+ // succeed
+ for (int i = 0; i < 100; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+
+ // Enable the throttling mechanism via cluster config change
+ when(_helixAdmin.getConfig(any(),
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))
+ ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "4"));
+ _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG);
+
+ // Should not be allowed to enable the throttling mechanism if it is disabled during startup
+ for (int i = 0; i < 100; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ }
+
+ @Test
+ public void testMaxConcurrentQueriesSmallerThanNumBrokers()
+ throws Exception {
+ when(_helixAdmin.getConfig(any(),
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))
+ ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "2"));
+ // A single server is used so that maxConcurrentQueries * numServers / numBrokers = 2 * 1 / 4 truncates to 0; with
+ // two servers the result would already be 1 and the lower bound would not be exercised.
+ when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn(
+ List.of("Broker_0", "Broker_1", "Broker_2", "Broker_3", "Server_0"));
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ // The total permits should be floored at 1 even though maxConcurrentQueries * numServers / numBrokers is 0.
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 1);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+}
diff --git a/pinot-clients/pinot-java-client/pom.xml b/pinot-clients/pinot-java-client/pom.xml
index 4678af3e4f5e..72f0d1932e15 100644
--- a/pinot-clients/pinot-java-client/pom.xml
+++ b/pinot-clients/pinot-java-client/pom.xml
@@ -24,7 +24,7 @@
pinot-clients
org.apache.pinot
- 1.3.0-SNAPSHOT
+ 1.4.0-SNAPSHOT
pinot-java-client
Pinot Java Client
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
index 3b2a789eac02..c2e1b98caf1a 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
@@ -190,20 +190,14 @@ protected void updateBrokerData()
}
public String getBroker(String... tableNames) {
- List<String> brokers = null;
// If tableNames is not-null, filter out nulls
- tableNames =
- tableNames == null ? tableNames : Arrays.stream(tableNames).filter(Objects::nonNull).toArray(String[]::new);
- if (!(tableNames == null || tableNames.length == 0)) {
- // returning list of common brokers hosting all the tables.
- brokers = BrokerSelectorUtils.getTablesCommonBrokers(Arrays.asList(tableNames),
- _brokerData.getTableToBrokerMap());
+ tableNames = tableNames == null ? tableNames
+ : Arrays.stream(tableNames).filter(Objects::nonNull).toArray(String[]::new);
+ if (tableNames == null || tableNames.length == 0) {
+ List<String> brokers = _brokerData.getBrokers();
+ return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size()));
}
-
- if (brokers == null || brokers.isEmpty()) {
- brokers = _brokerData.getBrokers();
- }
- return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size()));
+ // NOTE(review): getRandomBroker is @Nullable and yields null when the tables share no broker, whereas the removed
+ // code fell back to a random broker from _brokerData.getBrokers() - confirm that callers handle a null result.
+ return BrokerSelectorUtils.getRandomBroker(Arrays.asList(tableNames), _brokerData.getTableToBrokerMap());
}
public List getBrokers() {
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
index 6683b6a5fc60..498a68ce0be4 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
@@ -91,10 +91,10 @@ private void refresh() {
public String selectBroker(String... tableNames) {
if (!(tableNames == null || tableNames.length == 0 || tableNames[0] == null)) {
// getting list of brokers hosting all the tables.
- List list = BrokerSelectorUtils.getTablesCommonBrokers(Arrays.asList(tableNames),
+ String randomBroker = BrokerSelectorUtils.getRandomBroker(Arrays.asList(tableNames),
_tableToBrokerListMapRef.get());
- if (list != null && !list.isEmpty()) {
- return list.get(ThreadLocalRandom.current().nextInt(list.size()));
+ if (randomBroker != null) {
+ return randomBroker;
}
}
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
index e3a1df44db7b..c465f101aa08 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
@@ -19,9 +19,13 @@
package org.apache.pinot.client.utils;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
import org.apache.pinot.client.ExternalViewReader;
@@ -34,35 +38,52 @@ private BrokerSelectorUtils() {
*
* @param tableNames: List of table names.
* @param brokerData: map holding data for table hosting on brokers.
- * @return list of common brokers hosting all the tables.
+ * @return list of common brokers hosting all the tables or null if no common brokers found.
+ * @deprecated Use {@link #getTablesCommonBrokersSet(List, Map)} instead. It is more efficient and its semantics are
+ * clearer (i.e., it returns an empty set instead of null if no common brokers are found).
*/
- public static List getTablesCommonBrokers(List tableNames, Map> brokerData) {
- List> tablesBrokersList = new ArrayList<>();
- for (String name: tableNames) {
- String tableName = getTableNameWithoutSuffix(name);
- int idx = tableName.indexOf('.');
-
- if (brokerData.containsKey(tableName)) {
- tablesBrokersList.add(brokerData.get(tableName));
- } else if (idx > 0) {
- // In case tableName is formatted as .
- tableName = tableName.substring(idx + 1);
- tablesBrokersList.add(brokerData.get(tableName));
- }
+ @Nullable
+ @Deprecated
+ public static List<String> getTablesCommonBrokers(@Nullable List<String> tableNames,
+ Map<String, List<String>> brokerData) {
+ Set<String> tablesCommonBrokersSet = getTablesCommonBrokersSet(tableNames, brokerData);
+ // Legacy contract of this deprecated method: null, not an empty list, when the tables share no broker.
+ if (tablesCommonBrokersSet.isEmpty()) {
+ return null;
}
+ return new ArrayList<>(tablesCommonBrokersSet);
+ }
- // return null if tablesBrokersList is empty or contains null
- if (tablesBrokersList.isEmpty()
- || tablesBrokersList.stream().anyMatch(Objects::isNull)) {
+ /**
+ * Returns a random broker from the common brokers hosting all the tables.
+ *
+ * @param tableNames list of table names; may be null
+ * @param brokerData map from table name to the list of brokers for that table
+ * @return a randomly selected common broker, or null if the tables have no broker in common
+ */
+ @Nullable
+ public static String getRandomBroker(@Nullable List<String> tableNames, Map<String, List<String>> brokerData) {
+ Set<String> tablesCommonBrokersSet = getTablesCommonBrokersSet(tableNames, brokerData);
+ if (tablesCommonBrokersSet.isEmpty()) {
return null;
}
+ // A Set has no positional access, so skip a random number of elements and take the next one.
+ return tablesCommonBrokersSet.stream()
+ .skip(ThreadLocalRandom.current().nextInt(tablesCommonBrokersSet.size()))
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("No broker found"));
+ }
- // Make a copy of the brokersList of the first table. retainAll does inplace modifications.
- // So lists from brokerData should not be used directly.
- List commonBrokers = new ArrayList<>(tablesBrokersList.get(0));
- for (int i = 1; i < tablesBrokersList.size(); i++) {
- commonBrokers.retainAll(tablesBrokersList.get(i));
+ /**
+ * Computes the set of brokers that is common to all the given tables.
+ *
+ * @param tableNames: List of table names.
+ * @param brokerData: map holding data for table hosting on brokers.
+ * @return set of common brokers hosting all the tables; an empty set (never null) if tableNames is null or empty or
+ * if no broker is common to all the tables
+ */
+ public static Set<String> getTablesCommonBrokersSet(
+ @Nullable List<String> tableNames, Map<String, List<String>> brokerData) {
+ if (tableNames == null || tableNames.isEmpty()) {
+ return Collections.emptySet();
+ }
+ HashSet<String> commonBrokers = getBrokers(tableNames.get(0), brokerData);
+ // Stop as soon as the intersection is empty: retaining against further tables cannot add brokers back.
+ for (int i = 1; i < tableNames.size() && !commonBrokers.isEmpty(); i++) {
+ commonBrokers.retainAll(getBrokers(tableNames.get(i), brokerData));
}
+
return commonBrokers;
}
@@ -71,4 +92,28 @@ private static String getTableNameWithoutSuffix(String tableName) {
tableName.replace(ExternalViewReader.OFFLINE_SUFFIX, "").
replace(ExternalViewReader.REALTIME_SUFFIX, "");
}
+
+ /**
+ * Returns a new mutable set with the brokers for the given table name.
+ *
+ * The table type suffix is stripped before the lookup. If the resulting name is not present in brokerData and it
+ * contains a '.' after its first character, the part after the first '.' is looked up instead. An empty set (never
+ * null) is returned if there are no brokers for the given table name.
+ */
+ private static HashSet<String> getBrokers(String tableName, Map<String, List<String>> brokerData) {
+ String tableNameWithoutSuffix = getTableNameWithoutSuffix(tableName);
+ int idx = tableNameWithoutSuffix.indexOf('.');
+
+ List<String> brokers = brokerData.get(tableNameWithoutSuffix);
+ if (brokers != null) {
+ return new HashSet<>(brokers);
+ } else if (idx > 0) {
+ // TODO: This is probably unnecessary and even wrong. `brokerData` should include the fully qualified name.
+ // In case tableNameWithoutSuffix is formatted as <db>.<table> and not found in the fully qualified name
+ tableNameWithoutSuffix = tableNameWithoutSuffix.substring(idx + 1);
+ List<String> brokersWithoutDb = brokerData.get(tableNameWithoutSuffix);
+ if (brokersWithoutDb != null) {
+ return new HashSet<>(brokersWithoutDb);
+ }
+ }
+ return new HashSet<>();
+ }
}
diff --git a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java
index d52438ab542c..986b4773c7c2 100644
--- a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java
+++ b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java
@@ -152,4 +152,24 @@ public void testCloseZkClient() {
Mockito.verify(_mockZkClient, times(1)).close();
}
+
+ @Test
+ public void testSelectBrokerWithInvalidTable() {
+ Map<String, List<String>> tableToBrokerListMap = new HashMap<>();
+ tableToBrokerListMap.put("table1", Collections.singletonList("broker1"));
+ when(_mockExternalViewReader.getTableToBrokersMap()).thenReturn(tableToBrokerListMap);
+ _dynamicBrokerSelectorUnderTest.handleDataChange("dataPath", "data");
+ // "invalidTable" has no entry in the table-to-broker map; a broker is still expected to be selected.
+ String result = _dynamicBrokerSelectorUnderTest.selectBroker("invalidTable");
+ assertEquals(result, "broker1");
+ }
+
+ @Test
+ public void testSelectBrokerWithTwoTablesOneInvalid() {
+ Map<String, List<String>> tableToBrokerListMap = new HashMap<>();
+ tableToBrokerListMap.put("table1", Collections.singletonList("broker1"));
+ when(_mockExternalViewReader.getTableToBrokersMap()).thenReturn(tableToBrokerListMap);
+ _dynamicBrokerSelectorUnderTest.handleDataChange("dataPath", "data");
+ // Only "table1" has an entry in the table-to-broker map; a broker is still expected to be selected.
+ String result = _dynamicBrokerSelectorUnderTest.selectBroker("table1", "invalidTable");
+ assertEquals(result, "broker1");
+ }
}
diff --git a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/utils/BrokerSelectorUtilsTest.java b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/utils/BrokerSelectorUtilsTest.java
new file mode 100644
index 000000000000..512a0a3c862a
--- /dev/null
+++ b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/utils/BrokerSelectorUtilsTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.utils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+
+public class BrokerSelectorUtilsTest {
+
+ HashMap<String, List<String>> _brokerData = new HashMap<>();
+ @Test
+ public void getTablesCommonBrokersSetNullTables() {
+ Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(null, _brokerData);
+ Assert.assertEquals(tableSet, Set.of());
+ }
+
+ @Test
+ public void getTablesCommonBrokersListNullTables() {
+ List tableList = BrokerSelectorUtils.getTablesCommonBrokers(null, _brokerData);
+ Assert.assertNull(tableList);
+ }
+
+ @Test
+ public void getTablesCommonBrokersSetEmptyTables() {
+ Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of(), _brokerData);
+ Assert.assertEquals(tableSet, Set.of());
+ }
+
+ @Test
+ public void getTablesCommonBrokersListEmptyTables() {
+ List tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of(), _brokerData);
+ Assert.assertNull(tableList);
+ }
+
+ @Test
+ public void getTablesCommonBrokersSetNotExistentTable() {
+ Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("notExistent"), _brokerData);
+ Assert.assertEquals(tableSet, Set.of());
+ }
+
+ @Test
+ public void getTablesCommonBrokersListNotExistentTable() {
+ List tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("notExistent"), _brokerData);
+ Assert.assertNull(tableList);
+ }
+
+ @Test
+ public void getTablesCommonBrokersSetOneTable() {
+ _brokerData.put("table1", List.of("broker1"));
+ Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("table1"), _brokerData);
+ Assert.assertEquals(tableSet, Set.of("broker1"));
+ }
+
+ @Test
+ public void getTablesCommonBrokersListOneTable() {
+ _brokerData.put("table1", List.of("broker1"));
+ List tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("table1"), _brokerData);
+ Assert.assertNotNull(tableList);
+ Assert.assertEquals(tableList, List.of("broker1"));
+ }
+
+ @Test
+ public void getTablesCommonBrokersSetTwoTables() {
+ _brokerData.put("table1", List.of("broker1"));
+ _brokerData.put("table2", List.of("broker1"));
+ Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("table1", "table2"), _brokerData);
+ Assert.assertNotNull(tableSet);
+ Assert.assertEquals(tableSet, Set.of("broker1"));
+ }
+
+ @Test
+ public void getTablesCommonBrokersListTwoTables() {
+ _brokerData.put("table1", List.of("broker1"));
+ _brokerData.put("table2", List.of("broker1"));
+ List tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("table1", "table2"), _brokerData);
+ Assert.assertNotNull(tableList);
+ Assert.assertEquals(tableList, List.of("broker1"));
+ }
+
+ @Test
+ public void getTablesCommonBrokersSetTwoTablesDifferentBrokers() {
+ _brokerData.put("table1", List.of("broker1"));
+ _brokerData.put("table2", List.of("broker2"));
+ Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("table1", "table2"), _brokerData);
+ Assert.assertEquals(tableSet, Set.of());
+ }
+
+ @Test
+ public void getTablesCommonBrokersListTwoTablesDifferentBrokers() {
+ _brokerData.put("table1", List.of("broker1"));
+ _brokerData.put("table2", List.of("broker2"));
+ List tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("table1", "table2"), _brokerData);
+ Assert.assertNull(tableList);
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ _brokerData.clear();
+ }
+}
diff --git a/pinot-clients/pinot-jdbc-client/pom.xml b/pinot-clients/pinot-jdbc-client/pom.xml
index 4dbc070ff367..210f8fc8e8b1 100644
--- a/pinot-clients/pinot-jdbc-client/pom.xml
+++ b/pinot-clients/pinot-jdbc-client/pom.xml
@@ -24,7 +24,7 @@
pinot-clients
org.apache.pinot
- 1.3.0-SNAPSHOT
+ 1.4.0-SNAPSHOT
pinot-jdbc-client
Pinot JDBC Client
diff --git a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/utils/DateTimeUtils.java b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/utils/DateTimeUtils.java
index 3ca537b518fe..7e9b4df15233 100644
--- a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/utils/DateTimeUtils.java
+++ b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/utils/DateTimeUtils.java
@@ -32,48 +32,49 @@ private DateTimeUtils() {
private static final String TIMESTAMP_FORMAT_STR = "yyyy-MM-dd HH:mm:ss";
private static final String DATE_FORMAT_STR = "yyyy-MM-dd";
- private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat(DATE_FORMAT_STR);
- private static final SimpleDateFormat TIMESTAMP_FORMAT = new SimpleDateFormat(TIMESTAMP_FORMAT_STR);
+ private static final ThreadLocal DATE_FORMAT =
+ ThreadLocal.withInitial(() -> new SimpleDateFormat(DATE_FORMAT_STR));
+ private static final ThreadLocal TIMESTAMP_FORMAT =
+ ThreadLocal.withInitial(() -> new SimpleDateFormat(TIMESTAMP_FORMAT_STR));
public static Date getDateFromString(String value, Calendar cal)
throws ParseException {
- DATE_FORMAT.setTimeZone(cal.getTimeZone());
- java.util.Date date = DATE_FORMAT.parse(value);
- Date sqlDate = new Date(date.getTime());
- return sqlDate;
+ SimpleDateFormat dateFormat = DATE_FORMAT.get();
+ dateFormat.setTimeZone(cal.getTimeZone());
+ java.util.Date date = dateFormat.parse(value);
+ return new Date(date.getTime());
}
public static Time getTimeFromString(String value, Calendar cal)
throws ParseException {
- TIMESTAMP_FORMAT.setTimeZone(cal.getTimeZone());
- java.util.Date date = TIMESTAMP_FORMAT.parse(value);
- Time sqlTime = new Time(date.getTime());
- return sqlTime;
+ SimpleDateFormat timestampFormat = TIMESTAMP_FORMAT.get();
+ timestampFormat.setTimeZone(cal.getTimeZone());
+ java.util.Date date = timestampFormat.parse(value);
+ return new Time(date.getTime());
}
public static Timestamp getTimestampFromString(String value, Calendar cal)
throws ParseException {
- TIMESTAMP_FORMAT.setTimeZone(cal.getTimeZone());
- java.util.Date date = TIMESTAMP_FORMAT.parse(value);
- Timestamp sqlTime = new Timestamp(date.getTime());
- return sqlTime;
+ SimpleDateFormat timestampFormat = TIMESTAMP_FORMAT.get();
+ timestampFormat.setTimeZone(cal.getTimeZone());
+ java.util.Date date = timestampFormat.parse(value);
+ return new Timestamp(date.getTime());
}
public static Timestamp getTimestampFromLong(Long value) {
- Timestamp sqlTime = new Timestamp(value);
- return sqlTime;
+ return new Timestamp(value);
}
public static String dateToString(Date date) {
- return DATE_FORMAT.format(date.getTime());
+ return DATE_FORMAT.get().format(date.getTime());
}
public static String timeToString(Time time) {
- return TIMESTAMP_FORMAT.format(time.getTime());
+ return TIMESTAMP_FORMAT.get().format(time.getTime());
}
public static String timeStampToString(Timestamp timestamp) {
- return TIMESTAMP_FORMAT.format(timestamp.getTime());
+ return TIMESTAMP_FORMAT.get().format(timestamp.getTime());
}
public static long timeStampToLong(Timestamp timestamp) {
diff --git a/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/PinotResultSetTest.java b/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/PinotResultSetTest.java
index 255d14d47087..c62a9b9e5465 100644
--- a/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/PinotResultSetTest.java
+++ b/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/PinotResultSetTest.java
@@ -26,6 +26,10 @@
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.pinot.client.utils.DateTimeUtils;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -139,7 +143,7 @@ public void testFetchDates()
@Test
public void testFetchBigDecimals()
- throws Exception {
+ throws Exception {
ResultSetGroup resultSetGroup = getResultSet(TEST_RESULT_SET_RESOURCE);
ResultSet resultSet = resultSetGroup.getResultSet(0);
PinotResultSet pinotResultSet = new PinotResultSet(resultSet);
@@ -207,6 +211,79 @@ public void testGetCalculatedScale() {
Assert.assertEquals(calculatedResult, 3);
}
+ @Test
+ public void testDateFromStringConcurrent()
+ throws Throwable {
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
+ AtomicReference throwable = new AtomicReference<>();
+ for (int i = 0; i < 10; i++) {
+ executorService.submit(() -> {
+ try {
+ Assert.assertEquals(DateTimeUtils.getDateFromString("2020-01-01", Calendar.getInstance()).toString(),
+ "2020-01-01");
+ } catch (Throwable t) {
+ throwable.set(t);
+ }
+ });
+ }
+
+ executorService.shutdown();
+ executorService.awaitTermination(1000, TimeUnit.MILLISECONDS);
+
+ if (throwable.get() != null) {
+ throw throwable.get();
+ }
+ }
+
+ @Test
+ public void testTimeFromStringConcurrent()
+ throws Throwable {
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
+ AtomicReference throwable = new AtomicReference<>();
+ for (int i = 0; i < 10; i++) {
+ executorService.submit(() -> {
+ try {
+ Assert.assertEquals(DateTimeUtils.getTimeFromString("2020-01-01 12:00:00", Calendar.getInstance()).toString(),
+ "12:00:00");
+ } catch (Throwable t) {
+ throwable.set(t);
+ }
+ });
+ }
+
+ executorService.shutdown();
+ executorService.awaitTermination(1000, TimeUnit.MILLISECONDS);
+
+ if (throwable.get() != null) {
+ throw throwable.get();
+ }
+ }
+
+ @Test
+ public void testTimestampFromStringConcurrent()
+ throws Throwable {
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
+ AtomicReference throwable = new AtomicReference<>();
+ for (int i = 0; i < 10; i++) {
+ executorService.submit(() -> {
+ try {
+ Assert.assertEquals(
+ DateTimeUtils.getTimestampFromString("2020-01-01 12:00:00", Calendar.getInstance()).toString(),
+ "2020-01-01 12:00:00.0");
+ } catch (Throwable t) {
+ throwable.set(t);
+ }
+ });
+ }
+
+ executorService.shutdown();
+ executorService.awaitTermination(1000, TimeUnit.MILLISECONDS);
+
+ if (throwable.get() != null) {
+ throw throwable.get();
+ }
+ }
+
private ResultSetGroup getResultSet(String resourceName) {
_dummyJsonTransport._resource = resourceName;
Connection connection = ConnectionFactory.fromHostList(Collections.singletonList("dummy"), _dummyJsonTransport);
diff --git a/pinot-clients/pom.xml b/pinot-clients/pom.xml
index 66cb0f2f30e7..40368b3ed7a0 100644
--- a/pinot-clients/pom.xml
+++ b/pinot-clients/pom.xml
@@ -24,7 +24,7 @@
pinot
org.apache.pinot
- 1.3.0-SNAPSHOT
+ 1.4.0-SNAPSHOT
pinot-clients
pom
diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index af2001a9e14c..59dc5dd7a9f0 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -24,7 +24,7 @@
pinot
org.apache.pinot
- 1.3.0-SNAPSHOT
+ 1.4.0-SNAPSHOT
pinot-common
Pinot Common
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawDoubleSingleColumnDistinctOnlyExecutor.java b/pinot-common/src/main/java/org/apache/pinot/common/concurrency/AdjustableSemaphore.java
similarity index 50%
rename from pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawDoubleSingleColumnDistinctOnlyExecutor.java
rename to pinot-common/src/main/java/org/apache/pinot/common/concurrency/AdjustableSemaphore.java
index ded36ea9a354..2bbc25e42a0d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawDoubleSingleColumnDistinctOnlyExecutor.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/concurrency/AdjustableSemaphore.java
@@ -16,26 +16,36 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.query.distinct.raw;
+package org.apache.pinot.common.concurrency;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.core.query.distinct.DistinctExecutor;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.Semaphore;
/**
- * {@link DistinctExecutor} for distinct only queries with single raw DOUBLE column.
+ * A semaphore that allows adjusting the number of permits in a non-blocking way.
*/
-public class RawDoubleSingleColumnDistinctOnlyExecutor extends BaseRawDoubleSingleColumnDistinctExecutor {
+public class AdjustableSemaphore extends Semaphore {
- public RawDoubleSingleColumnDistinctOnlyExecutor(ExpressionContext expression, DataType dataType, int limit,
- boolean nullHandlingEnabled) {
- super(expression, dataType, limit, nullHandlingEnabled);
+ private int _totalPermits;
+
+ public AdjustableSemaphore(int permits) {
+ super(permits);
+ _totalPermits = permits;
+ }
+
+ public AdjustableSemaphore(int permits, boolean fair) {
+ super(permits, fair);
+ _totalPermits = permits;
}
- @Override
- protected boolean add(double value) {
- _valueSet.add(value);
- return _valueSet.size() >= _limit;
+ public void setPermits(int permits) {
+ Preconditions.checkArgument(permits > 0, "Permits must be a positive integer");
+ if (permits < _totalPermits) {
+ reducePermits(_totalPermits - permits);
+ } else if (permits > _totalPermits) {
+ release(permits - _totalPermits);
+ }
+ _totalPermits = permits;
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java b/pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java
new file mode 100644
index 000000000000..186a668d651a
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.cursors;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.TimeUtils;
+
+
+public abstract class AbstractResponseStore implements ResponseStore {
+
+ protected String _brokerHost;
+ protected int _brokerPort;
+ protected String _brokerId;
+ protected BrokerMetrics _brokerMetrics;
+ protected long _expirationIntervalInMs;
+
+ protected void init(String brokerHost, int brokerPort, String brokerId, BrokerMetrics brokerMetrics,
+ String expirationTime) {
+ _brokerMetrics = brokerMetrics;
+ _brokerHost = brokerHost;
+ _brokerPort = brokerPort;
+ _brokerId = brokerId;
+ _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+ }
+
+ /**
+ * Initialize the store.
+ * @param config Subset configuration of pinot.broker.cursor.response.store.<type>
+ * @param brokerHost Hostname of the broker where ResponseStore is created
+ * @param brokerPort Port of the broker where the ResponseStore is created
+ * @param brokerId ID of the broker where the ResponseStore is created.
+ * @param brokerMetrics Metrics utility to track cursor metrics.
+ */
+ public abstract void init(PinotConfiguration config, String brokerHost, int brokerPort, String brokerId,
+ BrokerMetrics brokerMetrics, String expirationTime)
+ throws Exception;
+
+ /**
+ * Get the hostname of the broker where the query is executed
+ * @return String containing the hostname
+ */
+ protected String getBrokerHost() {
+ return _brokerHost;
+ }
+
+ /**
+ * Get the port of the broker where the query is executed
+ * @return int containing the port
+ */
+ protected int getBrokerPort() {
+ return _brokerPort;
+ }
+
+ /**
+ * Get the expiration interval of a query response.
+ * @return long containing the expiration interval.
+ */
+ protected long getExpirationIntervalInMs() {
+ return _expirationIntervalInMs;
+ }
+
+ /**
+ * Write a CursorResponse
+ * @param requestId Request ID of the response
+ * @param response The response to write
+ * @throws Exception Thrown if there is any error while writing the response
+ */
+ protected abstract void writeResponse(String requestId, CursorResponse response)
+ throws Exception;
+
+ /**
+ * Write a {@link ResultTable} to the store
+ * @param requestId Request ID of the response
+ * @param resultTable The {@link ResultTable} of the query
+ * @throws Exception Thrown if there is any error while writing the result table.
+ * @return Returns the number of bytes written
+ */
+ protected abstract long writeResultTable(String requestId, ResultTable resultTable)
+ throws Exception;
+
+ /**
+ * Read the response (excluding the {@link ResultTable}) from the store
+ * @param requestId Request ID of the response
+ * @return CursorResponse (without the {@link ResultTable})
+ * @throws Exception Thrown if there is any error while reading the response
+ */
+ public abstract CursorResponse readResponse(String requestId)
+ throws Exception;
+
+ /**
+ * Read the {@link ResultTable} of a query response
+ * @param requestId Request ID of the query
+ * @param offset Offset of the result slice
+ * @param numRows Number of rows required in the slice
+ * @return {@link ResultTable} of the query
+ * @throws Exception Thrown if there is any error while reading the result table
+ */
+ protected abstract ResultTable readResultTable(String requestId, int offset, int numRows)
+ throws Exception;
+
+ protected abstract boolean deleteResponseImpl(String requestId)
+ throws Exception;
+
+ /**
+ * Stores the response in the store. {@link CursorResponse} and {@link ResultTable} are stored separately.
+ * @param response Response to be stored
+ * @throws Exception Thrown if there is any error while storing the response.
+ */
+ public void storeResponse(BrokerResponse response)
+ throws Exception {
+ String requestId = response.getRequestId();
+
+ CursorResponse cursorResponse = new CursorResponseNative(response);
+
+ long submissionTimeMs = System.currentTimeMillis();
+ // Initialize all CursorResponse specific metadata
+ cursorResponse.setBrokerHost(getBrokerHost());
+ cursorResponse.setBrokerPort(getBrokerPort());
+ cursorResponse.setSubmissionTimeMs(submissionTimeMs);
+ cursorResponse.setExpirationTimeMs(submissionTimeMs + getExpirationIntervalInMs());
+ cursorResponse.setOffset(0);
+ cursorResponse.setNumRows(response.getNumRowsResultSet());
+
+ try {
+ long bytesWritten = writeResultTable(requestId, response.getResultTable());
+
+ // Remove the resultTable from the response as it is serialized in a data file.
+ cursorResponse.setResultTable(null);
+ cursorResponse.setBytesWritten(bytesWritten);
+ writeResponse(requestId, cursorResponse);
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_RESPONSE_STORE_SIZE, bytesWritten);
+ } catch (Exception e) {
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_WRITE_EXCEPTION, 1);
+ deleteResponse(requestId);
+ throw e;
+ }
+ }
+
+ /**
+ * Reads the response from the store and populates it with a slice of the {@link ResultTable}
+ * @param requestId Request ID of the query
+ * @param offset Offset of the result slice
+ * @param numRows Number of rows required in the slice
+ * @return A CursorResponse with a slice of the {@link ResultTable}
+ * @throws Exception Thrown if there is any error during the operation.
+ */
+ public CursorResponse handleCursorRequest(String requestId, int offset, int numRows)
+ throws Exception {
+
+ CursorResponse response;
+ ResultTable resultTable;
+
+ try {
+ response = readResponse(requestId);
+ } catch (Exception e) {
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_READ_EXCEPTION, 1);
+ throw e;
+ }
+
+ int totalTableRows = response.getNumRowsResultSet();
+
+ if (totalTableRows == 0 && offset == 0) {
+ // If the total number of rows is 0, the result set is empty.
+ response.setResultTable(null);
+ response.setOffset(0);
+ response.setNumRows(0);
+ return response;
+ } else if (offset >= totalTableRows) {
+ throw new RuntimeException("Offset " + offset + " should be lesser than totalRecords " + totalTableRows);
+ }
+
+ long fetchStartTime = System.currentTimeMillis();
+ try {
+ resultTable = readResultTable(requestId, offset, numRows);
+ } catch (Exception e) {
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_READ_EXCEPTION, 1);
+ throw e;
+ }
+
+ response.setResultTable(resultTable);
+ response.setCursorFetchTimeMs(System.currentTimeMillis() - fetchStartTime);
+ response.setOffset(offset);
+ response.setNumRows(resultTable.getRows().size());
+ response.setNumRowsResultSet(totalTableRows);
+ return response;
+ }
+
+ /**
+ * Returns the list of responses created by the broker.
+ * Note that the ResponseStore object in a broker should only return responses created by it.
+ * @return A list of CursorResponse objects created by the specific broker
+ * @throws Exception Thrown if there is an error during an operation.
+ */
+ public List getAllStoredResponses()
+ throws Exception {
+ List responses = new ArrayList<>();
+
+ for (String requestId : getAllStoredRequestIds()) {
+ responses.add(readResponse(requestId));
+ }
+
+ return responses;
+ }
+
+ @Override
+ public boolean deleteResponse(String requestId) throws Exception {
+ if (!exists(requestId)) {
+ return false;
+ }
+
+ long bytesWritten = readResponse(requestId).getBytesWritten();
+ boolean isSucceeded = deleteResponseImpl(requestId);
+ if (isSucceeded) {
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_RESPONSE_STORE_SIZE, bytesWritten * -1);
+ }
+ return isSucceeded;
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArithmeticFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArithmeticFunctions.java
index 27c4952b1fcf..d27a3fa6cccd 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArithmeticFunctions.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArithmeticFunctions.java
@@ -40,11 +40,59 @@ public static double divide(double a, double b, double defaultValue) {
return (b == 0) ? defaultValue : a / b;
}
+ @ScalarFunction
+ public static long intDiv(double a, double b) {
+ return (long) Math.floor(a / b);
+ }
+
+ @ScalarFunction
+ public static long intDivOrZero(double a, double b) {
+ // Same as intDiv, but returns zero when dividing by zero or when dividing Long.MIN_VALUE by minus one.
+ return (b == 0 || (a == Long.MIN_VALUE && b == -1)) ? 0 : intDiv(a, b);
+ }
+
+ @ScalarFunction
+ public static int isFinite(double value) {
+ return Double.isFinite(value) ? 1 : 0;
+ }
+
+ @ScalarFunction
+ public static int isInfinite(double value) {
+ return Double.isInfinite(value) ? 1 : 0;
+ }
+
+ @ScalarFunction
+ public static double ifNotFinite(double valueToCheck, double defaultValue) {
+ return Double.isFinite(valueToCheck) ? valueToCheck : defaultValue;
+ }
+
+ @ScalarFunction
+ public static int isNaN(double value) {
+ return Double.isNaN(value) ? 1 : 0;
+ }
+
@ScalarFunction
public static double mod(double a, double b) {
return a % b;
}
+ @ScalarFunction
+ public static double moduloOrZero(double a, double b) {
+ // Same as mod, but returns zero when dividing by zero or when dividing Long.MIN_VALUE by minus one.
+ return (b == 0 || (a == Long.MIN_VALUE && b == -1)) ? 0 : mod(a, b);
+ }
+
+ @ScalarFunction
+ public static double positiveModulo(double a, double b) {
+ double result = a % b;
+ return result >= 0 ? result : result + Math.abs(b);
+ }
+
+ @ScalarFunction
+ public static double negate(double a) {
+ return -a;
+ }
+
@ScalarFunction
public static double least(double a, double b) {
return Double.min(a, b);
@@ -117,7 +165,6 @@ public static double power(double a, double exponent) {
return Math.pow(a, exponent);
}
-
// Big Decimal Implementation has been used here to avoid overflows
// when multiplying by Math.pow(10, scale) for rounding
@ScalarFunction
@@ -143,4 +190,33 @@ public static double truncate(double a, int scale) {
public static double truncate(double a) {
return Math.signum(a) * Math.floor(Math.abs(a));
}
+
+ @ScalarFunction
+ public static long gcd(long a, long b) {
+ return a == 0 ? Math.abs(b) : gcd(b % a, a);
+ }
+
+ @ScalarFunction
+ public static long lcm(long a, long b) {
+ if (a == 0 || b == 0) {
+ return 0;
+ }
+ return Math.abs(a) / gcd(a, b) * Math.abs(b);
+ }
+
+ @ScalarFunction
+ public static double hypot(double a, double b) {
+ return Math.hypot(a, b);
+ }
+
+ @ScalarFunction
+ public static int byteswapInt(int a) {
+ return Integer.reverseBytes(a);
+ }
+
+ @ScalarFunction
+ public static long byteswapLong(long a) {
+ // Reverse the byte order of the long value
+ return Long.reverseBytes(a);
+ }
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index ea6a66251ce8..22be35405f4b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -169,7 +169,27 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
* For each query with at least one window function, this meter is increased as many times as window functions in the
* query.
*/
- WINDOW_COUNT("queries", true),;
+ WINDOW_COUNT("queries", true),
+
+ /**
+ * Number of queries executed with cursors. This count includes queries that use SSE and MSE
+ */
+ CURSOR_QUERIES_GLOBAL("queries", true),
+
+ /**
+ * Number of exceptions when writing a response to the response store
+ */
+ CURSOR_WRITE_EXCEPTION("exceptions", true),
+
+ /**
+ * Number of exceptions when reading a response and result table from the response store
+ */
+ CURSOR_READ_EXCEPTION("exceptions", true),
+
+ /**
+ * The number of bytes stored in the response store. Only the size of the result table is tracked.
+ */
+ CURSOR_RESPONSE_STORE_SIZE("bytes", true);
private final String _brokerMeterName;
private final String _unit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index cdb99f0f904d..a978219343ec 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -68,6 +68,7 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
NUM_MINION_SUBTASKS_WAITING("NumMinionSubtasksWaiting", true),
NUM_MINION_SUBTASKS_RUNNING("NumMinionSubtasksRunning", true),
NUM_MINION_SUBTASKS_ERROR("NumMinionSubtasksError", true),
+ NUM_MINION_SUBTASKS_UNKNOWN("NumMinionSubtasksUnknown", true),
PERCENT_MINION_SUBTASKS_IN_QUEUE("PercentMinionSubtasksInQueue", true),
PERCENT_MINION_SUBTASKS_IN_ERROR("PercentMinionSubtasksInError", true),
TIER_BACKEND_TABLE_COUNT("TierBackendTableCount", true),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index b999e7b8e435..7c1826582a70 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -77,6 +77,8 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT("upsertValidDocIdSnapshotCount", false),
UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT("upsertPrimaryKeysInSnapshotCount", false),
REALTIME_INGESTION_OFFSET_LAG("offsetLag", false),
+ REALTIME_INGESTION_UPSTREAM_OFFSET("upstreamOffset", false),
+ REALTIME_INGESTION_CONSUMING_OFFSET("consumingOffset", false),
REALTIME_CONSUMER_DIR_USAGE("bytes", true);
private final String _gaugeName;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/CursorResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/CursorResponse.java
new file mode 100644
index 000000000000..14e65f6fbb4b
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/CursorResponse.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response;
+
+public interface CursorResponse extends BrokerResponse {
+
+ void setBrokerHost(String brokerHost);
+
+ /**
+ * Get the hostname of the processing broker
+ * @return String containing the hostname
+ */
+ String getBrokerHost();
+
+ void setBrokerPort(int brokerPort);
+
+ /**
+ * Get the port of the processing broker
+ * @return int containing the port.
+ */
+ int getBrokerPort();
+
+ /**
+ * Set the starting offset of result table slice
+ * @param offset Offset of the result table slice
+ */
+ void setOffset(int offset);
+
+ /**
+ * Current offset in the query result.
+ * Starts from 0.
+ * @return current offset.
+ */
+ int getOffset();
+
+ /**
+ * Set the number of rows in the result table slice.
+ * @param numRows Number of rows in the result table slice
+ */
+ void setNumRows(int numRows);
+
+ /**
+ * Number of rows in the current response.
+ * @return Number of rows in the current response.
+ */
+ int getNumRows();
+
+ /**
+ * Return the time to write the results to the response store.
+ * @return time in milliseconds
+ */
+ long getCursorResultWriteTimeMs();
+
+ /**
+ * Set the time taken to write cursor results to the response store.
+ * @param cursorResultWriteMs Time in milliseconds.
+ */
+ void setCursorResultWriteTimeMs(long cursorResultWriteMs);
+
+ /**
+ * Return the time to fetch results from the response store.
+ * @return time in milliseconds.
+ */
+ long getCursorFetchTimeMs();
+
+ /**
+ * Set the time taken to fetch a cursor. The time is specific to the current call.
+ * @param cursorFetchTimeMs time in milliseconds
+ */
+ void setCursorFetchTimeMs(long cursorFetchTimeMs);
+
+ /**
+ * Unix timestamp when the query was submitted. The timestamp is used to calculate the expiration time when the
+ * response will be deleted from the response store.
+ * @param submissionTimeMs Unix timestamp when the query was submitted.
+ */
+ void setSubmissionTimeMs(long submissionTimeMs);
+
+ /**
+ * Get the unix timestamp when the query was submitted
+ * @return Unix timestamp when the query was submitted
+ */
+ long getSubmissionTimeMs();
+
+ /**
+ * Set the expiration time (unix timestamp) when the response will be deleted from the response store.
+ * @param expirationTimeMs unix timestamp when the response expires in the response store
+ */
+ void setExpirationTimeMs(long expirationTimeMs);
+
+ /**
+ * Get the expiration time (unix timestamp) when the response will be deleted from the response store.
+ * @return expirationTimeMs unix timestamp when the response expires in the response store
+ */
+ long getExpirationTimeMs();
+
+ /**
+ * Set the number of rows in the result set. This is required because BrokerResponse checks the ResultTable
+ * to get the number of rows. However the ResultTable is set to null in CursorResponse. So the numRowsResultSet has to
+ * be remembered.
+ * @param numRowsResultSet Number of rows in the result set.
+ */
+ void setNumRowsResultSet(int numRowsResultSet);
+
+ /**
+ * Set the number of bytes written to the response store when storing the result table.
+ * @param bytesWritten Number of bytes written
+ */
+ void setBytesWritten(long bytesWritten);
+
+ /**
+ * Get the number of bytes written when storing the result table
+ * @return number of bytes written
+ */
+ long getBytesWritten();
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
index 96320b8326a1..4a1f347d16a6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
@@ -118,7 +118,7 @@ private static PinotBrokerTimeSeriesResponse convertBucketedSeriesBlock(TimeSeri
for (TimeSeries timeSeries : listOfTimeSeries) {
Object[][] values = new Object[timeValues.length][];
for (int i = 0; i < timeValues.length; i++) {
- Object nullableValue = timeSeries.getValues()[i];
+ Object nullableValue = timeSeries.getDoubleValues()[i];
values[i] = new Object[]{timeValues[i], nullableValue == null ? null : nullableValue.toString()};
}
result.add(new PinotBrokerTimeSeriesResponse.Value(metricMap, values));
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
new file mode 100644
index 000000000000..d4c220374984
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.broker;
+
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+
+
+/**
+ * {@link CursorResponse} implementation built on top of {@link BrokerResponseNative}. In addition to the inherited
+ * broker response fields it carries the cursor-specific ones: offset, numRows, cursorResultWriteTimeMs,
+ * cursorFetchTimeMs, submissionTimeMs, expirationTimeMs, brokerHost, brokerPort and bytesWritten.
+ */
+@JsonPropertyOrder({
+    "resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached", "timeUsedMs",
+    "requestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
+    "numServersQueried", "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched",
+    "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched",
+    "minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker", "numSegmentsPrunedByServer",
+    "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit", "numSegmentsPrunedByValue", "brokerReduceTimeMs",
+    "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
+    "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
+    "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs",
+    // "tablesQueried" follows the getTablesQueried()/setTablesQueried() accessors used by the copy constructor
+    "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
+    // Fields specific to CursorResponse
+    "offset", "numRows", "cursorResultWriteTimeMs", "cursorFetchTimeMs", "submissionTimeMs", "expirationTimeMs",
+    "brokerHost", "brokerPort", "bytesWritten"
+})
+public class CursorResponseNative extends BrokerResponseNative implements CursorResponse {
+  private int _offset;
+  private int _numRows;
+  private long _cursorResultWriteTimeMs;
+  private long _cursorFetchTimeMs;
+  // Unix timestamp when the query was submitted
+  private long _submissionTimeMs;
+  // Unix timestamp when the response expires in the response store
+  private long _expirationTimeMs;
+  private String _brokerHost;
+  private int _brokerPort;
+  // Number of bytes written to the response store when storing the result table
+  private long _bytesWritten;
+
+  /**
+   * Creates an empty response; every cursor-specific field keeps its Java default value until it is set.
+   */
+  public CursorResponseNative() {
+  }
+
+  /**
+   * Creates a cursor response by copying the member variables of the given broker response. The cursor-specific
+   * fields are not touched here and keep their Java default values until they are set.
+   *
+   * NOTE(review): "partialResult" is listed in the property order but is not copied explicitly here; presumably it
+   * is derived by the superclass - confirm.
+   *
+   * @param response broker response to copy from
+   */
+  public CursorResponseNative(BrokerResponse response) {
+    // Copy all the member variables of BrokerResponse to CursorResponse.
+    setResultTable(response.getResultTable());
+    setNumRowsResultSet(response.getNumRowsResultSet());
+    setExceptions(response.getExceptions());
+    setNumGroupsLimitReached(response.isNumGroupsLimitReached());
+    setTimeUsedMs(response.getTimeUsedMs());
+    setRequestId(response.getRequestId());
+    setBrokerId(response.getBrokerId());
+    setNumDocsScanned(response.getNumDocsScanned());
+    setTotalDocs(response.getTotalDocs());
+    setNumEntriesScannedInFilter(response.getNumEntriesScannedInFilter());
+    setNumEntriesScannedPostFilter(response.getNumEntriesScannedPostFilter());
+    setNumServersQueried(response.getNumServersQueried());
+    setNumServersResponded(response.getNumServersResponded());
+    setNumSegmentsQueried(response.getNumSegmentsQueried());
+    setNumSegmentsProcessed(response.getNumSegmentsProcessed());
+    setNumSegmentsMatched(response.getNumSegmentsMatched());
+    setNumConsumingSegmentsQueried(response.getNumConsumingSegmentsQueried());
+    setNumConsumingSegmentsProcessed(response.getNumConsumingSegmentsProcessed());
+    setNumConsumingSegmentsMatched(response.getNumConsumingSegmentsMatched());
+    setMinConsumingFreshnessTimeMs(response.getMinConsumingFreshnessTimeMs());
+    setNumSegmentsPrunedByBroker(response.getNumSegmentsPrunedByBroker());
+    setNumSegmentsPrunedByServer(response.getNumSegmentsPrunedByServer());
+    setNumSegmentsPrunedInvalid(response.getNumSegmentsPrunedInvalid());
+    setNumSegmentsPrunedByLimit(response.getNumSegmentsPrunedByLimit());
+    setNumSegmentsPrunedByValue(response.getNumSegmentsPrunedByValue());
+    setBrokerReduceTimeMs(response.getBrokerReduceTimeMs());
+    setOfflineThreadCpuTimeNs(response.getOfflineThreadCpuTimeNs());
+    setRealtimeThreadCpuTimeNs(response.getRealtimeThreadCpuTimeNs());
+    setOfflineSystemActivitiesCpuTimeNs(response.getOfflineSystemActivitiesCpuTimeNs());
+    setRealtimeSystemActivitiesCpuTimeNs(response.getRealtimeSystemActivitiesCpuTimeNs());
+    setOfflineResponseSerializationCpuTimeNs(response.getOfflineResponseSerializationCpuTimeNs());
+    setRealtimeResponseSerializationCpuTimeNs(response.getRealtimeResponseSerializationCpuTimeNs());
+    setExplainPlanNumEmptyFilterSegments(response.getExplainPlanNumEmptyFilterSegments());
+    setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments());
+    setTraceInfo(response.getTraceInfo());
+    setTablesQueried(response.getTablesQueried());
+  }
+
+  @Override
+  public String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  public void setBrokerHost(String brokerHost) {
+    _brokerHost = brokerHost;
+  }
+
+  @Override
+  public int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  public void setBrokerPort(int brokerPort) {
+    _brokerPort = brokerPort;
+  }
+
+  @Override
+  public int getOffset() {
+    return _offset;
+  }
+
+  @Override
+  public void setOffset(int offset) {
+    _offset = offset;
+  }
+
+  @Override
+  public int getNumRows() {
+    return _numRows;
+  }
+
+  @Override
+  public void setNumRows(int numRows) {
+    _numRows = numRows;
+  }
+
+  @Override
+  public long getCursorResultWriteTimeMs() {
+    return _cursorResultWriteTimeMs;
+  }
+
+  @Override
+  public void setCursorResultWriteTimeMs(long cursorResultWriteMs) {
+    _cursorResultWriteTimeMs = cursorResultWriteMs;
+  }
+
+  @Override
+  public long getCursorFetchTimeMs() {
+    return _cursorFetchTimeMs;
+  }
+
+  @Override
+  public void setCursorFetchTimeMs(long cursorFetchTimeMs) {
+    _cursorFetchTimeMs = cursorFetchTimeMs;
+  }
+
+  @Override
+  public long getSubmissionTimeMs() {
+    return _submissionTimeMs;
+  }
+
+  @Override
+  public void setSubmissionTimeMs(long submissionTimeMs) {
+    _submissionTimeMs = submissionTimeMs;
+  }
+
+  @Override
+  public long getExpirationTimeMs() {
+    return _expirationTimeMs;
+  }
+
+  @Override
+  public void setExpirationTimeMs(long expirationTimeMs) {
+    _expirationTimeMs = expirationTimeMs;
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return _bytesWritten;
+  }
+
+  @Override
+  public void setBytesWritten(long bytesWritten) {
+    _bytesWritten = bytesWritten;
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java
index ce54424d16ed..500cfff946c8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java
@@ -30,17 +30,20 @@ public class ValidDocIdsMetadataInfo {
private final long _totalDocs;
private final String _segmentCrc;
private final ValidDocIdsType _validDocIdsType;
+ private final long _segmentSizeInBytes;
public ValidDocIdsMetadataInfo(@JsonProperty("segmentName") String segmentName,
@JsonProperty("totalValidDocs") long totalValidDocs, @JsonProperty("totalInvalidDocs") long totalInvalidDocs,
@JsonProperty("totalDocs") long totalDocs, @JsonProperty("segmentCrc") String segmentCrc,
- @JsonProperty("validDocIdsType") ValidDocIdsType validDocIdsType) {
+ @JsonProperty("validDocIdsType") ValidDocIdsType validDocIdsType,
+ @JsonProperty("segmentSizeInBytes") long segmentSizeInBytes) {
_segmentName = segmentName;
_totalValidDocs = totalValidDocs;
_totalInvalidDocs = totalInvalidDocs;
_totalDocs = totalDocs;
_segmentCrc = segmentCrc;
_validDocIdsType = validDocIdsType;
+ _segmentSizeInBytes = segmentSizeInBytes;
}
public String getSegmentName() {
@@ -66,4 +69,8 @@ public String getSegmentCrc() {
public ValidDocIdsType getValidDocIdsType() {
return _validDocIdsType;
}
+
+ /**
+ * Returns the segment size in bytes, as supplied to the constructor through the "segmentSizeInBytes" JSON property.
+ */
+ public long getSegmentSizeInBytes() {
+ return _segmentSizeInBytes;
+ }
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/PauselessConsumptionUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/PauselessConsumptionUtils.java
new file mode 100644
index 000000000000..36449a54229f
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/PauselessConsumptionUtils.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.Optional;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+
+
+/**
+ * Static helpers for reading the pauseless consumption setting out of a table configuration.
+ */
+public class PauselessConsumptionUtils {
+
+  private PauselessConsumptionUtils() {
+    // Utility class, not meant to be instantiated
+  }
+
+  /**
+   * Tells whether pauseless consumption is turned on in the given table configuration.
+   * The answer is false when the ingestion config or the stream ingestion config is missing, or when the flag
+   * itself is not set to true.
+   *
+   * @param tableConfig The table configuration to inspect. Must not be null.
+   * @return true only if pauseless consumption is explicitly enabled, false otherwise
+   * @throws NullPointerException if tableConfig is null
+   */
+  public static boolean isPauselessEnabled(@NotNull TableConfig tableConfig) {
+    // Walk the config one level at a time; an absent level leaves the Optional empty
+    Optional<IngestionConfig> ingestionConfig = Optional.ofNullable(tableConfig.getIngestionConfig());
+    Optional<StreamIngestionConfig> streamIngestionConfig =
+        ingestionConfig.map(IngestionConfig::getStreamIngestionConfig);
+    Optional<Boolean> pauselessFlag = streamIngestionConfig.map(StreamIngestionConfig::isPauselessConsumptionEnabled);
+    return pauselessFlag.orElse(false);
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java
index 45a791bc9af2..f034bb3fdcd5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java
@@ -24,14 +24,13 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_TIMEZONE;
-
public class ServiceStartableUtils {
private ServiceStartableUtils() {
@@ -44,7 +43,10 @@ private ServiceStartableUtils() {
protected static String _timeZone;
/**
- * Applies the ZK cluster config to the given instance config if it does not already exist.
+ * Applies the ZK cluster config to:
+ * - The given instance config if it does not already exist.
+ * - Set the timezone.
+ * - Initialize the default values in {@link ForwardIndexConfig}.
*
* In the ZK cluster config:
* - pinot.all.* will be replaced to role specific config, e.g. pinot.controller.* for controllers
@@ -70,7 +72,8 @@ public static void applyClusterConfig(PinotConfiguration instanceConfig, String
zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
if (clusterConfigZNRecord == null) {
LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
- setupTimezone(instanceConfig);
+ setTimezone(instanceConfig);
+ initForwardIndexConfig(instanceConfig);
return;
}
@@ -90,9 +93,10 @@ public static void applyClusterConfig(PinotConfiguration instanceConfig, String
}
}
} finally {
- zkClient.close();
+ ZkStarter.closeAsync(zkClient);
}
- setupTimezone(instanceConfig);
+ setTimezone(instanceConfig);
+ initForwardIndexConfig(instanceConfig);
}
private static void addConfigIfNotExists(PinotConfiguration instanceConfig, String key, String value) {
@@ -101,10 +105,31 @@ private static void addConfigIfNotExists(PinotConfiguration instanceConfig, Stri
}
}
- private static void setupTimezone(PinotConfiguration instanceConfig) {
+ private static void setTimezone(PinotConfiguration instanceConfig) {
TimeZone localTimezone = TimeZone.getDefault();
- _timeZone = instanceConfig.getProperty(CONFIG_OF_TIMEZONE, localTimezone.getID());
+ _timeZone = instanceConfig.getProperty(CommonConstants.CONFIG_OF_TIMEZONE, localTimezone.getID());
System.setProperty("user.timezone", _timeZone);
LOGGER.info("Timezone: {}", _timeZone);
}
+
+ /**
+  * Pushes the forward index defaults found in the instance config into {@link ForwardIndexConfig}. Each of the three
+  * settings (raw index writer version, target max chunk size, target docs per chunk) is applied only when its
+  * property is present in the config; when a property is absent the corresponding setter is not called.
+  *
+  * @param instanceConfig instance config to read the properties from
+  */
+ private static void initForwardIndexConfig(PinotConfiguration instanceConfig) {
+   // Raw index writer version (integer property)
+   String rawIndexWriterVersion =
+       instanceConfig.getProperty(CommonConstants.ForwardIndexConfigs.CONFIG_OF_DEFAULT_RAW_INDEX_WRITER_VERSION);
+   if (rawIndexWriterVersion != null) {
+     LOGGER.info("Setting forward index default raw index writer version to: {}", rawIndexWriterVersion);
+     int parsedWriterVersion = Integer.parseInt(rawIndexWriterVersion);
+     ForwardIndexConfig.setDefaultRawIndexWriterVersion(parsedWriterVersion);
+   }
+
+   // Target max chunk size (handed over as the raw string)
+   String targetMaxChunkSize =
+       instanceConfig.getProperty(CommonConstants.ForwardIndexConfigs.CONFIG_OF_DEFAULT_TARGET_MAX_CHUNK_SIZE);
+   if (targetMaxChunkSize != null) {
+     LOGGER.info("Setting forward index default target max chunk size to: {}", targetMaxChunkSize);
+     ForwardIndexConfig.setDefaultTargetMaxChunkSize(targetMaxChunkSize);
+   }
+
+   // Target docs per chunk (integer property)
+   String targetDocsPerChunk =
+       instanceConfig.getProperty(CommonConstants.ForwardIndexConfigs.CONFIG_OF_DEFAULT_TARGET_DOCS_PER_CHUNK);
+   if (targetDocsPerChunk != null) {
+     LOGGER.info("Setting forward index default target docs per chunk to: {}", targetDocsPerChunk);
+     int parsedDocsPerChunk = Integer.parseInt(targetDocsPerChunk);
+     ForwardIndexConfig.setDefaultTargetDocsPerChunk(parsedDocsPerChunk);
+   }
+ }
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/Timer.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/Timer.java
new file mode 100644
index 000000000000..23d3ca2da4a3
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/Timer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+/**
+ * Utility class that works with a timeout in milliseconds and provides methods to check remaining time and expiration.
+ *
+ * Elapsed time is measured with {@link System#nanoTime()} rather than {@link System#currentTimeMillis()}, so that
+ * adjustments of the wall clock do not shorten or extend the timeout.
+ */
+public class Timer {
+  private static final long NANOS_PER_MILLI = 1_000_000L;
+
+  private final long _timeoutMillis;
+  // System.nanoTime() reading taken at construction; only meaningful as a difference against a later reading
+  private final long _startTimeNanos;
+
+  /**
+   * Initializes the Timer with the specified timeout in milliseconds. The timer starts counting immediately.
+   *
+   * @param timeoutMillis the timeout duration in milliseconds; a non-positive value yields an already expired timer
+   */
+  public Timer(long timeoutMillis) {
+    _timeoutMillis = timeoutMillis;
+    _startTimeNanos = System.nanoTime();
+  }
+
+  /**
+   * Returns the remaining time in milliseconds. If the timeout has expired, it returns 0.
+   *
+   * @return the remaining time in milliseconds, never negative
+   */
+  public long getRemainingTime() {
+    long elapsedMillis = (System.nanoTime() - _startTimeNanos) / NANOS_PER_MILLI;
+    // Compare before subtracting so that extreme timeout values (e.g. Long.MIN_VALUE) cannot overflow
+    if (_timeoutMillis <= elapsedMillis) {
+      return 0;
+    }
+    return _timeoutMillis - elapsedMillis;
+  }
+
+  /**
+   * Checks if the timer has expired.
+   *
+   * @return true if the timer has expired, false otherwise
+   */
+  public boolean hasExpired() {
+    return getRemainingTime() == 0;
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ZkStarter.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ZkStarter.java
index de3be516dbb0..3a15089710cf 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ZkStarter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ZkStarter.java
@@ -21,6 +21,8 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.pinot.spi.utils.NetUtils;
@@ -179,10 +181,9 @@ public void run() {
// Wait until the ZK server is started
for (int retry = 0; retry < DEFAULT_ZK_CLIENT_RETRIES; retry++) {
try {
- Thread.sleep(1000L);
ZkClient client = new ZkClient("localhost:" + port, 1000 * (DEFAULT_ZK_CLIENT_RETRIES - retry));
client.waitUntilConnected(DEFAULT_ZK_CLIENT_RETRIES - retry, TimeUnit.SECONDS);
- client.close();
+ closeAsync(client);
break;
} catch (Exception e) {
if (retry < DEFAULT_ZK_CLIENT_RETRIES - 1) {
@@ -191,6 +192,7 @@ public void run() {
LOGGER.warn("Failed to connect to zk server.", e);
throw e;
}
+ Thread.sleep(50L);
}
}
return new ZookeeperInstance(zookeeperServerMain, dataDirPath, port);
@@ -200,6 +202,17 @@ public void run() {
}
}
+ /**
+  * Closes the given ZK client on the background "zk-disconnector" executor instead of on the calling thread.
+  * A failure while closing is logged; it is not propagated to the caller.
+  *
+  * @param client the client to close; null is ignored
+  */
+ public static void closeAsync(ZkClient client) {
+   if (client == null) {
+     return;
+   }
+   ZK_DISCONNECTOR.submit(() -> {
+     try {
+       client.close();
+     } catch (Exception e) {
+       // The Future returned by submit() is never inspected, so without this the failure would be lost silently
+       LOGGER.warn("Failed to close ZK client", e);
+     }
+   });
+ }
+
+ // Single-threaded executor on which closeAsync() runs the close calls
+ private static final ExecutorService ZK_DISCONNECTOR =
+     Executors.newFixedThreadPool(1, new NamedThreadFactory("zk-disconnector"));
+
/**
* Stops a local Zk instance, deleting its data directory
*/
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 8dbd4bb40228..5f88a9691c0b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -190,6 +190,15 @@ public static boolean isUseMultistageEngine(Map queryOptions) {
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.USE_MULTISTAGE_ENGINE));
}
+ /**
+ * Returns whether the GET_CURSOR query option is set to "true" (case-insensitive, per
+ * {@link Boolean#parseBoolean(String)}); a missing option yields false.
+ */
+ public static boolean isGetCursor(Map queryOptions) {
+ return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.GET_CURSOR));
+ }
+
+ /**
+ * Returns the CURSOR_NUM_ROWS query option parsed by checkedParseIntPositive.
+ * NOTE(review): presumably returns null when the option is absent, like the other Integer-returning option
+ * getters - confirm against checkedParseIntPositive.
+ */
+ public static Integer getCursorNumRows(Map queryOptions) {
+ String cursorNumRows = queryOptions.get(QueryOptionKey.CURSOR_NUM_ROWS);
+ return checkedParseIntPositive(QueryOptionKey.CURSOR_NUM_ROWS, cursorNumRows);
+ }
+
public static Optional isExplainAskingServers(Map queryOptions) {
String value = queryOptions.get(QueryOptionKey.EXPLAIN_ASKING_SERVERS);
if (value == null) {
@@ -204,6 +213,13 @@ public static Integer getMaxExecutionThreads(Map queryOptions) {
return checkedParseIntPositive(QueryOptionKey.MAX_EXECUTION_THREADS, maxExecutionThreadsString);
}
+ /**
+ * Returns the GROUP_TRIM_SIZE query option parsed by uncheckedParseInt. The result may be null (see the
+ * annotation), and non-positive values are passed through rather than rejected.
+ */
+ @Nullable
+ public static Integer getGroupTrimSize(Map queryOptions) {
+ String groupTrimSize = queryOptions.get(QueryOptionKey.GROUP_TRIM_SIZE);
+ // NOTE: Non-positive value means turning off the intermediate level trim
+ return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_SIZE, groupTrimSize);
+ }
+
@Nullable
public static Integer getMinSegmentGroupTrimSize(Map queryOptions) {
String minSegmentGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE);
@@ -259,6 +275,10 @@ public static Integer getMultiStageLeafLimit(Map queryOptions) {
return checkedParseIntNonNegative(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT, maxLeafLimitStr);
}
+ /**
+ * Returns whether the ERROR_ON_NUM_GROUPS_LIMIT query option is set to "true" (case-insensitive, per
+ * {@link Boolean#parseBoolean(String)}); a missing option yields false.
+ */
+ public static boolean getErrorOnNumGroupsLimit(Map queryOptions) {
+ return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.ERROR_ON_NUM_GROUPS_LIMIT));
+ }
+
@Nullable
public static Integer getNumGroupsLimit(Map queryOptions) {
String maxNumGroupLimit = queryOptions.get(QueryOptionKey.NUM_GROUPS_LIMIT);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
index b8c013427d1c..2d1e38d84a64 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -42,6 +43,7 @@
import org.apache.calcite.sql.SqlNumericLiteral;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.function.TransformFunctionType;
import org.apache.pinot.common.request.DataSource;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.ExpressionType;
@@ -53,6 +55,7 @@
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
+import org.apache.pinot.spi.utils.TimestampIndexUtils;
import org.apache.pinot.sql.FilterKind;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.SqlCompilationException;
@@ -631,4 +634,32 @@ public static Map getOptionsFromJson(JsonNode request, String op
public static Map getOptionsFromString(String optionStr) {
return Splitter.on(';').omitEmptyStrings().trimResults().withKeyValueSeparator('=').split(optionStr);
}
+
+ /**
+ * Convenience overload of {@link #applyTimestampIndexOverrideHints(Expression, PinotQuery, Predicate)} that passes
+ * a predicate accepting every column name.
+ */
+ public static void applyTimestampIndexOverrideHints(Expression expression, PinotQuery query) {
+ applyTimestampIndexOverrideHints(expression, query, timeColumnWithGranularity -> true);
+ }
+
+ /**
+  * Registers an expression override hint on the query when the given expression is a DATE_TRUNC function call
+  * that satisfies all of the following:
+  * - it has 2 operands, or 3 operands with the third being the literal "MILLISECONDS" (case-insensitive)
+  * - its first operand is a literal that TimestampIndexUtils accepts as a valid granularity
+  * - its second operand is an identifier (the time column)
+  * - the predicate accepts the column-with-granularity name built from the time column and the granularity
+  * In that case the expression is mapped to an identifier expression of the column-with-granularity name.
+  * Any other expression, including a DATE_TRUNC call with a missing or non-literal operand, is left alone.
+  *
+  * @param expression expression to inspect
+  * @param query query on which the override hint is registered
+  * @param timeColumnWithGranularityPredicate decides whether a given column-with-granularity name may be used
+  */
+ public static void applyTimestampIndexOverrideHints(
+     Expression expression, PinotQuery query, Predicate timeColumnWithGranularityPredicate
+ ) {
+   if (!expression.isSetFunctionCall()) {
+     return;
+   }
+   Function function = expression.getFunctionCall();
+   if (!function.getOperator().equalsIgnoreCase(TransformFunctionType.DATE_TRUNC.getName())) {
+     return;
+   }
+   // Check the operand count before touching any operand, so a malformed call is skipped instead of throwing
+   int numOperands = function.getOperandsSize();
+   if (numOperands != 2 && numOperands != 3) {
+     return;
+   }
+   Expression granularityExpression = function.getOperands().get(0);
+   if (granularityExpression.getLiteral() == null) {
+     return;
+   }
+   // Locale.ROOT keeps the upper-casing independent of the JVM default locale
+   String granularString = granularityExpression.getLiteral().getStringValue().toUpperCase(java.util.Locale.ROOT);
+   if (numOperands == 3) {
+     Expression inputUnitExpression = function.getOperands().get(2);
+     if (inputUnitExpression.getLiteral() == null || !"MILLISECONDS".equalsIgnoreCase(
+         inputUnitExpression.getLiteral().getStringValue())) {
+       return;
+     }
+   }
+   if (!TimestampIndexUtils.isValidGranularity(granularString)) {
+     return;
+   }
+   Expression timeExpression = function.getOperands().get(1);
+   if (timeExpression.getIdentifier() == null) {
+     return;
+   }
+   String timeColumn = timeExpression.getIdentifier().getName();
+   String timeColumnWithGranularity = TimestampIndexUtils.getColumnWithGranularity(timeColumn, granularString);
+
+   if (timeColumnWithGranularityPredicate.test(timeColumnWithGranularity)) {
+     query.putToExpressionOverrideHints(expression, getIdentifierExpression(timeColumnWithGranularity));
+   }
+ }
}
diff --git a/pinot-common/src/main/proto/plan.proto b/pinot-common/src/main/proto/plan.proto
index 49d357307648..5e3d733e45e4 100644
--- a/pinot-common/src/main/proto/plan.proto
+++ b/pinot-common/src/main/proto/plan.proto
@@ -69,6 +69,8 @@ message AggregateNode {
repeated int32 groupKeys = 3;
AggType aggType = 4;
bool leafReturnFinalResult = 5;
+ repeated Collation collations = 6;
+ int32 limit = 7;
}
message FilterNode {
@@ -144,13 +146,15 @@ message MailboxReceiveNode {
}
message MailboxSendNode {
- int32 receiverStageId = 1;
+ // kept for backward compatibility. Brokers populate it, but servers should prioritize receiverStageIds
+ int32 receiverStageId = 1 [deprecated = true];
ExchangeType exchangeType = 2;
DistributionType distributionType = 3;
repeated int32 keys = 4;
bool prePartitioned = 5;
repeated Collation collations = 6;
bool sort = 7;
+ repeated int32 receiverStageIds = 8;
}
message ProjectNode {
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/BrokerPrometheusMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/BrokerPrometheusMetricsTest.java
index 399e5b400b19..79add5d557d5 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/BrokerPrometheusMetricsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/BrokerPrometheusMetricsTest.java
@@ -50,6 +50,8 @@ public abstract class BrokerPrometheusMetricsTest extends PinotPrometheusMetrics
BrokerMeter.ENTRIES_SCANNED_POST_FILTER, BrokerMeter.TOTAL_SERVER_RESPONSE_SIZE,
BrokerMeter.QUERY_QUOTA_EXCEEDED);
+ private static final List GAUGES_ACCEPTING_RAW_TABLE_NAME = List.of(BrokerGauge.REQUEST_SIZE);
+
private BrokerMetrics _brokerMetrics;
@BeforeClass
@@ -77,7 +79,7 @@ public void gaugeTest(BrokerGauge gauge) {
_brokerMetrics.setOrUpdateGlobalGauge(gauge, () -> 5L);
assertGaugeExportedCorrectly(gauge.getGaugeName(), EXPORTED_METRIC_PREFIX);
} else {
- if (gauge == BrokerGauge.REQUEST_SIZE) {
+ if (GAUGES_ACCEPTING_RAW_TABLE_NAME.contains(gauge)) {
_brokerMetrics.setOrUpdateTableGauge(PinotPrometheusMetricsTest.ExportedLabelValues.TABLENAME, gauge, 5L);
assertGaugeExportedCorrectly(gauge.getGaugeName(), PinotPrometheusMetricsTest.ExportedLabels.TABLENAME,
EXPORTED_METRIC_PREFIX);
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java
index 7fcb76eae194..1f458a444829 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java
@@ -40,6 +40,7 @@ public abstract class ControllerPrometheusMetricsTest extends PinotPrometheusMet
private static final List GLOBAL_GAUGES_ACCEPTING_TASKTYPE =
List.of(ControllerGauge.NUM_MINION_TASKS_IN_PROGRESS, ControllerGauge.NUM_MINION_SUBTASKS_RUNNING,
ControllerGauge.NUM_MINION_SUBTASKS_WAITING, ControllerGauge.NUM_MINION_SUBTASKS_ERROR,
+ ControllerGauge.NUM_MINION_SUBTASKS_UNKNOWN,
ControllerGauge.PERCENT_MINION_SUBTASKS_IN_QUEUE, ControllerGauge.PERCENT_MINION_SUBTASKS_IN_ERROR);
//local gauges that accept partition
@@ -52,8 +53,7 @@ public abstract class ControllerPrometheusMetricsTest extends PinotPrometheusMet
ControllerGauge.TIME_MS_SINCE_LAST_SUCCESSFUL_MINION_TASK_GENERATION,
ControllerGauge.LAST_MINION_TASK_GENERATION_ENCOUNTERS_ERROR);
- private static final List GAUGES_ACCEPTING_RAW_TABLENAME =
- List.of(ControllerGauge.OFFLINE_TABLE_ESTIMATED_SIZE);
+ private static final List GAUGES_ACCEPTING_RAW_TABLENAME = List.of();
private ControllerMetrics _controllerMetrics;
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/MinionPrometheusMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/MinionPrometheusMetricsTest.java
index 84de2f4d81b1..1dd982d6273f 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/MinionPrometheusMetricsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/MinionPrometheusMetricsTest.java
@@ -43,7 +43,6 @@ public void setup() {
@Test(dataProvider = "minionTimers")
public void timerTest(MinionTimer timer) {
-
if (timer.isGlobal()) {
_minionMetrics.addTimedValue(timer, 30L, TimeUnit.MILLISECONDS);
assertTimerExportedCorrectly(timer.getTimerName(), EXPORTED_METRIC_PREFIX);
@@ -51,18 +50,10 @@ public void timerTest(MinionTimer timer) {
_minionMetrics.addTimedValue(ExportedLabelValues.MINION_TASK_SEGMENT_IMPORT, timer, 30L, TimeUnit.MILLISECONDS);
assertTimerExportedCorrectly(timer.getTimerName(),
List.of(ExportedLabelKeys.ID, ExportedLabelValues.MINION_TASK_SEGMENT_IMPORT), EXPORTED_METRIC_PREFIX);
-
_minionMetrics.addTimedTableValue(TABLE_NAME_WITH_TYPE, ExportedLabelValues.MINION_TASK_SEGMENT_IMPORT, timer,
30L, TimeUnit.MILLISECONDS);
-
- if (timer == MinionTimer.TASK_THREAD_CPU_TIME_NS) {
- assertTimerExportedCorrectly(timer.getTimerName(),
- List.of(ExportedLabelKeys.DATABASE, ExportedLabelValues.TABLENAME_WITH_TYPE_REALTIME,
- ExportedLabelKeys.TABLE, "myTable_REALTIME.SegmentImportTask"), EXPORTED_METRIC_PREFIX);
- } else {
- assertTimerExportedCorrectly(timer.getTimerName(), ExportedLabels.TABLENAME_TABLETYPE_MINION_TASKTYPE,
- EXPORTED_METRIC_PREFIX);
- }
+ assertTimerExportedCorrectly(timer.getTimerName(), ExportedLabels.TABLENAME_TABLETYPE_MINION_TASKTYPE,
+ EXPORTED_METRIC_PREFIX);
}
}
@@ -90,7 +81,6 @@ private void validateMetersWithLabels(MinionMeter meter) {
assertMeterExportedCorrectly(meter.getMeterName(),
List.of(ExportedLabelKeys.ID, ExportedLabelValues.MINION_TASK_SEGMENT_IMPORT), EXPORTED_METRIC_PREFIX);
} else if (meter == MinionMeter.SEGMENT_UPLOAD_FAIL_COUNT || meter == MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT) {
-
_minionMetrics.addMeteredTableValue(TABLE_NAME_WITH_TYPE, meter, 1L);
assertMeterExportedCorrectly(meter.getMeterName(), List.of(ExportedLabelKeys.ID, TABLE_NAME_WITH_TYPE),
EXPORTED_METRIC_PREFIX);
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PinotPrometheusMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PinotPrometheusMetricsTest.java
index a3f21ad91d9d..2de1ce8c8b03 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PinotPrometheusMetricsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PinotPrometheusMetricsTest.java
@@ -181,9 +181,10 @@ protected void assertTimerExportedCorrectly(String exportedTimerPrefix, List