Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow subscribing to a single resource #1037

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions d2/src/main/java/com/linkedin/d2/xds/GlobCollectionUtils.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.d2.xds;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import javax.annotation.Nullable;

public class GlobCollectionUtils
Expand Down Expand Up @@ -81,4 +83,19 @@ public static String globCollectionUrlForClusterResource(String clusterPath)
clusterPath.substring(clusterPath.lastIndexOf('/') + 1) +
GLOB_COLLECTION_SUFFIX;
}

public static String globCollectionUrn(String clusterName, String uri)
{
try
{
return D2_URI_NODE_GLOB_COLLECTION_PREFIX + clusterName + "/" + URLEncoder.encode(uri, "UTF-8");
}
catch (UnsupportedEncodingException e)
{
// Note that this is impossible. It is only thrown if the charset isn't recognized, and UTF-8 is known to be
// supported.
throw new RuntimeException(e);
}

}
}
24 changes: 24 additions & 0 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,23 @@ final void onChanged(ResourceUpdate update)
}
}

public static abstract class D2UriResourceWatcher
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should make this extends ResourceWatcher for better code structure, so that:

  1. watchD2Uri is not needed, and can just reuse watchXdsResource(globCollectionUrn(cluster, uri), watcher).
  2. D2UriSubscriber is not needed, and can just use ResourceSubscriber.

{
public abstract void onChanged(XdsD2.D2URI d2Uri);

public abstract void onDelete();

/**
* Called when the resource discovery RPC encounters some transient error.
*/
public abstract void onError(Status error);

/**
* Called when the resource discovery RPC reestablishes connection.
*/
public abstract void onReconnect();
}

public static abstract class WildcardResourceWatcher
{
private final ResourceType _type;
Expand Down Expand Up @@ -355,6 +372,13 @@ static ResourceType fromTypeUrl(String typeUrl)
*/
public abstract void watchAllXdsResources(WildcardResourceWatcher watcher);

/**
* Subscribes the given {@link D2UriResourceWatcher} to a specific URI in a specific cluster. The watcher will be
* notified whenever the URI is added or removed. Repeated calls to this function with the same watcher will always
* notify the given watcher of the current data.
*/
public abstract void watchD2Uri(String cluster, String uri, D2UriResourceWatcher watcher);

/**
* Initiates the RPC stream to the xDS server.
*/
Expand Down
170 changes: 155 additions & 15 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class XdsClientImpl extends XdsClient
Stream.of(ResourceType.NODE, ResourceType.D2_URI_MAP)
.collect(Collectors.toMap(Function.identity(), e -> new HashMap<>())));
private final Map<ResourceType, WildcardResourceSubscriber> _wildcardSubscribers = Maps.newEnumMap(ResourceType.class);
private final Map<String, D2UriSubscriber> _d2UriSubscribers = new HashMap<>();
private final Node _node;
private final ManagedChannel _managedChannel;
private final ScheduledExecutorService _executorService;
Expand Down Expand Up @@ -206,11 +207,41 @@ public void watchAllXdsResources(WildcardResourceWatcher watcher)
});
}

@Override
public void watchD2Uri(String cluster, String uri, D2UriResourceWatcher watcher)
{
_executorService.execute(() ->
{
String urn = GlobCollectionUtils.globCollectionUrn(cluster, uri);
D2UriSubscriber subscriber = getD2UriSubscribers().get(urn);
if (subscriber == null)
{
subscriber = new D2UriSubscriber(urn);
getD2UriSubscribers().put(urn, subscriber);

_log.info("Subscribing to D2URI: {}", urn);

if (_adsStream == null && !isInBackoff())
{
startRpcStreamLocal();
}
if (_adsStream != null)
{
_adsStream.sendDiscoveryRequest(ResourceType.D2_URI, Collections.singletonList(urn));
}
}

subscriber.addWatcher(watcher);
});
}

