Skip to content

Commit

Permalink
feat: use throttled punctuator (#424)
Browse files Browse the repository at this point in the history
* wrap up

* helmify

* nit

* refactor

* nit

* tune

* edit

* wrap up for wall clock time changes

* try changing clock

* fix tests

* nit

* nit

* nit

* update

* nit

* minor refactor to migrate to kafka streams new apis

* Nit

* suppressions

* update test

* netty

* update

* update

* test

* revert

* update

* log

* wf changes

* inspect

* wf test

* Revert "wf test"

This reverts commit b8fd337.

* fix

---------

Co-authored-by: Laxman Ch <laxman@traceable.ai>
  • Loading branch information
Kishan Sairam Adapa and laxmanchekka authored Oct 17, 2023
1 parent 7b5ec6c commit 3c038a9
Show file tree
Hide file tree
Showing 18 changed files with 380 additions and 373 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/docker-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ jobs:
working-directory: ./.github/workflows/hypertrace-ingester
# Below tests a docker-compose.yml service named 'sut' with a valid HEALTHCHECK instruction:
run: |
docker-compose -f docker-compose.yml -f docker-compose-zipkin-example.yml up -d || { ../scripts/inspect.sh ; exit 1 ; }
docker-compose -f docker-compose.yml -f docker-compose-zipkin-example.yml up -d || { ./scripts/inspect.sh docker-compose.yml docker-compose-zipkin-example.yml ; exit 1 ; }
- name: Waits for some stability
working-directory: ./.github/workflows/hypertrace-ingester
run: |
sleep 60 # you can decrease it but never increase it
docker-compose -f docker-compose.yml ps
./scripts/inspect.sh docker-compose.yml docker-compose-zipkin-example.yml
- name: Runs tests
working-directory: ./.github/workflows/hypertrace-ingester/scripts
Expand All @@ -67,13 +68,14 @@ jobs:
working-directory: ./.github/workflows/hypertrace-ingester
# Below tests a docker-compose.yml service named 'sut' with a valid HEALTHCHECK instruction:
run: |
docker-compose -f postgres/docker-compose.yml -f docker-compose-zipkin-example.yml up -d || { ../scripts/inspect.sh ; exit 1 ; }
docker-compose -f postgres/docker-compose.yml -f docker-compose-zipkin-example.yml up -d || { ./scripts/inspect.sh postgres/docker-compose.yml docker-compose-zipkin-example.yml ; exit 1 ; }
- name: Waits for some stability
working-directory: ./.github/workflows/hypertrace-ingester
run: |
sleep 60 # you can decrease it but never increase it
docker-compose -f postgres/docker-compose.yml ps
./scripts/inspect.sh postgres/docker-compose.yml docker-compose-zipkin-example.yml
- name: Runs tests
working-directory: ./.github/workflows/hypertrace-ingester/scripts
Expand Down
7 changes: 2 additions & 5 deletions .github/workflows/hypertrace-ingester/scripts/inspect.sh
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
#!/bin/bash

# This script displays the state and logs for the containers in the docker-compose.
echo "Inspecting compose containers from $1 and $2"

SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
DOCKER_COMPOSE_FILE_DIR="$(dirname $SCRIPT_DIR)/docker"

containers=$(docker-compose -f ${DOCKER_COMPOSE_FILE_DIR}/docker-compose.yml -f ${DOCKER_COMPOSE_FILE_DIR}/docker-compose-zipkin-example.yml ps -q -a)
containers=$(docker-compose -f $1 -f $2 ps -q -a)
while IFS= read -r container; do
name=$(docker inspect $container | jq -r '.[0].Name')
echo "=================="
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/hypertrace-ingester/scripts/tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ echo "Calling the frontend to generate a trace..."
echo ""
curl -o /dev/null -s http://${FRONTEND_SERVICE_HOST}:8081 || { echo "Host $FRONTEND_SERVICE_HOST is down." ; exit 1; }

# grouper depends on stream time advancement, hence we are sending another set of traces just to meet that condition
sleep 35
curl -o /dev/null -s http://${FRONTEND_SERVICE_HOST}:8081 || { echo "Host $FRONTEND_SERVICE_HOST is down." ; exit 1; }

echo "Retrieving the list of traces."
echo ""

Expand Down
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ hypertrace-entity-service = "0.8.78"
hypertrace-config-service = "0.1.54"
hypertrace-grpc-utils = "0.12.4"
hypertrace-serviceFramework = "0.1.60"
hypertrace-kafkaStreams = "0.3.8"
hypertrace-kafkaStreams = "0.4.0"
hypertrace-view-generator = "0.4.19"
grpc = "1.57.2"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span;
import java.io.File;
import java.nio.file.Path;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -86,6 +85,9 @@ public void testIngestionPacketFlow(@TempDir Path tempDir) {
new JaegerSpanSerde().serializer());

spanNormalizerInputTopic.pipeInput(span);
// we use stream time for emit in grouper hence this additional pipe of a span after grouping
// window
spanNormalizerInputTopic.pipeInput(null, span, System.currentTimeMillis() + 30_001);

// create output topic for span-normalizer topology
TestOutputTopic spanNormalizerOutputTopic =
Expand All @@ -96,8 +98,6 @@ public void testIngestionPacketFlow(@TempDir Path tempDir) {
new AvroSerde<>().deserializer());
assertNotNull(spanNormalizerOutputTopic.readKeyValue());

topologyTestDriver.advanceWallClockTime(Duration.ofSeconds(32));

// create output topic for span-grouper topology
TestOutputTopic spanGrouperOutputTopic =
topologyTestDriver.createOutputTopic(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ enricher {
}

TraceStatsEnricher {
class = "org.hypertrace.traceenricher.enrichment.enrichers.TraceStatsEnrichere"
class = "org.hypertrace.traceenricher.enrichment.enrichers.TraceStatsEnricher"
dependencies = ["EndpointEnricher"]
}

Expand Down
37 changes: 10 additions & 27 deletions owasp-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,36 +60,21 @@
<packageUrl regex="true">^pkg:maven/io\.opentelemetry/opentelemetry\-exporter\-prometheus@.*$</packageUrl>
<cpe>cpe:/a:prometheus:prometheus</cpe>
</suppress>
<suppress until="2023-07-30Z">
<notes><![CDATA[
file name: json-20230618.jar
]]></notes>
<packageUrl regex="true">^pkg:maven/org\.json/json@.*$</packageUrl>
<cve>CVE-2022-45688</cve>
</suppress>
<suppress until="2023-07-30Z">
<notes><![CDATA[
file name: jackson-databind-2.15.2.jar
Its a disputed one, recheck after a month.
]]></notes>
<packageUrl regex="true">^pkg:maven/com\.fasterxml\.jackson\.core/jackson\-databind@.*$</packageUrl>
<cve>CVE-2023-35116</cve>
</suppress>
<suppress>
<notes><![CDATA[
file name: avro-partitioners-0.2.13.jar. Affects go based projects
]]></notes>
<packageUrl regex="true">^pkg:maven/org\.hypertrace\.core\.kafkastreams\.framework/avro\-partitioners@.*$</packageUrl>
<cve>CVE-2023-37475</cve>
</suppress>
<suppress until="2023-10-31Z">
<suppress until="2023-12-31Z">
<notes><![CDATA[
Not yet fixed in quartz. file name: quartz-2.3.2.jar
]]></notes>
<packageUrl regex="true">^pkg:maven/org\.quartz\-scheduler/quartz@.*$</packageUrl>
<cve>CVE-2023-39017</cve>
</suppress>
<suppress until="2023-11-30Z">
<suppress until="2023-12-30Z">
<notes><![CDATA[
file name: json-20230618.jar
]]></notes>
Expand All @@ -104,18 +89,16 @@
<packageUrl regex="true">^pkg:maven/com\.fasterxml\.jackson\.core/jackson\-databind@.*$</packageUrl>
<cve>CVE-2023-35116</cve>
</suppress>
<suppress until="2023-09-30Z">
<notes><![CDATA[
file name: netty-handler-4.1.94.Final.jar
]]></notes>
<packageUrl regex="true">^pkg:maven/io\.netty/netty\-handler@.*$</packageUrl>
<vulnerabilityName>CVE-2023-4586</vulnerabilityName>
</suppress>
<suppress>
<suppress until="2023-12-30Z">
<notes><![CDATA[
file name: netty-handler-4.1.94.Final.jar
This vulnerability is disputed, with the argument that SSL configuration is the responsibility of the client rather
than the transport. The change in default is under consideration for the next major Netty release, revisit then.
Regardless, our client (which is what brings in this dependency) enables the concerned feature, hostname verification
Ref:
https://github.com/grpc/grpc-java/issues/10033
https://github.com/netty/netty/issues/8537#issuecomment-1527896917
]]></notes>
<packageUrl regex="true">^pkg:maven/io\.netty/netty\-handler@.*$</packageUrl>
<packageUrl regex="true">^pkg:maven/io\.netty/netty.*@.*$</packageUrl>
<vulnerabilityName>CVE-2023-4586</vulnerabilityName>
</suppress>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@ data:
}
}
{{- end }}
{{- if hasKey .Values.rawSpansGrouperConfig "traceEmitPunctuatorFrequency" }}
trace.emit.punctuator.frequency = {{ .Values.rawSpansGrouperConfig.traceEmitPunctuatorFrequency }}
{{- end }}
1 change: 1 addition & 0 deletions raw-spans-grouper/helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ rawSpansGrouperConfig:
groupPartitionerEnabled: false
groupPartitionerConfigServiceHost: config-service
groupPartitionerConfigServicePort: 50104
traceEmitPunctuatorFrequency: 5s

