From 27fda5343be02149ecc71555bfc0573c074ee808 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 8 Oct 2024 16:54:53 +1100 Subject: [PATCH 1/3] refactor: Update event handler logging statements --- pkg/sentry/presets.go | 2 +- pkg/server/service/event-ingester/handler.go | 22 ++++++++++++++++++- pkg/server/service/event-ingester/ingester.go | 19 ++++++++++------ 3 files changed, 34 insertions(+), 9 deletions(-) diff --git a/pkg/sentry/presets.go b/pkg/sentry/presets.go index 8947eed9..1a64c4fe 100644 --- a/pkg/sentry/presets.go +++ b/pkg/sentry/presets.go @@ -125,7 +125,7 @@ outputs: Value: []byte(` preset: docker-compose outputs: -- name: ethpandaops +- name: dockercompose type: xatu config: address: localhost:8080 diff --git a/pkg/server/service/event-ingester/handler.go b/pkg/server/service/event-ingester/handler.go index 9c4a64a3..8a83bfcf 100644 --- a/pkg/server/service/event-ingester/handler.go +++ b/pkg/server/service/event-ingester/handler.go @@ -54,6 +54,8 @@ func (h *Handler) Events(ctx context.Context, events []*xatu.DecoratedEvent, use username = user.Username() } + h.log.WithField("input_events", len(events)).Info("Passing events to filterEvents") + filteredEvents, err := h.filterEvents(ctx, events, user, group) if err != nil { return nil, fmt.Errorf("failed to filter events: %w", err) @@ -61,6 +63,8 @@ func (h *Handler) Events(ctx context.Context, events []*xatu.DecoratedEvent, use events = filteredEvents + h.log.WithField("output_events", len(events)).Info("Passing events to applyRedacter") + // Redact the events. Redacting is done before and after the event is processed to ensure that the field is not leaked by processing such as geoip lookups. if group != nil { redactedEvents, err := group.ApplyRedacter(events) @@ -71,6 +75,8 @@ func (h *Handler) Events(ctx context.Context, events []*xatu.DecoratedEvent, use events = redactedEvents } + h.log.WithField("output_events", len(events)).Info("Got events from redacted") + now := time.Now() if h.clockDrift != nil { now = now.Add(*h.clockDrift) @@ -142,6 +148,8 @@ func (h *Handler) Events(ctx context.Context, events []*xatu.DecoratedEvent, use } } + handlerFilteredEvents := make([]*xatu.DecoratedEvent, 0) + // Route the events to the correct handler for _, event := range events { if event == nil || event.Event == nil { continue @@ -199,9 +207,13 @@ func (h *Handler) Events(ctx context.Context, events []*xatu.DecoratedEvent, use event.Meta.Server = e.AppendServerMeta(ctx, &meta) - filteredEvents = append(filteredEvents, event) + handlerFilteredEvents = append(handlerFilteredEvents, event) } + h.log.WithField("output_events", len(handlerFilteredEvents)).Info("Got events from event handler") + + filteredEvents = handlerFilteredEvents + // Redact the events again if group != nil { redactedEvents, err := group.ApplyRedacter(filteredEvents) @@ -218,6 +230,8 @@ func (h *Handler) Events(ctx context.Context, events []*xatu.DecoratedEvent, use func (h *Handler) filterEvents(_ context.Context, events []*xatu.DecoratedEvent, user *auth.User, group *auth.Group) ([]*xatu.DecoratedEvent, error) { filteredEvents := events + h.log.WithField("input_events", len(events)).Info("Filtering events") + // Apply the user filter if user != nil { ev, err := user.ApplyFilter(filteredEvents) @@ -226,6 +240,8 @@ func (h *Handler) filterEvents(_ context.Context, events []*xatu.DecoratedEvent, } filteredEvents = ev + + h.log.WithField("events", len(filteredEvents)).Info("Filtered events for user") } // Apply the group filter @@ -236,7 +252,11 @@ func (h *Handler) filterEvents(_ context.Context, events []*xatu.DecoratedEvent, } filteredEvents = ev + + h.log.WithField("events", len(filteredEvents)).Info("Filtered events for group") } + h.log.WithField("events", len(filteredEvents)).Info("Filtered events") + return filteredEvents, nil } diff --git a/pkg/server/service/event-ingester/ingester.go b/pkg/server/service/event-ingester/ingester.go index e85e0cb6..439fb984 100644 --- a/pkg/server/service/event-ingester/ingester.go +++ b/pkg/server/service/event-ingester/ingester.go @@ -87,7 +87,7 @@ func (e *Ingester) Stop(ctx context.Context) error { } func (e *Ingester) CreateEvents(ctx context.Context, req *xatu.CreateEventsRequest) (*xatu.CreateEventsResponse, error) { - e.log.WithField("events", len(req.Events)).Debug("Received batch of events") + e.log.WithField("events", len(req.Events)).Info("Received batch of events") md, ok := metadata.FromIncomingContext(ctx) if !ok { @@ -125,12 +125,22 @@ func (e *Ingester) CreateEvents(ctx context.Context, req *xatu.CreateEventsReque return nil, status.Error(codes.Internal, err.Error()) } + e.log.WithFields(logrus.Fields{ + "input_events": len(req.Events), + "output_events": len(filteredEvents), + }).Info("Got events from event handler") + for _, sink := range e.sinks { if err := sink.HandleNewDecoratedEvents(ctx, filteredEvents); err != nil { return nil, status.Error(codes.Internal, err.Error()) } } + e.log.WithFields(logrus.Fields{ + "input_events": len(req.Events), + "output_events": len(filteredEvents), + }).Info("Successfully processed batch of events") + return &xatu.CreateEventsResponse{}, nil } @@ -138,17 +148,12 @@ func (e *Ingester) CreateSinks() ([]output.Sink, error) { sinks := make([]output.Sink, len(e.config.Outputs)) for i, out := range e.config.Outputs { - if out.ShippingMethod == nil { - shippingMethod := processor.ShippingMethodSync - out.ShippingMethod = &shippingMethod - } - sink, err := output.NewSink(out.Name, out.SinkType, out.Config, e.log, out.FilterConfig, - *out.ShippingMethod, + processor.ShippingMethodSync, ) if err != nil { return nil, err From c4ef001189885366e01cdb511a05324d0a23e090 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 8 Oct 2024 16:56:29 +1100 Subject: [PATCH 2/3] fix(server): Remove duplicates --- pkg/server/service/event-ingester/handler.go | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/pkg/server/service/event-ingester/handler.go b/pkg/server/service/event-ingester/handler.go index 8a83bfcf..ebc0d75e 100644 --- a/pkg/server/service/event-ingester/handler.go +++ b/pkg/server/service/event-ingester/handler.go @@ -54,8 +54,6 @@ func (h *Handler) Events(ctx context.Context, events []*xatu.DecoratedEvent, use username = user.Username() } - h.log.WithField("input_events", len(events)).Info("Passing events to filterEvents") - filteredEvents, err := h.filterEvents(ctx, events, user, group) if err != nil { return nil, fmt.Errorf("failed to filter events: %w", err) @@ -63,8 +61,6 @@ func (h *Handler) Events(ctx context.Context, events []*xatu.DecoratedEvent, use events = filteredEvents - h.log.WithField("output_events", len(events)).Info("Passing events to applyRedacter") - // Redact the events. Redacting is done before and after the event is processed to ensure that the field is not leaked by processing such as geoip lookups. if group != nil { redactedEvents, err := group.ApplyRedacter(events) @@ -75,8 +71,6 @@ func (h *Handler) Events(ctx context.Context, events []*xatu.DecoratedEvent, use events = redactedEvents } - h.log.WithField("output_events", len(events)).Info("Got events from redacted") - now := time.Now() if h.clockDrift != nil { now = now.Add(*h.clockDrift) @@ -210,8 +204,6 @@ func (h *Handler) Events(ctx context.Context, events []*xatu.DecoratedEvent, use handlerFilteredEvents = append(handlerFilteredEvents, event) } - h.log.WithField("output_events", len(handlerFilteredEvents)).Info("Got events from event handler") - filteredEvents = handlerFilteredEvents // Redact the events again @@ -230,8 +222,6 @@ func (h *Handler) Events(ctx context.Context, events []*xatu.DecoratedEvent, use func (h *Handler) filterEvents(_ context.Context, events []*xatu.DecoratedEvent, user *auth.User, group *auth.Group) ([]*xatu.DecoratedEvent, error) { filteredEvents := events - h.log.WithField("input_events", len(events)).Info("Filtering events") - // Apply the user filter if user != nil { ev, err := user.ApplyFilter(filteredEvents) @@ -240,8 +230,6 @@ func (h *Handler) filterEvents(_ context.Context, events []*xatu.DecoratedEvent, } filteredEvents = ev - - h.log.WithField("events", len(filteredEvents)).Info("Filtered events for user") } // Apply the group filter @@ -252,11 +240,7 @@ func (h *Handler) filterEvents(_ context.Context, events []*xatu.DecoratedEvent, } filteredEvents = ev - - h.log.WithField("events", len(filteredEvents)).Info("Filtered events for group") } - h.log.WithField("events", len(filteredEvents)).Info("Filtered events") - return filteredEvents, nil } From fbe89d1fb0fec475d26512f5d19b3040d7222a06 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 8 Oct 2024 16:57:38 +1100 Subject: [PATCH 3/3] tidy --- pkg/server/service/event-ingester/ingester.go | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/pkg/server/service/event-ingester/ingester.go b/pkg/server/service/event-ingester/ingester.go index 439fb984..9e002e81 100644 --- a/pkg/server/service/event-ingester/ingester.go +++ b/pkg/server/service/event-ingester/ingester.go @@ -87,7 +87,7 @@ func (e *Ingester) Stop(ctx context.Context) error { } func (e *Ingester) CreateEvents(ctx context.Context, req *xatu.CreateEventsRequest) (*xatu.CreateEventsResponse, error) { - e.log.WithField("events", len(req.Events)).Info("Received batch of events") + e.log.WithField("events", len(req.Events)).Debug("Received batch of events") md, ok := metadata.FromIncomingContext(ctx) if !ok { @@ -125,22 +125,12 @@ func (e *Ingester) CreateEvents(ctx context.Context, req *xatu.CreateEventsReque return nil, status.Error(codes.Internal, err.Error()) } - e.log.WithFields(logrus.Fields{ - "input_events": len(req.Events), - "output_events": len(filteredEvents), - }).Info("Got events from event handler") - for _, sink := range e.sinks { if err := sink.HandleNewDecoratedEvents(ctx, filteredEvents); err != nil { return nil, status.Error(codes.Internal, err.Error()) } } - e.log.WithFields(logrus.Fields{ - "input_events": len(req.Events), - "output_events": len(filteredEvents), - }).Info("Successfully processed batch of events") - return &xatu.CreateEventsResponse{}, nil }