Skip to content

Commit

Permalink
Update upload.go
Browse files Browse the repository at this point in the history
  • Loading branch information
Sakura-Byte committed Feb 1, 2025
1 parent c92cd96 commit 59ffb4e
Showing 1 changed file with 57 additions and 108 deletions.
165 changes: 57 additions & 108 deletions backend/115/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func (f *Fs) sampleUploadForm(ctx context.Context, in io.Reader, initResp *api.S
}

// ─────────────────────────────────────────────────────────────────────────────────────
// NEWLY REFACTORED FUNCTIONS
// NEWLY REFACTORED FUNCTIONS - Applied to the second code
// ─────────────────────────────────────────────────────────────────────────────────────

// tryHashUpload attempts 秒传 by checking if the file already exists on the server.
Expand Down Expand Up @@ -483,6 +483,7 @@ func (f *Fs) tryHashUpload(
}
} else {
fs.Debugf(o, "tryHashUpload: using precomputed SHA1=%s", hashStr)
newIn = in // if SHA1 is precomputed, no need to change the input reader.
}
o.sha1sum = strings.ToLower(hashStr)

Expand All @@ -504,16 +505,13 @@ func (f *Fs) tryHashUpload(
acc.ServerSideTransferStart()
acc.ServerSideCopyEnd(size)
}
// Optionally fetch final info
if info, err2 := f.getFile(ctx, "", ui.PickCode); err2 == nil {
_ = o.setMetaData(info)
}
return false, ui, newIn, cleanup, nil
// Optionally fetch final info - Removed this line in this fix to simplify and avoid potential issues if getFile fails.
return true, ui, newIn, cleanup, nil // Return true to indicate 秒传 success

case 1:
// status=1 => server doesn't have file => need actual upload
fs.Debugf(o, "tryHashUpload: 秒传 not possible => server requires real upload.")
return false, ui, in, cleanup, nil
return false, ui, newIn, cleanup, nil

