Skip to content

Commit

Permalink
Merge pull request #460 from aerospike/stage
Browse files Browse the repository at this point in the history
Go Client v8.0.0-beta.2
  • Loading branch information
khaf authored Dec 23, 2024
2 parents f481016 + 8f77af4 commit 8580877
Show file tree
Hide file tree
Showing 6 changed files with 424 additions and 28 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Change History

## December 20 2024: v8.0.0-beta.2

- **New Features**
- [CLIENT-2820] Actively refresh pool connections that will be idle before the next tend.

- **Fixes**
- [CLIENT-3218] Fix FilterExpression encoding in Batch commands.

## December 13 2024: v8.0.0-beta.1

Major breaking release. This release supports Multi-Record Transactions. Please note that this is a beta release and will be subject to breaking changes.
Expand Down
170 changes: 161 additions & 9 deletions batch_index_command_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ package aerospike

import (
"github.com/aerospike/aerospike-client-go/v8/types"
Buffer "github.com/aerospike/aerospike-client-go/v8/utils/buffer"
)

type batchIndexCommandGet struct {
batchCommandOperate

indexRecords []*BatchRead
records []*BatchRead
}

func newBatchIndexCommandGet(
Expand All @@ -31,19 +32,24 @@ func newBatchIndexCommandGet(
records []*BatchRead,
isOperation bool,
) batchIndexCommandGet {
recIfcs := make([]BatchRecordIfc, len(records))
for i := range records {
recIfcs[i] = records[i]
}

res := batchIndexCommandGet{
batchCommandOperate: newBatchCommandOperate(client, batch, policy, recIfcs),
indexRecords: records,
batchCommandOperate: newBatchCommandOperate(client, batch, policy, nil),
records: records,
}
res.txn = policy.Txn
return res
}

func (cmd *batchIndexCommandGet) writeBuffer(ifc command) Error {
attr, err := cmd.setBatchOperateRead(cmd.client, cmd.policy, cmd.records, cmd.batch)
cmd.attr = attr
return err
}

func (cmd *batchIndexCommandGet) isRead() bool {
return true
}

func (cmd *batchIndexCommandGet) cloneBatchCommand(batch *batchNode) batcher {
res := *cmd
res.batch = batch
Expand All @@ -60,7 +66,7 @@ func (cmd *batchIndexCommandGet) Execute() Error {
}

func (cmd *batchIndexCommandGet) executeSingle(client *Client) Error {
for _, br := range cmd.indexRecords {
for _, br := range cmd.records {
var ops []*Operation
if br.headerOnly() {
ops = []*Operation{GetHeaderOp()}
Expand Down Expand Up @@ -96,3 +102,149 @@ func (cmd *batchIndexCommandGet) executeSingle(client *Client) Error {
}
return nil
}

// Parse all results in the batch. Add records to shared list.
// If the record was not found, the bins will be nil.
func (cmd *batchIndexCommandGet) parseRecordResults(ifc command, receiveSize int) (bool, Error) {
//Parse each message response and add it to the result array
cmd.dataOffset = 0
for cmd.dataOffset < receiveSize {
if err := cmd.readBytes(int(_MSG_REMAINING_HEADER_SIZE)); err != nil {
return false, err
}
resultCode := types.ResultCode(cmd.dataBuffer[5] & 0xFF)

info3 := int(cmd.dataBuffer[3])

// If cmd is the end marker of the response, do not proceed further
if resultCode == 0 && (info3&_INFO3_LAST) == _INFO3_LAST {
return false, nil
}

generation := Buffer.BytesToUint32(cmd.dataBuffer, 6)
expiration := types.TTL(Buffer.BytesToUint32(cmd.dataBuffer, 10))
batchIndex := int(Buffer.BytesToUint32(cmd.dataBuffer, 14))
fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18))
opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20))

err := cmd.parseFieldsBatch(resultCode, fieldCount, cmd.records[batchIndex])
if err != nil {
return false, err
}

if resultCode != 0 {
if resultCode == types.FILTERED_OUT {
cmd.filteredOutCnt++
}

// If it looks like the error is on the first record and the message is marked as last part,
// the error is for the whole command and not just for the first batchIndex
lastMessage := (info3 & _INFO3_LAST) == _INFO3_LAST
if resultCode != 0 && lastMessage && receiveSize == int(_MSG_REMAINING_HEADER_SIZE) {
return false, newError(resultCode).setNode(cmd.node)
}

if resultCode == types.UDF_BAD_RESPONSE {
rec, err := cmd.parseRecord(cmd.records[batchIndex].key(), opCount, generation, expiration)
if err != nil {
cmd.records[batchIndex].setError(cmd.node, resultCode, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
return false, err
}

// for UDF failures
var msg any
if rec != nil {
msg = rec.Bins["FAILURE"]
}

// Need to store record because failure bin contains an error message.
cmd.records[batchIndex].setRecord(rec)
if msg, ok := msg.(string); ok && len(msg) > 0 {
cmd.records[batchIndex].setErrorWithMsg(cmd.node, resultCode, msg, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
} else {
cmd.records[batchIndex].setError(cmd.node, resultCode, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
}

// If cmd is the end marker of the response, do not proceed further
// if (info3 & _INFO3_LAST) == _INFO3_LAST {
if lastMessage {
return false, nil
}
continue
}

cmd.records[batchIndex].setError(cmd.node, resultCode, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))

// If cmd is the end marker of the response, do not proceed further
if (info3 & _INFO3_LAST) == _INFO3_LAST {
return false, nil
}
continue
}

if resultCode == 0 {
if cmd.objects == nil {
rec, err := cmd.parseRecord(cmd.records[batchIndex].key(), opCount, generation, expiration)
if err != nil {
cmd.records[batchIndex].setError(cmd.node, resultCode, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
return false, err
}
cmd.records[batchIndex].setRecord(rec)
} else if batchObjectParser != nil {
// mark it as found
cmd.objectsFound[batchIndex] = true
if err := batchObjectParser(cmd, batchIndex, opCount, fieldCount, generation, expiration); err != nil {
return false, err

}
}
}
}

return true, nil
}

// Parses the given byte buffer and populate the result object.
// Returns the number of bytes that were parsed from the given buffer.
func (cmd *batchIndexCommandGet) parseRecord(key *Key, opCount int, generation, expiration uint32) (*Record, Error) {
bins := make(BinMap, opCount)

for i := 0; i < opCount; i++ {
if err := cmd.readBytes(8); err != nil {
return nil, err
}
opSize := int(Buffer.BytesToUint32(cmd.dataBuffer, 0))
particleType := int(cmd.dataBuffer[5])
nameSize := int(cmd.dataBuffer[7])

if err := cmd.readBytes(nameSize); err != nil {
return nil, err
}
name := string(cmd.dataBuffer[:nameSize])

particleBytesSize := opSize - (4 + nameSize)
if err := cmd.readBytes(particleBytesSize); err != nil {
return nil, err
}
value, err := bytesToParticle(particleType, cmd.dataBuffer, 0, particleBytesSize)
if err != nil {
return nil, err
}

if cmd.isOperation {
if prev, ok := bins[name]; ok {
if prev2, ok := prev.(OpResults); ok {
bins[name] = append(prev2, value)
} else {
bins[name] = OpResults{prev, value}
}
} else {
bins[name] = value
}
} else {
bins[name] = value
}
}

return newRecord(cmd.node, key, bins, generation, expiration), nil
}
Loading

0 comments on commit 8580877

Please sign in to comment.