Skip to content

Commit

Permalink
Merge branch 'apache:master' into groovy_injection
Browse files Browse the repository at this point in the history
  • Loading branch information
abhishekbafna authored Jan 20, 2025
2 parents f74df61 + fbbabd4 commit bf6628c
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 61 deletions.
14 changes: 7 additions & 7 deletions .github/workflows/pinot_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:
cache: 'maven'
- name: Linter Test
env:
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
MAVEN_OPTS: >
-Xmx2G -DskipShade -DfailIfNoTests=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=25
-Dmaven.wagon.http.retryHandler.count=30 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
Expand Down Expand Up @@ -112,7 +112,7 @@ jobs:
env:
RUN_INTEGRATION_TESTS: false
RUN_TEST_SET: ${{ matrix.testset }}
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
MAVEN_OPTS: >
-Xmx2G -DskipShade -DfailIfNoTests=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=25
-Dmaven.wagon.http.retryHandler.count=30 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
Expand All @@ -134,7 +134,7 @@ jobs:
RUN_INTEGRATION_TESTS: false
RUN_TEST_SET: ${{ matrix.testset }}
PINOT_OFFHEAP_SKIP_BYTEBUFFER: ${{ matrix.skip_bytebuffer }}
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
MAVEN_OPTS: >
-Xmx2G -DskipShade -DfailIfNoTests=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=25
-Dmaven.wagon.http.retryHandler.count=30 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
Expand Down Expand Up @@ -202,7 +202,7 @@ jobs:
- name: Build Project
env:
RUN_INTEGRATION_TESTS: true
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
MAVEN_OPTS: >
-Xmx2G -DskipShade -DfailIfNoTests=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=25
-Dmaven.wagon.http.retryHandler.count=30 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
Expand All @@ -219,7 +219,7 @@ jobs:
RUN_INTEGRATION_TESTS: true
RUN_TEST_SET: ${{ matrix.testset }}
PINOT_OFFHEAP_SKIP_BYTEBUFFER: ${{ matrix.skip_bytebuffer }}
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
MAVEN_OPTS: >
-Xmx2G -DskipShade -DfailIfNoTests=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=25
-Dmaven.wagon.http.retryHandler.count=30 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
Expand All @@ -245,7 +245,7 @@ jobs:
env:
RUN_INTEGRATION_TESTS: true
RUN_TEST_SET: ${{ matrix.testset }}
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
MAVEN_OPTS: >
-Xmx2G -DskipShade -DfailIfNoTests=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=25
-Dmaven.wagon.http.retryHandler.count=30 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
Expand Down Expand Up @@ -407,7 +407,7 @@ jobs:
- uses: actions/cache@v4
env:
SEGMENT_DOWNLOAD_TIMEOUT_MINS: 10
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,4 @@ kubernetes/helm/**/Chart.lock

#Develocity
.mvn/.gradle-enterprise/
.mvn/.develocity/
19 changes: 9 additions & 10 deletions .mvn/gradle-enterprise.xml → .mvn/develocity.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,19 @@
under the License.
-->
<gradleEnterprise xmlns="https://www.gradle.com/gradle-enterprise-maven" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://www.gradle.com/gradle-enterprise-maven https://www.gradle.com/schema/gradle-enterprise-maven.xsd">
<develocity xmlns="https://www.gradle.com/develocity-maven" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://www.gradle.com/gradle-enterprise-maven https://www.gradle.com/schema/develocity-maven.xsd">
<projectId>pinot</projectId>
<server>
<url>https://ge.apache.org</url>
<url>https://develocity.apache.org</url>
<allowUntrusted>false</allowUntrusted>
</server>
<buildScan>
<capture>
<goalInputFiles>true</goalInputFiles>
<buildLogging>true</buildLogging>
<testLogging>true</testLogging>
</capture>
<backgroundBuildScanUpload>#{isFalse(env['GITHUB_ACTIONS'])}</backgroundBuildScanUpload>
<publish>ALWAYS</publish>
<publishIfAuthenticated>true</publishIfAuthenticated>
<publishing>
<onlyIf>
<![CDATA[authenticated]]>
</onlyIf>
</publishing>
<obfuscation>
<ipAddresses>#{{'0.0.0.0'}}</ipAddresses>
</obfuscation>
Expand All @@ -45,4 +44,4 @@
<enabled>false</enabled>
</remote>
</buildCache>
</gradleEnterprise>
</develocity>
4 changes: 2 additions & 2 deletions .mvn/extensions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
<extensions xmlns="http://maven.apache.org/EXTENSIONS/1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/EXTENSIONS/1.0.0 http://maven.apache.org/xsd/core-extensions-1.0.0.xsd">
<extension>
<groupId>com.gradle</groupId>
<artifactId>gradle-enterprise-maven-extension</artifactId>
<version>1.20.1</version>
<artifactId>develocity-maven-extension</artifactId>
<version>1.22.2</version>
</extension>
<extension>
<groupId>com.gradle</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -135,20 +136,16 @@ public void testGet()
}

