From 3c038a9158ba7adef78fa97f662884730956d93a Mon Sep 17 00:00:00 2001 From: Kishan Sairam Adapa Date: Tue, 17 Oct 2023 12:46:50 +0530 Subject: [PATCH] feat: use throttled punctuator (#424) * wrap up * helmify * nit * refactor * nit * tune * edit * wrap up for wall clock time changes * try changing clock * fix tests * nit * nit * nit * update * nit * minor refactor to migrate to kafka streams new apis * Nit * suppressions * update test * netty * update * update * test * revert * update * log * wf changes * inspect * wf test * Revert "wf test" This reverts commit b8fd337b65ea82e1e180736859eaff62a17186a2. * fix --------- Co-authored-by: Laxman Ch --- .github/workflows/docker-tests.yml | 6 +- .../hypertrace-ingester/scripts/inspect.sh | 7 +- .../hypertrace-ingester/scripts/tests.sh | 4 + gradle/libs.versions.toml | 2 +- .../ingester/HypertraceIngesterTest.java | 6 +- .../resources/configs/common/application.conf | 2 +- owasp-suppressions.xml | 37 +-- .../templates/raw-spans-grouper-config.yaml | 4 + raw-spans-grouper/helm/values.yaml | 1 + .../RawSpanGrouperConstants.java | 4 + .../core/rawspansgrouper/RawSpansGrouper.java | 29 ++- .../rawspansgrouper/RawSpansProcessor.java | 171 +++++++------ .../rawspansgrouper/TraceEmitPunctuator.java | 230 ++++++++---------- .../resources/configs/common/application.conf | 6 + .../rawspansgrouper/RawSpansGrouperTest.java | 107 +++++--- .../TraceEmitPunctuatorTest.java | 129 +++++----- .../raw-spans-grouper/application.conf | 6 +- .../src/main/avro/TraceState.avdl | 2 +- 18 files changed, 380 insertions(+), 373 deletions(-) diff --git a/.github/workflows/docker-tests.yml b/.github/workflows/docker-tests.yml index ecb2c0027..e3bd80e14 100644 --- a/.github/workflows/docker-tests.yml +++ b/.github/workflows/docker-tests.yml @@ -34,13 +34,14 @@ jobs: working-directory: ./.github/workflows/hypertrace-ingester # Below tests a docker-compose.yml service named 'sut' with a valid HEALTHCHECK instruction: run: | - docker-compose -f 
docker-compose.yml -f docker-compose-zipkin-example.yml up -d || { ../scripts/inspect.sh ; exit 1 ; } + docker-compose -f docker-compose.yml -f docker-compose-zipkin-example.yml up -d || { ./scripts/inspect.sh docker-compose.yml docker-compose-zipkin-example.yml ; exit 1 ; } - name: Waits for some stability working-directory: ./.github/workflows/hypertrace-ingester run: | sleep 60 # you can decrease it but never increase it docker-compose -f docker-compose.yml ps + ./scripts/inspect.sh docker-compose.yml docker-compose-zipkin-example.yml - name: Runs tests working-directory: ./.github/workflows/hypertrace-ingester/scripts @@ -67,13 +68,14 @@ jobs: working-directory: ./.github/workflows/hypertrace-ingester # Below tests a docker-compose.yml service named 'sut' with a valid HEALTHCHECK instruction: run: | - docker-compose -f postgres/docker-compose.yml -f docker-compose-zipkin-example.yml up -d || { ../scripts/inspect.sh ; exit 1 ; } + docker-compose -f postgres/docker-compose.yml -f docker-compose-zipkin-example.yml up -d || { ./scripts/inspect.sh postgres/docker-compose.yml docker-compose-zipkin-example.yml ; exit 1 ; } - name: Waits for some stability working-directory: ./.github/workflows/hypertrace-ingester run: | sleep 60 # you can decrease it but never increase it docker-compose -f postgres/docker-compose.yml ps + ./scripts/inspect.sh postgres/docker-compose.yml docker-compose-zipkin-example.yml - name: Runs tests working-directory: ./.github/workflows/hypertrace-ingester/scripts diff --git a/.github/workflows/hypertrace-ingester/scripts/inspect.sh b/.github/workflows/hypertrace-ingester/scripts/inspect.sh index ec16206c1..5e86e330e 100755 --- a/.github/workflows/hypertrace-ingester/scripts/inspect.sh +++ b/.github/workflows/hypertrace-ingester/scripts/inspect.sh @@ -1,11 +1,8 @@ #!/bin/bash -# This script displays the state and logs for the containers in the docker-compose. 
+echo "Inspecting compose containers from $1 and $2" -SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" -DOCKER_COMPOSE_FILE_DIR="$(dirname $SCRIPT_DIR)/docker" - -containers=$(docker-compose -f ${DOCKER_COMPOSE_FILE_DIR}/docker-compose.yml -f ${DOCKER_COMPOSE_FILE_DIR}/docker-compose-zipkin-example.yml ps -q -a) +containers=$(docker-compose -f $1 -f $2 ps -q -a) while IFS= read -r container; do name=$(docker inspect $container | jq -r '.[0].Name') echo "==================" diff --git a/.github/workflows/hypertrace-ingester/scripts/tests.sh b/.github/workflows/hypertrace-ingester/scripts/tests.sh index b271c9f2e..c4a8f4f3a 100755 --- a/.github/workflows/hypertrace-ingester/scripts/tests.sh +++ b/.github/workflows/hypertrace-ingester/scripts/tests.sh @@ -38,6 +38,10 @@ echo "Calling the frontend to generate a trace..." echo "" curl -o /dev/null -s http://${FRONTEND_SERVICE_HOST}:8081 || { echo "Host $FRONTEND_SERVICE_HOST is down." ; exit 1; } +# grouper depends on stream time advancement, hence we are sending another set of traces just to meet that condition +sleep 35 +curl -o /dev/null -s http://${FRONTEND_SERVICE_HOST}:8081 || { echo "Host $FRONTEND_SERVICE_HOST is down." ; exit 1; } + echo "Retrieving the list of traces." 
echo "" diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index f0ae77bc2..41c3bccee 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -3,7 +3,7 @@ hypertrace-entity-service = "0.8.78" hypertrace-config-service = "0.1.54" hypertrace-grpc-utils = "0.12.4" hypertrace-serviceFramework = "0.1.60" -hypertrace-kafkaStreams = "0.3.8" +hypertrace-kafkaStreams = "0.4.0" hypertrace-view-generator = "0.4.19" grpc = "1.57.2" diff --git a/hypertrace-ingester/src/test/java/org/hypertrace/ingester/HypertraceIngesterTest.java b/hypertrace-ingester/src/test/java/org/hypertrace/ingester/HypertraceIngesterTest.java index c2b17280c..e38d417e9 100644 --- a/hypertrace-ingester/src/test/java/org/hypertrace/ingester/HypertraceIngesterTest.java +++ b/hypertrace-ingester/src/test/java/org/hypertrace/ingester/HypertraceIngesterTest.java @@ -9,7 +9,6 @@ import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span; import java.io.File; import java.nio.file.Path; -import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -86,6 +85,9 @@ public void testIngestionPacketFlow(@TempDir Path tempDir) { new JaegerSpanSerde().serializer()); spanNormalizerInputTopic.pipeInput(span); + // we use stream time for emit in grouper hence this additional pipe of a span after grouping + // window + spanNormalizerInputTopic.pipeInput(null, span, System.currentTimeMillis() + 30_001); // create output topic for span-normalizer topology TestOutputTopic spanNormalizerOutputTopic = @@ -96,8 +98,6 @@ public void testIngestionPacketFlow(@TempDir Path tempDir) { new AvroSerde<>().deserializer()); assertNotNull(spanNormalizerOutputTopic.readKeyValue()); - topologyTestDriver.advanceWallClockTime(Duration.ofSeconds(32)); - // create output topic for span-grouper topology TestOutputTopic spanGrouperOutputTopic = topologyTestDriver.createOutputTopic( diff --git 
a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/resources/configs/common/application.conf b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/resources/configs/common/application.conf index f328b9cfd..2be70f962 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/resources/configs/common/application.conf +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/resources/configs/common/application.conf @@ -114,7 +114,7 @@ enricher { } TraceStatsEnricher { - class = "org.hypertrace.traceenricher.enrichment.enrichers.TraceStatsEnrichere" + class = "org.hypertrace.traceenricher.enrichment.enrichers.TraceStatsEnricher" dependencies = ["EndpointEnricher"] } diff --git a/owasp-suppressions.xml b/owasp-suppressions.xml index 509d87f40..3432d98a0 100644 --- a/owasp-suppressions.xml +++ b/owasp-suppressions.xml @@ -60,21 +60,6 @@ ^pkg:maven/io\.opentelemetry/opentelemetry\-exporter\-prometheus@.*$ cpe:/a:prometheus:prometheus - - - ^pkg:maven/org\.json/json@.*$ - CVE-2022-45688 - - - - ^pkg:maven/com\.fasterxml\.jackson\.core/jackson\-databind@.*$ - CVE-2023-35116 - ^pkg:maven/org\.hypertrace\.core\.kafkastreams\.framework/avro\-partitioners@.*$ CVE-2023-37475 - + ^pkg:maven/org\.quartz\-scheduler/quartz@.*$ CVE-2023-39017 - + @@ -104,18 +89,16 @@ ^pkg:maven/com\.fasterxml\.jackson\.core/jackson\-databind@.*$ CVE-2023-35116 - - - ^pkg:maven/io\.netty/netty\-handler@.*$ - CVE-2023-4586 - - + - ^pkg:maven/io\.netty/netty\-handler@.*$ + ^pkg:maven/io\.netty/netty.*@.*$ CVE-2023-4586 \ No newline at end of file diff --git a/raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml b/raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml index 220bacf1c..ca65a46e4 100644 --- a/raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml +++ b/raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml @@ -81,3 +81,7 @@ data: } } {{- end }} + + {{- if hasKey .Values.rawSpansGrouperConfig 
"traceEmitPunctuatorFrequency" }} + trace.emit.punctuator.frequency = {{ .Values.rawSpansGrouperConfig.traceEmitPunctuatorFrequency }} + {{- end }} diff --git a/raw-spans-grouper/helm/values.yaml b/raw-spans-grouper/helm/values.yaml index 028f87f4b..139c0855c 100644 --- a/raw-spans-grouper/helm/values.yaml +++ b/raw-spans-grouper/helm/values.yaml @@ -132,6 +132,7 @@ rawSpansGrouperConfig: groupPartitionerEnabled: false groupPartitionerConfigServiceHost: config-service groupPartitionerConfigServicePort: 50104 + traceEmitPunctuatorFrequency: 5s span: groupby: diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java index 244782024..7c91a30d7 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java @@ -17,4 +17,8 @@ public class RawSpanGrouperConstants { public static final String DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT = "default.max.span.count"; public static final String DROPPED_SPANS_COUNTER = "hypertrace.dropped.spans"; public static final String TRUNCATED_TRACES_COUNTER = "hypertrace.truncated.traces"; + static final String TRACE_EMIT_PUNCTUATOR = "trace-emit-punctuator"; + static final String TRACE_EMIT_PUNCTUATOR_STORE_NAME = "emitter-store"; + static final String TRACE_EMIT_PUNCTUATOR_FREQUENCY_CONFIG_KEY = + "trace.emit.punctuator.frequency"; } diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java index d70d8ba9f..aaa89e693 100644 --- 
a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java @@ -5,12 +5,17 @@ import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_PRODUCER; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_STATE_STORE_NAME; +import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_EMIT_PUNCTUATOR_STORE_NAME; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_STATE_STORE; +import com.google.common.annotations.VisibleForTesting; import com.typesafe.config.Config; +import java.time.Clock; +import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; @@ -44,6 +49,15 @@ public StreamsBuilder buildTopology( Map properties, StreamsBuilder streamsBuilder, Map> inputStreams) { + return buildTopologyWithClock(Clock.systemUTC(), properties, streamsBuilder, inputStreams); + } + + @VisibleForTesting + StreamsBuilder buildTopologyWithClock( + Clock clock, + Map properties, + StreamsBuilder streamsBuilder, + Map> inputStreams) { Config jobConfig = getJobConfig(properties); String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY); String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY); @@ -72,8 +86,16 @@ public StreamsBuilder buildTopology( Stores.persistentKeyValueStore(SPAN_STATE_STORE_NAME), keySerde, valueSerde) .withCachingEnabled(); + StoreBuilder> traceEmitPunctuatorStoreBuilder = + Stores.keyValueStoreBuilder( + 
Stores.persistentKeyValueStore(TRACE_EMIT_PUNCTUATOR_STORE_NAME), + Serdes.Long(), + Serdes.ListSerde(ArrayList.class, valueSerde)) + .withCachingEnabled(); + streamsBuilder.addStateStore(spanStoreBuilder); streamsBuilder.addStateStore(traceStateStoreBuilder); + streamsBuilder.addStateStore(traceEmitPunctuatorStoreBuilder); StreamPartitioner groupPartitioner = new GroupPartitionerBuilder() @@ -89,11 +111,12 @@ public StreamsBuilder buildTopology( outputTopicProducer = outputTopicProducer.withName(OUTPUT_TOPIC_PRODUCER); inputStream - .transform( - RawSpansProcessor::new, + .process( + () -> new RawSpansProcessor(clock), Named.as(RawSpansProcessor.class.getSimpleName()), SPAN_STATE_STORE_NAME, - TRACE_STATE_STORE) + TRACE_STATE_STORE, + TRACE_EMIT_PUNCTUATOR_STORE_NAME) .to(outputTopic, outputTopicProducer); return streamsBuilder; diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java index 1dc710bcc..885b023f4 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java @@ -1,6 +1,7 @@ package org.hypertrace.core.rawspansgrouper; import static org.hypertrace.core.datamodel.shared.AvroBuilderCache.fastNewBuilder; +import static org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp.KAFKA_STREAMS_CONFIG_KEY; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DROPPED_SPANS_COUNTER; @@ -9,6 +10,9 @@ import static 
org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_GROUPBY_SESSION_WINDOW_INTERVAL_CONFIG_KEY; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_STATE_STORE_NAME; +import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_EMIT_PUNCTUATOR; +import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_EMIT_PUNCTUATOR_FREQUENCY_CONFIG_KEY; +import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_EMIT_PUNCTUATOR_STORE_NAME; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_STATE_STORE; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRUNCATED_TRACES_COUNTER; @@ -16,8 +20,10 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Timer; import java.nio.ByteBuffer; +import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -25,17 +31,16 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.processor.Cancellable; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; -import org.apache.kafka.streams.processor.To; -import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.KeyValueStore; import org.hypertrace.core.datamodel.RawSpan; import org.hypertrace.core.datamodel.StructuredTrace; import 
org.hypertrace.core.datamodel.shared.HexUtils; +import org.hypertrace.core.kafkastreams.framework.punctuators.ThrottledPunctuatorConfig; import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry; import org.hypertrace.core.spannormalizer.SpanIdentity; import org.hypertrace.core.spannormalizer.TraceIdentity; @@ -47,26 +52,17 @@ * Receives spans keyed by trace_id and stores them. A {@link TraceEmitPunctuator} is scheduled to * run after the {@link RawSpansProcessor#groupingWindowTimeoutMs} interval to emit the trace. If * any spans for the trace arrive within the {@link RawSpansProcessor#groupingWindowTimeoutMs} - * interval then the {@link RawSpansProcessor#groupingWindowTimeoutMs} will get reset and the trace - * will get an additional {@link RawSpansProcessor#groupingWindowTimeoutMs} time to accept spans. + * interval then the trace will get an additional {@link RawSpansProcessor#groupingWindowTimeoutMs} + * time to accept spans. */ public class RawSpansProcessor - implements Transformer> { + implements Processor { private static final Logger logger = LoggerFactory.getLogger(RawSpansProcessor.class); private static final String PROCESSING_LATENCY_TIMER = "hypertrace.rawspansgrouper.processing.latency"; private static final ConcurrentMap tenantToSpansGroupingTimer = new ConcurrentHashMap<>(); - private ProcessorContext context; - private KeyValueStore spanStore; - private KeyValueStore traceStateStore; - private long groupingWindowTimeoutMs; - private To outputTopic; - private double dataflowSamplingPercent = -1; - private static final Map maxSpanCountMap = new HashMap<>(); - private long defaultMaxSpanCountLimit = Long.MAX_VALUE; - // counter for number of spans dropped per tenant private static final ConcurrentMap droppedSpansCounter = new ConcurrentHashMap<>(); @@ -74,14 +70,24 @@ public class RawSpansProcessor // counter for number of truncated traces per tenant private static final ConcurrentMap truncatedTracesCounter = new 
ConcurrentHashMap<>(); + private final Clock clock; + private KeyValueStore spanStore; + private KeyValueStore traceStateStore; + private long groupingWindowTimeoutMs; + private double dataflowSamplingPercent = -1; + private static final Map maxSpanCountMap = new HashMap<>(); + private long defaultMaxSpanCountLimit = Long.MAX_VALUE; + private TraceEmitPunctuator traceEmitPunctuator; + private Cancellable traceEmitTasksPunctuatorCancellable; + + public RawSpansProcessor(Clock clock) { + this.clock = clock; + } @Override - public void init(ProcessorContext context) { - this.context = context; - this.spanStore = - (KeyValueStore) context.getStateStore(SPAN_STATE_STORE_NAME); - this.traceStateStore = - (KeyValueStore) context.getStateStore(TRACE_STATE_STORE); + public void init(ProcessorContext context) { + this.spanStore = context.getStateStore(SPAN_STATE_STORE_NAME); + this.traceStateStore = context.getStateStore(TRACE_STATE_STORE); Config jobConfig = (Config) (context.appConfigs().get(RAW_SPANS_GROUPER_JOB_CONFIG)); this.groupingWindowTimeoutMs = jobConfig.getLong(SPAN_GROUPBY_SESSION_WINDOW_INTERVAL_CONFIG_KEY) * 1000; @@ -94,7 +100,8 @@ public void init(ProcessorContext context) { if (jobConfig.hasPath(INFLIGHT_TRACE_MAX_SPAN_COUNT)) { Config subConfig = jobConfig.getConfig(INFLIGHT_TRACE_MAX_SPAN_COUNT); - subConfig.entrySet().stream() + subConfig + .entrySet() .forEach( (entry) -> { maxSpanCountMap.put(entry.getKey(), subConfig.getLong(entry.getKey())); @@ -105,19 +112,50 @@ public void init(ProcessorContext context) { defaultMaxSpanCountLimit = jobConfig.getLong(DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT); } - this.outputTopic = To.child(OUTPUT_TOPIC_PRODUCER); - restorePunctuators(); + KeyValueStore> traceEmitPunctuatorStore = + context.getStateStore(TRACE_EMIT_PUNCTUATOR_STORE_NAME); + traceEmitPunctuator = + new TraceEmitPunctuator( + new ThrottledPunctuatorConfig( + jobConfig.getConfig(KAFKA_STREAMS_CONFIG_KEY), TRACE_EMIT_PUNCTUATOR), + 
traceEmitPunctuatorStore, + context, + spanStore, + traceStateStore, + OUTPUT_TOPIC_PRODUCER, + groupingWindowTimeoutMs, + dataflowSamplingPercent); + // A punctuator scheduled on stream time emits nothing while there are no input messages. + // In practice input almost never drops to zero, i.e., spans keep arriving at the platform. + // Wall clock time would handle that case, but it has a problem of its own. + // While lag is being burnt, we process messages produced across different timestamp + // intervals, probably at a higher rate than they were produced. Not punctuating often + // enough then increases the work left for the punctuator in later iterations, and it keeps + // piling up until the lag is burnt completely; only then does the punctuator catch up to the + // normal input rate. This is undesirable because outputs are emitted only from the punctuator. + // When we burn lag from the input topic, we want to push it to the output & downstream as soon + // as possible; holding on to it longer delays the lag cascading downstream. 
Given + // grouper stays at start of pipeline and also that input dying down almost never happens + // it is better to use stream time over wall clock time for yielding trace emit tasks punctuator + traceEmitTasksPunctuatorCancellable = + context.schedule( + jobConfig.getDuration(TRACE_EMIT_PUNCTUATOR_FREQUENCY_CONFIG_KEY), + PunctuationType.STREAM_TIME, + traceEmitPunctuator); } - public KeyValue transform(TraceIdentity key, RawSpan value) { + @Override + public void process(Record record) { Instant start = Instant.now(); - long currentTimeMs = System.currentTimeMillis(); + long currentTimeMs = clock.millis(); + TraceIdentity key = record.key(); + RawSpan value = record.value(); TraceState traceState = traceStateStore.get(key); boolean firstEntry = (traceState == null); if (shouldDropSpan(key, traceState)) { - return null; + return; } String tenantId = key.getTenantId(); @@ -125,35 +163,27 @@ public KeyValue transform(TraceIdentity key, Raw ByteBuffer spanId = value.getEvent().getEventId(); spanStore.put(new SpanIdentity(tenantId, traceId, spanId), value); - /* - the trace emit ts is essentially currentTs + groupingWindowTimeoutMs - i.e. 
if there is no span added in the next 'groupingWindowTimeoutMs' interval - then the trace can be finalized and emitted - */ - long traceEmitTs = currentTimeMs + groupingWindowTimeoutMs; - if (logger.isDebugEnabled()) { - logger.debug( - "Updating trigger_ts=[{}] for for tenant_id=[{}], trace_id=[{}]", - Instant.ofEpochMilli(traceEmitTs), - key.getTenantId(), - HexUtils.getHex(traceId)); - } - if (firstEntry) { traceState = fastNewBuilder(TraceState.Builder.class) .setTraceStartTimestamp(currentTimeMs) .setTraceEndTimestamp(currentTimeMs) - .setEmitTs(traceEmitTs) + .setEmitTs(-1) // deprecated, not used anymore .setTenantId(tenantId) .setTraceId(traceId) .setSpanIds(List.of(spanId)) .build(); - schedulePunctuator(key); + traceEmitPunctuator.scheduleTask(currentTimeMs, key); } else { traceState.getSpanIds().add(spanId); + long prevScheduleTimestamp = traceState.getTraceEndTimestamp(); traceState.setTraceEndTimestamp(currentTimeMs); - traceState.setEmitTs(traceEmitTs); + if (!traceEmitPunctuator.rescheduleTask( + prevScheduleTimestamp, currentTimeMs + groupingWindowTimeoutMs, key)) { + logger.debug( + "Failed to reschedule task on getting span for trace key {}, schedule already dropped!", + key); + } } traceStateStore.put(key, traceState); @@ -165,13 +195,13 @@ public KeyValue transform(TraceIdentity key, Raw PlatformMetricsRegistry.registerTimer( PROCESSING_LATENCY_TIMER, Map.of("tenantId", k))) .record(Duration.between(start, Instant.now()).toMillis(), TimeUnit.MILLISECONDS); - // the punctuator will emit the trace - return null; + // no need to do context.forward. the punctuator will emit the trace once it's eligible to be + // emitted + return; } private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) { - int inFlightSpansPerTrace = - traceState != null ? traceState.getSpanIds().size() : Integer.MIN_VALUE; + int inFlightSpansPerTrace = traceState != null ? 
traceState.getSpanIds().size() : 0; long maxSpanCountTenantLimit = maxSpanCountMap.containsKey(key.getTenantId()) ? maxSpanCountMap.get(key.getTenantId()) @@ -213,45 +243,8 @@ private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) { return false; } - private void schedulePunctuator(TraceIdentity key) { - TraceEmitPunctuator punctuator = - new TraceEmitPunctuator( - key, - context, - spanStore, - traceStateStore, - outputTopic, - groupingWindowTimeoutMs, - dataflowSamplingPercent); - Cancellable cancellable = - context.schedule( - Duration.ofMillis(groupingWindowTimeoutMs), PunctuationType.STREAM_TIME, punctuator); - punctuator.setCancellable(cancellable); - logger.debug( - "Scheduled a punctuator to emit trace for key=[{}] to run after [{}] ms", - key, - groupingWindowTimeoutMs); - } - @Override - public void close() {} - - /** - * Punctuators are not persisted - so on restart we recover punctuators and schedule them to run - * after {@link RawSpansProcessor#groupingWindowTimeoutMs} - */ - void restorePunctuators() { - long count = 0; - Instant start = Instant.now(); - try (KeyValueIterator it = traceStateStore.all()) { - while (it.hasNext()) { - schedulePunctuator(it.next().key); - count++; - } - logger.info( - "Restored=[{}] punctuators, Duration=[{}]", - count, - Duration.between(start, Instant.now())); - } + public void close() { + traceEmitTasksPunctuatorCancellable.cancel(); } } diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java index 0c0cae52a..28b6f81cb 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java @@ -9,8 +9,7 @@ import io.micrometer.core.instrument.Tag; import 
io.micrometer.core.instrument.Timer; import java.nio.ByteBuffer; -import java.time.Duration; -import java.time.Instant; +import java.time.Clock; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -20,11 +19,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import org.apache.kafka.streams.processor.Cancellable; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.PunctuationType; -import org.apache.kafka.streams.processor.Punctuator; -import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.KeyValueStore; import org.hypertrace.core.datamodel.RawSpan; import org.hypertrace.core.datamodel.StructuredTrace; @@ -33,6 +29,11 @@ import org.hypertrace.core.datamodel.shared.DataflowMetricUtils; import org.hypertrace.core.datamodel.shared.HexUtils; import org.hypertrace.core.datamodel.shared.trace.StructuredTraceBuilder; +import org.hypertrace.core.kafkastreams.framework.punctuators.AbstractThrottledPunctuator; +import org.hypertrace.core.kafkastreams.framework.punctuators.ThrottledPunctuatorConfig; +import org.hypertrace.core.kafkastreams.framework.punctuators.action.CompletedTaskResult; +import org.hypertrace.core.kafkastreams.framework.punctuators.action.RescheduleTaskResult; +import org.hypertrace.core.kafkastreams.framework.punctuators.action.TaskResult; import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry; import org.hypertrace.core.spannormalizer.SpanIdentity; import org.hypertrace.core.spannormalizer.TraceIdentity; @@ -41,10 +42,10 @@ import org.slf4j.LoggerFactory; /** - * Checks if a trace can be finalized and emitted based on inactivity period of {@link + * Check if a trace can be finalized and emitted based on inactivity period of 
{@link * RawSpansProcessor#groupingWindowTimeoutMs} */ -class TraceEmitPunctuator implements Punctuator { +class TraceEmitPunctuator extends AbstractThrottledPunctuator { private static final Logger logger = LoggerFactory.getLogger(TraceEmitPunctuator.class); private static final Object mutex = new Object(); @@ -54,10 +55,6 @@ class TraceEmitPunctuator implements Punctuator { private static final String TRACES_EMITTER_COUNTER = "hypertrace.emitted.traces"; private static final ConcurrentMap tenantToTraceEmittedCounter = new ConcurrentHashMap<>(); - private static final String PUNCTUATE_LATENCY_TIMER = - "hypertrace.rawspansgrouper.punctuate.latency"; - private static final ConcurrentMap tenantToPunctuateLatencyTimer = - new ConcurrentHashMap<>(); private static final String SPANS_PER_TRACE = "hypertrace.rawspansgrouper.spans.per.trace"; private static final ConcurrentMap tenantToSpanPerTraceCounter = new ConcurrentHashMap<>(); @@ -70,24 +67,24 @@ class TraceEmitPunctuator implements Punctuator { private static final ConcurrentMap tenantToTraceWithDuplicateSpansCounter = new ConcurrentHashMap<>(); + private static final CompletedTaskResult COMPLETED_TASK_RESULT = new CompletedTaskResult(); private final double dataflowSamplingPercent; - private final TraceIdentity key; - private final ProcessorContext context; + private final ProcessorContext context; private final KeyValueStore spanStore; private final KeyValueStore traceStateStore; - private final To outputTopicProducer; + private final String outputTopicProducer; private final long groupingWindowTimeoutMs; - private Cancellable cancellable; TraceEmitPunctuator( - TraceIdentity key, - ProcessorContext context, + ThrottledPunctuatorConfig throttledPunctuatorConfig, + KeyValueStore> throttledPunctuatorStore, + ProcessorContext context, KeyValueStore spanStore, KeyValueStore traceStateStore, - To outputTopicProducer, + String outputTopicProducer, long groupingWindowTimeoutMs, double dataflowSamplingPercent) { - 
this.key = key; + super(Clock.systemUTC(), throttledPunctuatorConfig, throttledPunctuatorStore); this.context = context; this.spanStore = spanStore; this.traceStateStore = traceStateStore; @@ -96,143 +93,108 @@ class TraceEmitPunctuator implements Punctuator { this.dataflowSamplingPercent = dataflowSamplingPercent; } - public void setCancellable(Cancellable cancellable) { - this.cancellable = cancellable; - } - - /** - * @param timestamp correspond to current system time - */ - @Override - public void punctuate(long timestamp) { - Instant startTime = Instant.now(); - // always cancel the punctuator else it will get re-scheduled automatically - cancellable.cancel(); - + protected TaskResult executeTask(long punctuateTimestamp, TraceIdentity key) { TraceState traceState = traceStateStore.get(key); if (null == traceState || null == traceState.getSpanIds() || traceState.getSpanIds().isEmpty()) { - /* - todo - debug why this happens . - Typically seen when punctuators are created via {@link RawSpansGroupingTransformer.restorePunctuators} - */ logger.warn( "TraceState for tenant_id=[{}], trace_id=[{}] is missing.", key.getTenantId(), HexUtils.getHex(key.getTraceId())); - return; + return COMPLETED_TASK_RESULT; } - - long emitTs = traceState.getEmitTs(); - if (emitTs <= timestamp) { - // we can emit this trace so just delete the entry for this 'key' + if (punctuateTimestamp - traceState.getTraceEndTimestamp() >= groupingWindowTimeoutMs) { // Implies that no new spans for the trace have arrived within the last // 'groupingWindowTimeoutMs' interval // so the trace can be finalized and emitted - traceStateStore.delete(key); - - ByteBuffer traceId = traceState.getTraceId(); - String tenantId = traceState.getTenantId(); - List rawSpanList = new ArrayList<>(); - - Set spanIds = new HashSet<>(traceState.getSpanIds()); - spanIds.forEach( - v -> { - SpanIdentity spanIdentity = new SpanIdentity(tenantId, traceId, v); - RawSpan rawSpan = spanStore.delete(spanIdentity); - // ideally 
this shouldn't happen - if (rawSpan != null) { - rawSpanList.add(rawSpan); - } - }); + emitTrace(key, traceState); + // no need of running again for this + return COMPLETED_TASK_RESULT; + } + return new RescheduleTaskResult(traceState.getTraceEndTimestamp() + groupingWindowTimeoutMs); + } - if (traceState.getSpanIds().size() != spanIds.size()) { - tenantToTraceWithDuplicateSpansCounter - .computeIfAbsent( - tenantId, - k -> - PlatformMetricsRegistry.registerCounter( - TRACE_WITH_DUPLICATE_SPANS, Map.of("tenantId", k))) - .increment(); - if (logger.isDebugEnabled()) { - logger.debug( - "Duplicate spanIds: [{}], unique spanIds count: [{}] for tenant: [{}] trace: [{}]", - traceState.getSpanIds().size(), - spanIds.size(), + private void emitTrace(TraceIdentity key, TraceState traceState) { + traceStateStore.delete(key); + + ByteBuffer traceId = traceState.getTraceId(); + String tenantId = traceState.getTenantId(); + List rawSpanList = new ArrayList<>(); + + Set spanIds = new HashSet<>(traceState.getSpanIds()); + spanIds.forEach( + v -> { + SpanIdentity spanIdentity = new SpanIdentity(tenantId, traceId, v); + RawSpan rawSpan = spanStore.delete(spanIdentity); + // ideally this shouldn't happen + if (rawSpan != null) { + rawSpanList.add(rawSpan); + } + }); + + if (traceState.getSpanIds().size() != spanIds.size()) { + tenantToTraceWithDuplicateSpansCounter + .computeIfAbsent( tenantId, - HexUtils.getHex(traceId)); - } - } - - recordSpansPerTrace(rawSpanList.size(), List.of(Tag.of("tenant_id", tenantId))); - Timestamps timestamps = - trackEndToEndLatencyTimestamps(timestamp, traceState.getTraceStartTimestamp()); - StructuredTrace trace = - StructuredTraceBuilder.buildStructuredTraceFromRawSpans( - rawSpanList, traceId, tenantId, timestamps); - + k -> + PlatformMetricsRegistry.registerCounter( + TRACE_WITH_DUPLICATE_SPANS, Map.of("tenantId", k))) + .increment(); if (logger.isDebugEnabled()) { logger.debug( - "Emit tenant_id=[{}], trace_id=[{}], spans_count=[{}]", + 
"Duplicate spanIds: [{}], unique spanIds count: [{}] for tenant: [{}] trace: [{}]", + traceState.getSpanIds().size(), + spanIds.size(), tenantId, - HexUtils.getHex(traceId), - rawSpanList.size()); - } - - // report entries in spanStore - if (spanStoreCountRateLimiter.tryAcquire()) { - tenantToSpanStoreCountCounter - .computeIfAbsent( - tenantId, - k -> - PlatformMetricsRegistry.registerCounter( - SPAN_STORE_COUNT, Map.of("tenantId", k))) - .increment(spanStore.approximateNumEntries() * 1.0); + HexUtils.getHex(traceId)); } + } - // report count of spanIds per trace - tenantToSpanPerTraceCounter - .computeIfAbsent( - tenantId, - k -> PlatformMetricsRegistry.registerCounter(SPANS_PER_TRACE, Map.of("tenantId", k))) - .increment(spanIds.size() * 1.0); + recordSpansPerTrace(rawSpanList.size(), List.of(Tag.of("tenant_id", tenantId))); + Timestamps timestamps = + trackEndToEndLatencyTimestamps( + System.currentTimeMillis(), traceState.getTraceStartTimestamp()); + StructuredTrace trace = + StructuredTraceBuilder.buildStructuredTraceFromRawSpans( + rawSpanList, traceId, tenantId, timestamps); + + if (logger.isDebugEnabled()) { + logger.debug( + "Emit tenant_id=[{}], trace_id=[{}], spans_count=[{}]", + tenantId, + HexUtils.getHex(traceId), + rawSpanList.size()); + } - // report trace emitted count - tenantToTraceEmittedCounter + // report entries in spanStore + if (spanStoreCountRateLimiter.tryAcquire()) { + tenantToSpanStoreCountCounter .computeIfAbsent( tenantId, - k -> - PlatformMetricsRegistry.registerCounter( - TRACES_EMITTER_COUNTER, Map.of("tenantId", k))) - .increment(); + k -> PlatformMetricsRegistry.registerCounter(SPAN_STORE_COUNT, Map.of("tenantId", k))) + .increment(spanStore.approximateNumEntries() * 1.0); + } - // report punctuate latency - tenantToPunctuateLatencyTimer - .computeIfAbsent( - tenantId, - k -> - PlatformMetricsRegistry.registerTimer( - PUNCTUATE_LATENCY_TIMER, Map.of("tenantId", k))) - .record(Duration.between(startTime, 
Instant.now()).toMillis(), TimeUnit.MILLISECONDS); + // report count of spanIds per trace + tenantToSpanPerTraceCounter + .computeIfAbsent( + tenantId, + k -> PlatformMetricsRegistry.registerCounter(SPANS_PER_TRACE, Map.of("tenantId", k))) + .increment(spanIds.size() * 1.0); - context.forward(key, trace, outputTopicProducer); - } else { - // implies spans for the trace have arrived within the last 'sessionTimeoutMs' interval - // so the session inactivity window is extended from the last timestamp - if (logger.isDebugEnabled()) { - logger.debug( - "Re-scheduling emit trigger for tenant_id=[{}], trace_id=[{}] to [{}]", - key.getTenantId(), - HexUtils.getHex(key.getTraceId()), - Instant.ofEpochMilli(emitTs + groupingWindowTimeoutMs)); - } - long newEmitTs = emitTs + groupingWindowTimeoutMs; - // if current timestamp is ahead of newEmitTs then just add a grace of 100ms and fire it - long duration = Math.max(100, newEmitTs - timestamp); - cancellable = - context.schedule(Duration.ofMillis(duration), PunctuationType.WALL_CLOCK_TIME, this); - } + // report trace emitted count + tenantToTraceEmittedCounter + .computeIfAbsent( + tenantId, + k -> + PlatformMetricsRegistry.registerCounter( + TRACES_EMITTER_COUNTER, Map.of("tenantId", k))) + .increment(); + + context.forward( + new Record(key, trace, System.currentTimeMillis()), + outputTopicProducer); } private Timestamps trackEndToEndLatencyTimestamps( diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/resources/configs/common/application.conf b/raw-spans-grouper/raw-spans-grouper/src/main/resources/configs/common/application.conf index cf10638b4..1849d5e97 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/resources/configs/common/application.conf +++ b/raw-spans-grouper/raw-spans-grouper/src/main/resources/configs/common/application.conf @@ -18,6 +18,7 @@ kafka.streams.config = { replication.factor = 3 replication.factor = ${?REPLICATION_FACTOR} topic.cleanup.policy = "delete,compact" + 
consumer.session.timeout.ms = 300000 producer.max.request.size = 10485760 default.production.exception.handler = org.hypertrace.core.kafkastreams.framework.exceptionhandlers.IgnoreProductionExceptionHandler @@ -60,3 +61,8 @@ metrics.reporter { } dataflow.metriccollection.sampling.percent = 10.0 + +trace.emit.punctuator { + frequency = 5s + frequency = ${?TRACE_EMIT_PUNCTUATOR_FREQUENCY} +} \ No newline at end of file diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java index 3d96e9b53..04c1d6d1c 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java +++ b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java @@ -2,6 +2,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -9,7 +11,7 @@ import java.io.File; import java.nio.ByteBuffer; import java.nio.file.Path; -import java.time.Duration; +import java.time.Clock; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -52,8 +54,9 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir mergedProps.put(RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG, config); mergedProps.put(StreamsConfig.STATE_DIR_CONFIG, file.getAbsolutePath()); + Clock clock = mock(Clock.class); StreamsBuilder streamsBuilder = - underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>()); + underTest.buildTopologyWithClock(clock, mergedProps, new StreamsBuilder(), new HashMap<>()); Properties props = new Properties(); 
mergedProps.forEach(props::put); @@ -200,21 +203,41 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir .setEvent(createEvent("event-19", tenant2)) .build(); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span1); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span4); - td.advanceWallClockTime(Duration.ofSeconds(1)); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span2); + // dummyTenant is used to advance stream time as required, all spans will be dropped + // because of setting max span count to 0, no traces for this tenant will be emitted + String dummyTenant = "dummyTenant"; + TraceIdentity dummyTraceIdentity = createTraceIdentity(dummyTenant, "dummyTrace"); + RawSpan dummySpan = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("dummyTrace".getBytes())) + .setCustomerId(tenant2) + .setEvent(createEvent("dummyEvent", dummyTenant)) + .build(); + + long messageTime = advanceAndSyncClockMock(0, clock, 0); + + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span1, messageTime); + messageTime = advanceAndSyncClockMock(messageTime, clock, 1000); + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span2, messageTime); + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span4, messageTime); // select a value < 30s (groupingWindowTimeoutInMs) - // this shouldn't trigger a punctuate call - td.advanceWallClockTime(Duration.ofMillis(200)); + // this shouldn't trigger a span emit + messageTime = advanceAndSyncClockMock(messageTime, clock, 200); + inputTopic.pipeInput(dummyTraceIdentity, dummySpan, messageTime); assertTrue(outputTopic.isEmpty()); - // the next advance should trigger a punctuate call and emit a trace with 2 spans - td.advanceWallClockTime(Duration.ofSeconds(32)); + // the next advance should and emit a trace1 with 2 spans, trace2 with one span + messageTime = advanceAndSyncClockMock(messageTime, clock, 35_000); + 
inputTopic.pipeInput(dummyTraceIdentity, dummySpan, messageTime); - // trace1 should have 2 span span1, span2 + // trace2 should have 1 span span3 StructuredTrace trace = outputTopic.readValue(); + assertEquals(1, trace.getEventList().size()); + assertEquals("event-4", new String(trace.getEventList().get(0).getEventId().array())); + + // trace1 should have 2 span span1, span2 + trace = outputTopic.readValue(); assertEquals(2, trace.getEventList().size()); Set traceEventIds = trace.getEventList().stream() @@ -223,16 +246,12 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir assertTrue(traceEventIds.contains("event-1")); assertTrue(traceEventIds.contains("event-2")); - // trace2 should have 1 span span3 - trace = outputTopic.readValue(); - assertEquals(1, trace.getEventList().size()); - assertEquals("event-4", new String(trace.getEventList().get(0).getEventId().array())); - - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span3); - td.advanceWallClockTime(Duration.ofSeconds(45)); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span5); - // the next advance should trigger a punctuate call and emit a trace with 2 spans - td.advanceWallClockTime(Duration.ofSeconds(35)); + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span3, messageTime); + messageTime = advanceAndSyncClockMock(messageTime, clock, 45_000); + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span5, messageTime); + // the next advance should emit a trace1, trace2 with one span + messageTime = advanceAndSyncClockMock(messageTime, clock, 35_000); + inputTopic.pipeInput(dummyTraceIdentity, dummySpan, messageTime); // trace1 should have 1 span i.e. 
span3 trace = outputTopic.readValue(); @@ -244,29 +263,33 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir assertEquals(1, trace.getEventList().size()); assertEquals("event-5", new String(trace.getEventList().get(0).getEventId().array())); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span6); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span7); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span8); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span9); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span10); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span11); - td.advanceWallClockTime(Duration.ofSeconds(35)); - - // trace should be truncated with 5 spans + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span6, messageTime); + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span7, messageTime); + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span8, messageTime); + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span9, messageTime); + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span10, messageTime); + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span11, messageTime); + // the next advance should emit trace3 + messageTime = advanceAndSyncClockMock(messageTime, clock, 35_000); + inputTopic.pipeInput(dummyTraceIdentity, dummySpan, messageTime); + + // trace3 should be truncated with 5 spans because of tenant limit trace = outputTopic.readValue(); assertEquals(5, trace.getEventList().size()); // input 8 spans of trace-4 for tenant2, as there is global upper limit apply, it will emit only // 6 - inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span12); - inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span13); - inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span14); - 
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span15); - inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span16); - inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span17); - inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span18); - inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span19); - td.advanceWallClockTime(Duration.ofSeconds(35)); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span12, messageTime); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span13, messageTime); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span14, messageTime); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span15, messageTime); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span16, messageTime); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span17, messageTime); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span18, messageTime); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span19, messageTime); + // the next advance should emit trace 4 + messageTime = advanceAndSyncClockMock(messageTime, clock, 35_000); + inputTopic.pipeInput(dummyTraceIdentity, dummySpan, messageTime); TestRecord testRecord = outputTopic.readRecord(); @@ -274,6 +297,12 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir assertEquals(6, testRecord.getValue().getEventList().size()); } + private long advanceAndSyncClockMock(long messageTime, Clock clock, long advanceMs) { + long finalMessageTime = messageTime + advanceMs; + when(clock.millis()).thenAnswer((inv) -> finalMessageTime); + return finalMessageTime; + } + private Event createEvent(String eventId, String tenantId) { return Event.newBuilder() .setCustomerId(tenantId) diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuatorTest.java 
b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuatorTest.java index 43b526c42..eb4f39030 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuatorTest.java +++ b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuatorTest.java @@ -1,5 +1,7 @@ package org.hypertrace.core.rawspansgrouper; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -9,12 +11,14 @@ import java.nio.ByteBuffer; import java.util.List; -import org.apache.kafka.streams.processor.Cancellable; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.state.KeyValueStore; import org.hypertrace.core.datamodel.Event; import org.hypertrace.core.datamodel.RawSpan; +import org.hypertrace.core.datamodel.StructuredTrace; +import org.hypertrace.core.kafkastreams.framework.punctuators.ThrottledPunctuatorConfig; +import org.hypertrace.core.kafkastreams.framework.punctuators.action.TaskResult; import org.hypertrace.core.kafkastreams.framework.serdes.AvroSerde; import org.hypertrace.core.spannormalizer.SpanIdentity; import org.hypertrace.core.spannormalizer.TraceIdentity; @@ -23,95 +27,86 @@ import org.junit.jupiter.api.Test; class TraceEmitPunctuatorTest { - - private TraceEmitPunctuator underTest; + private static final long groupingWindowTimeoutMs = 300; + private static final TraceIdentity traceIdentity = + TraceIdentity.newBuilder() + .setTenantId("__default") + .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) + .build(); + private TraceEmitPunctuator emitCallback; private KeyValueStore 
spanStore; private KeyValueStore traceStateStore; @BeforeEach public void setUp() { AvroSerde avroSerde = new AvroSerde(); - ProcessorContext context = mock(ProcessorContext.class); + ProcessorContext context = mock(ProcessorContext.class); when(context.keySerde()).thenReturn(avroSerde); spanStore = mock(KeyValueStore.class); traceStateStore = mock(KeyValueStore.class); To outputTopicProducer = mock(To.class); - underTest = + emitCallback = new TraceEmitPunctuator( - TraceIdentity.newBuilder() - .setTenantId("__default") - .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) - .build(), + mock(ThrottledPunctuatorConfig.class), + mock(KeyValueStore.class), context, spanStore, traceStateStore, - outputTopicProducer, - 100, + RawSpanGrouperConstants.OUTPUT_TOPIC_PRODUCER, + groupingWindowTimeoutMs, -1); - underTest.setCancellable(mock(Cancellable.class)); } @Test - public void whenPunctuatorIsRescheduledExpectNoChangesToTraceEmitTriggerStore() { - when(traceStateStore.get( - eq( - TraceIdentity.newBuilder() - .setTenantId("__default") - .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) - .build()))) - .thenReturn( - TraceState.newBuilder() - .setSpanIds(List.of(ByteBuffer.wrap("span-1".getBytes()))) - .setEmitTs(300) - .setTraceStartTimestamp(150) - .setTraceEndTimestamp(300) - .setTenantId("tenant") - .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) - .build()); - underTest.punctuate(200); + public void testWhenWindowIsExtended() { + TraceState traceState = + TraceState.newBuilder() + .setSpanIds(List.of(ByteBuffer.wrap("span-1".getBytes()))) + .setEmitTs(-1) + .setTraceStartTimestamp(100) + .setTraceEndTimestamp(200) + .setTenantId("tenant") + .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) + .build(); + when(traceStateStore.get(eq(traceIdentity))).thenReturn(traceState); + + TaskResult callbackAction = emitCallback.executeTask(300, traceIdentity); + assertEquals( + traceState.getTraceEndTimestamp() + groupingWindowTimeoutMs, + 
callbackAction.getRescheduleTimestamp().get()); // the above when() call should be the only interaction - verify(traceStateStore, times(1)).get(any()); + verify(traceStateStore, times(1)).get(traceIdentity); } @Test - public void whenTraceIsEmittedExpectDeleteOperationOnTraceStateStore() { + public void testWhenTraceToBeEmitted() { + TraceState traceState = + TraceState.newBuilder() + .setSpanIds(List.of(ByteBuffer.wrap("span-1".getBytes()))) + .setEmitTs(-1) + .setTraceStartTimestamp(100) + .setTraceEndTimestamp(130) + .setTenantId("tenant") + .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) + .build(); + when(traceStateStore.get(eq(traceIdentity))).thenReturn(traceState); + + RawSpan rawSpan = + RawSpan.newBuilder() + .setCustomerId("__default") + .setEvent( + Event.newBuilder() + .setEventId(ByteBuffer.wrap("span-1".getBytes())) + .setCustomerId("__default") + .build()) + .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) + .build(); + when(spanStore.delete(any())).thenReturn(rawSpan); - when(traceStateStore.get( - eq( - TraceIdentity.newBuilder() - .setTenantId("__default") - .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) - .build()))) - .thenReturn( - TraceState.newBuilder() - .setSpanIds(List.of(ByteBuffer.wrap("span-1".getBytes()))) - .setEmitTs(180) - .setTraceStartTimestamp(100) - .setTraceEndTimestamp(130) - .setTenantId("tenant") - .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) - .build()); + assertTrue(emitCallback.executeTask(450, traceIdentity).getRescheduleTimestamp().isEmpty()); - when(spanStore.delete(any())) - .thenReturn( - RawSpan.newBuilder() - .setCustomerId("__default") - .setEvent( - Event.newBuilder() - .setEventId(ByteBuffer.wrap("span-1".getBytes())) - .setCustomerId("__default") - .build()) - .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) - .build()); - underTest.punctuate(200); // the above when() call should be the only interaction - verify(traceStateStore, times(1)).get(any()); + verify(traceStateStore, 
times(1)).get(traceIdentity); verify(spanStore, times(1)).delete(any()); - verify(traceStateStore) - .delete( - eq( - TraceIdentity.newBuilder() - .setTenantId("__default") - .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) - .build())); + verify(traceStateStore).delete(eq(traceIdentity)); } } diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf b/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf index 5284b3714..6ecac3b23 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf +++ b/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf @@ -11,6 +11,7 @@ precreate.topics = false kafka.streams.config = { application.id = raw-spans-to-structured-traces-grouping-job num.stream.threads = 2 + consumer.session.timeout.ms = 300000 topic.cleanup.policy = "delete,compact" replication.factor = 3 bootstrap.servers = "localhost:9092" @@ -37,10 +38,13 @@ span.window.store.segment.size.mins = ${?SPAN_WINDOW_STORE_SEGMENT_SIZE_MINS} default.max.span.count = 6 max.span.count = { tenant1 = 5 + dummyTenant = 0 } -span.groupby.session.window.interval = 5 +span.groupby.session.window.interval = 30 span.groupby.session.window.graceperiod.ms = 100 dataflow.metriccollection.sampling.percent = 10.0 + +trace.emit.punctuator.frequency = 15s \ No newline at end of file diff --git a/span-normalizer/span-normalizer-api/src/main/avro/TraceState.avdl b/span-normalizer/span-normalizer-api/src/main/avro/TraceState.avdl index f651a183d..49520ca30 100644 --- a/span-normalizer/span-normalizer-api/src/main/avro/TraceState.avdl +++ b/span-normalizer/span-normalizer-api/src/main/avro/TraceState.avdl @@ -3,7 +3,7 @@ protocol TraceStateProtocol { record TraceState { long trace_start_timestamp; long trace_end_timestamp; - long emit_ts; + /** @deprecated unused */ long emit_ts; string tenant_id; bytes 
trace_id; array span_ids;