From 5f9c9fa5e22569dbfa8a08f7340c4cf888ccb1cd Mon Sep 17 00:00:00 2001 From: "nastassia.dailidava" Date: Sat, 19 Oct 2024 21:01:49 +0200 Subject: [PATCH] allegro-internal/flex-roadmap#819 returned all other metrics --- .../MetricsDiscoveryServerCallbacks.kt | 22 +++++++- .../servicemesh/envoycontrol/utils/Metrics.kt | 11 ++-- .../envoycontrol/utils/ReactorUtils.kt | 53 ++++++++++++++++++- .../envoycontrol/utils/ReactorUtilsTest.kt | 2 - .../infrastructure/ControlPlaneConfig.kt | 18 ++++++- .../MetricsDiscoveryServerCallbacksTest.kt | 10 ++-- 6 files changed, 96 insertions(+), 20 deletions(-) diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt index 05e610263..9acc69af1 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt @@ -6,6 +6,8 @@ import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Tags import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTIONS_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.DISCOVERY_REQ_TYPE_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.REQUESTS_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.STREAM_TYPE_TAG import java.util.concurrent.atomic.AtomicInteger import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as V3DeltaDiscoveryRequest @@ -58,14 +60,30 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) } override fun onV3StreamRequest(streamId: Long, request: V3DiscoveryRequest) { - // noop + meterRegistry.counter( + REQUESTS_METRIC, + Tags.of( + CONNECTION_TYPE_TAG, "grpc", + STREAM_TYPE_TAG, StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), + DISCOVERY_REQ_TYPE_TAG, "total" + ) + ) + .increment() } override fun onV3StreamDeltaRequest( streamId: Long, request: V3DeltaDiscoveryRequest ) { - // noop + meterRegistry.counter( + REQUESTS_METRIC, + Tags.of( + CONNECTION_TYPE_TAG, "grpc", + STREAM_TYPE_TAG, StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), + DISCOVERY_REQ_TYPE_TAG, "delta" + ) + ) + .increment() } override fun onStreamCloseWithError(streamId: Long, typeUrl: String, error: Throwable) { diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt index 1d249fa98..c1e0ca903 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt @@ -7,16 +7,16 @@ import io.micrometer.core.instrument.noop.NoopTimer val noopTimer = NoopTimer(Meter.Id("", Tags.empty(), null, null, Meter.Type.TIMER)) const val REACTOR_METRIC = "reactor.stats" const val SERVICES_STATE_METRIC = "services.state" -const val SERVICES_STATE_ERRORS_METRIC = "services.state.errors" +const val SERVICES_STATE_ERRORS_METRIC = "services.state.errors.total" const val SNAPSHOT_METRIC = "snapshot" const val SNAPSHOT_UPDATE_DURATION_METRIC = "snapshot.update.duration.seconds" const val SNAPSHOT_ERROR_METRIC = "snapshot.errors" -const val SNAPSHOT_GROUP_ERROR_METRIC = "snapshot.group.errors" -const val WATCH_ERRORS_METRIC = "watch.errors.total" +const val SNAPSHOT_GROUP_ERROR_METRIC = "snapshot.group.errors.total" const val COMMUNICATION_MODE_ERROR_METRIC = "communication.errors.total" const val CONNECTIONS_METRIC = "connections.stats" -const val REQUESTS_METRIC = "stream.requests" -const val WATCH_METRIC = "watch.stats" +const val REQUESTS_METRIC = "requests.stats" +const val WATCH_ERRORS_METRIC = "service.watch.errors.total" +const val WATCH_METRIC = "service.watch" const val ENVOY_CONTROL_WARM_UP_METRIC = "envoy.control.warmup.seconds" const val CROSS_DC_SYNC_METRIC = "cross.dc.synchronization" const val CROSS_DC_SYNC_CANCELLED_METRIC = "$CROSS_DC_SYNC_METRIC.cancelled.total" @@ -35,7 +35,6 @@ const val WATCH_TYPE_TAG = "watch-type" const val DISCOVERY_REQ_TYPE_TAG = "discovery-request-type" const val METRIC_TYPE_TAG = "metric-type" const val METRIC_EMITTER_TAG = "metric-emitter" -const val SNAPSHOT_STATUS_TAG = "snapshot-status" const val UPDATE_TRIGGER_TAG = "update-trigger" const val SERVICE_TAG = "service" const val OPERATION_TAG = "operation" diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt index 1cfc0457a..ab4806a09 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt @@ -1,6 +1,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.utils import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import org.reactivestreams.Subscription import org.slf4j.LoggerFactory import reactor.core.Disposable @@ -11,6 +12,7 @@ import reactor.core.scheduler.Scheduler import reactor.core.scheduler.Schedulers import java.time.Duration import java.util.concurrent.TimeUnit +import kotlin.streams.asSequence private val logger = LoggerFactory.getLogger("pl.allegro.tech.servicemesh.envoycontrol.utils.ReactorUtils") private val defaultScheduler by lazy { Schedulers.newSingle("reactor-utils-scheduler") } @@ -110,7 +112,12 @@ private fun measureQueueSubscriptionBuffer( name: String, meterRegistry: MeterRegistry ) { - logger.info("subscription $subscription name: $name meterRegistry: $meterRegistry") + meterRegistry.gauge( + REACTOR_METRIC, + Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, name), + subscription, + queueSubscriptionBufferExtractor + ) } private fun measureScannableBuffer( @@ -119,7 +126,49 @@ private fun measureScannableBuffer( innerSources: Int, meterRegistry: MeterRegistry ) { - logger.info("scannable $scannable name: $name innerSources: $innerSources meterRegistry: $meterRegistry") + val buffered = scannable.scan(Scannable.Attr.BUFFERED) + if (buffered == null) { + logger.error( + "Cannot register metric $REACTOR_METRIC 'with $METRIC_EMITTER_TAG: $name'. Buffer size not available. " + + "Use measureBuffer() only on supported reactor operators" + ) + return + } + + meterRegistry.gauge( + REACTOR_METRIC, + Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, name), + scannable, + scannableBufferExtractor + ) + + /** + * Special case for FlatMap derived operators like merge(). The main buffer attribute doesn't return actual + * buffer (that is controlled by `prefetch` parameter) size. Instead it returns simply number of connected sources. + * + * To access actual buffer size, we need to extract it from inners(). We don't know how many sources will + * be available, so it must be stated explicitly as innerSources parameter. + */ + for (i in 0 until innerSources) { + meterRegistry.gauge( + REACTOR_METRIC, + Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, "${(name)}_$i"), + scannable, + innerBufferExtractor(i) + ) + } +} + +private val scannableBufferExtractor = { s: Scannable -> s.scan(Scannable.Attr.BUFFERED)?.toDouble() ?: -1.0 } +private fun innerBufferExtractor(index: Int) = { s: Scannable -> + s.inners().asSequence() + .elementAtOrNull(index) + ?.let(scannableBufferExtractor) + ?: -1.0 +} + +private val queueSubscriptionBufferExtractor = { s: Fuseable.QueueSubscription<*> -> + s.size.toDouble() } sealed class ParallelizableScheduler diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtilsTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtilsTest.kt index 111a1ee7b..c2f43c8f0 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtilsTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtilsTest.kt @@ -3,7 +3,6 @@ package pl.allegro.tech.servicemesh.envoycontrol.utils import io.micrometer.core.instrument.Tags import io.micrometer.core.instrument.simple.SimpleMeterRegistry import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.fail import org.testcontainers.shaded.org.awaitility.Awaitility @@ -13,7 +12,6 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import java.util.function.BiFunction -@Disabled class ReactorUtilsTest { @Test diff --git a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt index 8d50b62b5..6b88dc8a3 100644 --- a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt +++ b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt @@ -3,6 +3,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.infrastructure import com.ecwid.consul.v1.ConsulClient import com.fasterxml.jackson.databind.ObjectMapper import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.ConfigurationProperties @@ -40,6 +41,10 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.transformers.RegexServi import pl.allegro.tech.servicemesh.envoycontrol.services.transformers.ServiceInstancesTransformer import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.EnvoyHttpFilters import pl.allegro.tech.servicemesh.envoycontrol.synchronization.GlobalStateChanges +import pl.allegro.tech.servicemesh.envoycontrol.utils.CACHE_GROUP_COUNT_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.STATUS_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_ERRORS_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_METRIC import reactor.core.scheduler.Schedulers import java.net.URI @@ -172,7 +177,18 @@ class ControlPlaneConfig { ConsulClient(properties.host, properties.port).agentSelf.value?.config?.datacenter ?: "local" fun controlPlaneMetrics(meterRegistry: MeterRegistry): DefaultEnvoyControlMetrics { - return DefaultEnvoyControlMetrics(meterRegistry = meterRegistry) + return DefaultEnvoyControlMetrics(meterRegistry = meterRegistry).also { + meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "added"), it.servicesAdded) + meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "removed"), it.servicesRemoved) + meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "instance-changed"), it.instanceChanges) + meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "snapshot-changed"), it.snapshotChanges) + meterRegistry.gauge(CACHE_GROUP_COUNT_METRIC, it.cacheGroupsCount) + it.meterRegistry.more().counter( + WATCH_ERRORS_METRIC, + listOf(), + it.errorWatchingServices + ) + } } @Bean diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt index b7ebbd690..bd1497e65 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt @@ -2,7 +2,6 @@ package pl.allegro.tech.servicemesh.envoycontrol import io.micrometer.core.instrument.Tags import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import pl.allegro.tech.servicemesh.envoycontrol.assertions.untilAsserted @@ -21,15 +20,14 @@ import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscover import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.RDS import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.SDS import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.UNKNOWN -import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTIONS_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.DISCOVERY_REQ_TYPE_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.REQUESTS_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.STREAM_TYPE_TAG import java.util.function.Consumer import java.util.function.Predicate -@Disabled class XdsMetricsDiscoveryServerCallbacksTest : MetricsDiscoveryServerCallbacksTest { companion object { @@ -89,7 +87,6 @@ class XdsMetricsDiscoveryServerCallbacksTest : MetricsDiscoveryServerCallbacksTe ) } -@Disabled class AdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbacksTest { companion object { @@ -149,7 +146,6 @@ class AdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbacksTes ) } -@Disabled class DeltaAdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbacksTest { companion object { @@ -209,7 +205,6 @@ class DeltaAdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbac ) } -@Disabled interface MetricsDiscoveryServerCallbacksTest { companion object { private val logger by logger() @@ -251,7 +246,8 @@ interface MetricsDiscoveryServerCallbacksTest { ).isNotNull assertThat( meterRegistry.get(metric) - .tags(Tags.of(STREAM_TYPE_TAG, type.name.lowercase(), CONNECTION_TYPE_TAG, "grpc")).gauge().value() + .tags(Tags.of(STREAM_TYPE_TAG, type.name.lowercase(), CONNECTION_TYPE_TAG, "grpc")).gauge() + .value() .toInt() ).isEqualTo(value) }