Skip to content

Commit

Permalink
test bytes buffer for meta
Browse files Browse the repository at this point in the history
  • Loading branch information
mhmtszr committed Feb 9, 2024
1 parent 7a99564 commit 2ba612c
Showing 1 changed file with 28 additions and 21 deletions.
49 changes: 28 additions & 21 deletions elasticsearch/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Metric struct {
}

type BatchItem struct {
Bytes []byte
Bytes *bytes.Buffer
Action *document.ESActionDocument
}

Expand Down Expand Up @@ -145,7 +145,7 @@ func (b *Bulk) AddActions(

key := getActionKey(action)
if batchIndex, ok := b.batchKeys[key]; ok {
b.batchByteSize += len(value) - len(b.batch[batchIndex].Bytes)
b.batchByteSize += value.Len() - b.batch[batchIndex].Bytes.Len()
b.batch[batchIndex] = BatchItem{
Action: &actions[i],
Bytes: value,
Expand All @@ -158,7 +158,7 @@ func (b *Bulk) AddActions(
b.batchKeys[key] = b.batchIndex
b.batchIndex++
b.batchSize++
b.batchByteSize += len(value)
b.batchByteSize += value.Len()
}
}
ctx.Ack()
Expand All @@ -182,35 +182,43 @@ var (

var metaPool = sync.Pool{
New: func() interface{} {
return []byte{}
return &bytes.Buffer{}
},
}

func getEsActionJSON(docID []byte, action document.EsAction, indexName string, routing *string, source []byte, typeName []byte) []byte {
meta := metaPool.Get().([]byte)[:0]
func getEsActionJSON(
docID []byte,
action document.EsAction,
indexName string,
routing *string,
source []byte,
typeName []byte,
) *bytes.Buffer {
meta := metaPool.Get().(*bytes.Buffer)
meta.Reset()

if action == document.Index {
meta = append(meta, indexPrefix...)
meta.Write(indexPrefix)
} else {
meta = append(meta, deletePrefix...)
meta.Write(deletePrefix)
}
meta = append(meta, helper.Byte(indexName)...)
meta = append(meta, idPrefix...)
meta = append(meta, helper.EscapePredefinedBytes(docID)...)
meta.Write(helper.Byte(indexName))
meta.Write(idPrefix)
meta.Write(helper.EscapePredefinedBytes(docID))
if routing != nil {
meta = append(meta, routingPrefix...)
meta = append(meta, helper.Byte(*routing)...)
meta.Write(routingPrefix)
meta.Write(helper.Byte(*routing))
}
if typeName != nil {
meta = append(meta, typePrefix...)
meta = append(meta, typeName...)
meta.Write(typePrefix)
meta.Write(typeName)
}
meta = append(meta, postFix...)
meta.Write(postFix)
if action == document.Index {
meta = append(meta, '\n')
meta = append(meta, source...)
meta.WriteByte('\n')
meta.Write(source)
}
meta = append(meta, '\n')
meta.WriteByte('\n')
return meta
}

Expand All @@ -233,7 +241,6 @@ func (b *Bulk) flushMessages() {
}
b.batchTicker.Reset(b.batchTickerDuration)
for _, batch := range b.batch {
//nolint:staticcheck
metaPool.Put(batch.Bytes)
}
b.batch = b.batch[:0]
Expand Down Expand Up @@ -387,7 +394,7 @@ func getActionKey(action document.ESActionDocument) string {
func getBytes(batchItems []BatchItem) [][]byte {
var batchBytes [][]byte
for _, batchItem := range batchItems {
batchBytes = append(batchBytes, batchItem.Bytes)
batchBytes = append(batchBytes, batchItem.Bytes.Bytes())
}
return batchBytes
}
Expand Down

0 comments on commit 2ba612c

Please sign in to comment.