From e52f281118e31ca6ae3752b2e694750f245bb5ae Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 22 Jan 2025 16:16:58 +0800 Subject: [PATCH] add comments and fix tests --- .../org/apache/pegasus/rpc/async/MetaSession.java | 13 +++++++++---- .../apache/pegasus/rpc/async/MetaSessionTest.java | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/java-client/src/main/java/org/apache/pegasus/rpc/async/MetaSession.java b/java-client/src/main/java/org/apache/pegasus/rpc/async/MetaSession.java index 8b002db050..8931afc993 100644 --- a/java-client/src/main/java/org/apache/pegasus/rpc/async/MetaSession.java +++ b/java-client/src/main/java/org/apache/pegasus/rpc/async/MetaSession.java @@ -264,26 +264,30 @@ void resolveHost(String hostPort) throws IllegalArgumentException { return; } - Set newSet = new TreeSet<>(Arrays.asList(addrs)); Set oldSet = metaList.stream() .map(ReplicaSession::getAddress) .collect(Collectors.toCollection(TreeSet::new)); + Set newSet = new TreeSet<>(Arrays.asList(addrs)); - // fast path: do nothing if meta list is unchanged. + // Do nothing if meta list is unchanged. if (newSet.equals(oldSet)) { return; } - // removed metas + // Find the meta servers that should be removed. Set removedSet = new HashSet<>(oldSet); removedSet.removeAll(newSet); + // Iterate over the current meta list: once a meta server is found in the removed set, + // it would be removed from the meta list after its session is closed. Iterator iterator = metaList.iterator(); while (iterator.hasNext()) { ReplicaSession session = iterator.next(); rpc_address addr = session.getAddress(); if (!removedSet.contains(addr)) { + // This meta server is not found in the removed set, which means it should just be + // retained. continue; } @@ -292,10 +296,11 @@ void resolveHost(String hostPort) throws IllegalArgumentException { logger.info("meta server {} was removed", addr); } - // newly added metas + // Find the meta servers that should be added. Set addedSet = new HashSet<>(newSet); addedSet.removeAll(oldSet); + // Add each new meta servers to the meta list. for (rpc_address addr : addedSet) { metaList.add(clusterManager.getReplicaSession(addr)); logger.info("meta server {} was added", addr); diff --git a/java-client/src/test/java/org/apache/pegasus/rpc/async/MetaSessionTest.java b/java-client/src/test/java/org/apache/pegasus/rpc/async/MetaSessionTest.java index a9d7a4fe0b..01261757a5 100644 --- a/java-client/src/test/java/org/apache/pegasus/rpc/async/MetaSessionTest.java +++ b/java-client/src/test/java/org/apache/pegasus/rpc/async/MetaSessionTest.java @@ -256,7 +256,7 @@ public void testMetaForwardUnknownPrimary() throws Exception { FieldUtils.writeField(op, "response", new query_cfg_response(), true); op.get_response().err = new error_code(); op.get_response().err.errno = error_code.error_types.ERR_FORWARD_TO_OTHERS; - op.get_response().partitions = Collections.singletonList(new partition_configuration[1]); + op.get_response().partitions = Collections.singletonList(new partition_configuration()); op.get_response().partitions.set(0, new partition_configuration()); op.get_response().partitions.get(0).primary = rpc_address.fromIpPort("172.0.0.3:34601"); MetaSession.MetaRequestRound round =