Skip to content

Commit

Permalink
feat: add throttled punctuator (#82)
Browse files Browse the repository at this point in the history
* feat: add callback registry punctuator

* change package

* nit

* expose punctuate timestamp

* expose punctuate timestamp

* make abstract

* nit

* visibility

* empty fix

* fix

* nit

* some more changes

* rename

* pass on boolean

* logging

* nit

* renames

* address comments

* log tune

* opt

* nit

* remove list

* update

* wrap up

* minor refactor

* nit

* remove

---------

Co-authored-by: Laxman Ch <laxman@traceable.ai>
  • Loading branch information
Kishan Sairam Adapa and laxmanchekka authored Oct 13, 2023
1 parent f57eb9b commit 58173bd
Show file tree
Hide file tree
Showing 7 changed files with 415 additions and 2 deletions.
4 changes: 2 additions & 2 deletions kafka-streams-framework/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ dependencies {

implementation("org.apache.avro:avro")
implementation("org.apache.kafka:kafka-clients")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.60")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.60")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.61")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.61")
implementation("org.apache.commons:commons-lang3:3.12.0")

testCompileOnly("org.projectlombok:lombok:1.18.26")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package org.hypertrace.core.kafkastreams.framework.punctuators;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.hypertrace.core.kafkastreams.framework.punctuators.action.TaskResult;

/**
 * Base class for a {@link Punctuator} that executes scheduled tasks in time-bounded batches.
 *
 * <p>Tasks (events of type {@code T}) are kept in a {@link KeyValueStore} whose key is the start
 * of the window the task's schedule timestamp falls into, i.e. the timestamp rounded down to a
 * multiple of the configured window size. Each {@link #punctuate(long)} call walks the windows
 * returned by the store for the configured key range and executes their tasks until more than the
 * configured yield time has elapsed. Tasks that were not reached stay in the store and are picked
 * up by a later punctuation.
 *
 * @param <T> type of the task/event objects kept in the store
 */
@Slf4j
public abstract class AbstractThrottledPunctuator<T> implements Punctuator {
  private final Clock clock;
  private final KeyValueStore<Long, ArrayList<T>> eventStore;
  private final ThrottledPunctuatorConfig config;

  /**
   * @param clock time source used to measure how long a punctuation has been running
   * @param config supplies the yield time and the window size
   * @param eventStore store holding the scheduled tasks, keyed by window start
   */
  public AbstractThrottledPunctuator(
      Clock clock, ThrottledPunctuatorConfig config, KeyValueStore<Long, ArrayList<T>> eventStore) {
    this.clock = clock;
    this.config = config;
    this.eventStore = eventStore;
  }

  /**
   * Adds {@code event} to the window that contains {@code scheduleMs}.
   *
   * @param scheduleMs timestamp at which the task should run
   * @param event task to store
   */
  public void scheduleTask(long scheduleMs, T event) {
    long windowMs = normalize(scheduleMs);
    ArrayList<T> events = getWindowEvents(windowMs);
    events.add(event);
    eventStore.put(windowMs, events);
  }

  /**
   * Schedules {@code event} at {@code newScheduleMs} and then cancels it at {@code oldScheduleMs}.
   * The new schedule is written regardless of whether the cancel succeeds.
   *
   * @param oldScheduleMs timestamp the task was previously scheduled at
   * @param newScheduleMs timestamp the task should now run at
   * @param event task to move
   * @return {@code true} if the task was found in and removed from the old window
   */
  public boolean rescheduleTask(long oldScheduleMs, long newScheduleMs, T event) {
    scheduleTask(newScheduleMs, event);
    return cancelTask(oldScheduleMs, event);
  }

  /**
   * Removes one occurrence of {@code event} (matched with {@code equals}) from the window that
   * contains {@code scheduleMs}. The window key is deleted when it becomes empty. A warning is
   * logged when the task is not found.
   *
   * @param scheduleMs timestamp the task was scheduled at
   * @param event task to remove
   * @return {@code true} if the task was found and removed
   */
  public boolean cancelTask(long scheduleMs, T event) {
    long windowMs = normalize(scheduleMs);
    ArrayList<T> events = getWindowEvents(windowMs);
    boolean removed = events.remove(event);
    if (removed) {
      if (events.isEmpty()) {
        eventStore.delete(windowMs);
      } else {
        eventStore.put(windowMs, events);
      }
    } else {
      log.warn(
          "task cancel failed. event not found for ts: {}, window: {}",
          new Date(scheduleMs),
          new Date(windowMs));
    }
    return removed;
  }

