Skip to content

Commit

Permalink
Implemented locality weighted load balancing
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Jan 15, 2024
1 parent d6481de commit 7134e1b
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,11 @@ class EnvoySnapshotFactory(
val rateLimitClusters =
if (rateLimitEndpoints.isNotEmpty()) listOf(properties.rateLimit.serviceName) else emptyList()
val rateLimitLoadAssignments = rateLimitClusters.mapNotNull { name -> globalSnapshot.endpoints[name] }
val secondaryLoadAssignments = endpointsFactory.getSecondaryClusterEndpoints(
val loadAssignments = endpointsFactory.assignLocalityWeights(
egressLoadAssignments,
egressRouteSpecifications
)
return egressLoadAssignments.values.toList() + rateLimitLoadAssignments + secondaryLoadAssignments
return loadAssignments + rateLimitLoadAssignments
}

private fun newSnapshotForGroup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ class TrafficSplittingProperties {
}

class ZoneWeights {
var main = 100
var main = 100 // todo remove
var secondary = 0
var zoneByWeights: Map<String, Int> = mapOf()
}

class LoadBalancingWeightsProperties {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,6 @@ class EnvoyClustersFactory(

companion object {
private val logger by logger()

@JvmStatic
fun getSecondaryClusterName(serviceName: String, snapshotProperties: SnapshotProperties): String {
return "$serviceName-${snapshotProperties.loadBalancing.trafficSplitting.secondaryClusterSuffix}"
}

@JvmStatic
fun getAggregateClusterName(serviceName: String, snapshotProperties: SnapshotProperties): String {
return "$serviceName-${snapshotProperties.loadBalancing.trafficSplitting.aggregateClusterSuffix}"
}
}

fun getClustersForServices(
Expand Down Expand Up @@ -262,15 +252,16 @@ class EnvoyClustersFactory(
dependencySettings: DependencySettings,
cluster: Cluster
): Collection<Cluster> {
val mainCluster = createClusterForGroup(dependencySettings, cluster)
val secondaryCluster = createClusterForGroup(
dependencySettings,
cluster,
getSecondaryClusterName(cluster.name, properties)
)
val aggregateCluster =
createAggregateCluster(mainCluster.name, linkedSetOf(secondaryCluster.name, mainCluster.name))
return listOf(mainCluster, secondaryCluster, aggregateCluster)
val cluster = createClusterForGroup(dependencySettings, cluster)
.toBuilder()
.setCommonLbConfig(
Cluster.CommonLbConfig.newBuilder().setLocalityWeightedLbConfig(
Cluster.CommonLbConfig.LocalityWeightedLbConfig.getDefaultInstance()
)
.build()
)
.build()
return listOf(cluster)
.onEach {
logger.debug("Created set of cluster configs for traffic splitting: {}", it.toString())
}
Expand Down Expand Up @@ -357,25 +348,6 @@ class EnvoyClustersFactory(
}
}

private fun createAggregateCluster(clusterName: String, aggregatedClusters: Collection<String>): Cluster {
return Cluster.newBuilder()
.setName(getAggregateClusterName(clusterName, properties))
.setConnectTimeout(Durations.fromMillis(properties.edsConnectionTimeout.toMillis()))
.setLbPolicy(Cluster.LbPolicy.CLUSTER_PROVIDED)
.setClusterType(
Cluster.CustomClusterType.newBuilder()
.setName("envoy.clusters.aggregate")
.setTypedConfig(
Any.pack(
EnvoyClusterConfig.newBuilder()
.addAllClusters(aggregatedClusters)
.build()
)
)
)
.build()
}

private fun strictDnsCluster(
domainDependency: DomainDependency,
useTransparentProxy: Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstance
import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.RouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.StandardRouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.WeightRouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory.Companion.getSecondaryClusterName
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.ZoneWeights
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.ServiceTagMetadataGenerator

typealias EnvoyProxyLocality = io.envoyproxy.envoy.config.core.v3.Locality
Expand Down Expand Up @@ -77,25 +78,37 @@ class EnvoyEndpointsFactory(
}
}

fun getSecondaryClusterEndpoints(
fun assignLocalityWeights(
clusterLoadAssignments: Map<String, ClusterLoadAssignment>,
egressRouteSpecifications: List<RouteSpecification>
): List<ClusterLoadAssignment> {
return egressRouteSpecifications
val weighted = egressRouteSpecifications
.filterIsInstance<WeightRouteSpecification>()
.onEach { logger.debug("Traffic splitting is enabled for cluster: ${it.clusterName}") }
.mapNotNull { routeSpec ->
clusterLoadAssignments[routeSpec.clusterName]?.let { assignment ->
ClusterLoadAssignment.newBuilder(assignment)
.clearEndpoints()
.addAllEndpoints(assignment.endpointsList?.filter { e ->
e.locality.zone == properties.loadBalancing.trafficSplitting.zoneName
})
.setClusterName(getSecondaryClusterName(routeSpec.clusterName, properties))
.addAllEndpoints(assignWeights(assignment.endpointsList, routeSpec.clusterWeights))
.setClusterName(routeSpec.clusterName)
.build()
}
}
.filter { it.endpointsList.isNotEmpty() }
val remaining = egressRouteSpecifications.filterIsInstance<StandardRouteSpecification>()
.mapNotNull { clusterLoadAssignments[it.clusterName] }
return (remaining + weighted).filter { it.endpointsList.isNotEmpty() }
}

private fun assignWeights(
llbEndpointsList: List<LocalityLbEndpoints>, weights: ZoneWeights
): List<LocalityLbEndpoints> {
return llbEndpointsList
.filter { weights.zoneByWeights.containsKey(it.locality.zone) }
.map {
it.toBuilder()
.setLoadBalancingWeight(UInt32Value.of(weights.zoneByWeights[it.locality.zone] ?: 0))
.build()
}.toList()
}

private fun filterEndpoints(loadAssignment: ClusterLoadAssignment, tag: String): ClusterLoadAssignment? {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import io.envoyproxy.envoy.config.route.v3.RouteAction
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration
import io.envoyproxy.envoy.config.route.v3.RouteMatch
import io.envoyproxy.envoy.config.route.v3.VirtualHost
import io.envoyproxy.envoy.config.route.v3.WeightedCluster
import io.envoyproxy.envoy.extensions.retry.host.omit_canary_hosts.v3.OmitCanaryHostsPredicate
import io.envoyproxy.envoy.extensions.retry.host.omit_host_metadata.v3.OmitHostMetadataConfig
import io.envoyproxy.envoy.extensions.retry.host.previous_hosts.v3.PreviousHostsPredicate
Expand All @@ -32,11 +31,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.RetryHostPredicate
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.RouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.StandardRouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.WeightRouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory.Companion.getAggregateClusterName
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.ServiceTagFilterFactory
import java.lang.Boolean.TRUE
import pl.allegro.tech.servicemesh.envoycontrol.groups.RetryPolicy as EnvoyControlRetryPolicy

class EnvoyEgressRoutesFactory(
Expand Down Expand Up @@ -323,7 +318,7 @@ class EnvoyEgressRoutesFactory(
shouldAddRetryPolicy: Boolean = false
): RouteAction.Builder {
val routeAction = RouteAction.newBuilder()
.setCluster(routeSpecification)
.setCluster(routeSpecification.clusterName) // todo add header back

routeSpecification.settings.timeoutPolicy.let { timeoutPolicy ->
timeoutPolicy.idleTimeout?.let { routeAction.setIdleTimeout(it) }
Expand All @@ -346,65 +341,6 @@ class EnvoyEgressRoutesFactory(

return routeAction
}

private fun RouteAction.Builder.setCluster(routeSpec: RouteSpecification): RouteAction.Builder {
return when (routeSpec) {
is WeightRouteSpecification -> {
logger.debug(
"Creating weighted cluster configuration for route spec {}, {}",
routeSpec.clusterName,
routeSpec.clusterWeights
)
this.setWeightedClusters(
WeightedCluster.newBuilder()
.withClusterWeight(routeSpec.clusterName, routeSpec.clusterWeights.main)
.withClusterWeight(
getAggregateClusterName(routeSpec.clusterName, properties),
routeSpec.clusterWeights.secondary,
true
)
)
}
is StandardRouteSpecification -> {
this.setCluster(routeSpec.clusterName)
}
}
}

private fun WeightedCluster.Builder.withClusterWeight(
clusterName: String,
weight: Int,
withHeader: Boolean = false
): WeightedCluster.Builder {
val clusters = WeightedCluster.ClusterWeight.newBuilder()
.setName(clusterName)
.setWeight(UInt32Value.of(weight))
.also {
if (withHeader) {
it.withHeader(properties.loadBalancing.trafficSplitting.headerName)
}
}
return this.addClusters(clusters)
}

private fun WeightedCluster.ClusterWeight.Builder.withHeader(key: String?): WeightedCluster.ClusterWeight.Builder {
key?.takeIf { it.isNotBlank() }
?.let {
this.addResponseHeadersToAdd(buildHeader(key))
}
return this
}

private fun buildHeader(key: String): HeaderValueOption.Builder {
return HeaderValueOption.newBuilder()
.setHeader(
HeaderValue.newBuilder()
.setKey(key)
.setValue(TRUE.toString())
)
.setAppendAction(HeaderValueOption.HeaderAppendAction.OVERWRITE_IF_EXISTS_OR_ADD)
.setKeepEmptyValue(false)
}
}

class RequestPolicyMapper private constructor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ internal class EnvoyEndpointsFactoryTest {
.createLoadAssignment(services, multiClusterState)
.associateBy { it.clusterName }

val result = envoyEndpointsFactory.getSecondaryClusterEndpoints(
val result = envoyEndpointsFactory.assignLocalityWeights(
loadAssignments,
services.map { it.toRouteSpecification() }
)
Expand Down Expand Up @@ -417,7 +417,7 @@ internal class EnvoyEndpointsFactoryTest {
multiClusterState
).associateBy { it.clusterName }

val result = envoyEndpointsFactory.getSecondaryClusterEndpoints(
val result = envoyEndpointsFactory.assignLocalityWeights(
loadAssignments,
listOf(serviceName.toRouteSpecification())
)
Expand Down Expand Up @@ -447,7 +447,7 @@ internal class EnvoyEndpointsFactoryTest {
multiClusterState
).associateBy { it.clusterName }

val result = envoyEndpointsFactory.getSecondaryClusterEndpoints(
val result = envoyEndpointsFactory.assignLocalityWeights(
loadAssignments,
listOf("some-other-service-name".toRouteSpecification())
)
Expand Down Expand Up @@ -475,7 +475,7 @@ internal class EnvoyEndpointsFactoryTest {
multiClusterState
).associateBy { it.clusterName }

val result = envoyEndpointsFactory.getSecondaryClusterEndpoints(
val result = envoyEndpointsFactory.assignLocalityWeights(
loadAssignments,
listOf(serviceName.toRouteSpecification())
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.EnvoyExtension
import pl.allegro.tech.servicemesh.envoycontrol.config.envoycontrol.EnvoyControlClusteredExtension
import pl.allegro.tech.servicemesh.envoycontrol.config.service.EchoServiceExtension
import verifyCallsCountCloseTo
import verifyCallsCountGreaterThan
import verifyIsReachable
import java.time.Duration

Expand All @@ -20,18 +19,22 @@ class WeightedClustersRoutingTest {
private const val forceTrafficZone = "dc2"

private val properties = mapOf(
"pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.endpoints.EnvoyEndpointsFactory" to "DEBUG",
"pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory" to "DEBUG",
"envoy-control.envoy.snapshot.stateSampleDuration" to Duration.ofSeconds(0),
"envoy-control.sync.enabled" to true,
"envoy-control.envoy.snapshot.loadBalancing.trafficSplitting.zoneName" to forceTrafficZone,
"envoy-control.envoy.snapshot.loadBalancing.trafficSplitting.serviceByWeightsProperties.$serviceName.main" to 90,
"envoy-control.envoy.snapshot.loadBalancing.trafficSplitting.serviceByWeightsProperties.$serviceName.secondary" to 10,
"envoy-control.envoy.snapshot.loadBalancing.trafficSplitting.serviceByWeightsProperties.$serviceName.zoneByWeights.dc1" to 3,
"envoy-control.envoy.snapshot.loadBalancing.trafficSplitting.serviceByWeightsProperties.$serviceName.zoneByWeights.dc2" to 1,
"envoy-control.envoy.snapshot.loadBalancing.priorities.zonePriorities" to mapOf(
"dc1" to mapOf(
"dc1" to 0,
"dc2" to 1
"dc2" to 0
),
"dc2" to mapOf(
"dc1" to 1,
"dc1" to 0,
"dc2" to 0,
),
)
Expand Down Expand Up @@ -93,8 +96,8 @@ class WeightedClustersRoutingTest {
echoEnvoyDC1.verifyIsReachable(upstreamServiceDC2, upstreamServiceName)

echoEnvoyDC1.callUpstreamServiceRepeatedly(upstreamServiceDC1, upstreamServiceDC2)
.verifyCallsCountCloseTo(upstreamServiceDC1, 90)
.verifyCallsCountGreaterThan(upstreamServiceDC2, 1)
.verifyCallsCountCloseTo(upstreamServiceDC1, 75)
.verifyCallsCountCloseTo(upstreamServiceDC2, 25)
}

@Test
Expand All @@ -108,7 +111,7 @@ class WeightedClustersRoutingTest {
echoEnvoyDC1.verifyIsReachable(upstreamServiceDC2, upstreamServiceName)

echoEnvoyDC1.callUpstreamServiceRepeatedly(upstreamServiceDC1, upstreamServiceDC2, tag = "tag")
.verifyCallsCountCloseTo(upstreamServiceDC1, 90)
.verifyCallsCountGreaterThan(upstreamServiceDC2, 1)
.verifyCallsCountCloseTo(upstreamServiceDC1, 75)
.verifyCallsCountCloseTo(upstreamServiceDC2, 25)
}
}

0 comments on commit 7134e1b

Please sign in to comment.