@Test(expectedExceptions = ServiceUnavailableException.class)
public void testRejectHandler()
throws InterruptedException {
public void testRejectHandler() {
BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(1, 1, 1, _brokerMetrics);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) provider.getExecutorService();
ExecutorService threadPoolExecutor = provider.getExecutorService();

// test the rejection policy
AtomicInteger counter = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
threadPoolExecutor.execute(() -> {
counter.incrementAndGet();
latch.countDown();
});
int taskCount = 3;
Phaser phaser = new Phaser(taskCount);
for (int i = 0; i < taskCount; i++) {
threadPoolExecutor.execute(phaser::arriveAndAwaitAdvance);
}
latch.await();
phaser.arriveAndDeregister();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
Expand All @@ -47,9 +48,9 @@
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriBuilder;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
Expand Down Expand Up @@ -212,33 +213,34 @@ public Response downloadLogFileFromInstance(
@ApiParam(value = "Instance Name", required = true) @PathParam("instanceName") String instanceName,
@ApiParam(value = "Log file path", required = true) @QueryParam("filePath") String filePath,
@Context Map<String, String> headers) {
try {
URI uri = UriBuilder.fromUri(getInstanceBaseUri(instanceName)).path("/loggers/download")
.queryParam("filePath", filePath).build();
ClassicRequestBuilder requestBuilder = ClassicRequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
if (MapUtils.isNotEmpty(headers)) {
for (Map.Entry<String, String> header : headers.entrySet()) {
requestBuilder.addHeader(header.getKey(), header.getValue());
}
URI uri = UriBuilder.fromUri(getInstanceBaseUri(instanceName)).path("/loggers/download")
.queryParam("filePath", filePath).build();
ClassicRequestBuilder requestBuilder = ClassicRequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
if (MapUtils.isNotEmpty(headers)) {
for (Map.Entry<String, String> header : headers.entrySet()) {
requestBuilder.addHeader(header.getKey(), header.getValue());
}
if (authorization != null) {
requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, authorization);
}
try (CloseableHttpResponse httpResponse = _fileUploadDownloadClient.getHttpClient()
.execute(requestBuilder.build())) {
if (httpResponse.getCode() >= 400) {
throw new WebApplicationException(IOUtils.toString(httpResponse.getEntity().getContent(), "UTF-8"),
Response.Status.fromStatusCode(httpResponse.getCode()));
}
if (authorization != null) {
requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, authorization);
}

StreamingOutput streamingOutput = output -> {
try (CloseableHttpResponse response = _fileUploadDownloadClient.getHttpClient().execute(requestBuilder.build());
InputStream inputStream = response.getEntity().getContent()) {
// Stream the data using a buffer
byte[] buffer = new byte[1024];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
output.write(buffer, 0, bytesRead);
}
Response.ResponseBuilder builder = Response.ok();
builder.entity(httpResponse.getEntity().getContent());
builder.contentLocation(uri);
builder.header(HttpHeaders.CONTENT_LENGTH, httpResponse.getEntity().getContentLength());
return builder.build();
output.flush();
}
} catch (IOException e) {
throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
};
Response.ResponseBuilder builder = Response.ok();
builder.entity(streamingOutput);
builder.contentLocation(uri);
return builder.build();
}

private String getInstanceBaseUri(String instanceName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ public class PinotTaskRestletResource {

private static final String TASK_QUEUE_STATE_STOP = "STOP";
private static final String TASK_QUEUE_STATE_RESUME = "RESUME";
public static final String GENERATION_ERRORS_KEY = "generationErrors";
public static final String SCHEDULING_ERRORS_KEY = "schedulingErrors";

@Inject
PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
Expand Down Expand Up @@ -667,8 +669,8 @@ public Map<String, String> scheduleTasks(
schedulingErrors.addAll(value.getSchedulingErrors());
});
}
response.put("generationErrors", String.join(",", generationErrors));
response.put("schedulingErrors", String.join(",", schedulingErrors));
response.put(GENERATION_ERRORS_KEY, String.join(",", generationErrors));
response.put(SCHEDULING_ERRORS_KEY, String.join(",", schedulingErrors));
return response;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private static void validateCompatibilityWithTableConfig(Schema schema, TableCon
} catch (Exception e) {
throw new IllegalStateException(
"Schema is incompatible with tableConfig with name: " + tableConfig.getTableName() + " and type: "
+ tableConfig.getTableType(), e);
+ tableConfig.getTableType() + ", reason: " + e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.minion.MinionClient;
import org.apache.pinot.common.utils.tls.TlsUtils;
import org.apache.pinot.controller.api.resources.PinotTaskRestletResource;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableConfig;
Expand Down Expand Up @@ -308,6 +309,11 @@ private boolean waitForMinionTaskToFinish(Map<String, String> scheduledTasks, lo
try {
boolean allCompleted = true;
for (String taskType : scheduledTasks.keySet()) {
// ignore the error message entries
if (taskType.equals(PinotTaskRestletResource.GENERATION_ERRORS_KEY)
|| taskType.equals(PinotTaskRestletResource.SCHEDULING_ERRORS_KEY)) {
continue;
}
String taskName = scheduledTasks.get(taskType);
String taskState = _minionClient.getTaskState(taskName);
if (!COMPLETED.equalsIgnoreCase(taskState)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.tools;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.pinot.tools.admin.PinotAdministrator;


public class GeoSpatialQuickStart extends Quickstart {
private static final String QUICKSTART_IDENTIFIER = "GEOSPATIAL";
private static final String[] DATA_DIRECTORIES = new String[]{
"examples/batch/starbucksStores/",
};

@Override
public List<String> types() {
return Collections.singletonList(QUICKSTART_IDENTIFIER);
}

@Override
public String[] getDefaultBatchTableDirectories() {
return DATA_DIRECTORIES;
}

public static void main(String[] args)
throws Exception {
List<String> arguments = new ArrayList<>();
arguments.addAll(Arrays.asList("QuickStart", "-type", QUICKSTART_IDENTIFIER));
arguments.addAll(Arrays.asList(args));
PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
}
}
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
<async-http-client.version>3.0.1</async-http-client.version>
<jersey.version>2.45</jersey.version>
<hk2.version>2.6.1</hk2.version>
<swagger.version>1.6.14</swagger.version>
<swagger.version>1.6.15</swagger.version>
<swagger-ui.version>5.18.2</swagger-ui.version>
<hadoop.version>3.4.1</hadoop.version>
<jsonpath.version>2.9.0</jsonpath.version>
Expand Down Expand Up @@ -232,8 +232,8 @@

<!-- Google Libraries -->
<protobuf.version>3.25.5</protobuf.version>
<grpc.version>1.69.0</grpc.version>
<google.cloud.libraries.version>26.52.0</google.cloud.libraries.version>
<grpc.version>1.69.1</grpc.version>
<google.cloud.libraries.version>26.53.0</google.cloud.libraries.version>
<google.auto-service.version>1.1.1</google.auto-service.version>
<google.re2j.version>1.8</google.re2j.version>
<google.errorprone.version>2.36.0</google.errorprone.version>
Expand Down

0 comments on commit bf6628c

Please sign in to comment.