diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt index 233fa84cd..56cd44685 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt @@ -13,7 +13,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_TYPE_TAG -import reactor.core.observability.micrometer.Micrometer import reactor.core.publisher.Flux import reactor.core.publisher.FluxSink import java.util.function.Consumer @@ -41,7 +40,7 @@ internal class GroupChangeWatcher( .checkpoint("group-change-watcher-emitted") .name(REACTOR_METRIC) .tag(WATCH_TYPE_TAG, "group") - .tap(Micrometer.metrics(meterRegistry)) + .metrics() .doOnSubscribe { logger.info("Watching group changes") } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt index 56643c244..2ca421461 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt @@ -10,21 +10,20 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.XDS import pl.allegro.tech.servicemesh.envoycontrol.groups.Group import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState -import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler -import pl.allegro.tech.servicemesh.envoycontrol.utils.doOnNextScheduledOn -import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer -import pl.allegro.tech.servicemesh.envoycontrol.utils.noopTimer -import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured -import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.ERRORS_TOTAL_METRIC -import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler +import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICE_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.SIMPLE_CACHE_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_STATUS_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.STATUS_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.UPDATE_TRIGGER_TAG -import reactor.core.observability.micrometer.Micrometer +import pl.allegro.tech.servicemesh.envoycontrol.utils.doOnNextScheduledOn +import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer +import pl.allegro.tech.servicemesh.envoycontrol.utils.noopTimer +import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.scheduler.Scheduler @@ -67,7 +66,7 @@ class SnapshotUpdater( .tag(METRIC_EMITTER_TAG, "snapshot-updater") .tag(SNAPSHOT_STATUS_TAG, "merged") .tag(UPDATE_TRIGGER_TAG, "global") - .tap(Micrometer.metrics(meterRegistry)) + .metrics() // step 3: group updates don't provide a snapshot, // so we piggyback the last updated snapshot state for use .scan { previous: UpdateResult, newUpdate: UpdateResult -> @@ -111,7 +110,7 @@ class SnapshotUpdater( .tag(METRIC_EMITTER_TAG, "snapshot-updater") .tag(SNAPSHOT_STATUS_TAG, "published") .tag(UPDATE_TRIGGER_TAG, "groups") - .tap(Micrometer.metrics(meterRegistry)) + .metrics() .onErrorResume { e -> meterRegistry.counter( ERRORS_TOTAL_METRIC, @@ -136,7 +135,7 @@ class SnapshotUpdater( .name(REACTOR_METRIC) .tag(UPDATE_TRIGGER_TAG, "services") .tag(STATUS_TAG, "published") - .tap(Micrometer.metrics(meterRegistry)) + .metrics() .createClusterConfigurations() .map { (states, clusters) -> var lastXdsSnapshot: GlobalSnapshot? = null diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt index 24857532d..912b6f57f 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt @@ -1,16 +1,15 @@ package pl.allegro.tech.servicemesh.envoycontrol.synchronization import io.micrometer.core.instrument.MeterRegistry +import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState -import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.logSuppressedError import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer -import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured -import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC -import reactor.core.observability.micrometer.Micrometer import reactor.core.publisher.Flux import reactor.core.scheduler.Schedulers @@ -50,7 +49,7 @@ class GlobalStateChanges( .checkpoint("global-service-changes-emitted") .name(REACTOR_METRIC) .tag(METRIC_EMITTER_TAG, "global-service-changes-combinator") - .tap(Micrometer.metrics(meterRegistry)) + .metrics() } private fun combinedExperimentalFlow( @@ -84,6 +83,6 @@ class GlobalStateChanges( .publishOn(scheduler, 1) .checkpoint("global-service-changes-published") .tag(CHECKPOINT_TAG, "published") - .tap(Micrometer.metrics(meterRegistry)) + .metrics() } }