Skip to content

Commit

Permalink
support write status written in tx
Browse files Browse the repository at this point in the history
  • Loading branch information
rekby committed Sep 20, 2024
1 parent e2b4472 commit eaa2e0c
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 8 deletions.
23 changes: 16 additions & 7 deletions internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,13 @@ func (r *WriteResult) GetAcks() (res traceAck) {
}
for i := range r.Acks {
ack := &r.Acks[i]
if ack.MessageWriteStatus.Type == WriteStatusTypeWritten {
switch ack.MessageWriteStatus.Type {
case WriteStatusTypeWritten:
res.WrittenCount++
}
if ack.MessageWriteStatus.Type == WriteStatusTypeSkipped {
case WriteStatusTypeSkipped:
res.SkipCount++
case WriteStatusTypeWrittenInTx:
res.WrittenInTxCount++
}

if ack.SeqNo < res.SeqNoMin {
Expand All @@ -263,6 +265,7 @@ type traceAck = struct {
WrittenOffsetMin int64
WrittenOffsetMax int64
WrittenCount int
WrittenInTxCount int
SkipCount int
}

Expand Down Expand Up @@ -301,6 +304,12 @@ func (s *MessageWriteStatus) fromProto(status interface{}) error {
s.SkippedReason = WriteStatusSkipReason(v.Skipped.GetReason())

return nil

case *Ydb_Topic.StreamWriteMessage_WriteResponse_WriteAck_WrittenInTx_:
s.Type = WriteStatusTypeWrittenInTx

return nil

default:
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: unexpected write status type: %v", reflect.TypeOf(v))))
}
Expand All @@ -309,19 +318,19 @@ func (s *MessageWriteStatus) fromProto(status interface{}) error {
type WriteStatusType int

const (
WriteStatusTypeUnknown WriteStatusType = iota
WriteStatusTypeWritten
WriteStatusTypeWritten WriteStatusType = iota + 1
WriteStatusTypeSkipped
WriteStatusTypeWrittenInTx
)

func (t WriteStatusType) String() string {
switch t {
case WriteStatusTypeUnknown:
return "Unknown"
case WriteStatusTypeSkipped:
return "Skipped"
case WriteStatusTypeWritten:
return "Written"
case WriteStatusTypeWrittenInTx:
return "WrittenInTx"
default:
return strconv.Itoa(int(t))
}
Expand Down
2 changes: 1 addition & 1 deletion internal/topic/topicwriterinternal/writer_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ func createWriteRequest(messages []messageWithDataContent, targetCodec rawtopicc
}
}

if len(messages) > 0 {
if len(messages) > 0 && messages[0].tx != nil {
res.Tx.ID = messages[0].tx.ID()
res.Tx.Session = messages[0].tx.SessionID()
}
Expand Down
1 change: 1 addition & 0 deletions log/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
Int64("written_offset_min", acks.WrittenOffsetMin),
Int64("written_offset_max", acks.WrittenOffsetMax),
Int("written_offset_count", acks.WrittenCount),
Int("written_in_tx_count", acks.WrittenInTxCount),
Int("skip_count", acks.SkipCount),
versionField(),
)
Expand Down
1 change: 1 addition & 0 deletions trace/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ type (
WrittenOffsetMin int64
WrittenOffsetMax int64
WrittenCount int
WrittenInTxCount int
SkipCount int
}
}
Expand Down

0 comments on commit eaa2e0c

Please sign in to comment.