Skip to content

Commit

Permalink
Implemented locality weighted load balancing (#408)
Browse files Browse the repository at this point in the history
* Implemented locality weighted load balancing
  • Loading branch information
nastassia-dailidava authored Feb 1, 2024
1 parent 4654e30 commit db5aeaf
Show file tree
Hide file tree
Showing 15 changed files with 205 additions and 470 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

Lists all changes with user impact.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
## [0.20.10]
### Changed
- Implemented locality weighted load balancing


## [0.20.9]
### Changed
- Configurable path normalization
Expand Down
18 changes: 10 additions & 8 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,16 @@ Property
**envoy-control.envoy.snapshot.outgoing-permissions.rbac.clients-lists.custom-clients-lists** | Lists of clients which will be applied to each rbac policy, only if key for defined list is present in clients for defined endpoint | empty map

## Load Balancing
Property | Description | Default value
------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------
**envoy-control.envoy.snapshot.load-balancing.weights.enabled** | if set to true, weighted load balancing will be enabled | false
**envoy-control.envoy.snapshot.load-balancing.canary.enabled** | if set to true, routing to canary instances based on *canary header* will be enabled (corresponding Envoy static config is required, see [docs](features/load_balancing.md)) | false
**envoy-control.envoy.snapshot.load-balancing.canary.metadata-key** | metadata that will be set for canary EDS endpoints - key (must match Envoy static `header_to_metadata` filter config, see [docs](features/load_balancing.md)) | canary
**envoy-control.envoy.snapshot.load-balancing.canary.header-value** | only when *canary header* is set to this value request will be routed to canary instances (*canary header* name is set in Envoy static config, see [docs](features/load_balancing.md)) | 1
**envoy-control.envoy.snapshot.load-balancing.policy** | load balancing policy used for clusters. [Accepted values](https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/cluster/v3/cluster.proto#enum-config-cluster-v3-cluster-lbpolicy) | LEAST_REQUEST
**envoy-control.envoy.snapshot.load-balancing.use-keys-subset-fallback-policy** | KEYS_SUBSET fallback policy is used by default when canary and service-tags are enabled. It is not supported in Envoy <= 1.12.x. Set to false for compatibility with Envoy 1.12.x | true
Property | Description | Default value
------------------------------------------------------------------------------------------- |----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ---------
**envoy-control.envoy.snapshot.load-balancing.weights.enabled** | if set to true, weighted load balancing will be enabled | false
**envoy-control.envoy.snapshot.load-balancing.canary.enabled** | if set to true, routing to canary instances based on *canary header* will be enabled (corresponding Envoy static config is required, see [docs](features/load_balancing.md)) | false
**envoy-control.envoy.snapshot.load-balancing.canary.metadata-key** | metadata that will be set for canary EDS endpoints - key (must match Envoy static `header_to_metadata` filter config, see [docs](features/load_balancing.md)) | canary
**envoy-control.envoy.snapshot.load-balancing.canary.header-value** | only when *canary header* is set to this value request will be routed to canary instances (*canary header* name is set in Envoy static config, see [docs](features/load_balancing.md)) | 1
**envoy-control.envoy.snapshot.load-balancing.policy** | load balancing policy used for clusters. [Accepted values](https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/cluster/v3/cluster.proto#enum-config-cluster-v3-cluster-lbpolicy) | LEAST_REQUEST
**envoy-control.envoy.snapshot.load-balancing.use-keys-subset-fallback-policy** | KEYS_SUBSET fallback policy is used by default when canary and service-tags are enabled. It is not supported in Envoy <= 1.12.x. Set to false for compatibility with Envoy 1.12.x | true
**envoy-control.envoy.snapshot.load-balancing.traffic-splitting.zoneName** | a zone to which traffic will be routed if traffic splitting is enabled | ""
**envoy-control.envoy.snapshot.load-balancing.traffic-splitting.weights-by-service-properties.** | a map that maps service name to a map [zoneName: weight] | empty map

## Routing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class EnvoySnapshotFactory(
globalSnapshot: GlobalSnapshot,
): RouteSpecification {
val trafficSplitting = properties.loadBalancing.trafficSplitting
val weights = trafficSplitting.serviceByWeightsProperties[serviceName]
val weights = trafficSplitting.weightsByService[serviceName]
val enabledForDependency = globalSnapshot.endpoints[clusterName]?.endpointsList
?.any { e -> trafficSplitting.zoneName == e.locality.zone }
?: false
Expand Down Expand Up @@ -268,18 +268,16 @@ class EnvoySnapshotFactory(
// endpointsFactory.filterEndpoints() can use this cache to prevent computing the same
// ClusterLoadAssignments many times - it may reduce MEM, CPU and latency if some serviceTags are
// commonly used
routeSpec.clusterName to endpointsFactory.filterEndpoints(endpoints, routeSpec.settings.routingPolicy)
endpointsFactory.filterEndpoints(endpoints, routeSpec.settings.routingPolicy).let {
endpointsFactory.assignLocalityWeights(routeSpec, it)
}
}
}.toMap()
}

val rateLimitClusters =
if (rateLimitEndpoints.isNotEmpty()) listOf(properties.rateLimit.serviceName) else emptyList()
val rateLimitLoadAssignments = rateLimitClusters.mapNotNull { name -> globalSnapshot.endpoints[name] }
val secondaryLoadAssignments = endpointsFactory.getSecondaryClusterEndpoints(
egressLoadAssignments,
egressRouteSpecifications
)
return egressLoadAssignments.values.toList() + rateLimitLoadAssignments + secondaryLoadAssignments
return egressLoadAssignments + rateLimitLoadAssignments
}