span:
groupby:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@ public class RawSpanGrouperConstants {
public static final String DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT = "default.max.span.count";
public static final String DROPPED_SPANS_COUNTER = "hypertrace.dropped.spans";
public static final String TRUNCATED_TRACES_COUNTER = "hypertrace.truncated.traces";
static final String TRACE_EMIT_PUNCTUATOR = "trace-emit-punctuator";
static final String TRACE_EMIT_PUNCTUATOR_STORE_NAME = "emitter-store";
static final String TRACE_EMIT_PUNCTUATOR_FREQUENCY_CONFIG_KEY =
"trace.emit.punctuator.frequency";
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_PRODUCER;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_STATE_STORE_NAME;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_EMIT_PUNCTUATOR_STORE_NAME;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_STATE_STORE;

import com.google.common.annotations.VisibleForTesting;
import com.typesafe.config.Config;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
Expand Down Expand Up @@ -44,6 +49,15 @@ public StreamsBuilder buildTopology(
Map<String, Object> properties,
StreamsBuilder streamsBuilder,
Map<String, KStream<?, ?>> inputStreams) {
return buildTopologyWithClock(Clock.systemUTC(), properties, streamsBuilder, inputStreams);
}

@VisibleForTesting
StreamsBuilder buildTopologyWithClock(
Clock clock,
Map<String, Object> properties,
StreamsBuilder streamsBuilder,
Map<String, KStream<?, ?>> inputStreams) {
Config jobConfig = getJobConfig(properties);
String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY);
String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY);
Expand Down Expand Up @@ -72,8 +86,16 @@ public StreamsBuilder buildTopology(
Stores.persistentKeyValueStore(SPAN_STATE_STORE_NAME), keySerde, valueSerde)
.withCachingEnabled();

