diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ec7c22d2..f8f64da29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ Lists all changes with user impact. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). +## [0.20.15] +### Changed +- Updated java-control-plane to 1.0.45 - remove custom SimpleCache modification in favor of java-control-plane implementation + ## [0.20.14] ### Changed - Added test to check circuit breaker metric value diff --git a/build.gradle b/build.gradle index a69874cfb..c8a49dac1 100644 --- a/build.gradle +++ b/build.gradle @@ -48,7 +48,7 @@ allprojects { apply plugin: 'io.spring.dependency-management' project.ext.versions = [ - java_controlplane : '1.0.37', + java_controlplane : '1.0.45', spring_boot : '3.1.2', grpc : '1.48.1', ecwid_consul : '1.4.1', diff --git a/envoy-control-core/src/main/java/pl/allegro/tech/servicemesh/envoycontrol/SimpleCache.java b/envoy-control-core/src/main/java/pl/allegro/tech/servicemesh/envoycontrol/SimpleCache.java deleted file mode 100644 index 0dbabcc33..000000000 --- a/envoy-control-core/src/main/java/pl/allegro/tech/servicemesh/envoycontrol/SimpleCache.java +++ /dev/null @@ -1,621 +0,0 @@ -package pl.allegro.tech.servicemesh.envoycontrol; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; -import com.google.protobuf.Message; -import io.envoyproxy.controlplane.cache.*; -import io.envoyproxy.controlplane.cache.GroupCacheStatusInfo; -import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.concurrent.GuardedBy; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import java.util.function.Function; -import java.util.stream.Stream; - -import static io.envoyproxy.controlplane.cache.Resources.RESOURCE_TYPES_IN_ORDER; - -/** - * This class is copy of {@link io.envoyproxy.controlplane.cache.SimpleCache} - */ -public class SimpleCache implements SnapshotCache { - - private static final Logger LOGGER = LoggerFactory.getLogger(SimpleCache.class); - - private final NodeGroup groups; - private final boolean shouldSendMissingEndpoints; - - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - private final Lock readLock = lock.readLock(); - private final Lock writeLock = lock.writeLock(); - - @GuardedBy("lock") - private final Map snapshots = new HashMap<>(); - private final CacheStatusInfoAggregator statuses = new CacheStatusInfoAggregator<>(); - - private AtomicLong watchCount = new AtomicLong(); - - /** - * Constructs a simple cache. - * - * @param groups maps an envoy host to a node group - * @param shouldSendMissingEndpoints if set to true it will respond with empty endpoints if there is no in snapshot - */ - public SimpleCache(NodeGroup groups, boolean shouldSendMissingEndpoints) { - this.groups = groups; - this.shouldSendMissingEndpoints = shouldSendMissingEndpoints; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean clearSnapshot(T group) { - // we take a writeLock to prevent watches from being created - writeLock.lock(); - try { - - // If we don't know about this group, do nothing. - if (statuses.hasStatuses(group)) { - LOGGER.warn("tried to clear snapshot for group with existing watches, group={}", group); - - return false; - } - - statuses.remove(group); - snapshots.remove(group); - - return true; - } finally { - writeLock.unlock(); - } - } - - public Watch createWatch( - boolean ads, - XdsRequest request, - Set knownResourceNames, - Consumer responseConsumer) { - return createWatch(ads, request, knownResourceNames, responseConsumer, false); - } - - /** - * {@inheritDoc} - */ - @Override - public Watch createWatch( - boolean ads, - XdsRequest request, - Set knownResourceNames, - Consumer responseConsumer, - boolean hasClusterChanged) { - Resources.ResourceType requestResourceType = request.getResourceType(); - Preconditions.checkNotNull(requestResourceType, "unsupported type URL %s", - request.getTypeUrl()); - T group; - group = groups.hash(request.v3Request().getNode()); - - // even though we're modifying, we take a readLock to allow multiple watches to be created in parallel since it - // doesn't conflict - readLock.lock(); - try { - CacheStatusInfo status = statuses.getOrAddStatusInfo(group, requestResourceType); - status.setLastWatchRequestTime(System.currentTimeMillis()); - - U snapshot = snapshots.get(group); - String version = snapshot == null ? "" : snapshot.version(requestResourceType, request.getResourceNamesList()); - - Watch watch = new Watch(ads, request, responseConsumer); - - if (snapshot != null) { - Set requestedResources = ImmutableSet.copyOf(request.getResourceNamesList()); - - // If the request is asking for resources we haven't sent to the proxy yet, see if we have additional resources. - if (!knownResourceNames.equals(requestedResources)) { - Sets.SetView newResourceHints = Sets.difference(requestedResources, knownResourceNames); - - // If any of the newly requested resources are in the snapshot respond immediately. If not we'll fall back to - // version comparisons. - if (snapshot.resources(requestResourceType) - .keySet() - .stream() - .anyMatch(newResourceHints::contains)) { - respond(watch, snapshot, group); - - return watch; - } - } else if (hasClusterChanged && requestResourceType.equals(Resources.ResourceType.ENDPOINT)) { - respond(watch, snapshot, group); - - return watch; - } - } - - // If the requested version is up-to-date or missing a response, leave an open watch. - if (snapshot == null || request.getVersionInfo().equals(version)) { - openWatch(status, watch, request.getTypeUrl(), request.getResourceNamesList(), group, request.getVersionInfo()); - - return watch; - } - - // Otherwise, the watch may be responded immediately - boolean responded = respond(watch, snapshot, group); - - if (!responded) { - openWatch(status, watch, request.getTypeUrl(), request.getResourceNamesList(), group, request.getVersionInfo()); - } - - return watch; - } finally { - readLock.unlock(); - } - } - - /** - * {@inheritDoc} - */ - @Override - public DeltaWatch createDeltaWatch( - DeltaXdsRequest request, - String requesterVersion, - Map resourceVersions, - Set pendingResources, - boolean isWildcard, - Consumer responseConsumer, - boolean hasClusterChanged) { - - Resources.ResourceType requestResourceType = request.getResourceType(); - Preconditions.checkNotNull(requestResourceType, "unsupported type URL %s", - request.getTypeUrl()); - T group; - group = groups.hash(request.v3Request().getNode()); - - // even though we're modifying, we take a readLock to allow multiple watches to be created in parallel since it - // doesn't conflict - readLock.lock(); - try { - DeltaCacheStatusInfo status = statuses.getOrAddDeltaStatusInfo(group, requestResourceType); - - status.setLastWatchRequestTime(System.currentTimeMillis()); - - U snapshot = snapshots.get(group); - String version = snapshot == null ? "" : snapshot.version(requestResourceType, Collections.emptyList()); - - DeltaWatch watch = new DeltaWatch(request, - ImmutableMap.copyOf(resourceVersions), - ImmutableSet.copyOf(pendingResources), - requesterVersion, - isWildcard, - responseConsumer); - - // If no snapshot, leave an open watch. - - if (snapshot == null) { - openWatch(status, watch, request.getTypeUrl(), watch.trackedResources().keySet(), group, requesterVersion); - return watch; - } - - // If the requested version is up-to-date or missing a response, leave an open watch. - if (version.equals(requesterVersion)) { - // If the request is not wildcard, we have pending resources and we have them, we should respond immediately. - if (!isWildcard && watch.pendingResources().size() != 0) { - // If any of the pending resources are in the snapshot respond immediately. If not we'll fall back to - // version comparisons. - Map> resources = snapshot.versionedResources(request.getResourceType()); - Map> requestedResources = watch.pendingResources() - .stream() - .filter(resources::containsKey) - .collect(Collectors.toMap(Function.identity(), resources::get)); - ResponseState responseState = respondDelta(watch, - requestedResources, - Collections.emptyList(), - version, - group); - if (responseState.isFinished()) { - return watch; - } - } else if (hasClusterChanged && requestResourceType.equals(Resources.ResourceType.ENDPOINT)) { - ResponseState responseState = respondDelta(request, watch, snapshot, version, group); - if (responseState.isFinished()) { - return watch; - } - } - - openWatch(status, watch, request.getTypeUrl(), watch.trackedResources().keySet(), group, requesterVersion); - - return watch; - } - - // Otherwise, version is different, the watch may be responded immediately - ResponseState responseState = respondDelta(request, watch, snapshot, version, group); - - if (responseState.isFinished()) { - return watch; - } - - openWatch(status, watch, request.getTypeUrl(), watch.trackedResources().keySet(), group, requesterVersion); - - return watch; - } finally { - readLock.unlock(); - } - } - - private > void openWatch(MutableStatusInfo status, - V watch, - String url, - Collection resources, - T group, - String version) { - long watchId = watchCount.incrementAndGet(); - status.setWatch(watchId, watch); - watch.setStop(() -> status.removeWatch(watchId)); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("open watch {} for {}[{}] from node {} for version {}", - watchId, - url, - String.join(", ", resources), - group, - version); - } - } - - /** - * {@inheritDoc} - */ - @Override - public U getSnapshot(T group) { - readLock.lock(); - - try { - return snapshots.get(group); - } finally { - readLock.unlock(); - } - } - - /** - * {@inheritDoc} - */ - @Override - public Collection groups() { - return ImmutableSet.copyOf(statuses.groups()); - } - - /** - * {@inheritDoc} - * - *

