From 62234519ae6b4244b3649f6bb451c8ef99e7b5fe Mon Sep 17 00:00:00 2001 From: Alice Lilith Date: Thu, 30 Jan 2025 10:28:28 -0800 Subject: [PATCH] add cloud endpoint pooling support (#582) * add cloud endpoint pooling support Signed-off-by: Alice-Lilith * add endpoint pooling to service translation Signed-off-by: Alice-Lilith --------- Signed-off-by: Alice-Lilith --- api/ngrok/v1alpha1/cloudendpoint_types.go | 8 + go.mod | 2 +- .../ngrok.k8s.ngrok.com_cloudendpoints.yaml | 7 + internal/annotations/annotations.go | 17 ++ internal/annotations/annotations_test.go | 61 ++++ .../controller/ingress/service_controller.go | 12 +- .../ngrok/cloudendpoint_controller.go | 26 +- internal/ir/ir.go | 11 +- ...oint-ingress-default-backend-conflict.yaml | 135 +++++++++ .../endpoint-ingress-namespace-conflict.yaml | 119 ++++++++ .../endpoint-ingress-pooling-conflict.yaml | 122 ++++++++ ...dpoint-ingress-trafficpolicy-conflict.yaml | 167 ++++++++++ pkg/managerdriver/translate_ingresses.go | 286 +++++++++++------- pkg/managerdriver/translator.go | 8 +- pkg/managerdriver/translator_test.go | 217 ++++++++++++- 15 files changed, 1062 insertions(+), 136 deletions(-) create mode 100644 pkg/managerdriver/testdata/translator/endpoint-ingress-default-backend-conflict.yaml create mode 100644 pkg/managerdriver/testdata/translator/endpoint-ingress-namespace-conflict.yaml create mode 100644 pkg/managerdriver/testdata/translator/endpoint-ingress-pooling-conflict.yaml create mode 100644 pkg/managerdriver/testdata/translator/endpoint-ingress-trafficpolicy-conflict.yaml diff --git a/api/ngrok/v1alpha1/cloudendpoint_types.go b/api/ngrok/v1alpha1/cloudendpoint_types.go index 9c8147f6..6f9ec327 100644 --- a/api/ngrok/v1alpha1/cloudendpoint_types.go +++ b/api/ngrok/v1alpha1/cloudendpoint_types.go @@ -52,6 +52,14 @@ type CloudEndpointSpec struct { // +kubebuilder:validation:Optional TrafficPolicyName string `json:"trafficPolicyName,omitempty"` + // Controlls whether or not the Cloud Endpoint should allow 
pooling with other + // Cloud Endpoints sharing the same URL. When Cloud Endpoints are pooled, any requests + // going to the URL for the pooled endpoint will be distributed among all Cloud Endpoints + // in the pool. A URL can only be shared across multiple Cloud Endpoints if they all have pooling enabled. + // + // +kubebuilder:validation:Optional + PoolingEnabled bool `json:"poolingEnabled"` + // Allows inline definition of a TrafficPolicy object // // +kubebuilder:validation:Optional diff --git a/go.mod b/go.mod index 4b942979..02696e3b 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( golang.org/x/exp v0.0.0-20231006140011-7918f672742d golang.org/x/sync v0.10.0 google.golang.org/protobuf v1.36.4 + gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.29.2 k8s.io/apimachinery v0.29.2 k8s.io/client-go v0.29.2 @@ -97,7 +98,6 @@ require ( gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/appengine v1.6.8 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.29.2 // indirect k8s.io/component-base v0.29.2 // indirect diff --git a/helm/ngrok-operator/templates/crds/ngrok.k8s.ngrok.com_cloudendpoints.yaml b/helm/ngrok-operator/templates/crds/ngrok.k8s.ngrok.com_cloudendpoints.yaml index ee725260..9f05b47d 100644 --- a/helm/ngrok-operator/templates/crds/ngrok.k8s.ngrok.com_cloudendpoints.yaml +++ b/helm/ngrok-operator/templates/crds/ngrok.k8s.ngrok.com_cloudendpoints.yaml @@ -70,6 +70,13 @@ spec: description: String of arbitrary data associated with the object in the ngrok API/Dashboard type: string + poolingEnabled: + description: |- + Controlls whether or not the Cloud Endpoint should allow pooling with other + Cloud Endpoints sharing the same URL. When Cloud Endpoints are pooled, any requests + going to the URL for the pooled endpoint will be distributed among all Cloud Endpoints + in the pool. 
A URL can only be shared across multiple Cloud Endpoints if they all have pooling enabled. + type: boolean trafficPolicy: description: Allows inline definition of a TrafficPolicy object properties: diff --git a/internal/annotations/annotations.go b/internal/annotations/annotations.go index 7eb6396f..595e1c6e 100644 --- a/internal/annotations/annotations.go +++ b/internal/annotations/annotations.go @@ -43,6 +43,9 @@ const ( MappingStrategyAnnotationKey = "mapping-strategy" MappingStrategy_Endpoints = "endpoints" MappingStrategy_Edges = "edges" + + EndpointPoolingAnnotation = "k8s.ngrok.com/pooling-enabled" + EndpointPoolingAnnotationKey = "pooling-enabled" ) type RouteModules struct { @@ -148,3 +151,17 @@ func ExtractUseEndpoints(obj client.Object) (bool, error) { } return strings.EqualFold(val, MappingStrategy_Endpoints), nil } + +// Whether or not we should use endpoint pooling +// from the annotation "k8s.ngrok.com/pooling-enabled" if it is present. Otherwise, it defaults to false +func ExtractUseEndpointPooling(obj client.Object) (bool, error) { + val, err := parser.GetStringAnnotation(EndpointPoolingAnnotationKey, obj) + if err != nil { + if errors.IsMissingAnnotations(err) { + return false, nil + } + return false, err + } + + return strings.EqualFold(val, "true"), nil +} diff --git a/internal/annotations/annotations_test.go b/internal/annotations/annotations_test.go index 98319f2b..41963167 100644 --- a/internal/annotations/annotations_test.go +++ b/internal/annotations/annotations_test.go @@ -133,3 +133,64 @@ func TestExtractUseEndpoints(t *testing.T) { }) } } + +func TestExtractUseEndpointPooling(t *testing.T) { + tests := []struct { + name string + annotations map[string]string + expected bool + expectedErr error + }{ + { + name: "Pooling enabled", + annotations: map[string]string{ + "k8s.ngrok.com/pooling-enabled": "true", + }, + expected: true, + expectedErr: nil, + }, + { + name: "Pooling disabled", + annotations: map[string]string{ + 
"k8s.ngrok.com/pooling-enabled": "false", + }, + expected: false, + expectedErr: nil, + }, + { + name: "Invalid value", + annotations: map[string]string{ + "k8s.ngrok.com/pooling-enabled": "foo", + }, + expected: false, + expectedErr: nil, + }, + { + name: "Annotation not present", + annotations: nil, + expected: false, + expectedErr: nil, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + obj := &networking.Ingress{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-ingress", + Namespace: "default", + Annotations: tc.annotations, + }, + } + + useEndpoints, err := annotations.ExtractUseEndpointPooling(obj) + if tc.expectedErr != nil { + require.Error(t, err) + assert.Equal(t, tc.expectedErr, err) + } else { + require.NoError(t, err) + assert.Equal(t, tc.expected, useEndpoints) + } + }) + } +} diff --git a/internal/controller/ingress/service_controller.go b/internal/controller/ingress/service_controller.go index a980b7b9..d0e40253 100644 --- a/internal/controller/ingress/service_controller.go +++ b/internal/controller/ingress/service_controller.go @@ -403,6 +403,15 @@ func (r *ServiceReconciler) buildEndpoints(ctx context.Context, svc *corev1.Serv internalURL := fmt.Sprintf("tcp://%s.%s.%s.internal:%d", svc.UID, svc.Name, svc.Namespace, port) + // Get whether endpoint pooling should be enabled/disabled from annotations + useEndpointPooling, err := annotations.ExtractUseEndpointPooling(svc) + if err != nil { + log.Error(err, "failed to check endpoints-enabled annotation for service", + "service", fmt.Sprintf("%s.%s", svc.Name, svc.Namespace), + ) + return objects, err + } + // The final traffic policy that will be applied to the CloudEndpoint tp := trafficpolicy.NewTrafficPolicy() @@ -476,7 +485,8 @@ func (r *ServiceReconciler) buildEndpoints(ctx context.Context, svc *corev1.Serv }, }, Spec: ngrokv1alpha1.CloudEndpointSpec{ - URL: cloudEndpointURL, + URL: cloudEndpointURL, + PoolingEnabled: useEndpointPooling, TrafficPolicy: 
&ngrokv1alpha1.NgrokTrafficPolicySpec{ Policy: rawPolicy, }, diff --git a/internal/controller/ngrok/cloudendpoint_controller.go b/internal/controller/ngrok/cloudendpoint_controller.go index fe546b80..c6019fee 100644 --- a/internal/controller/ngrok/cloudendpoint_controller.go +++ b/internal/controller/ngrok/cloudendpoint_controller.go @@ -154,12 +154,13 @@ func (r *CloudEndpointReconciler) create(ctx context.Context, clep *ngrokv1alpha } createParams := &ngrok.EndpointCreate{ - Type: "cloud", - URL: clep.Spec.URL, - Description: &clep.Spec.Description, - Metadata: &clep.Spec.Metadata, - TrafficPolicy: policy, - Bindings: clep.Spec.Bindings, + Type: "cloud", + URL: clep.Spec.URL, + Description: &clep.Spec.Description, + Metadata: &clep.Spec.Metadata, + TrafficPolicy: policy, + Bindings: clep.Spec.Bindings, + PoolingEnabled: clep.Spec.PoolingEnabled, } ngrokClep, err := r.NgrokClientset.Endpoints().Create(ctx, createParams) @@ -184,12 +185,13 @@ func (r *CloudEndpointReconciler) update(ctx context.Context, clep *ngrokv1alpha } updateParams := &ngrok.EndpointUpdate{ - ID: clep.Status.ID, - Url: &clep.Spec.URL, - Description: &clep.Spec.Description, - Metadata: &clep.Spec.Metadata, - TrafficPolicy: &policy, - Bindings: clep.Spec.Bindings, + ID: clep.Status.ID, + Url: &clep.Spec.URL, + Description: &clep.Spec.Description, + Metadata: &clep.Spec.Metadata, + TrafficPolicy: &policy, + Bindings: clep.Spec.Bindings, + PoolingEnabled: clep.Spec.PoolingEnabled, } ngrokClep, err := r.NgrokClientset.Endpoints().Update(ctx, updateParams) diff --git a/internal/ir/ir.go b/internal/ir/ir.go index 654812ab..57b4bf1b 100644 --- a/internal/ir/ir.go +++ b/internal/ir/ir.go @@ -23,16 +23,19 @@ type IRHostname string type IRVirtualHost struct { // The names of any resources (such as Ingress) that were used in the construction of this IRVirtualHost // Currently only used for debug/error logs, but can be added to generated resource statuses - OwningResources []OwningResource - Hostname 
string + OwningResources []OwningResource + Hostname string + EndpointPoolingEnabled bool // Keeps track of the namespace for this hostname. Since we do not allow multiple endpoints with the same hostname, we cannot support multiple ingresses // using the same hostname in different namespaces. Namespace string // This traffic policy will apply to all routes under this hostname - TrafficPolicy *trafficpolicy.TrafficPolicy - Routes []*IRRoute + TrafficPolicy *trafficpolicy.TrafficPolicy + TrafficPolicyObj *OwningResource // Reference to the object that the above traffic policy config was loaded from + + Routes []*IRRoute // The following is used to support ingress default backends (currently only supported for endpoints and not edges) DefaultDestination *IRDestination diff --git a/pkg/managerdriver/testdata/translator/endpoint-ingress-default-backend-conflict.yaml b/pkg/managerdriver/testdata/translator/endpoint-ingress-default-backend-conflict.yaml new file mode 100644 index 00000000..2d00ecbd --- /dev/null +++ b/pkg/managerdriver/testdata/translator/endpoint-ingress-default-backend-conflict.yaml @@ -0,0 +1,135 @@ +# Ingresses with conflicting default backends +input: + ingressClasses: + - apiVersion: networking.k8s.io/v1 + kind: IngressClass + metadata: + labels: + app.kubernetes.io/component: controller + app.kubernetes.io/instance: ngrok-operator + app.kubernetes.io/name: ngrok-operator + app.kubernetes.io/part-of: ngrok-operator + name: ngrok + spec: + controller: k8s.ngrok.com/ingress-controller + ingresses: + - apiVersion: networking.k8s.io/v1 + kind: Ingress + metadata: + annotations: + k8s.ngrok.com/mapping-strategy: "endpoints" + name: test-ingress-1 + namespace: default + spec: + ingressClassName: ngrok + defaultBackend: + service: + name: test-service-1 + port: + number: 8080 + rules: + - host: test-ingresses.ngrok.io + http: + paths: + - path: /test-1 + pathType: Prefix + backend: + service: + name: test-service-1 + port: + number: 8080 + - apiVersion: 
networking.k8s.io/v1 + kind: Ingress + metadata: + annotations: + k8s.ngrok.com/mapping-strategy: "endpoints" + name: test-ingress-2 + namespace: default + spec: + ingressClassName: ngrok + defaultBackend: + service: + name: test-service-2 + port: + number: 8080 + rules: + - host: test-ingresses.ngrok.io + http: + paths: + - path: /test-2 + pathType: Prefix + backend: + service: + name: test-service-2 + port: + number: 8080 + services: + - apiVersion: v1 + kind: Service + metadata: + name: test-service-1 + namespace: default + spec: + ports: + - name: http + port: 8080 + protocol: TCP + targetPort: http + type: ClusterIP + - apiVersion: v1 + kind: Service + metadata: + name: test-service-2 + namespace: default + spec: + ports: + - name: http + port: 8080 + protocol: TCP + targetPort: http + type: ClusterIP + trafficPolicies: [] +expected: + # Generated cloud endpoint should have the routes and default backend from the first ingress, but + # the second ingress will not be processed due to the conflicting default destination + cloudEndpoints: + - apiVersion: ngrok.k8s.ngrok.com/v1alpha1 + kind: CloudEndpoint + metadata: + labels: + k8s.ngrok.com/controller-name: test-manager-name + k8s.ngrok.com/controller-namespace: test-manager-namespace + name: test-ingresses.ngrok.io + namespace: default + spec: + url: https://test-ingresses.ngrok.io + trafficPolicy: + policy: + on_http_request: + - name: Generated-Route-/test-1 + expressions: + - req.url.path.startsWith("/test-1") + actions: + - type: forward-internal + config: + url: https://e3b0c-test-service-1-default-8080.internal + + - name: Generated-Route-Default-Backend + actions: + - type: forward-internal + config: + url: https://e3b0c-test-service-1-default-8080.internal + agentEndpoints: + - apiVersion: ngrok.k8s.ngrok.com/v1alpha1 + kind: AgentEndpoint + metadata: + labels: + k8s.ngrok.com/controller-name: test-manager-name + k8s.ngrok.com/controller-namespace: test-manager-namespace + name: 
e3b0c-test-service-1-default-8080 + namespace: default + spec: + url: "https://e3b0c-test-service-1-default-8080.internal" + upstream: + url: "http://test-service-1.default:8080" + diff --git a/pkg/managerdriver/testdata/translator/endpoint-ingress-namespace-conflict.yaml b/pkg/managerdriver/testdata/translator/endpoint-ingress-namespace-conflict.yaml new file mode 100644 index 00000000..84c353cf --- /dev/null +++ b/pkg/managerdriver/testdata/translator/endpoint-ingress-namespace-conflict.yaml @@ -0,0 +1,119 @@ +# Ingresses with conflicting namespaces +input: + ingressClasses: + - apiVersion: networking.k8s.io/v1 + kind: IngressClass + metadata: + labels: + app.kubernetes.io/component: controller + app.kubernetes.io/instance: ngrok-operator + app.kubernetes.io/name: ngrok-operator + app.kubernetes.io/part-of: ngrok-operator + name: ngrok + spec: + controller: k8s.ngrok.com/ingress-controller + ingresses: + - apiVersion: networking.k8s.io/v1 + kind: Ingress + metadata: + annotations: + k8s.ngrok.com/mapping-strategy: "endpoints" + name: test-ingress-1 + namespace: aaa + spec: + ingressClassName: ngrok + rules: + - host: test-ingresses.ngrok.io + http: + paths: + - path: /test-1 + pathType: Prefix + backend: + service: + name: test-service-1 + port: + number: 8080 + - apiVersion: networking.k8s.io/v1 + kind: Ingress + metadata: + annotations: + k8s.ngrok.com/mapping-strategy: "endpoints" + name: test-ingress-2 + namespace: zzz + spec: + ingressClassName: ngrok + rules: + - host: test-ingresses.ngrok.io + http: + paths: + - path: /test-2 + pathType: Prefix + backend: + service: + name: test-service-2 + port: + number: 8080 + services: + - apiVersion: v1 + kind: Service + metadata: + name: test-service-1 + namespace: aaa + spec: + ports: + - name: http + port: 8080 + protocol: TCP + targetPort: http + type: ClusterIP + - apiVersion: v1 + kind: Service + metadata: + name: test-service-2 + namespace: zzz + spec: + ports: + - name: http + port: 8080 + protocol: TCP + 
targetPort: http + type: ClusterIP + trafficPolicies: [] +expected: + # Generated cloud endpoint should have the first traffic policy, but the second ingress will not be processed due to the + # traffic policy conflict + cloudEndpoints: + - apiVersion: ngrok.k8s.ngrok.com/v1alpha1 + kind: CloudEndpoint + metadata: + labels: + k8s.ngrok.com/controller-name: test-manager-name + k8s.ngrok.com/controller-namespace: test-manager-namespace + name: test-ingresses.ngrok.io + namespace: aaa + spec: + url: https://test-ingresses.ngrok.io + trafficPolicy: + policy: + on_http_request: + - name: Generated-Route-/test-1 + expressions: + - req.url.path.startsWith("/test-1") + actions: + - type: forward-internal + config: + url: https://e3b0c-test-service-1-aaa-8080.internal + agentEndpoints: + - apiVersion: ngrok.k8s.ngrok.com/v1alpha1 + kind: AgentEndpoint + metadata: + labels: + k8s.ngrok.com/controller-name: test-manager-name + k8s.ngrok.com/controller-namespace: test-manager-namespace + name: e3b0c-test-service-1-aaa-8080 + namespace: aaa + spec: + url: "https://e3b0c-test-service-1-aaa-8080.internal" + upstream: + url: "http://test-service-1.aaa:8080" + diff --git a/pkg/managerdriver/testdata/translator/endpoint-ingress-pooling-conflict.yaml b/pkg/managerdriver/testdata/translator/endpoint-ingress-pooling-conflict.yaml new file mode 100644 index 00000000..40894e09 --- /dev/null +++ b/pkg/managerdriver/testdata/translator/endpoint-ingress-pooling-conflict.yaml @@ -0,0 +1,122 @@ +# Ingresses with conflicting traffic policy annotations +input: + ingressClasses: + - apiVersion: networking.k8s.io/v1 + kind: IngressClass + metadata: + labels: + app.kubernetes.io/component: controller + app.kubernetes.io/instance: ngrok-operator + app.kubernetes.io/name: ngrok-operator + app.kubernetes.io/part-of: ngrok-operator + name: ngrok + spec: + controller: k8s.ngrok.com/ingress-controller + ingresses: + - apiVersion: networking.k8s.io/v1 + kind: Ingress + metadata: + annotations: + 
k8s.ngrok.com/pooling-enabled: "true" + k8s.ngrok.com/mapping-strategy: "endpoints" + name: test-ingress-1 + namespace: default + spec: + ingressClassName: ngrok + rules: + - host: test-ingresses.ngrok.io + http: + paths: + - path: /test-1 + pathType: Prefix + backend: + service: + name: test-service-1 + port: + number: 8080 + - apiVersion: networking.k8s.io/v1 + kind: Ingress + metadata: + annotations: + k8s.ngrok.com/pooling-enabled: "false" + k8s.ngrok.com/mapping-strategy: "endpoints" + name: test-ingress-2 + namespace: default + spec: + ingressClassName: ngrok + rules: + - host: test-ingresses.ngrok.io + http: + paths: + - path: /test-2 + pathType: Prefix + backend: + service: + name: test-service-2 + port: + number: 8080 + services: + - apiVersion: v1 + kind: Service + metadata: + name: test-service-1 + namespace: default + spec: + ports: + - name: http + port: 8080 + protocol: TCP + targetPort: http + type: ClusterIP + - apiVersion: v1 + kind: Service + metadata: + name: test-service-2 + namespace: default + spec: + ports: + - name: http + port: 8080 + protocol: TCP + targetPort: http + type: ClusterIP + trafficPolicies: [] +expected: + # Generated cloud endpoint should have the first traffic policy, but the second ingress will not be processed due to the + # pooling support conflict + cloudEndpoints: + - apiVersion: ngrok.k8s.ngrok.com/v1alpha1 + kind: CloudEndpoint + metadata: + labels: + k8s.ngrok.com/controller-name: test-manager-name + k8s.ngrok.com/controller-namespace: test-manager-namespace + name: test-ingresses.ngrok.io + namespace: default + spec: + url: https://test-ingresses.ngrok.io + poolingEnabled: true + trafficPolicy: + policy: + on_http_request: + - name: Generated-Route-/test-1 + expressions: + - req.url.path.startsWith("/test-1") + actions: + - type: forward-internal + config: + url: https://e3b0c-test-service-1-default-8080.internal + agentEndpoints: + - apiVersion: ngrok.k8s.ngrok.com/v1alpha1 + kind: AgentEndpoint + metadata: + 
labels: + k8s.ngrok.com/controller-name: test-manager-name + k8s.ngrok.com/controller-namespace: test-manager-namespace + name: e3b0c-test-service-1-default-8080 + namespace: default + spec: + url: "https://e3b0c-test-service-1-default-8080.internal" + upstream: + url: "http://test-service-1.default:8080" + diff --git a/pkg/managerdriver/testdata/translator/endpoint-ingress-trafficpolicy-conflict.yaml b/pkg/managerdriver/testdata/translator/endpoint-ingress-trafficpolicy-conflict.yaml new file mode 100644 index 00000000..595fd33b --- /dev/null +++ b/pkg/managerdriver/testdata/translator/endpoint-ingress-trafficpolicy-conflict.yaml @@ -0,0 +1,167 @@ +# Ingresses with conflicting traffic policy annotations +input: + ingressClasses: + - apiVersion: networking.k8s.io/v1 + kind: IngressClass + metadata: + labels: + app.kubernetes.io/component: controller + app.kubernetes.io/instance: ngrok-operator + app.kubernetes.io/name: ngrok-operator + app.kubernetes.io/part-of: ngrok-operator + name: ngrok + spec: + controller: k8s.ngrok.com/ingress-controller + ingresses: + - apiVersion: networking.k8s.io/v1 + kind: Ingress + metadata: + annotations: + k8s.ngrok.com/traffic-policy: response-503 + k8s.ngrok.com/mapping-strategy: "endpoints" + name: test-ingress-1 + namespace: default + spec: + ingressClassName: ngrok + rules: + - host: test-ingresses.ngrok.io + http: + paths: + - path: /test-1 + pathType: Prefix + backend: + service: + name: test-service-1 + port: + number: 8080 + - apiVersion: networking.k8s.io/v1 + kind: Ingress + metadata: + annotations: + k8s.ngrok.com/traffic-policy: response-404 + k8s.ngrok.com/mapping-strategy: "endpoints" + name: test-ingress-2 + namespace: default + spec: + ingressClassName: ngrok + rules: + - host: test-ingresses.ngrok.io + http: + paths: + - path: /test-2 + pathType: Prefix + backend: + service: + name: test-service-2 + port: + number: 8080 + services: + - apiVersion: v1 + kind: Service + metadata: + name: test-service-1 + namespace: 
default + spec: + ports: + - name: http + port: 8080 + protocol: TCP + targetPort: http + type: ClusterIP + - apiVersion: v1 + kind: Service + metadata: + name: test-service-2 + namespace: default + spec: + ports: + - name: http + port: 8080 + protocol: TCP + targetPort: http + type: ClusterIP + trafficPolicies: + - apiVersion: ngrok.k8s.ngrok.com/v1alpha1 + kind: NgrokTrafficPolicy + metadata: + name: response-503 + namespace: default + spec: + policy: + on_http_request: + - name: response-503 + expressions: + - req.url.path.startsWith("/foo") + actions: + - type: custom-response + config: + status_code: 503 + content: "Service is temporarily unavailable" + headers: + content-type: text/plain + - apiVersion: ngrok.k8s.ngrok.com/v1alpha1 + kind: NgrokTrafficPolicy + metadata: + name: response-404 + namespace: default + spec: + policy: + on_http_request: + - name: response-404 + expressions: + - req.url.path.startsWith("/foo") + actions: + - type: custom-response + config: + status_code: 404 + content: "Not found" + headers: + content-type: text/plain +expected: + # Generated cloud endpoint should have the first traffic policy, but the second ingress will not be processed due to the + # traffic policy conflict + cloudEndpoints: + - apiVersion: ngrok.k8s.ngrok.com/v1alpha1 + kind: CloudEndpoint + metadata: + labels: + k8s.ngrok.com/controller-name: test-manager-name + k8s.ngrok.com/controller-namespace: test-manager-namespace + name: test-ingresses.ngrok.io + namespace: default + spec: + url: https://test-ingresses.ngrok.io + trafficPolicy: + policy: + on_http_request: + - name: response-503 + expressions: + - req.url.path.startsWith("/foo") + actions: + - type: custom-response + config: + status_code: 503 + content: "Service is temporarily unavailable" + headers: + content-type: text/plain + - name: Generated-Route-/test-1 + expressions: + - req.url.path.startsWith("/test-1") + actions: + - type: forward-internal + config: + url: 
https://e3b0c-test-service-1-default-8080.internal + agentEndpoints: + - apiVersion: ngrok.k8s.ngrok.com/v1alpha1 + kind: AgentEndpoint + metadata: + labels: + k8s.ngrok.com/controller-name: test-manager-name + k8s.ngrok.com/controller-namespace: test-manager-namespace + name: e3b0c-test-service-1-default-8080 + namespace: default + spec: + url: "https://e3b0c-test-service-1-default-8080.internal" + upstream: + url: "http://test-service-1.default:8080" + diff --git a/pkg/managerdriver/translate_ingresses.go b/pkg/managerdriver/translate_ingresses.go index 8d84ef9b..1f33a105 100644 --- a/pkg/managerdriver/translate_ingresses.go +++ b/pkg/managerdriver/translate_ingresses.go @@ -6,8 +6,11 @@ import ( "reflect" "strings" + "github.com/go-logr/logr" "github.com/ngrok/ngrok-operator/internal/annotations" + "github.com/ngrok/ngrok-operator/internal/errors" "github.com/ngrok/ngrok-operator/internal/ir" + "github.com/ngrok/ngrok-operator/internal/store" "github.com/ngrok/ngrok-operator/internal/trafficpolicy" netv1 "k8s.io/api/networking/v1" ) @@ -17,11 +20,6 @@ func (t *translator) ingressesToIR() []*ir.IRVirtualHost { hostCache := make(map[ir.IRHostname]*ir.IRVirtualHost) // Each unique hostname corresponds to one IRVirtualHost upstreamCache := make(map[ir.IRService]*ir.IRUpstream) // Each unique service/port combo corresponds to one IRUpstream - // The following two maps keep track of traffic policy annotations and ingress backends for hostnames - // so that we can handle the case where two ingresses bringing different ones for the same hostname as an error - hostnameDefaultDestinations := make(map[ir.IRHostname]*ir.IRDestination) - hostnameAnnotationPolicies := make(map[ir.IRHostname]*trafficpolicy.TrafficPolicy) - ingresses := t.store.ListNgrokIngressesV1() for _, ingress := range ingresses { // We currently require this annotation to be present for an Ingress to be translated into CloudEndpoints/AgentEndpoints, otherwise the default behaviour is to @@ -39,67 +37,30 
@@ func (t *translator) ingressesToIR() []*ir.IRVirtualHost { continue } - // We don't support modulesets on endpoints or currently support converting a moduleset to a traffic policy, but still try to allow - // a moduleset that supplies a traffic policy with an error log to let users know that any other moduleset fields will be ignored - ingressModuleSet, err := getNgrokModuleSetForIngress(ingress, t.store) + useEndpointPooling, err := annotations.ExtractUseEndpointPooling(ingress) if err != nil { - t.log.Error(err, "error getting ngrok moduleset for ingress", "ingress", ingress) - continue + t.log.Error(err, fmt.Sprintf("failed to check %q annotation", annotations.MappingStrategyAnnotation)) } - - // We always get back a moduleset from the above function, check if it is empty or not - if modules := ingressModuleSet.Modules; modules.CircuitBreaker != nil || - modules.Compression != nil || - modules.Headers != nil || - modules.IPRestriction != nil || - modules.OAuth != nil || - modules.Policy != nil || - modules.OIDC != nil || - modules.SAML != nil || - modules.TLSTermination != nil || - modules.MutualTLS != nil || - modules.WebhookVerification != nil { - if useEndpoints { - t.log.Error(fmt.Errorf("ngrok moduleset supplied to ingress with annotation to use endpoints instead of edges"), "ngrok moduleset are not supported on endpoints. prefer using a traffic policy directly. 
any fields other than supplying a traffic policy using the module set will be ignored", - "ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace), - ) - } + if useEndpointPooling { + t.log.Info(fmt.Sprintf("the following ingress will create endpoint(s) with pooling enabled because of the %q annotation", + annotations.MappingStrategyAnnotation), + "ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace), + ) } - ingressTrafficPolicyCfg, err := getNgrokTrafficPolicyForIngress(ingress, t.store) + annotationTrafficPolicy, tpObjRef, err := trafficPolicyFromIngressAnnotation(t.store, ingress) if err != nil { t.log.Error(err, "error getting ngrok traffic policy for ingress", - "ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace), - ) + "ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace)) continue } - var ingressTrafficPolicy *trafficpolicy.TrafficPolicy - switch { - case ingressTrafficPolicyCfg != nil: - tmp := &trafficpolicy.TrafficPolicy{} - if err := json.Unmarshal(ingressTrafficPolicyCfg.Spec.Policy, tmp); err != nil { - t.log.Error(err, "failed to unmarshal traffic policy", - "ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace), - "policy", ingressTrafficPolicyCfg.Spec.Policy, - ) - continue - } - ingressTrafficPolicy = tmp - case ingressModuleSet.Modules.Policy != nil: - tpJSON, err := json.Marshal(ingressModuleSet.Modules.Policy) + // If we don't have a native traffic policy from annotations, see if one was provided from a moduleset annotation + if annotationTrafficPolicy == nil { + annotationTrafficPolicy, tpObjRef, err = trafficPolicyFromIngressModSetAnnotation(t.log, t.store, ingress, useEndpoints) if err != nil { - t.log.Error(err, "cannot convert module-set policy json", - "ingress", ingress, - "policy", ingressModuleSet.Modules.Policy, - ) - continue - } - if err := json.Unmarshal(tpJSON, ingressTrafficPolicy); err != nil { - t.log.Error(err, "failed to unmarshal traffic policy from module set", - 
"ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace), - "policy", ingressModuleSet.Modules.Policy, - ) + t.log.Error(err, "error getting ngrok traffic policy for ingress", + "ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace)) continue } } @@ -117,12 +78,12 @@ func (t *translator) ingressesToIR() []*ir.IRVirtualHost { t.ingressToIR( ingress, - ingressTrafficPolicy, defaultDestination, hostCache, upstreamCache, - hostnameDefaultDestinations, - hostnameAnnotationPolicies, + useEndpointPooling, + annotationTrafficPolicy, + tpObjRef, ) } @@ -137,12 +98,12 @@ func (t *translator) ingressesToIR() []*ir.IRVirtualHost { // ingressToIR translates a single ingress into IR and stores entries in the cache. Caches are used so that we do not generate duplicate IR for hostnames/services func (t *translator) ingressToIR( ingress *netv1.Ingress, - ingressTP *trafficpolicy.TrafficPolicy, defaultDestination *ir.IRDestination, hostCache map[ir.IRHostname]*ir.IRVirtualHost, upstreamCache map[ir.IRService]*ir.IRUpstream, - hostnameDefaultDestinations map[ir.IRHostname]*ir.IRDestination, - hostnameAnnotationPolicies map[ir.IRHostname]*trafficpolicy.TrafficPolicy, + endpointPoolingEnabled bool, + annotationTrafficPolicy *trafficpolicy.TrafficPolicy, + annotationTrafficPolicyRef *ir.OwningResource, ) { for _, rule := range ingress.Spec.Rules { ruleHostname := rule.Host @@ -154,48 +115,6 @@ func (t *translator) ingressToIR( continue } - // Check for clashing default backends and annotation traffic policies for this hostname - if defaultDestination != nil { - if current, exists := hostnameDefaultDestinations[ir.IRHostname(ruleHostname)]; exists { - if !reflect.DeepEqual(current, defaultDestination) { - t.log.Error(fmt.Errorf("different ingress default backends provided for the same hostname"), - "when using the same hostname across multiple ingresses, ensure that they do not use different default backends. 
the existing default backend for the hostname will not be overwritten", - "current ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace), - "hostname", ruleHostname, - ) - defaultDestination = current - } - } - hostnameDefaultDestinations[ir.IRHostname(ruleHostname)] = defaultDestination - } - if ingressTP != nil { - if current, exists := hostnameAnnotationPolicies[ir.IRHostname(ruleHostname)]; exists { - if !reflect.DeepEqual(current, ingressTP) { - t.log.Error(fmt.Errorf("different traffic policy annotations provided for the same hostname"), - "when using the same hostname across multiple ingresses, ensure that they do not use different traffic policies provided via annotations. the existing traffic policy for the hostname will not be overwitten", - "current ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace), - "hostname", ruleHostname, - ) - ingressTP = current - } - } else { - hostnameAnnotationPolicies[ir.IRHostname(ruleHostname)] = ingressTP - } - } - - // Make a deep copy of the traffic policy so that we don't taint it for subsequent rules - var ruleTrafficPolicy *trafficpolicy.TrafficPolicy - if ingressTP != nil { - var err error - ruleTrafficPolicy, err = ingressTP.DeepCopy() - if err != nil { - t.log.Error(err, "failed to copy traffic policy from ingress", - "ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace), - ) - continue - } - } - // Make a new IRVirtualHost for this hostname unless we have one in the cache owningResource := ir.OwningResource{ Kind: "Ingress", @@ -203,25 +122,73 @@ func (t *translator) ingressToIR( Namespace: ingress.Namespace, } irVHost, exists := hostCache[ir.IRHostname(ruleHostname)] - if !exists { - irVHost = &ir.IRVirtualHost{ - Namespace: ingress.Namespace, - OwningResources: []ir.OwningResource{owningResource}, - Hostname: ruleHostname, - TrafficPolicy: ruleTrafficPolicy, - Routes: []*ir.IRRoute{}, - DefaultDestination: defaultDestination, + if exists { + // If we already have a virtual host 
for this hostname, the traffic policy config must be the same as the one we are currently processing + if !reflect.DeepEqual(irVHost.TrafficPolicyObj, annotationTrafficPolicyRef) { + t.log.Error(fmt.Errorf("different traffic policy annotations provided for the same hostname"), + "when using the same hostname across multiple ingresses, ensure that they do not use different traffic policies provided via annotations", + "current ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace), + "hostname", ruleHostname, + ) + continue } - hostCache[ir.IRHostname(ruleHostname)] = irVHost - } else { + // They must have the same configuration for whether or not to pool endpoints + if irVHost.EndpointPoolingEnabled != endpointPoolingEnabled { + t.log.Error(fmt.Errorf("different endpoint pooling annotations provided for the same hostname"), + "when using the same hostname across multiple ingresses, ensure that they all enable or all disable endpoint pooling", + "current ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace), + "hostname", ruleHostname, + ) + continue + } + + // They must share the same namespace if irVHost.Namespace != ingress.Namespace { t.log.Error(fmt.Errorf("unable to convert ingress rule into cloud and agent endpoints. the domain (%q) is already being used by another ingress in a different namespace. 
you will need to either consolidate them, ensure they are in the same namespace, or use a different domain for one of them", ruleHostname), "ingress to endpoint conversion error", "ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace), + "namespace the hostname is already in-use in", irVHost.Namespace, + ) + continue + } + + // They must have the same default backend + if !reflect.DeepEqual(irVHost.DefaultDestination, defaultDestination) { + t.log.Error(fmt.Errorf("different ingress default backends provided for the same hostname"), + "when using the same hostname across multiple ingresses, ensure that they do not use different default backends. the existing default backend for the hostname will not be overwritten", + "current ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace), + "hostname", ruleHostname, ) continue } + + // The current and existing configurations match, add the new owning ingress reference and keep going irVHost.AddOwningResource(owningResource) + } else { + // Make a deep copy of the ingress traffic policy so that we don't taint it for subsequent rules + var ruleTrafficPolicy *trafficpolicy.TrafficPolicy + if annotationTrafficPolicy != nil { + var err error + ruleTrafficPolicy, err = annotationTrafficPolicy.DeepCopy() + if err != nil { + t.log.Error(err, "failed to copy traffic policy from ingress", + "ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace), + ) + continue + } + } + + irVHost = &ir.IRVirtualHost{ + Namespace: ingress.Namespace, + OwningResources: []ir.OwningResource{owningResource}, + Hostname: ruleHostname, + TrafficPolicy: ruleTrafficPolicy, + TrafficPolicyObj: annotationTrafficPolicyRef, + Routes: []*ir.IRRoute{}, + DefaultDestination: defaultDestination, + EndpointPoolingEnabled: endpointPoolingEnabled, + } + hostCache[ir.IRHostname(ruleHostname)] = irVHost } if rule.HTTP == nil { @@ -338,3 +305,94 @@ func (t *translator) ingressBackendToIR(ingress *netv1.Ingress, backend *netv1.I Upstream: 
upstream, }, nil } + +func trafficPolicyFromIngressAnnotation(store store.Storer, ingress *netv1.Ingress) (tp *trafficpolicy.TrafficPolicy, objRef *ir.OwningResource, err error) { + tpName, err := annotations.ExtractNgrokTrafficPolicyFromAnnotations(ingress) + if err != nil { + if errors.IsMissingAnnotations(err) { + return nil, nil, nil + } + return nil, nil, fmt.Errorf("error getting ngrok traffic policy for ingress %q: %w", + fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace), + err, + ) + } + + tpObj, err := store.GetNgrokTrafficPolicyV1(tpName, ingress.Namespace) + if err != nil { + return nil, nil, fmt.Errorf("unable to load traffic policy for ingress from annotations. name: %q, namespace: %q: %w", + tpName, + ingress.Namespace, + err, + ) + } + + trafficPolicyCfg := &trafficpolicy.TrafficPolicy{} + if err := json.Unmarshal(tpObj.Spec.Policy, trafficPolicyCfg); err != nil { + return nil, nil, fmt.Errorf("%w, failed to unmarshal traffic policy for ingress %q, traffic policy config: %v", + err, + fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace), + tpObj.Spec.Policy, + ) + } + return trafficPolicyCfg, &ir.OwningResource{ + Kind: "NgrokTrafficPolicy", + Name: tpObj.Name, + Namespace: tpObj.Namespace, + }, nil +} + +func trafficPolicyFromIngressModSetAnnotation(log logr.Logger, store store.Storer, ingress *netv1.Ingress, useEndpoints bool) (tp *trafficpolicy.TrafficPolicy, objRef *ir.OwningResource, err error) { + // We don't support modulesets on endpoints or currently support converting a moduleset to a traffic policy, but still try to allow + // a moduleset that supplies a traffic policy with an error log to let users know that any other moduleset fields will be ignored + ingressModuleSet, err := getNgrokModuleSetForIngress(ingress, store) + if err != nil { + return nil, nil, err + } + + // We always get back a moduleset from the above function, check if it is empty or not + if modules := ingressModuleSet.Modules; modules.CircuitBreaker != nil || + 
modules.Compression != nil ||
+		modules.Headers != nil ||
+		modules.IPRestriction != nil ||
+		modules.OAuth != nil ||
+		modules.Policy != nil ||
+		modules.OIDC != nil ||
+		modules.SAML != nil ||
+		modules.TLSTermination != nil ||
+		modules.MutualTLS != nil ||
+		modules.WebhookVerification != nil {
+		if useEndpoints {
+			log.Error(fmt.Errorf("ngrok moduleset supplied to ingress with annotation to use endpoints instead of edges"), "ngrok moduleset are not supported on endpoints. prefer using a traffic policy directly. any fields other than supplying a traffic policy using the module set will be ignored",
+				"ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace),
+			)
+		}
+	}
+
+	if ingressModuleSet.Modules.Policy == nil {
+		return nil, nil, nil
+	}
+
+	tpJSON, err := json.Marshal(ingressModuleSet.Modules.Policy)
+	if err != nil {
+		return nil, nil, fmt.Errorf("%w: cannot convert module-set policy json for ingress %q, moduleset policy: %v",
+			err,
+			fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace),
+			ingressModuleSet.Modules.Policy,
+		)
+	}
+	var ingressTrafficPolicy *trafficpolicy.TrafficPolicy // left nil here; Unmarshal below allocates it via the **T
+	if err := json.Unmarshal(tpJSON, &ingressTrafficPolicy); err != nil {
+		return nil, nil, fmt.Errorf("%w: failed to unmarshal traffic policy from module set for ingress %q, moduleset policy: %v",
+			err,
+			fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace),
+			ingressModuleSet.Modules.Policy,
+		)
+	}
+
+	return ingressTrafficPolicy, &ir.OwningResource{
+		Kind:      "NgrokModuleSet",
+		Name:      ingressModuleSet.Name,
+		Namespace: ingressModuleSet.Namespace,
+	}, nil
+}
diff --git a/pkg/managerdriver/translator.go b/pkg/managerdriver/translator.go
index d8d3ec6d..252b996b 100644
--- a/pkg/managerdriver/translator.go
+++ b/pkg/managerdriver/translator.go
@@ -79,6 +79,7 @@ func (t *translator) IRToEndpoints(irVHosts []*ir.IRVirtualHost) (parents map[ty
 			irVHost.Namespace,
 			irVHost.Hostname,
 			t.managedResourceLabels,
+			irVHost.EndpointPoolingEnabled,
 			t.defaultIngressMetadata,
 		)
@@ -244,7 +245,7 @@ func (t *translator) injectEndpointDefaultDestinationTPConfig(parentPolicy *traf } // buildCloudEndpoint initializes a new CloudEndpoint -func buildCloudEndpoint(namespace, hostname string, labels map[string]string, metadata string) *ngrokv1alpha1.CloudEndpoint { +func buildCloudEndpoint(namespace, hostname string, labels map[string]string, endpointPoolingEnabled bool, metadata string) *ngrokv1alpha1.CloudEndpoint { return &ngrokv1alpha1.CloudEndpoint{ ObjectMeta: metav1.ObjectMeta{ Name: sanitizeStringForK8sName(hostname), @@ -252,8 +253,9 @@ func buildCloudEndpoint(namespace, hostname string, labels map[string]string, me Labels: labels, }, Spec: ngrokv1alpha1.CloudEndpointSpec{ - URL: "https://" + hostname, - Metadata: metadata, + URL: "https://" + hostname, + PoolingEnabled: endpointPoolingEnabled, + Metadata: metadata, }, } } diff --git a/pkg/managerdriver/translator_test.go b/pkg/managerdriver/translator_test.go index f86fd416..05da58f0 100644 --- a/pkg/managerdriver/translator_test.go +++ b/pkg/managerdriver/translator_test.go @@ -3,6 +3,9 @@ package managerdriver import ( "context" "encoding/json" + "fmt" + "os" + "path/filepath" "testing" "github.com/go-logr/logr" @@ -14,10 +17,13 @@ import ( "github.com/ngrok/ngrok-operator/internal/trafficpolicy" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" + corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" netv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -87,6 +93,7 @@ func TestBuildCloudEndpoint(t *testing.T) { namespace string hostname string labels map[string]string + poolingEnabled bool metadata string expectedName string expectedLabels map[string]string @@ -112,12 +119,23 @@ func 
TestBuildCloudEndpoint(t *testing.T) {
 			expectedLabels: map[string]string{"env": "prod"},
 			expectedMeta:   "prod-metadata",
 		},
+		{
+			name:           "Pooling enabled",
+			namespace:      "default",
+			hostname:       "cloud-host",
+			labels:         map[string]string{"app": "cloud"},
+			metadata:       "test-metadata",
+			expectedName:   "cloud-host",
+			expectedLabels: map[string]string{"app": "cloud"},
+			poolingEnabled: true,
+			expectedMeta:   "test-metadata",
+		},
 	}
 
 	for _, tc := range testCases {
 		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
-			result := buildCloudEndpoint(tc.namespace, tc.hostname, tc.labels, tc.metadata)
+			result := buildCloudEndpoint(tc.namespace, tc.hostname, tc.labels, tc.poolingEnabled, tc.metadata)
 			assert.Equal(t, tc.expectedName, result.Name, "unexpected name for test case: %s", tc.name)
 			assert.Equal(t, tc.namespace, result.Namespace, "unexpected namespace for test case: %s", tc.name)
 			assert.Equal(t, tc.expectedLabels, result.Labels, "unexpected labels for test case: %s", tc.name)
@@ -1180,3 +1198,200 @@ func TestTranslate(t *testing.T) {
 	}
 	assert.Equal(t, expectedCLEP2Policy, clep2TP)
 }
+
+func TestTranslateIngresses(t *testing.T) {
+	testdataDir := "testdata/translator"
+
+	// RawTestCase facilitates the initial loading of test input/expected objects, but k8s objects with embedded structs don't parse cleanly
+	// with regular yaml marshalling so we need to be a little creative about how we process them. 
+ type RawTestCase struct { + Input struct { + IngressClasses []map[string]interface{} `yaml:"ingressClasses"` + Ingresses []map[string]interface{} `yaml:"ingresses"` + Services []map[string]interface{} `yaml:"services"` + TrafficPolicies []map[string]interface{} `yaml:"trafficPolicies"` + } `yaml:"input"` + + Expected struct { + CloudEndpoints []map[string]interface{} `yaml:"cloudEndpoints"` + AgentEndpoints []map[string]interface{} `yaml:"agentEndpoints"` + } `yaml:"expected"` + } + + // TestCase stores our actual fully parsed inputs/outputs + type TestCase struct { + Input struct { + IngressClasses []*netv1.IngressClass + Ingresses []*netv1.Ingress + Services []*corev1.Service + TrafficPolicies []*ngrokv1alpha1.NgrokTrafficPolicy + } + + Expected struct { + CloudEndpoints []*ngrokv1alpha1.CloudEndpoint + AgentEndpoints []*ngrokv1alpha1.AgentEndpoint + } + } + + // Create a scheme with all supported types + sch := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(sch)) + utilruntime.Must(ingressv1alpha1.AddToScheme(sch)) + utilruntime.Must(corev1.AddToScheme(sch)) + utilruntime.Must(ngrokv1alpha1.AddToScheme(sch)) + + // Load test files from the testdata directory + files, err := filepath.Glob(filepath.Join(testdataDir, "*.yaml")) + require.NoError(t, err, "failed to read test files in %s", testdataDir) + + for _, file := range files { + t.Run(filepath.Base(file), func(t *testing.T) { + data, err := os.ReadFile(file) + require.NoError(t, err, "failed to read file: %s", file) + + // Load into the RawTestCase + rawTC := new(RawTestCase) + require.NoError(t, yaml.UnmarshalStrict(data, rawTC), "failed to unmarshal raw testCase") + + // Use scheme based decoding to properly parse everything into TestCase + tc := TestCase{} + + // Decode input objects + for _, rawObj := range rawTC.Input.IngressClasses { + obj, err := decodeViaScheme(sch, rawObj) + require.NoError(t, err) + ingClass, ok := obj.(*netv1.IngressClass) + require.True(t, ok, "expected an 
IngressClass, got %T", obj) + tc.Input.IngressClasses = append(tc.Input.IngressClasses, ingClass) + } + for _, rawObj := range rawTC.Input.Ingresses { + obj, err := decodeViaScheme(sch, rawObj) + require.NoError(t, err) + ing, ok := obj.(*netv1.Ingress) + require.True(t, ok, "expected an Ingress, got %T", obj) + tc.Input.Ingresses = append(tc.Input.Ingresses, ing) + } + for _, rawObj := range rawTC.Input.Services { + obj, err := decodeViaScheme(sch, rawObj) + require.NoError(t, err) + svc, ok := obj.(*corev1.Service) + require.True(t, ok, "expected a Service, got %T", obj) + tc.Input.Services = append(tc.Input.Services, svc) + } + for _, rawObj := range rawTC.Input.TrafficPolicies { + obj, err := decodeViaScheme(sch, rawObj) + require.NoError(t, err) + pol, ok := obj.(*ngrokv1alpha1.NgrokTrafficPolicy) + require.True(t, ok, "expected a NgrokTrafficPolicy, got %T", obj) + tc.Input.TrafficPolicies = append(tc.Input.TrafficPolicies, pol) + } + + // Decode expected objects + for _, rawObj := range rawTC.Expected.CloudEndpoints { + obj, err := decodeViaScheme(sch, rawObj) + require.NoError(t, err) + ce, ok := obj.(*ngrokv1alpha1.CloudEndpoint) + require.True(t, ok, "expected a CloudEndpoint, got %T", obj) + tc.Expected.CloudEndpoints = append(tc.Expected.CloudEndpoints, ce) + } + for _, rawObj := range rawTC.Expected.AgentEndpoints { + obj, err := decodeViaScheme(sch, rawObj) + require.NoError(t, err) + ae, ok := obj.(*ngrokv1alpha1.AgentEndpoint) + require.True(t, ok, "expected an AgentEndpoint, got %T", obj) + tc.Expected.AgentEndpoints = append(tc.Expected.AgentEndpoints, ae) + } + + logger := logr.New(logr.Discard().GetSink()) + // If you need to debug tests, uncomment this logger instead to actually see errors printed in the tests. + // Otherwise, keep the above logger so that we don't output stuff and make the test output harder to read. 
+			// logger = testr.New(t)
+
+			driver := NewDriver(
+				logger,
+				sch,
+				testutils.DefaultControllerName,
+				types.NamespacedName{
+					Name:      "test-manager-name",
+					Namespace: "test-manager-namespace",
+				},
+				WithGatewayEnabled(false),
+				WithSyncAllowConcurrent(true),
+			)
+
+			// Load input objects into the driver store
+			inputObjects := []runtime.Object{}
+			for _, obj := range tc.Input.IngressClasses {
+				inputObjects = append(inputObjects, obj)
+			}
+			for _, obj := range tc.Input.Ingresses {
+				inputObjects = append(inputObjects, obj)
+			}
+			for _, obj := range tc.Input.Services {
+				inputObjects = append(inputObjects, obj)
+			}
+			for _, obj := range tc.Input.TrafficPolicies {
+				inputObjects = append(inputObjects, obj)
+			}
+
+			client := fake.NewClientBuilder().WithScheme(sch).WithRuntimeObjects(inputObjects...).Build()
+
+			require.NoError(t, driver.Seed(context.Background(), client))
+			translator := NewTranslator(
+				driver.log,
+				driver.store,
+				driver.defaultManagedResourceLabels(),
+				driver.ingressNgrokMetadata,
+				"svc.cluster.local",
+			)
+
+			// Finally, run translate and check the contents
+			result := translator.Translate()
+			require.Equal(t, len(tc.Expected.AgentEndpoints), len(result.AgentEndpoints))
+			require.Equal(t, len(tc.Expected.CloudEndpoints), len(result.CloudEndpoints))
+
+			for _, expectedCLEP := range tc.Expected.CloudEndpoints {
+				actualCE, exists := result.CloudEndpoints[types.NamespacedName{
+					Name:      expectedCLEP.Name,
+					Namespace: expectedCLEP.Namespace,
+				}]
+				require.True(t, exists, "expected CloudEndpoint %s.%s to exist", expectedCLEP.Name, expectedCLEP.Namespace)
+				assert.Equal(t, expectedCLEP.Name, actualCE.Name)
+				assert.Equal(t, expectedCLEP.Namespace, actualCE.Namespace)
+				assert.Equal(t, expectedCLEP.Labels, actualCE.Labels)
+				// Compare against the actual endpoint's spec (previously compared expected to itself, which always passed)
+				assert.Equal(t, expectedCLEP.Spec, actualCE.Spec)
+			}
+
+			for _, expectedAE := range tc.Expected.AgentEndpoints {
+				actualAE, exists := result.AgentEndpoints[types.NamespacedName{
+					Name:      expectedAE.Name,
+					Namespace: 
expectedAE.Namespace, + }] + require.True(t, exists, "expected AgentEndpoint %s.%s to exist. actual agent endpoints: %v", expectedAE.Name, expectedAE.Namespace, result.AgentEndpoints) + require.Equal(t, expectedAE.Name, actualAE.Name) + require.Equal(t, expectedAE.Namespace, actualAE.Namespace) + require.Equal(t, expectedAE.Labels, actualAE.Labels) + require.Equal(t, expectedAE.Spec, actualAE.Spec) + } + + }) + } +} + +// decodeViaScheme helps us decode raw objects loaded from test data yaml files into proper objects that can then be typecast +func decodeViaScheme(s *runtime.Scheme, rawObj map[string]interface{}) (runtime.Object, error) { + // Convert map to YAML + y, err := yaml.Marshal(rawObj) + if err != nil { + return nil, fmt.Errorf("failed to marshal raw map to YAML: %w", err) + } + + // Decode + decoder := serializer.NewCodecFactory(s).UniversalDeserializer() + obj, _, err := decoder.Decode(y, nil, nil) + if err != nil { + return nil, fmt.Errorf("failed to decode via scheme: %w", err) + } + + return obj, nil +}