diff --git a/pkg/output/xatu/exporter.go b/pkg/output/xatu/exporter.go index b77a6d7e..de8112bb 100644 --- a/pkg/output/xatu/exporter.go +++ b/pkg/output/xatu/exporter.go @@ -31,13 +31,13 @@ var ( retryPolicy = `{ "methodConfig": [{ "name": [{"service": "xatu.EventIngester"}], - "waitForReady": true, + "waitForReady": false, "retryPolicy": { "MaxAttempts": 5, - "InitialBackoff": ".01s", + "InitialBackoff": "1s", "MaxBackoff": "15s", - "BackoffMultiplier": 1.0, - "RetryableStatusCodes": [ "UNAVAILABLE", "UNKNOWN" ] + "BackoffMultiplier": 1.5, + "RetryableStatusCodes": [ "UNAVAILABLE", "UNKNOWN", "INTERNAL" ] } }]} ` diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 61fe0f66..386c4568 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -229,8 +229,6 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it err := bvp.exportWithTimeout(ctx, itemsBatch) if err != nil { - bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(itemsBatch))) - return err } } @@ -248,8 +246,6 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa defer cancel() } - bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch))) - err := bvp.e.ExportItems(ctx, itemsBatch) if err != nil { bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(itemsBatch))) @@ -257,6 +253,8 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa return err } + bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch))) + return nil }