StoreBuilder<KeyValueStore<Long, TraceIdentity>> traceEmitPunctuatorStoreBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(TRACE_EMIT_PUNCTUATOR_STORE_NAME),
Serdes.Long(),
Serdes.ListSerde(ArrayList.class, valueSerde))
.withCachingEnabled();

streamsBuilder.addStateStore(spanStoreBuilder);
streamsBuilder.addStateStore(traceStateStoreBuilder);
streamsBuilder.addStateStore(traceEmitPunctuatorStoreBuilder);

StreamPartitioner<TraceIdentity, StructuredTrace> groupPartitioner =
new GroupPartitionerBuilder<TraceIdentity, StructuredTrace>()
Expand All @@ -89,11 +111,12 @@ public StreamsBuilder buildTopology(
outputTopicProducer = outputTopicProducer.withName(OUTPUT_TOPIC_PRODUCER);

inputStream
.transform(
RawSpansProcessor::new,
.process(
() -> new RawSpansProcessor(clock),
Named.as(RawSpansProcessor.class.getSimpleName()),
SPAN_STATE_STORE_NAME,
TRACE_STATE_STORE)
TRACE_STATE_STORE,
TRACE_EMIT_PUNCTUATOR_STORE_NAME)
.to(outputTopic, outputTopicProducer);

return streamsBuilder;
Expand Down
Loading

0 comments on commit 3c038a9

Please sign in to comment.