diff --git a/.github/workflows/docker-tests.yml b/.github/workflows/docker-tests.yml
index ecb2c0027..e3bd80e14 100644
--- a/.github/workflows/docker-tests.yml
+++ b/.github/workflows/docker-tests.yml
@@ -34,13 +34,14 @@ jobs:
working-directory: ./.github/workflows/hypertrace-ingester
# Below tests a docker-compose.yml service named 'sut' with a valid HEALTHCHECK instruction:
run: |
- docker-compose -f docker-compose.yml -f docker-compose-zipkin-example.yml up -d || { ../scripts/inspect.sh ; exit 1 ; }
+ docker-compose -f docker-compose.yml -f docker-compose-zipkin-example.yml up -d || { ./scripts/inspect.sh docker-compose.yml docker-compose-zipkin-example.yml ; exit 1 ; }
- name: Waits for some stability
working-directory: ./.github/workflows/hypertrace-ingester
run: |
sleep 60 # you can decrease it but never increase it
docker-compose -f docker-compose.yml ps
+ ./scripts/inspect.sh docker-compose.yml docker-compose-zipkin-example.yml
- name: Runs tests
working-directory: ./.github/workflows/hypertrace-ingester/scripts
@@ -67,13 +68,14 @@ jobs:
working-directory: ./.github/workflows/hypertrace-ingester
# Below tests a docker-compose.yml service named 'sut' with a valid HEALTHCHECK instruction:
run: |
- docker-compose -f postgres/docker-compose.yml -f docker-compose-zipkin-example.yml up -d || { ../scripts/inspect.sh ; exit 1 ; }
+ docker-compose -f postgres/docker-compose.yml -f docker-compose-zipkin-example.yml up -d || { ./scripts/inspect.sh postgres/docker-compose.yml docker-compose-zipkin-example.yml ; exit 1 ; }
- name: Waits for some stability
working-directory: ./.github/workflows/hypertrace-ingester
run: |
sleep 60 # you can decrease it but never increase it
docker-compose -f postgres/docker-compose.yml ps
+ ./scripts/inspect.sh postgres/docker-compose.yml docker-compose-zipkin-example.yml
- name: Runs tests
working-directory: ./.github/workflows/hypertrace-ingester/scripts
diff --git a/.github/workflows/hypertrace-ingester/scripts/inspect.sh b/.github/workflows/hypertrace-ingester/scripts/inspect.sh
index ec16206c1..5e86e330e 100755
--- a/.github/workflows/hypertrace-ingester/scripts/inspect.sh
+++ b/.github/workflows/hypertrace-ingester/scripts/inspect.sh
@@ -1,11 +1,8 @@
#!/bin/bash
-# This script displays the state and logs for the containers in the docker-compose.
+echo "Inspecting compose containers from $1 and $2"
-SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
-DOCKER_COMPOSE_FILE_DIR="$(dirname $SCRIPT_DIR)/docker"
-
-containers=$(docker-compose -f ${DOCKER_COMPOSE_FILE_DIR}/docker-compose.yml -f ${DOCKER_COMPOSE_FILE_DIR}/docker-compose-zipkin-example.yml ps -q -a)
+containers=$(docker-compose -f $1 -f $2 ps -q -a)
while IFS= read -r container; do
name=$(docker inspect $container | jq -r '.[0].Name')
echo "=================="
diff --git a/.github/workflows/hypertrace-ingester/scripts/tests.sh b/.github/workflows/hypertrace-ingester/scripts/tests.sh
index b271c9f2e..c4a8f4f3a 100755
--- a/.github/workflows/hypertrace-ingester/scripts/tests.sh
+++ b/.github/workflows/hypertrace-ingester/scripts/tests.sh
@@ -38,6 +38,10 @@ echo "Calling the frontend to generate a trace..."
echo ""
curl -o /dev/null -s http://${FRONTEND_SERVICE_HOST}:8081 || { echo "Host $FRONTEND_SERVICE_HOST is down." ; exit 1; }
+# the grouper depends on stream time advancement, so send another set of traces to satisfy that condition
+sleep 35
+curl -o /dev/null -s http://${FRONTEND_SERVICE_HOST}:8081 || { echo "Host $FRONTEND_SERVICE_HOST is down." ; exit 1; }
+
echo "Retrieving the list of traces."
echo ""
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index f0ae77bc2..41c3bccee 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -3,7 +3,7 @@ hypertrace-entity-service = "0.8.78"
hypertrace-config-service = "0.1.54"
hypertrace-grpc-utils = "0.12.4"
hypertrace-serviceFramework = "0.1.60"
-hypertrace-kafkaStreams = "0.3.8"
+hypertrace-kafkaStreams = "0.4.0"
hypertrace-view-generator = "0.4.19"
grpc = "1.57.2"
diff --git a/hypertrace-ingester/src/test/java/org/hypertrace/ingester/HypertraceIngesterTest.java b/hypertrace-ingester/src/test/java/org/hypertrace/ingester/HypertraceIngesterTest.java
index c2b17280c..e38d417e9 100644
--- a/hypertrace-ingester/src/test/java/org/hypertrace/ingester/HypertraceIngesterTest.java
+++ b/hypertrace-ingester/src/test/java/org/hypertrace/ingester/HypertraceIngesterTest.java
@@ -9,7 +9,6 @@
import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span;
import java.io.File;
import java.nio.file.Path;
-import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -86,6 +85,9 @@ public void testIngestionPacketFlow(@TempDir Path tempDir) {
new JaegerSpanSerde().serializer());
spanNormalizerInputTopic.pipeInput(span);
+ // the grouper emits on stream time, so pipe an extra span timestamped past the grouping
+ // window to advance stream time and trigger the emit
+ spanNormalizerInputTopic.pipeInput(null, span, System.currentTimeMillis() + 30_001);
// create output topic for span-normalizer topology
TestOutputTopic spanNormalizerOutputTopic =
@@ -96,8 +98,6 @@ public void testIngestionPacketFlow(@TempDir Path tempDir) {
new AvroSerde<>().deserializer());
assertNotNull(spanNormalizerOutputTopic.readKeyValue());
- topologyTestDriver.advanceWallClockTime(Duration.ofSeconds(32));
-
// create output topic for span-grouper topology
TestOutputTopic spanGrouperOutputTopic =
topologyTestDriver.createOutputTopic(
diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/resources/configs/common/application.conf b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/resources/configs/common/application.conf
index f328b9cfd..2be70f962 100644
--- a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/resources/configs/common/application.conf
+++ b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/resources/configs/common/application.conf
@@ -114,7 +114,7 @@ enricher {
}
TraceStatsEnricher {
- class = "org.hypertrace.traceenricher.enrichment.enrichers.TraceStatsEnrichere"
+ class = "org.hypertrace.traceenricher.enrichment.enrichers.TraceStatsEnricher"
dependencies = ["EndpointEnricher"]
}
diff --git a/owasp-suppressions.xml b/owasp-suppressions.xml
index 509d87f40..3432d98a0 100644
--- a/owasp-suppressions.xml
+++ b/owasp-suppressions.xml
@@ -60,21 +60,6 @@
^pkg:maven/io\.opentelemetry/opentelemetry\-exporter\-prometheus@.*$
cpe:/a:prometheus:prometheus
-
-
- ^pkg:maven/org\.json/json@.*$
- CVE-2022-45688
-
-
-
- ^pkg:maven/com\.fasterxml\.jackson\.core/jackson\-databind@.*$
- CVE-2023-35116
-
^pkg:maven/org\.hypertrace\.core\.kafkastreams\.framework/avro\-partitioners@.*$
CVE-2023-37475
-
+
^pkg:maven/org\.quartz\-scheduler/quartz@.*$
CVE-2023-39017
-
+
@@ -104,18 +89,16 @@
^pkg:maven/com\.fasterxml\.jackson\.core/jackson\-databind@.*$
CVE-2023-35116
-
-
- ^pkg:maven/io\.netty/netty\-handler@.*$
- CVE-2023-4586
-
-
+
- ^pkg:maven/io\.netty/netty\-handler@.*$
+ ^pkg:maven/io\.netty/netty.*@.*$
CVE-2023-4586
\ No newline at end of file
diff --git a/raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml b/raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml
index 220bacf1c..ca65a46e4 100644
--- a/raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml
+++ b/raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml
@@ -81,3 +81,7 @@ data:
}
}
{{- end }}
+
+ {{- if hasKey .Values.rawSpansGrouperConfig "traceEmitPunctuatorFrequency" }}
+ trace.emit.punctuator.frequency = {{ .Values.rawSpansGrouperConfig.traceEmitPunctuatorFrequency }}
+ {{- end }}
diff --git a/raw-spans-grouper/helm/values.yaml b/raw-spans-grouper/helm/values.yaml
index 028f87f4b..139c0855c 100644
--- a/raw-spans-grouper/helm/values.yaml
+++ b/raw-spans-grouper/helm/values.yaml
@@ -132,6 +132,7 @@ rawSpansGrouperConfig:
groupPartitionerEnabled: false
groupPartitionerConfigServiceHost: config-service
groupPartitionerConfigServicePort: 50104
+ traceEmitPunctuatorFrequency: 5s
span:
groupby:
diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java
index 244782024..7c91a30d7 100644
--- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java
+++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java
@@ -17,4 +17,8 @@ public class RawSpanGrouperConstants {
public static final String DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT = "default.max.span.count";
public static final String DROPPED_SPANS_COUNTER = "hypertrace.dropped.spans";
public static final String TRUNCATED_TRACES_COUNTER = "hypertrace.truncated.traces";
+ static final String TRACE_EMIT_PUNCTUATOR = "trace-emit-punctuator";
+ static final String TRACE_EMIT_PUNCTUATOR_STORE_NAME = "emitter-store";
+ static final String TRACE_EMIT_PUNCTUATOR_FREQUENCY_CONFIG_KEY =
+ "trace.emit.punctuator.frequency";
}
diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java
index d70d8ba9f..aaa89e693 100644
--- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java
+++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java
@@ -5,12 +5,17 @@
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_PRODUCER;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_STATE_STORE_NAME;
+import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_EMIT_PUNCTUATOR_STORE_NAME;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_STATE_STORE;
+import com.google.common.annotations.VisibleForTesting;
import com.typesafe.config.Config;
+import java.time.Clock;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
@@ -44,6 +49,15 @@ public StreamsBuilder buildTopology(
Map properties,
StreamsBuilder streamsBuilder,
Map> inputStreams) {
+ return buildTopologyWithClock(Clock.systemUTC(), properties, streamsBuilder, inputStreams);
+ }
+
+ @VisibleForTesting
+ StreamsBuilder buildTopologyWithClock(
+ Clock clock,
+ Map properties,
+ StreamsBuilder streamsBuilder,
+ Map> inputStreams) {
Config jobConfig = getJobConfig(properties);
String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY);
String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY);
@@ -72,8 +86,16 @@ public StreamsBuilder buildTopology(
Stores.persistentKeyValueStore(SPAN_STATE_STORE_NAME), keySerde, valueSerde)
.withCachingEnabled();
+ StoreBuilder> traceEmitPunctuatorStoreBuilder =
+ Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore(TRACE_EMIT_PUNCTUATOR_STORE_NAME),
+ Serdes.Long(),
+ Serdes.ListSerde(ArrayList.class, valueSerde))
+ .withCachingEnabled();
+
streamsBuilder.addStateStore(spanStoreBuilder);
streamsBuilder.addStateStore(traceStateStoreBuilder);
+ streamsBuilder.addStateStore(traceEmitPunctuatorStoreBuilder);
StreamPartitioner groupPartitioner =
new GroupPartitionerBuilder()
@@ -89,11 +111,12 @@ public StreamsBuilder buildTopology(
outputTopicProducer = outputTopicProducer.withName(OUTPUT_TOPIC_PRODUCER);
inputStream
- .transform(
- RawSpansProcessor::new,
+ .process(
+ () -> new RawSpansProcessor(clock),
Named.as(RawSpansProcessor.class.getSimpleName()),
SPAN_STATE_STORE_NAME,
- TRACE_STATE_STORE)
+ TRACE_STATE_STORE,
+ TRACE_EMIT_PUNCTUATOR_STORE_NAME)
.to(outputTopic, outputTopicProducer);
return streamsBuilder;
diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java
index 1dc710bcc..885b023f4 100644
--- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java
+++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java
@@ -1,6 +1,7 @@
package org.hypertrace.core.rawspansgrouper;
import static org.hypertrace.core.datamodel.shared.AvroBuilderCache.fastNewBuilder;
+import static org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp.KAFKA_STREAMS_CONFIG_KEY;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DROPPED_SPANS_COUNTER;
@@ -9,6 +10,9 @@
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_GROUPBY_SESSION_WINDOW_INTERVAL_CONFIG_KEY;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_STATE_STORE_NAME;
+import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_EMIT_PUNCTUATOR;
+import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_EMIT_PUNCTUATOR_FREQUENCY_CONFIG_KEY;
+import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_EMIT_PUNCTUATOR_STORE_NAME;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_STATE_STORE;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRUNCATED_TRACES_COUNTER;
@@ -16,8 +20,10 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import java.nio.ByteBuffer;
+import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -25,17 +31,16 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.hypertrace.core.datamodel.RawSpan;
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.core.datamodel.shared.HexUtils;
+import org.hypertrace.core.kafkastreams.framework.punctuators.ThrottledPunctuatorConfig;
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
import org.hypertrace.core.spannormalizer.SpanIdentity;
import org.hypertrace.core.spannormalizer.TraceIdentity;
@@ -47,26 +52,17 @@
* Receives spans keyed by trace_id and stores them. A {@link TraceEmitPunctuator} is scheduled to
* run after the {@link RawSpansProcessor#groupingWindowTimeoutMs} interval to emit the trace. If
* any spans for the trace arrive within the {@link RawSpansProcessor#groupingWindowTimeoutMs}
- * interval then the {@link RawSpansProcessor#groupingWindowTimeoutMs} will get reset and the trace
- * will get an additional {@link RawSpansProcessor#groupingWindowTimeoutMs} time to accept spans.
+ * interval then the trace will get an additional {@link RawSpansProcessor#groupingWindowTimeoutMs}
+ * time to accept spans.
*/
public class RawSpansProcessor
- implements Transformer> {
+ implements Processor {
private static final Logger logger = LoggerFactory.getLogger(RawSpansProcessor.class);
private static final String PROCESSING_LATENCY_TIMER =
"hypertrace.rawspansgrouper.processing.latency";
private static final ConcurrentMap tenantToSpansGroupingTimer =
new ConcurrentHashMap<>();
- private ProcessorContext context;
- private KeyValueStore spanStore;
- private KeyValueStore traceStateStore;
- private long groupingWindowTimeoutMs;
- private To outputTopic;
- private double dataflowSamplingPercent = -1;
- private static final Map maxSpanCountMap = new HashMap<>();
- private long defaultMaxSpanCountLimit = Long.MAX_VALUE;
-
// counter for number of spans dropped per tenant
private static final ConcurrentMap droppedSpansCounter =
new ConcurrentHashMap<>();
@@ -74,14 +70,24 @@ public class RawSpansProcessor
// counter for number of truncated traces per tenant
private static final ConcurrentMap truncatedTracesCounter =
new ConcurrentHashMap<>();
+ private final Clock clock;
+ private KeyValueStore spanStore;
+ private KeyValueStore traceStateStore;
+ private long groupingWindowTimeoutMs;
+ private double dataflowSamplingPercent = -1;
+ private static final Map maxSpanCountMap = new HashMap<>();
+ private long defaultMaxSpanCountLimit = Long.MAX_VALUE;
+ private TraceEmitPunctuator traceEmitPunctuator;
+ private Cancellable traceEmitTasksPunctuatorCancellable;
+
+ public RawSpansProcessor(Clock clock) {
+ this.clock = clock;
+ }
@Override
- public void init(ProcessorContext context) {
- this.context = context;
- this.spanStore =
- (KeyValueStore) context.getStateStore(SPAN_STATE_STORE_NAME);
- this.traceStateStore =
- (KeyValueStore) context.getStateStore(TRACE_STATE_STORE);
+ public void init(ProcessorContext context) {
+ this.spanStore = context.getStateStore(SPAN_STATE_STORE_NAME);
+ this.traceStateStore = context.getStateStore(TRACE_STATE_STORE);
Config jobConfig = (Config) (context.appConfigs().get(RAW_SPANS_GROUPER_JOB_CONFIG));
this.groupingWindowTimeoutMs =
jobConfig.getLong(SPAN_GROUPBY_SESSION_WINDOW_INTERVAL_CONFIG_KEY) * 1000;
@@ -94,7 +100,8 @@ public void init(ProcessorContext context) {
if (jobConfig.hasPath(INFLIGHT_TRACE_MAX_SPAN_COUNT)) {
Config subConfig = jobConfig.getConfig(INFLIGHT_TRACE_MAX_SPAN_COUNT);
- subConfig.entrySet().stream()
+ subConfig
+ .entrySet()
.forEach(
(entry) -> {
maxSpanCountMap.put(entry.getKey(), subConfig.getLong(entry.getKey()));
@@ -105,19 +112,50 @@ public void init(ProcessorContext context) {
defaultMaxSpanCountLimit = jobConfig.getLong(DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT);
}
- this.outputTopic = To.child(OUTPUT_TOPIC_PRODUCER);
- restorePunctuators();
+ KeyValueStore> traceEmitPunctuatorStore =
+ context.getStateStore(TRACE_EMIT_PUNCTUATOR_STORE_NAME);
+ traceEmitPunctuator =
+ new TraceEmitPunctuator(
+ new ThrottledPunctuatorConfig(
+ jobConfig.getConfig(KAFKA_STREAMS_CONFIG_KEY), TRACE_EMIT_PUNCTUATOR),
+ traceEmitPunctuatorStore,
+ context,
+ spanStore,
+ traceStateStore,
+ OUTPUT_TOPIC_PRODUCER,
+ groupingWindowTimeoutMs,
+ dataflowSamplingPercent);
+ // Punctuator scheduled on stream time => no input messages => no emits will happen.
+ // Input almost never drops to 0 (i.e., no spans arriving at the platform), and while
+ // wall clock time would handle that case, wall clock time has a drawback: when
+ // burning lag we process messages that were produced across different timestamp
+ // intervals, usually faster than they were produced. Punctuating less often then
+ // piles up more and more work for the punctuator in subsequent iterations, and it
+ // only catches up with the normal input rate once the lag is fully burnt. This is
+ // undesirable because outputs are emitted only from the punctuator: when burning lag
+ // from the input topic we want to push it to the output topic and downstream as soon
+ // as possible; holding it back just delays cascading the lag downstream. Given that
+ // the grouper sits at the start of the pipeline and input dying down almost never
+ // happens, stream time beats wall clock time for yielding the trace emit punctuator.
+ traceEmitTasksPunctuatorCancellable =
+ context.schedule(
+ jobConfig.getDuration(TRACE_EMIT_PUNCTUATOR_FREQUENCY_CONFIG_KEY),
+ PunctuationType.STREAM_TIME,
+ traceEmitPunctuator);
}
- public KeyValue transform(TraceIdentity key, RawSpan value) {
+ @Override
+ public void process(Record record) {
Instant start = Instant.now();
- long currentTimeMs = System.currentTimeMillis();
+ long currentTimeMs = clock.millis();
+ TraceIdentity key = record.key();
+ RawSpan value = record.value();
TraceState traceState = traceStateStore.get(key);
boolean firstEntry = (traceState == null);
if (shouldDropSpan(key, traceState)) {
- return null;
+ return;
}
String tenantId = key.getTenantId();
@@ -125,35 +163,27 @@ public KeyValue transform(TraceIdentity key, Raw
ByteBuffer spanId = value.getEvent().getEventId();
spanStore.put(new SpanIdentity(tenantId, traceId, spanId), value);
- /*
- the trace emit ts is essentially currentTs + groupingWindowTimeoutMs
- i.e. if there is no span added in the next 'groupingWindowTimeoutMs' interval
- then the trace can be finalized and emitted
- */
- long traceEmitTs = currentTimeMs + groupingWindowTimeoutMs;
- if (logger.isDebugEnabled()) {
- logger.debug(
- "Updating trigger_ts=[{}] for for tenant_id=[{}], trace_id=[{}]",
- Instant.ofEpochMilli(traceEmitTs),
- key.getTenantId(),
- HexUtils.getHex(traceId));
- }
-
if (firstEntry) {
traceState =
fastNewBuilder(TraceState.Builder.class)
.setTraceStartTimestamp(currentTimeMs)
.setTraceEndTimestamp(currentTimeMs)
- .setEmitTs(traceEmitTs)
+ .setEmitTs(-1) // deprecated, not used anymore
.setTenantId(tenantId)
.setTraceId(traceId)
.setSpanIds(List.of(spanId))
.build();
- schedulePunctuator(key);
+ traceEmitPunctuator.scheduleTask(currentTimeMs, key);
} else {
traceState.getSpanIds().add(spanId);
+ long prevScheduleTimestamp = traceState.getTraceEndTimestamp();
traceState.setTraceEndTimestamp(currentTimeMs);
- traceState.setEmitTs(traceEmitTs);
+ if (!traceEmitPunctuator.rescheduleTask(
+ prevScheduleTimestamp, currentTimeMs + groupingWindowTimeoutMs, key)) {
+ logger.debug(
+ "Failed to reschedule task on getting span for trace key {}, schedule already dropped!",
+ key);
+ }
}
traceStateStore.put(key, traceState);
@@ -165,13 +195,13 @@ public KeyValue transform(TraceIdentity key, Raw
PlatformMetricsRegistry.registerTimer(
PROCESSING_LATENCY_TIMER, Map.of("tenantId", k)))
.record(Duration.between(start, Instant.now()).toMillis(), TimeUnit.MILLISECONDS);
- // the punctuator will emit the trace
- return null;
+ // no need to do context.forward. the punctuator will emit the trace once it's eligible to be
+ // emitted
+ return;
}
private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) {
- int inFlightSpansPerTrace =
- traceState != null ? traceState.getSpanIds().size() : Integer.MIN_VALUE;
+ int inFlightSpansPerTrace = traceState != null ? traceState.getSpanIds().size() : 0;
long maxSpanCountTenantLimit =
maxSpanCountMap.containsKey(key.getTenantId())
? maxSpanCountMap.get(key.getTenantId())
@@ -213,45 +243,8 @@ private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) {
return false;
}
- private void schedulePunctuator(TraceIdentity key) {
- TraceEmitPunctuator punctuator =
- new TraceEmitPunctuator(
- key,
- context,
- spanStore,
- traceStateStore,
- outputTopic,
- groupingWindowTimeoutMs,
- dataflowSamplingPercent);
- Cancellable cancellable =
- context.schedule(
- Duration.ofMillis(groupingWindowTimeoutMs), PunctuationType.STREAM_TIME, punctuator);
- punctuator.setCancellable(cancellable);
- logger.debug(
- "Scheduled a punctuator to emit trace for key=[{}] to run after [{}] ms",
- key,
- groupingWindowTimeoutMs);
- }
-
@Override
- public void close() {}
-
- /**
- * Punctuators are not persisted - so on restart we recover punctuators and schedule them to run
- * after {@link RawSpansProcessor#groupingWindowTimeoutMs}
- */
- void restorePunctuators() {
- long count = 0;
- Instant start = Instant.now();
- try (KeyValueIterator it = traceStateStore.all()) {
- while (it.hasNext()) {
- schedulePunctuator(it.next().key);
- count++;
- }
- logger.info(
- "Restored=[{}] punctuators, Duration=[{}]",
- count,
- Duration.between(start, Instant.now()));
- }
+ public void close() {
+ traceEmitTasksPunctuatorCancellable.cancel();
}
}
diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java
index 0c0cae52a..28b6f81cb 100644
--- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java
+++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java
@@ -9,8 +9,7 @@
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import java.nio.ByteBuffer;
-import java.time.Duration;
-import java.time.Instant;
+import java.time.Clock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -20,11 +19,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.hypertrace.core.datamodel.RawSpan;
import org.hypertrace.core.datamodel.StructuredTrace;
@@ -33,6 +29,11 @@
import org.hypertrace.core.datamodel.shared.DataflowMetricUtils;
import org.hypertrace.core.datamodel.shared.HexUtils;
import org.hypertrace.core.datamodel.shared.trace.StructuredTraceBuilder;
+import org.hypertrace.core.kafkastreams.framework.punctuators.AbstractThrottledPunctuator;
+import org.hypertrace.core.kafkastreams.framework.punctuators.ThrottledPunctuatorConfig;
+import org.hypertrace.core.kafkastreams.framework.punctuators.action.CompletedTaskResult;
+import org.hypertrace.core.kafkastreams.framework.punctuators.action.RescheduleTaskResult;
+import org.hypertrace.core.kafkastreams.framework.punctuators.action.TaskResult;
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
import org.hypertrace.core.spannormalizer.SpanIdentity;
import org.hypertrace.core.spannormalizer.TraceIdentity;
@@ -41,10 +42,10 @@
import org.slf4j.LoggerFactory;
/**
- * Checks if a trace can be finalized and emitted based on inactivity period of {@link
+ * Checks if a trace can be finalized and emitted based on inactivity period of {@link
* RawSpansProcessor#groupingWindowTimeoutMs}
*/
-class TraceEmitPunctuator implements Punctuator {
+class TraceEmitPunctuator extends AbstractThrottledPunctuator {
private static final Logger logger = LoggerFactory.getLogger(TraceEmitPunctuator.class);
private static final Object mutex = new Object();
@@ -54,10 +55,6 @@ class TraceEmitPunctuator implements Punctuator {
private static final String TRACES_EMITTER_COUNTER = "hypertrace.emitted.traces";
private static final ConcurrentMap tenantToTraceEmittedCounter =
new ConcurrentHashMap<>();
- private static final String PUNCTUATE_LATENCY_TIMER =
- "hypertrace.rawspansgrouper.punctuate.latency";
- private static final ConcurrentMap tenantToPunctuateLatencyTimer =
- new ConcurrentHashMap<>();
private static final String SPANS_PER_TRACE = "hypertrace.rawspansgrouper.spans.per.trace";
private static final ConcurrentMap tenantToSpanPerTraceCounter =
new ConcurrentHashMap<>();
@@ -70,24 +67,24 @@ class TraceEmitPunctuator implements Punctuator {
private static final ConcurrentMap tenantToTraceWithDuplicateSpansCounter =
new ConcurrentHashMap<>();
+ private static final CompletedTaskResult COMPLETED_TASK_RESULT = new CompletedTaskResult();
private final double dataflowSamplingPercent;
- private final TraceIdentity key;
- private final ProcessorContext context;
+ private final ProcessorContext context;
private final KeyValueStore spanStore;
private final KeyValueStore traceStateStore;
- private final To outputTopicProducer;
+ private final String outputTopicProducer;
private final long groupingWindowTimeoutMs;
- private Cancellable cancellable;
TraceEmitPunctuator(
- TraceIdentity key,
- ProcessorContext context,
+ ThrottledPunctuatorConfig throttledPunctuatorConfig,
+ KeyValueStore> throttledPunctuatorStore,
+ ProcessorContext context,
KeyValueStore spanStore,
KeyValueStore traceStateStore,
- To outputTopicProducer,
+ String outputTopicProducer,
long groupingWindowTimeoutMs,
double dataflowSamplingPercent) {
- this.key = key;
+ super(Clock.systemUTC(), throttledPunctuatorConfig, throttledPunctuatorStore);
this.context = context;
this.spanStore = spanStore;
this.traceStateStore = traceStateStore;
@@ -96,143 +93,108 @@ class TraceEmitPunctuator implements Punctuator {
this.dataflowSamplingPercent = dataflowSamplingPercent;
}
- public void setCancellable(Cancellable cancellable) {
- this.cancellable = cancellable;
- }
-
- /**
- * @param timestamp correspond to current system time
- */
- @Override
- public void punctuate(long timestamp) {
- Instant startTime = Instant.now();
- // always cancel the punctuator else it will get re-scheduled automatically
- cancellable.cancel();
-
+ protected TaskResult executeTask(long punctuateTimestamp, TraceIdentity key) {
TraceState traceState = traceStateStore.get(key);
if (null == traceState
|| null == traceState.getSpanIds()
|| traceState.getSpanIds().isEmpty()) {
- /*
- todo - debug why this happens .
- Typically seen when punctuators are created via {@link RawSpansGroupingTransformer.restorePunctuators}
- */
logger.warn(
"TraceState for tenant_id=[{}], trace_id=[{}] is missing.",
key.getTenantId(),
HexUtils.getHex(key.getTraceId()));
- return;
+ return COMPLETED_TASK_RESULT;
}
-
- long emitTs = traceState.getEmitTs();
- if (emitTs <= timestamp) {
- // we can emit this trace so just delete the entry for this 'key'
+ if (punctuateTimestamp - traceState.getTraceEndTimestamp() >= groupingWindowTimeoutMs) {
// Implies that no new spans for the trace have arrived within the last
// 'groupingWindowTimeoutMs' interval
// so the trace can be finalized and emitted
- traceStateStore.delete(key);
-
- ByteBuffer traceId = traceState.getTraceId();
- String tenantId = traceState.getTenantId();
- List rawSpanList = new ArrayList<>();
-
- Set spanIds = new HashSet<>(traceState.getSpanIds());
- spanIds.forEach(
- v -> {
- SpanIdentity spanIdentity = new SpanIdentity(tenantId, traceId, v);
- RawSpan rawSpan = spanStore.delete(spanIdentity);
- // ideally this shouldn't happen
- if (rawSpan != null) {
- rawSpanList.add(rawSpan);
- }
- });
+ emitTrace(key, traceState);
+      // trace has been emitted, so this task does not need to run again
+ return COMPLETED_TASK_RESULT;
+ }
+ return new RescheduleTaskResult(traceState.getTraceEndTimestamp() + groupingWindowTimeoutMs);
+ }
- if (traceState.getSpanIds().size() != spanIds.size()) {
- tenantToTraceWithDuplicateSpansCounter
- .computeIfAbsent(
- tenantId,
- k ->
- PlatformMetricsRegistry.registerCounter(
- TRACE_WITH_DUPLICATE_SPANS, Map.of("tenantId", k)))
- .increment();
- if (logger.isDebugEnabled()) {
- logger.debug(
- "Duplicate spanIds: [{}], unique spanIds count: [{}] for tenant: [{}] trace: [{}]",
- traceState.getSpanIds().size(),
- spanIds.size(),
+ private void emitTrace(TraceIdentity key, TraceState traceState) {
+ traceStateStore.delete(key);
+
+ ByteBuffer traceId = traceState.getTraceId();
+ String tenantId = traceState.getTenantId();
+ List rawSpanList = new ArrayList<>();
+
+ Set spanIds = new HashSet<>(traceState.getSpanIds());
+ spanIds.forEach(
+ v -> {
+ SpanIdentity spanIdentity = new SpanIdentity(tenantId, traceId, v);
+ RawSpan rawSpan = spanStore.delete(spanIdentity);
+ // ideally this shouldn't happen
+ if (rawSpan != null) {
+ rawSpanList.add(rawSpan);
+ }
+ });
+
+ if (traceState.getSpanIds().size() != spanIds.size()) {
+ tenantToTraceWithDuplicateSpansCounter
+ .computeIfAbsent(
tenantId,
- HexUtils.getHex(traceId));
- }
- }
-
- recordSpansPerTrace(rawSpanList.size(), List.of(Tag.of("tenant_id", tenantId)));
- Timestamps timestamps =
- trackEndToEndLatencyTimestamps(timestamp, traceState.getTraceStartTimestamp());
- StructuredTrace trace =
- StructuredTraceBuilder.buildStructuredTraceFromRawSpans(
- rawSpanList, traceId, tenantId, timestamps);
-
+ k ->
+ PlatformMetricsRegistry.registerCounter(
+ TRACE_WITH_DUPLICATE_SPANS, Map.of("tenantId", k)))
+ .increment();
if (logger.isDebugEnabled()) {
logger.debug(
- "Emit tenant_id=[{}], trace_id=[{}], spans_count=[{}]",
+ "Duplicate spanIds: [{}], unique spanIds count: [{}] for tenant: [{}] trace: [{}]",
+ traceState.getSpanIds().size(),
+ spanIds.size(),
tenantId,
- HexUtils.getHex(traceId),
- rawSpanList.size());
- }
-
- // report entries in spanStore
- if (spanStoreCountRateLimiter.tryAcquire()) {
- tenantToSpanStoreCountCounter
- .computeIfAbsent(
- tenantId,
- k ->
- PlatformMetricsRegistry.registerCounter(
- SPAN_STORE_COUNT, Map.of("tenantId", k)))
- .increment(spanStore.approximateNumEntries() * 1.0);
+ HexUtils.getHex(traceId));
}
+ }
- // report count of spanIds per trace
- tenantToSpanPerTraceCounter
- .computeIfAbsent(
- tenantId,
- k -> PlatformMetricsRegistry.registerCounter(SPANS_PER_TRACE, Map.of("tenantId", k)))
- .increment(spanIds.size() * 1.0);
+ recordSpansPerTrace(rawSpanList.size(), List.of(Tag.of("tenant_id", tenantId)));
+ Timestamps timestamps =
+ trackEndToEndLatencyTimestamps(
+ System.currentTimeMillis(), traceState.getTraceStartTimestamp());
+ StructuredTrace trace =
+ StructuredTraceBuilder.buildStructuredTraceFromRawSpans(
+ rawSpanList, traceId, tenantId, timestamps);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Emit tenant_id=[{}], trace_id=[{}], spans_count=[{}]",
+ tenantId,
+ HexUtils.getHex(traceId),
+ rawSpanList.size());
+ }
- // report trace emitted count
- tenantToTraceEmittedCounter
+ // report entries in spanStore
+ if (spanStoreCountRateLimiter.tryAcquire()) {
+ tenantToSpanStoreCountCounter
.computeIfAbsent(
tenantId,
- k ->
- PlatformMetricsRegistry.registerCounter(
- TRACES_EMITTER_COUNTER, Map.of("tenantId", k)))
- .increment();
+ k -> PlatformMetricsRegistry.registerCounter(SPAN_STORE_COUNT, Map.of("tenantId", k)))
+ .increment(spanStore.approximateNumEntries() * 1.0);
+ }
- // report punctuate latency
- tenantToPunctuateLatencyTimer
- .computeIfAbsent(
- tenantId,
- k ->
- PlatformMetricsRegistry.registerTimer(
- PUNCTUATE_LATENCY_TIMER, Map.of("tenantId", k)))
- .record(Duration.between(startTime, Instant.now()).toMillis(), TimeUnit.MILLISECONDS);
+ // report count of spanIds per trace
+ tenantToSpanPerTraceCounter
+ .computeIfAbsent(
+ tenantId,
+ k -> PlatformMetricsRegistry.registerCounter(SPANS_PER_TRACE, Map.of("tenantId", k)))
+ .increment(spanIds.size() * 1.0);
- context.forward(key, trace, outputTopicProducer);
- } else {
- // implies spans for the trace have arrived within the last 'sessionTimeoutMs' interval
- // so the session inactivity window is extended from the last timestamp
- if (logger.isDebugEnabled()) {
- logger.debug(
- "Re-scheduling emit trigger for tenant_id=[{}], trace_id=[{}] to [{}]",
- key.getTenantId(),
- HexUtils.getHex(key.getTraceId()),
- Instant.ofEpochMilli(emitTs + groupingWindowTimeoutMs));
- }
- long newEmitTs = emitTs + groupingWindowTimeoutMs;
- // if current timestamp is ahead of newEmitTs then just add a grace of 100ms and fire it
- long duration = Math.max(100, newEmitTs - timestamp);
- cancellable =
- context.schedule(Duration.ofMillis(duration), PunctuationType.WALL_CLOCK_TIME, this);
- }
+ // report trace emitted count
+ tenantToTraceEmittedCounter
+ .computeIfAbsent(
+ tenantId,
+ k ->
+ PlatformMetricsRegistry.registerCounter(
+ TRACES_EMITTER_COUNTER, Map.of("tenantId", k)))
+ .increment();
+
+ context.forward(
+ new Record(key, trace, System.currentTimeMillis()),
+ outputTopicProducer);
}
private Timestamps trackEndToEndLatencyTimestamps(
diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/resources/configs/common/application.conf b/raw-spans-grouper/raw-spans-grouper/src/main/resources/configs/common/application.conf
index cf10638b4..1849d5e97 100644
--- a/raw-spans-grouper/raw-spans-grouper/src/main/resources/configs/common/application.conf
+++ b/raw-spans-grouper/raw-spans-grouper/src/main/resources/configs/common/application.conf
@@ -18,6 +18,7 @@ kafka.streams.config = {
replication.factor = 3
replication.factor = ${?REPLICATION_FACTOR}
topic.cleanup.policy = "delete,compact"
+ consumer.session.timeout.ms = 300000
producer.max.request.size = 10485760
default.production.exception.handler = org.hypertrace.core.kafkastreams.framework.exceptionhandlers.IgnoreProductionExceptionHandler
@@ -60,3 +61,8 @@ metrics.reporter {
}
dataflow.metriccollection.sampling.percent = 10.0
+
+trace.emit.punctuator {
+ frequency = 5s
+ frequency = ${?TRACE_EMIT_PUNCTUATOR_FREQUENCY}
+}
\ No newline at end of file
diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java
index 3d96e9b53..04c1d6d1c 100644
--- a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java
+++ b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java
@@ -2,6 +2,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -9,7 +11,7 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.file.Path;
-import java.time.Duration;
+import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -52,8 +54,9 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
mergedProps.put(RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG, config);
mergedProps.put(StreamsConfig.STATE_DIR_CONFIG, file.getAbsolutePath());
+ Clock clock = mock(Clock.class);
StreamsBuilder streamsBuilder =
- underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>());
+ underTest.buildTopologyWithClock(clock, mergedProps, new StreamsBuilder(), new HashMap<>());
Properties props = new Properties();
mergedProps.forEach(props::put);
@@ -200,21 +203,41 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
.setEvent(createEvent("event-19", tenant2))
.build();
- inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span1);
- inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span4);
- td.advanceWallClockTime(Duration.ofSeconds(1));
- inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span2);
+    // dummyTenant is used to advance stream time as required; all of its spans will be
+    // dropped because its max span count is set to 0, so no traces are emitted for it
+ String dummyTenant = "dummyTenant";
+ TraceIdentity dummyTraceIdentity = createTraceIdentity(dummyTenant, "dummyTrace");
+ RawSpan dummySpan =
+ RawSpan.newBuilder()
+ .setTraceId(ByteBuffer.wrap("dummyTrace".getBytes()))
+ .setCustomerId(tenant2)
+ .setEvent(createEvent("dummyEvent", dummyTenant))
+ .build();
+
+ long messageTime = advanceAndSyncClockMock(0, clock, 0);
+
+ inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span1, messageTime);
+ messageTime = advanceAndSyncClockMock(messageTime, clock, 1000);
+ inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span2, messageTime);
+ inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span4, messageTime);
// select a value < 30s (groupingWindowTimeoutInMs)
- // this shouldn't trigger a punctuate call
- td.advanceWallClockTime(Duration.ofMillis(200));
+ // this shouldn't trigger a span emit
+ messageTime = advanceAndSyncClockMock(messageTime, clock, 200);
+ inputTopic.pipeInput(dummyTraceIdentity, dummySpan, messageTime);
assertTrue(outputTopic.isEmpty());
- // the next advance should trigger a punctuate call and emit a trace with 2 spans
- td.advanceWallClockTime(Duration.ofSeconds(32));
+    // the next advance should emit trace1 with 2 spans and trace2 with one span
+ messageTime = advanceAndSyncClockMock(messageTime, clock, 35_000);
+ inputTopic.pipeInput(dummyTraceIdentity, dummySpan, messageTime);
- // trace1 should have 2 span span1, span2
+    // trace2 should have 1 span: span4 (the assertion below checks event-4)
StructuredTrace trace = outputTopic.readValue();
+ assertEquals(1, trace.getEventList().size());
+ assertEquals("event-4", new String(trace.getEventList().get(0).getEventId().array()));
+
+    // trace1 should have 2 spans: span1 and span2
+ trace = outputTopic.readValue();
assertEquals(2, trace.getEventList().size());
Set traceEventIds =
trace.getEventList().stream()
@@ -223,16 +246,12 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
assertTrue(traceEventIds.contains("event-1"));
assertTrue(traceEventIds.contains("event-2"));
- // trace2 should have 1 span span3
- trace = outputTopic.readValue();
- assertEquals(1, trace.getEventList().size());
- assertEquals("event-4", new String(trace.getEventList().get(0).getEventId().array()));
-
- inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span3);
- td.advanceWallClockTime(Duration.ofSeconds(45));
- inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span5);
- // the next advance should trigger a punctuate call and emit a trace with 2 spans
- td.advanceWallClockTime(Duration.ofSeconds(35));
+ inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span3, messageTime);
+ messageTime = advanceAndSyncClockMock(messageTime, clock, 45_000);
+ inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span5, messageTime);
+    // the next advance should emit trace1 and trace2, each with one span
+ messageTime = advanceAndSyncClockMock(messageTime, clock, 35_000);
+ inputTopic.pipeInput(dummyTraceIdentity, dummySpan, messageTime);
// trace1 should have 1 span i.e. span3
trace = outputTopic.readValue();
@@ -244,29 +263,33 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
assertEquals(1, trace.getEventList().size());
assertEquals("event-5", new String(trace.getEventList().get(0).getEventId().array()));
- inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span6);
- inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span7);
- inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span8);
- inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span9);
- inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span10);
- inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span11);
- td.advanceWallClockTime(Duration.ofSeconds(35));
-
- // trace should be truncated with 5 spans
+ inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span6, messageTime);
+ inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span7, messageTime);
+ inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span8, messageTime);
+ inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span9, messageTime);
+ inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span10, messageTime);
+ inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span11, messageTime);
+ // the next advance should emit trace3
+ messageTime = advanceAndSyncClockMock(messageTime, clock, 35_000);
+ inputTopic.pipeInput(dummyTraceIdentity, dummySpan, messageTime);
+
+ // trace3 should be truncated with 5 spans because of tenant limit
trace = outputTopic.readValue();
assertEquals(5, trace.getEventList().size());
// input 8 spans of trace-4 for tenant2, as there is global upper limit apply, it will emit only
// 6
- inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span12);
- inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span13);
- inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span14);
- inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span15);
- inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span16);
- inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span17);
- inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span18);
- inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span19);
- td.advanceWallClockTime(Duration.ofSeconds(35));
+ inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span12, messageTime);
+ inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span13, messageTime);
+ inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span14, messageTime);
+ inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span15, messageTime);
+ inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span16, messageTime);
+ inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span17, messageTime);
+ inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span18, messageTime);
+ inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span19, messageTime);
+ // the next advance should emit trace 4
+ messageTime = advanceAndSyncClockMock(messageTime, clock, 35_000);
+ inputTopic.pipeInput(dummyTraceIdentity, dummySpan, messageTime);
TestRecord testRecord = outputTopic.readRecord();
@@ -274,6 +297,12 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
assertEquals(6, testRecord.getValue().getEventList().size());
}
+ private long advanceAndSyncClockMock(long messageTime, Clock clock, long advanceMs) {
+ long finalMessageTime = messageTime + advanceMs;
+ when(clock.millis()).thenAnswer((inv) -> finalMessageTime);
+ return finalMessageTime;
+ }
+
private Event createEvent(String eventId, String tenantId) {
return Event.newBuilder()
.setCustomerId(tenantId)
diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuatorTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuatorTest.java
index 43b526c42..eb4f39030 100644
--- a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuatorTest.java
+++ b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuatorTest.java
@@ -1,5 +1,7 @@
package org.hypertrace.core.rawspansgrouper;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -9,12 +11,14 @@
import java.nio.ByteBuffer;
import java.util.List;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.hypertrace.core.datamodel.Event;
import org.hypertrace.core.datamodel.RawSpan;
+import org.hypertrace.core.datamodel.StructuredTrace;
+import org.hypertrace.core.kafkastreams.framework.punctuators.ThrottledPunctuatorConfig;
+import org.hypertrace.core.kafkastreams.framework.punctuators.action.TaskResult;
import org.hypertrace.core.kafkastreams.framework.serdes.AvroSerde;
import org.hypertrace.core.spannormalizer.SpanIdentity;
import org.hypertrace.core.spannormalizer.TraceIdentity;
@@ -23,95 +27,86 @@
import org.junit.jupiter.api.Test;
class TraceEmitPunctuatorTest {
-
- private TraceEmitPunctuator underTest;
+ private static final long groupingWindowTimeoutMs = 300;
+ private static final TraceIdentity traceIdentity =
+ TraceIdentity.newBuilder()
+ .setTenantId("__default")
+ .setTraceId(ByteBuffer.wrap("trace-1".getBytes()))
+ .build();
+ private TraceEmitPunctuator emitCallback;
private KeyValueStore spanStore;
private KeyValueStore traceStateStore;
@BeforeEach
public void setUp() {
AvroSerde avroSerde = new AvroSerde();
- ProcessorContext context = mock(ProcessorContext.class);
+ ProcessorContext context = mock(ProcessorContext.class);
when(context.keySerde()).thenReturn(avroSerde);
spanStore = mock(KeyValueStore.class);
traceStateStore = mock(KeyValueStore.class);
To outputTopicProducer = mock(To.class);
- underTest =
+ emitCallback =
new TraceEmitPunctuator(
- TraceIdentity.newBuilder()
- .setTenantId("__default")
- .setTraceId(ByteBuffer.wrap("trace-1".getBytes()))
- .build(),
+ mock(ThrottledPunctuatorConfig.class),
+ mock(KeyValueStore.class),
context,
spanStore,
traceStateStore,
- outputTopicProducer,
- 100,
+ RawSpanGrouperConstants.OUTPUT_TOPIC_PRODUCER,
+ groupingWindowTimeoutMs,
-1);
- underTest.setCancellable(mock(Cancellable.class));
}
@Test
- public void whenPunctuatorIsRescheduledExpectNoChangesToTraceEmitTriggerStore() {
- when(traceStateStore.get(
- eq(
- TraceIdentity.newBuilder()
- .setTenantId("__default")
- .setTraceId(ByteBuffer.wrap("trace-1".getBytes()))
- .build())))
- .thenReturn(
- TraceState.newBuilder()
- .setSpanIds(List.of(ByteBuffer.wrap("span-1".getBytes())))
- .setEmitTs(300)
- .setTraceStartTimestamp(150)
- .setTraceEndTimestamp(300)
- .setTenantId("tenant")
- .setTraceId(ByteBuffer.wrap("trace-1".getBytes()))
- .build());
- underTest.punctuate(200);
+ public void testWhenWindowIsExtended() {
+ TraceState traceState =
+ TraceState.newBuilder()
+ .setSpanIds(List.of(ByteBuffer.wrap("span-1".getBytes())))
+ .setEmitTs(-1)
+ .setTraceStartTimestamp(100)
+ .setTraceEndTimestamp(200)
+ .setTenantId("tenant")
+ .setTraceId(ByteBuffer.wrap("trace-1".getBytes()))
+ .build();
+ when(traceStateStore.get(eq(traceIdentity))).thenReturn(traceState);
+
+ TaskResult callbackAction = emitCallback.executeTask(300, traceIdentity);
+ assertEquals(
+ traceState.getTraceEndTimestamp() + groupingWindowTimeoutMs,
+ callbackAction.getRescheduleTimestamp().get());
// the above when() call should be the only interaction
- verify(traceStateStore, times(1)).get(any());
+ verify(traceStateStore, times(1)).get(traceIdentity);
}
@Test
- public void whenTraceIsEmittedExpectDeleteOperationOnTraceStateStore() {
+ public void testWhenTraceToBeEmitted() {
+ TraceState traceState =
+ TraceState.newBuilder()
+ .setSpanIds(List.of(ByteBuffer.wrap("span-1".getBytes())))
+ .setEmitTs(-1)
+ .setTraceStartTimestamp(100)
+ .setTraceEndTimestamp(130)
+ .setTenantId("tenant")
+ .setTraceId(ByteBuffer.wrap("trace-1".getBytes()))
+ .build();
+ when(traceStateStore.get(eq(traceIdentity))).thenReturn(traceState);
+
+ RawSpan rawSpan =
+ RawSpan.newBuilder()
+ .setCustomerId("__default")
+ .setEvent(
+ Event.newBuilder()
+ .setEventId(ByteBuffer.wrap("span-1".getBytes()))
+ .setCustomerId("__default")
+ .build())
+ .setTraceId(ByteBuffer.wrap("trace-1".getBytes()))
+ .build();
+ when(spanStore.delete(any())).thenReturn(rawSpan);
- when(traceStateStore.get(
- eq(
- TraceIdentity.newBuilder()
- .setTenantId("__default")
- .setTraceId(ByteBuffer.wrap("trace-1".getBytes()))
- .build())))
- .thenReturn(
- TraceState.newBuilder()
- .setSpanIds(List.of(ByteBuffer.wrap("span-1".getBytes())))
- .setEmitTs(180)
- .setTraceStartTimestamp(100)
- .setTraceEndTimestamp(130)
- .setTenantId("tenant")
- .setTraceId(ByteBuffer.wrap("trace-1".getBytes()))
- .build());
+ assertTrue(emitCallback.executeTask(450, traceIdentity).getRescheduleTimestamp().isEmpty());
- when(spanStore.delete(any()))
- .thenReturn(
- RawSpan.newBuilder()
- .setCustomerId("__default")
- .setEvent(
- Event.newBuilder()
- .setEventId(ByteBuffer.wrap("span-1".getBytes()))
- .setCustomerId("__default")
- .build())
- .setTraceId(ByteBuffer.wrap("trace-1".getBytes()))
- .build());
- underTest.punctuate(200); // the above when() call should be the only interaction
- verify(traceStateStore, times(1)).get(any());
+ verify(traceStateStore, times(1)).get(traceIdentity);
verify(spanStore, times(1)).delete(any());
- verify(traceStateStore)
- .delete(
- eq(
- TraceIdentity.newBuilder()
- .setTenantId("__default")
- .setTraceId(ByteBuffer.wrap("trace-1".getBytes()))
- .build()));
+ verify(traceStateStore).delete(eq(traceIdentity));
}
}
diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf b/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf
index 5284b3714..6ecac3b23 100644
--- a/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf
+++ b/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf
@@ -11,6 +11,7 @@ precreate.topics = false
kafka.streams.config = {
application.id = raw-spans-to-structured-traces-grouping-job
num.stream.threads = 2
+ consumer.session.timeout.ms = 300000
topic.cleanup.policy = "delete,compact"
replication.factor = 3
bootstrap.servers = "localhost:9092"
@@ -37,10 +38,13 @@ span.window.store.segment.size.mins = ${?SPAN_WINDOW_STORE_SEGMENT_SIZE_MINS}
default.max.span.count = 6
max.span.count = {
tenant1 = 5
+ dummyTenant = 0
}
-span.groupby.session.window.interval = 5
+span.groupby.session.window.interval = 30
span.groupby.session.window.graceperiod.ms = 100
dataflow.metriccollection.sampling.percent = 10.0
+
+trace.emit.punctuator.frequency = 15s
\ No newline at end of file
diff --git a/span-normalizer/span-normalizer-api/src/main/avro/TraceState.avdl b/span-normalizer/span-normalizer-api/src/main/avro/TraceState.avdl
index f651a183d..49520ca30 100644
--- a/span-normalizer/span-normalizer-api/src/main/avro/TraceState.avdl
+++ b/span-normalizer/span-normalizer-api/src/main/avro/TraceState.avdl
@@ -3,7 +3,7 @@ protocol TraceStateProtocol {
record TraceState {
long trace_start_timestamp;
long trace_end_timestamp;
- long emit_ts;
+ /** @deprecated unused */ long emit_ts;
string tenant_id;
bytes trace_id;
array span_ids;