From c78dccc1fe9ad7d5d63012d917afbb892d9b29fe Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 28 Jun 2024 18:36:52 +1000 Subject: [PATCH] feat: Add option to override shipping method for writing items --- pkg/output/http/http.go | 15 ++++++++++++++- pkg/processor/batch.go | 15 ++++++++++++--- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/pkg/output/http/http.go b/pkg/output/http/http.go index ed8473db..102659f4 100644 --- a/pkg/output/http/http.go +++ b/pkg/output/http/http.go @@ -3,6 +3,7 @@ package http import ( "context" "errors" + "strings" "github.com/ethpandaops/xatu/pkg/processor" "github.com/ethpandaops/xatu/pkg/proto/xatu" @@ -106,5 +107,17 @@ func (h *HTTP) HandleNewDecoratedEvents(ctx context.Context, events []*xatu.Deco } } - return h.proc.Write(ctx, filtered) + if len(filtered) == 0 { + return nil + } + + shippingMethod := processor.ShippingMethodSync + + if strings.Contains(filtered[0].Meta.Client.Name, "sentry") || strings.Contains(filtered[0].Meta.Client.Name, "cl-mimicry") { + shippingMethod = processor.ShippingMethodAsync + } + + return h.proc.Write(ctx, filtered, processor.WriteOptions{ + OverrideShippingMethod: &shippingMethod, + }) } diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index bcd1a2b6..c1ca4bde 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -205,12 +205,16 @@ func (bvp *BatchItemProcessor[T]) Start(ctx context.Context) { }() } +type WriteOptions struct { + OverrideShippingMethod *ShippingMethod +} + // Write writes items to the queue. If the Processor is configured to use // the sync shipping method, the items will be written to the queue and this // function will return when all items have been processed. If the Processor is // configured to use the async shipping method, the items will be written to // the queue and this function will return immediately. -func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { +func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T, opts ...WriteOptions) error { _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.Write") defer span.End() @@ -218,6 +222,11 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { return errors.New("exporter is nil") } + shippingMethod := bvp.o.ShippingMethod + if len(opts) > 0 { + shippingMethod = *opts[0].OverrideShippingMethod + } + // Break our items up in to chunks that can be processed at // one time by our workers. This is to prevent wasting // resources sending items if we've failed an earlier @@ -236,7 +245,7 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { item: i, } - if bvp.o.ShippingMethod == ShippingMethodSync { + if shippingMethod == ShippingMethodSync { item.errCh = make(chan error, 1) item.completedCh = make(chan struct{}, 1) } @@ -250,7 +259,7 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { } } - if bvp.o.ShippingMethod == ShippingMethodSync { + if shippingMethod == ShippingMethodSync { if err := bvp.waitForBatchCompletion(ctx, prepared); err != nil { return err }