  /**
   * Executes stored tasks window by window until the key range is exhausted or the yield time is
   * exceeded.
   *
   * <p>For every window, the executed tasks are removed from that window first and only then are
   * the reschedules requested by those tasks written to the store. The order matters: a task may
   * reschedule itself into the very window that is being processed, and writing the reschedule
   * before trimming the window would discard it again.
   *
   * @param timestamp punctuation timestamp; passed to {@link #executeTask(long, Object)} and used
   *     to derive the key range
   */
  @Override
  public final void punctuate(long timestamp) {
    long startTime = clock.millis();
    int totalProcessedWindows = 0;
    int totalProcessedTasks = 0;

    log.debug(
        "Processing tasks with throttling yield of {} until timestamp {}",
        config.getYieldMs(),
        timestamp);
    try (KeyValueIterator<Long, ArrayList<T>> it =
        eventStore.range(getRangeStart(timestamp), getRangeEnd(timestamp))) {
      // iterate through all keys in range until yield timeout is reached
      while (it.hasNext() && !shouldYieldNow(startTime)) {
        KeyValue<Long, ArrayList<T>> kv = it.next();
        totalProcessedWindows++;
        ArrayList<T> events = kv.value;
        long windowMs = kv.key;
        // collect all tasks to be rescheduled by key to perform bulk reschedules
        Map<Long, ArrayList<T>> rescheduledTasks = new HashMap<>();
        // loop through all events for this key until yield timeout is reached
        int i = 0;
        for (; i < events.size() && !shouldYieldNow(startTime); i++) {
          T event = events.get(i);
          totalProcessedTasks++;
          TaskResult action = executeTask(timestamp, event);
          action
              .getRescheduleTimestamp()
              .ifPresent(
                  (rescheduleTimestamp) ->
                      rescheduledTasks
                          .computeIfAbsent(normalize(rescheduleTimestamp), (t) -> new ArrayList<>())
                          .add(event));
        }

        // all tasks till i-1 have been completed or rescheduled, hence remove them from the
        // current window BEFORE applying reschedules (see method doc)
        if (i == events.size()) {
          // can directly delete key from store
          eventStore.delete(windowMs);
        } else if (i > 0) {
          eventStore.put(windowMs, new ArrayList<>(events.subList(i, events.size())));
        }
        // when i == 0 nothing was executed for this window, so the stored value is left untouched

        // process all reschedules; reads the store again so a reschedule into the current window
        // is appended to whatever was kept above
        rescheduledTasks.forEach(
            (newWindowMs, rescheduledEvents) -> {
              ArrayList<T> windowTasks = getWindowEvents(newWindowMs);
              windowTasks.addAll(rescheduledEvents);
              eventStore.put(newWindowMs, windowTasks);
            });
      }
    }
    log.info(
        "processed windows: {}, processed tasks: {}, time taken: {}",
        totalProcessedWindows,
        totalProcessedTasks,
        clock.millis() - startTime);
  }

  /**
   * Runs a single task.
   *
   * @param punctuateTimestamp timestamp of the punctuation that triggered the task
   * @param object the task to execute
   * @return result whose reschedule timestamp, when present, makes the task be stored again in the
   *     window containing that timestamp; when absent the task is dropped from the store
   */
  protected abstract TaskResult executeTask(long punctuateTimestamp, T object);

  /**
   * Lower bound handed to the store's range query. Defaults to {@code 0}.
   *
   * @param punctuateTimestamp timestamp of the current punctuation
   */
  protected long getRangeStart(long punctuateTimestamp) {
    return 0;
  }

  /**
   * Upper bound handed to the store's range query. Defaults to the punctuation timestamp.
   *
   * @param punctuateTimestamp timestamp of the current punctuation
   */
  protected long getRangeEnd(long punctuateTimestamp) {
    return punctuateTimestamp;
  }

  /** Returns {@code true} once strictly more than the configured yield time has elapsed. */
  private boolean shouldYieldNow(long startTimestamp) {
    return (clock.millis() - startTimestamp) > config.getYieldMs();
  }

