[Carry 1948] fs/remote: Refactor blob code to make it more modular #1955

Merged · 2 commits · Feb 13, 2025
214 changes: 119 additions & 95 deletions fs/remote/blob.go
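The diff below breaks the monolithic read path into small helpers: `getFetcher` snapshots the fetcher under its mutex, `prepareChunksForRead`/`readFromCache` try the cache before falling back to the registry, `fetchRange` deduplicates concurrent downloads, `adjustBufferSize` trims the result at the end of the blob, and `cacheChunkData` commits fetched chunks to the cache. For orientation, here is a self-contained sketch of the two simplest helpers as they appear in this change, `getFetcher` and `adjustBufferSize`; the surrounding `region`/`fetcher`/`blob` definitions and the `main` function are simplified stand-ins, not the real fs/remote types.

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical stand-ins for the real fs/remote types; only the shape matters here.
type region struct{ b, e int64 }

type fetcher interface {
	genID(reg region) string
}

type blob struct {
	fetcherMu sync.Mutex
	fetcher   fetcher
	size      int64
}

// getFetcher snapshots the current fetcher under the lock, because the fetcher
// can be swapped out while a read is in flight.
func (b *blob) getFetcher() fetcher {
	b.fetcherMu.Lock()
	defer b.fetcherMu.Unlock()
	return b.fetcher
}

// adjustBufferSize trims the caller's buffer so a read near the end of the blob
// never reports more bytes than actually remain past the offset.
func (b *blob) adjustBufferSize(p []byte, offset int64) int {
	if remain := b.size - offset; int64(len(p)) >= remain {
		if remain < 0 {
			remain = 0
		}
		p = p[:remain]
	}
	return len(p)
}

func main() {
	b := &blob{size: 100}
	fmt.Println(b.adjustBufferSize(make([]byte, 64), 90))  // 10: only 10 bytes remain
	fmt.Println(b.adjustBufferSize(make([]byte, 64), 120)) // 0: offset is past the end
	_ = b.getFetcher()                                     // nil in this sketch; set by the real code
}
```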
@@ -259,28 +259,33 @@ func (b *blob) ReadAt(p []byte, offset int64, opts ...Option) (int, error) {
o(&readAtOpts)
}

// Fetcher can be suddenly updated so we take and use the snapshot of it for
// consistency.
b.fetcherMu.Lock()
fr := b.fetcher
b.fetcherMu.Unlock()
fr := b.getFetcher()

if err := b.prepareChunksForRead(allRegion, offset, p, fr, allData, &readAtOpts); err != nil {
return 0, err
}

// Read required data
if err := b.fetchRange(allData, &readAtOpts); err != nil {
return 0, err
}

return b.adjustBufferSize(p, offset), nil
}

b.walkChunks(allRegion, func(chunk region) error {
// prepareChunksForRead prepares chunks for reading by checking cache and setting up writers
func (b *blob) prepareChunksForRead(allRegion region, offset int64, p []byte, fr fetcher, allData map[region]io.Writer, opts *options) error {
return b.walkChunks(allRegion, func(chunk region) error {
var (
base = positive(chunk.b - offset)
lowerUnread = positive(offset - chunk.b)
upperUnread = positive(chunk.e + 1 - (offset + int64(len(p))))
expectedSize = chunk.size() - upperUnread - lowerUnread
)

// Check if the content exists in the cache
r, err := b.cache.Get(fr.genID(chunk), readAtOpts.cacheOpts...)
if err == nil {
defer r.Close()
n, err := r.ReadAt(p[base:base+expectedSize], lowerUnread)
if (err == nil || err == io.EOF) && int64(n) == expectedSize {
return nil
}
// Try to read from cache first
if err := b.readFromCache(chunk, p[base:base+expectedSize], lowerUnread, fr, opts); err == nil {
return nil
}

// We missed cache. Take it from remote registry.
@@ -289,21 +294,23 @@ func (b *blob) ReadAt(p []byte, offset int64, opts ...Option) (int, error) {
allData[chunk] = newBytesWriter(p[base:base+expectedSize], lowerUnread)
return nil
})
}

// Read required data
if err := b.fetchRange(allData, &readAtOpts); err != nil {
return 0, err
// readFromCache attempts to read chunk data from cache
func (b *blob) readFromCache(chunk region, dest []byte, offset int64, fr fetcher, opts *options) error {
r, err := b.cache.Get(fr.genID(chunk), opts.cacheOpts...)
if err != nil {
return err
}

// Adjust the buffer size according to the blob size
if remain := b.size - offset; int64(len(p)) >= remain {
if remain < 0 {
remain = 0
}
p = p[:remain]
defer r.Close()
n, err := r.ReadAt(dest, offset)
if err != nil && err != io.EOF {
return err
}

return len(p), nil
if n != len(dest) {
return fmt.Errorf("incomplete read from cache: read %d bytes, expected %d bytes", n, len(dest))
}
return nil
}

// fetchRegions fetches all specified chunks from remote blob and puts it in the local cache.
@@ -313,11 +320,7 @@ func (b *blob) fetchRegions(allData map[region]io.Writer, fetched map[region]boo
return nil
}

// Fetcher can be suddenly updated so we take and use the snapshot of it for
// consistency.
b.fetcherMu.Lock()
fr := b.fetcher
b.fetcherMu.Unlock()
fr := b.getFetcher()

// request missed regions
var req []region
@@ -332,7 +335,6 @@ func (b *blob) fetchRegions(allData map[region]io.Writer, fetched map[region]boo
fetchCtx = opts.ctx
}
mr, err := fr.fetch(fetchCtx, req, true)

if err != nil {
return err
}
@@ -353,35 +355,9 @@ func (b *blob) fetchRegions(allData map[region]io.Writer, fetched map[region]boo
return fmt.Errorf("failed to read multipart resp: %w", err)
}
if err := b.walkChunks(reg, func(chunk region) (retErr error) {
id := fr.genID(chunk)
cw, err := b.cache.Add(id, opts.cacheOpts...)
if err != nil {
if err := b.cacheChunkData(chunk, p, fr, allData, fetched, opts); err != nil {
return err
}
defer cw.Close()
w := io.Writer(cw)

// If this chunk is one of the targets, write the content to the
// passed reader too.
if _, ok := fetched[chunk]; ok {
w = io.MultiWriter(w, allData[chunk])
}

// Copy the target chunk
if _, err := io.CopyN(w, p, chunk.size()); err != nil {
cw.Abort()
return err
}

// Add the target chunk to the cache
if err := cw.Commit(); err != nil {
return err
}

b.fetchedRegionSetMu.Lock()
b.fetchedRegionSet.add(chunk)
b.fetchedRegionSetMu.Unlock()
fetched[chunk] = true
return nil
}); err != nil {
return fmt.Errorf("failed to get chunks: %w", err)
@@ -408,9 +384,6 @@ func (b *blob) fetchRange(allData map[region]io.Writer, opts *options) error {
return nil
}

// We build a key based on regions we need to fetch and pass it to singleflightGroup.Do(...)
// to block simultaneous same requests. Once the request is finished and the data is ready,
// all blocked callers will be unblocked and that same data will be returned by all blocked callers.
key := makeSyncKey(allData)
fetched := make(map[region]bool)
_, err, shared := b.fetchedRegionGroup.Do(key, func() (interface{}, error) {
@@ -420,44 +393,64 @@ func (b *blob) fetchRange(allData map[region]io.Writer, opts *options) error {
// When unblocked try to read from cache in case if there were no errors
// If we fail reading from cache, fetch from remote registry again
if err == nil && shared {
for reg := range allData {
if _, ok := fetched[reg]; ok {
continue
}
err = b.walkChunks(reg, func(chunk region) error {
b.fetcherMu.Lock()
fr := b.fetcher
b.fetcherMu.Unlock()

// Check if the content exists in the cache
// And if exists, read from cache
r, err := b.cache.Get(fr.genID(chunk), opts.cacheOpts...)
if err != nil {
return err
}
defer r.Close()
rr := io.NewSectionReader(r, 0, chunk.size())

// Copy the target chunk
b.fetchedRegionCopyMu.Lock()
defer b.fetchedRegionCopyMu.Unlock()
if _, err := io.CopyN(allData[chunk], rr, chunk.size()); err != nil {
return err
}
return nil
})
if err != nil {
break
}
if err := b.handleSharedFetch(allData, fetched, opts); err != nil {
return b.fetchRange(allData, opts) // retry on error
}
}

// if we cannot read the data from cache, do fetch again
if err != nil {
return b.fetchRange(allData, opts)
return err
}

// handleSharedFetch handles the case when multiple goroutines share the same fetch result
func (b *blob) handleSharedFetch(allData map[region]io.Writer, fetched map[region]bool, opts *options) error {
for reg := range allData {
if _, ok := fetched[reg]; ok {
continue
}
if err := b.copyFetchedChunks(reg, allData, opts); err != nil {
return err
}
}
return nil
}

return err
// copyFetchedChunks copies fetched chunks from cache to target writer
func (b *blob) copyFetchedChunks(reg region, allData map[region]io.Writer, opts *options) error {
return b.walkChunks(reg, func(chunk region) error {
fr := b.getFetcher()
r, err := b.cache.Get(fr.genID(chunk), opts.cacheOpts...)
if err != nil {
return err
}
defer r.Close()

b.fetchedRegionCopyMu.Lock()
defer b.fetchedRegionCopyMu.Unlock()

if _, err := io.CopyN(allData[chunk], io.NewSectionReader(r, 0, chunk.size()), chunk.size()); err != nil {
return err
}
return nil
})
}

// getFetcher safely gets the current fetcher
// Fetcher can be suddenly updated so we take and use the snapshot of it for consistency.
func (b *blob) getFetcher() fetcher {
b.fetcherMu.Lock()
defer b.fetcherMu.Unlock()
return b.fetcher
}

// adjustBufferSize adjusts buffer size according to the blob size
func (b *blob) adjustBufferSize(p []byte, offset int64) int {
if remain := b.size - offset; int64(len(p)) >= remain {
if remain < 0 {
remain = 0
}
p = p[:remain]
}
return len(p)
}

type walkFunc func(reg region) error
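The deduplication that `fetchRange` relies on is the key-plus-singleflight pattern described in the original comment: a key is built from the requested regions and passed to `fetchedRegionGroup.Do(...)`, so concurrent callers asking for the same ranges block on a single fetch, and the `shared` flag tells the waiters to serve the data from cache via `handleSharedFetch`/`copyFetchedChunks` rather than re-download it. A minimal standalone sketch of that pattern, assuming `golang.org/x/sync/singleflight` (whose `Do(key, fn)`/`shared` signature matches the call in this file) and a hypothetical `fetchOnce` helper:

```go
package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/singleflight"
)

var group singleflight.Group

// fetchOnce stands in for an expensive remote range request.
func fetchOnce(key string) (string, error) {
	return "data-for-" + key, nil
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Callers with the same key share one in-flight fetch; `shared`
			// reports whether this goroutine reused another caller's result.
			v, err, shared := group.Do("bytes=0-1048575", func() (interface{}, error) {
				return fetchOnce("bytes=0-1048575")
			})
			fmt.Println(v, err, shared)
		}()
	}
	wg.Wait()
}
```

Because all waiters receive the shared result, a failure in the single in-flight fetch propagates to every caller; that is why the refactored code falls back to calling `fetchRange` again when reading the shared result back from cache fails.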
@@ -533,3 +526,34 @@ func positive(n int64) int64 {
}
return n
}

// cacheChunkData handles caching of chunk data
func (b *blob) cacheChunkData(chunk region, r io.Reader, fr fetcher, allData map[region]io.Writer, fetched map[region]bool, opts *options) error {
id := fr.genID(chunk)
cw, err := b.cache.Add(id, opts.cacheOpts...)
if err != nil {
return fmt.Errorf("failed to create cache writer: %w", err)
}
defer cw.Close()

w := io.Writer(cw)
if _, ok := fetched[chunk]; ok {
w = io.MultiWriter(w, allData[chunk])
}

if _, err := io.CopyN(w, r, chunk.size()); err != nil {
cw.Abort()
return fmt.Errorf("failed to write chunk data: %w", err)
}

if err := cw.Commit(); err != nil {
return fmt.Errorf("failed to commit chunk: %w", err)
}

b.fetchedRegionSetMu.Lock()
b.fetchedRegionSet.add(chunk)
b.fetchedRegionSetMu.Unlock()
fetched[chunk] = true

return nil
}
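`cacheChunkData` preserves the original write discipline: open a cache writer, tee the chunk into both the cache and the caller's buffer when it is one of the requested targets, `Abort()` on a failed or short copy, and `Commit()` only after the whole chunk has been written. Here is a standalone sketch of that tee-and-commit flow; the in-memory `chunkWriter` and the `cacheChunk` wrapper are hypothetical stand-ins for the real cache writer, kept only to show the control flow:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// chunkWriter is a hypothetical stand-in for the real cache writer: data is
// only published on Commit, and Abort throws away a partial write.
type chunkWriter struct {
	buf       bytes.Buffer
	committed bool
}

func (w *chunkWriter) Write(p []byte) (int, error) { return w.buf.Write(p) }
func (w *chunkWriter) Commit() error               { w.committed = true; return nil }
func (w *chunkWriter) Abort() error                { w.buf.Reset(); return nil }

// cacheChunk copies exactly size bytes from r into the cache writer and, when
// target is non-nil, tees the same bytes into the caller's buffer.
func cacheChunk(cw *chunkWriter, r io.Reader, size int64, target io.Writer) error {
	w := io.Writer(cw)
	if target != nil {
		w = io.MultiWriter(w, target)
	}
	if _, err := io.CopyN(w, r, size); err != nil {
		cw.Abort()
		return fmt.Errorf("failed to write chunk data: %w", err)
	}
	return cw.Commit()
}

func main() {
	var out bytes.Buffer
	cw := &chunkWriter{}
	if err := cacheChunk(cw, strings.NewReader("hello chunk"), 11, &out); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(cw.committed, out.String()) // true "hello chunk"
}
```

Committing only after a full `io.CopyN` keeps partially fetched chunks out of the cache, which is why the short-read case aborts instead of committing.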