Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http archive requests include user agent and metrics #5166

Merged
merged 6 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions historyarchive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"

log "github.com/sirupsen/logrus"

Expand Down Expand Up @@ -59,6 +60,51 @@ type Ledger struct {
TransactionResult xdr.TransactionHistoryResultEntry
}

// golang will auto wrap them back to 0 if they overflow after addition.
type archiveStats struct {
requests atomic.Uint32
fileDownloads atomic.Uint32
fileUploads atomic.Uint32
backendName string
}

type ArchiveStats interface {
GetRequests() uint32
GetDownloads() uint32
GetUploads() uint32
GetBackendName() string
}

func (as *archiveStats) incrementDownloads() {
as.fileDownloads.Add(1)
as.incrementRequests()
}

func (as *archiveStats) incrementUploads() {
as.fileUploads.Add(1)
as.incrementRequests()
}

func (as *archiveStats) incrementRequests() {
as.requests.Add(1)
}

func (as *archiveStats) GetRequests() uint32 {
return as.requests.Load()
}

func (as *archiveStats) GetDownloads() uint32 {
return as.fileDownloads.Load()
}

func (as *archiveStats) GetUploads() uint32 {
return as.fileUploads.Load()
}

func (as *archiveStats) GetBackendName() string {
return as.backendName
}

