-
Notifications
You must be signed in to change notification settings - Fork 547
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow subscribing to a single resource #1037
Open
PapaCharlie
wants to merge
1
commit into
master
Choose a base branch
from
pc/single
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -83,6 +83,7 @@ public class XdsClientImpl extends XdsClient | |
Stream.of(ResourceType.NODE, ResourceType.D2_URI_MAP) | ||
.collect(Collectors.toMap(Function.identity(), e -> new HashMap<>()))); | ||
private final Map<ResourceType, WildcardResourceSubscriber> _wildcardSubscribers = Maps.newEnumMap(ResourceType.class); | ||
private final Map<String, D2UriSubscriber> _d2UriSubscribers = new HashMap<>(); | ||
private final Node _node; | ||
private final ManagedChannel _managedChannel; | ||
private final ScheduledExecutorService _executorService; | ||
|
@@ -206,11 +207,41 @@ public void watchAllXdsResources(WildcardResourceWatcher watcher) | |
}); | ||
} | ||
|
||
@Override | ||
public void watchD2Uri(String cluster, String uri, D2UriResourceWatcher watcher) | ||
{ | ||
_executorService.execute(() -> | ||
{ | ||
String urn = GlobCollectionUtils.globCollectionUrn(cluster, uri); | ||
D2UriSubscriber subscriber = getD2UriSubscribers().get(urn); | ||
if (subscriber == null) | ||
{ | ||
subscriber = new D2UriSubscriber(urn); | ||
getD2UriSubscribers().put(urn, subscriber); | ||
|
||
_log.info("Subscribing to D2URI: {}", urn); | ||
|
||
if (_adsStream == null && !isInBackoff()) | ||
{ | ||
startRpcStreamLocal(); | ||
} | ||
if (_adsStream != null) | ||
{ | ||
_adsStream.sendDiscoveryRequest(ResourceType.D2_URI, Collections.singletonList(urn)); | ||
} | ||
} | ||
|
||
subscriber.addWatcher(watcher); | ||
}); | ||
} | ||
|
||
@Override | ||
public void startRpcStream() | ||
{ | ||
_executorService.execute(() -> { | ||
if (!isInBackoff()) { | ||
_executorService.execute(() -> | ||
{ | ||
if (!isInBackoff()) | ||
{ | ||
try | ||
{ | ||
startRpcStreamLocal(); | ||
|
@@ -429,14 +460,45 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data) | |
ResourceSubscriber subscriber = | ||
getResourceSubscriberMap(ResourceType.D2_URI_MAP).get(uriId.getClusterResourceName()); | ||
WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(ResourceType.D2_URI_MAP); | ||
if (subscriber == null && wildcardSubscriber == null) | ||
D2UriSubscriber d2UriSubscriber = getD2UriSubscribers().get(resourceName); | ||
if (subscriber == null && wildcardSubscriber == null && d2UriSubscriber == null) | ||
{ | ||
String msg = String.format("Ignoring D2URI resource update for untracked cluster: %s", resourceName); | ||
_log.warn(msg); | ||
errors.add(msg); | ||
return; | ||
} | ||
|
||
XdsD2.D2URI uri; | ||
if (resource != null) | ||
{ | ||
try | ||
{ | ||
uri = resource.getResource().unpack(XdsD2.D2URI.class); | ||
} | ||
catch (InvalidProtocolBufferException e) | ||
{ | ||
_log.warn("Failed to unpack D2URI", e); | ||
errors.add("Failed to unpack D2URI"); | ||
return; | ||
} | ||
} | ||
else | ||
{ | ||
uri = null; | ||
} | ||
|
||
|
||
if (d2UriSubscriber != null) | ||
{ | ||
d2UriSubscriber.onData(uri, _serverMetricsProvider); | ||
} | ||
|
||
if (subscriber == null && wildcardSubscriber == null) | ||
{ | ||
return; | ||
} | ||
|
||
// Get or create a new D2URIMapUpdate which is a copy of the existing data for that cluster. | ||
D2URIMapUpdate update = updates.computeIfAbsent(uriId.getClusterResourceName(), k -> | ||
{ | ||
|
@@ -461,8 +523,8 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data) | |
} | ||
}); | ||
|
||
// If the resource is null, it's being deleted | ||
if (resource == null) | ||
// If the uri is null, it's being deleted | ||
if (uri == null) | ||
{ | ||
// This is the special case where the entire collection is being deleted. This either means the client | ||
// subscribed to a cluster that does not exist, or all hosts stopped announcing to the cluster. | ||
|
@@ -478,16 +540,7 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data) | |
} | ||
else | ||
{ | ||
try | ||
{ | ||
XdsD2.D2URI uri = resource.getResource().unpack(XdsD2.D2URI.class); | ||
update.putUri(uriId.getUriName(), uri); | ||
} | ||
catch (InvalidProtocolBufferException e) | ||
{ | ||
_log.warn("Failed to unpack D2URI", e); | ||
errors.add("Failed to unpack D2URI"); | ||
} | ||
update.putUri(uriId.getUriName(), uri); | ||
} | ||
}); | ||
sendAckOrNack(data.getResourceType(), data.getNonce(), errors); | ||
|
@@ -565,6 +618,10 @@ private void notifyStreamError(Status error) | |
{ | ||
wildcardResourceSubscriber.onError(error); | ||
} | ||
for (D2UriSubscriber uriSubscriber : _d2UriSubscribers.values()) | ||
{ | ||
uriSubscriber.onError(error); | ||
} | ||
_xdsClientJmx.setIsConnected(false); | ||
} | ||
|
||
|
@@ -581,6 +638,10 @@ private void notifyStreamReconnect() | |
{ | ||
wildcardResourceSubscriber.onReconnect(); | ||
} | ||
for (D2UriSubscriber uriSubscriber : _d2UriSubscribers.values()) | ||
{ | ||
uriSubscriber.onReconnect(); | ||
} | ||
_xdsClientJmx.setIsConnected(true); | ||
} | ||
|
||
|
@@ -596,6 +657,12 @@ WildcardResourceSubscriber getWildcardResourceSubscriber(ResourceType type) | |
return _wildcardSubscribers.get(type); | ||
} | ||
|
||
@VisibleForTesting | ||
Map<String, D2UriSubscriber> getD2UriSubscribers() | ||
{ | ||
return _d2UriSubscribers; | ||
} | ||
|
||
static class ResourceSubscriber | ||
{ | ||
private final ResourceType _type; | ||
|
@@ -858,6 +925,75 @@ void onRemoval(String resourceName) | |
} | ||
} | ||
|
||
static class D2UriSubscriber | ||
{ | ||
private final Set<D2UriResourceWatcher> _watchers = new HashSet<>(); | ||
private XdsD2.D2URI _d2Uri; | ||
private final String _name; | ||
|
||
D2UriSubscriber(String name) | ||
{ | ||
_name = name; | ||
} | ||
|
||
|
||
@VisibleForTesting | ||
public XdsD2.D2URI getData() | ||
{ | ||
return _d2Uri; | ||
} | ||
|
||
@VisibleForTesting | ||
public void setData(XdsD2.D2URI d2Uri) | ||
{ | ||
_d2Uri = d2Uri; | ||
} | ||
|
||
void addWatcher(D2UriResourceWatcher watcher) | ||
{ | ||
_watchers.add(watcher); | ||
watcher.onChanged(_d2Uri); | ||
_log.debug("Notifying watcher of current data for D2URI {}: {}", _name, _d2Uri); | ||
} | ||
|
||
private void onData(XdsD2.D2URI d2Uri, XdsServerMetricsProvider metricsProvider) | ||
{ | ||
if (Objects.equals(_d2Uri, d2Uri)) | ||
{ | ||
_log.debug("Received resource update data equal to the current data for {}. Will not perform the update.", | ||
_name); | ||
return; | ||
} | ||
if (d2Uri == null) | ||
{ | ||
for (D2UriResourceWatcher watcher : _watchers) | ||
{ | ||
watcher.onDelete(); | ||
} | ||
} | ||
else | ||
{ | ||
metricsProvider.trackLatency(System.currentTimeMillis() - d2Uri.getModifiedTime().getSeconds() * 1000); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also need to update _d2Uri and notify the watchers. |
||
} | ||
} | ||
|
||
private void onError(Status error) | ||
{ | ||
for (D2UriResourceWatcher watcher : _watchers) | ||
{ | ||
watcher.onError(error); | ||
} | ||
} | ||
|
||
private void onReconnect() | ||
{ | ||
for (D2UriResourceWatcher watcher : _watchers) | ||
{ | ||
watcher.onReconnect(); | ||
} | ||
} | ||
} | ||
|
||
final class RpcRetryTask implements Runnable | ||
{ | ||
@Override | ||
|
@@ -890,6 +1026,10 @@ public void run() | |
} | ||
_adsStream.sendDiscoveryRequest(rewrittenType, resources); | ||
} | ||
if (!_d2UriSubscribers.isEmpty()) | ||
{ | ||
_adsStream.sendDiscoveryRequest(ResourceType.D2_URI, _d2UriSubscribers.keySet()); | ||
} | ||
} | ||
} | ||
|
||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should make this extends ResourceWatcher for better code structure, so that: