From 05936eaa9bb2e4a68071e8d625134f2c7dbc1c7d Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Tue, 3 Dec 2024 13:28:44 +0100 Subject: [PATCH] MT-Broker: return retriable status code based on the state to leverage retries (#8366) * MT-Broker: return appropriate status code based on the state to leverage retries The ingress or filter deployments were returning 400 even in the case where a given resource (like trigger, broker, subscription) wasn't found, however, this is a common case where the lister cache hasn't caught up with the latest state. Signed-off-by: Pierangelo Di Pilato * Fix unit tests Signed-off-by: Pierangelo Di Pilato --------- Signed-off-by: Pierangelo Di Pilato --- pkg/broker/filter/filter_handler.go | 20 ++++++++++++++++++++ pkg/broker/filter/filter_handler_test.go | 8 +++++--- pkg/broker/ingress/ingress_handler.go | 8 +++++++- pkg/broker/ingress/ingress_handler_test.go | 8 +++++--- 4 files changed, 37 insertions(+), 7 deletions(-) diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 8ea2565ebfa..c744d969ca1 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -26,6 +26,8 @@ import ( "net/http" "time" + apierrors "k8s.io/apimachinery/pkg/api/errors" + opencensusclient "github.com/cloudevents/sdk-go/observability/opencensus/v2/client" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" @@ -170,6 +172,11 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { } trigger, err := h.getTrigger(triggerRef) + if apierrors.IsNotFound(err) { + h.logger.Info("Unable to find the Trigger", zap.Error(err), zap.Any("triggerRef", triggerRef)) + writer.WriteHeader(http.StatusNotFound) + return + } if err != nil { h.logger.Info("Unable to get the Trigger", zap.Error(err), zap.Any("triggerRef", triggerRef)) writer.WriteHeader(http.StatusBadRequest) @@ -245,6 +252,11 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve } broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerName) + if apierrors.IsNotFound(err) { + h.logger.Info("Unable to get the Broker", zap.Error(err)) + writer.WriteHeader(http.StatusNotFound) + return + } if err != nil { h.logger.Info("Unable to get the Broker", zap.Error(err)) writer.WriteHeader(http.StatusBadRequest) @@ -290,6 +302,11 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event brokerNamespace = trigger.Namespace } broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerName) + if apierrors.IsNotFound(err) { + h.logger.Info("Unable to get the Broker", zap.Error(err)) + writer.WriteHeader(http.StatusNotFound) + return + } if err != nil { h.logger.Info("Unable to get the Broker", zap.Error(err)) writer.WriteHeader(http.StatusBadRequest) @@ -310,6 +327,9 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event Audience: broker.Status.DeadLetterSinkAudience, } } + if target == nil { + return + } reportArgs := &ReportArgs{ ns: trigger.Namespace, diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index e220e401774..b6fa2b6d5d9 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -52,9 +52,11 @@ import ( brokerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake" triggerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake" + _ "knative.dev/pkg/client/injection/kube/client/fake" + _ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake" + // Fake injection client _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" - _ "knative.dev/pkg/client/injection/kube/client/fake" ) const ( @@ -108,7 +110,7 @@ func TestReceiver(t *testing.T) { expectedStatus: http.StatusBadRequest, }, "Path too long": { - request: httptest.NewRequest(http.MethodPost, "/triggers/test-namespace/test-trigger/extra", nil), + request: httptest.NewRequest(http.MethodPost, "/triggers/test-namespace/test-trigger/uuid/extra/extra", nil), expectedStatus: http.StatusBadRequest, }, "Path without prefix": { @@ -117,7 +119,7 @@ func TestReceiver(t *testing.T) { }, "Trigger.Get fails": { // No trigger exists, so the Get will fail. - expectedStatus: http.StatusBadRequest, + expectedStatus: http.StatusNotFound, }, "Trigger doesn't have SubscriberURI": { triggers: []*eventingv1.Trigger{ diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index 36f514c0cc6..a22295a5ec1 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -23,6 +23,7 @@ import ( "strings" "time" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/utils/ptr" opencensusclient "github.com/cloudevents/sdk-go/observability/opencensus/v2/client" @@ -226,6 +227,11 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { } broker, err := h.getBroker(brokerName, brokerNamespace) + if apierrors.IsNotFound(err) { + h.Logger.Warn("Failed to retrieve broker", zap.Error(err)) + writer.WriteHeader(http.StatusNotFound) + return + } if err != nil { h.Logger.Warn("Failed to retrieve broker", zap.Error(err)) writer.WriteHeader(http.StatusBadRequest) @@ -315,7 +321,7 @@ func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloud channelAddress, err := h.getChannelAddress(brokerObj) if err != nil { h.Logger.Warn("could not get channel address from broker", zap.Error(err)) - return http.StatusBadRequest, kncloudevents.NoDuration + return http.StatusInternalServerError, kncloudevents.NoDuration } opts := []kncloudevents.SendOption{ diff --git a/pkg/broker/ingress/ingress_handler_test.go b/pkg/broker/ingress/ingress_handler_test.go index db6a18dda4c..d661520d7e7 100644 --- a/pkg/broker/ingress/ingress_handler_test.go +++ b/pkg/broker/ingress/ingress_handler_test.go @@ -45,9 +45,11 @@ import ( brokerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake" + _ "knative.dev/pkg/client/injection/kube/client/fake" + _ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake" + // Fake injection client _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" - _ "knative.dev/pkg/client/injection/kube/client/fake" ) const ( @@ -213,9 +215,9 @@ func TestHandler_ServeHTTP(t *testing.T) { method: nethttp.MethodPost, uri: "/ns/name", body: getValidEvent(), - statusCode: nethttp.StatusBadRequest, + statusCode: nethttp.StatusInternalServerError, handler: handler(), - reporter: &mockReporter{StatusCode: nethttp.StatusBadRequest, EventDispatchTimeReported: false}, + reporter: &mockReporter{StatusCode: nethttp.StatusInternalServerError, EventDispatchTimeReported: false}, defaulter: broker.TTLDefaulter(logger, 100), brokers: []*eventingv1.Broker{ withUninitializedAnnotations(makeBroker("name", "ns")),