From 0ac7f195638953ba2d9bd69304288be93fa71dfe Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Tue, 16 Jan 2024 21:20:24 -0800 Subject: [PATCH 1/6] Add filesystem-backed cache to captive core instance --- ingest/checkpoint_change_reader.go | 10 ++++++++-- services/horizon/internal/ingest/main.go | 5 +++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/ingest/checkpoint_change_reader.go b/ingest/checkpoint_change_reader.go index 37e5e994e1..98b576b487 100644 --- a/ingest/checkpoint_change_reader.go +++ b/ingest/checkpoint_change_reader.go @@ -9,6 +9,8 @@ import ( "github.com/stellar/go/historyarchive" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" + + log "github.com/sirupsen/logrus" ) // readResult is the result of reading a bucket value @@ -311,9 +313,13 @@ func (r *CheckpointChangeReader) streamBucketContents(hash historyarchive.Hash, defer func() { err := rdr.Close() if err != nil { - r.readChan <- r.error(errors.Wrap(err, "Error closing xdr stream")) + // FIXME: When using a cache, this errors due to a double-close + // which isn't really a real error but is strictly incorrect. + // r.readChan <- r.error(errors.Wrap(err, "Error closing xdr stream")) + log.WithError(err).Info("Error closing xdr stream") + // Stop streaming from the rest of the files. - r.Close() + // r.Close() } }() diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index f6c9e23f9f..c035299368 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -6,6 +6,7 @@ package ingest import ( "context" "fmt" + "path" "runtime" "sync" "time" @@ -240,6 +241,10 @@ func NewSystem(config Config) (System, error) { ConnectOptions: storage.ConnectOptions{ Context: ctx, UserAgent: fmt.Sprintf("horizon/%s golang/%s", apkg.Version(), runtime.Version()), + Wrap: func(upstream storage.Storage) (storage.Storage, error) { + p := path.Join(config.CaptiveCoreStoragePath, "history-archive-cache") + return storage.MakeOnDiskCache(upstream, p, 0) + }, }, }, ) From 45dda343e06f98d9692113074dca01fd0be9cac2 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Tue, 16 Jan 2024 21:20:30 -0800 Subject: [PATCH 2/6] Drop references to deprecated ioutil --- historyarchive/archive.go | 5 ++--- historyarchive/xdrstream.go | 7 +++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/historyarchive/archive.go b/historyarchive/archive.go index 1679d2210f..dc4a56f83d 100644 --- a/historyarchive/archive.go +++ b/historyarchive/archive.go @@ -10,7 +10,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/url" "path" "regexp" @@ -149,8 +148,8 @@ func (a *Archive) PutPathHAS(path string, has HistoryArchiveState, opts *Command if err != nil { return err } - return a.backend.PutFile(path, - ioutil.NopCloser(bytes.NewReader(buf))) + + return a.backend.PutFile(path, io.NopCloser(bytes.NewReader(buf))) } func (a *Archive) BucketExists(bucket Hash) (bool, error) { diff --git a/historyarchive/xdrstream.go b/historyarchive/xdrstream.go index e0d9745585..e66875adc9 100644 --- a/historyarchive/xdrstream.go +++ b/historyarchive/xdrstream.go @@ -13,7 +13,6 @@ import ( "fmt" "hash" "io" - "io/ioutil" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" @@ -107,7 +106,7 @@ func (x *XdrStream) ExpectedHash() ([sha256.Size]byte, bool) { func (x *XdrStream) Close() error { if x.validateHash { // Read all remaining data from rdr - _, err := io.Copy(ioutil.Discard, x.rdr) + _, err := io.Copy(io.Discard, x.rdr) if 
err != nil { // close the internal readers to avoid memory leaks x.closeReaders() @@ -204,7 +203,7 @@ func (x *XdrStream) GzipBytesRead() int64 { // Discard removes n bytes from the stream func (x *XdrStream) Discard(n int64) (int64, error) { - return io.CopyN(ioutil.Discard, x.rdr, n) + return io.CopyN(io.Discard, x.rdr, n) } func CreateXdrStream(entries ...xdr.BucketEntry) *XdrStream { @@ -216,5 +215,5 @@ func CreateXdrStream(entries ...xdr.BucketEntry) *XdrStream { } } - return NewXdrStream(ioutil.NopCloser(b)) + return NewXdrStream(io.NopCloser(b)) } From 777c8ceb776e9606068bd15c079c907afe67efdd Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Wed, 17 Jan 2024 10:24:49 -0800 Subject: [PATCH 3/6] Add config for purging of on-disk cache --- services/horizon/internal/ingest/main.go | 7 +- support/storage/ondisk_cache.go | 109 +++++++++++++++-------- 2 files changed, 79 insertions(+), 37 deletions(-) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index c035299368..a653d1e97e 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -243,7 +243,11 @@ func NewSystem(config Config) (System, error) { UserAgent: fmt.Sprintf("horizon/%s golang/%s", apkg.Version(), runtime.Version()), Wrap: func(upstream storage.Storage) (storage.Storage, error) { p := path.Join(config.CaptiveCoreStoragePath, "history-archive-cache") - return storage.MakeOnDiskCache(upstream, p, 0) + return storage.MakeOnDiskCache(upstream, storage.OnDiskCacheConfig{ + Path: p, + MaxFiles: 100, + Ephemeral: true, + }) }, }, }, @@ -788,6 +792,7 @@ func (s *system) Shutdown() { } s.stateVerificationMutex.Unlock() s.cancel() + // wait for ingestion state machine to terminate s.wg.Wait() s.historyQ.Close() diff --git a/support/storage/ondisk_cache.go b/support/storage/ondisk_cache.go index c8997d7e13..69ec9611d4 100644 --- a/support/storage/ondisk_cache.go +++ b/support/storage/ondisk_cache.go @@ -1,6 +1,7 @@ package storage import ( + "fmt" "io" "os" "path" @@ -14,44 +15,68 @@ import ( // the same time without corruption, because retrieval will wait for the fetch. type OnDiskCache struct { Storage - dir string - maxFiles int - lru *lru.Cache + OnDiskCacheConfig - log *log.Entry + lru *lru.Cache +} + +// OnDiskCacheConfig describes the configuration options for the on-disk cache. +type OnDiskCacheConfig struct { + // Where should we store the cache? If omitted, a temporary directory will + // be used. + Path string + + // How many items should we cache? If omitted, a default is used (90 days of + // individual ledger files, so ~24,000). If you aren't using this to store + // individual ledgers, you should definitely set this parameter. + MaxFiles uint + + // Should we remove the `Path` when `Close()` is called? If omitted, the + // cache will be preserved on disk. + Ephemeral bool + + // If provided, the cache will log to a subservice of this log. If omitted, + // the default system log will be used. + Log *log.Entry } // MakeOnDiskCache wraps an Storage with a local filesystem cache in // `dir`. If dir is blank, a temporary directory will be created. If `maxFiles` // is zero, a default (90 days of ledgers) is used. 
-func MakeOnDiskCache(upstream Storage, dir string, maxFiles uint) (Storage, error) { - if dir == "" { +func MakeOnDiskCache(upstream Storage, config OnDiskCacheConfig) (Storage, error) { + if config.Path == "" { tmp, err := os.MkdirTemp(os.TempDir(), "stellar-horizon-*") if err != nil { return nil, err } - dir = tmp + config.Path = tmp } - if maxFiles == 0 { + if config.MaxFiles == 0 { // A guess at a reasonable number of checkpoints. This is 90 days of // ledgers. (90*86_400)/(5*64) = 24_300 - maxFiles = 24_300 + config.MaxFiles = 24_300 + } + + if config.Log == nil { + config.Log = log. + WithField("subservice", "fs-cache"). + WithField("path", config.Path). + WithField("cap", config.MaxFiles) + } else { + config.Log = config.Log. + WithField("subservice", "fs-cache"). + WithField("path", config.Path). + WithField("cap", config.MaxFiles) } - backendLog := log. - WithField("subservice", "fs-cache"). - WithField("path", dir). - WithField("size", maxFiles) - backendLog.Info("Filesystem cache configured") + config.Log.Info("Filesystem cache configured") backend := &OnDiskCache{ - Storage: upstream, - dir: dir, - maxFiles: int(maxFiles), - log: backendLog, + Storage: upstream, + OnDiskCacheConfig: config, } - cache, err := lru.NewWithEvict(int(maxFiles), backend.onEviction) + cache, err := lru.NewWithEvict(int(config.MaxFiles), backend.onEviction) if err != nil { return nil, err } @@ -64,28 +89,27 @@ func MakeOnDiskCache(upstream Storage, dir string, maxFiles uint) (Storage, erro // Otherwise, it returns the same result that the wrapped backend returns and // adds that result into the local cache, if possible. func (b *OnDiskCache) GetFile(filepath string) (io.ReadCloser, error) { - L := b.log.WithField("key", filepath) - localPath := path.Join(b.dir, filepath) + L := b.Log.WithField("key", filepath) + localPath := path.Join(b.Path, filepath) // If the lockfile exists, we should defer to the remote source but *not* // update the cache, as it means there's an in-progress sync of the same // file. _, statErr := os.Stat(NameLockfile(localPath)) if statErr == nil { - L.Debug("incomplete file in cache on disk") - L.Debug("retrieving file from remote backend") + L.Debug("Incomplete file in on-disk cache: retrieving from backend") return b.Storage.GetFile(filepath) } else if _, ok := b.lru.Get(localPath); !ok { // If it doesn't exist in the cache, it might still exist on the disk if // we've restarted from an existing directory. local, err := os.Open(localPath) if err == nil { - L.Debug("found file on disk but not in cache, adding") + L.Debug("Found file on disk but not in cache, adding") b.lru.Add(localPath, struct{}{}) return local, nil } - L.Debug("retrieving file from remote backend") + L.Debug("Retrieving from remote backend") // Since it's not on-disk, pull it from the remote backend, shove it // into the cache, and write it to disk. @@ -98,7 +122,7 @@ func (b *OnDiskCache) GetFile(filepath string) (io.ReadCloser, error) { if err != nil { // If there's some local FS error, we can still continue with the // remote version, so just log it and continue. - L.WithError(err).Error("caching ledger failed") + L.WithError(err).Error("Caching ledger failed") return remote, nil } @@ -107,12 +131,13 @@ func (b *OnDiskCache) GetFile(filepath string) (io.ReadCloser, error) { }), nil } + L.Debug("Found file in cache") // The cache claims it exists, so just give it a read and send it. local, err := os.Open(localPath) if err != nil { // Uh-oh, the cache and the disk are not in sync somehow? 
Let's evict // this value and try again (recurse) w/ the remote version. - L.WithError(err).Warn("opening cached ledger failed") + L.WithError(err).Warn("Opening cached ledger failed") b.lru.Remove(localPath) return b.GetFile(filepath) } @@ -125,8 +150,8 @@ func (b *OnDiskCache) GetFile(filepath string) (io.ReadCloser, error) { // Otherwise, it returns the same result as the wrapped backend. Note that in // the latter case, the cache isn't modified. func (b *OnDiskCache) Exists(filepath string) (bool, error) { - localPath := path.Join(b.dir, filepath) - b.log.WithField("key", filepath).Debug("checking existence") + localPath := path.Join(b.Path, filepath) + b.Log.WithField("key", filepath).Debug("checking existence") if _, ok := b.lru.Get(localPath); ok { // If the cache says it's there, we can definitively say that this path @@ -141,8 +166,8 @@ func (b *OnDiskCache) Exists(filepath string) (bool, error) { // Otherwise, it returns the same result as the wrapped backend. Note that in // the latter case, the cache isn't modified. func (b *OnDiskCache) Size(filepath string) (int64, error) { - localPath := path.Join(b.dir, filepath) - L := b.log.WithField("key", filepath) + localPath := path.Join(b.Path, filepath) + L := b.Log.WithField("key", filepath) L.Debug("retrieving size") if _, ok := b.lru.Get(localPath); ok { @@ -173,7 +198,7 @@ func (b *OnDiskCache) PutFile(filepath string, in io.ReadCloser) error { } else { // tee upload data into our local file in = teeReadCloser(in, local, func() error { - return os.Remove(NameLockfile(path.Join(b.dir, filepath))) + return os.Remove(NameLockfile(path.Join(b.Path, filepath))) }) } @@ -185,28 +210,38 @@ func (b *OnDiskCache) Close() error { // We only purge the cache, leaving the filesystem untouched: // https://github.com/stellar/go/pull/4457#discussion_r929352643 b.lru.Purge() - return b.Storage.Close() + + // Close the underlying storage *before* removing the directory + closeErr := b.Storage.Close() + if b.Ephemeral { + // Only bubble up the disk purging error if there is no other error. + if err := os.RemoveAll(b.Path); err != nil && closeErr == nil { + closeErr = err + } + } + + return closeErr } // Evict removes a file from the cache and the filesystem, but does not affect // the upstream backend. It isn't part of the `Storage` interface. func (b *OnDiskCache) Evict(filepath string) { log.WithField("key", filepath).Debug("evicting file") - b.lru.Remove(path.Join(b.dir, filepath)) + b.lru.Remove(path.Join(b.Path, filepath)) } func (b *OnDiskCache) onEviction(key, value interface{}) { path := key.(string) os.Remove(NameLockfile(path)) // just in case if err := os.Remove(path); err != nil { // best effort removal - b.log.WithError(err). + b.Log.WithError(err). WithField("key", path). 
Warn("removal failed after cache eviction") } } func (b *OnDiskCache) createLocal(filepath string) (*os.File, error) { - localPath := path.Join(b.dir, filepath) + localPath := path.Join(b.Path, filepath) if err := os.MkdirAll(path.Dir(localPath), 0755 /* drwxr-xr-x */); err != nil { return nil, err } @@ -249,6 +284,8 @@ func teeReadCloser(r io.ReadCloser, w io.WriteCloser, onClose func() error) io.R err2 := w.Close() err3 := onClose() + fmt.Println("Errors were:", err1, err2, err3) + if err1 != nil { return err1 } else if err2 != nil { From 3ecd638877206c7b42717c78a5d9e382b51e50c7 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Wed, 17 Jan 2024 12:51:16 -0800 Subject: [PATCH 4/6] Update code to use new ArchiveInterface.Close() and OnDiskCacheConfig struct --- exp/lighthorizon/ingester/main.go | 5 ++++- exp/lighthorizon/tools/cache.go | 10 +++++++--- historyarchive/archive.go | 5 +++++ historyarchive/archive_pool.go | 12 ++++++++++++ historyarchive/mock_archive.go | 5 ++--- historyarchive/mocks.go | 5 +++++ ingest/checkpoint_change_reader.go | 1 + ingest/ledgerbackend/captive_core_backend.go | 15 +++++++++++---- services/horizon/internal/ingest/main.go | 7 +++++++ 9 files changed, 54 insertions(+), 11 deletions(-) diff --git a/exp/lighthorizon/ingester/main.go b/exp/lighthorizon/ingester/main.go index a93636c67a..fec588ff57 100644 --- a/exp/lighthorizon/ingester/main.go +++ b/exp/lighthorizon/ingester/main.go @@ -60,7 +60,10 @@ func NewIngester(config IngesterConfig) (Ingester, error) { } if parsed.Scheme != "file" { // otherwise, already on-disk - cache, errr := storage.MakeOnDiskCache(source, config.CacheDir, uint(config.CacheSize)) + cache, errr := storage.MakeOnDiskCache(source, storage.OnDiskCacheConfig{ + Path: config.CacheDir, + MaxFiles: uint(config.CacheSize), + }) if errr != nil { // non-fatal: warn but continue w/o cache log.WithField("path", config.CacheDir).WithError(errr). 
diff --git a/exp/lighthorizon/tools/cache.go b/exp/lighthorizon/tools/cache.go index 0290fcb164..c2f722bc67 100644 --- a/exp/lighthorizon/tools/cache.go +++ b/exp/lighthorizon/tools/cache.go @@ -3,7 +3,7 @@ package tools import ( "context" "fmt" - "io/ioutil" + "io" "os" "path/filepath" "strconv" @@ -136,7 +136,11 @@ func BuildCache(ledgerSource, cacheDir string, start uint32, count uint32, repai store, err := storage.ConnectBackend(ledgerSource, storage.ConnectOptions{ Context: ctx, Wrap: func(store storage.Storage) (storage.Storage, error) { - return storage.MakeOnDiskCache(store, cacheDir, uint(count)) + return storage.MakeOnDiskCache(store, storage.OnDiskCacheConfig{ + Path: cacheDir, + MaxFiles: uint(count), + Log: log, + }) }, }) if err != nil { @@ -230,7 +234,7 @@ func PurgeCache(cacheDir string) error { } func ShowCache(cacheDir string) error { - files, err := ioutil.ReadDir(filepath.Join(cacheDir, "ledgers")) + files, err := io.ReadDir(filepath.Join(cacheDir, "ledgers")) if err != nil { log.Errorf("Failed to read cache: %v", err) return err diff --git a/historyarchive/archive.go b/historyarchive/archive.go index dc4a56f83d..c57b23ea9b 100644 --- a/historyarchive/archive.go +++ b/historyarchive/archive.go @@ -74,6 +74,7 @@ type ArchiveInterface interface { GetXdrStreamForHash(hash Hash) (*XdrStream, error) GetXdrStream(pth string) (*XdrStream, error) GetCheckpointManager() CheckpointManager + Close() error } var _ ArchiveInterface = &Archive{} @@ -365,6 +366,10 @@ func (a *Archive) GetXdrStream(pth string) (*XdrStream, error) { return NewXdrGzStream(rdr) } +func (a *Archive) Close() error { + return a.backend.Close() +} + func Connect(u string, opts ArchiveOptions) (*Archive, error) { arch := Archive{ networkPassphrase: opts.NetworkPassphrase, diff --git a/historyarchive/archive_pool.go b/historyarchive/archive_pool.go index e4e24e0853..40f647a4a4 100644 --- a/historyarchive/archive_pool.go +++ b/historyarchive/archive_pool.go @@ -131,3 +131,15 @@ func (pa ArchivePool) GetXdrStream(pth string) (*XdrStream, error) { func (pa ArchivePool) GetCheckpointManager() CheckpointManager { return pa.GetAnyArchive().GetCheckpointManager() } + +func (pa ArchivePool) Close() error { + // Closes all archives, but returns the first error encountered (if any) + var anyErr error + for _, pool := range pa { + err := pool.Close() + if anyErr == nil { + anyErr = err + } + } + return anyErr +} diff --git a/historyarchive/mock_archive.go b/historyarchive/mock_archive.go index 9f8bab69de..27515dd375 100644 --- a/historyarchive/mock_archive.go +++ b/historyarchive/mock_archive.go @@ -8,7 +8,6 @@ import ( "bytes" "errors" "io" - "io/ioutil" "strings" "sync" @@ -46,13 +45,13 @@ func (b *MockArchiveBackend) GetFile(pth string) (io.ReadCloser, error) { if !ok { return nil, errors.New("no such file: " + pth) } - return ioutil.NopCloser(bytes.NewReader(buf)), nil + return io.NopCloser(bytes.NewReader(buf)), nil } func (b *MockArchiveBackend) PutFile(pth string, in io.ReadCloser) error { b.mutex.Lock() defer b.mutex.Unlock() - buf, e := ioutil.ReadAll(in) + buf, e := io.ReadAll(in) if e != nil { return e } diff --git a/historyarchive/mocks.go b/historyarchive/mocks.go index 3952211cd3..1e94615fb3 100644 --- a/historyarchive/mocks.go +++ b/historyarchive/mocks.go @@ -103,3 +103,8 @@ func (m *MockArchive) GetXdrStream(pth string) (*XdrStream, error) { a := m.Called(pth) return a.Get(0).(*XdrStream), a.Error(1) } + +func (m *MockArchive) Close() error { + a := m.Called() + return a.Error(0) +} diff --git 
a/ingest/checkpoint_change_reader.go b/ingest/checkpoint_change_reader.go index 98b576b487..a88f2c6bf1 100644 --- a/ingest/checkpoint_change_reader.go +++ b/ingest/checkpoint_change_reader.go @@ -563,6 +563,7 @@ func (r *CheckpointChangeReader) error(err error) readResult { func (r *CheckpointChangeReader) close() { close(r.done) + r.archive.Close() } // Progress returns progress reading all buckets in percents. diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index a8acb19182..9421b2b9ea 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -762,14 +762,21 @@ func (c *CaptiveStellarCore) Close() error { // after the CaptiveStellarCore context is canceled all subsequent calls to PrepareRange() will fail c.cancel() - // TODO: Sucks to ignore the error here, but no worse than it was before, - // so... + // TODO: Sucks to ignore the error here, but no worse than it was before, so... if c.ledgerHashStore != nil { c.ledgerHashStore.Close() } + var err error if c.stellarCoreRunner != nil { - return c.stellarCoreRunner.close() + err = c.stellarCoreRunner.close() } - return nil + + // Underlying archive may need to be closed, too. + archiveErr := c.archive.Close() + if err == nil { + err = archiveErr + } + + return err } diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index a653d1e97e..9e313afd1b 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -6,6 +6,7 @@ package ingest import ( "context" "fmt" + "os" "path" "runtime" "sync" @@ -243,6 +244,12 @@ func NewSystem(config Config) (System, error) { UserAgent: fmt.Sprintf("horizon/%s golang/%s", apkg.Version(), runtime.Version()), Wrap: func(upstream storage.Storage) (storage.Storage, error) { p := path.Join(config.CaptiveCoreStoragePath, "history-archive-cache") + + if _, existErr := os.Stat(p); existErr == nil || !os.IsNotExist(existErr) { + log.Warnf("History archive cache exists (%s): removing", p) + os.RemoveAll(p) + } + return storage.MakeOnDiskCache(upstream, storage.OnDiskCacheConfig{ Path: p, MaxFiles: 100, From 378eee65282a40c0067ff295657d64bc33d77ec5 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Wed, 17 Jan 2024 13:46:01 -0800 Subject: [PATCH 5/6] Fixup incorrect import --- exp/lighthorizon/tools/cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exp/lighthorizon/tools/cache.go b/exp/lighthorizon/tools/cache.go index c2f722bc67..01d1fcf35a 100644 --- a/exp/lighthorizon/tools/cache.go +++ b/exp/lighthorizon/tools/cache.go @@ -3,7 +3,7 @@ package tools import ( "context" "fmt" - "io" + "io/ioutil" "os" "path/filepath" "strconv" @@ -234,7 +234,7 @@ func PurgeCache(cacheDir string) error { } func ShowCache(cacheDir string) error { - files, err := io.ReadDir(filepath.Join(cacheDir, "ledgers")) + files, err := ioutil.ReadDir(filepath.Join(cacheDir, "ledgers")) if err != nil { log.Errorf("Failed to read cache: %v", err) return err From bd47c3087786c85331834a8cba5c3c436914233b Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Thu, 18 Jan 2024 10:34:15 -0800 Subject: [PATCH 6/6] Move caching layer to history archive itself --- historyarchive/archive_cache.go | 182 +++++++++++++++++++++++ services/horizon/internal/ingest/main.go | 20 +-- 2 files changed, 187 insertions(+), 15 deletions(-) create mode 100644 historyarchive/archive_cache.go diff --git a/historyarchive/archive_cache.go 
b/historyarchive/archive_cache.go new file mode 100644 index 0000000000..e469cc32fc --- /dev/null +++ b/historyarchive/archive_cache.go @@ -0,0 +1,182 @@ +package historyarchive + +import ( + "fmt" + "io" + "os" + "path" + + lru "github.com/hashicorp/golang-lru" + log "github.com/sirupsen/logrus" + "github.com/stellar/go/support/storage" +) + +type CacheOptions struct { + Cache bool + Path string + MaxFiles uint +} + +type ArchiveBucketCache struct { + path string + lru *lru.Cache + log *log.Entry +} + +func MakeArchiveBucketCache(path string, maxFiles uint) (*ArchiveBucketCache, error) { + log_ := log. + WithField("subservice", "fs-cache"). + WithField("path", path). + WithField("cap", 100) + + backend := &ArchiveBucketCache{ + path: path, + log: log_, + } + + cache, err := lru.NewWithEvict(int(maxFiles), backend.onEviction) + if err != nil { + return &ArchiveBucketCache{}, err + } + + backend.lru = cache + return backend, nil +} + +// GetFile retrieves the file contents from the local cache if present. +// Otherwise, it returns the same result that the wrapped backend returns and +// adds that result into the local cache, if possible. +func (abc *ArchiveBucketCache) GetFile( + filepath string, + upstream storage.Storage, +) (io.ReadCloser, error) { + L := abc.log.WithField("key", filepath) + localPath := path.Join(abc.path, filepath) + + // If the lockfile exists, we should defer to the remote source but *not* + // update the cache, as it means there's an in-progress sync of the same + // file. + _, statErr := os.Stat(NameLockfile(localPath)) + if statErr == nil { + L.Info("Incomplete file in on-disk cache: deferring") + return nil, os.ErrExist + } else if _, ok := abc.lru.Get(localPath); !ok { + L.Info("File does not exist in the cache: downloading") + + // Since it's not on-disk, pull it from the remote backend, shove it + // into the cache, and write it to disk. + remote, err := upstream.GetFile(filepath) + if err != nil { + return remote, err + } + + local, err := abc.createLocal(filepath) + if err != nil { + // If there's some local FS error, we can still continue with the + // remote version, so just log it and continue. + L.WithError(err).Error("Caching ledger failed") + return remote, nil + } + + return teeReadCloser(remote, local, func() error { + return os.Remove(NameLockfile(localPath)) + }), nil + } + + L.Info("Found file in cache") + // The cache claims it exists, so just give it a read and send it. + local, err := os.Open(localPath) + if err != nil { + // Uh-oh, the cache and the disk are not in sync somehow? Let's evict + // this value and try again (recurse) w/ the remote version. + L.WithError(err).Warn("Opening cached ledger failed") + abc.lru.Remove(localPath) + return abc.GetFile(filepath, upstream) + } + + return local, nil +} + +// Close purges the cache, then forwards the call to the wrapped backend. +func (abc *ArchiveBucketCache) Close() error { + // We only purge the cache, leaving the filesystem untouched: + // https://github.com/stellar/go/pull/4457#discussion_r929352643 + abc.lru.Purge() + + // Only bubble up the disk purging error if there is no other error. + return os.RemoveAll(abc.path) +} + +// Evict removes a file from the cache and the filesystem, but does not affect +// the upstream backend. It isn't part of the `Storage` interface. 
+func (abc *ArchiveBucketCache) Evict(filepath string) { + log.WithField("key", filepath).Info("evicting file") + abc.lru.Remove(path.Join(abc.path, filepath)) +} + +func (abc *ArchiveBucketCache) onEviction(key, value interface{}) { + path := key.(string) + os.Remove(NameLockfile(path)) // just in case + if err := os.Remove(path); err != nil { // best effort removal + abc.log.WithError(err). + WithField("key", path). + Warn("removal failed after cache eviction") + } +} + +func (abc *ArchiveBucketCache) createLocal(filepath string) (*os.File, error) { + localPath := path.Join(abc.path, filepath) + if err := os.MkdirAll(path.Dir(localPath), 0755 /* drwxr-xr-x */); err != nil { + return nil, err + } + + local, err := os.Create(localPath) /* mode -rw-rw-rw- */ + if err != nil { + return nil, err + } + _, err = os.Create(NameLockfile(localPath)) + if err != nil { + return nil, err + } + + abc.lru.Add(localPath, struct{}{}) // just use the cache as an array + return local, nil +} + +func NameLockfile(file string) string { + return file + ".lock" +} + +// The below is a helper interface so that we can use io.TeeReader to write +// data locally immediately as we read it remotely. + +type trc struct { + io.Reader + close func() error +} + +func (t trc) Close() error { + return t.close() +} + +func teeReadCloser(r io.ReadCloser, w io.WriteCloser, onClose func() error) io.ReadCloser { + fmt.Printf("Making teeReadCloser onto %v and %v\n", r, w) + return trc{ + Reader: io.TeeReader(r, w), + close: func() error { + // Always run all closers, but return the first error + err1 := r.Close() + err2 := w.Close() + err3 := onClose() + + fmt.Println("Errors were:", err1, err2, err3) + + if err1 != nil { + return err1 + } else if err2 != nil { + return err2 + } + return err3 + }, + } +} diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 9e313afd1b..ed7c4a5a54 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -6,7 +6,6 @@ package ingest import ( "context" "fmt" - "os" "path" "runtime" "sync" @@ -242,20 +241,11 @@ func NewSystem(config Config) (System, error) { ConnectOptions: storage.ConnectOptions{ Context: ctx, UserAgent: fmt.Sprintf("horizon/%s golang/%s", apkg.Version(), runtime.Version()), - Wrap: func(upstream storage.Storage) (storage.Storage, error) { - p := path.Join(config.CaptiveCoreStoragePath, "history-archive-cache") - - if _, existErr := os.Stat(p); existErr == nil || !os.IsNotExist(existErr) { - log.Warnf("History archive cache exists (%s): removing", p) - os.RemoveAll(p) - } - - return storage.MakeOnDiskCache(upstream, storage.OnDiskCacheConfig{ - Path: p, - MaxFiles: 100, - Ephemeral: true, - }) - }, + }, + CacheConfig: historyarchive.CacheOptions{ + Cache: true, + Path: path.Join(config.CaptiveCoreStoragePath, "bucket-cache"), + MaxFiles: 100, }, }, )
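
A note on the final shape of the series: patch 6 supersedes the storage-level OnDiskCache wiring from patches 1 and 3 by moving the cache into the historyarchive package itself, keyed off the new CacheOptions struct. The sketch below shows one way the resulting ArchiveBucketCache could be exercised on its own. It is illustrative only: the archive URL, the bucket key, and the standalone main are assumptions, while storage.ConnectBackend, MakeArchiveBucketCache, GetFile, and Close are the APIs added or used in the patches above.

    package main

    import (
        "context"
        "errors"
        "io"
        "os"

        log "github.com/sirupsen/logrus"

        "github.com/stellar/go/historyarchive"
        "github.com/stellar/go/support/storage"
    )

    func main() {
        // Connect to an upstream backend. The URL is a placeholder for any
        // scheme that storage.ConnectBackend understands.
        upstream, err := storage.ConnectBackend(
            "https://history.stellar.org/prd/core-live/core_live_001",
            storage.ConnectOptions{Context: context.Background()},
        )
        if err != nil {
            log.WithError(err).Fatal("connecting to upstream failed")
        }
        defer upstream.Close()

        // Cap the cache at 100 files, mirroring the CacheOptions that
        // Horizon's NewSystem passes in the final patch.
        cache, err := historyarchive.MakeArchiveBucketCache("/tmp/bucket-cache", 100)
        if err != nil {
            log.WithError(err).Fatal("creating cache failed")
        }
        defer cache.Close() // note: this removes /tmp/bucket-cache entirely

        // The key is hypothetical. A miss downloads from upstream and tees
        // the bytes to disk; a hit serves the local copy. While another
        // fetch of the same file is in flight, GetFile returns os.ErrExist
        // and the caller is expected to fall back to upstream directly.
        key := "bucket/some-bucket.xdr.gz"
        rdr, err := cache.GetFile(key, upstream)
        if errors.Is(err, os.ErrExist) {
            rdr, err = upstream.GetFile(key)
        }
        if err != nil {
            log.WithError(err).Fatal("fetching bucket failed")
        }
        defer rdr.Close()

        // Consume the stream fully so the tee finishes the cached copy.
        n, err := io.Copy(io.Discard, rdr)
        if err != nil {
            log.WithError(err).Fatal("reading bucket failed")
        }
        log.WithField("bytes", n).Info("fetched bucket")
    }

The design carried through all six patches is the same: a read-through cache that tees remote bytes to disk as the caller consumes them, with a ".lock" sentinel marking in-flight downloads so that a concurrent reader defers to the remote copy instead of serving a half-written file.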