Skip to content

Commit

Permalink
Merge pull request #71 from xataio/fix-instrumentation
Browse files Browse the repository at this point in the history
Fix instrumentation
  • Loading branch information
eminano authored Sep 5, 2024
2 parents 0f761ef + 720dc9f commit 5a6115a
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 19 deletions.
8 changes: 4 additions & 4 deletions pkg/wal/processor/kafka/wal_kafka_batch_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ func NewBatchWriter(config *Config, opts ...Option) (*BatchWriter, error) {
}
w.queueBytesSema = synclib.NewWeightedSemaphore(int64(maxQueueBytes))

for _, opt := range opts {
opt(w)
}

// Since the batch kafka writer handles the batching, we don't want to have
// a timeout configured in the underlying kafka-go writer or the latency for
// the send will increase unnecessarily. Instead, we set the kafka-go writer
Expand All @@ -87,6 +83,10 @@ func NewBatchWriter(config *Config, opts ...Option) (*BatchWriter, error) {
return nil, err
}

for _, opt := range opts {
opt(w)
}

return w, nil
}

Expand Down
25 changes: 14 additions & 11 deletions pkg/wal/processor/search/search_store_retrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (s *StoreRetrier) DeleteTableDocuments(ctx context.Context, schemaName stri
func (s *StoreRetrier) SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error) {
docsToSend := docs
failedDocs := []DocumentError{}
docsDropped := []DocumentError{}
send := func(ctx context.Context) error {
total := len(docsToSend)
var err error
Expand All @@ -86,7 +87,9 @@ func (s *StoreRetrier) SendDocuments(ctx context.Context, docs []Document) ([]Do
return err
}

docsToSend = s.getRetriableDocs(failedDocs)
var dropped []DocumentError
docsToSend, dropped = s.getRetriableDocs(failedDocs)
docsDropped = append(docsDropped, dropped...)
// nothing to retry
if len(docsToSend) == 0 {
return nil
Expand Down Expand Up @@ -114,27 +117,27 @@ func (s *StoreRetrier) SendDocuments(ctx context.Context, docs []Document) ([]Do
if err != nil {
// some documents failed to send - return back whatever failed
if errors.Is(err, errPartialDocumentSend) {
return failedDocs, nil
return append(failedDocs, docsDropped...), nil
}
// internal search store error points to something wrong, return the error
// along with the failed documents
return failedDocs, err
return append(failedDocs, docsDropped...), err
}

return nil, nil
return docsDropped, nil
}

func (s *StoreRetrier) getRetriableDocs(failedDocs []DocumentError) []Document {
func (s *StoreRetrier) getRetriableDocs(failedDocs []DocumentError) ([]Document, []DocumentError) {
if len(failedDocs) == 0 {
return nil
return nil, nil
}

dropped := 0
docsToRetry := make([]Document, 0, len(failedDocs))
docsDropped := make([]DocumentError, 0, len(failedDocs))
for _, f := range failedDocs {
switch f.Severity {
case SeverityDataLoss:
dropped++
docsDropped = append(docsDropped, f)
case SeverityRetriable:
docsToRetry = append(docsToRetry, f.Document)
}
Expand All @@ -143,14 +146,14 @@ func (s *StoreRetrier) getRetriableDocs(failedDocs []DocumentError) []Document {
}
}

if dropped > 0 {
if len(docsDropped) > 0 {
s.logger.Warn(nil, "search store retrier: documents dropped", loglib.Fields{
"docs_failed": len(failedDocs),
"docs_dropped": dropped,
"docs_dropped": len(docsDropped),
})
}

return docsToRetry
return docsToRetry, docsDropped
}

func (s *StoreRetrier) logFailure(docErr DocumentError) {
Expand Down
27 changes: 23 additions & 4 deletions pkg/wal/processor/search/search_store_retrier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestStoreRetrier_SendDocuments(t *testing.T) {
return nil, nil
},
},
wantFailedDocs: nil,
wantFailedDocs: []DocumentError{},
wantErr: nil,
},
{
Expand All @@ -66,7 +66,26 @@ func TestStoreRetrier_SendDocuments(t *testing.T) {
}
},
},
wantFailedDocs: nil,
wantFailedDocs: []DocumentError{},
wantErr: nil,
},
{
name: "ok - failed and dropped documents",
store: &mockStore{
sendDocumentsFn: func(ctx context.Context, i uint, docs []Document) ([]DocumentError, error) {
switch i {
case 1:
require.Equal(t, testDocs, docs)
return append(failedDocs(SeverityDataLoss), failedDocs(SeverityRetriable)...), nil
case 2, 3:
require.Equal(t, []Document{*newTestDocument(withID("1"))}, docs)
return failedDocs(SeverityRetriable), nil
default:
return nil, fmt.Errorf("sendDocumentsFn: unexpected call %d", i)
}
},
},
wantFailedDocs: append(failedDocs(SeverityRetriable), failedDocs(SeverityDataLoss)...),
wantErr: nil,
},
{
Expand All @@ -82,11 +101,11 @@ func TestStoreRetrier_SendDocuments(t *testing.T) {
}
},
},
wantFailedDocs: nil,
wantFailedDocs: failedDocs(SeverityDataLoss),
wantErr: nil,
},
{
name: "error - some failed documents",
name: "ok - some failed documents",
store: &mockStore{
sendDocumentsFn: func(ctx context.Context, i uint, docs []Document) ([]DocumentError, error) {
switch i {
Expand Down

0 comments on commit 5a6115a

Please sign in to comment.