From fdd4c1574413b5d46d0179868e9a035a974de4e6 Mon Sep 17 00:00:00 2001
From: Ankit Sharma <111491139+hash-data@users.noreply.github.com>
Date: Wed, 19 Feb 2025 14:17:48 +0530
Subject: [PATCH] chore: Exponential Backoff For Backfill In Mongodb (#74)

Co-authored-by: Shubham Baldava
---
 drivers/base/utils.go                | 10 ++---
 drivers/mongodb/README.md            |  3 +-
 drivers/mongodb/internal/backfill.go | 61 ++++++++++++++--------------
 drivers/mongodb/internal/config.go   |  1 +
 drivers/mongodb/internal/mon.go      | 14 +++++--
 types/stream_configured.go           | 10 ++++-
 6 files changed, 58 insertions(+), 41 deletions(-)

diff --git a/drivers/base/utils.go b/drivers/base/utils.go
index 5f46854..3000251 100644
--- a/drivers/base/utils.go
+++ b/drivers/base/utils.go
@@ -6,14 +6,14 @@ import (
 	"github.com/datazip-inc/olake/logger"
 )
 
-func RetryOnFailure(attempts int, sleep *time.Duration, f func() error) (err error) {
-	for i := 0; i < attempts; i++ {
+func RetryOnBackoff(attempts int, sleep time.Duration, f func() error) (err error) {
+	for cur := 0; cur < attempts; cur++ {
 		if err = f(); err == nil {
 			return nil
 		}
-
-		logger.Infof("Retrying after %v...", sleep)
-		time.Sleep(*sleep)
+		logger.Infof("retry attempt[%d], retrying after %.2f seconds due to err: %s", cur+1, sleep.Seconds(), err)
+		time.Sleep(sleep)
+		sleep = sleep * 2
 	}
 
 	return err
diff --git a/drivers/mongodb/README.md b/drivers/mongodb/README.md
index 468de3f..9821a67 100644
--- a/drivers/mongodb/README.md
+++ b/drivers/mongodb/README.md
@@ -42,7 +42,8 @@ Add MongoDB credentials in following format in config.json file
   "server-ram": 16,
   "database": "database",
   "max_threads": 50,
-  "default_mode" :"cdc"
+  "default_mode" :"cdc",
+  "backoff_retry_count": 2
 }
 ```
 
diff --git a/drivers/mongodb/internal/backfill.go b/drivers/mongodb/internal/backfill.go
index e146769..c48bc4f 100644
--- a/drivers/mongodb/internal/backfill.go
+++ b/drivers/mongodb/internal/backfill.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/datazip-inc/olake/constants"
+	"github.com/datazip-inc/olake/drivers/base"
 	"github.com/datazip-inc/olake/logger"
 	"github.com/datazip-inc/olake/protocol"
 	"github.com/datazip-inc/olake/types"
@@ -23,12 +24,13 @@
 func (m *Mongo) backfill(stream protocol.Stream, pool *protocol.WriterPool) error {
 	collection := m.client.Database(stream.Namespace(), options.Database().SetReadConcern(readconcern.Majority())).Collection(stream.Name())
 	chunks := stream.GetStateChunks()
+	backfillCtx := context.TODO()
 	if chunks == nil || chunks.Len() == 0 {
 		chunks = types.NewSet[types.Chunk]()
 		// chunks state not present means full load
 		logger.Infof("starting full load for stream [%s]", stream.ID())
 
-		totalCount, err := m.totalCountInCollection(collection)
+		totalCount, err := m.totalCountInCollection(backfillCtx, collection)
 		if err != nil {
 			return err
 		}
@@ -84,13 +86,6 @@
 			end = &max
 		}
 
-		opts := options.Aggregate().SetAllowDiskUse(true).SetBatchSize(int32(math.Pow10(6)))
-		cursor, err := collection.Aggregate(ctx, generatepipeline(start, end), opts)
-		if err != nil {
-			return fmt.Errorf("collection.Find: %s", err)
-		}
-		defer cursor.Close(ctx)
-
 		waitChannel := make(chan error, 1)
 		insert, err := pool.NewThread(threadContext, stream, protocol.WithWaitChannel(waitChannel))
 		if err != nil {
@@ -102,32 +97,38 @@
 			err = <-waitChannel
 		}()
 
-		for cursor.Next(ctx) {
-			var doc bson.M
-			if _, err = cursor.Current.LookupErr("_id"); err != nil {
-				return fmt.Errorf("looking up idProperty: %s", err)
-			} else if err = cursor.Decode(&doc); err != nil {
-				return fmt.Errorf("backfill decoding document: %s", err)
-			}
-
-			handleObjectID(doc)
-			exit, err := insert.Insert(types.CreateRawRecord(utils.GetKeysHash(doc, constants.MongoPrimaryID), doc, 0))
+		opts := options.Aggregate().SetAllowDiskUse(true).SetBatchSize(int32(math.Pow10(6)))
+		cursorIterationFunc := func() error {
+			cursor, err := collection.Aggregate(ctx, generatepipeline(start, end), opts)
 			if err != nil {
-				return fmt.Errorf("failed to finish backfill chunk: %s", err)
+				return fmt.Errorf("collection.Aggregate: %s", err)
 			}
-			if exit {
-				return nil
-			}
-		}
+			defer cursor.Close(ctx)
 
-		if err := cursor.Err(); err != nil {
-			return err
+			for cursor.Next(ctx) {
+				var doc bson.M
+				if _, err = cursor.Current.LookupErr("_id"); err != nil {
+					return fmt.Errorf("looking up idProperty: %s", err)
+				} else if err = cursor.Decode(&doc); err != nil {
+					return fmt.Errorf("backfill decoding document: %s", err)
+				}
+
+				handleObjectID(doc)
+				exit, err := insert.Insert(types.CreateRawRecord(utils.GetKeysHash(doc, constants.MongoPrimaryID), doc, 0))
+				if err != nil {
+					return fmt.Errorf("failed to finish backfill chunk: %s", err)
+				}
+				if exit {
+					return nil
+				}
+			}
+			return cursor.Err()
 		}
-		return nil
+		return base.RetryOnBackoff(m.config.RetryCount, 1*time.Minute, cursorIterationFunc)
 	}
 
-	return utils.Concurrent(context.TODO(), chunks.Array(), chunks.Len(), func(ctx context.Context, one types.Chunk, number int) error {
-		err := processChunk(ctx, pool, stream, collection, one.Min, &one.Max)
+	return utils.Concurrent(backfillCtx, chunks.Array(), m.config.MaxThreads, func(ctx context.Context, one types.Chunk, number int) error {
+		err := processChunk(backfillCtx, pool, stream, collection, one.Min, &one.Max)
 		if err != nil {
 			return err
 		}
@@ -137,14 +138,14 @@ func (m *Mongo) backfill(stream protocol.Stream, pool *protocol.WriterPool) erro
 	})
 }
 
-func (m *Mongo) totalCountInCollection(collection *mongo.Collection) (int64, error) {
+func (m *Mongo) totalCountInCollection(ctx context.Context, collection *mongo.Collection) (int64, error) {
 	var countResult bson.M
 	command := bson.D{{
 		Key:   "collStats",
 		Value: collection.Name(),
 	}}
 	// Select the database
-	err := collection.Database().RunCommand(context.TODO(), command).Decode(&countResult)
+	err := collection.Database().RunCommand(ctx, command).Decode(&countResult)
 	if err != nil {
 		return 0, fmt.Errorf("failed to get total count: %s", err)
 	}
diff --git a/drivers/mongodb/internal/config.go b/drivers/mongodb/internal/config.go
index 5adc087..bf054d1 100644
--- a/drivers/mongodb/internal/config.go
+++ b/drivers/mongodb/internal/config.go
@@ -21,6 +21,7 @@ type Config struct {
 	MaxThreads  int            `json:"max_threads"`
 	Database    string         `json:"database"`
 	DefaultMode types.SyncMode `json:"default_mode"`
+	RetryCount  int            `json:"backoff_retry_count"`
 }
 
 func (c *Config) URI() string {
diff --git a/drivers/mongodb/internal/mon.go b/drivers/mongodb/internal/mon.go
index e30ce12..92fe87a 100644
--- a/drivers/mongodb/internal/mon.go
+++ b/drivers/mongodb/internal/mon.go
@@ -17,8 +17,9 @@ import (
 )
 
 const (
-	discoverTime   = 5 * time.Minute // maximum time allowed to discover all the streams
-	cdcCursorField = "_data"
+	discoverTime        = 5 * time.Minute // maximum time allowed to discover all the streams
+	cdcCursorField      = "_data"
+	defaultBackoffCount = 3
 )
 
 type Mongo struct {
@@ -54,7 +55,14 @@ func (m *Mongo) Setup() error {
 	m.client = conn
 	// no need to check from discover if it have cdc support or not
 	m.CDCSupport = true
-	// set state
+	// check for default backoff count
+	if m.config.RetryCount < 0 {
+		logger.Infof("setting backoff retry count to default value %d", defaultBackoffCount)
+		m.config.RetryCount = defaultBackoffCount
+	} else {
+		// add 1 to account for the first run
+		m.config.RetryCount += 1
+	}
 	return nil
 }
diff --git a/types/stream_configured.go b/types/stream_configured.go
index 3f08056..a3987ca 100644
--- a/types/stream_configured.go
+++ b/types/stream_configured.go
@@ -137,15 +137,21 @@ func (s *ConfiguredStream) SetStateChunks(chunks *Set[Chunk]) {
 
 // remove chunk
 func (s *ConfiguredStream) RemoveStateChunk(chunk Chunk) {
+	// lock the global state so that no marshaling call happens on streamState while it is being written
+	// example: LogState can be called from anywhere and marshals the streamState
+	s.globalState.Lock()
 	s.streamState.Lock()
-	defer s.streamState.Unlock()
+	defer func() {
+		s.streamState.Unlock()
+		s.globalState.Unlock()
+		s.globalState.LogState()
+	}()
 	stateChunks, loaded := s.streamState.State.LoadAndDelete(ChunksKey)
 	if loaded {
 		stateChunks.(*Set[Chunk]).Remove(chunk)
 		s.streamState.State.Store(ChunksKey, stateChunks)
 	}
-	s.globalState.LogState()
 }
 
 // Delete keys from Stream State
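
Note on the retry semantics: `RetryOnBackoff` treats `attempts` as the total number of tries, not the number of retries, which is why `Setup()` adds 1 to a non-negative `backoff_retry_count` (the first run is not a retry) and falls back to `defaultBackoffCount` for negative values. Below is a minimal, self-contained sketch of that behavior; `RetryOnBackoff` mirrors the helper added in `drivers/base/utils.go`, while `main` and the flaky `op` closure are hypothetical stand-ins for the `cursorIterationFunc` retried in `backfill.go`:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Mirrors drivers/base/utils.go: run f up to `attempts` times in total,
// doubling the sleep after every failed attempt (exponential backoff).
func RetryOnBackoff(attempts int, sleep time.Duration, f func() error) (err error) {
	for cur := 0; cur < attempts; cur++ {
		if err = f(); err == nil {
			return nil
		}
		fmt.Printf("retry attempt[%d], retrying after %.2f seconds due to err: %s\n", cur+1, sleep.Seconds(), err)
		time.Sleep(sleep)
		sleep = sleep * 2 // with the 1*time.Minute base used in backfill.go: 1m, 2m, 4m, ...
	}
	return err
}

func main() {
	calls := 0
	// hypothetical flaky operation: fails twice, then succeeds
	op := func() error {
		calls++
		if calls < 3 {
			return errors.New("transient cursor error")
		}
		return nil
	}
	// "backoff_retry_count": 2 in config.json becomes 3 total attempts after Setup()
	if err := RetryOnBackoff(3, 10*time.Millisecond, op); err != nil {
		fmt.Println("backfill chunk failed:", err)
		return
	}
	fmt.Printf("succeeded on attempt %d\n", calls)
}
```

With this shape, a transient cursor error is absorbed by re-running the whole aggregation for that chunk, while a chunk that fails on every attempt surfaces its last error to `utils.Concurrent`, which stops the backfill.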