Skip to content

Commit

Permalink
Merge pull request #87 from xataio/reinforce-max-kafka-msg-size
Browse files Browse the repository at this point in the history
Reinforce kafka msg size guards
  • Loading branch information
eminano authored Nov 18, 2024
2 parents c795714 + 948e43d commit 8fe9ff7
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions pkg/wal/processor/kafka/wal_kafka_batch_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,11 @@ func (w *BatchWriter) ProcessWALEvent(ctx context.Context, walEvent *wal.Event)
if err != nil {
return fmt.Errorf("marshalling event: %w", err)
}
// check if walEventBytes is larger than the Kafka accepted max message size
if len(walDataBytes) > int(w.maxBatchBytes) {
// check if walEventBytes is larger than 95% of the Kafka accepted max
// message size to allow for some buffer for the rest of the message
if len(walDataBytes) > int(0.95*float64(w.maxBatchBytes)) {
w.logger.Warn(errRecordTooLarge,
"kafka batch writer: wal event is larger than max bytes",
"kafka batch writer: wal event is larger than 95% of max bytes allowed",
loglib.Fields{
"max_bytes": w.maxBatchBytes,
"size": len(walDataBytes),
Expand Down Expand Up @@ -205,6 +206,9 @@ func (w *BatchWriter) Send(ctx context.Context) error {
case sendErr := <-sendErrChan:
// if there's an error while sending the batch, return the error and
// stop sending batches
if sendErr != nil && !errors.Is(sendErr, context.Canceled) {
w.logger.Error(sendErr, "sending thread stopped due to errors writing to kafka")
}
return sendErr
case <-ticker.C:
if !msgBatch.isEmpty() {
Expand Down

0 comments on commit 8fe9ff7

Please sign in to comment.