feat: Resumable Full Load In Mongodb (#61) (#99)
Co-authored-by: Datazip <datazip@Datazips-MBP.localdomain>
Co-authored-by: Datazip <datazip@Datazips-MacBook-Pro.local>
Co-authored-by: Shubham Baldava <shubham@datazip.io>
Co-authored-by: hash-data <ankit@datazip.io>
5 people authored Feb 18, 2025
1 parent 1b05c50 commit 3806a32
Showing 15 changed files with 250 additions and 161 deletions.
139 changes: 79 additions & 60 deletions drivers/mongodb/internal/backfill.go

@@ -20,81 +20,87 @@ import (
 	"go.mongodb.org/mongo-driver/mongo/readconcern"
 )

-type Boundry struct {
-	StartID primitive.ObjectID  `json:"start"`
-	EndID   *primitive.ObjectID `json:"end"`
-	end     time.Time
-}
-
 func (m *Mongo) backfill(stream protocol.Stream, pool *protocol.WriterPool) error {
-	logger.Infof("starting full load for stream [%s]", stream.ID())
-
 	collection := m.client.Database(stream.Namespace(), options.Database().SetReadConcern(readconcern.Majority())).Collection(stream.Name())
-	totalCount, err := m.totalCountInCollection(collection)
-	if err != nil {
-		return err
-	}
+	chunks := stream.GetStateChunks()
+	if chunks == nil || chunks.Len() == 0 {
+		chunks = types.NewSet[types.Chunk]()
+		// chunks state not present means full load
+		logger.Infof("starting full load for stream [%s]", stream.ID())

-	first, last, err := m.fetchExtremes(collection)
-	if err != nil {
-		return err
-	}
+		totalCount, err := m.totalCountInCollection(collection)
+		if err != nil {
+			return err
+		}

-	logger.Infof("Extremes of Stream %s are start: %s \t end:%s", stream.ID(), first, last)
-	logger.Infof("Total expected count for stream %s are %d", stream.ID(), totalCount)
+		first, last, err := m.fetchExtremes(collection)
+		if err != nil {
+			return err
+		}

-	timeDiff := last.Sub(first).Hours() / 6
-	if timeDiff < 1 {
-		timeDiff = 1
-	}
-	// for every 6hr difference ideal density is 10 Seconds
-	density := time.Duration(timeDiff) * (10 * time.Second)
-	return utils.ConcurrentC(context.TODO(), utils.Yield(func(prev *Boundry) (bool, *Boundry, error) {
-		start := first
-		if prev != nil {
-			start = prev.end
-		}
-		boundry := &Boundry{
-			StartID: *generateMinObjectID(start),
-		}
-
-		end := start.Add(density)
-		exit := true
-		if !end.After(last) {
-			exit = false
-			boundry.EndID = generateMinObjectID(end)
-			boundry.end = end
-		} else {
-			logger.Info("Scheduling last full load chunk query!")
-		}
-
-		return exit, boundry, nil
-	}), m.config.MaxThreads, func(ctx context.Context, one *Boundry, number int64) error {
+		logger.Infof("Extremes of Stream %s are start: %s \t end:%s", stream.ID(), first, last)
+		logger.Infof("Total expected count for stream %s are %d", stream.ID(), totalCount)
+
+		timeDiff := last.Sub(first).Hours() / 6
+		if timeDiff < 1 {
+			timeDiff = 1
+		}
+		// for every 6hr difference ideal density is 10 Seconds
+		density := time.Duration(timeDiff) * (10 * time.Second)
+		start := first
+		for start.Before(last) {
+			end := start.Add(density)
+			minObjectID := generateMinObjectID(start)
+			maxObjecID := generateMinObjectID(end)
+			if end.After(last) {
+				maxObjecID = generateMinObjectID(last.Add(time.Second))
+			}
+			start = end
+			chunks.Insert(types.Chunk{
+				Min: minObjectID,
+				Max: maxObjecID,
+			})
+		}
+		// save the chunks state
+		stream.SetStateChunks(chunks)
+	}
+	logger.Infof("Running backfill for %d chunks", chunks.Len())
+	// notice: err is declared in return, reason: defer call can access it
+	processChunk := func(ctx context.Context, pool *protocol.WriterPool, stream protocol.Stream, collection *mongo.Collection, minStr string, maxStr *string) (err error) {
 		threadContext, cancelThread := context.WithCancel(ctx)
 		defer cancelThread()
+		start, err := primitive.ObjectIDFromHex(minStr)
+		if err != nil {
+			return fmt.Errorf("invalid min ObjectID: %s", err)
+		}
+
+		var end *primitive.ObjectID
+		if maxStr != nil {
+			max, err := primitive.ObjectIDFromHex(*maxStr)
+			if err != nil {
+				return fmt.Errorf("invalid max ObjectID: %s", err)
+			}
+			end = &max
+		}

 		opts := options.Aggregate().SetAllowDiskUse(true).SetBatchSize(int32(math.Pow10(6)))
-		cursor, err := collection.Aggregate(ctx, generatepipeline(one.StartID, one.EndID), opts)
+		cursor, err := collection.Aggregate(ctx, generatepipeline(start, end), opts)
 		if err != nil {
 			return fmt.Errorf("collection.Find: %s", err)
 		}
 		defer cursor.Close(ctx)

-		waitChannel := make(chan struct{})
-		defer func() {
-			if stream.GetSyncMode() == types.CDC {
-				// only wait in cdc mode
-				// make sure it get called after insert.Close()
-				<-waitChannel
-			}
-			logger.Infof("Finished full load chunk number %d.", number)
-		}()
-
-		insert, err := pool.NewThread(threadContext, stream, protocol.WithNumber(number), protocol.WithWaitChannel(waitChannel))
+		waitChannel := make(chan error, 1)
+		insert, err := pool.NewThread(threadContext, stream, protocol.WithWaitChannel(waitChannel))
 		if err != nil {
 			return err
 		}
-		defer insert.Close()
+		defer func() {
+			insert.Close()
+			// wait for chunk completion
+			err = <-waitChannel
+		}()

 		for cursor.Next(ctx) {
 			var doc bson.M
@@ -107,15 +113,28 @@ func (m *Mongo) backfill(stream protocol.Stream, pool *protocol.WriterPool) error {
 			handleObjectID(doc)
 			exit, err := insert.Insert(types.CreateRawRecord(utils.GetKeysHash(doc, constants.MongoPrimaryID), doc, 0))
 			if err != nil {
-				return fmt.Errorf("failed to finish backfill chunk %d: %s", number, err)
+				return fmt.Errorf("failed to finish backfill chunk: %s", err)
 			}
 			if exit {
 				return nil
 			}
 		}
-		return cursor.Err()
-	})
+
+		if err := cursor.Err(); err != nil {
+			return err
+		}
+		return nil
+	}
+
+	return utils.Concurrent(context.TODO(), chunks.Array(), chunks.Len(), func(ctx context.Context, one types.Chunk, number int) error {
+		err := processChunk(ctx, pool, stream, collection, one.Min, &one.Max)
+		if err != nil {
+			return err
+		}
+		// remove success chunk from state
+		stream.RemoveStateChunk(one)
+		return nil
+	})
 }

 func (m *Mongo) totalCountInCollection(collection *mongo.Collection) (int64, error) {
@@ -222,14 +241,14 @@ func generatepipeline(start primitive.ObjectID, end *primitive.ObjectID) mongo.Pipeline {
 }

 // function to generate ObjectID with the minimum value for a given time
-func generateMinObjectID(t time.Time) *primitive.ObjectID {
+func generateMinObjectID(t time.Time) string {
 	// Create the ObjectID with the first 4 bytes as the timestamp and the rest 8 bytes as 0x00
 	objectID := primitive.NewObjectIDFromTimestamp(t)
 	for i := 4; i < 12; i++ {
 		objectID[i] = 0x00
 	}

-	return &objectID
+	return objectID.Hex()
 }

 func handleObjectID(doc bson.M) {
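The chunk-planning loop above is self-contained enough to run on its own. Below is a minimal standalone sketch of the same scheme, assuming only the mongo-driver primitive package; chunk, splitIntoChunks, and minObjectIDHex are illustrative local names, not part of the diff:

package main

import (
	"fmt"
	"time"

	"go.mongodb.org/mongo-driver/bson/primitive"
)

// minObjectIDHex mirrors generateMinObjectID: the smallest possible ObjectID
// for a timestamp (timestamp bytes kept, remaining 8 bytes zeroed), as hex.
func minObjectIDHex(t time.Time) string {
	id := primitive.NewObjectIDFromTimestamp(t)
	for i := 4; i < 12; i++ {
		id[i] = 0x00
	}
	return id.Hex()
}

type chunk struct{ Min, Max string }

// splitIntoChunks reproduces the planning loop: the window width scales with
// the stream's time spread (one 10s step per 6h), and the final window is
// padded by a second so the newest document still falls inside [Min, Max).
func splitIntoChunks(first, last time.Time) []chunk {
	timeDiff := last.Sub(first).Hours() / 6
	if timeDiff < 1 {
		timeDiff = 1
	}
	density := time.Duration(timeDiff) * (10 * time.Second)

	var chunks []chunk
	for start := first; start.Before(last); start = start.Add(density) {
		end := start.Add(density)
		max := minObjectIDHex(end)
		if end.After(last) {
			max = minObjectIDHex(last.Add(time.Second))
		}
		chunks = append(chunks, chunk{Min: minObjectIDHex(start), Max: max})
	}
	return chunks
}

func main() {
	now := time.Now()
	chunks := splitIntoChunks(now.Add(-24*time.Hour), now)
	fmt.Printf("planned %d chunks, first: [%s, %s)\n", len(chunks), chunks[0].Min, chunks[0].Max)
}

Returning hex strings rather than *primitive.ObjectID (the generateMinObjectID change at the bottom of this file) is what lets chunk boundaries round-trip through the JSON state file; processChunk parses them back with primitive.ObjectIDFromHex.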
8 changes: 7 additions & 1 deletion drivers/mongodb/internal/cdc.go

@@ -49,7 +49,9 @@ func (m *Mongo) changeStreamSync(stream protocol.Stream, pool *protocol.WriterPool) error {
 	}

 	prevResumeToken := stream.GetStateKey(cdcCursorField)
-	if prevResumeToken == nil {
+	chunks := stream.GetStateChunks()
+
+	if prevResumeToken == nil || chunks == nil || chunks.Len() != 0 {
 		// get current resume token and do full load for stream
 		resumeToken, err := m.getCurrentResumeToken(cdcCtx, collection, pipeline)
 		if err != nil {
@@ -58,6 +60,10 @@ func (m *Mongo) changeStreamSync(stream protocol.Stream, pool *protocol.WriterPool) error {
 		if resumeToken != nil {
 			prevResumeToken = (*resumeToken).Lookup(cdcCursorField).StringValue()
 		}
+
+		// save resume token
+		stream.SetStateKey(cdcCursorField, prevResumeToken)
+
 		if err := m.backfill(stream, pool); err != nil {
 			return err
 		}
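The guard that now decides when a full load must run (or resume) reads as a predicate. A hypothetical restatement, not part of the diff:

// needsFullLoad restates the condition in changeStreamSync: a fresh sync has
// no saved resume token; a non-empty chunk set means an earlier full load was
// interrupted and its remaining chunks must be resumed.
func needsFullLoad(prevResumeToken any, chunks *types.Set[types.Chunk]) bool {
	return prevResumeToken == nil || chunks == nil || chunks.Len() != 0
}

Note the ordering in the diff: the resume token is persisted with SetStateKey before backfill starts, so change events that occur during the full load replay from that point afterwards instead of being lost.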
1 change: 1 addition & 0 deletions drivers/mongodb/internal/mon.go

@@ -54,6 +54,7 @@ func (m *Mongo) Setup() error {
 	m.client = conn
 	// no need to check from discover if it have cdc support or not
 	m.CDCSupport = true
+	// set state
 	return nil
 }
71 changes: 8 additions & 63 deletions logger/logger.go

@@ -10,7 +10,6 @@ import (
 	"strings"
 	"time"

-	"github.com/datazip-inc/olake/types"
 	"github.com/rs/zerolog"
 	"github.com/spf13/viper"
 	"gopkg.in/natefinch/lumberjack.v2"
@@ -74,49 +73,6 @@ func Warnf(format string, v ...interface{}) {
 	logger.Warn().Msgf(format, v...)
 }

-func LogSpec(spec map[string]interface{}) {
-	message := types.Message{}
-	message.Spec = spec
-	message.Type = types.SpecMessage
-
-	Debug("logging spec")
-	Info(message)
-	if configFolder := viper.GetString("CONFIG_FOLDER"); configFolder != "" {
-		err := FileLogger(message.Spec, configFolder, "config", ".json")
-		if err != nil {
-			Fatalf("failed to create spec file: %s", err)
-		}
-	}
-}
-
-func LogCatalog(streams []*types.Stream) {
-	message := types.Message{}
-	message.Type = types.CatalogMessage
-	message.Catalog = types.GetWrappedCatalog(streams)
-	Debug("logging catalog")
-
-	Info(message)
-	// write catalog to the specified file
-	if configFolder := viper.GetString("CONFIG_FOLDER"); configFolder != "" {
-		err := FileLogger(message.Catalog, configFolder, "catalog", ".json")
-		if err != nil {
-			Fatalf("failed to create catalog file: %s", err)
-		}
-	}
-}
-func LogConnectionStatus(err error) {
-	message := types.Message{}
-	message.Type = types.ConnectionStatusMessage
-	message.ConnectionStatus = &types.StatusRow{}
-	if err != nil {
-		message.ConnectionStatus.Message = err.Error()
-		message.ConnectionStatus.Status = types.ConnectionFailed
-	} else {
-		message.ConnectionStatus.Status = types.ConnectionSucceed
-	}
-	Info(message)
-}
-
 func LogResponse(response *http.Response) {
 	respDump, err := httputil.DumpResponse(response, true)
 	if err != nil {
@@ -135,25 +91,13 @@ func LogRequest(req *http.Request) {
 	fmt.Println(string(requestDump))
 }

-func LogState(state *types.State) {
-	state.Lock()
-	defer state.Unlock()
-
-	message := types.Message{}
-	message.Type = types.StateMessage
-	message.State = state
-	Debug("logging state")
-	Info(message)
-	if configFolder := viper.GetString("CONFIG_FOLDER"); configFolder != "" {
-		err := FileLogger(state, configFolder, "state", ".json")
-		if err != nil {
-			Fatalf("failed to create state file: %s", err)
-		}
-	}
-}
-
 // CreateFile creates a new file or overwrites an existing one with the specified filename, path, extension,
-func FileLogger(content any, filePath string, fileName, fileExtension string) error {
+func FileLogger(content any, fileName, fileExtension string) error {
+	// get config folder
+	filePath := viper.GetString("CONFIG_FOLDER")
+	if filePath == "" {
+		return fmt.Errorf("config folder is not set")
+	}
 	// Construct the full file path
 	contentBytes, err := json.Marshal(content)
 	if err != nil {
@@ -180,7 +124,8 @@ func FileLogger(content any, fileName, fileExtension string) error {

 func Init() {
 	// Configure lumberjack for log rotation
-	timestamp := fmt.Sprintf("%d-%d-%d_%d-%d-%d", time.Now().Year(), time.Now().Month(), time.Now().Day(), time.Now().Hour(), time.Now().Minute(), time.Now().Second())
+	currentTimestamp := time.Now().UTC()
+	timestamp := fmt.Sprintf("%d-%d-%d_%d-%d-%d", currentTimestamp.Year(), currentTimestamp.Month(), currentTimestamp.Day(), currentTimestamp.Hour(), currentTimestamp.Minute(), currentTimestamp.Second())
 	rotatingFile := &lumberjack.Logger{
 		Filename: fmt.Sprintf("%s/logs/sync_%s/olake.log", viper.GetString("CONFIG_FOLDER"), timestamp), // Log file path
 		MaxSize:  100, // Max size in MB before log rotation
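With this refactor, FileLogger resolves the output directory itself from viper's CONFIG_FOLDER, so call sites shrink to a name plus extension. A usage sketch under the new signature (the payload here is illustrative):

// Writes the marshalled content as state.json inside the configured
// CONFIG_FOLDER, and fails loudly if CONFIG_FOLDER was never set.
if err := logger.FileLogger(map[string]any{"status": "ok"}, "state", ".json"); err != nil {
	logger.Fatalf("failed to create state file: %s", err)
}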
14 changes: 12 additions & 2 deletions protocol/check.go

@@ -80,7 +80,17 @@ var checkCmd = &cobra.Command{
 			return nil
 		}()

-		// success
-		logger.LogConnectionStatus(err)
+		// log success
+		message := types.Message{
+			Type: types.ConnectionStatusMessage,
+			ConnectionStatus: &types.StatusRow{
+				Status: types.ConnectionSucceed,
+			},
+		}
+		if err != nil {
+			message.ConnectionStatus.Message = err.Error()
+			message.ConnectionStatus.Status = types.ConnectionFailed
+		}
+		logger.Info(message)
 	},
 }
4 changes: 2 additions & 2 deletions protocol/discover.go

@@ -4,7 +4,7 @@ import (
 	"errors"
 	"fmt"

-	"github.com/datazip-inc/olake/logger"
+	"github.com/datazip-inc/olake/types"
 	"github.com/datazip-inc/olake/utils"
 	"github.com/spf13/cobra"
 )
@@ -38,7 +38,7 @@ var discoverCmd = &cobra.Command{
 			return errors.New("no streams found in connector")
 		}

-		logger.LogCatalog(streams)
+		types.LogCatalog(streams)
 		return nil
 	},
 }
4 changes: 3 additions & 1 deletion protocol/interface.go

@@ -83,9 +83,11 @@ type Stream interface {
 	SetStateCursor(value any)
 	SetStateKey(key string, value any)
 	Validate(source *types.Stream) error
+	SetStateChunks(chunks *types.Set[types.Chunk])
+	GetStateChunks() *types.Set[types.Chunk]
+	RemoveStateChunk(chunk types.Chunk)
 }

 type State interface {
 	SetType(typ types.StateType)
 	IsZero() bool
 }
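The three new Stream methods give drivers a persisted chunk ledger. A sketch of the intended lifecycle, mirroring how backfill.go uses it (driver-side pseudocode; planning and processing bodies elided):

chunks := stream.GetStateChunks()
if chunks == nil || chunks.Len() == 0 {
	// no saved ledger: plan the chunks, then persist the plan up front
	chunks = types.NewSet[types.Chunk]()
	// ... Insert() one types.Chunk per planned range ...
	stream.SetStateChunks(chunks)
}
for _, c := range chunks.Array() {
	// ... sync chunk c ...
	stream.RemoveStateChunk(c) // finished chunks leave the state, so a restart skips them
}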
12 changes: 11 additions & 1 deletion protocol/spec.go

@@ -9,6 +9,7 @@ import (

 	"github.com/datazip-inc/olake/jsonschema"
 	"github.com/datazip-inc/olake/logger"
+	"github.com/datazip-inc/olake/types"
 	"github.com/datazip-inc/olake/utils"

 	"github.com/spf13/cobra"
@@ -71,7 +72,16 @@ var specCmd = &cobra.Command{
 			}
 		}

-		logger.LogSpec(spec)
+		// log spec
+		message := types.Message{
+			Spec: spec,
+			Type: types.SpecMessage,
+		}
+		logger.Info(message)
+		err := logger.FileLogger(message.Spec, "config", ".json")
+		if err != nil {
+			logger.Fatalf("failed to create spec file: %s", err)
+		}

 		return nil
 	},