@Override
public void startRpcStream()
{
_executorService.execute(() -> {
if (!isInBackoff()) {
_executorService.execute(() ->
{
if (!isInBackoff())
{
try
{
startRpcStreamLocal();
Expand Down Expand Up @@ -429,14 +460,45 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data)
ResourceSubscriber subscriber =
getResourceSubscriberMap(ResourceType.D2_URI_MAP).get(uriId.getClusterResourceName());
WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(ResourceType.D2_URI_MAP);
if (subscriber == null && wildcardSubscriber == null)
D2UriSubscriber d2UriSubscriber = getD2UriSubscribers().get(resourceName);
if (subscriber == null && wildcardSubscriber == null && d2UriSubscriber == null)
{
String msg = String.format("Ignoring D2URI resource update for untracked cluster: %s", resourceName);
_log.warn(msg);
errors.add(msg);
return;
}

XdsD2.D2URI uri;
if (resource != null)
{
try
{
uri = resource.getResource().unpack(XdsD2.D2URI.class);
}
catch (InvalidProtocolBufferException e)
{
_log.warn("Failed to unpack D2URI", e);
errors.add("Failed to unpack D2URI");
return;
}
}
else
{
uri = null;
}


if (d2UriSubscriber != null)
{
d2UriSubscriber.onData(uri, _serverMetricsProvider);
}

if (subscriber == null && wildcardSubscriber == null)
{
return;
}

// Get or create a new D2URIMapUpdate which is a copy of the existing data for that cluster.
D2URIMapUpdate update = updates.computeIfAbsent(uriId.getClusterResourceName(), k ->
{
Expand All @@ -461,8 +523,8 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data)
}
});

// If the resource is null, it's being deleted
if (resource == null)
// If the uri is null, it's being deleted
if (uri == null)
{
// This is the special case where the entire collection is being deleted. This either means the client
// subscribed to a cluster that does not exist, or all hosts stopped announcing to the cluster.
Expand All @@ -478,16 +540,7 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data)
}
else
{
try
{
XdsD2.D2URI uri = resource.getResource().unpack(XdsD2.D2URI.class);
update.putUri(uriId.getUriName(), uri);
}
catch (InvalidProtocolBufferException e)
{
_log.warn("Failed to unpack D2URI", e);
errors.add("Failed to unpack D2URI");
}
update.putUri(uriId.getUriName(), uri);
}
});
sendAckOrNack(data.getResourceType(), data.getNonce(), errors);
Expand Down Expand Up @@ -565,6 +618,10 @@ private void notifyStreamError(Status error)
{
wildcardResourceSubscriber.onError(error);
}
for (D2UriSubscriber uriSubscriber : _d2UriSubscribers.values())
{
uriSubscriber.onError(error);
}
_xdsClientJmx.setIsConnected(false);
}

Expand All @@ -581,6 +638,10 @@ private void notifyStreamReconnect()
{
wildcardResourceSubscriber.onReconnect();
}
for (D2UriSubscriber uriSubscriber : _d2UriSubscribers.values())
{
uriSubscriber.onReconnect();
}
_xdsClientJmx.setIsConnected(true);
}

Expand All @@ -596,6 +657,12 @@ WildcardResourceSubscriber getWildcardResourceSubscriber(ResourceType type)
return _wildcardSubscribers.get(type);
}

@VisibleForTesting
Map<String, D2UriSubscriber> getD2UriSubscribers()
{
return _d2UriSubscribers;
}

static class ResourceSubscriber
{
private final ResourceType _type;
Expand Down Expand Up @@ -858,6 +925,75 @@ void onRemoval(String resourceName)
}
}

static class D2UriSubscriber
{
private final Set<D2UriResourceWatcher> _watchers = new HashSet<>();
private XdsD2.D2URI _d2Uri;
private final String _name;

D2UriSubscriber(String name)
{
_name = name;
}


@VisibleForTesting
public XdsD2.D2URI getData()
{
return _d2Uri;
}

@VisibleForTesting
public void setData(XdsD2.D2URI d2Uri)
{
_d2Uri = d2Uri;
}

void addWatcher(D2UriResourceWatcher watcher)
{
_watchers.add(watcher);
watcher.onChanged(_d2Uri);
_log.debug("Notifying watcher of current data for D2URI {}: {}", _name, _d2Uri);
}

private void onData(XdsD2.D2URI d2Uri, XdsServerMetricsProvider metricsProvider)
{
if (Objects.equals(_d2Uri, d2Uri))
{
_log.debug("Received resource update data equal to the current data for {}. Will not perform the update.",
_name);
return;
}
if (d2Uri == null)
{
for (D2UriResourceWatcher watcher : _watchers)
{
watcher.onDelete();
}
}
else
{
metricsProvider.trackLatency(System.currentTimeMillis() - d2Uri.getModifiedTime().getSeconds() * 1000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also need to update _d2Uri and notify the watchers.

}
}

private void onError(Status error)
{
for (D2UriResourceWatcher watcher : _watchers)
{
watcher.onError(error);
}
}

private void onReconnect()
{
for (D2UriResourceWatcher watcher : _watchers)
{
watcher.onReconnect();
}
}
}

final class RpcRetryTask implements Runnable
{
@Override
Expand Down Expand Up @@ -890,6 +1026,10 @@ public void run()
}
_adsStream.sendDiscoveryRequest(rewrittenType, resources);
}
if (!_d2UriSubscribers.isEmpty())
{
_adsStream.sendDiscoveryRequest(ResourceType.D2_URI, _d2UriSubscribers.keySet());
}
}
}

Expand Down
Loading