Skip to content

Commit

Permalink
chore: interface change to list for throttled punctuator (#84)
Browse files Browse the repository at this point in the history
* chore: interface change to list for throttled punctuator

* nit
  • Loading branch information
Kishan Sairam Adapa authored Oct 18, 2023
1 parent 58173bd commit efc777e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -16,19 +17,19 @@
@Slf4j
public abstract class AbstractThrottledPunctuator<T> implements Punctuator {
private final Clock clock;
private final KeyValueStore<Long, ArrayList<T>> eventStore;
private final KeyValueStore<Long, List<T>> eventStore;
private final ThrottledPunctuatorConfig config;

public AbstractThrottledPunctuator(
Clock clock, ThrottledPunctuatorConfig config, KeyValueStore<Long, ArrayList<T>> eventStore) {
Clock clock, ThrottledPunctuatorConfig config, KeyValueStore<Long, List<T>> eventStore) {
this.clock = clock;
this.config = config;
this.eventStore = eventStore;
}

public void scheduleTask(long scheduleMs, T event) {
long windowMs = normalize(scheduleMs);
ArrayList<T> events = Optional.ofNullable(eventStore.get(windowMs)).orElse(new ArrayList<>());
List<T> events = Optional.ofNullable(eventStore.get(windowMs)).orElse(new ArrayList<>());
events.add(event);
eventStore.put(windowMs, events);
}
Expand All @@ -40,7 +41,7 @@ public boolean rescheduleTask(long oldScheduleMs, long newScheduleMs, T event) {

public boolean cancelTask(long scheduleMs, T event) {
long windowMs = normalize(scheduleMs);
ArrayList<T> events = Optional.ofNullable(eventStore.get(windowMs)).orElse(new ArrayList<>());
List<T> events = Optional.ofNullable(eventStore.get(windowMs)).orElse(new ArrayList<>());
boolean removed = events.remove(event);
if (removed) {
if (events.isEmpty()) {
Expand All @@ -67,13 +68,13 @@ public final void punctuate(long timestamp) {
"Processing tasks with throttling yield of {} until timestamp {}",
config.getYieldMs(),
timestamp);
try (KeyValueIterator<Long, ArrayList<T>> it =
try (KeyValueIterator<Long, List<T>> it =
eventStore.range(getRangeStart(timestamp), getRangeEnd(timestamp))) {
// iterate through all keys in range until yield timeout is reached
while (it.hasNext() && !shouldYieldNow(startTime)) {
KeyValue<Long, ArrayList<T>> kv = it.next();
KeyValue<Long, List<T>> kv = it.next();
totalProcessedWindows++;
ArrayList<T> events = kv.value;
List<T> events = kv.value;
long windowMs = kv.key;
// collect all tasks to be rescheduled by key to perform bulk reschedules
Map<Long, ArrayList<T>> rescheduledTasks = new HashMap<>();
Expand All @@ -94,7 +95,7 @@ public final void punctuate(long timestamp) {
// process all reschedules
rescheduledTasks.forEach(
(newWindowMs, rescheduledEvents) -> {
ArrayList<T> windowTasks =
List<T> windowTasks =
Optional.ofNullable(eventStore.get(newWindowMs)).orElse(new ArrayList<>());
windowTasks.addAll(rescheduledEvents);
eventStore.put(newWindowMs, windowTasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
class AbstractThrottledPunctuatorTest {
TestPunctuator underTest;
Clock clock;
KeyValueStore<Long, ArrayList<String>> objectStore;
KeyValueStore<Long, List<String>> objectStore;

@BeforeEach
void setup() {
Expand Down Expand Up @@ -100,9 +100,9 @@ void testPunctuateNoYield() {

underTest.punctuate(300);

try (KeyValueIterator<Long, ArrayList<String>> it = objectStore.range(0L, 601L)) {
try (KeyValueIterator<Long, List<String>> it = objectStore.range(0L, 601L)) {
while (it.hasNext()) {
KeyValue<Long, ArrayList<String>> kv = it.next();
KeyValue<Long, List<String>> kv = it.next();
switch (kv.key.intValue()) {
case 400:
assertEquals(List.of("schedule4", "schedule1"), kv.value);
Expand All @@ -125,9 +125,9 @@ void testPunctuateNoYield() {

underTest.punctuate(500);

try (KeyValueIterator<Long, ArrayList<String>> it = objectStore.range(0L, 601L)) {
try (KeyValueIterator<Long, List<String>> it = objectStore.range(0L, 601L)) {
while (it.hasNext()) {
KeyValue<Long, ArrayList<String>> kv = it.next();
KeyValue<Long, List<String>> kv = it.next();
if (kv.key.intValue() == 600) {
assertEquals(List.of("schedule3"), kv.value);
} else {
Expand All @@ -139,7 +139,7 @@ void testPunctuateNoYield() {
underTest.punctuate(600);

// all tasks done state store will be empty
try (KeyValueIterator<Long, ArrayList<String>> it = objectStore.range(0L, 601L)) {
try (KeyValueIterator<Long, List<String>> it = objectStore.range(0L, 601L)) {
assertFalse(it.hasNext());
}
}
Expand Down Expand Up @@ -170,9 +170,9 @@ void testPunctuateWithYield() {

underTest.punctuate(301);

try (KeyValueIterator<Long, ArrayList<String>> it = objectStore.range(0L, 601L)) {
try (KeyValueIterator<Long, List<String>> it = objectStore.range(0L, 601L)) {
while (it.hasNext()) {
KeyValue<Long, ArrayList<String>> kv = it.next();
KeyValue<Long, List<String>> kv = it.next();
switch (kv.key.intValue()) {
case 200:
assertEquals(List.of("schedule2.2"), kv.value);
Expand All @@ -193,7 +193,7 @@ class TestPunctuator extends AbstractThrottledPunctuator<String> {
public TestPunctuator(
Clock clock,
ThrottledPunctuatorConfig config,
KeyValueStore<Long, ArrayList<String>> objectStore) {
KeyValueStore<Long, List<String>> objectStore) {
super(clock, config, objectStore);
}

Expand Down

0 comments on commit efc777e

Please sign in to comment.