This method cannot be called concurrently for the same group. - * It can be called concurrently for different groups. - */ - @Override - public void setSnapshot(T group, U snapshot) { - // we take a writeLock to prevent watches from being created while we update the snapshot - Map> status; - Map> deltaStatus; - U previousSnapshot; - writeLock.lock(); - try { - // Update the existing snapshot entry. - previousSnapshot = snapshots.put(group, snapshot); - status = statuses.getStatus(group); - deltaStatus = statuses.getDeltaStatus(group); - } finally { - writeLock.unlock(); - } - - if (status.isEmpty() && deltaStatus.isEmpty()) { - return; - } - - // Responses should be in specific order and typeUrls has a list of resources in the right - // order. - respondWithSpecificOrder(group, previousSnapshot, snapshot, status, deltaStatus); - } - - /** - * {@inheritDoc} - */ - @Override - public StatusInfo statusInfo(T group) { - readLock.lock(); - - try { - Map> statusMap = statuses.getStatus(group); - Map> deltaStatusMap = statuses.getDeltaStatus(group); - - if (statusMap.isEmpty() && deltaStatusMap.isEmpty()) { - return null; - } - - List> collection = Stream.concat(statusMap.values().stream(), - deltaStatusMap.values().stream()).collect(Collectors.toList()); - - return new GroupCacheStatusInfo<>(collection); - } finally { - readLock.unlock(); - } - } - - @VisibleForTesting - protected void respondWithSpecificOrder(T group, - U previousSnapshot, U snapshot, - Map> statusMap, - Map> deltaStatusMap) { - for (Resources.ResourceType resourceType : RESOURCE_TYPES_IN_ORDER) { - CacheStatusInfo status = statusMap.get(resourceType); - if (status != null) { - status.watchesRemoveIf((id, watch) -> { - if (!watch.request().getResourceType().equals(resourceType)) { - return false; - } - String version = snapshot.version(watch.request().getResourceType(), watch.request().getResourceNamesList()); - - if (!watch.request().getVersionInfo().equals(version)) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("responding to open watch {}[{}] with new version {}", - id, - String.join(", ", watch.request().getResourceNamesList()), - version); - } - - respond(watch, snapshot, group); - - // Discard the watch. A new watch will be created for future snapshots once envoy ACKs the response. - return true; - } - - // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond. - return false; - }); - } - DeltaCacheStatusInfo deltaStatus = deltaStatusMap.get(resourceType); - if (deltaStatus != null) { - Map> previousResources = previousSnapshot == null - ? Collections.emptyMap() - : previousSnapshot.versionedResources(resourceType); - Map> snapshotResources = snapshot.versionedResources(resourceType); - - Map> snapshotChangedResources = snapshotResources.entrySet() - .stream() - .filter(entry -> { - VersionedResource versionedResource = previousResources.get(entry.getKey()); - return versionedResource == null || !versionedResource - .version().equals(entry.getValue().version()); - }) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - Set snapshotRemovedResources = previousResources.keySet() - .stream() - .filter(s -> !snapshotResources.containsKey(s)) - .collect(Collectors.toSet()); - - deltaStatus.watchesRemoveIf((id, watch) -> { - String version = snapshot.version(watch.request().getResourceType(), Collections.emptyList()); - - if (!watch.version().equals(version)) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("responding to open watch {}[{}] with new version {}", - id, - String.join(", ", watch.trackedResources().keySet()), - version); - } - - List removedResources = snapshotRemovedResources.stream() - .filter(s -> watch.trackedResources().get(s) != null) - .collect(Collectors.toList()); - - Map> changedResources = findChangedResources(watch, snapshotChangedResources); - - ResponseState responseState = respondDelta(watch, - changedResources, - removedResources, - version, - group); - // Discard the watch if it was responded or cancelled. - // A new watch will be created for future snapshots once envoy ACKs the response. - return responseState.isFinished(); - } - - // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond. - return false; - }); - } - } - } - - private Response createResponse(XdsRequest request, Map> resources, - String version) { - Collection filtered = request.getResourceNamesList().isEmpty() - ? resources.values().stream() - .map(VersionedResource::resource) - .collect(Collectors.toList()) - : request.getResourceNamesList().stream() - .map(resources::get) - .filter(Objects::nonNull) - .map(VersionedResource::resource) - .collect(Collectors.toList()); - - return Response.create(request, filtered, version); - } - - private boolean respond(Watch watch, U snapshot, T group) { - Map> snapshotResources = snapshot.versionedResources(watch.request().getResourceType()); - Map> snapshotForMissingResources = Collections.emptyMap(); - - if (!watch.request().getResourceNamesList().isEmpty() && watch.ads()) { - Collection missingNames = watch.request().getResourceNamesList().stream() - .filter(name -> !snapshotResources.containsKey(name)) - .collect(Collectors.toList()); - - if (!missingNames.isEmpty()) { - // In some cases Envoy might send EDS request with cluster names we don't have in snapshot. - // This may happen when for example Envoy disconnects from an instance of control-plane and connects to - // other instance. - // - // If shouldSendMissingEndpoints is set to false we will not respond to such request. It may cause - // Envoy to stop working correctly, because it will wait indefinitely for a response, - // not accepting any other updates. - // - // If shouldSendMissingEndpoints is set to true, we will respond to such request anyway, to prevent - // such problems with Envoy. - if (shouldSendMissingEndpoints - && watch.request().getResourceType().equals(Resources.ResourceType.ENDPOINT)) { - LOGGER.info("adding missing resources [{}] to response for {} in ADS mode from node {} at version {}", - String.join(", ", missingNames), - watch.request().getTypeUrl(), - group, - snapshot.version(watch.request().getResourceType(), watch.request().getResourceNamesList()) - ); - snapshotForMissingResources = new HashMap<>(missingNames.size()); - for (String missingName : missingNames) { - snapshotForMissingResources.put( - missingName, - VersionedResource.create(ClusterLoadAssignment.newBuilder().setClusterName(missingName).build()) - ); - } - } else { - LOGGER.info( - "not responding in ADS mode for {} from node {} at version {} for request [{}] since [{}] not in snapshot", - watch.request().getTypeUrl(), - group, - snapshot.version(watch.request().getResourceType(), watch.request().getResourceNamesList()), - String.join(", ", watch.request().getResourceNamesList()), - String.join(", ", missingNames)); - - return false; - } - } - } - - String version = snapshot.version(watch.request().getResourceType(), watch.request().getResourceNamesList()); - - LOGGER.debug("responding for {} from node {} at version {} with version {}", - watch.request().getTypeUrl(), - group, - watch.request().getVersionInfo(), - version); - Response response; - if (!snapshotForMissingResources.isEmpty()) { - snapshotForMissingResources.putAll(snapshotResources); - response = createResponse( - watch.request(), - snapshotForMissingResources, - version); - } else { - response = createResponse( - watch.request(), - snapshotResources, - version); - } - - try { - watch.respond(response); - return true; - } catch (WatchCancelledException e) { - LOGGER.error( - "failed to respond for {} from node {} at version {} with version {} because watch was already cancelled", - watch.request().getTypeUrl(), - group, - watch.request().getVersionInfo(), - version); - } - - return false; - } - - private List findRemovedResources(DeltaWatch watch, Map> snapshotResources) { - // remove resources for which client has a tracked version but do not exist in snapshot - return watch.trackedResources().keySet() - .stream() - .filter(s -> !snapshotResources.containsKey(s)) - .collect(Collectors.toList()); - } - - private Map> findChangedResources(DeltaWatch watch, - Map> snapshotResources) { - return snapshotResources.entrySet() - .stream() - .filter(entry -> { - if (watch.pendingResources().contains(entry.getKey())) { - return true; - } - String resourceVersion = watch.trackedResources().get(entry.getKey()); - if (resourceVersion == null) { - // resource is not tracked, should respond it only if watch is wildcard - return watch.isWildcard(); - } - return !entry.getValue().version().equals(resourceVersion); - }) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - - private ResponseState respondDelta(DeltaXdsRequest request, DeltaWatch watch, U snapshot, String version, T group) { - Map> snapshotResources = snapshot.versionedResources(request.getResourceType()); - List removedResources = findRemovedResources(watch, - snapshotResources); - Map> changedResources = findChangedResources(watch, snapshotResources); - return respondDelta(watch, - changedResources, - removedResources, - version, - group); - } - - private ResponseState respondDelta(DeltaWatch watch, - Map> resources, - List removedResources, - String version, - T group) { - if (resources.isEmpty() && removedResources.isEmpty()) { - return ResponseState.UNRESPONDED; - } - - DeltaResponse response = DeltaResponse.create( - watch.request(), - resources, - removedResources, - version); - - try { - watch.respond(response); - return ResponseState.RESPONDED; - } catch (WatchCancelledException e) { - LOGGER.error( - "failed to respond for {} from node {} with version {} because watch was already cancelled", - watch.request().getTypeUrl(), - group, - version); - } - - return ResponseState.CANCELLED; - } - - private enum ResponseState { - RESPONDED, - UNRESPONDED, - CANCELLED; - - private boolean isFinished() { - return this.equals(RESPONDED) || this.equals(CANCELLED); - } - } -} diff --git a/envoy-control-core/src/main/java/pl/allegro/tech/servicemesh/envoycontrol/v3/SimpleCache.java b/envoy-control-core/src/main/java/pl/allegro/tech/servicemesh/envoycontrol/v3/SimpleCache.java deleted file mode 100644 index 46e3a62e3..000000000 --- a/envoy-control-core/src/main/java/pl/allegro/tech/servicemesh/envoycontrol/v3/SimpleCache.java +++ /dev/null @@ -1,10 +0,0 @@ -package pl.allegro.tech.servicemesh.envoycontrol.v3; - -import io.envoyproxy.controlplane.cache.NodeGroup; -import io.envoyproxy.controlplane.cache.v3.Snapshot; - -public class SimpleCache extends pl.allegro.tech.servicemesh.envoycontrol.SimpleCache { - public SimpleCache(NodeGroup nodeGroup, Boolean shouldSendMissingEndpoints) { - super(nodeGroup, shouldSendMissingEndpoints); - } -} diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt index d03592962..0367afcdd 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt @@ -2,6 +2,7 @@ package pl.allegro.tech.servicemesh.envoycontrol import io.envoyproxy.controlplane.cache.NodeGroup import io.envoyproxy.controlplane.cache.SnapshotCache +import io.envoyproxy.controlplane.cache.v3.SimpleCache import io.envoyproxy.controlplane.cache.v3.Snapshot import io.envoyproxy.controlplane.server.DefaultExecutorGroup import io.envoyproxy.controlplane.server.ExecutorGroup @@ -39,7 +40,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.Service import pl.allegro.tech.servicemesh.envoycontrol.utils.DirectScheduler import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelScheduler import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler -import pl.allegro.tech.servicemesh.envoycontrol.v3.SimpleCache import reactor.core.Disposable import reactor.core.publisher.Flux import reactor.core.scheduler.Schedulers @@ -142,7 +142,7 @@ class ControlPlane private constructor( val groupSnapshotProperties = properties.server.groupSnapshotUpdateScheduler val groupSnapshotScheduler = buildGroupSnapshotScheduler(groupSnapshotProperties) - val cache = SimpleCache(nodeGroup, properties.envoy.snapshot.shouldSendMissingEndpoints) + val cache = SimpleCache(nodeGroup) val groupChangeWatcher = GroupChangeWatcher(cache, metrics, meterRegistry) val meteredConnectionsCallbacks = MetricsDiscoveryServerCallbacks(meterRegistry) val loggingDiscoveryServerCallbacks = LoggingDiscoveryServerCallbacks( diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt index e95c897b5..d85757b06 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt @@ -7,6 +7,7 @@ import io.envoyproxy.controlplane.cache.DeltaXdsRequest import io.envoyproxy.controlplane.cache.Response import io.envoyproxy.controlplane.cache.Watch import io.envoyproxy.controlplane.cache.XdsRequest +import io.envoyproxy.controlplane.cache.v3.SimpleCache import io.micrometer.core.instrument.MeterRegistry import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlMetrics import pl.allegro.tech.servicemesh.envoycontrol.logger @@ -14,7 +15,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer import reactor.core.publisher.Flux import reactor.core.publisher.FluxSink import java.util.function.Consumer -import pl.allegro.tech.servicemesh.envoycontrol.v3.SimpleCache as SimpleCache /** * This class is needed to force snapshot creation in SnapshotUpdater when new group is added. @@ -47,11 +47,19 @@ internal class GroupChangeWatcher( request: XdsRequest, knownResourceNames: MutableSet, responseConsumer: Consumer, - hasClusterChanged: Boolean + hasClusterChanged: Boolean, + allowDefaultEmptyEdsUpdate: Boolean ): Watch { val oldGroups = cache.groups() - val watch = cache.createWatch(ads, request, knownResourceNames, responseConsumer, hasClusterChanged) + val watch = cache.createWatch( + ads, + request, + knownResourceNames, + responseConsumer, + hasClusterChanged, + allowDefaultEmptyEdsUpdate + ) val groups = cache.groups() metrics.setCacheGroupsCount(groups.size) if (oldGroups != groups) { diff --git a/envoy-control-core/src/test/java/pl/allegro/tech/servicemesh/envoycontrol/v3/SimpleCacheTest.java b/envoy-control-core/src/test/java/pl/allegro/tech/servicemesh/envoycontrol/v3/SimpleCacheTest.java deleted file mode 100644 index 1a70af568..000000000 --- a/envoy-control-core/src/test/java/pl/allegro/tech/servicemesh/envoycontrol/v3/SimpleCacheTest.java +++ /dev/null @@ -1,575 +0,0 @@ -package pl.allegro.tech.servicemesh.envoycontrol.v3; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; -import com.google.protobuf.Message; -import io.envoyproxy.controlplane.cache.NodeGroup; -import io.envoyproxy.controlplane.cache.Resources; -import io.envoyproxy.controlplane.cache.Response; -import io.envoyproxy.controlplane.cache.StatusInfo; -import io.envoyproxy.controlplane.cache.VersionedResource; -import io.envoyproxy.controlplane.cache.Watch; -import io.envoyproxy.controlplane.cache.XdsRequest; -import io.envoyproxy.controlplane.cache.v3.Snapshot; -import io.envoyproxy.envoy.config.cluster.v3.Cluster; -import io.envoyproxy.envoy.config.core.v3.Node; -import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; -import io.envoyproxy.envoy.config.listener.v3.Listener; -import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret; -import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; -import org.junit.Test; - -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; -import java.util.function.Consumer; -import java.util.stream.Collectors; - -import static io.envoyproxy.controlplane.cache.Resources.V3.CLUSTER_TYPE_URL; -import static io.envoyproxy.controlplane.cache.Resources.V3.ROUTE_TYPE_URL; -import static org.assertj.core.api.Assertions.assertThat; - -/** - * This class is copy of {@link io.envoyproxy.controlplane.cache.v3.SimpleCacheTest} - */ -public class SimpleCacheTest { - - private static final boolean ADS = ThreadLocalRandom.current().nextBoolean(); - protected static final String CLUSTER_NAME = "cluster0"; - private static final String LISTENER_NAME = "listener0"; - private static final String ROUTE_NAME = "route0"; - private static final String SECRET_NAME = "secret0"; - - private static final String VERSION1 = UUID.randomUUID().toString(); - protected static final String VERSION2 = UUID.randomUUID().toString(); - - private static final Snapshot SNAPSHOT1 = Snapshot.create( - ImmutableList.of(Cluster.newBuilder().setName(CLUSTER_NAME).build()), - ImmutableList.of(ClusterLoadAssignment.getDefaultInstance()), - ImmutableList.of(Listener.newBuilder().setName(LISTENER_NAME).build()), - ImmutableList.of(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build()), - ImmutableList.of(Secret.newBuilder().setName(SECRET_NAME).build()), - VERSION1); - - private static final Snapshot SNAPSHOT2 = Snapshot.create( - ImmutableList.of(Cluster.newBuilder().setName(CLUSTER_NAME).build()), - ImmutableList.of(ClusterLoadAssignment.getDefaultInstance()), - ImmutableList.of(Listener.newBuilder().setName(LISTENER_NAME).build()), - ImmutableList.of(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build()), - ImmutableList.of(Secret.newBuilder().setName(SECRET_NAME).build()), - VERSION2); - - protected static final Snapshot MULTIPLE_RESOURCES_SNAPSHOT2 = Snapshot.create( - ImmutableList.of(Cluster.newBuilder().setName(CLUSTER_NAME).build()), - ImmutableList.of(ClusterLoadAssignment.newBuilder().setClusterName(CLUSTER_NAME).build()), - ImmutableList.of(Listener.newBuilder().setName(LISTENER_NAME).build()), - ImmutableList.of(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build()), - ImmutableList.of(Secret.newBuilder().setName(SECRET_NAME).build()), - VERSION2); - - protected boolean shouldSendMissingEndpoints() { - return false; - } - - @Test - public void invalidNamesListShouldReturnWatcherWithNoResponseInAdsMode() { - SimpleCache cache = new SimpleCache<>(new SingleNodeGroup(), shouldSendMissingEndpoints()); - - cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT1); - - ResponseTracker responseTracker = new ResponseTracker(); - - Watch watch = cache.createWatch( - true, - XdsRequest.create(DiscoveryRequest.newBuilder() - .setNode(Node.getDefaultInstance()) - .setTypeUrl(Resources.V3.ENDPOINT_TYPE_URL) - .addResourceNames("none") - .build()), - Collections.emptySet(), - responseTracker, - false); - - assertThatWatchIsOpenWithNoResponses(new WatchAndTracker(watch, responseTracker)); - } - - @Test - public void invalidNamesListShouldReturnWatcherWithResponseInXdsMode() { - SimpleCache cache = new SimpleCache<>(new SingleNodeGroup(), shouldSendMissingEndpoints()); - - cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT1); - - ResponseTracker responseTracker = new ResponseTracker(); - - Watch watch = cache.createWatch( - false, - XdsRequest.create(DiscoveryRequest.newBuilder() - .setNode(Node.getDefaultInstance()) - .setTypeUrl(Resources.V3.ENDPOINT_TYPE_URL) - .addResourceNames("none") - .build()), - Collections.emptySet(), - responseTracker, - false); - - assertThat(watch.isCancelled()).isFalse(); - assertThat(responseTracker.responses).isNotEmpty(); - } - - @Test - public void successfullyWatchAllResourceTypesWithSetBeforeWatch() { - SimpleCache cache = new SimpleCache<>(new SingleNodeGroup(), shouldSendMissingEndpoints()); - - cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT1); - - for (String typeUrl : Resources.V3.TYPE_URLS) { - ResponseTracker responseTracker = new ResponseTracker(); - - Watch watch = cache.createWatch( - ADS, - XdsRequest.create(DiscoveryRequest.newBuilder() - .setNode(Node.getDefaultInstance()) - .setTypeUrl(typeUrl) - .addAllResourceNames(SNAPSHOT1.resources(typeUrl).keySet()) - .build()), - Collections.emptySet(), - responseTracker, - false); - - assertThat(watch.request().getTypeUrl()).isEqualTo(typeUrl); - assertThat(watch.request().getResourceNamesList()).containsExactlyElementsOf( - SNAPSHOT1.resources(typeUrl).keySet()); - - assertThatWatchReceivesSnapshot(new WatchAndTracker(watch, responseTracker), SNAPSHOT1); - } - } - - @Test - public void shouldSendEdsWhenClusterChangedButEdsVersionDidnt() { - SimpleCache cache = new SimpleCache<>(new SingleNodeGroup(), shouldSendMissingEndpoints()); - - cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT1); - - ResponseTracker responseTracker = new ResponseTracker(); - - Watch watch = cache.createWatch( - ADS, - XdsRequest.create(DiscoveryRequest.newBuilder() - .setNode(Node.getDefaultInstance()) - .setVersionInfo(VERSION1) - .setTypeUrl(Resources.V3.ENDPOINT_TYPE_URL) - .addAllResourceNames(SNAPSHOT1.resources(Resources.V3.ENDPOINT_TYPE_URL).keySet()) - .build()), - Sets.newHashSet(""), - responseTracker, - true); - - assertThat(watch.request().getTypeUrl()).isEqualTo(Resources.V3.ENDPOINT_TYPE_URL); - assertThat(watch.request().getResourceNamesList()).containsExactlyElementsOf( - SNAPSHOT1.resources(Resources.V3.ENDPOINT_TYPE_URL).keySet()); - - assertThatWatchReceivesSnapshot(new WatchAndTracker(watch, responseTracker), SNAPSHOT1); - } - - @Test - public void successfullyWatchAllResourceTypesWithSetAfterWatch() { - SimpleCache cache = new SimpleCache<>(new SingleNodeGroup(), shouldSendMissingEndpoints()); - - Map watches = Resources.V3.TYPE_URLS.stream() - .collect(Collectors.toMap( - typeUrl -> typeUrl, - typeUrl -> { - ResponseTracker responseTracker = new ResponseTracker(); - - Watch watch = cache.createWatch( - ADS, - XdsRequest.create(DiscoveryRequest.newBuilder() - .setNode(Node.getDefaultInstance()) - .setTypeUrl(typeUrl) - .addAllResourceNames(SNAPSHOT1.resources(typeUrl).keySet()) - .build()), - Collections.emptySet(), - responseTracker, - false); - - return new WatchAndTracker(watch, responseTracker); - })); - - cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT1); - - for (String typeUrl : Resources.V3.TYPE_URLS) { - assertThatWatchReceivesSnapshot(watches.get(typeUrl), SNAPSHOT1); - } - } - - @Test - public void successfullyWatchAllResourceTypesWithSetBeforeWatchWithRequestVersion() { - SimpleCache cache = new SimpleCache<>(new SingleNodeGroup(), shouldSendMissingEndpoints()); - - cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT1); - - ResponseOrderTracker responseOrderTracker = new ResponseOrderTracker(); - - HashMap watches = new HashMap<>(); - - for (int i = 0; i < 2; ++i) { - watches.putAll(Resources.V3.TYPE_URLS.stream() - .collect(Collectors.toMap( - typeUrl -> typeUrl, - typeUrl -> { - ResponseTracker responseTracker = new ResponseTracker(); - - Watch watch = cache.createWatch( - ADS, - XdsRequest.create(DiscoveryRequest.newBuilder() - .setNode(Node.getDefaultInstance()) - .setTypeUrl(typeUrl) - .setVersionInfo(SNAPSHOT1.version(typeUrl)) - .addAllResourceNames(SNAPSHOT1.resources(typeUrl).keySet()) - .build()), - SNAPSHOT2.resources(typeUrl).keySet(), - r -> { - responseTracker.accept(r); - responseOrderTracker.accept(r); - }, - false); - - return new WatchAndTracker(watch, responseTracker); - })) - ); - } - - // The request version matches the current snapshot version, so the watches shouldn't receive any responses. - for (String typeUrl : Resources.V3.TYPE_URLS) { - assertThatWatchIsOpenWithNoResponses(watches.get(typeUrl)); - } - - cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT2); - - for (String typeUrl : Resources.V3.TYPE_URLS) { - assertThatWatchReceivesSnapshot(watches.get(typeUrl), SNAPSHOT2); - } - - // Verify that CDS and LDS always get triggered before EDS and RDS respectively. - assertThat(responseOrderTracker.responseTypes).containsExactly(Resources.V3.CLUSTER_TYPE_URL, - Resources.V3.CLUSTER_TYPE_URL, Resources.V3.ENDPOINT_TYPE_URL, Resources.V3.ENDPOINT_TYPE_URL, - Resources.V3.LISTENER_TYPE_URL, Resources.V3.LISTENER_TYPE_URL, Resources.V3.ROUTE_TYPE_URL, - Resources.V3.ROUTE_TYPE_URL, Resources.V3.SECRET_TYPE_URL, Resources.V3.SECRET_TYPE_URL); - } - - @Test - public void successfullyWatchAllResourceTypesWithSetBeforeWatchWithSameRequestVersionNewResourceHints() { - SimpleCache cache = new SimpleCache<>(new SingleNodeGroup(), shouldSendMissingEndpoints()); - - cache.setSnapshot(SingleNodeGroup.GROUP, MULTIPLE_RESOURCES_SNAPSHOT2); - - // Set a watch for the current snapshot with the same version but with resource hints present - // in the snapshot that the watch creator does not currently know about. - // - // Note how we're requesting the resources from MULTIPLE_RESOURCE_SNAPSHOT2 while claiming we - // only know about the ones from SNAPSHOT2 - Map watches = Resources.V3.TYPE_URLS.stream() - .collect(Collectors.toMap( - typeUrl -> typeUrl, - typeUrl -> { - ResponseTracker responseTracker = new ResponseTracker(); - - Watch watch = cache.createWatch( - ADS, - XdsRequest.create(DiscoveryRequest.newBuilder() - .setNode(Node.getDefaultInstance()) - .setTypeUrl(typeUrl) - .setVersionInfo(MULTIPLE_RESOURCES_SNAPSHOT2.version(typeUrl)) - .addAllResourceNames(MULTIPLE_RESOURCES_SNAPSHOT2.resources(typeUrl).keySet()) - .build()), - SNAPSHOT2.resources(typeUrl).keySet(), - responseTracker, - false); - - return new WatchAndTracker(watch, responseTracker); - })); - - // The snapshot version matches for all resources, but for eds and cds there are new resources present - // for the same version, so we expect the watches to trigger. - assertThatWatchReceivesSnapshot(watches.remove(Resources.V3.CLUSTER_TYPE_URL), MULTIPLE_RESOURCES_SNAPSHOT2); - assertThatWatchReceivesSnapshot(watches.remove(Resources.V3.ENDPOINT_TYPE_URL), MULTIPLE_RESOURCES_SNAPSHOT2); - - // Remaining watches should not trigger - for (WatchAndTracker watchAndTracker : watches.values()) { - assertThatWatchIsOpenWithNoResponses(watchAndTracker); - } - } - - @Test - public void successfullyWatchAllResourceTypesWithSetBeforeWatchWithSameRequestVersionNewResourceHintsNoChange() { - SimpleCache cache = new SimpleCache<>(new SingleNodeGroup(), shouldSendMissingEndpoints()); - - cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT2); - - // Set a watch for the current snapshot for the same version but with new resource hints not - // present in the snapshot that the watch creator does not know about. - // - // Note that we're requesting the additional resources found in MULTIPLE_RESOURCE_SNAPSHOT2 - // while we only know about the resources found in SNAPSHOT2. Since SNAPSHOT2 is the current - // snapshot, we have nothing to respond with for the new resources so we should not trigger - // the watch. - Map watches = Resources.V3.TYPE_URLS.stream() - .collect(Collectors.toMap( - typeUrl -> typeUrl, - typeUrl -> { - ResponseTracker responseTracker = new ResponseTracker(); - - Watch watch = cache.createWatch( - ADS, - XdsRequest.create(DiscoveryRequest.newBuilder() - .setNode(Node.getDefaultInstance()) - .setTypeUrl(typeUrl) - .setVersionInfo(SNAPSHOT2.version(typeUrl)) - .addAllResourceNames(MULTIPLE_RESOURCES_SNAPSHOT2.resources(typeUrl).keySet()) - .build()), - SNAPSHOT2.resources(typeUrl).keySet(), - responseTracker, - false); - - return new WatchAndTracker(watch, responseTracker); - })); - - // No watches should trigger since no new information will be returned - for (WatchAndTracker watchAndTracker : watches.values()) { - assertThatWatchIsOpenWithNoResponses(watchAndTracker); - } - } - - @Test - public void setSnapshotWithVersionMatchingRequestShouldLeaveWatchOpenWithoutAdditionalResponse() { - SimpleCache cache = new SimpleCache<>(new SingleNodeGroup(), shouldSendMissingEndpoints()); - - cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT1); - - Map watches = Resources.V3.TYPE_URLS.stream() - .collect(Collectors.toMap( - typeUrl -> typeUrl, - typeUrl -> { - ResponseTracker responseTracker = new ResponseTracker(); - - Watch watch = cache.createWatch( - ADS, - XdsRequest.create(DiscoveryRequest.newBuilder() - .setNode(Node.getDefaultInstance()) - .setTypeUrl(typeUrl) - .setVersionInfo(SNAPSHOT1.version(typeUrl)) - .addAllResourceNames(SNAPSHOT1.resources(typeUrl).keySet()) - .build()), - SNAPSHOT1.resources(typeUrl).keySet(), - responseTracker, - false); - - return new WatchAndTracker(watch, responseTracker); - })); - - // The request version matches the current snapshot version, so the watches shouldn't receive any responses. - for (String typeUrl : Resources.V3.TYPE_URLS) { - assertThatWatchIsOpenWithNoResponses(watches.get(typeUrl)); - } - - cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT1); - - // The request version still matches the current snapshot version, so the watches shouldn't receive any responses. - for (String typeUrl : Resources.V3.TYPE_URLS) { - assertThatWatchIsOpenWithNoResponses(watches.get(typeUrl)); - } - } - - @Test - public void watchesAreReleasedAfterCancel() { - SimpleCache cache = new SimpleCache<>(new SingleNodeGroup(), shouldSendMissingEndpoints()); - - Map watches = Resources.V3.TYPE_URLS.stream() - .collect(Collectors.toMap( - typeUrl -> typeUrl, - typeUrl -> { - ResponseTracker responseTracker = new ResponseTracker(); - - Watch watch = cache.createWatch( - ADS, - XdsRequest.create(DiscoveryRequest.newBuilder() - .setNode(Node.getDefaultInstance()) - .setTypeUrl(typeUrl) - .addAllResourceNames(SNAPSHOT1.resources(typeUrl).keySet()) - .build()), - Collections.emptySet(), - responseTracker, - false); - - return new WatchAndTracker(watch, responseTracker); - })); - - StatusInfo statusInfo = cache.statusInfo(SingleNodeGroup.GROUP); - - assertThat(statusInfo.numWatches()).isEqualTo(watches.size()); - - watches.values().forEach(w -> w.watch.cancel()); - - assertThat(statusInfo.numWatches()).isZero(); - - watches.values().forEach(w -> assertThat(w.watch.isCancelled()).isTrue()); - } - - @Test - public void watchIsLeftOpenIfNotRespondedImmediately() { - SimpleCache cache = new SimpleCache<>(new SingleNodeGroup(), shouldSendMissingEndpoints()); - cache.setSnapshot(SingleNodeGroup.GROUP, Snapshot.create( - ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), VERSION1)); - - ResponseTracker responseTracker = new ResponseTracker(); - Watch watch = cache.createWatch( - true, - XdsRequest.create(DiscoveryRequest.newBuilder() - .setNode(Node.getDefaultInstance()) - .setTypeUrl(ROUTE_TYPE_URL) - .addAllResourceNames(Collections.singleton(ROUTE_NAME)) - .build()), - Collections.singleton(ROUTE_NAME), - responseTracker, - false); - - assertThatWatchIsOpenWithNoResponses(new WatchAndTracker(watch, responseTracker)); - } - - @Test - public void getSnapshot() { - SimpleCache cache = new SimpleCache<>(new SingleNodeGroup(), shouldSendMissingEndpoints()); - - cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT1); - - assertThat(cache.getSnapshot(SingleNodeGroup.GROUP)).isEqualTo(SNAPSHOT1); - } - - @Test - public void clearSnapshot() { - SimpleCache cache = new SimpleCache<>(new SingleNodeGroup(), shouldSendMissingEndpoints()); - - cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT1); - - assertThat(cache.clearSnapshot(SingleNodeGroup.GROUP)).isTrue(); - - assertThat(cache.getSnapshot(SingleNodeGroup.GROUP)).isNull(); - } - - @Test - public void clearSnapshotWithWatches() { - SimpleCache cache = new SimpleCache<>(new SingleNodeGroup(), shouldSendMissingEndpoints()); - - cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT1); - - // Create a watch with an arbitrary type URL and a versionInfo that matches the saved - // snapshot, so the watch doesn't immediately close. - final Watch watch = cache.createWatch(ADS, XdsRequest.create(DiscoveryRequest.newBuilder() - .setNode(Node.getDefaultInstance()) - .setTypeUrl(CLUSTER_TYPE_URL) - .setVersionInfo(SNAPSHOT1.version(CLUSTER_TYPE_URL)) - .build()), - Collections.emptySet(), - r -> { }, - false); - - // clearSnapshot should fail and the snapshot should be left untouched - assertThat(cache.clearSnapshot(SingleNodeGroup.GROUP)).isFalse(); - assertThat(cache.getSnapshot(SingleNodeGroup.GROUP)).isEqualTo(SNAPSHOT1); - assertThat(cache.statusInfo(SingleNodeGroup.GROUP)).isNotNull(); - - watch.cancel(); - - // now that the watch is gone we should be able to clear it - assertThat(cache.clearSnapshot(SingleNodeGroup.GROUP)).isTrue(); - assertThat(cache.getSnapshot(SingleNodeGroup.GROUP)).isNull(); - assertThat(cache.statusInfo(SingleNodeGroup.GROUP)).isNull(); - } - - @Test - public void groups() { - SimpleCache cache = new SimpleCache<>(new SingleNodeGroup(), shouldSendMissingEndpoints()); - - assertThat(cache.groups()).isEmpty(); - - cache.createWatch(ADS, XdsRequest.create(DiscoveryRequest.newBuilder() - .setNode(Node.getDefaultInstance()) - .setTypeUrl(CLUSTER_TYPE_URL) - .build()), - Collections.emptySet(), - r -> { }, - false); - - assertThat(cache.groups()).containsExactly(SingleNodeGroup.GROUP); - } - - private static void assertThatWatchIsOpenWithNoResponses(WatchAndTracker watchAndTracker) { - assertThat(watchAndTracker.watch.isCancelled()).isFalse(); - assertThat(watchAndTracker.tracker.responses).isEmpty(); - } - - - protected static void assertThatWatchReceivesSnapshot(WatchAndTracker watchAndTracker, Snapshot snapshot) { - assertThat(watchAndTracker.tracker.responses).isNotEmpty(); - - Response response = watchAndTracker.tracker.responses.getFirst(); - - assertThat(response).isNotNull(); - assertThat(response.version()).isEqualTo(snapshot.version(watchAndTracker.watch.request().getTypeUrl())); - assertThat(response.resources().toArray(new Message[0])) - .containsExactlyElementsOf(snapshot.resources(watchAndTracker.watch.request().getTypeUrl()).values() - .stream().map(VersionedResource::resource).collect(Collectors.toList())); - } - - protected static class ResponseTracker implements Consumer { - - private final LinkedList responses = new LinkedList<>(); - - @Override - public void accept(Response response) { - responses.add(response); - } - - public LinkedList getResponses() { - return responses; - } - } - - private static class ResponseOrderTracker implements Consumer { - - private final LinkedList responseTypes = new LinkedList<>(); - - @Override public void accept(Response response) { - responseTypes.add(response.request().getTypeUrl()); - } - } - - protected static class SingleNodeGroup implements NodeGroup { - - protected static final String GROUP = "node"; - - @Override - public String hash(Node node) { - if (node == null) { - throw new IllegalArgumentException("node"); - } - - return GROUP; - } - } - - protected static class WatchAndTracker { - - final Watch watch; - final ResponseTracker tracker; - - WatchAndTracker(Watch watch, ResponseTracker tracker) { - this.watch = watch; - this.tracker = tracker; - } - } -} diff --git a/envoy-control-core/src/test/java/pl/allegro/tech/servicemesh/envoycontrol/v3/SimpleCacheWithMissingEndpointsTest.java b/envoy-control-core/src/test/java/pl/allegro/tech/servicemesh/envoycontrol/v3/SimpleCacheWithMissingEndpointsTest.java deleted file mode 100644 index 77d9ee749..000000000 --- a/envoy-control-core/src/test/java/pl/allegro/tech/servicemesh/envoycontrol/v3/SimpleCacheWithMissingEndpointsTest.java +++ /dev/null @@ -1,84 +0,0 @@ -package pl.allegro.tech.servicemesh.envoycontrol.v3; - -import com.google.common.collect.ImmutableList; -import com.google.protobuf.Message; -import io.envoyproxy.controlplane.cache.Resources; -import io.envoyproxy.controlplane.cache.Response; -import io.envoyproxy.controlplane.cache.VersionedResource; -import io.envoyproxy.controlplane.cache.Watch; -import io.envoyproxy.controlplane.cache.XdsRequest; -import io.envoyproxy.controlplane.cache.v3.Snapshot; -import io.envoyproxy.envoy.config.core.v3.Node; -import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; -import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; -import org.junit.Ignore; -import org.junit.jupiter.api.Test; - -import java.util.Collections; - -import static java.util.Collections.emptyList; -import static org.assertj.core.api.Assertions.assertThat; - -public class SimpleCacheWithMissingEndpointsTest extends SimpleCacheTest { - - @Override - protected boolean shouldSendMissingEndpoints() { - return true; - } - - protected static final Snapshot SNAPSHOT_WITH_MISSING_RESOURCES = Snapshot.create( - emptyList(), - ImmutableList.of( - ClusterLoadAssignment.newBuilder().setClusterName("none").build(), - ClusterLoadAssignment.newBuilder().setClusterName(CLUSTER_NAME).build() - ), - emptyList(), - emptyList(), - emptyList(), - VERSION2 - ); - - @Ignore - @Override - public void invalidNamesListShouldReturnWatcherWithNoResponseInAdsMode() { - } - - @Test - public void missingNamesListShouldReturnWatcherWithResponseInAdsMode() { - pl.allegro.tech.servicemesh.envoycontrol.v3.SimpleCache cache = new pl.allegro.tech.servicemesh.envoycontrol.v3.SimpleCache<>(new SingleNodeGroup(), shouldSendMissingEndpoints()); - - cache.setSnapshot(SingleNodeGroup.GROUP, MULTIPLE_RESOURCES_SNAPSHOT2); - - ResponseTracker responseTracker = new ResponseTracker(); - - Watch watch = cache.createWatch( - true, - XdsRequest.create(DiscoveryRequest.newBuilder() - .setNode(Node.getDefaultInstance()) - .setTypeUrl(Resources.V3.ENDPOINT_TYPE_URL) - .addResourceNames("none") - .addResourceNames(CLUSTER_NAME) - .build()), - Collections.emptySet(), - responseTracker, - false); - - assertThatWatchReceivesSnapshotWithMissingResources(new WatchAndTracker(watch, responseTracker), SNAPSHOT_WITH_MISSING_RESOURCES); - } - - private static void assertThatWatchReceivesSnapshotWithMissingResources(WatchAndTracker watchAndTracker, Snapshot snapshot) { - assertThat(watchAndTracker.tracker.getResponses()).isNotEmpty(); - - Response response = watchAndTracker.tracker.getResponses().getFirst(); - - assertThat(response).isNotNull(); - assertThat(response.version()).isEqualTo(snapshot.version(watchAndTracker.watch.request().getTypeUrl())); - Message[] responseValues = response.resources().toArray(new Message[0]); - Message[] snapshotValues = snapshot.resources(watchAndTracker.watch.request().getTypeUrl()).values().stream().map(VersionedResource::resource).toArray(Message[]::new); - - assertThat(responseValues.length).isEqualTo(2); - assertThat(responseValues.length).isEqualTo(snapshotValues.length); - assertThat(responseValues[0].toString()).isEqualTo(snapshotValues[0].toString()); - assertThat(responseValues[1].toString()).isEqualTo(snapshotValues[1].toString()); - } -} diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt index 247f879e3..0cba50150 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt @@ -1121,7 +1121,8 @@ class SnapshotUpdaterTest { request: XdsRequest, knownResourceNames: MutableSet, responseConsumer: Consumer, - hasClusterChanged: Boolean + hasClusterChanged: Boolean, + allowDefaultEmptyEdsUpdate: Boolean ): Watch { throw UnsupportedOperationException("not used in testing") }