Skip to content

Commit

Permalink
#564 Implemented adding a header for locality weighted load balancing
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Feb 7, 2024
1 parent db5aeaf commit 7d30d1f
Show file tree
Hide file tree
Showing 14 changed files with 218 additions and 44 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

Lists all changes with user impact.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
## [0.20.11]
### Changed
- Implemented adding a header for locality weighted load balancing

## [0.20.10]
### Changed
- Implemented locality weighted load balancing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class ControlPlane private constructor(
val snapshotsVersions = SnapshotsVersions()
val snapshotProperties = properties.envoy.snapshot
val envoySnapshotFactory = EnvoySnapshotFactory(
ingressRoutesFactory = EnvoyIngressRoutesFactory(snapshotProperties, envoyHttpFilters),
ingressRoutesFactory = EnvoyIngressRoutesFactory(snapshotProperties, envoyHttpFilters, currentZone),
egressRoutesFactory = EnvoyEgressRoutesFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties),
endpointsFactory = EnvoyEndpointsFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ class CanaryProperties {

class TrafficSplittingProperties {
var zoneName = ""
var headerName = ""
var weightsByService: Map<String, ZoneWeights> = mapOf()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ class EnvoyDefaultFilters(
snapshotProperties.routes.status,
jwtProperties = snapshotProperties.jwt
)
private val luaFilterFactory = LuaFilterFactory(
snapshotProperties.incomingPermissions
)
private val luaFilterFactory = LuaFilterFactory(snapshotProperties)
private val jwtFilterFactory = JwtFilterFactory(
snapshotProperties.jwt
)
Expand Down Expand Up @@ -68,6 +66,10 @@ class EnvoyDefaultFilters(
defaultHeaderToMetadataFilter, defaultServiceTagFilter, defaultEnvoyRouterHttpFilter
)

val defaultCurrentZoneHeaderFilter = { _: Group, _: GlobalSnapshot ->
luaFilterFactory.ingressCurrentZoneHeaderFilter()
}

/**
* Order matters:
* * defaultClientNameHeaderFilter has to be before defaultRbacLoggingFilter, because the latter consumes results of
Expand Down Expand Up @@ -101,7 +103,8 @@ class EnvoyDefaultFilters(
val preFilters = listOf(
defaultClientNameHeaderFilter,
defaultAuthorizationHeaderFilter,
defaultJwtHttpFilter
defaultJwtHttpFilter,
defaultCurrentZoneHeaderFilter
)
val postFilters = listOf(
defaultRbacLoggingFilter,
Expand All @@ -113,7 +116,9 @@ class EnvoyDefaultFilters(
return preFilters + filters.toList() + postFilters
}

val defaultIngressMetadata = { group: Group -> luaFilterFactory.ingressScriptsMetadata(group, customLuaMetadata) }
val defaultIngressMetadata = { group: Group, currentZone: String ->
luaFilterFactory.ingressScriptsMetadata(group, customLuaMetadata, currentZone)
}

private fun headerToMetadataConfig(
rules: List<Config.Rule>,
Expand Down Expand Up @@ -144,8 +149,12 @@ class EnvoyDefaultFilters(
private fun envoyRouterHttpFilter(): HttpFilter = HttpFilter
.newBuilder()
.setName("envoy.filters.http.router")
.setTypedConfig(Any.pack(Router.newBuilder()
.build()))
.setTypedConfig(
Any.pack(
Router.newBuilder()
.build()
)
)
.build()

private fun headerToMetadataHttpFilter(headerToMetadataConfig: Config.Builder): HttpFilter {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters

import io.envoyproxy.envoy.config.core.v3.Metadata
import pl.allegro.tech.servicemesh.envoycontrol.groups.Group
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.HttpFilterFactory
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.IngressMetadataFactory

class EnvoyHttpFilters(
val ingressFilters: List<HttpFilterFactory>,
val egressFilters: List<HttpFilterFactory>,
val ingressMetadata: IngressMetadataFactory = { _ -> Metadata.getDefaultInstance() }
val ingressMetadata: IngressMetadataFactory = { _: Group, _: String -> Metadata.getDefaultInstance() }
) {
companion object {
val emptyFilters = EnvoyHttpFilters(listOf(), listOf()) { Metadata.getDefaultInstance() }
val emptyFilters = EnvoyHttpFilters(listOf(), listOf()) { _, _ -> Metadata.getDefaultInstance() }

fun defaultFilters(
snapshotProperties: SnapshotProperties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import io.envoyproxy.envoy.config.core.v3.Metadata
import io.envoyproxy.envoy.extensions.filters.http.lua.v3.Lua
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter
import pl.allegro.tech.servicemesh.envoycontrol.groups.Group
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.IncomingPermissionsProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.LuaMetadataProperty.ListPropertyLua
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.LuaMetadataProperty.StringPropertyLua
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.LuaMetadataProperty.StructPropertyLua

class LuaFilterFactory(private val incomingPermissionsProperties: IncomingPermissionsProperties) {
class LuaFilterFactory(private val snapshotProperties: SnapshotProperties) {

private val ingressRbacLoggingScript: String = this::class.java.classLoader
.getResource("lua/ingress_rbac_logging.lua")!!.readText()

private val ingressRbacLoggingFilter: HttpFilter? = if (incomingPermissionsProperties.enabled) {
private val ingressRbacLoggingFilter: HttpFilter? = if (snapshotProperties.incomingPermissions.enabled) {
HttpFilter.newBuilder()
.setName("envoy.lua")
.setTypedConfig(Any.pack(Lua.newBuilder().setInlineCode(ingressRbacLoggingScript).build()))
Expand All @@ -27,7 +27,7 @@ class LuaFilterFactory(private val incomingPermissionsProperties: IncomingPermis
null
}

private val trustedClientIdentityHeader = incomingPermissionsProperties.trustedClientIdentityHeader
private val trustedClientIdentityHeader = snapshotProperties.incomingPermissions.trustedClientIdentityHeader

fun ingressRbacLoggingFilter(group: Group): HttpFilter? =
ingressRbacLoggingFilter.takeIf { group.proxySettings.incoming.permissionsEnabled }
Expand All @@ -41,40 +41,61 @@ class LuaFilterFactory(private val incomingPermissionsProperties: IncomingPermis
.setTypedConfig(Any.pack(Lua.newBuilder().setInlineCode(ingressClientNameHeaderScript).build()))
.build()

private val sanUriWildcardRegexForLua = SanUriMatcherFactory(incomingPermissionsProperties.tlsAuthentication)
private val ingressCurrentZoneHeaderScript: String = this::class.java.classLoader
.getResource("lua/ingress_current_zone_header.lua")!!.readText()

private val ingressCurrentZoneHeaderFilter: HttpFilter =
HttpFilter.newBuilder()
.setName("ingress.zone.lua")
.setTypedConfig(Any.pack(Lua.newBuilder().setInlineCode(ingressCurrentZoneHeaderScript).build()))
.build()

private val sanUriWildcardRegexForLua = SanUriMatcherFactory(
snapshotProperties.incomingPermissions.tlsAuthentication
)
.sanUriWildcardRegexForLua

fun ingressScriptsMetadata(group: Group, customLuaMetadata: StructPropertyLua = StructPropertyLua()): Metadata {
fun ingressScriptsMetadata(
group: Group,
customLuaMetadata: StructPropertyLua = StructPropertyLua(),
currentZone: String
): Metadata {
val metadata = StructPropertyLua(
"client_identity_headers" to ListPropertyLua(
incomingPermissionsProperties
snapshotProperties.incomingPermissions
.clientIdentityHeaders.map(::StringPropertyLua)
),
"request_id_headers" to ListPropertyLua(
incomingPermissionsProperties.requestIdentificationHeaders.map(
snapshotProperties.incomingPermissions.requestIdentificationHeaders.map(
::StringPropertyLua
)
),
"trusted_client_identity_header" to StringPropertyLua(trustedClientIdentityHeader),
"san_uri_lua_pattern" to StringPropertyLua(sanUriWildcardRegexForLua),
"clients_allowed_to_all_endpoints" to ListPropertyLua(
incomingPermissionsProperties.clientsAllowedToAllEndpoints.map(
snapshotProperties.incomingPermissions.clientsAllowedToAllEndpoints.map(
::StringPropertyLua
)
),
"service_name" to StringPropertyLua(group.serviceName),
"discovery_service_name" to StringPropertyLua(group.discoveryServiceName ?: ""),
"rbac_headers_to_log" to ListPropertyLua(
incomingPermissionsProperties.headersToLogInRbac.map(::StringPropertyLua)
snapshotProperties.incomingPermissions.headersToLogInRbac.map(::StringPropertyLua)
),
"traffic_splitting_zone_header_name" to StringPropertyLua(
snapshotProperties.loadBalancing.trafficSplitting.headerName
),
) + customLuaMetadata
"current_zone" to StringPropertyLua(currentZone)
) + customLuaMetadata
return Metadata.newBuilder()
.putFilterMetadata("envoy.filters.http.lua", metadata.toValue().structValue)
.build()
}

fun ingressClientNameHeaderFilter(): HttpFilter? =
ingressClientNameHeaderFilter.takeIf { trustedClientIdentityHeader.isNotEmpty() }

fun ingressCurrentZoneHeaderFilter(): HttpFilter = ingressCurrentZoneHeaderFilter
}

sealed class LuaMetadataProperty<T>(open val value: T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.getRuleId
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.EnvoyHttpFilters

typealias IngressMetadataFactory = (node: Group) -> Metadata
typealias IngressMetadataFactory = (node: Group, currentZone: String) -> Metadata

class EnvoyIngressRoutesFactory(
private val properties: SnapshotProperties,
envoyHttpFilters: EnvoyHttpFilters = EnvoyHttpFilters.emptyFilters
envoyHttpFilters: EnvoyHttpFilters = EnvoyHttpFilters.emptyFilters,
private val currentZone: String
) {

private val allClients = setOf(
Expand Down Expand Up @@ -187,7 +188,7 @@ class EnvoyIngressRoutesFactory(
.setRoute(clusterRouteActionWithRetryPolicy(retryPolicy, localRouteAction))
}
return (retryRoutes + nonRetryRoute).map { builder ->
builder.setMetadata(filterMetadata(group)).build()
builder.setMetadata(filterMetadata(group, currentZone)).build()
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
function envoy_on_response(handle)
local traffic_splitting_zone_header_name = handle:metadata():get("traffic_splitting_zone_header_name") or ""
local current_zone = handle:metadata():get("current_zone") or ""
if traffic_splitting_zone_header_name == "" then
return
end
handle:headers():add(traffic_splitting_zone_header_name, current_zone)
end
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.EnvoyIn
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.ServiceTagMetadataGenerator
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.serviceDependencies
import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.CURRENT_ZONE
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_CLUSTER_WEIGHTS
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_DISCOVERY_SERVICE_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_IDLE_TIMEOUT
Expand All @@ -54,6 +53,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.zoneWeights
class EnvoySnapshotFactoryTest {
companion object {
const val SERVICE_NAME_2 = "service-name-2"
const val CURRENT_ZONE = "dc1"
}

@Test
Expand Down Expand Up @@ -453,7 +453,8 @@ class EnvoySnapshotFactoryTest {
SnapshotProperties(),
EnvoyHttpFilters(
emptyList(), emptyList()
) { Metadata.getDefaultInstance() }
) { _, _ -> Metadata.getDefaultInstance() },
CURRENT_ZONE
)
val egressRoutesFactory = EnvoyEgressRoutesFactory(properties)
val clustersFactory = EnvoyClustersFactory(properties)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class SnapshotUpdaterTest {
)

private val uninitializedSnapshot = null
private const val CURRENT_ZONE = "dc1"
}

val groupWithProxy = AllServicesGroup(
Expand Down Expand Up @@ -1309,11 +1310,11 @@ class SnapshotUpdaterTest {

private fun snapshotFactory(snapshotProperties: SnapshotProperties, meterRegistry: MeterRegistry) =
EnvoySnapshotFactory(
ingressRoutesFactory = EnvoyIngressRoutesFactory(snapshotProperties),
ingressRoutesFactory = EnvoyIngressRoutesFactory(snapshotProperties, currentZone = CURRENT_ZONE),
egressRoutesFactory = EnvoyEgressRoutesFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties),
endpointsFactory = EnvoyEndpointsFactory(
snapshotProperties, ServiceTagMetadataGenerator(snapshotProperties.routing.serviceTags), "dc1"
snapshotProperties, ServiceTagMetadataGenerator(snapshotProperties.routing.serviceTags), CURRENT_ZONE
),
listenersFactory = EnvoyListenersFactory(
snapshotProperties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties

class EnvoyDefaultFiltersTest {

private val defaultFilters = EnvoyDefaultFilters(SnapshotProperties(), LuaMetadataProperty.StructPropertyLua())
private val defaultFilters = EnvoyDefaultFilters(
SnapshotProperties(),
LuaMetadataProperty.StructPropertyLua()
)

@Test
fun `should create default filters`() {
Expand All @@ -20,6 +23,7 @@ class EnvoyDefaultFiltersTest {
defaultFilters.defaultClientNameHeaderFilter,
defaultFilters.defaultAuthorizationHeaderFilter,
defaultFilters.defaultJwtHttpFilter,
defaultFilters.defaultCurrentZoneHeaderFilter,
defaultFilters.defaultRbacLoggingFilter,
defaultFilters.defaultRbacFilter,
defaultFilters.defaultRateLimitLuaFilter,
Expand Down Expand Up @@ -47,6 +51,7 @@ class EnvoyDefaultFiltersTest {
defaultFilters.defaultClientNameHeaderFilter,
defaultFilters.defaultAuthorizationHeaderFilter,
defaultFilters.defaultJwtHttpFilter,
defaultFilters.defaultCurrentZoneHeaderFilter,
customFilter,
defaultFilters.defaultRbacLoggingFilter,
defaultFilters.defaultRbacFilter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode
import pl.allegro.tech.servicemesh.envoycontrol.groups.Group
import pl.allegro.tech.servicemesh.envoycontrol.groups.ServicesGroup
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.IncomingPermissionsProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.LuaMetadataProperty.StructPropertyLua
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.LuaMetadataProperty.ListPropertyLua
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.LuaMetadataProperty.StringPropertyLua
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.LuaMetadataProperty.BooleanPropertyLua
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.LuaMetadataProperty.ListPropertyLua
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.LuaMetadataProperty.NumberPropertyLua
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.LuaMetadataProperty.StringPropertyLua
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.LuaMetadataProperty.StructPropertyLua

internal class LuaFilterFactoryTest {
val properties = SnapshotProperties().also { it.incomingPermissions = IncomingPermissionsProperties() }

@Test
fun `should create metadata with service name and discovery service name`() {
Expand All @@ -24,10 +26,10 @@ internal class LuaFilterFactoryTest {
serviceName = expectedServiceName,
discoveryServiceName = expectedDiscoveryServiceName
)
val factory = LuaFilterFactory(IncomingPermissionsProperties())
val factory = LuaFilterFactory(properties)

// when
val metadata = factory.ingressScriptsMetadata(group)
val metadata = factory.ingressScriptsMetadata(group, currentZone = "dc1")
val givenServiceName = metadata
.getFilterMetadataOrThrow("envoy.filters.http.lua")
.getFieldsOrThrow("service_name")
Expand Down Expand Up @@ -62,15 +64,36 @@ internal class LuaFilterFactoryTest {
serviceName = expectedServiceName,
discoveryServiceName = expectedDiscoveryServiceName
)
val factory = LuaFilterFactory(IncomingPermissionsProperties())
val factory = LuaFilterFactory(properties)

// when
val luaMetadata = factory.ingressScriptsMetadata(group, customMetadata)
val luaMetadata = factory.ingressScriptsMetadata(group, customMetadata, "dc1")
.getFilterMetadataOrThrow("envoy.filters.http.lua").fieldsMap

// then
assertThat(luaMetadata["flags"]).isEqualTo(customMetadata["flags"]?.toValue())
assertThat(luaMetadata["list-value"]).isEqualTo(customMetadata["list-value"]?.toValue())
assertThat(luaMetadata["count"]).isEqualTo(customMetadata["count"]?.toValue())
}

@Test
fun `should create metadata with current zone`() {
// given
val expectedServiceName = "service-1"
val expectedDiscoveryServiceName = "consul-service-1"
val expectedCurrentZone = "dc1"
val group: Group = ServicesGroup(
communicationMode = CommunicationMode.XDS,
serviceName = expectedServiceName,
discoveryServiceName = expectedDiscoveryServiceName
)
val factory = LuaFilterFactory(properties)

// when
val luaMetadata = factory.ingressScriptsMetadata(group, StructPropertyLua(), expectedCurrentZone)
.getFilterMetadataOrThrow("envoy.filters.http.lua").fieldsMap

// then
assertThat(luaMetadata["current_zone"]?.stringValue).isEqualTo(expectedCurrentZone)
}
}
Loading

0 comments on commit 7d30d1f

Please sign in to comment.