-
-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Resumable Full Load In Mongodb #61
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More improvements required
drivers/mongodb/internal/backfill.go
Outdated
if end.After(last) { | ||
boundry = types.Chunk{ | ||
Min: generateMinObjectID(start), | ||
Max: "", | ||
} | ||
logger.Info("scheduling last full load chunk query!") | ||
} else { | ||
boundry = types.Chunk{ | ||
Min: generateMinObjectID(start), | ||
Max: generateMinObjectID(end), | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be simplified. Created boundry struct at the end.
drivers/mongodb/internal/backfill.go
Outdated
} | ||
} | ||
chunks = append(chunks, boundry) | ||
chunk := types.Chunk{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are you again creating an chunk var?
drivers/mongodb/internal/backfill.go
Outdated
stream.AppendChunksToStreamState(chunk) | ||
start = end | ||
} | ||
|
||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if node crashes here, where are we storing the chunks into state file?
types/state.go
Outdated
@@ -81,36 +81,69 @@ func (s *State) MarshalJSON() ([]byte, error) { | |||
return json.Marshal(p) | |||
} | |||
|
|||
// DualSyncMap struct to hold Cursor and Chunks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change comment. Please self-review PR line by line.
types/stream_configured.go
Outdated
}) | ||
return chunks | ||
} | ||
func (s *ConfiguredStream) UpdateChunkStatusInStreamState(chunkID string, newStatus string) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to persist to disk here. as OOM can occur at any place.
…c/olake into feat/state-controller
Will add defaultFlag for partitioning path in this PR |
…at/state-controller
…at/state-controller
logger/logger.go
Outdated
@@ -180,7 +124,8 @@ func FileLogger(content any, filePath string, fileName, fileExtension string) er | |||
|
|||
func Init() { | |||
// Configure lumberjack for log rotation | |||
timestamp := fmt.Sprintf("%d-%d-%d_%d-%d-%d", time.Now().Year(), time.Now().Month(), time.Now().Day(), time.Now().Hour(), time.Now().Minute(), time.Now().Second()) | |||
timeStamp := time.Now().UTC() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name it something else. one can not have same name but camel case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do it
Description
Added logging for failed chunks in the terminal and implemented handling to retry processing them efficiently.
Fixes # (issue)
Type of change
How Has This Been Tested?
Screenshots or Recordings
Related PR's (If Any):