private fun newSnapshotForGroup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,11 @@ class CanaryProperties {

class TrafficSplittingProperties {
var zoneName = ""
var headerName = ""
var serviceByWeightsProperties: Map<String, ZoneWeights> = mapOf()
var secondaryClusterSuffix = "secondary"
var aggregateClusterSuffix = "aggregate"
var weightsByService: Map<String, ZoneWeights> = mapOf()
}

class ZoneWeights {
var main = 100
var secondary = 0
var weightByZone: Map<String, Int> = mapOf()
}

class LoadBalancingWeightsProperties {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,6 @@ class EnvoyClustersFactory(

companion object {
private val logger by logger()

@JvmStatic
fun getSecondaryClusterName(serviceName: String, snapshotProperties: SnapshotProperties): String {
return "$serviceName-${snapshotProperties.loadBalancing.trafficSplitting.secondaryClusterSuffix}"
}

@JvmStatic
fun getAggregateClusterName(serviceName: String, snapshotProperties: SnapshotProperties): String {
return "$serviceName-${snapshotProperties.loadBalancing.trafficSplitting.aggregateClusterSuffix}"
}
}

fun getClustersForServices(
Expand Down Expand Up @@ -207,8 +197,8 @@ class EnvoyClustersFactory(

val dependencies = group.proxySettings.outgoing.getServiceDependencies().associateBy { it.service }
val clustersForGroup = when (group) {
is ServicesGroup -> dependencies.flatMap {
createClusters(
is ServicesGroup -> dependencies.mapNotNull {
createCluster(
group.serviceName,
it.value.settings,
clusters[it.key],
Expand All @@ -217,9 +207,9 @@ class EnvoyClustersFactory(
}

is AllServicesGroup -> {
globalSnapshot.allServicesNames.flatMap {
globalSnapshot.allServicesNames.mapNotNull {
val dependency = dependencies[it]
createClusters(
createCluster(
group.serviceName,
getDependencySettings(dependency, group),
clusters[it],
Expand All @@ -245,7 +235,7 @@ class EnvoyClustersFactory(
dependencySettings: DependencySettings,
cluster: Cluster,
clusterName: String? = cluster.name
): Cluster {
): Cluster.Builder {
val idleTimeoutPolicy =
dependencySettings.timeoutPolicy.connectionIdleTimeout ?: cluster.commonHttpProtocolOptions.idleTimeout
return Cluster.newBuilder(cluster)
Expand All @@ -255,48 +245,44 @@ class EnvoyClustersFactory(
Cluster.EdsClusterConfig.newBuilder(cluster.edsClusterConfig)
.setServiceName(clusterName)
)
.build()
}

private fun createSetOfClustersForGroup(
private fun createClusterWithLocalityWeightedConfigForGroup(
dependencySettings: DependencySettings,
cluster: Cluster
): Collection<Cluster> {
val mainCluster = createClusterForGroup(dependencySettings, cluster)
val secondaryCluster = createClusterForGroup(
dependencySettings,
cluster,
getSecondaryClusterName(cluster.name, properties)
)
val aggregateCluster =
createAggregateCluster(mainCluster.name, linkedSetOf(secondaryCluster.name, mainCluster.name))
return listOf(mainCluster, secondaryCluster, aggregateCluster)
.onEach {
logger.debug("Created set of cluster configs for traffic splitting: {}", it.toString())
): Cluster.Builder {
return createClusterForGroup(dependencySettings, cluster)
.setCommonLbConfig(
Cluster.CommonLbConfig.newBuilder()
.setLocalityWeightedLbConfig(Cluster.CommonLbConfig.LocalityWeightedLbConfig.getDefaultInstance())
.build()
)
.also {
logger.debug("Created cluster config for traffic splitting: {}", it.toString())
}
}

private fun createClusters(
private fun createCluster(
serviceName: String,
dependencySettings: DependencySettings,
cluster: Cluster?,
clusterLoadAssignment: ClusterLoadAssignment?
): Collection<Cluster> {
): Cluster? {
return cluster?.let {
if (enableTrafficSplitting(serviceName, clusterLoadAssignment)) {
createSetOfClustersForGroup(dependencySettings, cluster)
createClusterWithLocalityWeightedConfigForGroup(dependencySettings, cluster)
} else {
listOf(createClusterForGroup(dependencySettings, cluster))
}
} ?: listOf()
createClusterForGroup(dependencySettings, cluster)
}.build()
}
}

private fun enableTrafficSplitting(
serviceName: String,
clusterLoadAssignment: ClusterLoadAssignment?
): Boolean {
val trafficSplitting = properties.loadBalancing.trafficSplitting
val trafficSplitEnabled = trafficSplitting.serviceByWeightsProperties.containsKey(serviceName)
val trafficSplitEnabled = trafficSplitting.weightsByService.containsKey(serviceName)
return trafficSplitEnabled && hasEndpointsInZone(clusterLoadAssignment, trafficSplitting)
}

Expand Down Expand Up @@ -357,25 +343,6 @@ class EnvoyClustersFactory(
}
}

private fun createAggregateCluster(clusterName: String, aggregatedClusters: Collection<String>): Cluster {
return Cluster.newBuilder()
.setName(getAggregateClusterName(clusterName, properties))
.setConnectTimeout(Durations.fromMillis(properties.edsConnectionTimeout.toMillis()))
.setLbPolicy(Cluster.LbPolicy.CLUSTER_PROVIDED)
.setClusterType(
Cluster.CustomClusterType.newBuilder()
.setName("envoy.clusters.aggregate")
.setTypedConfig(
Any.pack(
EnvoyClusterConfig.newBuilder()
.addAllClusters(aggregatedClusters)
.build()
)
)
)
.build()
}

private fun strictDnsCluster(
domainDependency: DomainDependency,
useTransparentProxy: Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.RouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.WeightRouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory.Companion.getSecondaryClusterName
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.ZoneWeights
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.ServiceTagMetadataGenerator

typealias EnvoyProxyLocality = io.envoyproxy.envoy.config.core.v3.Locality
Expand Down Expand Up @@ -77,25 +77,30 @@ class EnvoyEndpointsFactory(
}
}

fun getSecondaryClusterEndpoints(
clusterLoadAssignments: Map<String, ClusterLoadAssignment>,
egressRouteSpecifications: List<RouteSpecification>
): List<ClusterLoadAssignment> {
return egressRouteSpecifications
.filterIsInstance<WeightRouteSpecification>()
.onEach { logger.debug("Traffic splitting is enabled for cluster: ${it.clusterName}") }
.mapNotNull { routeSpec ->
clusterLoadAssignments[routeSpec.clusterName]?.let { assignment ->
ClusterLoadAssignment.newBuilder(assignment)
.clearEndpoints()
.addAllEndpoints(assignment.endpointsList?.filter { e ->
e.locality.zone == properties.loadBalancing.trafficSplitting.zoneName
})
.setClusterName(getSecondaryClusterName(routeSpec.clusterName, properties))
fun assignLocalityWeights(
routeSpec: RouteSpecification,
loadAssignment: ClusterLoadAssignment
): ClusterLoadAssignment {
return if (routeSpec is WeightRouteSpecification) {
ClusterLoadAssignment.newBuilder(loadAssignment)
.clearEndpoints()
.addAllEndpoints(assignWeights(loadAssignment.endpointsList, routeSpec.clusterWeights))
.setClusterName(routeSpec.clusterName)
.build()
} else loadAssignment
}

private fun assignWeights(
llbEndpointsList: List<LocalityLbEndpoints>, weights: ZoneWeights
): List<LocalityLbEndpoints> {
return llbEndpointsList
.map {
if (weights.weightByZone.containsKey(it.locality.zone)) {
LocalityLbEndpoints.newBuilder(it)
.setLoadBalancingWeight(UInt32Value.of(weights.weightByZone[it.locality.zone] ?: 0))
.build()
}
} else it
}
.filter { it.endpointsList.isNotEmpty() }
}

private fun filterEndpoints(loadAssignment: ClusterLoadAssignment, tag: String): ClusterLoadAssignment? {
Expand Down Expand Up @@ -148,14 +153,16 @@ class EnvoyEndpointsFactory(
serviceInstances: ServiceInstances?,
zone: String,
locality: Locality
): LocalityLbEndpoints =
LocalityLbEndpoints.newBuilder()
): LocalityLbEndpoints {
return LocalityLbEndpoints.newBuilder()
.setLocality(EnvoyProxyLocality.newBuilder().setZone(zone).build())
.addAllLbEndpoints(serviceInstances?.instances?.map {
createLbEndpoint(it, serviceInstances.serviceName, locality)
} ?: emptyList())
.addAllLbEndpoints(serviceInstances?.instances
?.map {
createLbEndpoint(it, serviceInstances.serviceName, locality)
} ?: emptyList())
.setPriority(toEnvoyPriority(zone, locality))
.build()
}

private fun createLbEndpoint(
serviceInstance: ServiceInstance,
Expand Down Expand Up @@ -258,8 +265,7 @@ class EnvoyEndpointsFactory(
false -> toEnvoyPriority(locality)
}.also {
logger.debug(
"Resolved lb priority to {} with zone={}, currentZone={}, priority props={}",
it, zone, currentZone, zonePriorities
"Resolved lb priority to {} with zone={}, priority props={}", it, zone, zonePriorities
)
}
}
Expand Down
Loading

0 comments on commit db5aeaf

Please sign in to comment.