From 62efa6ac825e2a6d33be607601b71cbfe9ff9f4c Mon Sep 17 00:00:00 2001
From: Ikhun Um <ikhun.um@linecorp.com>
Date: Wed, 11 Dec 2024 14:21:25 +0900
Subject: [PATCH] Periodically remove inactive connection pool metrics (#6024)

Motivation:

We observed that threads were blocked when multiple connections were
closed simultaneously and the endpoint had a small number of event
loops.

https://github.com/line/armeria/blob/fa76e99fa6132545df3a8d05eeb81c5681ec8953/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolMetrics.java#L79-L85

We have no exact evidence, but I guess Micrometer's `remove()` operation
may take a long time. The other logic is a simple HashMap operation that
does not block for a long time.

Modifications:

- Add a dedicated GC thread to remove inactive meters whose active
connections are 0.
  - A jitter is added to prevent GC from executing simultaneously.
  - Unsed meters are removed every hour + jitter.
- `ConnectionPoolListener` now implements `SafeCloseable` so users
should close it when it is unused.

Result:

- Fix the bug where `EventLoop` is blocked for a long time by
`ConnectionPoolListener.metricCollecting()` when a connection is closed.
---
 .../armeria/client/ClientFactoryBuilder.java  |  20 +-
 .../client/ConnectionPoolListener.java        |   8 +-
 .../armeria/client/ConnectionPoolMetrics.java | 192 +++++++++++++++---
 .../armeria/client/HttpClientFactory.java     |   7 +-
 .../ConnectionPoolCollectingMetricTest.java   |  53 ++---
 .../client/ConnectionPoolMetricsTest.java     | 100 +++++++++
 .../client/Http1ResponseDecoderTest.java      |   3 +-
 7 files changed, 320 insertions(+), 63 deletions(-)
 create mode 100644 core/src/test/java/com/linecorp/armeria/client/ConnectionPoolMetricsTest.java

diff --git a/core/src/main/java/com/linecorp/armeria/client/ClientFactoryBuilder.java b/core/src/main/java/com/linecorp/armeria/client/ClientFactoryBuilder.java
index 029ffab983e..b7c4f2627f8 100644
--- a/core/src/main/java/com/linecorp/armeria/client/ClientFactoryBuilder.java
+++ b/core/src/main/java/com/linecorp/armeria/client/ClientFactoryBuilder.java
@@ -131,6 +131,7 @@ public final class ClientFactoryBuilder implements TlsSetters {
     @Nullable
     private ClientTlsConfig tlsConfig;
     private boolean staticTlsSettingsSet;
+    private boolean autoCloseConnectionPoolListener = true;
 
     ClientFactoryBuilder() {
         connectTimeoutMillis(Flags.defaultConnectTimeoutMillis());
@@ -857,11 +858,26 @@ public ClientFactoryBuilder useHttp1Pipelining(boolean useHttp1Pipelining) {
 
     /**
      * Sets the listener which is notified on a connection pool event.
+     * Note that the specified {@link ConnectionPoolListener} will be closed automatically when the
+     * {@link ClientFactory} is closed.
+     */
+    public ClientFactoryBuilder connectionPoolListener(ConnectionPoolListener connectionPoolListener) {
+        return connectionPoolListener(connectionPoolListener, true);
+    }
+
+    /**
+     * Sets the listener which is notified on a connection pool event.
+     *
+     * <p>If {@code autoClose} is true, {@link ConnectionPoolListener#close()} will be automatically called when
+     * the {@link ClientFactory} is closed. Otherwise, you need to close it manually. {@code autoClose} is
+     * enabled by default.
+     *
      */
     public ClientFactoryBuilder connectionPoolListener(
-            ConnectionPoolListener connectionPoolListener) {
+            ConnectionPoolListener connectionPoolListener, boolean autoClose) {
         option(ClientFactoryOptions.CONNECTION_POOL_LISTENER,
                requireNonNull(connectionPoolListener, "connectionPoolListener"));
+        autoCloseConnectionPoolListener = autoClose;
         return this;
     }
 
@@ -1075,7 +1091,7 @@ private ClientFactoryOptions buildOptions() {
      * Returns a newly-created {@link ClientFactory} based on the properties of this builder.
      */
     public ClientFactory build() {
-        return new DefaultClientFactory(new HttpClientFactory(buildOptions()));
+        return new DefaultClientFactory(new HttpClientFactory(buildOptions(), autoCloseConnectionPoolListener));
     }
 
     @Override
diff --git a/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolListener.java b/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolListener.java
index 98759228a03..87cf5fb150d 100644
--- a/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolListener.java
+++ b/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolListener.java
@@ -20,6 +20,7 @@
 import com.linecorp.armeria.common.SessionProtocol;
 import com.linecorp.armeria.common.annotation.UnstableApi;
 import com.linecorp.armeria.common.metric.MeterIdPrefix;
+import com.linecorp.armeria.common.util.SafeCloseable;
 import com.linecorp.armeria.common.util.Ticker;
 import com.linecorp.armeria.common.util.Unwrappable;
 
@@ -29,7 +30,7 @@
 /**
  * Listens to the client connection pool events.
  */
-public interface ConnectionPoolListener extends Unwrappable {
+public interface ConnectionPoolListener extends Unwrappable, SafeCloseable {
 
     /**
      * Returns an instance that does nothing.
@@ -130,4 +131,9 @@ void connectionClosed(SessionProtocol protocol,
     default ConnectionPoolListener unwrap() {
         return this;
     }
+
+    @Override
+    default void close() {
+        // Do nothing by default.
+    }
 }
diff --git a/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolMetrics.java b/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolMetrics.java
index 79701db418e..db9e33223bc 100644
--- a/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolMetrics.java
+++ b/core/src/main/java/com/linecorp/armeria/client/ConnectionPoolMetrics.java
@@ -16,14 +16,29 @@
 package com.linecorp.armeria.client;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 
 import com.linecorp.armeria.common.SessionProtocol;
+import com.linecorp.armeria.common.annotation.Nullable;
 import com.linecorp.armeria.common.metric.MeterIdPrefix;
+import com.linecorp.armeria.common.util.SafeCloseable;
+import com.linecorp.armeria.common.util.ThreadFactories;
 import com.linecorp.armeria.internal.common.util.ReentrantShortLock;
 
 import io.micrometer.core.instrument.Counter;
@@ -32,7 +47,15 @@
 import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.Tag;
 
-final class ConnectionPoolMetrics {
+final class ConnectionPoolMetrics implements SafeCloseable {
+
+    private static final Logger logger = LoggerFactory.getLogger(ConnectionPoolMetrics.class);
+
+    private static final ScheduledExecutorService CLEANUP_EXECUTOR =
+            Executors.newSingleThreadScheduledExecutor(
+                    ThreadFactories.newThreadFactory("armeria-connection-metric-cleanup-executor",
+                                                     true));
+
     private static final String PROTOCOL = "protocol";
     private static final String REMOTE_IP = "remote.ip";
     private static final String LOCAL_IP = "local.ip";
@@ -43,13 +66,27 @@ final class ConnectionPoolMetrics {
     @GuardedBy("lock")
     private final Map<List<Tag>, Meters> metersMap = new HashMap<>();
     private final ReentrantShortLock lock = new ReentrantShortLock();
+    private final int cleanupDelaySeconds;
+    private boolean garbageCollecting;
+
+    private volatile boolean closed;
+    private volatile ScheduledFuture<?> scheduledFuture;
 
     /**
      * Creates a new instance with the specified {@link Meter} name.
      */
     ConnectionPoolMetrics(MeterRegistry meterRegistry, MeterIdPrefix idPrefix) {
+        this(meterRegistry, idPrefix, 3600 /* 1 hour */);
+    }
+
+    @VisibleForTesting
+    ConnectionPoolMetrics(MeterRegistry meterRegistry, MeterIdPrefix idPrefix, int cleanupDelaySeconds) {
         this.idPrefix = idPrefix;
         this.meterRegistry = meterRegistry;
+        this.cleanupDelaySeconds = cleanupDelaySeconds;
+        // Schedule a cleanup task to remove unused meters.
+        scheduledFuture = CLEANUP_EXECUTOR.schedule(this::cleanupInactiveMeters,
+                                                    nextCleanupDelaySeconds(), TimeUnit.SECONDS);
     }
 
     void increaseConnOpened(SessionProtocol protocol, InetSocketAddress remoteAddr,
@@ -57,8 +94,7 @@ void increaseConnOpened(SessionProtocol protocol, InetSocketAddress remoteAddr,
         final List<Tag> commonTags = commonTags(protocol, remoteAddr, localAddr);
         lock.lock();
         try {
-            final Meters meters = metersMap.computeIfAbsent(commonTags,
-                                                            key -> new Meters(idPrefix, key, meterRegistry));
+            final Meters meters = metersMap.computeIfAbsent(commonTags, Meters::new);
             meters.increment();
         } finally {
             lock.unlock();
@@ -82,61 +118,153 @@ void increaseConnClosed(SessionProtocol protocol, InetSocketAddress remoteAddr,
             if (meters != null) {
                 meters.decrement();
                 assert meters.activeConnections() >= 0 : "active connections should not be negative. " + meters;
-                if (meters.activeConnections() == 0) {
-                    // XXX(ikhoon): Should we consider to remove the gauge lazily so that collectors can get the
-                    //              value.
-                    // Remove gauges to be garbage collected because the cardinality of remoteAddr could be
-                    // high.
-                    metersMap.remove(commonTags);
-                    meters.remove(meterRegistry);
-                }
             }
         } finally {
             lock.unlock();
         }
     }
 
-    private static final class Meters {
+    void cleanupInactiveMeters() {
+        final List<Meters> unusedMetersList = new ArrayList<>();
+        try {
+            lock.lock();
+            // Prevent meter registration while cleaning up.
+            garbageCollecting = true;
+
+            // Collect unused meters.
+            try {
+                for (final Iterator<Entry<List<Tag>, Meters>> it = metersMap.entrySet().iterator();
+                     it.hasNext();) {
+                    final Entry<List<Tag>, Meters> entry = it.next();
+                    final Meters meters = entry.getValue();
+                    if (meters.activeConnections() == 0) {
+                        unusedMetersList.add(meters);
+                        it.remove();
+                    }
+                }
+
+                if (unusedMetersList.isEmpty()) {
+                    garbageCollecting = false;
+                    return;
+                }
+            } finally {
+                lock.unlock();
+            }
+
+            // Remove unused meters.
+            for (Meters meters : unusedMetersList) {
+                meters.remove(meterRegistry);
+            }
+
+            // Register metrics for the pending meters.
+            lock.lock();
+            try {
+                metersMap.values().forEach(Meters::maybeRegisterMetrics);
+                garbageCollecting = false;
+            } finally {
+                lock.unlock();
+            }
+        } catch (Throwable e) {
+            logger.warn("Failed to cleanup inactive meters.", e);
+            garbageCollecting = false;
+        }
+
+        if (closed) {
+            return;
+        }
+
+        // Schedule the next cleanup task.
+        scheduledFuture = CLEANUP_EXECUTOR.schedule(this::cleanupInactiveMeters,
+                                                    nextCleanupDelaySeconds(), TimeUnit.SECONDS);
+    }
+
+    private long nextCleanupDelaySeconds() {
+        // Schedule the cleanup task randomly between cleanupDelayMinutes and 2 * cleanupDelayMinutes.
+        return cleanupDelaySeconds + ThreadLocalRandom.current().nextInt(cleanupDelaySeconds);
+    }
+
+    @Override
+    public void close() {
+        // This method will be invoked after the connection pool is closed.
+        closed = true;
+        final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
+        scheduledFuture.cancel(false);
+        CLEANUP_EXECUTOR.execute(this::cleanupInactiveMeters);
+    }
 
-        private final Counter opened;
-        private final Counter closed;
-        private final Gauge active;
-        private int activeConnections;
+    private final class Meters {
+
+        private final List<Tag> commonTags;
+
+        @Nullable
+        private Counter opened;
+        @Nullable
+        private Counter closed;
+        @Nullable
+        private Gauge active;
+
+        private int numOpened;
+        private int numClosed;
+
+        Meters(List<Tag> commonTags) {
+            this.commonTags = commonTags;
+            if (!garbageCollecting) {
+                maybeRegisterMetrics();
+            }
+        }
+
+        void maybeRegisterMetrics() {
+            if (opened != null) {
+                return;
+            }
 
-        Meters(MeterIdPrefix idPrefix, List<Tag> commonTags, MeterRegistry registry) {
             opened = Counter.builder(idPrefix.name("connections"))
                             .tags(commonTags)
                             .tag(STATE, "opened")
-                            .register(registry);
+                            .register(meterRegistry);
+            if (numOpened > 0) {
+                opened.increment(numOpened);
+            }
+
             closed = Counter.builder(idPrefix.name("connections"))
                             .tags(commonTags)
                             .tag(STATE, "closed")
-                            .register(registry);
+                            .register(meterRegistry);
+            if (numClosed > 0) {
+                closed.increment(numClosed);
+            }
+
             active = Gauge.builder(idPrefix.name("active.connections"), this, Meters::activeConnections)
                           .tags(commonTags)
-                          .register(registry);
+                          .register(meterRegistry);
         }
 
-        Meters increment() {
-            activeConnections++;
-            opened.increment();
-            return this;
+        void increment() {
+            numOpened++;
+            if (opened != null) {
+                opened.increment();
+            }
         }
 
-        Meters decrement() {
-            activeConnections--;
-            closed.increment();
-            return this;
+        void decrement() {
+            numClosed++;
+            if (closed != null) {
+                closed.increment();
+            }
         }
 
         int activeConnections() {
-            return activeConnections;
+            return numOpened - numClosed;
         }
 
         void remove(MeterRegistry registry) {
-            registry.remove(opened);
-            registry.remove(closed);
-            registry.remove(active);
+            if (opened != null) {
+                assert closed != null;
+                assert active != null;
+                registry.remove(opened);
+                registry.remove(closed);
+                registry.remove(active);
+            }
         }
     }
 }
diff --git a/core/src/main/java/com/linecorp/armeria/client/HttpClientFactory.java b/core/src/main/java/com/linecorp/armeria/client/HttpClientFactory.java
index d4d7aacb279..35ad761e790 100644
--- a/core/src/main/java/com/linecorp/armeria/client/HttpClientFactory.java
+++ b/core/src/main/java/com/linecorp/armeria/client/HttpClientFactory.java
@@ -138,9 +138,10 @@ private static void setupTlsMetrics(List<X509Certificate> certificates, MeterReg
             () -> RequestContext.mapCurrent(
                     ctx -> ctx.eventLoop().withoutContext(), () -> eventLoopGroup().next());
     private final ClientFactoryOptions options;
+    private final boolean autoCloseConnectionPoolListener;
     private final AsyncCloseableSupport closeable = AsyncCloseableSupport.of(this::closeAsync);
 
-    HttpClientFactory(ClientFactoryOptions options) {
+    HttpClientFactory(ClientFactoryOptions options, boolean autoCloseConnectionPoolListener) {
         workerGroup = options.workerGroup();
 
         @SuppressWarnings("unchecked")
@@ -225,6 +226,7 @@ private static void setupTlsMetrics(List<X509Certificate> certificates, MeterReg
         maxConnectionAgeMillis = options.maxConnectionAgeMillis();
         maxNumRequestsPerConnection = options.maxNumRequestsPerConnection();
         channelPipelineCustomizer = options.channelPipelineCustomizer();
+        this.autoCloseConnectionPoolListener = autoCloseConnectionPoolListener;
 
         this.options = options;
 
@@ -461,6 +463,9 @@ private void closeAsync(CompletableFuture<?> future) {
                 logger.warn("Failed to close {}s:", HttpChannelPool.class.getSimpleName(), cause);
             }
 
+            if (autoCloseConnectionPoolListener) {
+                connectionPoolListener.close();
+            }
             if (shutdownWorkerGroupOnClose) {
                 workerGroup.shutdownGracefully().addListener((FutureListener<Object>) f -> {
                     if (f.cause() != null) {
diff --git a/core/src/test/java/com/linecorp/armeria/client/ConnectionPoolCollectingMetricTest.java b/core/src/test/java/com/linecorp/armeria/client/ConnectionPoolCollectingMetricTest.java
index ffd06347c00..d5e0438cde3 100644
--- a/core/src/test/java/com/linecorp/armeria/client/ConnectionPoolCollectingMetricTest.java
+++ b/core/src/test/java/com/linecorp/armeria/client/ConnectionPoolCollectingMetricTest.java
@@ -46,16 +46,16 @@ void shouldCollectConnectionPoolEvents() throws Exception {
         final InetSocketAddress addressA = new InetSocketAddress("10.10.10.10", 3333);
         final InetSocketAddress addressB = new InetSocketAddress("10.10.10.11", 3333);
 
-        final String openABMetricKey = "armeria.client.connections#count{local.ip=10.10.10.11," +
-                                       "protocol=H1,remote.ip=10.10.10.10,state=opened}";
-        final String closedABMetricKey = "armeria.client.connections#count{local.ip=10.10.10.11," +
-                                         "protocol=H1,remote.ip=10.10.10.10,state=closed}";
-        final String activeABMetricKey = "armeria.client.active.connections#value{local.ip=10.10.10.11," +
-                                         "protocol=H1,remote.ip=10.10.10.10}";
-        final String openBAMetricKey = "armeria.client.connections#count{local.ip=10.10.10.10," +
-                                       "protocol=H1,remote.ip=10.10.10.11,state=opened}";
-        final String activeBAMetricKey = "armeria.client.active.connections#value{local.ip=10.10.10.10," +
-                                         "protocol=H1,remote.ip=10.10.10.11}";
+        final String openABMetricKey = "armeria.client.connections#count{" +
+                                       "local.ip=10.10.10.11,protocol=H1,remote.ip=10.10.10.10,state=opened}";
+        final String closedABMetricKey = "armeria.client.connections#count{" +
+                                         "local.ip=10.10.10.11,protocol=H1,remote.ip=10.10.10.10,state=closed}";
+        final String activeABMetricKey = "armeria.client.active.connections#value{" +
+                                         "local.ip=10.10.10.11,protocol=H1,remote.ip=10.10.10.10}";
+        final String openBAMetricKey = "armeria.client.connections#count{" +
+                                       "local.ip=10.10.10.10,protocol=H1,remote.ip=10.10.10.11,state=opened}";
+        final String activeBAMetricKey = "armeria.client.active.connections#value{" +
+                                         "local.ip=10.10.10.10,protocol=H1,remote.ip=10.10.10.11}";
 
         final AttributeMap attributeMap = new DefaultAttributeMap();
 
@@ -64,43 +64,44 @@ void shouldCollectConnectionPoolEvents() throws Exception {
         assertThat(MoreMeters.measureAll(registry)).containsEntry(activeABMetricKey, 1.0);
 
         connectionPoolListener.connectionClosed(SessionProtocol.H1, addressA, addressB, attributeMap);
-        // If the number of connections is 0, the metric is not collected.
+        // Although the number of active connections is 0, the metrics will be not removed immediately but
+        // after an hour.
         assertThat(MoreMeters.measureAll(registry))
-                .doesNotContainKey(openABMetricKey)
-                .doesNotContainKey(closedABMetricKey)
-                .doesNotContainKey(activeABMetricKey);
+                .containsEntry(openABMetricKey, 1.0)
+                .containsEntry(closedABMetricKey, 1.0)
+                .containsEntry(activeABMetricKey, 0.0);
         connectionPoolListener.connectionOpen(SessionProtocol.H1, addressA, addressB, attributeMap);
         assertThat(MoreMeters.measureAll(registry))
-                .containsEntry(openABMetricKey, 1.0)
-                .containsEntry(closedABMetricKey, 0.0)
+                .containsEntry(openABMetricKey, 2.0)
+                .containsEntry(closedABMetricKey, 1.0)
                 .containsEntry(activeABMetricKey, 1.0);
         connectionPoolListener.connectionOpen(SessionProtocol.H1, addressA, addressB, attributeMap);
         assertThat(MoreMeters.measureAll(registry))
-                .containsEntry(openABMetricKey, 2.0)
-                .containsEntry(closedABMetricKey, 0.0)
+                .containsEntry(openABMetricKey, 3.0)
+                .containsEntry(closedABMetricKey, 1.0)
                 .containsEntry(activeABMetricKey, 2.0);
 
         connectionPoolListener.connectionOpen(SessionProtocol.H1, addressB, addressA, attributeMap);
         assertThat(MoreMeters.measureAll(registry))
-                .containsEntry(openABMetricKey, 2.0)
-                .containsEntry(closedABMetricKey, 0.0)
+                .containsEntry(openABMetricKey, 3.0)
+                .containsEntry(closedABMetricKey, 1.0)
                 .containsEntry(activeABMetricKey, 2.0)
                 .containsEntry(openBAMetricKey, 1.0)
                 .containsEntry(activeBAMetricKey, 1.0);
 
         connectionPoolListener.connectionClosed(SessionProtocol.H1, addressA, addressB, attributeMap);
         assertThat(MoreMeters.measureAll(registry))
-                .containsEntry(openABMetricKey, 2.0)
-                .containsEntry(closedABMetricKey, 1.0)
+                .containsEntry(openABMetricKey, 3.0)
+                .containsEntry(closedABMetricKey, 2.0)
                 .containsEntry(activeABMetricKey, 1.0)
                 .containsEntry(openBAMetricKey, 1.0)
                 .containsEntry(activeBAMetricKey, 1.0);
         connectionPoolListener.connectionClosed(SessionProtocol.H1, addressB, addressA, attributeMap);
         assertThat(MoreMeters.measureAll(registry))
-                .containsEntry(openABMetricKey, 2.0)
-                .containsEntry(closedABMetricKey, 1.0)
+                .containsEntry(openABMetricKey, 3.0)
+                .containsEntry(closedABMetricKey, 2.0)
                 .containsEntry(activeABMetricKey, 1.0)
-                .doesNotContainKey(openBAMetricKey)
-                .doesNotContainKey(activeBAMetricKey);
+                .containsEntry(openBAMetricKey, 1.0)
+                .containsEntry(activeBAMetricKey, 0.0);
     }
 }
diff --git a/core/src/test/java/com/linecorp/armeria/client/ConnectionPoolMetricsTest.java b/core/src/test/java/com/linecorp/armeria/client/ConnectionPoolMetricsTest.java
new file mode 100644
index 00000000000..fc5423a2ecd
--- /dev/null
+++ b/core/src/test/java/com/linecorp/armeria/client/ConnectionPoolMetricsTest.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2024 LINE Corporation
+ *
+ * LINE Corporation licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package com.linecorp.armeria.client;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import org.junit.jupiter.api.Test;
+
+import com.linecorp.armeria.common.SessionProtocol;
+import com.linecorp.armeria.common.metric.MeterIdPrefix;
+import com.linecorp.armeria.common.metric.MoreMeters;
+
+import io.micrometer.core.instrument.Meter;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+
+class ConnectionPoolMetricsTest {
+
+    @Test
+    void shouldRemoveInactiveMetricsPeriodically() {
+        final TestMeterRemovalListener removalListener = new TestMeterRemovalListener();
+        final SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
+        meterRegistry.config().onMeterRemoved(removalListener);
+        final ConnectionPoolMetrics metrics = new ConnectionPoolMetrics(meterRegistry,
+                                                                        new MeterIdPrefix("test"),
+                                                                        2);
+
+        final InetSocketAddress remoteAddr1 = new InetSocketAddress("1.1.1.1", 80);
+        final InetSocketAddress localAddr1 = new InetSocketAddress("1.1.1.2", 80);
+        final InetSocketAddress remoteAddr2 = new InetSocketAddress("2.2.2.1", 80);
+        final InetSocketAddress localAddr2 = new InetSocketAddress("2.2.2.2", 80);
+        final InetSocketAddress remoteAddr3 = new InetSocketAddress("3.3.3.1", 80);
+        final InetSocketAddress localAddr3 = new InetSocketAddress("3.3.3.2", 80);
+        metrics.increaseConnOpened(SessionProtocol.HTTP, remoteAddr1, localAddr1);
+        metrics.increaseConnOpened(SessionProtocol.HTTP, remoteAddr1, localAddr1);
+        metrics.increaseConnClosed(SessionProtocol.HTTP, remoteAddr1, localAddr1);
+        metrics.increaseConnOpened(SessionProtocol.HTTP, remoteAddr2, localAddr2);
+        final Map<String, Double> meters = MoreMeters.measureAll(meterRegistry);
+        assertThat(meters).containsEntry(
+                "test.active.connections#value{local.ip=1.1.1.2,protocol=HTTP,remote.ip=1.1.1.1}", 1.0);
+        assertThat(meters).containsEntry(
+                "test.active.connections#value{local.ip=2.2.2.2,protocol=HTTP,remote.ip=2.2.2.1}", 1.0);
+
+        metrics.increaseConnClosed(SessionProtocol.HTTP, remoteAddr1, localAddr1);
+
+        // GC is working.
+        await().untilTrue(removalListener.removing);
+        // Make sure metrics are collected while GC is working.
+        metrics.increaseConnOpened(SessionProtocol.HTTP, remoteAddr3, localAddr3);
+        // Meters wasn't updated yet.
+        final Map<String, Double> meters1 = MoreMeters.measureAll(meterRegistry);
+        assertThat(meters1).doesNotContainKey(
+                "test.active.connections#value{local.ip=3.3.3.2,protocol=HTTP,remote.ip=3.3.3.1}");
+
+        // GC is done.
+        removalListener.waiting.complete(null);
+        await().untilAsserted(() -> {
+            final Map<String, Double> meters0 = MoreMeters.measureAll(meterRegistry);
+            assertThat(meters0).doesNotContainKey(
+                    "test.active.connections#value{local.ip=1.1.1.2,protocol=HTTP,remote.ip=1.1.1.1}");
+            assertThat(meters0).containsEntry(
+                    "test.active.connections#value{local.ip=2.2.2.2,protocol=HTTP,remote.ip=2.2.2.1}", 1.0);
+            assertThat(meters0).containsEntry(
+                    "test.active.connections#value{local.ip=3.3.3.2,protocol=HTTP,remote.ip=3.3.3.1}", 1.0);
+        });
+    }
+
+    private static final class TestMeterRemovalListener implements Consumer<Meter> {
+
+        final AtomicBoolean removing = new AtomicBoolean();
+        final CompletableFuture<Void> waiting = new CompletableFuture<>();
+
+        @Override
+        public void accept(Meter meter) {
+            removing.set(true);
+            waiting.join();
+        }
+    }
+}
diff --git a/core/src/test/java/com/linecorp/armeria/client/Http1ResponseDecoderTest.java b/core/src/test/java/com/linecorp/armeria/client/Http1ResponseDecoderTest.java
index c600209cc17..318da8e8099 100644
--- a/core/src/test/java/com/linecorp/armeria/client/Http1ResponseDecoderTest.java
+++ b/core/src/test/java/com/linecorp/armeria/client/Http1ResponseDecoderTest.java
@@ -35,7 +35,8 @@ class Http1ResponseDecoderTest {
     @Test
     void testRequestTimeoutClosesImmediately() throws Exception {
         final EmbeddedChannel channel = new EmbeddedChannel();
-        try (HttpClientFactory httpClientFactory = new HttpClientFactory(ClientFactoryOptions.of())) {
+        try (HttpClientFactory httpClientFactory = new HttpClientFactory(ClientFactoryOptions.of(),
+                                                                         true)) {
             final Http1ResponseDecoder decoder = new Http1ResponseDecoder(
                     channel, httpClientFactory, SessionProtocol.H1);
             channel.pipeline().addLast(decoder);