Skip to content

Commit

Permalink
#624 Set flat priority only for services with traffic splitting
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Apr 22, 2024
1 parent 8e902af commit 0476fc8
Show file tree
Hide file tree
Showing 14 changed files with 300 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class ControlPlane private constructor(
val envoySnapshotFactory = EnvoySnapshotFactory(
ingressRoutesFactory = EnvoyIngressRoutesFactory(snapshotProperties, envoyHttpFilters, currentZone),
egressRoutesFactory = EnvoyEgressRoutesFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties, currentZone),
endpointsFactory = EnvoyEndpointsFactory(
snapshotProperties,
ServiceTagMetadataGenerator(snapshotProperties.routing.serviceTags),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,12 @@ class EnvoySnapshotFactory(
?.any { e -> trafficSplitting.zoneName == e.locality.zone }
?: false
return if (weights != null && enabledForDependency) {
if (serviceName == "varnish") {
logger.info(
"Building traffic splitting route spec, weights: $weights, " +
"serviceName: $serviceName, clusterName: $clusterName, "
)
}
logger.debug(
"Building traffic splitting route spec, weights: $weights, " +
"serviceName: $serviceName, clusterName: $clusterName, "
Expand Down Expand Up @@ -273,7 +279,6 @@ class EnvoySnapshotFactory(
}
}
}

val rateLimitClusters =
if (rateLimitEndpoints.isNotEmpty()) listOf(properties.rateLimit.serviceName) else emptyList()
val rateLimitLoadAssignments = rateLimitClusters.mapNotNull { name -> globalSnapshot.endpoints[name] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class TrafficSplittingProperties {
var zoneName = ""
var headerName = ""
var weightsByService: Map<String, ZoneWeights> = mapOf()
var zonesAllowingTrafficSplitting = listOf<String>()
}

class ZoneWeights {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.GlobalSnapshot
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.OAuthProvider
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.Threshold
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.TrafficSplittingProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.SanUriMatcherFactory

typealias EnvoyClusterConfig = io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig

class EnvoyClustersFactory(
private val properties: SnapshotProperties
private val properties: SnapshotProperties,
private val currentZone: String
) {
private val httpProtocolOptions: HttpProtocolOptions = HttpProtocolOptions.newBuilder().setIdleTimeout(
Durations.fromMillis(properties.egress.commonHttp.connectionIdleTimeout.toMillis())
Expand Down Expand Up @@ -283,15 +283,14 @@ class EnvoyClustersFactory(
): Boolean {
val trafficSplitting = properties.loadBalancing.trafficSplitting
val trafficSplitEnabled = trafficSplitting.weightsByService.containsKey(serviceName)
return trafficSplitEnabled && hasEndpointsInZone(clusterLoadAssignment, trafficSplitting)
val allowed = clusterLoadAssignment != null &&
properties.loadBalancing.trafficSplitting.zonesAllowingTrafficSplitting.contains(currentZone)
if (serviceName == "varnish" || serviceName == "service-mesh-service-second") {
logger.info("trafficSplitEnabled $trafficSplitEnabled allowed $allowed, $currentZone")
}
return trafficSplitEnabled && allowed
}

private fun hasEndpointsInZone(
clusterLoadAssignment: ClusterLoadAssignment?,
trafficSplitting: TrafficSplittingProperties
) = clusterLoadAssignment?.endpointsList
?.any { e -> trafficSplitting.zoneName == e.locality.zone && e.lbEndpointsCount > 0 } ?: false

private fun shouldAddDynamicForwardProxyCluster(group: Group) =
group.proxySettings.outgoing.getDomainPatternDependencies().isNotEmpty()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class EnvoyEndpointsFactory(
) {
companion object {
private val logger by logger()
private const val HIGHEST_PRIORITY = 0
}

fun createLoadAssignment(
Expand Down Expand Up @@ -84,22 +85,44 @@ class EnvoyEndpointsFactory(
return if (routeSpec is WeightRouteSpecification) {
ClusterLoadAssignment.newBuilder(loadAssignment)
.clearEndpoints()
.addAllEndpoints(assignWeights(loadAssignment.endpointsList, routeSpec.clusterWeights))
.addAllEndpoints(
assignWeightsAndDuplicateEndpoints(
loadAssignment.endpointsList,
routeSpec.clusterWeights
)
)
.setClusterName(routeSpec.clusterName)
.build()
} else loadAssignment
}

private fun assignWeights(
llbEndpointsList: List<LocalityLbEndpoints>, weights: ZoneWeights
private fun assignWeightsAndDuplicateEndpoints(
llbEndpointsList: List<LocalityLbEndpoints>,
weights: ZoneWeights
): List<LocalityLbEndpoints> {
if (properties.loadBalancing.trafficSplitting.zonesAllowingTrafficSplitting.contains(currentZone)) {
val endpoints = llbEndpointsList
.map {
if (weights.weightByZone.containsKey(it.locality.zone)) {
LocalityLbEndpoints.newBuilder(it)
.setLoadBalancingWeight(UInt32Value.of(weights.weightByZone[it.locality.zone] ?: 0))
.build()
} else it
}
return overrideTrafficSplittingZoneEndpointsPriority(endpoints) + endpoints
}
return llbEndpointsList
}

private fun overrideTrafficSplittingZoneEndpointsPriority(
endpoints: List<LocalityLbEndpoints>
): List<LocalityLbEndpoints> {
return endpoints
.filter { properties.loadBalancing.trafficSplitting.zoneName == it.locality.zone }
.map {
if (weights.weightByZone.containsKey(it.locality.zone)) {
LocalityLbEndpoints.newBuilder(it)
.setLoadBalancingWeight(UInt32Value.of(weights.weightByZone[it.locality.zone] ?: 0))
.build()
} else it
LocalityLbEndpoints.newBuilder(it)
.setPriority(HIGHEST_PRIORITY)
.build()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_CLUSTER_WEIGHTS
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_DISCOVERY_SERVICE_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_IDLE_TIMEOUT
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_PRIORITY
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_SERVICE_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.EGRESS_HOST
import pl.allegro.tech.servicemesh.envoycontrol.utils.EGRESS_PORT
import pl.allegro.tech.servicemesh.envoycontrol.utils.HIGHEST_PRIORITY
import pl.allegro.tech.servicemesh.envoycontrol.utils.INGRESS_HOST
import pl.allegro.tech.servicemesh.envoycontrol.utils.INGRESS_PORT
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_PROPERTIES_WITH_WEIGHTS
Expand Down Expand Up @@ -275,13 +277,18 @@ class EnvoySnapshotFactoryTest {
assertThat(it.endpointsList)
.anySatisfy { e ->
e.locality.zone == TRAFFIC_SPLITTING_ZONE &&
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[TRAFFIC_SPLITTING_ZONE]
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[TRAFFIC_SPLITTING_ZONE] &&
e.priority == DEFAULT_PRIORITY
}.anySatisfy { e ->
e.locality.zone == TRAFFIC_SPLITTING_ZONE &&
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[TRAFFIC_SPLITTING_ZONE] &&
e.priority == HIGHEST_PRIORITY
}
.anySatisfy { e ->
e.locality.zone == CURRENT_ZONE &&
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[CURRENT_ZONE]
}
.hasSize(2)
.hasSize(3)
}
}

Expand Down Expand Up @@ -313,7 +320,7 @@ class EnvoySnapshotFactoryTest {
e.locality.zone == TRAFFIC_SPLITTING_ZONE &&
!e.hasLoadBalancingWeight()
}
.hasSize(2)
.hasSize(3)
}
}

Expand Down Expand Up @@ -457,7 +464,7 @@ class EnvoySnapshotFactoryTest {
CURRENT_ZONE
)
val egressRoutesFactory = EnvoyEgressRoutesFactory(properties)
val clustersFactory = EnvoyClustersFactory(properties)
val clustersFactory = EnvoyClustersFactory(properties, CURRENT_ZONE)
val endpointsFactory = EnvoyEndpointsFactory(properties, ServiceTagMetadataGenerator(), CURRENT_ZONE)
val envoyHttpFilters = EnvoyHttpFilters.defaultFilters(properties)
val listenersFactory = EnvoyListenersFactory(properties, envoyHttpFilters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1312,7 +1312,7 @@ class SnapshotUpdaterTest {
EnvoySnapshotFactory(
ingressRoutesFactory = EnvoyIngressRoutesFactory(snapshotProperties, currentZone = CURRENT_ZONE),
egressRoutesFactory = EnvoyEgressRoutesFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties, CURRENT_ZONE),
endpointsFactory = EnvoyEndpointsFactory(
snapshotProperties, ServiceTagMetadataGenerator(snapshotProperties.routing.serviceTags), CURRENT_ZONE
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.GlobalSnapshot
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_NAME1
import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_NAME2
import pl.allegro.tech.servicemesh.envoycontrol.utils.CURRENT_ZONE
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_CLUSTER_WEIGHTS
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_SERVICE_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.TRAFFIC_SPLITTING_ZONE
Expand All @@ -23,7 +24,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.createServicesGroup
internal class EnvoyClustersFactoryTest {

companion object {
private val factory = EnvoyClustersFactory(SnapshotProperties())
private val factory = EnvoyClustersFactory(SnapshotProperties(), CURRENT_ZONE)
private val snapshotPropertiesWithWeights = SnapshotProperties().apply {
loadBalancing.trafficSplitting.weightsByService = mapOf(
DEFAULT_SERVICE_NAME to DEFAULT_CLUSTER_WEIGHTS
Expand Down Expand Up @@ -96,7 +97,7 @@ internal class EnvoyClustersFactoryTest {
@Test
fun `should get cluster with locality weighted config for group clusters`() {
val cluster1 = createCluster(snapshotPropertiesWithWeights, CLUSTER_NAME1)
val factory = EnvoyClustersFactory(snapshotPropertiesWithWeights)
val factory = EnvoyClustersFactory(snapshotPropertiesWithWeights, CURRENT_ZONE)
val result = factory.getClustersForGroup(
createServicesGroup(
snapshotProperties = snapshotPropertiesWithWeights,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.MethodSource
import pl.allegro.tech.servicemesh.envoycontrol.groups.DependencySettings
import pl.allegro.tech.servicemesh.envoycontrol.groups.RoutingPolicy
import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.Locality
Expand All @@ -19,6 +20,9 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.LoadBalancingPriorityProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.LoadBalancingProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.TrafficSplittingProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.WeightRouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.ZoneWeights
import java.util.concurrent.ConcurrentHashMap
import java.util.stream.Stream

Expand All @@ -33,13 +37,13 @@ internal class EnvoyEndpointsFactoryTest {
),
"DC2" to mapOf(
"DC1" to 1,
"DC2" to 2,
"DC3" to 3
"DC2" to 0,
"DC3" to 2
),
"DC3" to mapOf(
"DC1" to 2,
"DC2" to 3,
"DC3" to 4
"DC1" to 1,
"DC2" to 2,
"DC3" to 0
)
)

Expand All @@ -52,6 +56,7 @@ internal class EnvoyEndpointsFactoryTest {
private val serviceName = "service-one"

private val defaultZone = "DC1"
private val trafficSplittingZone = "DC2"

private val endpointsFactory = EnvoyEndpointsFactory(
SnapshotProperties().apply {
Expand All @@ -77,6 +82,31 @@ internal class EnvoyEndpointsFactoryTest {
)
)

private val defaultZoneWeights = mapOf(
"DC1" to 100,
"DC2" to 10,
"DC3" to 2
)

private val snapshotPropertiesWithWeights = SnapshotProperties().apply {
loadBalancing = LoadBalancingProperties()
.apply {
priorities = LoadBalancingPriorityProperties()
.apply { zonePriorities = dcPriorityProperties }
trafficSplitting = TrafficSplittingProperties()
.apply {
zoneName = trafficSplittingZone
zonesAllowingTrafficSplitting = listOf("DC1")
weightsByService = mapOf(
serviceName to ZoneWeights()
.apply {
weightByZone = defaultZoneWeights
}
)
}
}
}

// language=json
private val globalLoadAssignmentJson = """{
"cluster_name": "lorem-service",
Expand Down Expand Up @@ -354,6 +384,28 @@ internal class EnvoyEndpointsFactoryTest {
)
}

@Test
fun `should override priority and duplicate endpoints for traffic splitting zone`() {
val envoyEndpointsFactory = EnvoyEndpointsFactory(
snapshotPropertiesWithWeights,
currentZone = "DC1"
)
val loadAssignments = envoyEndpointsFactory.createLoadAssignment(setOf(serviceName), multiClusterStateDC1Local)
var resultLoadAssignment = envoyEndpointsFactory.assignLocalityWeights(
WeightRouteSpecification(
serviceName, listOf(), DependencySettings(), ZoneWeights().apply { weightByZone = defaultZoneWeights }
),
loadAssignments[0]
)

assertThat(resultLoadAssignment.endpointsList).hasSize(dcPriorityProperties.size + 1)
assertThat(resultLoadAssignment.endpointsList)
.anySatisfy { it.hasZoneWithPriority("DC2", 1) }
.anySatisfy { it.hasZoneWithPriority("DC2", 0) }
.anySatisfy { it.hasZoneWithPriority("DC1", 0) }
.anySatisfy { it.hasZoneWithPriority("DC3", 2) }
}

private fun List<ClusterLoadAssignment>.assertHasLoadAssignment(map: Map<String, Int>) {
assertThat(this)
.isNotEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ fun createEndpoint(zone: String): LocalityLbEndpoints {
.build()
)
.addAllLbEndpoints(listOf(LbEndpoint.getDefaultInstance()))
.setPriority(DEFAULT_PRIORITY)
.build()
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const val CLUSTER_NAME1 = "cluster-1"
const val CLUSTER_NAME2 = "cluster-2"
const val TRAFFIC_SPLITTING_ZONE = "dc2"
const val CURRENT_ZONE = "dc1"
const val DEFAULT_PRIORITY = 1
const val HIGHEST_PRIORITY = 0

val DEFAULT_CLUSTER_WEIGHTS = zoneWeights(mapOf(CURRENT_ZONE to 60, TRAFFIC_SPLITTING_ZONE to 40))

Expand All @@ -24,6 +26,7 @@ val SNAPSHOT_PROPERTIES_WITH_WEIGHTS = SnapshotProperties().also {
DEFAULT_SERVICE_NAME to DEFAULT_CLUSTER_WEIGHTS
)
it.loadBalancing.trafficSplitting.zoneName = TRAFFIC_SPLITTING_ZONE
it.loadBalancing.trafficSplitting.zonesAllowingTrafficSplitting = listOf(CURRENT_ZONE)
}

fun zoneWeights(weightByZone: Map<String, Int>) = ZoneWeights().also {
Expand Down
Loading

0 comments on commit 0476fc8

Please sign in to comment.