From baf7f84270bf1479212d2ec59e7dea44d6ea8526 Mon Sep 17 00:00:00 2001
From: Gerardo Viedma <88890631+gviedma@users.noreply.github.com>
Date: Tue, 21 Jan 2025 10:19:35 -0600
Subject: [PATCH] Adds push-based-ingestion plugin POC
---
.../pinot-pulsar/pom.xml.versionsBackup | 237 ++++++++++++++++++
.../pinot-push-api/pom.xml | 86 +++++++
.../plugin/stream/push/BufferedRecord.java | 50 ++++
.../stream/push/PushApiApplication.java | 137 ++++++++++
.../stream/push/PushBasedIngestionBuffer.java | 82 ++++++
.../push/PushBasedIngestionBufferManager.java | 43 ++++
.../stream/push/PushBasedIngestionConfig.java | 41 +++
.../push/PushBasedIngestionConsumer.java | 79 ++++++
.../PushBasedIngestionConsumerFactory.java | 61 +++++
...ushBasedIngestionConsumerPartitionLag.java | 41 +++
.../push/PushBasedIngestionMessageBatch.java | 54 ++++
.../PushBasedIngestionMessageMetadata.java | 42 ++++
.../PushBasedIngestionMetadataProvider.java | 100 ++++++++
.../push/resources/PushRecordResource.java | 91 +++++++
.../push/PushBasedIngestionConsumerTest.java | 117 +++++++++
...ushBasedIngestionMetadataProviderTest.java | 73 ++++++
pinot-plugins/pinot-stream-ingestion/pom.xml | 1 +
...ithubPushEvents_realtime_table_config.json | 60 +++++
.../githubPushEvents_schema.json | 31 +++
.../rawdata/githubPushEvents_data.json | 1 +
20 files changed, 1427 insertions(+)
create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml.versionsBackup
create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/pom.xml
create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/BufferedRecord.java
create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushApiApplication.java
create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionBuffer.java
create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionBufferManager.java
create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConfig.java
create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumer.java
create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerFactory.java
create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerPartitionLag.java
create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMessageBatch.java
create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMessageMetadata.java
create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMetadataProvider.java
create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/resources/PushRecordResource.java
create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/test/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerTest.java
create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/test/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMetadataProviderTest.java
create mode 100644 pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/githubPushEvents_realtime_table_config.json
create mode 100644 pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/githubPushEvents_schema.json
create mode 100644 pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/rawdata/githubPushEvents_data.json
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml.versionsBackup b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml.versionsBackup
new file mode 100644
index 000000000000..7926c5e33f25
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml.versionsBackup
@@ -0,0 +1,237 @@
+
+
+
+ 4.0.0
+
+ pinot-stream-ingestion
+ org.apache.pinot
+ 1.2.0-SNAPSHOT
+ ..
+
+
+ pinot-pulsar
+ Pinot Pulsar
+ https://pinot.apache.org/
+
+
+ package
+ ${basedir}/../../..
+ 2.11.0
+ 9.4.51.v20230217
+ 3.1.0
+ 2.1
+ 2.39
+ 0.16.0
+ 2.29.0
+ 1.60.1
+ 1.75
+ 2.6.2
+ 1.17
+ 1.2
+ 4.4
+ 1.19.0
+
+
+
+
+ org.eclipse.jetty
+ jetty-server
+ ${jetty-server.version}
+
+
+ javax.servlet
+ javax.servlet-api
+
+
+
+
+ org.testcontainers
+ pulsar
+ 1.17.1
+ test
+
+
+ org.apache.pulsar
+ pulsar-client-original
+ ${pulsar.version}
+
+
+ javax.ws.rs
+ javax.ws.rs-api
+
+
+ commons-codec
+ commons-codec
+
+
+ org.bouncycastle
+ bcpkix-jdk15on
+
+
+ org.bouncycastle
+ bcprov-ext-jdk15on
+
+
+ org.eclipse.jetty
+ jetty-util
+
+
+
+
+ org.apache.pulsar
+ pulsar-client-admin-original
+ ${pulsar.version}
+
+
+ javax.servlet
+ javax.servlet-api
+ ${javax.servlet-api.version}
+
+
+ javax.ws.rs
+ javax.ws.rs-api
+ ${javax.ws.rs-api.version}
+
+
+ org.glassfish.jersey.containers
+ jersey-container-grizzly2-http
+ ${jersey-container-grizzly2-http.version}
+
+
+ org.glassfish.jersey.core
+ jersey-server
+ ${jersey-container-grizzly2-http.version}
+
+
+ org.glassfish.jersey.containers
+ jersey-container-servlet-core
+ ${jersey-container-grizzly2-http.version}
+
+
+ io.netty
+ netty-resolver
+
+
+ io.prometheus
+ simpleclient_common
+ ${simpleclient_common.version}
+
+
+ com.google.api.grpc
+ proto-google-common-protos
+
+
+ io.grpc
+ grpc-context
+ ${grpc-context.version}
+
+
+ commons-codec
+ commons-codec
+
+
+ io.prometheus
+ simpleclient
+ ${simpleclient_common.version}
+
+
+ org.eclipse.jetty
+ jetty-servlet
+ ${jetty-server.version}
+
+
+ com.squareup.okio
+ okio
+
+
+ io.prometheus
+ simpleclient_hotspot
+ ${simpleclient_common.version}
+
+
+ io.grpc
+ grpc-protobuf-lite
+ ${grpc-protobuf-lite.version}
+
+
+ io.grpc
+ grpc-context
+
+
+
+
+ org.apache.commons
+ commons-collections4
+ ${commons-collections.version}
+
+
+ javax.annotation
+ javax.annotation-api
+ ${javax.annotation-api.version}
+
+
+ org.codehaus.mojo
+ animal-sniffer-annotations
+ ${codehaus-annotations.version}
+
+
+ com.github.ben-manes.caffeine
+ caffeine
+ ${caffeine.version}
+
+
+ io.netty
+ netty-codec-socks
+
+
+ org.bouncycastle
+ bcpkix-jdk15to18
+ ${bouncycastle.version}
+
+
+ org.bouncycastle
+ bcprov-ext-jdk15to18
+ ${bouncycastle.version}
+
+
+ org.bouncycastle
+ bcprov-jdk15to18
+ ${bouncycastle.version}
+
+
+ org.apache.pinot
+ pinot-spi
+
+
+ commons-codec
+ commons-codec
+
+
+ commons-logging
+ commons-logging
+
+
+
+
+
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/pom.xml
new file mode 100644
index 000000000000..f4ef4c34e8c3
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/pom.xml
@@ -0,0 +1,86 @@
+
+
+
+ 4.0.0
+
+ pinot-stream-ingestion
+ org.apache.pinot
+ 1.4.0-SNAPSHOT
+
+
+ pinot-push-api
+ Pinot Pused-Based Ingestion
+ https://pinot.apache.org/
+
+ ${basedir}/../../..
+
+
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+ org.apache.pinot
+ pinot-avro
+
+
+
+ org.glassfish.jersey.core
+ jersey-server
+
+
+ org.glassfish.jersey.containers
+ jersey-container-grizzly2-http
+
+
+ org.glassfish.jersey.inject
+ jersey-hk2
+
+
+ org.glassfish.jersey.media
+ jersey-media-json-jackson
+
+
+
+
+
+ build-shaded-jar
+
+
+ skipShade
+ !true
+
+
+
+ package
+
+
+
+ pinot-fastdev
+
+ none
+
+
+
+
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/BufferedRecord.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/BufferedRecord.java
new file mode 100644
index 000000000000..c6ab20cd03b4
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/BufferedRecord.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.push;
+
+public class BufferedRecord {
+ private final byte[] _value;
+ private final long _arrivalTimestamp;
+ private volatile long _offset;
+
+ public BufferedRecord(byte[] value) {
+ _value = value;
+ _arrivalTimestamp = System.nanoTime();
+ }
+
+ void setOffset(long offset) {
+ _offset = offset;
+ }
+
+ public byte[] getValue() {
+ return _value;
+ }
+
+ public long getArrivalTimestampNanos() {
+ return _arrivalTimestamp;
+ }
+
+ public long getOffset() {
+ return _offset;
+ }
+
+ public int getValueSize() {
+ return _value.length;
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushApiApplication.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushApiApplication.java
new file mode 100644
index 000000000000..4a7f61206f9c
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushApiApplication.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.push;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.container.ContainerResponseContext;
+import javax.ws.rs.container.ContainerResponseFilter;
+import org.glassfish.grizzly.http.server.HttpServer;
+import org.glassfish.grizzly.http.server.NetworkListener;
+import org.glassfish.hk2.utilities.binding.AbstractBinder;
+import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory;
+import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder;
+import org.glassfish.jersey.jackson.JacksonFeature;
+import org.glassfish.jersey.process.JerseyProcessingUncaughtExceptionHandler;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PushApiApplication extends ResourceConfig {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PushApiApplication.class);
+
+ public static final String START_TIME = "serverStartTime";
+
+ private static final String RESOURCE_PACKAGES = "org.apache.pinot.plugin.stream.push.resources";
+
+ private final AtomicBoolean _shutDownInProgress = new AtomicBoolean();
+ private HttpServer _httpServer;
+ private final PushBasedIngestionBufferManager _bufferManager;
+
+ public PushApiApplication(PushBasedIngestionBufferManager bufferManager) {
+ _bufferManager = bufferManager;
+
+ packages(RESOURCE_PACKAGES);
+ Instant serverStartTime = Instant.now();
+
+ register(new AbstractBinder() {
+ @Override
+ protected void configure() {
+ bind(_shutDownInProgress).to(AtomicBoolean.class);
+ bind(_bufferManager).to(PushBasedIngestionBufferManager.class);
+ bind(serverStartTime).named(START_TIME);
+ }
+ });
+
+ register(JacksonFeature.class);
+
+ register(new ContainerResponseFilter() {
+ @Override
+ public void filter(ContainerRequestContext containerRequestContext,
+ ContainerResponseContext containerResponseContext)
+ throws IOException {
+ containerResponseContext.getHeaders().add("Access-Control-Allow-Origin", "*");
+ }
+ });
+ }
+
+ public boolean start(int port) {
+ _httpServer = buildHttpServer(this, port);
+ try {
+ LOGGER.info("Starting push API on port {}", port);
+ _httpServer.start();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to start http server", e);
+ }
+
+ return true;
+ }
+
+ public static HttpServer buildHttpServer(ResourceConfig resConfig, int port) {
+ // The URI is irrelevant since the default listener will be manually rewritten.
+ HttpServer httpServer = GrizzlyHttpServerFactory.createHttpServer(URI.create("http://0.0.0.0/"), resConfig, false);
+
+ // Listeners cannot be configured with the factory. Manual overrides are required as instructed by Javadoc.
+ // @see org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory.createHttpServer(java.net.URI, org
+ // .glassfish.jersey.grizzly2.httpserver.GrizzlyHttpContainer, boolean, org.glassfish.grizzly.ssl
+ // .SSLEngineConfigurator, boolean)
+ httpServer.removeListener("grizzly");
+
+ final NetworkListener listener = new NetworkListener("push-api-" + port, "0.0.0.0", port);
+
+ listener.getTransport().getWorkerThreadPoolConfig().setThreadFactory(
+ new ThreadFactoryBuilder().setNameFormat("grizzly-http-server-push-api-%d")
+ .setUncaughtExceptionHandler(new JerseyProcessingUncaughtExceptionHandler()).build())
+ .setCorePoolSize(10)
+ .setMaxPoolSize(10);
+
+// if (CommonConstants.HTTPS_PROTOCOL.equals(listenerConfig.getProtocol())) {
+// listener.setSecure(true);
+// listener.setSSLEngineConfig(buildSSLEngineConfigurator(listenerConfig.getTlsConfig()));
+// }
+ httpServer.addListener(listener);
+ return httpServer;
+ }
+
+ /**
+ * Starts shutting down the HTTP server, which rejects all requests except for the liveness check.
+ */
+ public void startShuttingDown() {
+ _shutDownInProgress.set(true);
+ }
+
+ /**
+ * Stops the HTTP server.
+ */
+ public void stop() {
+ _httpServer.shutdownNow();
+ }
+
+ public HttpServer getHttpServer() {
+ return _httpServer;
+ }
+
+ public PushBasedIngestionBufferManager getBufferManager() {
+ return _bufferManager;
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionBuffer.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionBuffer.java
new file mode 100644
index 000000000000..3ce88238c402
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionBuffer.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.push;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+
+/**
+ * Kinesis stream specific config
+ */
+public class PushBasedIngestionBuffer {
+ private final BufferedRecord[] _buffer;
+ private final int _capacity;
+
+ private final ReadWriteLock _bufferLock = new ReentrantReadWriteLock();
+ private int _writeOffset = 0;
+
+ public PushBasedIngestionBuffer(int capacity) {
+ _buffer = new BufferedRecord[capacity];
+ _capacity = capacity;
+ }
+
+ public void append(BufferedRecord record) {
+ _bufferLock.writeLock().lock();
+ try {
+ record.setOffset(_writeOffset);
+ _buffer[_writeOffset % _capacity] = record;
+ _writeOffset++;
+ } finally {
+ _bufferLock.writeLock().unlock();
+ }
+ }
+
+ public void append(List records) {
+ _bufferLock.writeLock().lock();
+ try {
+ for (BufferedRecord record : records) {
+ record.setOffset(_writeOffset);
+ _buffer[_writeOffset % _capacity] = record;
+ _writeOffset++;
+ }
+ } finally {
+ _bufferLock.writeLock().unlock();
+ }
+ }
+
+ public List readNextBatch(long offset) {
+ _bufferLock.readLock().lock();
+ try {
+ if (offset >= _writeOffset) {
+ return Collections.emptyList();
+ }
+ List messages = new ArrayList<>((int) (_writeOffset - offset));
+ for (long i = offset; i < _writeOffset; i++) {
+ messages.add(_buffer[(int) i % _capacity]);
+ }
+ return messages;
+ } finally {
+ _bufferLock.readLock().unlock();
+ }
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionBufferManager.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionBufferManager.java
new file mode 100644
index 000000000000..4b1b6c6ee730
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionBufferManager.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.push;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Kinesis stream specific config
+ */
+public class PushBasedIngestionBufferManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PushBasedIngestionBufferManager.class);
+
+ private final int _bufferCapacity = 1000;
+ private final Map _buffersByTable;
+
+ public PushBasedIngestionBufferManager() {
+ _buffersByTable = new ConcurrentHashMap<>();
+ }
+
+ public PushBasedIngestionBuffer getBufferForTable(String tableName) {
+ return _buffersByTable.computeIfAbsent(tableName, k -> new PushBasedIngestionBuffer(_bufferCapacity));
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConfig.java
new file mode 100644
index 000000000000..d109da2a7741
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConfig.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.push;
+
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Kinesis stream specific config
+ */
+public class PushBasedIngestionConfig {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PushBasedIngestionConfig.class);
+
+ private final String _tableName;
+
+ public PushBasedIngestionConfig(StreamConfig streamConfig) {
+ _tableName = streamConfig.getTableNameWithType();
+ }
+
+ public String getTableName() {
+ return _tableName;
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumer.java
new file mode 100644
index 000000000000..fd07383f4187
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumer.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.push;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pinot.spi.stream.BytesStreamMessage;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PartitionGroupConsumer} implementation for push-based ingestion
+ */
+public class PushBasedIngestionConsumer implements PartitionGroupConsumer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PushBasedIngestionConsumer.class);
+
+ private final PushBasedIngestionConfig _config;
+ private final PushBasedIngestionBufferManager _bufferManager;
+
+ @VisibleForTesting
+ public PushBasedIngestionConsumer(PushBasedIngestionConfig config, PushBasedIngestionBufferManager bufferManager) {
+ _config = config;
+ _bufferManager = bufferManager;
+ LOGGER.info("Created push based consumer for table: {}", config.getTableName());
+ }
+
+ @Override
+ public synchronized PushBasedIngestionMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset,
+ int timeoutMs) {
+ LongMsgOffset startOffset = (LongMsgOffset) startMsgOffset;
+ PushBasedIngestionBuffer buffer = _bufferManager.getBufferForTable(_config.getTableName());
+ List records = buffer.readNextBatch(startOffset.getOffset());
+ LongMsgOffset offsetOfNextBatch = startOffset;
+ List messages = Collections.emptyList();
+ if (!records.isEmpty()) {
+ messages = records.stream().map(record -> extractStreamMessage(record)).collect(Collectors.toList());
+ StreamMessageMetadata lastMessageMetadata = messages.get(messages.size() - 1).getMetadata();
+ offsetOfNextBatch = (LongMsgOffset) lastMessageMetadata.getNextOffset();
+ }
+ return new PushBasedIngestionMessageBatch(messages, offsetOfNextBatch);
+ }
+
+ private BytesStreamMessage extractStreamMessage(BufferedRecord record) {
+ LOGGER.info("Consuming record with sequence id {} and ingestion delay of {}ns", record.getOffset(),
+ System.nanoTime() - record.getArrivalTimestampNanos());
+ StreamMessageMetadata messageMetadata =
+ new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(record.getArrivalTimestampNanos() / 1000000)
+ .setSerializedValueSize(record.getValueSize())
+ .setOffset(new LongMsgOffset(record.getOffset()), new LongMsgOffset(record.getOffset() + 1)).build();
+ return new BytesStreamMessage(null, record.getValue(), messageMetadata);
+ }
+
+ @Override
+ public void close() {
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerFactory.java
new file mode 100644
index 000000000000..ab617df9bcd7
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerFactory.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.push;
+
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+
+
+/**
+ * {@link StreamConsumerFactory} implementation for the Kinesis stream
+ */
+public class PushBasedIngestionConsumerFactory extends StreamConsumerFactory {
+
+ private static volatile PushApiApplication _pushApiApplication;
+
+ @Override
+ public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
+ return new PushBasedIngestionMetadataProvider(clientId, _streamConfig);
+ }
+
+ @Override
+ public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
+ return new PushBasedIngestionMetadataProvider(clientId, _streamConfig);
+ }
+
+ @Override
+ public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
+ PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
+ // TODO this can be moved to pinot-server initialization
+ // launch the application the first time
+ if (_pushApiApplication == null) {
+ synchronized (PushBasedIngestionConsumer.class) {
+ if (_pushApiApplication == null) {
+ _pushApiApplication = new PushApiApplication(new PushBasedIngestionBufferManager());
+ // TODO make port configurable
+ _pushApiApplication.start(8989);
+ }
+ }
+ }
+ return new PushBasedIngestionConsumer(new PushBasedIngestionConfig(_streamConfig),
+ _pushApiApplication.getBufferManager());
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerPartitionLag.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerPartitionLag.java
new file mode 100644
index 000000000000..a3b4e471b7cd
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerPartitionLag.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.push;
+
+import org.apache.pinot.spi.stream.PartitionLagState;
+
+
+public class PushBasedIngestionConsumerPartitionLag extends PartitionLagState {
+ private final String _recordsLag;
+ private final String _availabilityLagMs;
+
+ public PushBasedIngestionConsumerPartitionLag(String recordsLag, String availabilityLagMs) {
+ _recordsLag = recordsLag;
+ _availabilityLagMs = availabilityLagMs;
+ }
+
+ public String getRecordsLag() {
+ return _recordsLag;
+ }
+
+ @Override
+ public String getAvailabilityLagMs() {
+ return _availabilityLagMs;
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMessageBatch.java
new file mode 100644
index 000000000000..7bba178803d8
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMessageBatch.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.push;
+
+import java.util.List;
+import org.apache.pinot.spi.stream.BytesStreamMessage;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+
+/**
+ * A {@link MessageBatch} for collecting records from the Kinesis stream
+ */
+public class PushBasedIngestionMessageBatch implements MessageBatch {
+ private final List _messages;
+ private final LongMsgOffset _offsetOfNextBatch;
+
+ public PushBasedIngestionMessageBatch(List messages, LongMsgOffset offsetOfNextBatch) {
+ _messages = messages;
+ _offsetOfNextBatch = offsetOfNextBatch;
+ }
+
+ @Override
+ public int getMessageCount() {
+ return _messages.size();
+ }
+
+ @Override
+ public BytesStreamMessage getStreamMessage(int index) {
+ return _messages.get(index);
+ }
+
+ @Override
+ public StreamPartitionMsgOffset getOffsetOfNextBatch() {
+ return _offsetOfNextBatch;
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMessageMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMessageMetadata.java
new file mode 100644
index 000000000000..d67b8ddba561
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMessageMetadata.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.push;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+
+
+// TODO: Make it a util class
+public class PushBasedIngestionMessageMetadata extends StreamMessageMetadata {
+ public static final String APPRX_ARRIVAL_TIMESTAMP_KEY = "apprxArrivalTimestamp";
+ public static final String SEQUENCE_NUMBER_KEY = "sequenceNumber";
+
+ @Deprecated
+ public PushBasedIngestionMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers) {
+ super(recordIngestionTimeMs, headers);
+ }
+
+ @Deprecated
+ public PushBasedIngestionMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers,
+ Map metadata) {
+ super(recordIngestionTimeMs, headers, metadata);
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMetadataProvider.java
new file mode 100644
index 000000000000..332ce1c8f620
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMetadataProvider.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.push;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.spi.stream.ConsumerPartitionState;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link StreamMetadataProvider} implementation for the Kinesis stream
+ */
+public class PushBasedIngestionMetadataProvider implements StreamMetadataProvider {
+ private final StreamConsumerFactory _pushBasedIngestionConsumerFactory;
+ private final String _clientId;
+ private static final Logger LOGGER = LoggerFactory.getLogger(PushBasedIngestionMetadataProvider.class);
+
+ public PushBasedIngestionMetadataProvider(String clientId, StreamConfig streamConfig) {
+ _pushBasedIngestionConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+ _clientId = clientId;
+ }
+
+ public PushBasedIngestionMetadataProvider(String clientId, StreamConfig streamConfig,
+ StreamConsumerFactory streamConsumerFactory) {
+ _pushBasedIngestionConsumerFactory = streamConsumerFactory;
+ _clientId = clientId;
+ }
+
+ @Override
+ public int fetchPartitionCount(long timeoutMillis) {
+ return 1;
+ }
+
+ @Override
+ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
+ return new LongMsgOffset(0L);
+ }
+
+ @Override
+ public Map getCurrentPartitionLagState(
+ Map currentPartitionStateMap) {
+ Map perPartitionLag = new HashMap<>();
+ for (Map.Entry entry : currentPartitionStateMap.entrySet()) {
+ ConsumerPartitionState partitionState = entry.getValue();
+ // Compute records-lag
+ StreamPartitionMsgOffset currentOffset = partitionState.getCurrentOffset();
+ StreamPartitionMsgOffset upstreamLatest = partitionState.getUpstreamLatestOffset();
+ String offsetLagString = "UNKNOWN";
+
+ if (currentOffset instanceof LongMsgOffset && upstreamLatest instanceof LongMsgOffset) {
+ long offsetLag = ((LongMsgOffset) upstreamLatest).getOffset() - ((LongMsgOffset) currentOffset).getOffset();
+ offsetLagString = String.valueOf(offsetLag);
+ }
+
+ // Compute record-availability
+ String availabilityLagMs = "UNKNOWN";
+ RowMetadata lastProcessedMessageMetadata = partitionState.getLastProcessedRowMetadata();
+ if (lastProcessedMessageMetadata != null && partitionState.getLastProcessedTimeMs() > 0) {
+ long availabilityLag =
+ partitionState.getLastProcessedTimeMs() - lastProcessedMessageMetadata.getRecordIngestionTimeMs();
+ availabilityLagMs = String.valueOf(availabilityLag);
+ }
+
+ perPartitionLag.put(entry.getKey(),
+ new PushBasedIngestionConsumerPartitionLag(offsetLagString, availabilityLagMs));
+ }
+ return perPartitionLag;
+ }
+
+ @Override
+ public void close() {
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/resources/PushRecordResource.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/resources/PushRecordResource.java
new file mode 100644
index 000000000000..060dc1c10cd7
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/main/java/org/apache/pinot/plugin/stream/push/resources/PushRecordResource.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.push.resources;
+
+import com.google.common.base.Preconditions;
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import javax.inject.Inject;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.plugin.stream.push.BufferedRecord;
+import org.apache.pinot.plugin.stream.push.PushBasedIngestionBufferManager;
+import org.glassfish.grizzly.http.server.Request;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This resource API can be used to retrieve instance level information like instance tags.
+ */
+@Path("/ingestion")
+public class PushRecordResource {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PushRecordResource.class);
+
+ @Inject
+ private PushBasedIngestionBufferManager _bufferManager;
+
+ private static final String REALTIME_SUFFIX = "_REALTIME";
+
+ @POST
+ @Path("/records/{tableName}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response ingestRecords(@PathParam("tableName") String tableName, @Context Request request) {
+ Preconditions.checkArgument(StringUtils.isNotEmpty(tableName), "'tableName' cannot be null or empty");
+ LOGGER.info("Processing records for table {}", tableName);
+ if (!tableName.endsWith(REALTIME_SUFFIX)) {
+ tableName = tableName + REALTIME_SUFFIX;
+ }
+ try {
+ _bufferManager.getBufferForTable(tableName).append(generateRecord(request));
+ return Response.ok().build();
+ } catch (IOException e) {
+ LOGGER.error("Caught exception while processing records for table {}", tableName, e);
+ throw new RuntimeException("Failed to read request body", e);
+ }
+ }
+
+ private BufferedRecord generateRecord(Request request)
+ throws IOException {
+ // Get the input stream of the request to read the body
+ InputStream inputStream = request.getInputStream();
+ final byte[] bytes = drain(new BufferedInputStream(inputStream));
+ return new BufferedRecord(bytes);
+ }
+
+ byte[] drain(InputStream inputStream)
+ throws IOException {
+ final byte[] buf = new byte[1024];
+ int len;
+ final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ while ((len = inputStream.read(buf)) > 0) {
+ byteArrayOutputStream.write(buf, 0, len);
+ }
+ return byteArrayOutputStream.toByteArray();
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/test/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/test/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerTest.java
new file mode 100644
index 000000000000..ef02720e9a11
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/test/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionConsumerTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.push;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+
+public class PushBasedIngestionConsumerTest {
+ private static final String STREAM_TYPE = "push";
+ private static final String TABLE_NAME_WITH_TYPE = "pushTest_REALTIME";
+ private static final String STREAM_NAME = "push-test";
+ private static final int TIMEOUT = 1000;
+ private static final int NUM_RECORDS = 10;
+ private static final String DUMMY_RECORD_PREFIX = "DUMMY_RECORD-";
+
+ private PushBasedIngestionConfig _config;
+ private PushBasedIngestionBufferManager _bufferManager;
+ private List _records;
+ private PushApiApplication _pushApiApplication;
+
+ private PushBasedIngestionConfig getConfig() {
+ Map props = new HashMap<>();
+ props.put(StreamConfigProperties.STREAM_TYPE, STREAM_TYPE);
+ props.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_TOPIC_NAME),
+ STREAM_NAME);
+ props.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
+ StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), PushBasedIngestionConsumerFactory.class.getName());
+ props.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_DECODER_CLASS),
+ "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder");
+ return new PushBasedIngestionConfig(new StreamConfig(TABLE_NAME_WITH_TYPE, props));
+ }
+
+ @BeforeClass
+ public void setUp()
+ throws InterruptedException, IOException {
+ _bufferManager = new PushBasedIngestionBufferManager();
+ _pushApiApplication = new PushApiApplication(_bufferManager);
+ _pushApiApplication.start(8989);
+ _config = getConfig();
+ _records = new ArrayList<>(NUM_RECORDS);
+
+ for (int i = 0; i < NUM_RECORDS; i++) {
+ byte[] record = (DUMMY_RECORD_PREFIX + i).getBytes(StandardCharsets.UTF_8);
+ postRecord(record);
+ }
+ }
+
+ private void postRecord(byte[] record) throws IOException {
+ URL url = new URL("http://localhost:8989/ingestion/records/" + TABLE_NAME_WITH_TYPE);
+ URLConnection con = url.openConnection();
+ HttpURLConnection http = (HttpURLConnection) con;
+ http.setRequestMethod("POST"); // PUT is another valid option
+ http.setDoOutput(true);
+ int length = record.length;
+ http.setFixedLengthStreamingMode(length);
+ http.setRequestProperty("Content-Type", "application/octet-stream");
+ http.setRequestProperty("Content-Length", Integer.toString(length));
+ http.connect();
+ try (OutputStream os = http.getOutputStream()) {
+ os.write(record);
+ }
+ assertEquals(200, http.getResponseCode());
+ }
+
+ @AfterClass
+ public void shutDown() {
+ _pushApiApplication.stop();
+ }
+
+ @Test
+ public void testBasicConsumer() {
+ PushBasedIngestionConsumer consumer = new PushBasedIngestionConsumer(_config, _bufferManager);
+ // Fetch first batch
+ LongMsgOffset startOffset = new LongMsgOffset(0);
+ PushBasedIngestionMessageBatch messageBatch = consumer.fetchMessages(startOffset, TIMEOUT);
+ assertEquals(messageBatch.getMessageCount(), NUM_RECORDS);
+ for (int i = 0; i < NUM_RECORDS; i++) {
+ assertEquals(new String(messageBatch.getStreamMessage(i).getValue(), StandardCharsets.UTF_8),
+ DUMMY_RECORD_PREFIX + i);
+ }
+ assertFalse(messageBatch.isEndOfPartitionGroup());
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/test/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMetadataProviderTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/test/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMetadataProviderTest.java
new file mode 100644
index 000000000000..dd9da47b3509
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-push-api/src/test/java/org/apache/pinot/plugin/stream/push/PushBasedIngestionMetadataProviderTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.push;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+
+
+public class PushBasedIngestionMetadataProviderTest {
+ private static final String STREAM_NAME = "kinesis-test";
+ private static final String CLIENT_ID = "dummy";
+ private static final int TIMEOUT = 1000;
+
+ private PushBasedIngestionMetadataProvider _metadataProvider;
+ private StreamConsumerFactory _streamConsumerFactory;
+ private PartitionGroupConsumer _partitionGroupConsumer;
+
+ private StreamConfig getStreamConfig() {
+ Map props = new HashMap<>();
+ props.put(StreamConfigProperties.STREAM_TYPE, "push");
+ props.put("stream.push.consumer.type", "lowLevel");
+ props.put("stream.push.topic.name", STREAM_NAME);
+ props.put("stream.push.decoder.class.name", "ABCD");
+ props.put("stream.push.consumer.factory.class.name",
+ "org.apache.pinot.plugin.stream.push.PushBasedIngestionConsumerFactory");
+ return new StreamConfig("pushTest", props);
+ }
+
+ @BeforeMethod
+ public void setupTest() {
+ _streamConsumerFactory = mock(StreamConsumerFactory.class);
+ _partitionGroupConsumer = mock(PartitionGroupConsumer.class);
+ _metadataProvider = new PushBasedIngestionMetadataProvider(CLIENT_ID, getStreamConfig(), _streamConsumerFactory);
+ }
+
+ @Test
+ public void getPartitionsGroupInfoListTest()
+ throws Exception {
+ List result =
+ _metadataProvider.computePartitionGroupMetadata(CLIENT_ID, getStreamConfig(), new ArrayList<>(), TIMEOUT);
+
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0).getPartitionGroupId(), 0);
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pom.xml b/pinot-plugins/pinot-stream-ingestion/pom.xml
index bc8ab7b77f25..e7c57a563246 100644
--- a/pinot-plugins/pinot-stream-ingestion/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pom.xml
@@ -41,6 +41,7 @@
pinot-kafka-3.0
pinot-kinesis
pinot-pulsar
+ pinot-push-api
diff --git a/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/githubPushEvents_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/githubPushEvents_realtime_table_config.json
new file mode 100644
index 000000000000..55df570a45ac
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/githubPushEvents_realtime_table_config.json
@@ -0,0 +1,60 @@
+{
+ "tableName": "githubPushEvents",
+ "tableType": "REALTIME",
+ "tenants": {},
+ "segmentsConfig": {
+ "timeColumnName": "DaysSinceEpoch",
+ "retentionTimeUnit": "DAYS",
+ "retentionTimeValue": "5",
+ "replication": "1"
+ },
+ "tableIndexConfig": {},
+ "routing": {
+ "segmentPrunerTypes": [
+ "time"
+ ]
+ },
+ "ingestionConfig": {
+ "streamIngestionConfig": {
+ "streamConfigMaps": [
+ {
+ "streamType": "push",
+ "stream.push.topic.name": "githubPushEvents",
+ "stream.push.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
+ "stream.push.consumer.factory.class.name": "org.apache.pinot.plugin.stream.push.PushBasedIngestionConsumerFactory",
+ "realtime.segment.flush.threshold.time": "3600000",
+ "realtime.segment.flush.threshold.size": "50000"
+ }
+ ]
+ },
+ "transformConfigs": [
+ {
+ "columnName": "ts",
+ "transformFunction": "fromEpochDays(DaysSinceEpoch)"
+ },
+ {
+ "columnName": "tsRaw",
+ "transformFunction": "fromEpochDays(DaysSinceEpoch)"
+ }
+ ]
+ },
+ "fieldConfigList": [
+ {
+ "name": "ts",
+ "encodingType": "DICTIONARY",
+ "indexTypes": [
+ "TIMESTAMP"
+ ],
+ "timestampConfig": {
+ "granularities": [
+ "DAY",
+ "WEEK",
+ "MONTH"
+ ]
+ }
+ }
+ ],
+ "metadata": {
+ "customConfigs": {}
+ }
+}
diff --git a/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/githubPushEvents_schema.json b/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/githubPushEvents_schema.json
new file mode 100644
index 000000000000..ab3b6c2a71e5
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/githubPushEvents_schema.json
@@ -0,0 +1,31 @@
+{
+ "schemaName": "githubPushEvents",
+ "dimensionFieldSpecs": [
+ {
+ "name": "id",
+ "dataType": "STRING"
+ },
+ {
+ "name": "actorId",
+ "dataType": "STRING"
+ },
+ {
+ "name": "type",
+ "dataType": "STRING"
+ }
+ ],
+ "dateTimeFieldSpecs": [
+ {
+ "name": "created_at",
+ "dataType": "STRING",
+ "format": "1:SECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd'T'HH:mm:ssZ",
+ "granularity": "1:HOURS"
+ },
+ {
+ "name": "created_at_timestamp",
+ "dataType": "TIMESTAMP",
+ "format": "1:MILLISECONDS:TIMESTAMP",
+ "granularity": "1:SECONDS"
+ }
+ ]
+}
diff --git a/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/rawdata/githubPushEvents_data.json b/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/rawdata/githubPushEvents_data.json
new file mode 100644
index 000000000000..6c43eb3d29bc
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/stream/pushBasedIngestionDemo/rawdata/githubPushEvents_data.json
@@ -0,0 +1 @@
+{"Quarter":1,"FlightNum":2,"Origin":"JFK","LateAircraftDelay":null,"DivActualElapsedTime":null,"DivWheelsOns":null,"DivWheelsOffs":null,"ArrDel15":0,"AirTime":359,"DivTotalGTimes":null,"DepTimeBlk":"0900-0959","DestCityMarketID":32575,"DaysSinceEpoch":16071,"DivAirportSeqIDs":null,"DepTime":914,"Month":1,"DestStateName":"California","CRSElapsedTime":385,"Carrier":"AA","DestAirportID":12892,"Distance":2475,"ArrTimeBlk":"1200-1259","SecurityDelay":null,"DivArrDelay":null,"LongestAddGTime":null,"OriginWac":22,"WheelsOff":934,"UniqueCarrier":"AA","DestAirportSeqID":1289203,"DivReachedDest":null,"Diverted":0,"ActualElapsedTime":384,"AirlineID":19805,"OriginStateName":"New York","FlightDate":"2014-01-01","DepartureDelayGroups":0,"DivAirportLandings":0,"OriginCityName":"New York, NY","OriginStateFips":36,"OriginState":"NY","DistanceGroup":10,"WeatherDelay":null,"DestWac":91,"WheelsOn":1233,"OriginAirportID":12478,"OriginCityMarketID":31703,"NASDelay":null,"DestState":"CA","ArrTime":1238,"ArrivalDelayGroups":0,"Flights":1,"DayofMonth":1,"RandomAirports":["SEA","PSC","PHX","MSY","ATL","TYS","DEN","CHS","PDX","LAX","EWR","SFO","PIT","RDU","RAP","LSE","SAN","SBN","IAH","OAK","BRO","JFK","SAT","ORD","ACY","DFW","BWI","TPA","BFL","BOS","SNA","ISN"],"TotalAddGTime":null,"CRSDepTime":900,"DayOfWeek":3,"Dest":"LAX","CancellationCode":null,"FirstDepTime":null,"DivTailNums":null,"DepDelayMinutes":14,"DepDelay":14,"TaxiIn":5,"OriginAirportSeqID":1247802,"DestStateFips":6,"ArrDelay":13,"Cancelled":0,"DivAirportIDs":null,"TaxiOut":20,"DepDel15":0,"CarrierDelay":null,"DivLongestGTimes":null,"DivAirports":null,"DivDistance":null,"Year":2014,"CRSArrTime":1225,"ArrDelayMinutes":13,"TailNum":"N338AA","DestCityName":"Los Angeles, CA"}
\ No newline at end of file