  /** Rounds {@code timestamp} down to the start of its window (a multiple of the window size). */
  private long normalize(long timestamp) {
    return timestamp - (timestamp % config.getWindowMs());
  }

  /** Returns the tasks stored for {@code windowMs}, or a new empty list when there are none. */
  private ArrayList<T> getWindowEvents(long windowMs) {
    return Optional.ofNullable(eventStore.get(windowMs)).orElseGet(ArrayList::new);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.hypertrace.core.kafkastreams.framework.punctuators;

import static org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;

import com.typesafe.config.Config;
import lombok.Getter;

/**
 * Settings for a throttled punctuator, read from the kafka streams configuration.
 *
 * <p>Two values are looked up under the punctuator's name:
 *
 * <ul>
 *   <li>{@code <punctuatorName>.yield.ms} - when absent, 20% of the consumer session timeout
 *   <li>{@code <punctuatorName>.window.ms} - when absent, {@code 1}
 * </ul>
 */
@Getter
public class ThrottledPunctuatorConfig {
  private static final String YIELD_CONFIG_SUFFIX = ".yield.ms";
  private static final String WINDOW_CONFIG_SUFFIX = ".window.ms";
  private static final long DEFAULT_WINDOW_MS = 1;
  private static final double DEFAULT_YIELD_SESSION_TIMEOUT_RATIO = 1 / 5.0;

  private final long yieldMs;
  private final long windowMs;

  /**
   * @param kafkaStreamsConfig configuration to read the settings from
   * @param punctuatorName prefix under which this punctuator's settings are stored
   */
  public ThrottledPunctuatorConfig(Config kafkaStreamsConfig, String punctuatorName) {
    String yieldKey = punctuatorName + YIELD_CONFIG_SUFFIX;
    String windowKey = punctuatorName + WINDOW_CONFIG_SUFFIX;

    this.yieldMs =
        kafkaStreamsConfig.hasPath(yieldKey)
            ? kafkaStreamsConfig.getLong(yieldKey)
            : defaultYieldMs(kafkaStreamsConfig);
    this.windowMs =
        kafkaStreamsConfig.hasPath(windowKey)
            ? kafkaStreamsConfig.getLong(windowKey)
            : DEFAULT_WINDOW_MS;
  }

  /** Fallback yield time: the consumer session timeout scaled by the default ratio (20%). */
  private static long defaultYieldMs(Config kafkaStreamsConfig) {
    long sessionTimeoutMs = kafkaStreamsConfig.getLong(consumerPrefix(SESSION_TIMEOUT_MS_CONFIG));
    return (long) (sessionTimeoutMs * DEFAULT_YIELD_SESSION_TIMEOUT_RATIO);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.hypertrace.core.kafkastreams.framework.punctuators.action;

import java.util.Optional;

/** {@link TaskResult} that never carries a reschedule timestamp. */
public class CompletedTaskResult implements TaskResult {
  private static final Optional<Long> NO_RESCHEDULE = Optional.empty();

  /** @return always an empty {@link Optional} */
  @Override
  public Optional<Long> getRescheduleTimestamp() {
    return NO_RESCHEDULE;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.hypertrace.core.kafkastreams.framework.punctuators.action;

import java.util.Optional;

/** {@link TaskResult} that always carries the reschedule timestamp it was constructed with. */
public class RescheduleTaskResult implements TaskResult {
  private final long targetTimestamp;

  /** @param rescheduleTimestamp timestamp reported by {@link #getRescheduleTimestamp()} */
  public RescheduleTaskResult(long rescheduleTimestamp) {
    targetTimestamp = rescheduleTimestamp;
  }

  /** @return an {@link Optional} holding the timestamp passed to the constructor */
  @Override
  public Optional<Long> getRescheduleTimestamp() {
    Long boxedTimestamp = Long.valueOf(targetTimestamp);
    return Optional.of(boxedTimestamp);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.hypertrace.core.kafkastreams.framework.punctuators.action;

import java.util.Optional;

/**
 * Result returned by a task executed from a throttled punctuator. It tells the punctuator whether
 * the task should be stored again for a later run.
 */
public interface TaskResult {
/**
 * @return the timestamp at which the task should be scheduled again, or an empty {@link Optional}
 *     when no reschedule is requested
 */
Optional<Long> getRescheduleTimestamp();
}
Loading

0 comments on commit 58173bd

Please sign in to comment.