Skip to content

Commit

Permalink
Fixing flushPendingWrites method (#2975)
Browse files Browse the repository at this point in the history
* Fixing flushPendingWrites method

* fixing struct name

* Fixing move_file test

* Not flushing pending writes for local file

* running rename test for buffered writes
  • Loading branch information
vadlakondaswetha authored and ashmeenkaur committed Feb 7, 2025
1 parent da12050 commit 68836fe
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 6 deletions.
6 changes: 3 additions & 3 deletions internal/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2024,8 +2024,7 @@ func (fs *fileSystem) Rename(
if err != nil {
return err
}
// For streaming writes, we will finalize localChild and do rename.
if localChild != nil && !fs.newConfig.Write.EnableStreamingWrites {
if localChild != nil {
fs.unlockAndDecrementLookupCount(localChild, 1)
return fmt.Errorf("cannot rename open file %q: %w", op.OldName, syscall.ENOTSUP)
}
Expand Down Expand Up @@ -2062,7 +2061,7 @@ func (fs *fileSystem) Rename(
func (fs *fileSystem) renameFile(ctx context.Context, op *fuseops.RenameOp, child *inode.Core, oldParent inode.DirInode, newParent inode.DirInode) error {
updatedMinObject, err := fs.flushPendingWrites(ctx, child)
if err != nil {
return fmt.Errorf("flushBeforeRename error :%v", err)
return fmt.Errorf("flushPendingWrites error :%v", err)
}

if (child.Bucket.BucketType().Hierarchical && fs.enableAtomicRenameObject) || child.Bucket.BucketType().Zonal {
Expand Down Expand Up @@ -2103,6 +2102,7 @@ func (fs *fileSystem) flushPendingWrites(ctx context.Context, child *inode.Core)
defer fileInode.Unlock()
// Try to flush if there are any pending writes.
err = fs.flushFile(ctx, fileInode)
minObject = fileInode.Source()
return
}

Expand Down
21 changes: 21 additions & 0 deletions internal/fs/streaming_writes_empty_gcs_object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package fs_test
import (
"os"
"path"
"strings"
"syscall"
"testing"

Expand Down Expand Up @@ -65,6 +66,26 @@ func (t *StreamingWritesEmptyGCSObjectTest) SetupTest() {
assert.NoError(t.T(), err)
}

func (t *StreamingWritesEmptyGCSObjectTest) TestRenameFileWithPendingWrites() {
_, err := t.f1.Write([]byte("tacos"))
assert.NoError(t.T(), err)
newFilePath := path.Join(mntDir, "test.txt")
// Check that new file doesn't exist.
_, err = os.Stat(newFilePath)
assert.Error(t.T(), err)
assert.True(t.T(), strings.Contains(err.Error(), "no such file or directory"))

err = os.Rename(t.f1.Name(), newFilePath)

assert.NoError(t.T(), err)
_, err = os.Stat(t.f1.Name())
assert.Error(t.T(), err)
assert.True(t.T(), strings.Contains(err.Error(), "no such file or directory"))
content, err := os.ReadFile(newFilePath)
assert.NoError(t.T(), err)
assert.Equal(t.T(), "tacos", string(content))
}

func (t *StreamingWritesEmptyGCSObjectTest) TearDownTest() {
t.fsTest.TearDown()
}
Expand Down
2 changes: 1 addition & 1 deletion tools/integration_tests/local_file/rename.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func verifyRenameOperationNotSupported(err error, t *testing.T) {
// Tests
////////////////////////////////////////////////////////////////////////

func (t *localFileTestSuite) TestRenameOfLocalFileFails() {
func (t *CommonLocalFileTestSuite) TestRenameOfLocalFileFails() {
testDirPath = setup.SetupTestDirectory(testDirName)
// Create local file with some content.
_, fh := CreateLocalFileInTestDir(ctx, storageClient, testDirPath, FileName1, t.T())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"github.com/stretchr/testify/require"
)

func (t *defaultMountCommonTest) TestMoveBeforeFileIsFlushed() {
// os.Rename can't be invoked over local files. It's failing with file not found error.
// Hence running this test only for empty GCS file.
func (t *defaultMountEmptyGCSFile) TestRenameBeforeFileIsFlushed() {
operations.WriteWithoutClose(t.f1, FileContents, t.T())
operations.WriteWithoutClose(t.f1, FileContents, t.T())
operations.VerifyStatFile(t.filePath, int64(2*len(FileContents)), FilePerms, t.T())
Expand All @@ -31,7 +33,7 @@ func (t *defaultMountCommonTest) TestMoveBeforeFileIsFlushed() {

newFile := "newFile.txt"
destDirPath := path.Join(testDirPath, newFile)
err = operations.Move(t.filePath, destDirPath)
err = operations.RenameFile(t.filePath, destDirPath)

// Validate that move didn't throw any error.
require.NoError(t.T(), err)
Expand Down

0 comments on commit 68836fe

Please sign in to comment.