Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go Client v8.0.0-beta.2 #460

Merged
merged 19 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Change History

## December 20 2024: v8.0.0-beta.2

- **New Features**
- [CLIENT-2820] Actively refresh pool connections that will be idle before the next tend.

- **Fixes**
- [CLIENT-3218] Fix FilterExpression encoding in Batch commands.

## December 13 2024: v8.0.0-beta.1

Major breaking release. This release supports Multi-Record Transactions. Please note that this is a beta release and will be subject to breaking changes.
Expand Down
170 changes: 161 additions & 9 deletions batch_index_command_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ package aerospike

import (
"github.com/aerospike/aerospike-client-go/v8/types"
Buffer "github.com/aerospike/aerospike-client-go/v8/utils/buffer"
)

type batchIndexCommandGet struct {
batchCommandOperate

indexRecords []*BatchRead
records []*BatchRead
}

func newBatchIndexCommandGet(
Expand All @@ -31,19 +32,24 @@ func newBatchIndexCommandGet(
records []*BatchRead,
isOperation bool,
) batchIndexCommandGet {
recIfcs := make([]BatchRecordIfc, len(records))
for i := range records {
recIfcs[i] = records[i]
}

res := batchIndexCommandGet{
batchCommandOperate: newBatchCommandOperate(client, batch, policy, recIfcs),
indexRecords: records,
batchCommandOperate: newBatchCommandOperate(client, batch, policy, nil),
records: records,
}
res.txn = policy.Txn
return res
}

func (cmd *batchIndexCommandGet) writeBuffer(ifc command) Error {
attr, err := cmd.setBatchOperateRead(cmd.client, cmd.policy, cmd.records, cmd.batch)
cmd.attr = attr
return err
}

func (cmd *batchIndexCommandGet) isRead() bool {
return true
}

func (cmd *batchIndexCommandGet) cloneBatchCommand(batch *batchNode) batcher {
res := *cmd
res.batch = batch
Expand All @@ -60,7 +66,7 @@ func (cmd *batchIndexCommandGet) Execute() Error {
}

func (cmd *batchIndexCommandGet) executeSingle(client *Client) Error {
for _, br := range cmd.indexRecords {
for _, br := range cmd.records {
var ops []*Operation
if br.headerOnly() {
ops = []*Operation{GetHeaderOp()}
Expand Down Expand Up @@ -96,3 +102,149 @@ func (cmd *batchIndexCommandGet) executeSingle(client *Client) Error {
}
return nil
}

// Parse all results in the batch. Add records to shared list.
// If the record was not found, the bins will be nil.
func (cmd *batchIndexCommandGet) parseRecordResults(ifc command, receiveSize int) (bool, Error) {
//Parse each message response and add it to the result array
cmd.dataOffset = 0
for cmd.dataOffset < receiveSize {
if err := cmd.readBytes(int(_MSG_REMAINING_HEADER_SIZE)); err != nil {
return false, err
}
resultCode := types.ResultCode(cmd.dataBuffer[5] & 0xFF)

info3 := int(cmd.dataBuffer[3])

// If cmd is the end marker of the response, do not proceed further
if resultCode == 0 && (info3&_INFO3_LAST) == _INFO3_LAST {
return false, nil
}

generation := Buffer.BytesToUint32(cmd.dataBuffer, 6)
expiration := types.TTL(Buffer.BytesToUint32(cmd.dataBuffer, 10))
batchIndex := int(Buffer.BytesToUint32(cmd.dataBuffer, 14))
fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18))
opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20))

err := cmd.parseFieldsBatch(resultCode, fieldCount, cmd.records[batchIndex])
if err != nil {
return false, err
}

if resultCode != 0 {
if resultCode == types.FILTERED_OUT {
cmd.filteredOutCnt++
}

// If it looks like the error is on the first record and the message is marked as last part,
// the error is for the whole command and not just for the first batchIndex
lastMessage := (info3 & _INFO3_LAST) == _INFO3_LAST
if resultCode != 0 && lastMessage && receiveSize == int(_MSG_REMAINING_HEADER_SIZE) {
return false, newError(resultCode).setNode(cmd.node)
}

if resultCode == types.UDF_BAD_RESPONSE {
rec, err := cmd.parseRecord(cmd.records[batchIndex].key(), opCount, generation, expiration)
if err != nil {
cmd.records[batchIndex].setError(cmd.node, resultCode, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
return false, err
}

// for UDF failures
var msg any
if rec != nil {
msg = rec.Bins["FAILURE"]
}

// Need to store record because failure bin contains an error message.
cmd.records[batchIndex].setRecord(rec)
if msg, ok := msg.(string); ok && len(msg) > 0 {
cmd.records[batchIndex].setErrorWithMsg(cmd.node, resultCode, msg, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
} else {
cmd.records[batchIndex].setError(cmd.node, resultCode, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
}

// If cmd is the end marker of the response, do not proceed further
// if (info3 & _INFO3_LAST) == _INFO3_LAST {
if lastMessage {
return false, nil
}
continue
}

cmd.records[batchIndex].setError(cmd.node, resultCode, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))

// If cmd is the end marker of the response, do not proceed further
if (info3 & _INFO3_LAST) == _INFO3_LAST {
return false, nil
}
continue
}

if resultCode == 0 {
if cmd.objects == nil {
rec, err := cmd.parseRecord(cmd.records[batchIndex].key(), opCount, generation, expiration)
if err != nil {
cmd.records[batchIndex].setError(cmd.node, resultCode, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
return false, err
}
cmd.records[batchIndex].setRecord(rec)
} else if batchObjectParser != nil {
// mark it as found
cmd.objectsFound[batchIndex] = true
if err := batchObjectParser(cmd, batchIndex, opCount, fieldCount, generation, expiration); err != nil {
return false, err

}
}
}
}

return true, nil
}

// Parses the given byte buffer and populate the result object.
// Returns the number of bytes that were parsed from the given buffer.
func (cmd *batchIndexCommandGet) parseRecord(key *Key, opCount int, generation, expiration uint32) (*Record, Error) {
bins := make(BinMap, opCount)

for i := 0; i < opCount; i++ {
if err := cmd.readBytes(8); err != nil {
return nil, err
}
opSize := int(Buffer.BytesToUint32(cmd.dataBuffer, 0))
particleType := int(cmd.dataBuffer[5])
nameSize := int(cmd.dataBuffer[7])

if err := cmd.readBytes(nameSize); err != nil {
return nil, err
}
name := string(cmd.dataBuffer[:nameSize])

particleBytesSize := opSize - (4 + nameSize)
if err := cmd.readBytes(particleBytesSize); err != nil {
return nil, err
}
value, err := bytesToParticle(particleType, cmd.dataBuffer, 0, particleBytesSize)
if err != nil {
return nil, err
}

if cmd.isOperation {
if prev, ok := bins[name]; ok {
if prev2, ok := prev.(OpResults); ok {
bins[name] = append(prev2, value)
} else {
bins[name] = OpResults{prev, value}
}
} else {
bins[name] = value
}
} else {
bins[name] = value
}
}

return newRecord(cmd.node, key, bins, generation, expiration), nil
}
Loading
Loading