From baf7f84270bf1479212d2ec59e7dea44d6ea8526 Mon Sep 17 00:00:00 2001 From: Gerardo Viedma <88890631+gviedma@users.noreply.github.com> Date: Tue, 21 Jan 2025 10:19:35 -0600 Subject: [PATCH] Adds push-based-ingestion plugin POC --- .../pinot-pulsar/pom.xml.versionsBackup | 237 ++++++++++++++++++ .../pinot-push-api/pom.xml | 86 +++++++ .../plugin/stream/push/BufferedRecord.java | 50 ++++ .../stream/push/PushApiApplication.java | 137 ++++++++++ .../stream/push/PushBasedIngestionBuffer.java | 82 ++++++ .../push/PushBasedIngestionBufferManager.java | 43 ++++ .../stream/push/PushBasedIngestionConfig.java | 41 +++ .../push/PushBasedIngestionConsumer.java | 79 ++++++ .../PushBasedIngestionConsumerFactory.java | 61 +++++ ...ushBasedIngestionConsumerPartitionLag.java | 41 +++ .../push/PushBasedIngestionMessageBatch.java | 54 ++++ .../PushBasedIngestionMessageMetadata.java | 42 ++++ .../PushBasedIngestionMetadataProvider.java | 100 ++++++++ .../push/resources/PushRecordResource.java | 91 +++++++ .../push/PushBasedIngestionConsumerTest.java | 117 +++++++++ ...ushBasedIngestionMetadataProviderTest.java | 73 ++++++ pinot-plugins/pinot-stream-ingestion/pom.xml | 1 + ...ithubPushEvents_realtime_table_config.json | 60 +++++ .../githubPushEvents_schema.json | 31 +++ .../rawdata/githubPushEvents_data.json | 1 + 20 files changed, 1427 insertions(+) create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml.versionsBackup create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/pom.xml create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/BufferedRecord.java create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushApiApplication.java create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionBuffer.java create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionBufferManager.java create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConfig.java create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumer.java create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerFactory.java create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerPartitionLag.java create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMessageBatch.java create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMessageMetadata.java create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMetadataProvider.java create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/resources/PushRecordResource.java create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/test/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerTest.java create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/test/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMetadataProviderTest.java create mode 100644 pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/githubPushEvents_realtime_table_config.json create mode 100644 pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/githubPushEvents_schema.json create mode 100644 pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/rawdata/githubPushEvents_data.json diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml.versionsBackup b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml.versionsBackup new file mode 100644 index 000000000000..7926c5e33f25 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml.versionsBackup @@ -0,0 +1,237 @@ + + + + 4.0.0 + + pinot-stream-ingestion + org.apache.pinot + 1.2.0-SNAPSHOT + .. + + + pinot-pulsar + Pinot Pulsar + https://pinot.apache.org/ + + + package + ${basedir}/../../.. + 2.11.0 + 9.4.51.v20230217 + 3.1.0 + 2.1 + 2.39 + 0.16.0 + 2.29.0 + 1.60.1 + 1.75 + 2.6.2 + 1.17 + 1.2 + 4.4 + 1.19.0 + + + + + org.eclipse.jetty + jetty-server + ${jetty-server.version} + + + javax.servlet + javax.servlet-api + + + + + org.testcontainers + pulsar + 1.17.1 + test + + + org.apache.pulsar + pulsar-client-original + ${pulsar.version} + + + javax.ws.rs + javax.ws.rs-api + + + commons-codec + commons-codec + + + org.bouncycastle + bcpkix-jdk15on + + + org.bouncycastle + bcprov-ext-jdk15on + + + org.eclipse.jetty + jetty-util + + + + + org.apache.pulsar + pulsar-client-admin-original + ${pulsar.version} + + + javax.servlet + javax.servlet-api + ${javax.servlet-api.version} + + + javax.ws.rs + javax.ws.rs-api + ${javax.ws.rs-api.version} + + + org.glassfish.jersey.containers + jersey-container-grizzly2-http + ${jersey-container-grizzly2-http.version} + + + org.glassfish.jersey.core + jersey-server + ${jersey-container-grizzly2-http.version} + + + org.glassfish.jersey.containers + jersey-container-servlet-core + ${jersey-container-grizzly2-http.version} + + + io.netty + netty-resolver + + + io.prometheus + simpleclient_common + ${simpleclient_common.version} + + + com.google.api.grpc + proto-google-common-protos + + + io.grpc + grpc-context + ${grpc-context.version} + + + commons-codec + commons-codec + + + io.prometheus + simpleclient + ${simpleclient_common.version} + + + org.eclipse.jetty + jetty-servlet + ${jetty-server.version} + + + com.squareup.okio + okio + + + io.prometheus + simpleclient_hotspot + ${simpleclient_common.version} + + + io.grpc + grpc-protobuf-lite + ${grpc-protobuf-lite.version} + + + io.grpc + grpc-context + + + + + org.apache.commons + commons-collections4 + ${commons-collections.version} + + + javax.annotation + javax.annotation-api + ${javax.annotation-api.version} + + + org.codehaus.mojo + animal-sniffer-annotations + ${codehaus-annotations.version} + + + com.github.ben-manes.caffeine + caffeine + ${caffeine.version} + + + io.netty + netty-codec-socks + + + org.bouncycastle + bcpkix-jdk15to18 + ${bouncycastle.version} + + + org.bouncycastle + bcprov-ext-jdk15to18 + ${bouncycastle.version} + + + org.bouncycastle + bcprov-jdk15to18 + ${bouncycastle.version} + + + org.apache.pinot + pinot-spi + + + commons-codec + commons-codec + + + commons-logging + commons-logging + + + + + diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/pom.xml new file mode 100644 index 000000000000..f4ef4c34e8c3 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/pom.xml @@ -0,0 +1,86 @@ + + + + 4.0.0 + + pinot-stream-ingestion + org.apache.pinot + 1.4.0-SNAPSHOT + + + pinot-push-api + Pinot Pused-Based Ingestion + https://pinot.apache.org/ + + ${basedir}/../../.. + + + + + org.mockito + mockito-core + test + + + org.apache.pinot + pinot-avro + + + + org.glassfish.jersey.core + jersey-server + + + org.glassfish.jersey.containers + jersey-container-grizzly2-http + + + org.glassfish.jersey.inject + jersey-hk2 + + + org.glassfish.jersey.media + jersey-media-json-jackson + + + + + + build-shaded-jar + + + skipShade + !true + + + + package + + + + pinot-fastdev + + none + + + + diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/BufferedRecord.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/BufferedRecord.java new file mode 100644 index 000000000000..c6ab20cd03b4 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/BufferedRecord.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.push; + +public class BufferedRecord { + private final byte[] _value; + private final long _arrivalTimestamp; + private volatile long _offset; + + public BufferedRecord(byte[] value) { + _value = value; + _arrivalTimestamp = System.nanoTime(); + } + + void setOffset(long offset) { + _offset = offset; + } + + public byte[] getValue() { + return _value; + } + + public long getArrivalTimestampNanos() { + return _arrivalTimestamp; + } + + public long getOffset() { + return _offset; + } + + public int getValueSize() { + return _value.length; + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushApiApplication.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushApiApplication.java new file mode 100644 index 000000000000..4a7f61206f9c --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushApiApplication.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.push; + +import java.io.IOException; +import java.net.URI; +import java.time.Instant; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.container.ContainerResponseContext; +import javax.ws.rs.container.ContainerResponseFilter; +import org.glassfish.grizzly.http.server.HttpServer; +import org.glassfish.grizzly.http.server.NetworkListener; +import org.glassfish.hk2.utilities.binding.AbstractBinder; +import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory; +import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder; +import org.glassfish.jersey.jackson.JacksonFeature; +import org.glassfish.jersey.process.JerseyProcessingUncaughtExceptionHandler; +import org.glassfish.jersey.server.ResourceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PushApiApplication extends ResourceConfig { + private static final Logger LOGGER = LoggerFactory.getLogger(PushApiApplication.class); + + public static final String START_TIME = "serverStartTime"; + + private static final String RESOURCE_PACKAGES = "org.apache.pinot.plugin.stream.push.resources"; + + private final AtomicBoolean _shutDownInProgress = new AtomicBoolean(); + private HttpServer _httpServer; + private final PushBasedIngestionBufferManager _bufferManager; + + public PushApiApplication(PushBasedIngestionBufferManager bufferManager) { + _bufferManager = bufferManager; + + packages(RESOURCE_PACKAGES); + Instant serverStartTime = Instant.now(); + + register(new AbstractBinder() { + @Override + protected void configure() { + bind(_shutDownInProgress).to(AtomicBoolean.class); + bind(_bufferManager).to(PushBasedIngestionBufferManager.class); + bind(serverStartTime).named(START_TIME); + } + }); + + register(JacksonFeature.class); + + register(new ContainerResponseFilter() { + @Override + public void filter(ContainerRequestContext containerRequestContext, + ContainerResponseContext containerResponseContext) + throws IOException { + containerResponseContext.getHeaders().add("Access-Control-Allow-Origin", "*"); + } + }); + } + + public boolean start(int port) { + _httpServer = buildHttpServer(this, port); + try { + LOGGER.info("Starting push API on port {}", port); + _httpServer.start(); + } catch (IOException e) { + throw new RuntimeException("Failed to start http server", e); + } + + return true; + } + + public static HttpServer buildHttpServer(ResourceConfig resConfig, int port) { + // The URI is irrelevant since the default listener will be manually rewritten. + HttpServer httpServer = GrizzlyHttpServerFactory.createHttpServer(URI.create("http://0.0.0.0/"), resConfig, false); + + // Listeners cannot be configured with the factory. Manual overrides are required as instructed by Javadoc. + // @see org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory.createHttpServer(java.net.URI, org + // .glassfish.jersey.grizzly2.httpserver.GrizzlyHttpContainer, boolean, org.glassfish.grizzly.ssl + // .SSLEngineConfigurator, boolean) + httpServer.removeListener("grizzly"); + + final NetworkListener listener = new NetworkListener("push-api-" + port, "0.0.0.0", port); + + listener.getTransport().getWorkerThreadPoolConfig().setThreadFactory( + new ThreadFactoryBuilder().setNameFormat("grizzly-http-server-push-api-%d") + .setUncaughtExceptionHandler(new JerseyProcessingUncaughtExceptionHandler()).build()) + .setCorePoolSize(10) + .setMaxPoolSize(10); + +// if (CommonConstants.HTTPS_PROTOCOL.equals(listenerConfig.getProtocol())) { +// listener.setSecure(true); +// listener.setSSLEngineConfig(buildSSLEngineConfigurator(listenerConfig.getTlsConfig())); +// } + httpServer.addListener(listener); + return httpServer; + } + + /** + * Starts shutting down the HTTP server, which rejects all requests except for the liveness check. + */ + public void startShuttingDown() { + _shutDownInProgress.set(true); + } + + /** + * Stops the HTTP server. + */ + public void stop() { + _httpServer.shutdownNow(); + } + + public HttpServer getHttpServer() { + return _httpServer; + } + + public PushBasedIngestionBufferManager getBufferManager() { + return _bufferManager; + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionBuffer.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionBuffer.java new file mode 100644 index 000000000000..3ce88238c402 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionBuffer.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.push; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + + +/** + * Kinesis stream specific config + */ +public class PushBasedIngestionBuffer { + private final BufferedRecord[] _buffer; + private final int _capacity; + + private final ReadWriteLock _bufferLock = new ReentrantReadWriteLock(); + private int _writeOffset = 0; + + public PushBasedIngestionBuffer(int capacity) { + _buffer = new BufferedRecord[capacity]; + _capacity = capacity; + } + + public void append(BufferedRecord record) { + _bufferLock.writeLock().lock(); + try { + record.setOffset(_writeOffset); + _buffer[_writeOffset % _capacity] = record; + _writeOffset++; + } finally { + _bufferLock.writeLock().unlock(); + } + } + + public void append(List records) { + _bufferLock.writeLock().lock(); + try { + for (BufferedRecord record : records) { + record.setOffset(_writeOffset); + _buffer[_writeOffset % _capacity] = record; + _writeOffset++; + } + } finally { + _bufferLock.writeLock().unlock(); + } + } + + public List readNextBatch(long offset) { + _bufferLock.readLock().lock(); + try { + if (offset >= _writeOffset) { + return Collections.emptyList(); + } + List messages = new ArrayList<>((int) (_writeOffset - offset)); + for (long i = offset; i < _writeOffset; i++) { + messages.add(_buffer[(int) i % _capacity]); + } + return messages; + } finally { + _bufferLock.readLock().unlock(); + } + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionBufferManager.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionBufferManager.java new file mode 100644 index 000000000000..4b1b6c6ee730 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionBufferManager.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.push; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Kinesis stream specific config + */ +public class PushBasedIngestionBufferManager { + private static final Logger LOGGER = LoggerFactory.getLogger(PushBasedIngestionBufferManager.class); + + private final int _bufferCapacity = 1000; + private final Map _buffersByTable; + + public PushBasedIngestionBufferManager() { + _buffersByTable = new ConcurrentHashMap<>(); + } + + public PushBasedIngestionBuffer getBufferForTable(String tableName) { + return _buffersByTable.computeIfAbsent(tableName, k -> new PushBasedIngestionBuffer(_bufferCapacity)); + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConfig.java new file mode 100644 index 000000000000..d109da2a7741 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConfig.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.push; + +import org.apache.pinot.spi.stream.StreamConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Kinesis stream specific config + */ +public class PushBasedIngestionConfig { + private static final Logger LOGGER = LoggerFactory.getLogger(PushBasedIngestionConfig.class); + + private final String _tableName; + + public PushBasedIngestionConfig(StreamConfig streamConfig) { + _tableName = streamConfig.getTableNameWithType(); + } + + public String getTableName() { + return _tableName; + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumer.java new file mode 100644 index 000000000000..fd07383f4187 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumer.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.push; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.pinot.spi.stream.BytesStreamMessage; +import org.apache.pinot.spi.stream.LongMsgOffset; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.StreamMessageMetadata; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A {@link PartitionGroupConsumer} implementation for push-based ingestion + */ +public class PushBasedIngestionConsumer implements PartitionGroupConsumer { + private static final Logger LOGGER = LoggerFactory.getLogger(PushBasedIngestionConsumer.class); + + private final PushBasedIngestionConfig _config; + private final PushBasedIngestionBufferManager _bufferManager; + + @VisibleForTesting + public PushBasedIngestionConsumer(PushBasedIngestionConfig config, PushBasedIngestionBufferManager bufferManager) { + _config = config; + _bufferManager = bufferManager; + LOGGER.info("Created push based consumer for table: {}", config.getTableName()); + } + + @Override + public synchronized PushBasedIngestionMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, + int timeoutMs) { + LongMsgOffset startOffset = (LongMsgOffset) startMsgOffset; + PushBasedIngestionBuffer buffer = _bufferManager.getBufferForTable(_config.getTableName()); + List records = buffer.readNextBatch(startOffset.getOffset()); + LongMsgOffset offsetOfNextBatch = startOffset; + List messages = Collections.emptyList(); + if (!records.isEmpty()) { + messages = records.stream().map(record -> extractStreamMessage(record)).collect(Collectors.toList()); + StreamMessageMetadata lastMessageMetadata = messages.get(messages.size() - 1).getMetadata(); + offsetOfNextBatch = (LongMsgOffset) lastMessageMetadata.getNextOffset(); + } + return new PushBasedIngestionMessageBatch(messages, offsetOfNextBatch); + } + + private BytesStreamMessage extractStreamMessage(BufferedRecord record) { + LOGGER.info("Consuming record with sequence id {} and ingestion delay of {}ns", record.getOffset(), + System.nanoTime() - record.getArrivalTimestampNanos()); + StreamMessageMetadata messageMetadata = + new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(record.getArrivalTimestampNanos() / 1000000) + .setSerializedValueSize(record.getValueSize()) + .setOffset(new LongMsgOffset(record.getOffset()), new LongMsgOffset(record.getOffset() + 1)).build(); + return new BytesStreamMessage(null, record.getValue(), messageMetadata); + } + + @Override + public void close() { + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerFactory.java new file mode 100644 index 000000000000..ab617df9bcd7 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerFactory.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.push; + +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.apache.pinot.spi.stream.StreamMetadataProvider; + + +/** + * {@link StreamConsumerFactory} implementation for the Kinesis stream + */ +public class PushBasedIngestionConsumerFactory extends StreamConsumerFactory { + + private static volatile PushApiApplication _pushApiApplication; + + @Override + public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) { + return new PushBasedIngestionMetadataProvider(clientId, _streamConfig); + } + + @Override + public StreamMetadataProvider createStreamMetadataProvider(String clientId) { + return new PushBasedIngestionMetadataProvider(clientId, _streamConfig); + } + + @Override + public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, + PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) { + // TODO this can be moved to pinot-server initialization + // launch the application the first time + if (_pushApiApplication == null) { + synchronized (PushBasedIngestionConsumer.class) { + if (_pushApiApplication == null) { + _pushApiApplication = new PushApiApplication(new PushBasedIngestionBufferManager()); + // TODO make port configurable + _pushApiApplication.start(8989); + } + } + } + return new PushBasedIngestionConsumer(new PushBasedIngestionConfig(_streamConfig), + _pushApiApplication.getBufferManager()); + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerPartitionLag.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerPartitionLag.java new file mode 100644 index 000000000000..a3b4e471b7cd --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerPartitionLag.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.push; + +import org.apache.pinot.spi.stream.PartitionLagState; + + +public class PushBasedIngestionConsumerPartitionLag extends PartitionLagState { + private final String _recordsLag; + private final String _availabilityLagMs; + + public PushBasedIngestionConsumerPartitionLag(String recordsLag, String availabilityLagMs) { + _recordsLag = recordsLag; + _availabilityLagMs = availabilityLagMs; + } + + public String getRecordsLag() { + return _recordsLag; + } + + @Override + public String getAvailabilityLagMs() { + return _availabilityLagMs; + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMessageBatch.java new file mode 100644 index 000000000000..7bba178803d8 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMessageBatch.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.push; + +import java.util.List; +import org.apache.pinot.spi.stream.BytesStreamMessage; +import org.apache.pinot.spi.stream.LongMsgOffset; +import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; + + +/** + * A {@link MessageBatch} for collecting records from the Kinesis stream + */ +public class PushBasedIngestionMessageBatch implements MessageBatch { + private final List _messages; + private final LongMsgOffset _offsetOfNextBatch; + + public PushBasedIngestionMessageBatch(List messages, LongMsgOffset offsetOfNextBatch) { + _messages = messages; + _offsetOfNextBatch = offsetOfNextBatch; + } + + @Override + public int getMessageCount() { + return _messages.size(); + } + + @Override + public BytesStreamMessage getStreamMessage(int index) { + return _messages.get(index); + } + + @Override + public StreamPartitionMsgOffset getOffsetOfNextBatch() { + return _offsetOfNextBatch; + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMessageMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMessageMetadata.java new file mode 100644 index 000000000000..d67b8ddba561 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMessageMetadata.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.push; + +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.stream.StreamMessageMetadata; + + +// TODO: Make it a util class +public class PushBasedIngestionMessageMetadata extends StreamMessageMetadata { + public static final String APPRX_ARRIVAL_TIMESTAMP_KEY = "apprxArrivalTimestamp"; + public static final String SEQUENCE_NUMBER_KEY = "sequenceNumber"; + + @Deprecated + public PushBasedIngestionMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers) { + super(recordIngestionTimeMs, headers); + } + + @Deprecated + public PushBasedIngestionMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers, + Map metadata) { + super(recordIngestionTimeMs, headers, metadata); + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMetadataProvider.java new file mode 100644 index 000000000000..332ce1c8f620 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMetadataProvider.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.push; + +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.spi.stream.ConsumerPartitionState; +import org.apache.pinot.spi.stream.LongMsgOffset; +import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.PartitionLagState; +import org.apache.pinot.spi.stream.RowMetadata; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; +import org.apache.pinot.spi.stream.StreamMetadataProvider; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A {@link StreamMetadataProvider} implementation for the Kinesis stream + */ +public class PushBasedIngestionMetadataProvider implements StreamMetadataProvider { + private final StreamConsumerFactory _pushBasedIngestionConsumerFactory; + private final String _clientId; + private static final Logger LOGGER = LoggerFactory.getLogger(PushBasedIngestionMetadataProvider.class); + + public PushBasedIngestionMetadataProvider(String clientId, StreamConfig streamConfig) { + _pushBasedIngestionConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); + _clientId = clientId; + } + + public PushBasedIngestionMetadataProvider(String clientId, StreamConfig streamConfig, + StreamConsumerFactory streamConsumerFactory) { + _pushBasedIngestionConsumerFactory = streamConsumerFactory; + _clientId = clientId; + } + + @Override + public int fetchPartitionCount(long timeoutMillis) { + return 1; + } + + @Override + public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) { + return new LongMsgOffset(0L); + } + + @Override + public Map getCurrentPartitionLagState( + Map currentPartitionStateMap) { + Map perPartitionLag = new HashMap<>(); + for (Map.Entry entry : currentPartitionStateMap.entrySet()) { + ConsumerPartitionState partitionState = entry.getValue(); + // Compute records-lag + StreamPartitionMsgOffset currentOffset = partitionState.getCurrentOffset(); + StreamPartitionMsgOffset upstreamLatest = partitionState.getUpstreamLatestOffset(); + String offsetLagString = "UNKNOWN"; + + if (currentOffset instanceof LongMsgOffset && upstreamLatest instanceof LongMsgOffset) { + long offsetLag = ((LongMsgOffset) upstreamLatest).getOffset() - ((LongMsgOffset) currentOffset).getOffset(); + offsetLagString = String.valueOf(offsetLag); + } + + // Compute record-availability + String availabilityLagMs = "UNKNOWN"; + RowMetadata lastProcessedMessageMetadata = partitionState.getLastProcessedRowMetadata(); + if (lastProcessedMessageMetadata != null && partitionState.getLastProcessedTimeMs() > 0) { + long availabilityLag = + partitionState.getLastProcessedTimeMs() - lastProcessedMessageMetadata.getRecordIngestionTimeMs(); + availabilityLagMs = String.valueOf(availabilityLag); + } + + perPartitionLag.put(entry.getKey(), + new PushBasedIngestionConsumerPartitionLag(offsetLagString, availabilityLagMs)); + } + return perPartitionLag; + } + + @Override + public void close() { + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/resources/PushRecordResource.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/resources/PushRecordResource.java new file mode 100644 index 000000000000..060dc1c10cd7 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/resources/PushRecordResource.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.push.resources; + +import com.google.common.base.Preconditions; +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import javax.inject.Inject; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.plugin.stream.push.BufferedRecord; +import org.apache.pinot.plugin.stream.push.PushBasedIngestionBufferManager; +import org.glassfish.grizzly.http.server.Request; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This resource API can be used to retrieve instance level information like instance tags. + */ +@Path("/ingestion") +public class PushRecordResource { + + private static final Logger LOGGER = LoggerFactory.getLogger(PushRecordResource.class); + + @Inject + private PushBasedIngestionBufferManager _bufferManager; + + private static final String REALTIME_SUFFIX = "_REALTIME"; + + @POST + @Path("/records/{tableName}") + @Produces(MediaType.APPLICATION_JSON) + public Response ingestRecords(@PathParam("tableName") String tableName, @Context Request request) { + Preconditions.checkArgument(StringUtils.isNotEmpty(tableName), "'tableName' cannot be null or empty"); + LOGGER.info("Processing records for table {}", tableName); + if (!tableName.endsWith(REALTIME_SUFFIX)) { + tableName = tableName + REALTIME_SUFFIX; + } + try { + _bufferManager.getBufferForTable(tableName).append(generateRecord(request)); + return Response.ok().build(); + } catch (IOException e) { + LOGGER.error("Caught exception while processing records for table {}", tableName, e); + throw new RuntimeException("Failed to read request body", e); + } + } + + private BufferedRecord generateRecord(Request request) + throws IOException { + // Get the input stream of the request to read the body + InputStream inputStream = request.getInputStream(); + final byte[] bytes = drain(new BufferedInputStream(inputStream)); + return new BufferedRecord(bytes); + } + + byte[] drain(InputStream inputStream) + throws IOException { + final byte[] buf = new byte[1024]; + int len; + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + while ((len = inputStream.read(buf)) > 0) { + byteArrayOutputStream.write(buf, 0, len); + } + return byteArrayOutputStream.toByteArray(); + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/test/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/test/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerTest.java new file mode 100644 index 000000000000..ef02720e9a11 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/test/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.push; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLConnection; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.spi.stream.LongMsgOffset; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConfigProperties; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; + + +public class PushBasedIngestionConsumerTest { + private static final String STREAM_TYPE = "push"; + private static final String TABLE_NAME_WITH_TYPE = "pushTest_REALTIME"; + private static final String STREAM_NAME = "push-test"; + private static final int TIMEOUT = 1000; + private static final int NUM_RECORDS = 10; + private static final String DUMMY_RECORD_PREFIX = "DUMMY_RECORD-"; + + private PushBasedIngestionConfig _config; + private PushBasedIngestionBufferManager _bufferManager; + private List _records; + private PushApiApplication _pushApiApplication; + + private PushBasedIngestionConfig getConfig() { + Map props = new HashMap<>(); + props.put(StreamConfigProperties.STREAM_TYPE, STREAM_TYPE); + props.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_TOPIC_NAME), + STREAM_NAME); + props.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, + StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), PushBasedIngestionConsumerFactory.class.getName()); + props.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_DECODER_CLASS), + "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"); + return new PushBasedIngestionConfig(new StreamConfig(TABLE_NAME_WITH_TYPE, props)); + } + + @BeforeClass + public void setUp() + throws InterruptedException, IOException { + _bufferManager = new PushBasedIngestionBufferManager(); + _pushApiApplication = new PushApiApplication(_bufferManager); + _pushApiApplication.start(8989); + _config = getConfig(); + _records = new ArrayList<>(NUM_RECORDS); + + for (int i = 0; i < NUM_RECORDS; i++) { + byte[] record = (DUMMY_RECORD_PREFIX + i).getBytes(StandardCharsets.UTF_8); + postRecord(record); + } + } + + private void postRecord(byte[] record) throws IOException { + URL url = new URL("http://localhost:8989/ingestion/records/" + TABLE_NAME_WITH_TYPE); + URLConnection con = url.openConnection(); + HttpURLConnection http = (HttpURLConnection) con; + http.setRequestMethod("POST"); // PUT is another valid option + http.setDoOutput(true); + int length = record.length; + http.setFixedLengthStreamingMode(length); + http.setRequestProperty("Content-Type", "application/octet-stream"); + http.setRequestProperty("Content-Length", Integer.toString(length)); + http.connect(); + try (OutputStream os = http.getOutputStream()) { + os.write(record); + } + assertEquals(200, http.getResponseCode()); + } + + @AfterClass + public void shutDown() { + _pushApiApplication.stop(); + } + + @Test + public void testBasicConsumer() { + PushBasedIngestionConsumer consumer = new PushBasedIngestionConsumer(_config, _bufferManager); + // Fetch first batch + LongMsgOffset startOffset = new LongMsgOffset(0); + PushBasedIngestionMessageBatch messageBatch = consumer.fetchMessages(startOffset, TIMEOUT); + assertEquals(messageBatch.getMessageCount(), NUM_RECORDS); + for (int i = 0; i < NUM_RECORDS; i++) { + assertEquals(new String(messageBatch.getStreamMessage(i).getValue(), StandardCharsets.UTF_8), + DUMMY_RECORD_PREFIX + i); + } + assertFalse(messageBatch.isEndOfPartitionGroup()); + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/test/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMetadataProviderTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/test/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMetadataProviderTest.java new file mode 100644 index 000000000000..dd9da47b3509 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/test/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMetadataProviderTest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.push; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConfigProperties; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; + + +public class PushBasedIngestionMetadataProviderTest { + private static final String STREAM_NAME = "kinesis-test"; + private static final String CLIENT_ID = "dummy"; + private static final int TIMEOUT = 1000; + + private PushBasedIngestionMetadataProvider _metadataProvider; + private StreamConsumerFactory _streamConsumerFactory; + private PartitionGroupConsumer _partitionGroupConsumer; + + private StreamConfig getStreamConfig() { + Map props = new HashMap<>(); + props.put(StreamConfigProperties.STREAM_TYPE, "push"); + props.put("stream.push.consumer.type", "lowLevel"); + props.put("stream.push.topic.name", STREAM_NAME); + props.put("stream.push.decoder.class.name", "ABCD"); + props.put("stream.push.consumer.factory.class.name", + "org.apache.pinot.plugin.stream.push.PushBasedIngestionConsumerFactory"); + return new StreamConfig("pushTest", props); + } + + @BeforeMethod + public void setupTest() { + _streamConsumerFactory = mock(StreamConsumerFactory.class); + _partitionGroupConsumer = mock(PartitionGroupConsumer.class); + _metadataProvider = new PushBasedIngestionMetadataProvider(CLIENT_ID, getStreamConfig(), _streamConsumerFactory); + } + + @Test + public void getPartitionsGroupInfoListTest() + throws Exception { + List result = + _metadataProvider.computePartitionGroupMetadata(CLIENT_ID, getStreamConfig(), new ArrayList<>(), TIMEOUT); + + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.get(0).getPartitionGroupId(), 0); + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pom.xml b/pinot-plugins/pinot-stream-ingestion/pom.xml index bc8ab7b77f25..e7c57a563246 100644 --- a/pinot-plugins/pinot-stream-ingestion/pom.xml +++ b/pinot-plugins/pinot-stream-ingestion/pom.xml @@ -41,6 +41,7 @@ pinot-kafka-3.0 pinot-kinesis pinot-pulsar + pinot-push-api diff --git a/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/githubPushEvents_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/githubPushEvents_realtime_table_config.json new file mode 100644 index 000000000000..55df570a45ac --- /dev/null +++ b/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/githubPushEvents_realtime_table_config.json @@ -0,0 +1,60 @@ +{ + "tableName": "githubPushEvents", + "tableType": "REALTIME", + "tenants": {}, + "segmentsConfig": { + "timeColumnName": "DaysSinceEpoch", + "retentionTimeUnit": "DAYS", + "retentionTimeValue": "5", + "replication": "1" + }, + "tableIndexConfig": {}, + "routing": { + "segmentPrunerTypes": [ + "time" + ] + }, + "ingestionConfig": { + "streamIngestionConfig": { + "streamConfigMaps": [ + { + "streamType": "push", + "stream.push.topic.name": "githubPushEvents", + "stream.push.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder", + "stream.push.consumer.factory.class.name": "org.apache.pinot.plugin.stream.push.PushBasedIngestionConsumerFactory", + "realtime.segment.flush.threshold.time": "3600000", + "realtime.segment.flush.threshold.size": "50000" + } + ] + }, + "transformConfigs": [ + { + "columnName": "ts", + "transformFunction": "fromEpochDays(DaysSinceEpoch)" + }, + { + "columnName": "tsRaw", + "transformFunction": "fromEpochDays(DaysSinceEpoch)" + } + ] + }, + "fieldConfigList": [ + { + "name": "ts", + "encodingType": "DICTIONARY", + "indexTypes": [ + "TIMESTAMP" + ], + "timestampConfig": { + "granularities": [ + "DAY", + "WEEK", + "MONTH" + ] + } + } + ], + "metadata": { + "customConfigs": {} + } +} diff --git a/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/githubPushEvents_schema.json b/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/githubPushEvents_schema.json new file mode 100644 index 000000000000..ab3b6c2a71e5 --- /dev/null +++ b/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/githubPushEvents_schema.json @@ -0,0 +1,31 @@ +{ + "schemaName": "githubPushEvents", + "dimensionFieldSpecs": [ + { + "name": "id", + "dataType": "STRING" + }, + { + "name": "actorId", + "dataType": "STRING" + }, + { + "name": "type", + "dataType": "STRING" + } + ], + "dateTimeFieldSpecs": [ + { + "name": "created_at", + "dataType": "STRING", + "format": "1:SECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd'T'HH:mm:ssZ", + "granularity": "1:HOURS" + }, + { + "name": "created_at_timestamp", + "dataType": "TIMESTAMP", + "format": "1:MILLISECONDS:TIMESTAMP", + "granularity": "1:SECONDS" + } + ] +} diff --git a/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/rawdata/githubPushEvents_data.json b/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/rawdata/githubPushEvents_data.json new file mode 100644 index 000000000000..6c43eb3d29bc --- /dev/null +++ b/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/rawdata/githubPushEvents_data.json @@ -0,0 +1 @@ +{"Quarter":1,"FlightNum":2,"Origin":"JFK","LateAircraftDelay":null,"DivActualElapsedTime":null,"DivWheelsOns":null,"DivWheelsOffs":null,"ArrDel15":0,"AirTime":359,"DivTotalGTimes":null,"DepTimeBlk":"0900-0959","DestCityMarketID":32575,"DaysSinceEpoch":16071,"DivAirportSeqIDs":null,"DepTime":914,"Month":1,"DestStateName":"California","CRSElapsedTime":385,"Carrier":"AA","DestAirportID":12892,"Distance":2475,"ArrTimeBlk":"1200-1259","SecurityDelay":null,"DivArrDelay":null,"LongestAddGTime":null,"OriginWac":22,"WheelsOff":934,"UniqueCarrier":"AA","DestAirportSeqID":1289203,"DivReachedDest":null,"Diverted":0,"ActualElapsedTime":384,"AirlineID":19805,"OriginStateName":"New York","FlightDate":"2014-01-01","DepartureDelayGroups":0,"DivAirportLandings":0,"OriginCityName":"New York, NY","OriginStateFips":36,"OriginState":"NY","DistanceGroup":10,"WeatherDelay":null,"DestWac":91,"WheelsOn":1233,"OriginAirportID":12478,"OriginCityMarketID":31703,"NASDelay":null,"DestState":"CA","ArrTime":1238,"ArrivalDelayGroups":0,"Flights":1,"DayofMonth":1,"RandomAirports":["SEA","PSC","PHX","MSY","ATL","TYS","DEN","CHS","PDX","LAX","EWR","SFO","PIT","RDU","RAP","LSE","SAN","SBN","IAH","OAK","BRO","JFK","SAT","ORD","ACY","DFW","BWI","TPA","BFL","BOS","SNA","ISN"],"TotalAddGTime":null,"CRSDepTime":900,"DayOfWeek":3,"Dest":"LAX","CancellationCode":null,"FirstDepTime":null,"DivTailNums":null,"DepDelayMinutes":14,"DepDelay":14,"TaxiIn":5,"OriginAirportSeqID":1247802,"DestStateFips":6,"ArrDelay":13,"Cancelled":0,"DivAirportIDs":null,"TaxiOut":20,"DepDel15":0,"CarrierDelay":null,"DivLongestGTimes":null,"DivAirports":null,"DivDistance":null,"Year":2014,"CRSArrTime":1225,"ArrDelayMinutes":13,"TailNum":"N338AA","DestCityName":"Los Angeles, CA"} \ No newline at end of file