case 7:
// partial-block check
Expand Down Expand Up @@ -547,108 +545,75 @@ func (f *Fs) uploadToOSS(
options ...fs.OpenOption,
) (fs.Object, error) {
if ui == nil {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 1 * time.Second
expBackoff.MaxElapsedTime = 30 * time.Second

err := backoff.RetryNotify(func() error {
var initErr error
ui, initErr = f.initUpload(ctx, size, leaf, dirID, "", "", "")
if initErr != nil {
return initErr
}
if ui == nil {
return errors.New("initUpload returned nil")
}
return nil
}, expBackoff, func(err error, d time.Duration) {
fs.Debugf(o, "initUpload retry error: %v", err)
})

if err != nil {
return nil, fmt.Errorf("failed to initialize upload: %w", err)
var initErr error
ui, initErr = f.initUpload(ctx, size, leaf, dirID, "", "", "") // Initial initUpload without SHA1 for normal upload flow
if initErr != nil {
return nil, fmt.Errorf("failed to initialize upload: %w", initErr)
}
if ui == nil {
return nil, errors.New("initUpload returned nil")
}
}

operation := func() error {
cutoff := int64(o.fs.opt.UploadCutoff)
if size < cutoff {
client := f.newOSSClient()
req := &oss.PutObjectRequest{
Bucket: oss.Ptr(ui.Bucket),
Key: oss.Ptr(ui.Object),
Body: in,
Callback: oss.Ptr(ui.GetCallback()),
CallbackVar: oss.Ptr(ui.GetCallbackVar()),
}

for _, opt := range options {
k, v := opt.Header()
switch strings.ToLower(k) {
case "cache-control":
req.CacheControl = oss.Ptr(v)
case "content-disposition":
req.ContentDisposition = oss.Ptr(v)
case "content-encoding":
req.ContentEncoding = oss.Ptr(v)
case "content-type":
req.ContentType = oss.Ptr(v)
}
}

// FIXED: Reset read position before retries
if seeker, ok := in.(io.Seeker); ok {
_, _ = seeker.Seek(0, io.SeekStart)
}
cutoff := int64(o.fs.opt.UploadCutoff)
if size < cutoff {
client := f.newOSSClient()
req := &oss.PutObjectRequest{
Bucket: oss.Ptr(ui.Bucket),
Key: oss.Ptr(ui.Object),
Body: in,
Callback: oss.Ptr(ui.GetCallback()),
CallbackVar: oss.Ptr(ui.GetCallbackVar()),
}

res, err := client.PutObject(ctx, req)
if err != nil {
return fmt.Errorf("putObject error: %w", err)
for _, opt := range options {
k, v := opt.Header()
switch strings.ToLower(k) {
case "cache-control":
req.CacheControl = oss.Ptr(v)
case "content-disposition":
req.ContentDisposition = oss.Ptr(v)
case "content-encoding":
req.ContentEncoding = oss.Ptr(v)
case "content-type":
req.ContentType = oss.Ptr(v)
}
}

data, err := f.postUpload(res.CallbackResult)
if err != nil {
return fmt.Errorf("finalize error: %w", err)
}
return o.setMetaDataFromCallBack(data)
res, err := client.PutObject(ctx, req)
if err != nil {
return nil, fmt.Errorf("putObject error: %w", err)
}

// FIXED: Handle file inputs in chunk writer
mu, err := f.newChunkWriter(ctx, remote(o), src, ui, in, options...)
data, err := f.postUpload(res.CallbackResult)
if err != nil {
return fmt.Errorf("multipart init error: %w", err)
return nil, fmt.Errorf("finalize error: %w", err)
}
return o, o.setMetaDataFromCallBack(data)
}

if err = mu.Upload(ctx); err != nil {
if errors.Is(err, context.Canceled) {
return backoff.Permanent(err)
}
mu, err := f.newChunkWriter(ctx, remote(o), src, ui, in, options...)
if err != nil {
return nil, fmt.Errorf("multipart init error: %w", err)
}

var ossErr *oss.ServiceError
if errors.As(err, &ossErr) && ossErr.Code == "PartAlreadyExist" {
fs.Debugf(o, "part already exists")
return nil
}
return fmt.Errorf("upload error: %w", err)
if err = mu.Upload(ctx); err != nil {
if errors.Is(err, context.Canceled) {
return nil, backoff.Permanent(err) // Don't retry context cancelled
}

data, err := f.postUpload(mu.callbackRes)
if err != nil {
return fmt.Errorf("finalize error: %w", err)
var ossErr *oss.ServiceError
if errors.As(err, &ossErr) && ossErr.Code == "PartAlreadyExist" {
fs.Debugf(o, "part already exists") // For debug, not critical error
} else {
return nil, fmt.Errorf("upload error: %w", err)
}
return o.setMetaDataFromCallBack(data)
}

expBackoff := backoff.NewExponentialBackOff()
expBackoff.MaxElapsedTime = 2 * time.Minute
err := backoff.RetryNotify(operation, expBackoff, func(err error, d time.Duration) {
fs.Logf(o, "upload error: %v, retrying", err)
})

data, err := f.postUpload(mu.callbackRes)
if err != nil {
return nil, fmt.Errorf("upload failed: %w", err)
return nil, fmt.Errorf("finalize error: %w", err)
}
return o, nil
return o, o.setMetaDataFromCallBack(data)
}

// doSampleUpload is a helper to perform the "simple form" approach for small files.
Expand All @@ -673,13 +638,7 @@ func (f *Fs) doSampleUpload(
}

// upload is the main entry point that decides which upload strategy to use.
func (f *Fs) upload(
ctx context.Context,
in io.Reader,
src fs.ObjectInfo,
remote string,
options ...fs.OpenOption,
) (fs.Object, error) {
func (f *Fs) upload(ctx context.Context, in io.Reader, src fs.ObjectInfo, remote string, options ...fs.OpenOption) (fs.Object, error) {
if f.isShare {
return nil, errors.New("unsupported for shared filesystem")
}
Expand Down Expand Up @@ -749,11 +708,6 @@ func (f *Fs) upload(
return o, nil
}

// FIXED: Reset read position before upload
if seeker, ok := newIn.(io.Seeker); ok {
_, _ = seeker.Seek(0, io.SeekStart)
}

if ui != nil {
if size <= int64(StreamUploadLimit) {
obj, err := f.doSampleUpload(ctx, newIn, o, leaf, dirID, size, options...)
Expand Down Expand Up @@ -827,11 +781,6 @@ func (f *Fs) upload(
return o, nil
}

// FIXED: Reset read position before upload
if seeker, ok := newIn.(io.Seeker); ok {
_, _ = seeker.Seek(0, io.SeekStart)
}

obj, err := f.uploadToOSS(ctx, newIn, src, o, leaf, dirID, size, ui, options...)
if err != nil {
return nil, err
Expand Down

0 comments on commit 59ffb4e

Please sign in to comment.