Skip to content

Commit

Permalink
Revert "correctly handle lz4 error."
Browse files Browse the repository at this point in the history
This reverts commit 72d9adb.
  • Loading branch information
Sakura-Byte committed Jan 5, 2025
1 parent afdac40 commit acfd22b
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 88 deletions.
18 changes: 7 additions & 11 deletions backend/115/115.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"sync"
"time"

"github.com/pierrec/lz4/v4"
"github.com/rclone/rclone/backend/115/api"
"github.com/rclone/rclone/backend/115/dircache"
"github.com/rclone/rclone/fs"
Expand Down Expand Up @@ -927,9 +928,13 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options .
// checking to see if there is one already - use Put() for that.
func (f *Fs) putUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, remote string, options ...fs.OpenOption) (fs.Object, error) {
// upload src with the name of remote
newObj, isSpecialError, err := f.upload(ctx, in, src, remote, options...)
newObj, err := f.upload(ctx, in, src, remote, options...)
if err != nil {
return nil, fmt.Errorf("failed to upload: %w", err)
if !errors.Is(err, lz4.ErrInvalidSourceShortBuffer) {
return nil, fmt.Errorf("failed to upload: %w", err)
}
// In this case, the upload (perhaps via hash) could be successful,
/// so let the subsequent process locate the uploaded object.
}

if newObj == nil {
Expand All @@ -939,19 +944,13 @@ func (f *Fs) putUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo,
o := newObj.(*Object)

if o.hasMetaData {
if isSpecialError {
fs.Logf(o, "Upload successful with lz4.ErrInvalidSourceShortBuffer")
}
return o, nil
}

var info *api.File
found, err := f.listAll(ctx, o.parent, f.opt.ListChunk, true, false, func(item *api.File) bool {
if strings.ToLower(item.Sha) == o.sha1sum {
info = item
if isSpecialError {
fs.Logf(o, "Upload successful with lz4.ErrInvalidSourceShortBuffer")
}
return true
}
return false
Expand All @@ -962,9 +961,6 @@ func (f *Fs) putUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo,
if !found {
return nil, fs.ErrorObjectNotFound
}
if isSpecialError {
fs.Logf(o, "Upload successful with lz4.ErrInvalidSourceShortBuffer")
}
return o, o.setMetaData(info)
}

Expand Down
131 changes: 54 additions & 77 deletions backend/115/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
"github.com/cenkalti/backoff/v4"
"github.com/pierrec/lz4/v4"
"github.com/rclone/rclone/backend/115/api"
"github.com/rclone/rclone/backend/115/cipher"
"github.com/rclone/rclone/fs"
Expand Down Expand Up @@ -131,7 +130,7 @@ func generateToken(userID, fileID, fileSize, signKey, signVal, timeStamp, appVer
}

// initUpload calls 115's initupload endpoint. This is used for both 秒传 checks and actual uploads.
func (f *Fs) initUpload(ctx context.Context, size int64, name, dirID, sha1sum, signKey, signVal string) (*api.UploadInitInfo, bool, error) {
func (f *Fs) initUpload(ctx context.Context, size int64, name, dirID, sha1sum, signKey, signVal string) (*api.UploadInitInfo, error) {
operation := func() (*api.UploadInitInfo, error) {
filename := f.opt.Enc.FromStandardName(name)
filesize := strconv.FormatInt(size, 10)
Expand Down Expand Up @@ -209,34 +208,27 @@ func (f *Fs) initUpload(ctx context.Context, size int64, name, dirID, sha1sum, s
}

var ui *api.UploadInitInfo
var isSpecialError bool
var err error

operationWithRetry := func() (*api.UploadInitInfo, bool, error) {
operationWithRetry := func() (*api.UploadInitInfo, error) {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.MaxElapsedTime = 2 * time.Minute
err := backoff.RetryNotify(func() error {
var err error
ui, err = operation()
if err != nil {
if errors.Is(err, lz4.ErrInvalidSourceShortBuffer) {
// Special handling: treat as success
isSpecialError = true
return backoff.Permanent(err) // Stop retrying
}
return err // Retry for other errors
return err
}
return nil
}, expBackoff, func(err error, duration time.Duration) {
fs.Logf(nil, "initUpload failed: %v. Retrying in %v", err, duration)
})
return ui, isSpecialError, err
return ui, err
}

ui, isSpecialError, err = operationWithRetry()
if err != nil && !isSpecialError {
return nil, false, err
ui, err := operationWithRetry()
if err != nil {
return nil, err
}
return ui, isSpecialError, nil
return ui, nil
}

// postUpload processes the JSON callback after an upload to OSS or sample upload.
Expand Down Expand Up @@ -470,7 +462,7 @@ func (f *Fs) tryHashUpload(
o *Object,
leaf, dirID string,
size int64,
) (bool, *api.UploadInitInfo, bool, error) {
) (bool, *api.UploadInitInfo, error) {
fs.Debugf(o, "tryHashUpload: attempting 秒传...")

// 1) Get or compute the file's SHA-1
Expand All @@ -481,22 +473,17 @@ func (f *Fs) tryHashUpload(
hashStr, in, cleanup, err = bufferIOwithSHA1(in, size, int64(f.opt.HashMemoryThreshold))
defer cleanup()
if err != nil {
return false, nil, false, fmt.Errorf("failed to calculate SHA1: %w", err)
return false, nil, fmt.Errorf("failed to calculate SHA1: %w", err)
}
} else {
fs.Debugf(o, "tryHashUpload: using precomputed SHA1=%s", hashStr)
}
o.sha1sum = strings.ToLower(hashStr)

// 2) Call initUpload with that SHA-1
ui, isSpecialError, err := f.initUpload(ctx, size, leaf, dirID, hashStr, "", "")
ui, err := f.initUpload(ctx, size, leaf, dirID, hashStr, "", "")
if err != nil {
if isSpecialError {
// Treat as success and proceed
fs.Logf(o, "initUpload encountered lz4.ErrInvalidSourceShortBuffer, treating as success")
return true, ui, isSpecialError, nil
}
return false, nil, isSpecialError, fmt.Errorf("秒传 initUpload failed: %w", err)
return false, nil, fmt.Errorf("秒传 initUpload failed: %w", err)
}

// 3) Handle different statuses
Expand All @@ -515,29 +502,29 @@ func (f *Fs) tryHashUpload(
if info, err2 := f.getFile(ctx, "", ui.PickCode); err2 == nil {
_ = o.setMetaData(info)
}
return true, ui, false, nil
return true, ui, nil

case 1:
// status=1 => server doesn't have file => need actual upload
fs.Debugf(o, "tryHashUpload: 秒传 not possible => server requires real upload.")
return false, ui, false, nil
return false, ui, nil

case 7:
// partial-block check
fs.Debugf(o, "tryHashUpload: 秒传 partial-block check => signCheck=%q", ui.SignCheck)
signKey = ui.SignKey
if signVal, err = calcBlockSHA1(ctx, in, src, ui.SignCheck); err != nil {
return false, nil, false, fmt.Errorf("calcBlockSHA1 error: %w", err)
return false, nil, fmt.Errorf("calcBlockSHA1 error: %w", err)
}
ui, isSpecialError, err = f.initUpload(ctx, size, leaf, dirID, hashStr, signKey, signVal)
ui, err = f.initUpload(ctx, size, leaf, dirID, hashStr, signKey, signVal)
if err != nil {
return false, nil, isSpecialError, fmt.Errorf("tryHashUpload: 秒传 re-init error: %w", err)
return false, nil, fmt.Errorf("tryHashUpload: 秒传 re-init error: %w", err)
}
continue

default:
// Unexpected status => treat as error
return false, nil, false, fmt.Errorf("tryHashUpload: 秒传 error: unexpected status=%d", ui.Status)
return false, nil, fmt.Errorf("tryHashUpload: 秒传 error: unexpected status=%d", ui.Status)
}
}
}
Expand Down Expand Up @@ -672,29 +659,29 @@ func (f *Fs) upload(
src fs.ObjectInfo,
remote string,
options ...fs.OpenOption,
) (fs.Object, bool, error) {
) (fs.Object, error) {
if f.isShare {
return nil, false, errors.New("unsupported for shared filesystem")
return nil, errors.New("unsupported for shared filesystem")
}
size := src.Size()

// Ensure we have userID/userkey
if f.userkey == "" {
if err := f.getUploadBasicInfo(ctx); err != nil {
return nil, false, fmt.Errorf("failed to get upload basic info: %w", err)
return nil, fmt.Errorf("failed to get upload basic info: %w", err)
}
if f.userID == "" || f.userkey == "" {
return nil, false, fmt.Errorf("empty userid or userkey")
return nil, fmt.Errorf("empty userid or userkey")
}
}

if size > int64(maxUploadSize) {
return nil, false, fmt.Errorf("file size exceeds upload limit: %d > %d", size, maxUploadSize)
return nil, fmt.Errorf("file size exceeds upload limit: %d > %d", size, maxUploadSize)
}

o, leaf, dirID, err := f.createObject(ctx, remote, src.ModTime(ctx), size)
if err != nil {
return nil, false, err
return nil, err
}

//----------------------------------------------------------------
Expand All @@ -704,11 +691,11 @@ func (f *Fs) upload(
if size <= int64(StreamUploadLimit) {
obj, err := f.doSampleUpload(ctx, in, o, leaf, dirID, size, options...)
if err != nil {
return nil, false, err
return nil, err
}
return obj, false, nil
return obj, nil
}
return nil, false, fmt.Errorf("OnlyStream is enabled but file size %d exceeds StreamUploadLimit %d",
return nil, fmt.Errorf("OnlyStream is enabled but file size %d exceeds StreamUploadLimit %d",
size, StreamUploadLimit)
}

Expand All @@ -721,50 +708,46 @@ func (f *Fs) upload(
// Tiny files use sample upload
obj, err := f.doSampleUpload(ctx, in, o, leaf, dirID, size, options...)
if err != nil {
return nil, false, err
return nil, err
}
return obj, false, nil
return obj, nil
}
// Attempt fast upload (秒传)
gotIt, ui, isSpecialError, err := f.tryHashUpload(ctx, in, src, o, leaf, dirID, size)
if isSpecialError {
return o, true, nil
}
gotIt, ui, err := f.tryHashUpload(ctx, in, src, o, leaf, dirID, size)
if err != nil {
return nil, false, fmt.Errorf("FastUpload: 秒传 error: %w", err)
return nil, fmt.Errorf("FastUpload: 秒传 error: %w", err)
}

if gotIt {
return o, false, nil
return o, nil
}
// Fallback to uploadToOSS using the obtained UploadInitInfo
if ui != nil {
if size <= int64(StreamUploadLimit) {
obj, err := f.doSampleUpload(ctx, in, o, leaf, dirID, size, options...)
if err != nil {
return nil, false, err
return nil, err
}
return obj, false, nil
return obj, nil
}
obj, err := f.uploadToOSS(ctx, in, src, o, leaf, dirID, size, ui, options...)
if err != nil {
return nil, false, err
return nil, err
}
return obj, false, nil
return obj, nil
}
// If ui is nil, fallback to standard upload
if size <= int64(StreamUploadLimit) {
obj, err := f.doSampleUpload(ctx, in, o, leaf, dirID, size, options...)
if err != nil {
return nil, false, err
return nil, err
}
return obj, false, nil
return obj, nil
}
obj, err := f.uploadToOSS(ctx, in, src, o, leaf, dirID, size, nil, options...)
if err != nil {
return nil, false, err
return nil, err
}
return obj, false, nil
return obj, nil
}

//----------------------------------------------------------------
Expand All @@ -773,20 +756,17 @@ func (f *Fs) upload(
if f.opt.UploadHashOnly {
hashStr, _ := src.Hash(ctx, hash.SHA1)
if hashStr == "" {
return nil, false, fserrors.NoRetryError(errors.New("UploadHashOnly: skipping since no SHA1"))
}
gotIt, _, isSpecialError, err := f.tryHashUpload(ctx, in, src, o, leaf, dirID, size)
if isSpecialError {
return o, true, nil
return nil, fserrors.NoRetryError(errors.New("UploadHashOnly: skipping since no SHA1"))
}
gotIt, _, err := f.tryHashUpload(ctx, in, src, o, leaf, dirID, size)
if err != nil {
return nil, false, err
return nil, err
}
if gotIt {
return o, false, nil
return o, nil
}
// No fallback for UploadHashOnly
return nil, false, fserrors.NoRetryError(errors.New("UploadHashOnly: server does not have file => skipping"))
return nil, fserrors.NoRetryError(errors.New("UploadHashOnly: server does not have file => skipping"))
}

//----------------------------------------------------------------
Expand All @@ -796,32 +776,29 @@ func (f *Fs) upload(
// Use sample upload for small files
obj, err := f.doSampleUpload(ctx, in, o, leaf, dirID, size, options...)
if err != nil {
return nil, false, err
return nil, err
}
return obj, false, nil
return obj, nil
}

// Attempt fast upload (秒传)
gotIt, ui, isSpecialError, err := f.tryHashUpload(ctx, in, src, o, leaf, dirID, size)
if isSpecialError {
return o, true, nil
}
gotIt, ui, err := f.tryHashUpload(ctx, in, src, o, leaf, dirID, size)
if err != nil {
fs.Debugf(o, "normal: 秒传 error => fallback to uploadToOSS: %v", err)
obj, uploadErr := f.uploadToOSS(ctx, in, src, o, leaf, dirID, size, ui, options...)
if uploadErr != nil {
return nil, false, uploadErr
return nil, uploadErr
}
return obj, false, nil
return obj, nil
}
if gotIt {
// Fast upload successful
return o, false, nil
return o, nil
}
// Fallback to actual upload to OSS
obj, err := f.uploadToOSS(ctx, in, src, o, leaf, dirID, size, ui, options...)
if err != nil {
return nil, false, err
return nil, err
}
return obj, false, nil
return obj, nil
}

0 comments on commit acfd22b

Please sign in to comment.