diff --git a/d2/src/main/java/com/linkedin/d2/xds/GlobCollectionUtils.java b/d2/src/main/java/com/linkedin/d2/xds/GlobCollectionUtils.java index 7733011cb..3eb9d0f15 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/GlobCollectionUtils.java +++ b/d2/src/main/java/com/linkedin/d2/xds/GlobCollectionUtils.java @@ -1,5 +1,7 @@ package com.linkedin.d2.xds; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; import javax.annotation.Nullable; public class GlobCollectionUtils @@ -81,4 +83,19 @@ public static String globCollectionUrlForClusterResource(String clusterPath) clusterPath.substring(clusterPath.lastIndexOf('/') + 1) + GLOB_COLLECTION_SUFFIX; } + + public static String globCollectionUrn(String clusterName, String uri) + { + try + { + return D2_URI_NODE_GLOB_COLLECTION_PREFIX + clusterName + "/" + URLEncoder.encode(uri, "UTF-8"); + } + catch (UnsupportedEncodingException e) + { + // Note that this is impossible. It is only thrown if the charset isn't recognized, and UTF-8 is known to be + // supported. + throw new RuntimeException(e); + } + + } } diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java index f5040dcb4..f04a1a6b2 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java @@ -95,6 +95,23 @@ final void onChanged(ResourceUpdate update) } } + public static abstract class D2UriResourceWatcher + { + public abstract void onChanged(XdsD2.D2URI d2Uri); + + public abstract void onDelete(); + + /** + * Called when the resource discovery RPC encounters some transient error. + */ + public abstract void onError(Status error); + + /** + * Called when the resource discovery RPC reestablishes connection. + */ + public abstract void onReconnect(); + } + public static abstract class WildcardResourceWatcher { private final ResourceType _type; @@ -355,6 +372,13 @@ static ResourceType fromTypeUrl(String typeUrl) */ public abstract void watchAllXdsResources(WildcardResourceWatcher watcher); + /** + * Subscribes the given {@link D2UriResourceWatcher} to a specific URI in a specific cluster. The watcher will be + * notified whenever the URI is added or removed. Repeated calls to this function with the same watcher will always + * notify the given watcher of the current data. + */ + public abstract void watchD2Uri(String cluster, String uri, D2UriResourceWatcher watcher); + /** * Initiates the RPC stream to the xDS server. */ diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index 05e4a922a..4247338bd 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -83,6 +83,7 @@ public class XdsClientImpl extends XdsClient Stream.of(ResourceType.NODE, ResourceType.D2_URI_MAP) .collect(Collectors.toMap(Function.identity(), e -> new HashMap<>()))); private final Map _wildcardSubscribers = Maps.newEnumMap(ResourceType.class); + private final Map _d2UriSubscribers = new HashMap<>(); private final Node _node; private final ManagedChannel _managedChannel; private final ScheduledExecutorService _executorService; @@ -206,11 +207,41 @@ public void watchAllXdsResources(WildcardResourceWatcher watcher) }); } + @Override + public void watchD2Uri(String cluster, String uri, D2UriResourceWatcher watcher) + { + _executorService.execute(() -> + { + String urn = GlobCollectionUtils.globCollectionUrn(cluster, uri); + D2UriSubscriber subscriber = getD2UriSubscribers().get(urn); + if (subscriber == null) + { + subscriber = new D2UriSubscriber(urn); + getD2UriSubscribers().put(urn, subscriber); + + _log.info("Subscribing to D2URI: {}", urn); + + if (_adsStream == null && !isInBackoff()) + { + startRpcStreamLocal(); + } + if (_adsStream != null) + { + _adsStream.sendDiscoveryRequest(ResourceType.D2_URI, Collections.singletonList(urn)); + } + } + + subscriber.addWatcher(watcher); + }); + } + @Override public void startRpcStream() { - _executorService.execute(() -> { - if (!isInBackoff()) { + _executorService.execute(() -> + { + if (!isInBackoff()) + { try { startRpcStreamLocal(); @@ -429,7 +460,8 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data) ResourceSubscriber subscriber = getResourceSubscriberMap(ResourceType.D2_URI_MAP).get(uriId.getClusterResourceName()); WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(ResourceType.D2_URI_MAP); - if (subscriber == null && wildcardSubscriber == null) + D2UriSubscriber d2UriSubscriber = getD2UriSubscribers().get(resourceName); + if (subscriber == null && wildcardSubscriber == null && d2UriSubscriber == null) { String msg = String.format("Ignoring D2URI resource update for untracked cluster: %s", resourceName); _log.warn(msg); @@ -437,6 +469,36 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data) return; } + XdsD2.D2URI uri; + if (resource != null) + { + try + { + uri = resource.getResource().unpack(XdsD2.D2URI.class); + } + catch (InvalidProtocolBufferException e) + { + _log.warn("Failed to unpack D2URI", e); + errors.add("Failed to unpack D2URI"); + return; + } + } + else + { + uri = null; + } + + + if (d2UriSubscriber != null) + { + d2UriSubscriber.onData(uri, _serverMetricsProvider); + } + + if (subscriber == null && wildcardSubscriber == null) + { + return; + } + // Get or create a new D2URIMapUpdate which is a copy of the existing data for that cluster. D2URIMapUpdate update = updates.computeIfAbsent(uriId.getClusterResourceName(), k -> { @@ -461,8 +523,8 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data) } }); - // If the resource is null, it's being deleted - if (resource == null) + // If the uri is null, it's being deleted + if (uri == null) { // This is the special case where the entire collection is being deleted. This either means the client // subscribed to a cluster that does not exist, or all hosts stopped announcing to the cluster. @@ -478,16 +540,7 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data) } else { - try - { - XdsD2.D2URI uri = resource.getResource().unpack(XdsD2.D2URI.class); - update.putUri(uriId.getUriName(), uri); - } - catch (InvalidProtocolBufferException e) - { - _log.warn("Failed to unpack D2URI", e); - errors.add("Failed to unpack D2URI"); - } + update.putUri(uriId.getUriName(), uri); } }); sendAckOrNack(data.getResourceType(), data.getNonce(), errors); @@ -565,6 +618,10 @@ private void notifyStreamError(Status error) { wildcardResourceSubscriber.onError(error); } + for (D2UriSubscriber uriSubscriber : _d2UriSubscribers.values()) + { + uriSubscriber.onError(error); + } _xdsClientJmx.setIsConnected(false); } @@ -581,6 +638,10 @@ private void notifyStreamReconnect() { wildcardResourceSubscriber.onReconnect(); } + for (D2UriSubscriber uriSubscriber : _d2UriSubscribers.values()) + { + uriSubscriber.onReconnect(); + } _xdsClientJmx.setIsConnected(true); } @@ -596,6 +657,12 @@ WildcardResourceSubscriber getWildcardResourceSubscriber(ResourceType type) return _wildcardSubscribers.get(type); } + @VisibleForTesting + Map getD2UriSubscribers() + { + return _d2UriSubscribers; + } + static class ResourceSubscriber { private final ResourceType _type; @@ -858,6 +925,75 @@ void onRemoval(String resourceName) } } + static class D2UriSubscriber + { + private final Set _watchers = new HashSet<>(); + private XdsD2.D2URI _d2Uri; + private final String _name; + + D2UriSubscriber(String name) + { + _name = name; + } + + + @VisibleForTesting + public XdsD2.D2URI getData() + { + return _d2Uri; + } + + @VisibleForTesting + public void setData(XdsD2.D2URI d2Uri) + { + _d2Uri = d2Uri; + } + + void addWatcher(D2UriResourceWatcher watcher) + { + _watchers.add(watcher); + watcher.onChanged(_d2Uri); + _log.debug("Notifying watcher of current data for D2URI {}: {}", _name, _d2Uri); + } + + private void onData(XdsD2.D2URI d2Uri, XdsServerMetricsProvider metricsProvider) + { + if (Objects.equals(_d2Uri, d2Uri)) + { + _log.debug("Received resource update data equal to the current data for {}. Will not perform the update.", + _name); + return; + } + if (d2Uri == null) + { + for (D2UriResourceWatcher watcher : _watchers) + { + watcher.onDelete(); + } + } + else + { + metricsProvider.trackLatency(System.currentTimeMillis() - d2Uri.getModifiedTime().getSeconds() * 1000); + } + } + + private void onError(Status error) + { + for (D2UriResourceWatcher watcher : _watchers) + { + watcher.onError(error); + } + } + + private void onReconnect() + { + for (D2UriResourceWatcher watcher : _watchers) + { + watcher.onReconnect(); + } + } + } + final class RpcRetryTask implements Runnable { @Override @@ -890,6 +1026,10 @@ public void run() } _adsStream.sendDiscoveryRequest(rewrittenType, resources); } + if (!_d2UriSubscribers.isEmpty()) + { + _adsStream.sendDiscoveryRequest(ResourceType.D2_URI, _d2UriSubscribers.keySet()); + } } }