From 3806a32f621cc98ebd112d92d334b03ac6f0c3b6 Mon Sep 17 00:00:00 2001
From: vikash390
Date: Tue, 18 Feb 2025 12:34:28 +0530
Subject: [PATCH] feat: Resumable Full Load in MongoDB (#61) (#99)

Co-authored-by: Datazip
Co-authored-by: Shubham Baldava
Co-authored-by: hash-data
---
 drivers/mongodb/internal/backfill.go | 139 +++++++++++++++------------
 drivers/mongodb/internal/cdc.go      |   8 +-
 drivers/mongodb/internal/mon.go      |   1 +
 logger/logger.go                     |  71 ++------------
 protocol/check.go                    |  14 ++-
 protocol/discover.go                 |   4 +-
 protocol/interface.go                |   4 +-
 protocol/spec.go                     |  12 ++-
 protocol/sync.go                     |   9 +-
 protocol/writers.go                  |   8 +-
 types/state.go                       |  58 ++++++++++-
 types/stream.go                      |  14 +++
 types/stream_configured.go           |  59 ++++++++++--
 utils/utils.go                       |   8 +-
 writers/parquet/parquet.go           |   2 +-
 15 files changed, 250 insertions(+), 161 deletions(-)

diff --git a/drivers/mongodb/internal/backfill.go b/drivers/mongodb/internal/backfill.go
index d5001cf..e146769 100644
--- a/drivers/mongodb/internal/backfill.go
+++ b/drivers/mongodb/internal/backfill.go
@@ -20,81 +20,87 @@ import (
 	"go.mongodb.org/mongo-driver/mongo/readconcern"
 )
 
-type Boundry struct {
-	StartID primitive.ObjectID  `json:"start"`
-	EndID   *primitive.ObjectID `json:"end"`
-	end     time.Time
-}
-
 func (m *Mongo) backfill(stream protocol.Stream, pool *protocol.WriterPool) error {
-	logger.Infof("starting full load for stream [%s]", stream.ID())
-
 	collection := m.client.Database(stream.Namespace(), options.Database().SetReadConcern(readconcern.Majority())).Collection(stream.Name())
-	totalCount, err := m.totalCountInCollection(collection)
-	if err != nil {
-		return err
-	}
+	chunks := stream.GetStateChunks()
+	if chunks == nil || chunks.Len() == 0 {
+		chunks = types.NewSet[types.Chunk]()
+		// no chunks present in state means a fresh full load
+		logger.Infof("starting full load for stream [%s]", stream.ID())
 
-	first, last, err := m.fetchExtremes(collection)
-	if err != nil {
-		return err
-	}
-
-	logger.Infof("Extremes of Stream %s are start: %s \t end:%s", stream.ID(), first, last)
-	logger.Infof("Total expected count for stream %s are %d", stream.ID(), totalCount)
-
-	timeDiff := last.Sub(first).Hours() / 6
-	if timeDiff < 1 {
-		timeDiff = 1
-	}
-	// for every 6hr difference ideal density is 10 Seconds
-	density := time.Duration(timeDiff) * (10 * time.Second)
-	return utils.ConcurrentC(context.TODO(), utils.Yield(func(prev *Boundry) (bool, *Boundry, error) {
-		start := first
-		if prev != nil {
-			start = prev.end
+		totalCount, err := m.totalCountInCollection(collection)
+		if err != nil {
+			return err
 		}
-		boundry := &Boundry{
-			StartID: *generateMinObjectID(start),
+
+		first, last, err := m.fetchExtremes(collection)
+		if err != nil {
+			return err
 		}
-		end := start.Add(density)
-		exit := true
-		if !end.After(last) {
-			exit = false
-			boundry.EndID = generateMinObjectID(end)
-			boundry.end = end
-		} else {
-			logger.Info("Scheduling last full load chunk query!")
+		logger.Infof("Extremes of Stream %s are start: %s \t end:%s", stream.ID(), first, last)
+		logger.Infof("Total expected count for stream %s is %d", stream.ID(), totalCount)
+
+		timeDiff := last.Sub(first).Hours() / 6
+		if timeDiff < 1 {
+			timeDiff = 1
 		}
+		// for every 6hr of spread between the extremes, the ideal chunk density is 10 seconds
+		density := time.Duration(timeDiff) * (10 * time.Second)
+		start := first
+		for start.Before(last) {
+			end := start.Add(density)
+			minObjectID := generateMinObjectID(start)
+			maxObjectID := generateMinObjectID(end)
+			if end.After(last) {
+				maxObjectID = generateMinObjectID(last.Add(time.Second))
+			}
+			start = end
+			chunks.Insert(types.Chunk{
+				Min: minObjectID,
+				Max: maxObjectID,
+			})
+		}
+		// save the chunks state
+		stream.SetStateChunks(chunks)
 
-		return exit, boundry, nil
-	}), m.config.MaxThreads, func(ctx context.Context, one *Boundry, number int64) error {
+	}
+	logger.Infof("Running backfill for %d chunks", chunks.Len())
+	// note: err is a named return value so the deferred wait below can assign to it
+	processChunk := func(ctx context.Context, pool *protocol.WriterPool, stream protocol.Stream, collection *mongo.Collection, minStr string, maxStr *string) (err error) {
 		threadContext, cancelThread := context.WithCancel(ctx)
 		defer cancelThread()
+		start, err := primitive.ObjectIDFromHex(minStr)
+		if err != nil {
+			return fmt.Errorf("invalid min ObjectID: %s", err)
+		}
+
+		var end *primitive.ObjectID
+		if maxStr != nil {
+			max, err := primitive.ObjectIDFromHex(*maxStr)
+			if err != nil {
+				return fmt.Errorf("invalid max ObjectID: %s", err)
+			}
+			end = &max
+		}
 		opts := options.Aggregate().SetAllowDiskUse(true).SetBatchSize(int32(math.Pow10(6)))
-		cursor, err := collection.Aggregate(ctx, generatepipeline(one.StartID, one.EndID), opts)
+		cursor, err := collection.Aggregate(ctx, generatepipeline(start, end), opts)
 		if err != nil {
 			return fmt.Errorf("collection.Find: %s", err)
 		}
 		defer cursor.Close(ctx)
 
-		waitChannel := make(chan struct{})
-		defer func() {
-			if stream.GetSyncMode() == types.CDC {
-				// only wait in cdc mode
-				// make sure it get called after insert.Close()
-				<-waitChannel
-			}
-			logger.Infof("Finished full load chunk number %d.", number)
-		}()
-
-		insert, err := pool.NewThread(threadContext, stream, protocol.WithNumber(number), protocol.WithWaitChannel(waitChannel))
+		waitChannel := make(chan error, 1)
+		insert, err := pool.NewThread(threadContext, stream, protocol.WithWaitChannel(waitChannel))
 		if err != nil {
 			return err
 		}
-		defer insert.Close()
+		defer func() {
+			insert.Close()
+			// wait for chunk completion
+			err = <-waitChannel
+		}()
 
 		for cursor.Next(ctx) {
 			var doc bson.M
@@ -107,15 +113,28 @@ func (m *Mongo) backfill(stream protocol.Stream, pool *protocol.WriterPool) erro
 			handleObjectID(doc)
 			exit, err := insert.Insert(types.CreateRawRecord(utils.GetKeysHash(doc, constants.MongoPrimaryID), doc, 0))
 			if err != nil {
-				return fmt.Errorf("failed to finish backfill chunk %d: %s", number, err)
+				return fmt.Errorf("failed to finish backfill chunk: %s", err)
 			}
 			if exit {
 				return nil
 			}
 		}
-		return cursor.Err()
-	})
+		if err := cursor.Err(); err != nil {
+			return err
+		}
+		return nil
+	}
+
+	return utils.Concurrent(context.TODO(), chunks.Array(), chunks.Len(), func(ctx context.Context, one types.Chunk, number int) error {
+		err := processChunk(ctx, pool, stream, collection, one.Min, &one.Max)
+		if err != nil {
+			return err
+		}
+		// remove the successfully synced chunk from state
+		stream.RemoveStateChunk(one)
+		return nil
+	})
 }
 
 func (m *Mongo) totalCountInCollection(collection *mongo.Collection) (int64, error) {
@@ -222,14 +241,14 @@ func generatepipeline(start primitive.ObjectID, end *primitive.ObjectID) mongo.P
 }
 
 // function to generate ObjectID with the minimum value for a given time
-func generateMinObjectID(t time.Time) *primitive.ObjectID {
+func generateMinObjectID(t time.Time) string {
 	// Create the ObjectID with the first 4 bytes as the timestamp and the rest 8 bytes as 0x00
 	objectID := primitive.NewObjectIDFromTimestamp(t)
 	for i := 4; i < 12; i++ {
 		objectID[i] = 0x00
 	}
 
-	return &objectID
+	return objectID.Hex()
 }
 
 func handleObjectID(doc bson.M) {
diff --git a/drivers/mongodb/internal/cdc.go b/drivers/mongodb/internal/cdc.go
index caa860a..211f50a 100644
--- a/drivers/mongodb/internal/cdc.go
+++ b/drivers/mongodb/internal/cdc.go
@@ -49,7 +49,9 @@ func (m *Mongo) changeStreamSync(stream protocol.Stream, pool *protocol.WriterPo
 	}
 
 	prevResumeToken := stream.GetStateKey(cdcCursorField)
-	if prevResumeToken == nil {
+	chunks := stream.GetStateChunks()
+
+	if prevResumeToken == nil || chunks == nil || chunks.Len() != 0 {
 		// get current resume token and do full load for stream
 		resumeToken, err := m.getCurrentResumeToken(cdcCtx, collection, pipeline)
 		if err != nil {
@@ -58,6 +60,10 @@ func (m *Mongo) changeStreamSync(stream protocol.Stream, pool *protocol.WriterPo
 		if resumeToken != nil {
 			prevResumeToken = (*resumeToken).Lookup(cdcCursorField).StringValue()
 		}
+
+		// save resume token
+		stream.SetStateKey(cdcCursorField, prevResumeToken)
+
 		if err := m.backfill(stream, pool); err != nil {
 			return err
 		}
diff --git a/drivers/mongodb/internal/mon.go b/drivers/mongodb/internal/mon.go
index 0176f6f..e30ce12 100644
--- a/drivers/mongodb/internal/mon.go
+++ b/drivers/mongodb/internal/mon.go
@@ -54,6 +54,7 @@ func (m *Mongo) Setup() error {
 	m.client = conn
 	// no need to check from discover if it have cdc support or not
 	m.CDCSupport = true
+	// set state
 	return nil
 }
 
diff --git a/logger/logger.go b/logger/logger.go
index 013ef3d..beec3bf 100644
--- a/logger/logger.go
+++ b/logger/logger.go
@@ -10,7 +10,6 @@ import (
 	"strings"
 	"time"
 
-	"github.com/datazip-inc/olake/types"
 	"github.com/rs/zerolog"
 	"github.com/spf13/viper"
 	"gopkg.in/natefinch/lumberjack.v2"
@@ -74,49 +73,6 @@ func Warnf(format string, v ...interface{}) {
 	logger.Warn().Msgf(format, v...)
 }
 
-func LogSpec(spec map[string]interface{}) {
-	message := types.Message{}
-	message.Spec = spec
-	message.Type = types.SpecMessage
-
-	Debug("logging spec")
-	Info(message)
-	if configFolder := viper.GetString("CONFIG_FOLDER"); configFolder != "" {
-		err := FileLogger(message.Spec, configFolder, "config", ".json")
-		if err != nil {
-			Fatalf("failed to create spec file: %s", err)
-		}
-	}
-}
-
-func LogCatalog(streams []*types.Stream) {
-	message := types.Message{}
-	message.Type = types.CatalogMessage
-	message.Catalog = types.GetWrappedCatalog(streams)
-	Debug("logging catalog")
-
-	Info(message)
-	// write catalog to the specified file
-	if configFolder := viper.GetString("CONFIG_FOLDER"); configFolder != "" {
-		err := FileLogger(message.Catalog, configFolder, "catalog", ".json")
-		if err != nil {
-			Fatalf("failed to create catalog file: %s", err)
-		}
-	}
-}
-func LogConnectionStatus(err error) {
-	message := types.Message{}
-	message.Type = types.ConnectionStatusMessage
-	message.ConnectionStatus = &types.StatusRow{}
-	if err != nil {
-		message.ConnectionStatus.Message = err.Error()
-		message.ConnectionStatus.Status = types.ConnectionFailed
-	} else {
-		message.ConnectionStatus.Status = types.ConnectionSucceed
-	}
-	Info(message)
-}
-
 func LogResponse(response *http.Response) {
 	respDump, err := httputil.DumpResponse(response, true)
 	if err != nil {
@@ -135,25 +91,13 @@ func LogRequest(req *http.Request) {
 	fmt.Println(string(requestDump))
 }
 
-func LogState(state *types.State) {
-	state.Lock()
-	defer state.Unlock()
-
-	message := types.Message{}
-	message.Type = types.StateMessage
-	message.State = state
-	Debug("logging state")
-	Info(message)
-	if configFolder := viper.GetString("CONFIG_FOLDER"); configFolder != "" {
-		err := FileLogger(state, configFolder, "state", ".json")
-		if err != nil {
-			Fatalf("failed to create state file: %s", err)
-		}
-	}
-}
-
 // CreateFile creates a new file or overwrites an existing one with the specified filename, path, extension,
-func FileLogger(content any, filePath string, fileName, fileExtension string) error {
+func FileLogger(content any, fileName, fileExtension string) error {
+	// get config folder
+	filePath := viper.GetString("CONFIG_FOLDER")
+	if filePath == "" {
+		return fmt.Errorf("config folder is not set")
+	}
 	// Construct the full file path
 	contentBytes, err := json.Marshal(content)
 	if err != nil {
@@ -180,7 +124,8 @@ func FileLogger(content any, filePath string, fileName, fileExtension string) er
 
 func Init() {
 	// Configure lumberjack for log rotation
-	timestamp := fmt.Sprintf("%d-%d-%d_%d-%d-%d", time.Now().Year(), time.Now().Month(), time.Now().Day(), time.Now().Hour(), time.Now().Minute(), time.Now().Second())
+	currentTimestamp := time.Now().UTC()
+	timestamp := fmt.Sprintf("%d-%d-%d_%d-%d-%d", currentTimestamp.Year(), currentTimestamp.Month(), currentTimestamp.Day(), currentTimestamp.Hour(), currentTimestamp.Minute(), currentTimestamp.Second())
 	rotatingFile := &lumberjack.Logger{
 		Filename: fmt.Sprintf("%s/logs/sync_%s/olake.log", viper.GetString("CONFIG_FOLDER"), timestamp), // Log file path
 		MaxSize:  100, // Max size in MB before log rotation
diff --git a/protocol/check.go b/protocol/check.go
index 96050ca..0b77541 100644
--- a/protocol/check.go
+++ b/protocol/check.go
@@ -80,7 +80,17 @@ var checkCmd = &cobra.Command{
 			return nil
 		}()
 
-		// success
-		logger.LogConnectionStatus(err)
+		// log connection status
+		message := types.Message{
+			Type: types.ConnectionStatusMessage,
+			ConnectionStatus: &types.StatusRow{
+				Status: types.ConnectionSucceed,
+			},
+		}
+		if err != nil {
+			message.ConnectionStatus.Message = err.Error()
+			message.ConnectionStatus.Status = types.ConnectionFailed
+		}
+		logger.Info(message)
 	},
 }
 
diff --git a/protocol/discover.go b/protocol/discover.go
index f598141..d22a723 100644
--- a/protocol/discover.go
+++ b/protocol/discover.go
@@ -4,7 +4,7 @@ import (
 	"errors"
 	"fmt"
 
-	"github.com/datazip-inc/olake/logger"
+	"github.com/datazip-inc/olake/types"
 	"github.com/datazip-inc/olake/utils"
 	"github.com/spf13/cobra"
 )
@@ -38,7 +38,7 @@ var discoverCmd = &cobra.Command{
 			return errors.New("no streams found in connector")
 		}
 
-		logger.LogCatalog(streams)
+		types.LogCatalog(streams)
 		return nil
 	},
 }
diff --git a/protocol/interface.go b/protocol/interface.go
index 3b4d4da..d766108 100644
--- a/protocol/interface.go
+++ b/protocol/interface.go
@@ -83,9 +83,11 @@ type Stream interface {
 	SetStateCursor(value any)
 	SetStateKey(key string, value any)
 	Validate(source *types.Stream) error
+	SetStateChunks(chunks *types.Set[types.Chunk])
+	GetStateChunks() *types.Set[types.Chunk]
+	RemoveStateChunk(chunk types.Chunk)
 }
 
 type State interface {
 	SetType(typ types.StateType)
-	IsZero() bool
 }
diff --git a/protocol/spec.go b/protocol/spec.go
index dd04901..1eb55ff 100644
--- a/protocol/spec.go
+++ b/protocol/spec.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/datazip-inc/olake/jsonschema"
 	"github.com/datazip-inc/olake/logger"
+	"github.com/datazip-inc/olake/types"
 	"github.com/datazip-inc/olake/utils"
 
 	"github.com/spf13/cobra"
@@ -71,7 +72,16 @@ var specCmd = &cobra.Command{
 			}
 		}
 
-		logger.LogSpec(spec)
+		// log spec
+		message := types.Message{
+			Spec: spec,
+			Type: types.SpecMessage,
+		}
+		logger.Info(message)
+		err := logger.FileLogger(message.Spec, "config", ".json")
+		if err != nil {
+			logger.Fatalf("failed to create spec file: %s", err)
+		}
 
 		return nil
 	},
diff --git a/protocol/sync.go b/protocol/sync.go
index e26a5a6..431f7c6 100644
--- a/protocol/sync.go
+++ b/protocol/sync.go
@@ -133,11 +133,6 @@ var syncCmd = &cobra.Command{
 
 			logger.Info("Starting ChangeStream process in driver")
 
-			// Setup Global State from Connector
-			if err := driver.SetupGlobalState(state); err != nil {
-				return err
-			}
-
 			err := driver.RunChangeStream(pool, cdcStreams...)
 			if err != nil {
 				return fmt.Errorf("error occurred while reading records: %s", err)
@@ -173,9 +168,7 @@ var syncCmd = &cobra.Command{
 		}
 
 		logger.Infof("Total records read: %d", pool.TotalRecords())
-		if !state.IsZero() {
-			logger.LogState(state)
-		}
+		state.LogState()
 
 		return nil
 	},
diff --git a/protocol/writers.go b/protocol/writers.go
index c5ca048..fea7099 100644
--- a/protocol/writers.go
+++ b/protocol/writers.go
@@ -24,7 +24,7 @@ var RegisteredWriters = map[types.AdapterType]NewFunc{}
 type Options struct {
 	Identifier  string
 	Number      int64
-	WaitChannel chan struct{}
+	WaitChannel chan error
 }
 
 type ThreadOptions func(opt *Options)
@@ -41,7 +41,7 @@ func WithNumber(number int64) ThreadOptions {
 	}
 }
 
-func WithWaitChannel(waitChannel chan struct{}) ThreadOptions {
+func WithWaitChannel(waitChannel chan error) ThreadOptions {
 	return func(opt *Options) {
 		opt.WaitChannel = waitChannel
 	}
@@ -197,9 +197,7 @@ func (w *WriterPool) NewThread(parent context.Context, stream Stream, options ..
 				w.recordCount.Add(1) // increase the record count
 
 				if w.TotalRecords()%batchSize == 0 {
-					if !state.IsZero() {
-						logger.LogState(state)
-					}
+					state.LogState()
 				}
 			}
 		}
diff --git a/types/state.go b/types/state.go
index 4e236bc..2306063 100644
--- a/types/state.go
+++ b/types/state.go
@@ -5,6 +5,7 @@ import (
 	"sync"
 	"sync/atomic"
 
+	"github.com/datazip-inc/olake/logger"
 	"github.com/goccy/go-json"
 )
 
@@ -18,6 +19,8 @@ const (
 	// Mixed type indicates that the connector works with a mix of Globally shared and
 	// Individual stream state (Note: not being used yet but in plan)
 	MixedType StateType = "MIXED"
+	// constant key for chunks
+	ChunksKey = "chunks"
 )
 
 // TODO: Add validation tags; Write custom unmarshal that triggers validation
@@ -58,12 +61,12 @@ func (s *State) SetType(typ StateType) {
 // 	return nil
 // }
 
-func (s *State) IsZero() bool {
+func (s *State) isZero() bool {
 	return s.Global == nil && len(s.Streams) == 0
 }
 
 func (s *State) MarshalJSON() ([]byte, error) {
-	if s.IsZero() {
+	if s.isZero() {
 		return json.Marshal(nil)
 	}
 
@@ -81,13 +84,41 @@ func (s *State) MarshalJSON() ([]byte, error) {
 	return json.Marshal(p)
 }
 
+func (s *State) LogState() {
+	if s.isZero() {
+		logger.Info("state is empty")
+		return
+	}
+	s.Lock()
+	defer s.Unlock()
+
+	message := Message{
+		Type:  StateMessage,
+		State: s,
+	}
+	logger.Info(message)
+
+	// log to file
+	err := logger.FileLogger(message.State, "state", ".json")
+	if err != nil {
+		logger.Fatalf("failed to create state file: %s", err)
+	}
+}
+
+// Chunk holds the min and max ObjectID boundaries of a backfill chunk
+type Chunk struct {
+	Min string `json:"min"`
+	Max string `json:"max"`
+}
+
 type StreamState struct {
-	HoldsValue atomic.Bool `json:"-"` // If State holds some value and should not be excluded during unmarshaling then value true
+	*sync.Mutex `json:"-"`
+	HoldsValue  atomic.Bool `json:"-"` // set to true when the state holds a value and should not be excluded during unmarshaling
 
 	Stream    string   `json:"stream"`
 	Namespace string   `json:"namespace"`
 	SyncMode  string   `json:"sync_mode"`
-	State     sync.Map `json:"-"`
+	State     sync.Map `json:"state"`
 }
 
 // MarshalJSON custom marshaller to handle sync.Map encoding
@@ -137,7 +168,24 @@ func (s *StreamState) UnmarshalJSON(data []byte) error {
 	if len(aux.State) > 0 {
 		s.HoldsValue.Store(true)
 	}
-
+	if rawChunks, exists := aux.State[ChunksKey]; exists {
+		if chunkList, ok := rawChunks.([]interface{}); ok {
+			chunksJSON, err := json.Marshal(chunkList)
+			if err != nil {
+				return err
+			}
+
+			var chunks []Chunk
+			if err := json.Unmarshal(chunksJSON, &chunks); err != nil {
+				return err
+			}
+
+			// Create a new Set[Chunk] from the decoded chunks
+			chunkSet := NewSet[Chunk](chunks...)
+			// Store the *Set[Chunk] in State
+			s.State.Store(ChunksKey, chunkSet)
+		}
+	}
 	return nil
 }
 
diff --git a/types/stream.go b/types/stream.go
index 3c2f152..526129e 100644
--- a/types/stream.go
+++ b/types/stream.go
@@ -4,6 +4,7 @@ import (
 	"github.com/goccy/go-json"
 
 	"github.com/datazip-inc/olake/jsonschema/schema"
+	"github.com/datazip-inc/olake/logger"
 	"github.com/datazip-inc/olake/utils"
 )
 
@@ -117,3 +118,16 @@ func StreamsToMap(streams ...*Stream) map[string]*Stream {
 
 	return output
 }
+
+func LogCatalog(streams []*Stream) {
+	message := Message{
+		Type:    CatalogMessage,
+		Catalog: GetWrappedCatalog(streams),
+	}
+	logger.Info(message)
+	// write catalog to the specified file
+	err := logger.FileLogger(message.Catalog, "catalog", ".json")
+	if err != nil {
+		logger.Fatalf("failed to create catalog file: %s", err)
+	}
+}
diff --git a/types/stream_configured.go b/types/stream_configured.go
index 718843c..3f08056 100644
--- a/types/stream_configured.go
+++ b/types/stream_configured.go
@@ -9,6 +9,7 @@ import (
 
 // Input/Processed object for Stream
 type ConfiguredStream struct {
+	globalState *State       `json:"-"` // global state
 	streamState *StreamState `json:"-"` // in-memory state copy for individual stream
 
 	StreamMetadata StreamMetadata
 	InitialCursorStateValue any `json:"-"` // Cached initial state value
@@ -61,8 +62,10 @@ func (s *ConfiguredStream) Cursor() string {
 
 // Returns empty and missing
 func (s *ConfiguredStream) SetupState(state *State) {
+	// set global state (stream must know parent state as well)
+	s.globalState = state
 	// Initialize a state or map the already present state
-	if !state.IsZero() {
+	if !state.isZero() {
 		i, contains := utils.ArrayContains(state.Streams, func(elem *StreamState) bool {
 			return elem.Namespace == s.Namespace() && elem.Stream == s.Name()
 		})
@@ -70,6 +73,7 @@ func (s *ConfiguredStream) SetupState(state *State) {
 		if contains {
 			s.InitialCursorStateValue, _ = state.Streams[i].State.Load(s.CursorField)
 			s.streamState = state.Streams[i]
+			s.streamState.Mutex = &sync.Mutex{}
 			return
 		}
 	}
@@ -78,6 +82,7 @@ func (s *ConfiguredStream) SetupState(state *State) {
 		Stream:    s.Name(),
 		Namespace: s.Namespace(),
 		State:     sync.Map{},
+		Mutex:     &sync.Mutex{},
 	}
 
 	// save references of stream state and add it to connector state
@@ -92,11 +97,13 @@ func (s *ConfiguredStream) InitialState() any {
 func (s *ConfiguredStream) SetStateCursor(value any) {
 	s.streamState.HoldsValue.Store(true)
 	s.streamState.State.Store(s.Cursor(), value)
+	s.globalState.LogState()
 }
 
 func (s *ConfiguredStream) SetStateKey(key string, value any) {
 	s.streamState.HoldsValue.Store(true)
 	s.streamState.State.Store(key, value)
+	s.globalState.LogState()
 }
 
 func (s *ConfiguredStream) GetStateCursor() any {
@@ -109,19 +116,51 @@ func (s *ConfiguredStream) GetStateKey(key string) any {
 	return val
 }
 
-// Delete keys from Stream State
-func (s *ConfiguredStream) DeleteStateKeys(keys ...string) []any {
-	values := []any{}
-	for _, key := range keys {
-		val, _ := s.streamState.State.Load(key)
-		values = append(values, val) // cache
-
-		s.streamState.State.Delete(key) // delete
+// GetStateChunks retrieves all chunks from the state.
+func (s *ConfiguredStream) GetStateChunks() *Set[Chunk] {
+	chunks, _ := s.streamState.State.Load(ChunksKey)
+	if chunks != nil {
+		chunksSet, converted := chunks.(*Set[Chunk])
+		if converted {
+			return chunksSet
+		}
 	}
+	return nil
+}
+
+// SetStateChunks stores the chunk set for the stream in the state.
+func (s *ConfiguredStream) SetStateChunks(chunks *Set[Chunk]) {
+	s.streamState.State.Store(ChunksKey, chunks)
+	s.streamState.HoldsValue.Store(true)
+	s.globalState.LogState()
+}
+
+// RemoveStateChunk removes a successfully synced chunk from the state.
+func (s *ConfiguredStream) RemoveStateChunk(chunk Chunk) {
+	s.streamState.Lock()
+	defer s.streamState.Unlock()
 
-	return values
+	stateChunks, loaded := s.streamState.State.LoadAndDelete(ChunksKey)
+	if loaded {
+		stateChunks.(*Set[Chunk]).Remove(chunk)
+		s.streamState.State.Store(ChunksKey, stateChunks)
+	}
+	s.globalState.LogState()
 }
 
+// Delete keys from Stream State
+// func (s *ConfiguredStream) DeleteStateKeys(keys ...string) []any {
+// 	values := []any{}
+// 	for _, key := range keys {
+// 		val, _ := s.streamState.State.Load(key)
+// 		values = append(values, val) // cache
+
+// 		s.streamState.State.Delete(key) // delete
+// 	}
+
+// 	return values
+// }
+
 // Validate Configured Stream with Source Stream
 func (s *ConfiguredStream) Validate(source *Stream) error {
 	if !source.SupportedSyncModes.Exists(s.Stream.SyncMode) {
diff --git a/utils/utils.go b/utils/utils.go
index c7bc795..22ccf45 100644
--- a/utils/utils.go
+++ b/utils/utils.go
@@ -12,6 +12,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/datazip-inc/olake/logger"
 	"github.com/goccy/go-json"
 
 	"github.com/oklog/ulid"
@@ -141,7 +142,7 @@ func UnmarshalFile(file string, dest any) error {
 
 	err = json.Unmarshal(data, dest)
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to unmarshal file[%s]: %s", file, err)
 	}
 
 	return nil
@@ -199,7 +200,10 @@ func genULID(t time.Time) string {
 	ulidMutex.Lock()
 	defer ulidMutex.Unlock()
 	// TODO: Handle Error (Need to remove state and catalog print from logger)
-	newUlid, _ := ulid.New(ulid.Timestamp(t), entropy)
+	newUlid, err := ulid.New(ulid.Timestamp(t), entropy)
+	if err != nil {
+		logger.Fatalf("failed to generate ulid: %s", err)
+	}
 	return newUlid.String()
 }
 
diff --git a/writers/parquet/parquet.go b/writers/parquet/parquet.go
index 8fcbe63..a40cd93 100644
--- a/writers/parquet/parquet.go
+++ b/writers/parquet/parquet.go
@@ -198,7 +198,7 @@ func (p *Parquet) Check() error {
 		p.config.Path = os.TempDir()
 		logger.Info("s3 writer configuration found")
 	} else if p.config.Path != "" {
-		logger.Info("local writer configuration found, writing at location[%s]", p.config.Path)
+		logger.Infof("local writer configuration found, writing at location[%s]", p.config.Path)
 	} else {
 		return fmt.Errorf("invalid configuration found")
 	}
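
Illustration (editor's sketch, not part of the applied diff): the resumable full
load above derives chunk boundaries purely from ObjectID timestamps. The first 4
bytes of a MongoDB ObjectID encode the creation time, so zeroing the remaining 8
bytes yields the smallest possible ObjectID for a given instant. A minimal
standalone sketch of that boundary scheme, assuming only the official
mongo-driver; the fixed 6h density below is hypothetical, whereas the patch
derives the density from the spread between the collection's extremes:

	package main

	import (
		"fmt"
		"time"

		"go.mongodb.org/mongo-driver/bson/primitive"
	)

	// minObjectIDHex mirrors generateMinObjectID from this patch: keep the
	// 4 timestamp bytes and zero the remaining 8 bytes, then hex-encode.
	func minObjectIDHex(t time.Time) string {
		id := primitive.NewObjectIDFromTimestamp(t)
		for i := 4; i < 12; i++ {
			id[i] = 0x00
		}
		return id.Hex()
	}

	func main() {
		first := time.Now().Add(-24 * time.Hour) // oldest document time
		last := time.Now()                       // newest document time
		density := 6 * time.Hour                 // hypothetical fixed chunk width

		// Split [first, last] into [min, max) ObjectID ranges, exactly like
		// the backfill loop in this patch; each range becomes a Chunk that is
		// persisted in state and removed once it syncs successfully.
		for start := first; start.Before(last); start = start.Add(density) {
			end := start.Add(density)
			max := minObjectIDHex(end)
			if end.After(last) {
				max = minObjectIDHex(last.Add(time.Second))
			}
			fmt.Printf("chunk min=%s max=%s\n", minObjectIDHex(start), max)
		}
	}

Because boundaries are recomputed only when no chunk state exists, a sync that
dies midway restarts from the surviving chunk list instead of re-reading the
whole collection.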