type ArchiveBackend interface {
Exists(path string) (bool, error)
Size(path string) (int64, error)
Expand Down Expand Up @@ -87,6 +133,7 @@ type ArchiveInterface interface {
GetXdrStreamForHash(hash Hash) (*XdrStream, error)
GetXdrStream(pth string) (*XdrStream, error)
GetCheckpointManager() CheckpointManager
GetStats() []ArchiveStats
}

var _ ArchiveInterface = &Archive{}
Expand Down Expand Up @@ -115,6 +162,11 @@ type Archive struct {
checkpointManager CheckpointManager

backend ArchiveBackend
stats archiveStats
}

func (arch *Archive) GetStats() []ArchiveStats {
return []ArchiveStats{&arch.stats}
}

func (arch *Archive) GetCheckpointManager() CheckpointManager {
Expand All @@ -124,6 +176,7 @@ func (arch *Archive) GetCheckpointManager() CheckpointManager {
func (a *Archive) GetPathHAS(path string) (HistoryArchiveState, error) {
var has HistoryArchiveState
rdr, err := a.backend.GetFile(path)
a.stats.incrementDownloads()
if err != nil {
return has, err
}
Expand All @@ -150,6 +203,7 @@ func (a *Archive) GetPathHAS(path string) (HistoryArchiveState, error) {

func (a *Archive) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error {
exists, err := a.backend.Exists(path)
a.stats.incrementRequests()
if err != nil {
return err
}
Expand All @@ -161,19 +215,23 @@ func (a *Archive) PutPathHAS(path string, has HistoryArchiveState, opts *Command
if err != nil {
return err
}
a.stats.incrementUploads()
return a.backend.PutFile(path,
ioutil.NopCloser(bytes.NewReader(buf)))
}

func (a *Archive) BucketExists(bucket Hash) (bool, error) {
a.stats.incrementRequests()
return a.backend.Exists(BucketPath(bucket))
}

func (a *Archive) BucketSize(bucket Hash) (int64, error) {
a.stats.incrementRequests()
return a.backend.Size(BucketPath(bucket))
}

func (a *Archive) CategoryCheckpointExists(cat string, chk uint32) (bool, error) {
a.stats.incrementRequests()
return a.backend.Exists(CategoryCheckpointPath(cat, chk))
}

Expand Down Expand Up @@ -306,14 +364,17 @@ func (a *Archive) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) erro
}

func (a *Archive) ListBucket(dp DirPrefix) (chan string, chan error) {
a.stats.incrementRequests()
return a.backend.ListFiles(path.Join("bucket", dp.Path()))
}

func (a *Archive) ListAllBuckets() (chan string, chan error) {
a.stats.incrementRequests()
return a.backend.ListFiles("bucket")
}

func (a *Archive) ListAllBucketHashes() (chan Hash, chan error) {
a.stats.incrementRequests()
sch, errs := a.backend.ListFiles("bucket")
ch := make(chan Hash)
rx := regexp.MustCompile("bucket" + hexPrefixPat + "bucket-([0-9a-f]{64})\\.xdr\\.gz$")
Expand All @@ -335,6 +396,7 @@ func (a *Archive) ListCategoryCheckpoints(cat string, pth string) (chan uint32,
rx := regexp.MustCompile(cat + hexPrefixPat + cat +
"-([0-9a-f]{8})\\." + regexp.QuoteMeta(ext) + "$")
sch, errs := a.backend.ListFiles(path.Join(cat, pth))
a.stats.incrementRequests()
ch := make(chan uint32)
errs = makeErrorPump(errs)

Expand Down Expand Up @@ -372,6 +434,7 @@ func (a *Archive) GetXdrStream(pth string) (*XdrStream, error) {
return nil, errors.New("File has non-.xdr.gz suffix: " + pth)
}
rdr, err := a.backend.GetFile(pth)
a.stats.incrementDownloads()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -426,6 +489,9 @@ func Connect(u string, opts ConnectOptions) (*Archive, error) {
} else {
err = errors.New("unknown URL scheme: '" + parsed.Scheme + "'")
}

arch.stats = archiveStats{backendName: parsed.String()}

return &arch, err
}

Expand Down
13 changes: 12 additions & 1 deletion historyarchive/archive_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func NewArchivePool(archiveURLs []string, config ConnectOptions) (ArchivePool, e
NetworkPassphrase: config.NetworkPassphrase,
CheckpointFrequency: config.CheckpointFrequency,
Context: config.Context,
UserAgent: config.UserAgent,
sreuland marked this conversation as resolved.
Show resolved Hide resolved
},
)

Expand All @@ -55,8 +56,18 @@ func NewArchivePool(archiveURLs []string, config ConnectOptions) (ArchivePool, e
return validArchives, nil
}

func (pa ArchivePool) GetStats() []ArchiveStats {
stats := []ArchiveStats{}
for _, archive := range pa {
if len(archive.GetStats()) == 1 {
stats = append(stats, archive.GetStats()[0])
}
}
return stats
}

// Ensure the pool conforms to the ArchiveInterface
var _ ArchiveInterface = ArchivePool{}
var _ ArchiveInterface = &ArchivePool{}

// Below are the ArchiveInterface method implementations.

Expand Down
25 changes: 25 additions & 0 deletions historyarchive/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"io"
"io/ioutil"
"math/big"
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -176,6 +178,25 @@ func TestScan(t *testing.T) {
GetRandomPopulatedArchive().Scan(opts)
}

func TestConfiguresHttpUserAgent(t *testing.T) {
var userAgent string
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
userAgent = r.Header["User-Agent"][0]
w.WriteHeader(http.StatusOK)
}))
defer server.Close()

archive, err := Connect(server.URL, ConnectOptions{
UserAgent: "uatest",
})
assert.NoError(t, err)

ok, err := archive.BucketExists(EmptyXdrArrayHash())
assert.True(t, ok)
assert.NoError(t, err)
assert.Equal(t, userAgent, "uatest")
}

func TestScanSize(t *testing.T) {
defer cleanup()
opts := testOptions()
Expand Down Expand Up @@ -523,6 +544,8 @@ func assertXdrEquals(t *testing.T, a, b xdrEntry) {
func TestGetLedgers(t *testing.T) {
archive := GetTestMockArchive()
_, err := archive.GetLedgers(1000, 1002)
assert.Equal(t, uint32(1), archive.GetStats()[0].GetRequests())
assert.Equal(t, uint32(0), archive.GetStats()[0].GetDownloads())
assert.EqualError(t, err, "checkpoint 1023 is not published")

ledgerHeaders := []xdr.LedgerHeaderHistoryEntry{
Expand Down Expand Up @@ -610,6 +633,8 @@ func TestGetLedgers(t *testing.T) {
ledgers, err := archive.GetLedgers(1000, 1002)
assert.NoError(t, err)
assert.Len(t, ledgers, 3)
assert.Equal(t, uint32(7), archive.GetStats()[0].GetRequests()) // it started at 1, incurred 6 requests total, 3 queries, 3 downloads
assert.Equal(t, uint32(3), archive.GetStats()[0].GetDownloads()) // started 0, incurred 3 file downloads
for i, seq := range []uint32{1000, 1001, 1002} {
ledger := ledgers[seq]
assertXdrEquals(t, ledgerHeaders[i], ledger.Header)
Expand Down
29 changes: 29 additions & 0 deletions historyarchive/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,32 @@ func (m *MockArchive) GetXdrStream(pth string) (*XdrStream, error) {
a := m.Called(pth)
return a.Get(0).(*XdrStream), a.Error(1)
}

func (m *MockArchive) GetStats() []ArchiveStats {
a := m.Called()
return a.Get(0).([]ArchiveStats)
}

type MockArchiveStats struct {
mock.Mock
}

func (m *MockArchiveStats) GetRequests() uint32 {
a := m.Called()
return a.Get(0).(uint32)
}

func (m *MockArchiveStats) GetDownloads() uint32 {
a := m.Called()
return a.Get(0).(uint32)
}

func (m *MockArchiveStats) GetUploads() uint32 {
a := m.Called()
return a.Get(0).(uint32)
}

func (m *MockArchiveStats) GetBackendName() string {
a := m.Called()
return a.Get(0).(string)
}
4 changes: 3 additions & 1 deletion services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).

## Unreleased

### Added
### Fixed
- http archive requests include user agent and metrics ([5166](https://github.com/stellar/go/pull/5166))

### Added
- Add a deprecation warning for using command-line flags when running Horizon ([5051](https://github.com/stellar/go/pull/5051))

### Breaking Changes
Expand Down
28 changes: 28 additions & 0 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/prometheus/client_golang/prometheus"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/errors"
Expand Down Expand Up @@ -523,6 +524,13 @@ func (r resumeState) run(s *system) (transition, error) {
r.addLedgerStatsMetricFromMap(s, "trades", tradeStatsMap)
r.addProcessorDurationsMetricFromMap(s, stats.transactionDurations)

// since a single system instance is shared throughout all states,
// this will sweep up increments to history archive counters
// done elsewhere such as verifyState invocations since the same system
// instance is passed there and the additional usages of archives will just
// roll up and be reported here as part of resumeState transition
addHistoryArchiveStatsMetrics(s, s.historyAdapter.GetStats())

localLog := log.WithFields(logpkg.F{
"sequence": ingestLedger,
"duration": duration,
Expand Down Expand Up @@ -565,6 +573,26 @@ func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string]
}
}

func addHistoryArchiveStatsMetrics(s *system, stats []historyarchive.ArchiveStats) {
for _, historyServerStat := range stats {
s.Metrics().HistoryArchiveStatsCounter.
With(prometheus.Labels{
"source": historyServerStat.GetBackendName(),
"type": "file_downloads"}).
Add(float64(historyServerStat.GetDownloads()))
s.Metrics().HistoryArchiveStatsCounter.
With(prometheus.Labels{
"source": historyServerStat.GetBackendName(),
"type": "file_uploads"}).
Add(float64(historyServerStat.GetUploads()))
s.Metrics().HistoryArchiveStatsCounter.
With(prometheus.Labels{
"source": historyServerStat.GetBackendName(),
"type": "requests"}).
Add(float64(historyServerStat.GetRequests()))
}
}

type waitForCheckpointState struct{}

func (waitForCheckpointState) String() string {
Expand Down
5 changes: 5 additions & 0 deletions services/horizon/internal/ingest/history_archive_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type historyArchiveAdapterInterface interface {
GetLatestLedgerSequence() (uint32, error)
BucketListHash(sequence uint32) (xdr.Hash, error)
GetState(ctx context.Context, sequence uint32) (ingest.ChangeReader, error)
GetStats() []historyarchive.ArchiveStats
}

// newHistoryArchiveAdapter is a constructor to make a historyArchiveAdapter
Expand Down Expand Up @@ -71,3 +72,7 @@ func (haa *historyArchiveAdapter) GetState(ctx context.Context, sequence uint32)

return sr, nil
}

func (haa *historyArchiveAdapter) GetStats() []historyarchive.ArchiveStats {
return haa.archive.GetStats()
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (m *mockHistoryArchiveAdapter) GetState(ctx context.Context, sequence uint3
return args.Get(0).(ingest.ChangeReader), args.Error(1)
}

func (m *mockHistoryArchiveAdapter) GetStats() []historyarchive.ArchiveStats {
a := m.Called()
return a.Get(0).([]historyarchive.ArchiveStats)
}

func TestGetState_Read(t *testing.T) {
archive, e := getTestArchive()
if !assert.NoError(t, e) {
Expand Down
12 changes: 12 additions & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ type Metrics struct {

// ProcessorsRunDurationSummary exposes processors run durations.
ProcessorsRunDurationSummary *prometheus.SummaryVec

// ArchiveRequestCounter counts how many http requests are sent to history server
HistoryArchiveStatsCounter *prometheus.CounterVec
}

type System interface {
Expand Down Expand Up @@ -390,6 +393,14 @@ func (s *system) initMetrics() {
},
[]string{"name"},
)

s.metrics.HistoryArchiveStatsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "history_archive_stats_total",
Help: "counters of different history archive stats",
},
[]string{"source", "type"},
)
}

func (s *system) GetCurrentState() State {
Expand All @@ -415,6 +426,7 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(s.metrics.ProcessorsRunDuration)
registry.MustRegister(s.metrics.ProcessorsRunDurationSummary)
registry.MustRegister(s.metrics.StateVerifyLedgerEntriesCount)
registry.MustRegister(s.metrics.HistoryArchiveStatsCounter)
s.ledgerBackend = ledgerbackend.WithMetrics(s.ledgerBackend, registry, "horizon")
}

Expand Down
Loading
Loading