Skip to content

Commit

Permalink
mathpresso custom initialize
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-lss committed Aug 24, 2024
1 parent 82f9dd0 commit 7802f41
Show file tree
Hide file tree
Showing 9 changed files with 739 additions and 50 deletions.
5 changes: 3 additions & 2 deletions core/api/replicate_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ type ChannelManager interface {
AddDroppedCollection(ids []int64)
AddDroppedPartition(ids []int64)

StartReadCollection(ctx context.Context, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error
StartReadCollection(ctx context.Context, info *pb.CollectionInfo, targetDBType string, seekPositions []*msgpb.MsgPosition) error

StopReadCollection(ctx context.Context, info *pb.CollectionInfo) error
AddPartition(ctx context.Context, collectionInfo *pb.CollectionInfo, partitionInfo *pb.PartitionInfo) error

Expand Down Expand Up @@ -103,7 +104,7 @@ func (d *DefaultChannelManager) AddDroppedPartition(ids []int64) {
log.Warn("AddDroppedPartition is not implemented, please check it")
}

func (d *DefaultChannelManager) StartReadCollection(ctx context.Context, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error {
func (d *DefaultChannelManager) StartReadCollection(ctx context.Context, info *pb.CollectionInfo, targetDBType string, seekPositions []*msgpb.MsgPosition) error {
log.Warn("StartReadCollection is not implemented, please check it")
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions core/reader/collection_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type CollectionReader struct {
quitOnce sync.Once

retryOptions []retry.Option
targetDBType string
}

func NewCollectionReader(id string,
Expand Down Expand Up @@ -124,7 +125,7 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
Timestamp: info.CreateTime,
})
}
if err := reader.channelManager.StartReadCollection(ctx, info, startPositions); err != nil {
if err := reader.channelManager.StartReadCollection(ctx, info, reader.targetDBType, startPositions); err != nil {
collectionLog.Warn("fail to start to replicate the collection data in the watch process", zap.Any("info", info), zap.Error(err))
reader.sendError(err)
}
Expand Down Expand Up @@ -251,7 +252,7 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
zap.String("name", info.Schema.Name),
zap.Int64("collection_id", info.ID),
zap.String("state", info.State.String()))
if err := reader.channelManager.StartReadCollection(ctx, info, seekPositions); err != nil {
if err := reader.channelManager.StartReadCollection(ctx, info, reader.targetDBType, seekPositions); err != nil {
readerLog.Warn("fail to start to replicate the collection data", zap.Any("collection", info), zap.Error(err))
reader.sendError(err)
}
Expand Down
38 changes: 30 additions & 8 deletions core/reader/replicate_channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package reader

import (
"context"
"fmt"
"io"
"math"
"sort"
Expand Down Expand Up @@ -159,7 +160,7 @@ func (r *replicateChannelManager) AddDroppedPartition(ids []int64) {
log.Info("has removed dropped partitions", zap.Int64s("ids", ids))
}

func (r *replicateChannelManager) StartReadCollection(ctx context.Context, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error {
func (r *replicateChannelManager) StartReadCollection(ctx context.Context, info *pb.CollectionInfo, targetDBType string, seekPositions []*msgpb.MsgPosition) error {
r.addCollectionLock.Lock()
*r.addCollectionCnt++
r.addCollectionLock.Unlock()
Expand Down Expand Up @@ -261,6 +262,7 @@ func (r *replicateChannelManager) StartReadCollection(ctx context.Context, info
}

targetMsgCount := len(info.StartPositions)

barrier := NewBarrier(targetMsgCount, func(msgTs uint64, b *Barrier) {
select {
case <-b.CloseChan:
Expand All @@ -284,9 +286,16 @@ func (r *replicateChannelManager) StartReadCollection(ctx context.Context, info

var successChannels []string
var channelHandlers []*replicateChannelHandler
err = ForeachChannel(info.VirtualChannelNames, targetInfo.VChannels, func(sourceVChannel, targetVChannel string) error {
sourcePChannel := funcutil.ToPhysicalChannel(sourceVChannel)
targetPChannel := funcutil.ToPhysicalChannel(targetVChannel)
toPhysicalChannel := func(vChannel string) string {
if strings.ToLower(vChannel) != "mysql" && strings.ToLower(vChannel) != "bigquery" {
return funcutil.ToPhysicalChannel(vChannel)
}

return strings.ToLower(vChannel)
}
err = ForeachChannel(targetDBType, info.VirtualChannelNames, targetInfo.VChannels, targetInfo.DatabaseName, targetInfo.CollectionName, func(sourceVChannel, targetVChannel string) error {
sourcePChannel := toPhysicalChannel(sourceVChannel)
targetPChannel := toPhysicalChannel(targetVChannel)
channelHandler, err := r.startReadChannel(&model.SourceCollectionInfo{
PChannelName: sourcePChannel,
VChannelName: sourceVChannel,
Expand Down Expand Up @@ -340,12 +349,25 @@ func GetVChannelByPChannel(pChannel string, vChannels []string) string {
return ""
}

func ForeachChannel(sourcePChannels, targetPChannels []string, f func(sourcePChannel, targetPChannel string) error) error {
if len(sourcePChannels) != len(targetPChannels) {
return errors.New("the lengths of source and target channels are not equal")
func ForeachChannel(targetDBType string, sourcePChannels, targetPChannels []string, targetDBName, targetCollectionName string, f func(sourcePChannel, targetPChannel string) error) error {
if targetDBType == "milvus" {
if len(sourcePChannels) != len(targetPChannels) {
return errors.New("the lengths of source and target channels are not equal")
}
}

sources := make([]string, len(sourcePChannels))
targets := make([]string, len(targetPChannels))
var targets []string

if targetDBType == "milvus" {
targets = make([]string, len(targetPChannels))
} else {
targets = make([]string, len(sourcePChannels))
for i := 0; i < len(sourcePChannels); i++ {
targetPChannels = append(targetPChannels, fmt.Sprintf("%s-%s-%s-dml_%d_v0", targetDBName, targetCollectionName, targetDBType, i))
}
}

copy(sources, sourcePChannels)
copy(targets, targetPChannels)
sort.Strings(sources)
Expand Down
12 changes: 8 additions & 4 deletions core/reader/target_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,14 @@ type TargetClient struct {
}

type TargetConfig struct {
URI string
Token string
APIKey string
DialConfig util.DialConfig
TargetDBType string
URI string
Token string
ProjectId string
APIKey string
ConnectionTimeout int
DialConfig util.DialConfig
TargetCDCAgentUri string
}

func NewTarget(ctx context.Context, config TargetConfig) (api.TargetAPI, error) {
Expand Down
114 changes: 114 additions & 0 deletions core/util/devon_client_resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the LF AI & Data foundation under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* //
* http://www.apache.org/licenses/LICENSE-2.0
* //
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* custom by qanda
*/

package util

import (
"context"
"fmt"
"net"
"reflect"
"sync"
"time"

"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/util/resource"

"github.com/zilliztech/milvus-cdc/core/log"
)

const (
DBClientResourceTyp = "db_client"
DBClientExpireTime = 30 * time.Second
)

var (
dbClientManager *DBClientResourceManager
dbClientManagerOnce sync.Once
)

type DBClientResourceManager struct {
manager resource.Manager
}

func GetDBClientManager() *DBClientResourceManager {
dbClientManagerOnce.Do(func() {
manager := resource.NewManager(0, 0, nil)
dbClientManager = &DBClientResourceManager{
manager: manager,
}
})
return dbClientManager
}

func (m *DBClientResourceManager) newDBClient(cdcAgentHost, cdcAgentPort, address, database, collection string, dialConfig DialConfig) resource.NewResourceFunc {
return func() (resource.Resource, error) {
conn, err := net.Dial("tcp", cdcAgentHost+":"+cdcAgentPort)
if err != nil {
log.Warn("Error connecting:", zap.Error(err))
return nil, err
}
/*
c, err := client.NewClient(ctx, client.Config{
Address: address,
APIKey: apiKey,
EnableTLSAuth: enableTLS,
DBName: database,
})
if err != nil {
log.Warn("fail to new the db client", zap.String("database", database), zap.String("address", address), zap.Error(err))
return nil, err
}
*/
res := resource.NewSimpleResource(conn, DBClientResourceTyp, fmt.Sprintf("%s:%s:%s", address, database, collection), DBClientExpireTime, func() {
_ = conn.Close()
})

return res, nil
}
}

func (m *DBClientResourceManager) GetDBClient(ctx context.Context, cdcAgentHost, cdcAgentPort, address, database, collection string, dialConfig DialConfig, connectionTimeout int) (net.Conn, error) {
if database == "" {
database = DefaultDbName
}
ctxLog := log.Ctx(ctx).With(zap.String("database", database), zap.String("address", address))
res, err := m.manager.Get(DBClientResourceTyp,
getDBClientResourceName(address, database, collection),
m.newDBClient(cdcAgentHost, cdcAgentPort, address, database, collection, dialConfig))
if err != nil {
ctxLog.Error("fail to get db client", zap.Error(err))
return nil, err
}
if obj, ok := res.Get().(net.Conn); ok && obj != nil {
return obj, nil
}
ctxLog.Warn("invalid resource object", zap.Any("obj", reflect.TypeOf(res.Get())))
return nil, errors.New("invalid resource object")
}

func (m *DBClientResourceManager) DeleteDBClient(address, database, collection string) {
_ = m.manager.Delete(DBClientResourceTyp, getDBClientResourceName(address, database, collection))
}

func getDBClientResourceName(address, database, collection string) string {
return fmt.Sprintf("%s:%s:%s", address, database, collection)
}
42 changes: 37 additions & 5 deletions core/writer/config_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,66 @@ import (
"github.com/zilliztech/milvus-cdc/core/util"
)

func TokenOption(token string) config.Option[*MilvusDataHandler] {
func MilvusTokenOption(token string) config.Option[*MilvusDataHandler] {
return config.OptionFunc[*MilvusDataHandler](func(object *MilvusDataHandler) {
object.token = token
})
}

func URIOption(uri string) config.Option[*MilvusDataHandler] {
func MilvusURIOption(uri string) config.Option[*MilvusDataHandler] {
return config.OptionFunc[*MilvusDataHandler](func(object *MilvusDataHandler) {
object.uri = uri
})
}

func ConnectTimeoutOption(timeout int) config.Option[*MilvusDataHandler] {
func MilvusConnectTimeoutOption(timeout int) config.Option[*MilvusDataHandler] {
return config.OptionFunc[*MilvusDataHandler](func(object *MilvusDataHandler) {
if timeout > 0 {
object.connectTimeout = timeout
}
})
}

func IgnorePartitionOption(ignore bool) config.Option[*MilvusDataHandler] {
func MilvusIgnorePartitionOption(ignore bool) config.Option[*MilvusDataHandler] {
return config.OptionFunc[*MilvusDataHandler](func(object *MilvusDataHandler) {
object.ignorePartition = ignore
})
}

func DialConfigOption(dialConfig util.DialConfig) config.Option[*MilvusDataHandler] {
func MilvusDialConfigOption(dialConfig util.DialConfig) config.Option[*MilvusDataHandler] {
return config.OptionFunc[*MilvusDataHandler](func(object *MilvusDataHandler) {
object.dialConfig = dialConfig
})
}

func TokenOption(token string) config.Option[*DataHandler] {
return config.OptionFunc[*DataHandler](func(object *DataHandler) {
object.token = token
})
}

func URIOption(uri string) config.Option[*DataHandler] {
return config.OptionFunc[*DataHandler](func(object *DataHandler) {
object.uri = uri
})
}

func ConnectTimeoutOption(timeout int) config.Option[*DataHandler] {
return config.OptionFunc[*DataHandler](func(object *DataHandler) {
if timeout > 0 {
object.connectTimeout = timeout
}
})
}

func IgnorePartitionOption(ignore bool) config.Option[*DataHandler] {
return config.OptionFunc[*DataHandler](func(object *DataHandler) {
object.ignorePartition = ignore
})
}

func DialConfigOption(dialConfig util.DialConfig) config.Option[*DataHandler] {
return config.OptionFunc[*DataHandler](func(object *DataHandler) {
object.dialConfig = dialConfig
})
}
Loading

0 comments on commit 7802f41

Please sign in to comment.