Skip to content

Commit

Permalink
change the way to decode the position in the create request
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Jul 29, 2024
1 parent f61806d commit 16e59be
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 9 deletions.
15 changes: 15 additions & 0 deletions core/util/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,21 @@ func Base64MsgPosition(position *msgstream.MsgPosition) string {
return base64.StdEncoding.EncodeToString(positionByte)
}

func Base64DecodeMsgPosition(position string) (*msgstream.MsgPosition, error) {
decodeBytes, err := base64.StdEncoding.DecodeString(position)
if err != nil {
log.Warn("fail to decode the position", zap.Error(err))
return nil, err
}
msgPosition := &msgstream.MsgPosition{}
err = proto.Unmarshal(decodeBytes, msgPosition)
if err != nil {
log.Warn("fail to unmarshal the position", zap.Error(err))
return nil, err
}
return msgPosition, nil
}

func GetCreateInfoKey(key string) string {
return fmt.Sprintf("%s_c", key)
}
Expand Down
29 changes: 24 additions & 5 deletions doc/cdc-usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
- Load/Release Partitions
- Create/Drop Database

**Anything not mentioned is not supported;**
**Anything not mentioned is not supported;**

3. Milvus cdc only supports synchronized data. If you need active and backup disaster recovery functions, please contact us;

Expand Down Expand Up @@ -43,8 +43,16 @@ metaStoreConfig:
# the metastore type, available value: etcd, mysql
storeType: etcd
# etcd address
etcdEndpoints:
- localhost:2379
etcd:
address:
- http://127.0.0.1:2379
enableAuth: false
username: root
password: root123456
enableTLS: false
tlsCertPath: deployment/cert/client.pem # path to your cert file
tlsKeyPath: deployment/cert/client.key # path to your key file
tlsCACertPath: deployment/cert/ca.pem # path to your CACert file
# mysql connection address
mysqlSourceUrl: root:root@tcp(127.0.0.1:3306)/milvus-cdc?charset=utf8
# meta data prefix, if multiple cdc services use the same store service, you can set different rootPaths to achieve multi-tenancy
Expand All @@ -53,8 +61,19 @@ metaStoreConfig:
# milvus-source config, these settings are basically the same as the corresponding configuration of milvus.yaml in milvus source.
sourceConfig:
# etcd config
etcdAddress:
- localhost:2379
etcd:
address:
- http://127.0.0.1:2379
rootPath: by-dev
metaSubPath: meta
enableAuth: false
username: root
password: root123456
enableTLS: false
tlsCertPath: deployment/cert/client.pem # path to your cert file
tlsKeyPath: deployment/cert/client.key # path to your key file
tlsCACertPath: deployment/cert/ca.pem # path to your CACert file
tlsMinVersion: 1.3
etcdRootPath: by-dev
etcdMetaSubPath: meta
# default partition name
Expand Down
8 changes: 4 additions & 4 deletions server/cdc_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,14 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon
revertCollectionNames()
return nil, servererror.NewClientError(fmt.Sprintf("the vchannel is invalid, %s, err: %s", vchannel, err.Error()))
}
positionDataBytes, err := base64.StdEncoding.DecodeString(collectionPosition)
decodePosition, err := util.Base64DecodeMsgPosition(collectionPosition)
if err != nil {
return nil, servererror.NewServerError(errors.WithMessage(err, "fail to decode the position data"))
}
p := &meta.PositionInfo{
DataPair: &commonpb.KeyDataPair{
Key: channelInfo.PChannelName,
Data: positionDataBytes,
Data: decodePosition.MsgID,
},
}
positions[channelInfo.PChannelName] = p
Expand Down Expand Up @@ -294,7 +294,7 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon
}

if req.RPCChannelInfo.Position != "" {
positionDataBytes, err := base64.StdEncoding.DecodeString(req.RPCChannelInfo.Position)
decodePosition, err := util.Base64DecodeMsgPosition(req.RPCChannelInfo.Position)
if err != nil {
return nil, servererror.NewServerError(errors.WithMessage(err, "fail to decode the rpc position data"))
}
Expand All @@ -307,7 +307,7 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon
req.RPCChannelInfo.Name: {
DataPair: &commonpb.KeyDataPair{
Key: req.RPCChannelInfo.Name,
Data: positionDataBytes,
Data: decodePosition.MsgID,
},
},
},
Expand Down

0 comments on commit 16e59be

Please sign in to comment.