Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[Draft] services/horizon: Cache files downloaded from HAs #5165

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion exp/lighthorizon/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ func NewIngester(config IngesterConfig) (Ingester, error) {
}

if parsed.Scheme != "file" { // otherwise, already on-disk
cache, errr := storage.MakeOnDiskCache(source, config.CacheDir, uint(config.CacheSize))
cache, errr := storage.MakeOnDiskCache(source, storage.OnDiskCacheConfig{
Path: config.CacheDir,
MaxFiles: uint(config.CacheSize),
})

if errr != nil { // non-fatal: warn but continue w/o cache
log.WithField("path", config.CacheDir).WithError(errr).
Expand Down
6 changes: 5 additions & 1 deletion exp/lighthorizon/tools/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ func BuildCache(ledgerSource, cacheDir string, start uint32, count uint32, repai
store, err := storage.ConnectBackend(ledgerSource, storage.ConnectOptions{
Context: ctx,
Wrap: func(store storage.Storage) (storage.Storage, error) {
return storage.MakeOnDiskCache(store, cacheDir, uint(count))
return storage.MakeOnDiskCache(store, storage.OnDiskCacheConfig{
Path: cacheDir,
MaxFiles: uint(count),
Log: log,
})
},
})
if err != nil {
Expand Down
10 changes: 7 additions & 3 deletions historyarchive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/url"
"path"
"regexp"
Expand Down Expand Up @@ -75,6 +74,7 @@ type ArchiveInterface interface {
GetXdrStreamForHash(hash Hash) (*XdrStream, error)
GetXdrStream(pth string) (*XdrStream, error)
GetCheckpointManager() CheckpointManager
Close() error
}

var _ ArchiveInterface = &Archive{}
Expand Down Expand Up @@ -149,8 +149,8 @@ func (a *Archive) PutPathHAS(path string, has HistoryArchiveState, opts *Command
if err != nil {
return err
}
return a.backend.PutFile(path,
ioutil.NopCloser(bytes.NewReader(buf)))

return a.backend.PutFile(path, io.NopCloser(bytes.NewReader(buf)))
}

func (a *Archive) BucketExists(bucket Hash) (bool, error) {
Expand Down Expand Up @@ -366,6 +366,10 @@ func (a *Archive) GetXdrStream(pth string) (*XdrStream, error) {
return NewXdrGzStream(rdr)
}

// Close releases the archive's underlying storage backend.
func (a *Archive) Close() error {
	return a.backend.Close()
}

func Connect(u string, opts ArchiveOptions) (*Archive, error) {
arch := Archive{
networkPassphrase: opts.NetworkPassphrase,
Expand Down
182 changes: 182 additions & 0 deletions historyarchive/archive_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package historyarchive

import (
"fmt"
"io"
"os"
"path"

lru "github.com/hashicorp/golang-lru"
log "github.com/sirupsen/logrus"

Check failure on line 10 in historyarchive/archive_cache.go

View workflow job for this annotation

GitHub Actions / golangci

`github.com/sirupsen/logrus` is in the denylist: logging is allowed only by logutils.Log (depguard)
"github.com/stellar/go/support/storage"
)

// CacheOptions configures the on-disk cache for history-archive files.
type CacheOptions struct {
	Cache bool // presumably toggles caching on/off — confirm at call sites
	Path string // root directory for cached files
	MaxFiles uint // maximum number of cached files before LRU eviction
}

// ArchiveBucketCache is a filesystem-backed LRU cache for history-archive
// files: cached copies live under `path`, and LRU eviction removes the
// corresponding files from disk (see onEviction).
type ArchiveBucketCache struct {
	path string // root directory holding the cached files
	lru *lru.Cache // keys are local file paths; used as a bounded set
	log *log.Entry
}

func MakeArchiveBucketCache(path string, maxFiles uint) (*ArchiveBucketCache, error) {
log_ := log.
WithField("subservice", "fs-cache").
WithField("path", path).
WithField("cap", 100)

Check failure on line 30 in historyarchive/archive_cache.go

View workflow job for this annotation

GitHub Actions / golangci

mnd: Magic number: 100, in <argument> detected (gomnd)

backend := &ArchiveBucketCache{
path: path,
log: log_,
}

cache, err := lru.NewWithEvict(int(maxFiles), backend.onEviction)
if err != nil {
return &ArchiveBucketCache{}, err
}

backend.lru = cache
return backend, nil
}

// GetFile retrieves the file contents from the local cache if present.
// Otherwise, it returns the same result that the wrapped backend returns and
// adds that result into the local cache, if possible.
func (abc *ArchiveBucketCache) GetFile(
	filepath string,
	upstream storage.Storage,
) (io.ReadCloser, error) {
	L := abc.log.WithField("key", filepath)
	localPath := path.Join(abc.path, filepath)

	// If the lockfile exists, there's an in-progress sync of the same file,
	// so defer to the remote source but do *not* update the cache. (The
	// original returned os.ErrExist here, contradicting this stated intent.)
	_, statErr := os.Stat(NameLockfile(localPath))
	if statErr == nil {
		L.Info("Incomplete file in on-disk cache: deferring")
		return upstream.GetFile(filepath)
	} else if _, ok := abc.lru.Get(localPath); !ok {
		L.Info("File does not exist in the cache: downloading")

		// Since it's not on-disk, pull it from the remote backend, shove it
		// into the cache, and write it to disk.
		remote, err := upstream.GetFile(filepath)
		if err != nil {
			return remote, err
		}

		local, err := abc.createLocal(filepath)
		if err != nil {
			// If there's some local FS error, we can still continue with the
			// remote version, so just log it and continue.
			L.WithError(err).Error("Caching ledger failed")
			return remote, nil
		}

		// Mirror the remote stream to disk as it's read; remove the lockfile
		// once the reader is fully closed (download complete).
		return teeReadCloser(remote, local, func() error {
			return os.Remove(NameLockfile(localPath))
		}), nil
	}

	L.Info("Found file in cache")
	// The cache claims it exists, so just give it a read and send it.
	local, err := os.Open(localPath)
	if err != nil {
		// Uh-oh, the cache and the disk are not in sync somehow? Let's evict
		// this value and try again (recurse) w/ the remote version.
		L.WithError(err).Warn("Opening cached ledger failed")
		abc.lru.Remove(localPath)
		return abc.GetFile(filepath, upstream)
	}

	return local, nil
}

// Close purges the in-memory cache and removes the cache directory from disk.
func (abc *ArchiveBucketCache) Close() error {
	// Purging triggers onEviction for every entry, which best-effort removes
	// each cached file (and its lockfile) from disk.
	abc.lru.Purge()

	// Then drop the whole cache directory; this is the only error surfaced.
	return os.RemoveAll(abc.path)
}

// Evict removes a file from the cache and the filesystem, but does not affect
// the upstream backend. It isn't part of the `Storage` interface.
func (abc *ArchiveBucketCache) Evict(filepath string) {
	// Use the instance logger (with its subservice/path fields) for
	// consistency with the other methods, not the package-level logger.
	abc.log.WithField("key", filepath).Info("evicting file")
	abc.lru.Remove(path.Join(abc.path, filepath))
}

// onEviction is the LRU eviction callback: it best-effort removes the evicted
// file and its lockfile from disk. Upstream storage is unaffected.
func (abc *ArchiveBucketCache) onEviction(key, value interface{}) {
	// Named localPath to avoid shadowing the `path` package.
	localPath := key.(string)
	os.Remove(NameLockfile(localPath)) // just in case
	if err := os.Remove(localPath); err != nil { // best effort removal
		abc.log.WithError(err).
			WithField("key", localPath).
			Warn("removal failed after cache eviction")
	}
}

func (abc *ArchiveBucketCache) createLocal(filepath string) (*os.File, error) {
localPath := path.Join(abc.path, filepath)
if err := os.MkdirAll(path.Dir(localPath), 0755 /* drwxr-xr-x */); err != nil {

Check failure on line 129 in historyarchive/archive_cache.go

View workflow job for this annotation

GitHub Actions / golangci

mnd: Magic number: 0755, in <argument> detected (gomnd)
return nil, err
}

local, err := os.Create(localPath) /* mode -rw-rw-rw- */
if err != nil {
return nil, err
}
_, err = os.Create(NameLockfile(localPath))
if err != nil {
return nil, err
}

abc.lru.Add(localPath, struct{}{}) // just use the cache as an array
return local, nil
}

// NameLockfile returns the path of the lockfile corresponding to a cached
// file; the lockfile's presence marks an in-progress download of that file.
func NameLockfile(file string) string {
	const lockSuffix = ".lock"
	return file + lockSuffix
}

// The below is a helper interface so that we can use io.TeeReader to write
// data locally immediately as we read it remotely.

type trc struct {
io.Reader
close func() error
}

func (t trc) Close() error {
return t.close()
}

func teeReadCloser(r io.ReadCloser, w io.WriteCloser, onClose func() error) io.ReadCloser {
fmt.Printf("Making teeReadCloser onto %v and %v\n", r, w)
return trc{
Reader: io.TeeReader(r, w),
close: func() error {
// Always run all closers, but return the first error
err1 := r.Close()
err2 := w.Close()
err3 := onClose()

fmt.Println("Errors were:", err1, err2, err3)

if err1 != nil {
return err1
} else if err2 != nil {
return err2
}
return err3
},
}
}
12 changes: 12 additions & 0 deletions historyarchive/archive_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,15 @@ func (pa ArchivePool) GetXdrStream(pth string) (*XdrStream, error) {
// GetCheckpointManager delegates to an arbitrary archive in the pool; the
// archives are presumed to share checkpoint parameters — confirm when adding
// heterogeneous archives.
func (pa ArchivePool) GetCheckpointManager() CheckpointManager {
	return pa.GetAnyArchive().GetCheckpointManager()
}

// Close closes every archive in the pool. All archives are closed even if
// some fail; the first error encountered (if any) is returned.
func (pa ArchivePool) Close() error {
	var firstErr error
	// Each element is an archive, not a pool — named accordingly.
	for _, archive := range pa {
		if err := archive.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
5 changes: 2 additions & 3 deletions historyarchive/mock_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"bytes"
"errors"
"io"
"io/ioutil"
"strings"
"sync"

Expand Down Expand Up @@ -46,13 +45,13 @@ func (b *MockArchiveBackend) GetFile(pth string) (io.ReadCloser, error) {
if !ok {
return nil, errors.New("no such file: " + pth)
}
return ioutil.NopCloser(bytes.NewReader(buf)), nil
return io.NopCloser(bytes.NewReader(buf)), nil
}

func (b *MockArchiveBackend) PutFile(pth string, in io.ReadCloser) error {
b.mutex.Lock()
defer b.mutex.Unlock()
buf, e := ioutil.ReadAll(in)
buf, e := io.ReadAll(in)
if e != nil {
return e
}
Expand Down
5 changes: 5 additions & 0 deletions historyarchive/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,8 @@ func (m *MockArchive) GetXdrStream(pth string) (*XdrStream, error) {
a := m.Called(pth)
return a.Get(0).(*XdrStream), a.Error(1)
}

// Close mocks the ArchiveInterface method of the same name.
func (m *MockArchive) Close() error {
	a := m.Called()
	return a.Error(0)
}
7 changes: 3 additions & 4 deletions historyarchive/xdrstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"fmt"
"hash"
"io"
"io/ioutil"

"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -107,7 +106,7 @@ func (x *XdrStream) ExpectedHash() ([sha256.Size]byte, bool) {
func (x *XdrStream) Close() error {
if x.validateHash {
// Read all remaining data from rdr
_, err := io.Copy(ioutil.Discard, x.rdr)
_, err := io.Copy(io.Discard, x.rdr)
if err != nil {
// close the internal readers to avoid memory leaks
x.closeReaders()
Expand Down Expand Up @@ -204,7 +203,7 @@ func (x *XdrStream) GzipBytesRead() int64 {

// Discard removes n bytes from the stream, returning the number of bytes
// actually discarded together with any error (semantics of io.CopyN).
func (x *XdrStream) Discard(n int64) (int64, error) {
	return io.CopyN(io.Discard, x.rdr, n)
}
}

func CreateXdrStream(entries ...xdr.BucketEntry) *XdrStream {
Expand All @@ -216,5 +215,5 @@ func CreateXdrStream(entries ...xdr.BucketEntry) *XdrStream {
}
}

return NewXdrStream(ioutil.NopCloser(b))
return NewXdrStream(io.NopCloser(b))
}
11 changes: 9 additions & 2 deletions ingest/checkpoint_change_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"

log "github.com/sirupsen/logrus"

Check failure on line 13 in ingest/checkpoint_change_reader.go

View workflow job for this annotation

GitHub Actions / golangci

`github.com/sirupsen/logrus` is in the denylist: logging is allowed only by logutils.Log (depguard)
)

// readResult is the result of reading a bucket value
Expand Down Expand Up @@ -311,9 +313,13 @@
defer func() {
err := rdr.Close()
if err != nil {
r.readChan <- r.error(errors.Wrap(err, "Error closing xdr stream"))
// FIXME: When using a cache, this errors due to a double-close
// which isn't really a real error but is strictly incorrect.
// r.readChan <- r.error(errors.Wrap(err, "Error closing xdr stream"))
log.WithError(err).Info("Error closing xdr stream")

// Stop streaming from the rest of the files.
r.Close()
// r.Close()
}
}()

Expand Down Expand Up @@ -557,6 +563,7 @@

// close signals shutdown by closing the done channel and releases the
// underlying archive.
// NOTE(review): the error from archive.Close() is discarded here — confirm
// that callers don't need it surfaced.
func (r *CheckpointChangeReader) close() {
	close(r.done)
	r.archive.Close()
}

// Progress returns progress reading all buckets in percents.
Expand Down
15 changes: 11 additions & 4 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,14 +762,21 @@ func (c *CaptiveStellarCore) Close() error {
// after the CaptiveStellarCore context is canceled all subsequent calls to PrepareRange() will fail
c.cancel()

// TODO: Sucks to ignore the error here, but no worse than it was before,
// so...
// TODO: Sucks to ignore the error here, but no worse than it was before, so...
if c.ledgerHashStore != nil {
c.ledgerHashStore.Close()
}

var err error
if c.stellarCoreRunner != nil {
return c.stellarCoreRunner.close()
err = c.stellarCoreRunner.close()
}
return nil

// Underlying archive may need to be closed, too.
archiveErr := c.archive.Close()
if err == nil {
err = archiveErr
}

return err
}
Loading
Loading