From 9eb3d4916f3a7b291b2d4b3a4484ddd6427dc4cf Mon Sep 17 00:00:00 2001 From: eminano Date: Thu, 5 Sep 2024 13:16:36 +0200 Subject: [PATCH 1/2] Update search store retrier to not ignore dropped documents --- .../processor/search/search_store_retrier.go | 25 +++++++++-------- .../search/search_store_retrier_test.go | 27 ++++++++++++++++--- 2 files changed, 37 insertions(+), 15 deletions(-) diff --git a/pkg/wal/processor/search/search_store_retrier.go b/pkg/wal/processor/search/search_store_retrier.go index f2275ed..e1f6419 100644 --- a/pkg/wal/processor/search/search_store_retrier.go +++ b/pkg/wal/processor/search/search_store_retrier.go @@ -78,6 +78,7 @@ func (s *StoreRetrier) DeleteTableDocuments(ctx context.Context, schemaName stri func (s *StoreRetrier) SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error) { docsToSend := docs failedDocs := []DocumentError{} + docsDropped := []DocumentError{} send := func(ctx context.Context) error { total := len(docsToSend) var err error @@ -86,7 +87,9 @@ func (s *StoreRetrier) SendDocuments(ctx context.Context, docs []Document) ([]Do return err } - docsToSend = s.getRetriableDocs(failedDocs) + var dropped []DocumentError + docsToSend, dropped = s.getRetriableDocs(failedDocs) + docsDropped = append(docsDropped, dropped...) // nothing to retry if len(docsToSend) == 0 { return nil @@ -114,27 +117,27 @@ func (s *StoreRetrier) SendDocuments(ctx context.Context, docs []Document) ([]Do if err != nil { // some documents failed to send - return back whatever failed if errors.Is(err, errPartialDocumentSend) { - return failedDocs, nil + return append(failedDocs, docsDropped...), nil } // internal search store error points to something wrong, return the error // along with the failed documents - return failedDocs, err + return append(failedDocs, docsDropped...), err } - return nil, nil + return docsDropped, nil } -func (s *StoreRetrier) getRetriableDocs(failedDocs []DocumentError) []Document { +func (s *StoreRetrier) getRetriableDocs(failedDocs []DocumentError) ([]Document, []DocumentError) { if len(failedDocs) == 0 { - return nil + return nil, nil } - dropped := 0 docsToRetry := make([]Document, 0, len(failedDocs)) + docsDropped := make([]DocumentError, 0, len(failedDocs)) for _, f := range failedDocs { switch f.Severity { case SeverityDataLoss: - dropped++ + docsDropped = append(docsDropped, f) case SeverityRetriable: docsToRetry = append(docsToRetry, f.Document) } @@ -143,14 +146,14 @@ func (s *StoreRetrier) getRetriableDocs(failedDocs []DocumentError) []Document { } } - if dropped > 0 { + if len(docsDropped) > 0 { s.logger.Warn(nil, "search store retrier: documents dropped", loglib.Fields{ "docs_failed": len(failedDocs), - "docs_dropped": dropped, + "docs_dropped": len(docsDropped), }) } - return docsToRetry + return docsToRetry, docsDropped } func (s *StoreRetrier) logFailure(docErr DocumentError) { diff --git a/pkg/wal/processor/search/search_store_retrier_test.go b/pkg/wal/processor/search/search_store_retrier_test.go index 7921053..4e03620 100644 --- a/pkg/wal/processor/search/search_store_retrier_test.go +++ b/pkg/wal/processor/search/search_store_retrier_test.go @@ -47,7 +47,7 @@ func TestStoreRetrier_SendDocuments(t *testing.T) { return nil, nil }, }, - wantFailedDocs: nil, + wantFailedDocs: []DocumentError{}, wantErr: nil, }, { @@ -66,7 +66,26 @@ func TestStoreRetrier_SendDocuments(t *testing.T) { } }, }, - wantFailedDocs: nil, + wantFailedDocs: []DocumentError{}, + wantErr: nil, + }, + { + name: "ok - failed and dropped documents", + store: &mockStore{ + sendDocumentsFn: func(ctx context.Context, i uint, docs []Document) ([]DocumentError, error) { + switch i { + case 1: + require.Equal(t, testDocs, docs) + return append(failedDocs(SeverityDataLoss), failedDocs(SeverityRetriable)...), nil + case 2, 3: + require.Equal(t, []Document{*newTestDocument(withID("1"))}, docs) + return failedDocs(SeverityRetriable), nil + default: + return nil, fmt.Errorf("sendDocumentsFn: unexpected call %d", i) + } + }, + }, + wantFailedDocs: append(failedDocs(SeverityRetriable), failedDocs(SeverityDataLoss)...), wantErr: nil, }, { @@ -82,11 +101,11 @@ func TestStoreRetrier_SendDocuments(t *testing.T) { } }, }, - wantFailedDocs: nil, + wantFailedDocs: failedDocs(SeverityDataLoss), wantErr: nil, }, { - name: "error - some failed documents", + name: "ok - some failed documents", store: &mockStore{ sendDocumentsFn: func(ctx context.Context, i uint, docs []Document) ([]DocumentError, error) { switch i { From 720dc9fba6d72ef46e02ca8028ee569341563881 Mon Sep 17 00:00:00 2001 From: eminano Date: Thu, 5 Sep 2024 13:16:57 +0200 Subject: [PATCH 2/2] Apply opts after kafka writer is created --- pkg/wal/processor/kafka/wal_kafka_batch_writer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/wal/processor/kafka/wal_kafka_batch_writer.go b/pkg/wal/processor/kafka/wal_kafka_batch_writer.go index 16ae901..4736a80 100644 --- a/pkg/wal/processor/kafka/wal_kafka_batch_writer.go +++ b/pkg/wal/processor/kafka/wal_kafka_batch_writer.go @@ -62,10 +62,6 @@ func NewBatchWriter(config *Config, opts ...Option) (*BatchWriter, error) { } w.queueBytesSema = synclib.NewWeightedSemaphore(int64(maxQueueBytes)) - for _, opt := range opts { - opt(w) - } - // Since the batch kafka writer handles the batching, we don't want to have // a timeout configured in the underlying kafka-go writer or the latency for // the send will increase unnecessarily. Instead, we set the kafka-go writer @@ -87,6 +83,10 @@ func NewBatchWriter(config *Config, opts ...Option) (*BatchWriter, error) { return nil, err } + for _, opt := range opts { + opt(w) + } + return w, nil }