chore: Exponential Backoff For Backfill In Mongodb (#74)
Co-authored-by: Shubham Baldava <shubham@datazip.io>
hash-data and shubham19may authored Feb 19, 2025
1 parent 392e081 commit fdd4c15
Showing 6 changed files with 58 additions and 41 deletions.
10 changes: 5 additions & 5 deletions drivers/base/utils.go
@@ -6,14 +6,14 @@ import (
 	"github.com/datazip-inc/olake/logger"
 )
 
-func RetryOnFailure(attempts int, sleep *time.Duration, f func() error) (err error) {
-	for i := 0; i < attempts; i++ {
+func RetryOnBackoff(attempts int, sleep time.Duration, f func() error) (err error) {
+	for cur := 0; cur < attempts; cur++ {
 		if err = f(); err == nil {
 			return nil
 		}
 
-		logger.Infof("Retrying after %v...", sleep)
-		time.Sleep(*sleep)
+		logger.Infof("retry attempt[%d], retrying after %.2f seconds due to err: %s", cur+1, sleep.Seconds(), err)
+		time.Sleep(sleep)
+		sleep = sleep * 2
 	}
 
 	return err
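The renamed helper takes the sleep by value and doubles it after every failed attempt, so with an initial sleep of 1 minute the waits run 1m, 2m, 4m, and so on; note it also sleeps once more after the final failed attempt before returning the error. A minimal, runnable sketch of the helper in use — the function body is copied from the diff above, while the flaky operation and the short timings are made up for illustration:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryOnBackoff copies the helper from the diff above: run f up to
// `attempts` times, doubling the sleep after every failure.
func retryOnBackoff(attempts int, sleep time.Duration, f func() error) (err error) {
	for cur := 0; cur < attempts; cur++ {
		if err = f(); err == nil {
			return nil
		}
		fmt.Printf("retry attempt[%d], retrying after %.2f seconds due to err: %s\n", cur+1, sleep.Seconds(), err)
		time.Sleep(sleep)
		sleep = sleep * 2
	}
	return err
}

func main() {
	calls := 0
	// a made-up flaky operation that succeeds on its third call
	err := retryOnBackoff(4, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("transient failure")
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err) // calls: 3 err: <nil>
}
```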
3 changes: 2 additions & 1 deletion drivers/mongodb/README.md
@@ -42,7 +42,8 @@ Add MongoDB credentials in following format in config.json file
   "server-ram": 16,
   "database": "database",
   "max_threads": 50,
-  "default_mode" :"cdc"
+  "default_mode" :"cdc",
+  "backoff_retry_count": 2
 }
 ```
61 changes: 31 additions & 30 deletions drivers/mongodb/internal/backfill.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/datazip-inc/olake/constants"
+	"github.com/datazip-inc/olake/drivers/base"
 	"github.com/datazip-inc/olake/logger"
 	"github.com/datazip-inc/olake/protocol"
 	"github.com/datazip-inc/olake/types"
@@ -23,12 +24,13 @@
 func (m *Mongo) backfill(stream protocol.Stream, pool *protocol.WriterPool) error {
 	collection := m.client.Database(stream.Namespace(), options.Database().SetReadConcern(readconcern.Majority())).Collection(stream.Name())
 	chunks := stream.GetStateChunks()
+	backfillCtx := context.TODO()
 	if chunks == nil || chunks.Len() == 0 {
 		chunks = types.NewSet[types.Chunk]()
 		// chunks state not present means full load
 		logger.Infof("starting full load for stream [%s]", stream.ID())
 
-		totalCount, err := m.totalCountInCollection(collection)
+		totalCount, err := m.totalCountInCollection(backfillCtx, collection)
 		if err != nil {
 			return err
 		}
@@ -84,13 +86,6 @@ func (m *Mongo) backfill(stream protocol.Stream, pool *protocol.WriterPool) error
 		end = &max
 	}
 
-	opts := options.Aggregate().SetAllowDiskUse(true).SetBatchSize(int32(math.Pow10(6)))
-	cursor, err := collection.Aggregate(ctx, generatepipeline(start, end), opts)
-	if err != nil {
-		return fmt.Errorf("collection.Find: %s", err)
-	}
-	defer cursor.Close(ctx)
-
 	waitChannel := make(chan error, 1)
 	insert, err := pool.NewThread(threadContext, stream, protocol.WithWaitChannel(waitChannel))
 	if err != nil {
@@ -102,32 +97,38 @@ func (m *Mongo) backfill(stream protocol.Stream, pool *protocol.WriterPool) error
 		err = <-waitChannel
 	}()
 
-	for cursor.Next(ctx) {
-		var doc bson.M
-		if _, err = cursor.Current.LookupErr("_id"); err != nil {
-			return fmt.Errorf("looking up idProperty: %s", err)
-		} else if err = cursor.Decode(&doc); err != nil {
-			return fmt.Errorf("backfill decoding document: %s", err)
-		}
-
-		handleObjectID(doc)
-		exit, err := insert.Insert(types.CreateRawRecord(utils.GetKeysHash(doc, constants.MongoPrimaryID), doc, 0))
-		if err != nil {
-			return fmt.Errorf("failed to finish backfill chunk: %s", err)
-		}
-		if exit {
-			return nil
-		}
-	}
-
-	if err := cursor.Err(); err != nil {
-		return err
-	}
-	return nil
+	opts := options.Aggregate().SetAllowDiskUse(true).SetBatchSize(int32(math.Pow10(6)))
+	cursorIterationFunc := func() error {
+		cursor, err := collection.Aggregate(ctx, generatepipeline(start, end), opts)
+		if err != nil {
+			return fmt.Errorf("collection.Find: %s", err)
+		}
+		defer cursor.Close(ctx)
+
+		for cursor.Next(ctx) {
+			var doc bson.M
+			if _, err = cursor.Current.LookupErr("_id"); err != nil {
+				return fmt.Errorf("looking up idProperty: %s", err)
+			} else if err = cursor.Decode(&doc); err != nil {
+				return fmt.Errorf("backfill decoding document: %s", err)
+			}
+
+			handleObjectID(doc)
+			exit, err := insert.Insert(types.CreateRawRecord(utils.GetKeysHash(doc, constants.MongoPrimaryID), doc, 0))
+			if err != nil {
+				return fmt.Errorf("failed to finish backfill chunk: %s", err)
+			}
+			if exit {
+				return nil
+			}
+		}
+		return cursor.Err()
+	}
+	return base.RetryOnBackoff(m.config.RetryCount, 1*time.Minute, cursorIterationFunc)
 }
 
-	return utils.Concurrent(context.TODO(), chunks.Array(), chunks.Len(), func(ctx context.Context, one types.Chunk, number int) error {
-		err := processChunk(ctx, pool, stream, collection, one.Min, &one.Max)
+	return utils.Concurrent(backfillCtx, chunks.Array(), m.config.MaxThreads, func(ctx context.Context, one types.Chunk, number int) error {
+		err := processChunk(backfillCtx, pool, stream, collection, one.Min, &one.Max)
 		if err != nil {
 			return err
 		}
@@ -137,14 +138,14 @@ func (m *Mongo) backfill(stream protocol.Stream, pool *protocol.WriterPool) error
 	})
 }
 
-func (m *Mongo) totalCountInCollection(collection *mongo.Collection) (int64, error) {
+func (m *Mongo) totalCountInCollection(ctx context.Context, collection *mongo.Collection) (int64, error) {
 	var countResult bson.M
 	command := bson.D{{
 		Key:   "collStats",
 		Value: collection.Name(),
 	}}
 	// Select the database
-	err := collection.Database().RunCommand(context.TODO(), command).Decode(&countResult)
+	err := collection.Database().RunCommand(ctx, command).Decode(&countResult)
 	if err != nil {
 		return 0, fmt.Errorf("failed to get total count: %s", err)
 	}
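Two behavioral changes land here. First, the Aggregate call and the cursor loop are wrapped in cursorIterationFunc, so a dropped cursor or transient error re-runs the whole chunk read under RetryOnBackoff (starting at a 1-minute sleep); rows read before the failure are read again, which the keyed record hash on _id presumably lets the destination deduplicate. Second, the chunk workers are now capped at max_threads from the config instead of spawning one goroutine per chunk. utils.Concurrent's implementation is not part of this diff; a bounded-concurrency helper with the same call shape could look like the following sketch, which uses errgroup.SetLimit as an assumed stand-in:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// concurrent runs fn over items with at most maxThreads goroutines in flight,
// cancelling the rest as soon as one call fails. The signature mirrors the
// utils.Concurrent call site above; the real implementation may differ.
func concurrent[T any](ctx context.Context, items []T, maxThreads int, fn func(ctx context.Context, item T, index int) error) error {
	g, gctx := errgroup.WithContext(ctx)
	g.SetLimit(maxThreads)
	for i, item := range items {
		i, item := i, item // capture loop variables (pre-Go 1.22 semantics)
		g.Go(func() error {
			return fn(gctx, item, i)
		})
	}
	return g.Wait()
}

func main() {
	chunks := []string{"chunk-0", "chunk-1", "chunk-2", "chunk-3"}
	err := concurrent(context.Background(), chunks, 2, func(ctx context.Context, c string, n int) error {
		fmt.Println("processing", c, "as task", n)
		return nil
	})
	fmt.Println("err:", err)
}
```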
1 change: 1 addition & 0 deletions drivers/mongodb/internal/config.go
@@ -21,6 +21,7 @@ type Config struct {
 	MaxThreads  int            `json:"max_threads"`
 	Database    string         `json:"database"`
 	DefaultMode types.SyncMode `json:"default_mode"`
+	RetryCount  int            `json:"backoff_retry_count"`
 }
 
 func (c *Config) URI() string {
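The new RetryCount field maps the backoff_retry_count key from config.json. A small sketch of how the tag resolves — the struct here is an abridged, illustrative copy of the driver's Config, not the full type:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Abridged copy of the driver's Config; only the fields relevant to this
// commit are reproduced here.
type Config struct {
	MaxThreads int    `json:"max_threads"`
	Database   string `json:"database"`
	RetryCount int    `json:"backoff_retry_count"`
}

func main() {
	raw := []byte(`{"database": "database", "max_threads": 50, "backoff_retry_count": 2}`)
	var c Config
	if err := json.Unmarshal(raw, &c); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", c) // {MaxThreads:50 Database:database RetryCount:2}

	// If backoff_retry_count is omitted, RetryCount is left at 0 (not negative),
	// so the `RetryCount < 0` defaulting branch in Setup() will not fire.
}
```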
14 changes: 11 additions & 3 deletions drivers/mongodb/internal/mon.go
@@ -17,8 +17,9 @@ import (
 )
 
 const (
-	discoverTime   = 5 * time.Minute // maximum time allowed to discover all the streams
-	cdcCursorField = "_data"
+	discoverTime        = 5 * time.Minute // maximum time allowed to discover all the streams
+	cdcCursorField      = "_data"
+	defaultBackoffCount = 3
 )
 
 type Mongo struct {
@@ -54,7 +55,14 @@ func (m *Mongo) Setup() error {
 	m.client = conn
 	// no need to check from discover if it has cdc support or not
 	m.CDCSupport = true
-	// set state
+	// check for default backoff count
+	if m.config.RetryCount < 0 {
+		logger.Infof("setting backoff retry count to default value %d", defaultBackoffCount)
+		m.config.RetryCount = defaultBackoffCount
+	} else {
+		// add 1 for the first (non-retry) run
+		m.config.RetryCount += 1
+	}
 	return nil
 }
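Setup() normalizes the configured count: a negative value falls back to defaultBackoffCount (3), and any non-negative value gets 1 added so the first, non-retry run is counted in the attempts handed to RetryOnBackoff. The README's backoff_retry_count: 2 therefore yields 3 total attempts — one initial run plus two retries, sleeping 1m and then 2m between them. A tiny sketch of that mapping (effectiveAttempts is a hypothetical helper name used only for illustration):

```go
package main

import "fmt"

const defaultBackoffCount = 3

// effectiveAttempts reproduces the adjustment in Setup(): negative counts
// fall back to the default; otherwise one is added for the first run.
func effectiveAttempts(configured int) int {
	if configured < 0 {
		return defaultBackoffCount
	}
	return configured + 1
}

func main() {
	for _, c := range []int{-1, 0, 2} {
		fmt.Printf("backoff_retry_count=%d -> attempts=%d\n", c, effectiveAttempts(c))
	}
	// backoff_retry_count=-1 -> attempts=3
	// backoff_retry_count=0 -> attempts=1
	// backoff_retry_count=2 -> attempts=3
}
```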
10 changes: 8 additions & 2 deletions types/stream_configured.go
@@ -137,15 +137,21 @@ func (s *ConfiguredStream) SetStateChunks(chunks *Set[Chunk]) {
 
 // remove chunk
 func (s *ConfiguredStream) RemoveStateChunk(chunk Chunk) {
+	// lock the global state so that no marshaling call can run on streamState while it is being written;
+	// example: LogState can be called from anywhere and marshals the streamState
+	s.globalState.Lock()
 	s.streamState.Lock()
-	defer s.streamState.Unlock()
+	defer func() {
+		s.streamState.Unlock()
+		s.globalState.Unlock()
+		s.globalState.LogState()
+	}()
 
 	stateChunks, loaded := s.streamState.State.LoadAndDelete(ChunksKey)
 	if loaded {
 		stateChunks.(*Set[Chunk]).Remove(chunk)
 		s.streamState.State.Store(ChunksKey, stateChunks)
 	}
-	s.globalState.LogState()
 }
 
 // Delete keys from Stream State
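The locking change makes RemoveStateChunk take the global state's lock before the stream's, so anything that marshals the stream state through the global state (such as LogState) cannot observe a half-written chunk set, and the LogState call itself is deferred until both locks are released. A toy model of that ordering, assuming LogState acquires the global lock — the real types wrap sync.Map and JSON marshaling, which are elided here:

```go
package main

import (
	"fmt"
	"sync"
)

// Toy stand-ins for the diff's streamState and globalState.
type streamState struct {
	sync.Mutex
	chunks map[string]bool
}

type globalState struct {
	sync.Mutex
	stream *streamState
}

// LogState snapshots (here: prints) the stream state under the global lock —
// the same lock removeStateChunk now holds while mutating.
func (g *globalState) LogState() {
	g.Lock()
	defer g.Unlock()
	fmt.Println("state snapshot, chunks left:", len(g.stream.chunks))
}

// removeStateChunk mirrors the edited RemoveStateChunk: take the global lock,
// then the stream lock, mutate, release both, and only then log the state.
func (g *globalState) removeStateChunk(chunk string) {
	g.Lock()
	g.stream.Lock()
	defer func() {
		g.stream.Unlock()
		g.Unlock()
		g.LogState() // runs after both unlocks, so re-taking the global lock is safe
	}()
	delete(g.stream.chunks, chunk)
}

func main() {
	gs := &globalState{stream: &streamState{chunks: map[string]bool{"a": true, "b": true}}}
	gs.removeStateChunk("a") // prints: state snapshot, chunks left: 1
}
```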
