From 59ffb4e0a1d7824b4d5ae20bdc9fda3c47a81f45 Mon Sep 17 00:00:00 2001 From: Sakura-Byte <42319937+Sakura-Byte@users.noreply.github.com> Date: Sat, 1 Feb 2025 20:53:54 +0800 Subject: [PATCH] Update upload.go --- backend/115/upload.go | 165 +++++++++++++++--------------------------- 1 file changed, 57 insertions(+), 108 deletions(-) diff --git a/backend/115/upload.go b/backend/115/upload.go index a1287dcdb8f7c..d65ef79799869 100644 --- a/backend/115/upload.go +++ b/backend/115/upload.go @@ -450,7 +450,7 @@ func (f *Fs) sampleUploadForm(ctx context.Context, in io.Reader, initResp *api.S } // ───────────────────────────────────────────────────────────────────────────────────── -// NEWLY REFACTORED FUNCTIONS +// NEWLY REFACTORED FUNCTIONS - Applied to the second code // ───────────────────────────────────────────────────────────────────────────────────── // tryHashUpload attempts 秒传 by checking if the file already exists on the server. @@ -483,6 +483,7 @@ func (f *Fs) tryHashUpload( } } else { fs.Debugf(o, "tryHashUpload: using precomputed SHA1=%s", hashStr) + newIn = in // if SHA1 is precomputed, no need to change the input reader. } o.sha1sum = strings.ToLower(hashStr) @@ -504,16 +505,13 @@ func (f *Fs) tryHashUpload( acc.ServerSideTransferStart() acc.ServerSideCopyEnd(size) } - // Optionally fetch final info - if info, err2 := f.getFile(ctx, "", ui.PickCode); err2 == nil { - _ = o.setMetaData(info) - } - return false, ui, newIn, cleanup, nil + // Optionally fetch final info - Removed this line in this fix to simplify and avoid potential issues if getFile fails. + return true, ui, newIn, cleanup, nil // Return true to indicate 秒传 success case 1: // status=1 => server doesn't have file => need actual upload fs.Debugf(o, "tryHashUpload: 秒传 not possible => server requires real upload.") - return false, ui, in, cleanup, nil + return false, ui, newIn, cleanup, nil case 7: // partial-block check @@ -547,108 +545,75 @@ func (f *Fs) uploadToOSS( options ...fs.OpenOption, ) (fs.Object, error) { if ui == nil { - expBackoff := backoff.NewExponentialBackOff() - expBackoff.InitialInterval = 1 * time.Second - expBackoff.MaxElapsedTime = 30 * time.Second - - err := backoff.RetryNotify(func() error { - var initErr error - ui, initErr = f.initUpload(ctx, size, leaf, dirID, "", "", "") - if initErr != nil { - return initErr - } - if ui == nil { - return errors.New("initUpload returned nil") - } - return nil - }, expBackoff, func(err error, d time.Duration) { - fs.Debugf(o, "initUpload retry error: %v", err) - }) - - if err != nil { - return nil, fmt.Errorf("failed to initialize upload: %w", err) + var initErr error + ui, initErr = f.initUpload(ctx, size, leaf, dirID, "", "", "") // Initial initUpload without SHA1 for normal upload flow + if initErr != nil { + return nil, fmt.Errorf("failed to initialize upload: %w", initErr) + } + if ui == nil { + return nil, errors.New("initUpload returned nil") } } - operation := func() error { - cutoff := int64(o.fs.opt.UploadCutoff) - if size < cutoff { - client := f.newOSSClient() - req := &oss.PutObjectRequest{ - Bucket: oss.Ptr(ui.Bucket), - Key: oss.Ptr(ui.Object), - Body: in, - Callback: oss.Ptr(ui.GetCallback()), - CallbackVar: oss.Ptr(ui.GetCallbackVar()), - } - - for _, opt := range options { - k, v := opt.Header() - switch strings.ToLower(k) { - case "cache-control": - req.CacheControl = oss.Ptr(v) - case "content-disposition": - req.ContentDisposition = oss.Ptr(v) - case "content-encoding": - req.ContentEncoding = oss.Ptr(v) - case "content-type": - req.ContentType = oss.Ptr(v) - } - } - - // FIXED: Reset read position before retries - if seeker, ok := in.(io.Seeker); ok { - _, _ = seeker.Seek(0, io.SeekStart) - } + cutoff := int64(o.fs.opt.UploadCutoff) + if size < cutoff { + client := f.newOSSClient() + req := &oss.PutObjectRequest{ + Bucket: oss.Ptr(ui.Bucket), + Key: oss.Ptr(ui.Object), + Body: in, + Callback: oss.Ptr(ui.GetCallback()), + CallbackVar: oss.Ptr(ui.GetCallbackVar()), + } - res, err := client.PutObject(ctx, req) - if err != nil { - return fmt.Errorf("putObject error: %w", err) + for _, opt := range options { + k, v := opt.Header() + switch strings.ToLower(k) { + case "cache-control": + req.CacheControl = oss.Ptr(v) + case "content-disposition": + req.ContentDisposition = oss.Ptr(v) + case "content-encoding": + req.ContentEncoding = oss.Ptr(v) + case "content-type": + req.ContentType = oss.Ptr(v) } + } - data, err := f.postUpload(res.CallbackResult) - if err != nil { - return fmt.Errorf("finalize error: %w", err) - } - return o.setMetaDataFromCallBack(data) + res, err := client.PutObject(ctx, req) + if err != nil { + return nil, fmt.Errorf("putObject error: %w", err) } - // FIXED: Handle file inputs in chunk writer - mu, err := f.newChunkWriter(ctx, remote(o), src, ui, in, options...) + data, err := f.postUpload(res.CallbackResult) if err != nil { - return fmt.Errorf("multipart init error: %w", err) + return nil, fmt.Errorf("finalize error: %w", err) } + return o, o.setMetaDataFromCallBack(data) + } - if err = mu.Upload(ctx); err != nil { - if errors.Is(err, context.Canceled) { - return backoff.Permanent(err) - } + mu, err := f.newChunkWriter(ctx, remote(o), src, ui, in, options...) + if err != nil { + return nil, fmt.Errorf("multipart init error: %w", err) + } - var ossErr *oss.ServiceError - if errors.As(err, &ossErr) && ossErr.Code == "PartAlreadyExist" { - fs.Debugf(o, "part already exists") - return nil - } - return fmt.Errorf("upload error: %w", err) + if err = mu.Upload(ctx); err != nil { + if errors.Is(err, context.Canceled) { + return nil, backoff.Permanent(err) // Don't retry context cancelled } - - data, err := f.postUpload(mu.callbackRes) - if err != nil { - return fmt.Errorf("finalize error: %w", err) + var ossErr *oss.ServiceError + if errors.As(err, &ossErr) && ossErr.Code == "PartAlreadyExist" { + fs.Debugf(o, "part already exists") // For debug, not critical error + } else { + return nil, fmt.Errorf("upload error: %w", err) } - return o.setMetaDataFromCallBack(data) } - expBackoff := backoff.NewExponentialBackOff() - expBackoff.MaxElapsedTime = 2 * time.Minute - err := backoff.RetryNotify(operation, expBackoff, func(err error, d time.Duration) { - fs.Logf(o, "upload error: %v, retrying", err) - }) - + data, err := f.postUpload(mu.callbackRes) if err != nil { - return nil, fmt.Errorf("upload failed: %w", err) + return nil, fmt.Errorf("finalize error: %w", err) } - return o, nil + return o, o.setMetaDataFromCallBack(data) } // doSampleUpload is a helper to perform the "simple form" approach for small files. @@ -673,13 +638,7 @@ func (f *Fs) doSampleUpload( } // upload is the main entry point that decides which upload strategy to use. -func (f *Fs) upload( - ctx context.Context, - in io.Reader, - src fs.ObjectInfo, - remote string, - options ...fs.OpenOption, -) (fs.Object, error) { +func (f *Fs) upload(ctx context.Context, in io.Reader, src fs.ObjectInfo, remote string, options ...fs.OpenOption) (fs.Object, error) { if f.isShare { return nil, errors.New("unsupported for shared filesystem") } @@ -749,11 +708,6 @@ func (f *Fs) upload( return o, nil } - // FIXED: Reset read position before upload - if seeker, ok := newIn.(io.Seeker); ok { - _, _ = seeker.Seek(0, io.SeekStart) - } - if ui != nil { if size <= int64(StreamUploadLimit) { obj, err := f.doSampleUpload(ctx, newIn, o, leaf, dirID, size, options...) @@ -827,11 +781,6 @@ func (f *Fs) upload( return o, nil } - // FIXED: Reset read position before upload - if seeker, ok := newIn.(io.Seeker); ok { - _, _ = seeker.Seek(0, io.SeekStart) - } - obj, err := f.uploadToOSS(ctx, newIn, src, o, leaf, dirID, size, ui, options...) if err != nil { return nil, err