Skip to content

Commit

Permalink
fix: refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Jul 11, 2023
1 parent 96228a0 commit f5f592e
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 27 deletions.
25 changes: 8 additions & 17 deletions couchbase/doc_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,26 @@ import (
"github.com/couchbase/gocbcore/v10/memd"

"github.com/couchbase/gocbcore/v10"
jsoniter "github.com/json-iterator/go"
)

func CreateDocument(ctx context.Context,
agent *gocbcore.Agent,
scopeName string,
collectionName string,
id []byte,
value interface{},
value []byte,
flags uint32,
expiry uint32,
) error {
opm := NewAsyncOp(ctx)

deadline, _ := ctx.Deadline()

payload, _ := jsoniter.Marshal(value)

ch := make(chan error)

op, err := agent.Set(gocbcore.SetOptions{
Key: id,
Value: payload,
Value: value,
Flags: flags,
Deadline: deadline,
Expiry: expiry,
Expand All @@ -54,23 +51,21 @@ func UpdateDocument(ctx context.Context,
scopeName string,
collectionName string,
id []byte,
value interface{},
value []byte,
expiry uint32,
) error {
opm := NewAsyncOp(ctx)

deadline, _ := ctx.Deadline()

payload, _ := jsoniter.Marshal(value)

ch := make(chan error)

op, err := agent.MutateIn(gocbcore.MutateInOptions{
Key: id,
Ops: []gocbcore.SubDocOp{
{
Op: memd.SubDocOpSetDoc,
Value: payload,
Value: value,
},
},
Expiry: expiry,
Expand Down Expand Up @@ -127,15 +122,13 @@ func UpsertXattrs(ctx context.Context,
collectionName string,
id []byte,
path string,
xattrs interface{},
value []byte,
expiry uint32,
) error {
opm := NewAsyncOp(ctx)

deadline, _ := ctx.Deadline()

payload, _ := jsoniter.Marshal(xattrs)

ch := make(chan error)

op, err := agent.MutateIn(gocbcore.MutateInOptions{
Expand All @@ -145,7 +138,7 @@ func UpsertXattrs(ctx context.Context,
Op: memd.SubDocOpDictSet,
Flags: memd.SubdocFlagXattrPath,
Path: path,
Value: payload,
Value: value,
},
},
Expiry: expiry,
Expand Down Expand Up @@ -253,22 +246,20 @@ func CreatePath(ctx context.Context,
collectionName string,
id []byte,
path []byte,
value interface{},
value []byte,
) error {
opm := NewAsyncOp(ctx)

deadline, _ := ctx.Deadline()

payload, _ := jsoniter.Marshal(value)

ch := make(chan error)

op, err := agent.MutateIn(gocbcore.MutateInOptions{
Key: id,
Ops: []gocbcore.SubDocOp{
{
Op: memd.SubDocOpDictSet,
Value: payload,
Value: value,
Path: string(path),
},
},
Expand Down
24 changes: 16 additions & 8 deletions couchbase/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,16 @@ func (h *cbMembership) register() {
ClusterJoinTime: now,
}

err = UpdateDocument(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.id, instance, _expirySec)
payload, _ := jsoniter.Marshal(instance)

err = UpdateDocument(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.id, payload, _expirySec)

var kvErr *gocbcore.KeyValueError
if err != nil && errors.As(err, &kvErr) && kvErr.StatusCode == memd.StatusKeyNotFound {
err = CreateDocument(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.id, instance, helpers.JSONFlags, _expirySec)
err = CreateDocument(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.id, payload, helpers.JSONFlags, _expirySec)

if err == nil {
err = UpdateDocument(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.id, instance, _expirySec)
err = UpdateDocument(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.id, payload, _expirySec)
}
}

Expand All @@ -99,14 +101,16 @@ func (h *cbMembership) register() {
}

func (h *cbMembership) createIndex(ctx context.Context, clusterJoinTime int64) error {
err := CreatePath(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.instanceAll, h.id, clusterJoinTime)
payload, _ := jsoniter.Marshal(clusterJoinTime)

err := CreatePath(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.instanceAll, h.id, payload)

var kvErr *gocbcore.KeyValueError
if err != nil && errors.As(err, &kvErr) && kvErr.StatusCode == memd.StatusKeyNotFound {
err = CreateDocument(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.instanceAll, struct{}{}, helpers.JSONFlags, 0)
err = CreateDocument(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.instanceAll, []byte{123, 125} /* empty json */, helpers.JSONFlags, 0) //nolint:lll

if err == nil {
err = CreatePath(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.instanceAll, h.id, clusterJoinTime)
err = CreatePath(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.instanceAll, h.id, payload)
}
}

Expand Down Expand Up @@ -137,7 +141,9 @@ func (h *cbMembership) heartbeat() {
ClusterJoinTime: h.clusterJoinTime,
}

err := UpdateDocument(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.id, instance, _expirySec)
payload, _ := jsoniter.Marshal(instance)

err := UpdateDocument(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.id, payload, _expirySec)
if err != nil {
logger.ErrorLog.Printf("error while heartbeat: %v", err)
return
Expand Down Expand Up @@ -232,7 +238,9 @@ func (h *cbMembership) updateIndex(ctx context.Context) {
all[*instance.ID] = instance.ClusterJoinTime
}

err := UpdateDocument(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.instanceAll, all, 0)
payload, _ := jsoniter.Marshal(all)

err := UpdateDocument(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.instanceAll, payload, 0)
if err != nil {
logger.ErrorLog.Printf("error while update instances: %v", err)
return
Expand Down
5 changes: 3 additions & 2 deletions couchbase/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ func (s *cbMetadata) Save(state map[uint16]*models.CheckpointDocument, dirtyOffs
func (s *cbMetadata) saveVBucketCheckpoint(ctx context.Context, vbID uint16, checkpointDocument *models.CheckpointDocument) func() error {
return func() error {
id := getCheckpointID(vbID, s.config.Dcp.Group.Name)
err := UpsertXattrs(ctx, s.client.GetMetaAgent(), s.scopeName, s.collectionName, id, helpers.Name, checkpointDocument, 0)
payload, _ := jsoniter.Marshal(checkpointDocument)
err := UpsertXattrs(ctx, s.client.GetMetaAgent(), s.scopeName, s.collectionName, id, helpers.Name, payload, 0)

var kvErr *gocbcore.KeyValueError
if err != nil && errors.As(err, &kvErr) && kvErr.StatusCode == memd.StatusKeyNotFound {
err = CreateDocument(ctx, s.client.GetMetaAgent(), s.scopeName, s.collectionName, id, []byte{}, helpers.JSONFlags, 0)

if err == nil {
err = UpsertXattrs(ctx, s.client.GetMetaAgent(), s.scopeName, s.collectionName, id, helpers.Name, checkpointDocument, 0)
err = UpsertXattrs(ctx, s.client.GetMetaAgent(), s.scopeName, s.collectionName, id, helpers.Name, payload, 0)
}
}
return err
Expand Down

0 comments on commit f5f592e

Please sign in to comment.