From b89863461b64471650cd8ad49cc3bbdfc20c7d94 Mon Sep 17 00:00:00 2001 From: Michael Wilner Date: Fri, 24 Jan 2025 12:41:57 -0600 Subject: [PATCH] Protect concurrent usage of filestore --- store/file/file_store.go | 35 ++++++++++++++++++++++++----------- store/file/util.go | 9 +++++++-- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/store/file/file_store.go b/store/file/file_store.go index 085612d4e..c0a28531e 100644 --- a/store/file/file_store.go +++ b/store/file/file_store.go @@ -22,6 +22,7 @@ import ( "path" "strconv" "strings" + "sync" "time" "github.com/pkg/errors" @@ -41,12 +42,14 @@ type fileStore struct { sessionFname string senderSeqNumsFname string targetSeqNumsFname string - bodyFile *os.File - headerFile *os.File - sessionFile *os.File - senderSeqNumsFile *os.File - targetSeqNumsFile *os.File - fileSync bool + + fileMu sync.Mutex + bodyFile *os.File + headerFile *os.File + sessionFile *os.File + senderSeqNumsFile *os.File + targetSeqNumsFile *os.File + fileSync bool } // NewStoreFactory returns a file-based implementation of MessageStoreFactory. @@ -218,6 +221,9 @@ func (store *fileStore) populateCache() (creationTimePopulated bool, err error) } func (store *fileStore) setSession() error { + store.fileMu.Lock() + defer store.fileMu.Unlock() + if _, err := store.sessionFile.Seek(0, io.SeekStart); err != nil { return fmt.Errorf("unable to rewind file: %s: %s", store.sessionFname, err.Error()) } @@ -238,6 +244,8 @@ func (store *fileStore) setSession() error { } func (store *fileStore) setSeqNum(f *os.File, seqNum int) error { + store.fileMu.Lock() + defer store.fileMu.Unlock() if _, err := f.Seek(0, io.SeekStart); err != nil { return fmt.Errorf("unable to rewind file: %s: %s", f.Name(), err.Error()) } @@ -304,6 +312,8 @@ func (store *fileStore) SetCreationTime(_ time.Time) { } func (store *fileStore) SaveMessage(seqNum int, msg []byte) error { + store.fileMu.Lock() + defer store.fileMu.Unlock() offset, err := store.bodyFile.Seek(0, io.SeekEnd) if err != nil { return fmt.Errorf("unable to seek to end of file: %s: %s", store.bodyFname, err.Error()) @@ -339,6 +349,9 @@ func (store *fileStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg [] } func (store *fileStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error { + store.fileMu.Lock() + defer store.fileMu.Unlock() + // Sync files and seek to start of header file if err := store.bodyFile.Sync(); err != nil { return fmt.Errorf("unable to flush file: %s: %s", store.bodyFname, err.Error()) @@ -386,19 +399,19 @@ func (store *fileStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error // Close closes the store's files. func (store *fileStore) Close() error { - if err := closeFile(store.bodyFile); err != nil { + if err := closeSyncFile(store.bodyFile); err != nil { return err } - if err := closeFile(store.headerFile); err != nil { + if err := closeSyncFile(store.headerFile); err != nil { return err } - if err := closeFile(store.sessionFile); err != nil { + if err := closeSyncFile(store.sessionFile); err != nil { return err } - if err := closeFile(store.senderSeqNumsFile); err != nil { + if err := closeSyncFile(store.senderSeqNumsFile); err != nil { return err } - if err := closeFile(store.targetSeqNumsFile); err != nil { + if err := closeSyncFile(store.targetSeqNumsFile); err != nil { return err } diff --git a/store/file/util.go b/store/file/util.go index d181e8c27..9baf380f6 100644 --- a/store/file/util.go +++ b/store/file/util.go @@ -48,9 +48,14 @@ func createFilenamePrefix(s quickfix.SessionID) string { return strings.Join(fname, "-") } -// closeFile behaves like Close, except that no error is returned if the file does not exist. -func closeFile(f *os.File) error { +// closeSyncFile behaves like Sync and Close, except that no error is returned if the file does not exist. +func closeSyncFile(f *os.File) error { if f != nil { + if err := f.Sync(); err != nil { + if !os.IsNotExist(err) { + return err + } + } if err := f.Close(); err != nil { if !os.IsNotExist(err) { return err