Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add post machine endpoint and get machine data access #962

Merged
merged 5 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions backend/pkg/api/data_access/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/api/enums"
"github.com/gobitfly/beaconchain/pkg/api/types"
t "github.com/gobitfly/beaconchain/pkg/api/types"
commontypes "github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/gobitfly/beaconchain/pkg/userservice"
"github.com/shopspring/decimal"
)
Expand Down Expand Up @@ -752,23 +753,27 @@ func (d *DummyService) GetValidatorDashboardMobileWidget(ctx context.Context, da
return getDummyStruct[t.MobileWidgetData]()
}

func (d *DummyService) GetUserMachineMetrics(ctx context.Context, userID uint64, limit uint64, offset uint64) (*types.MachineMetricsData, error) {
func (d *DummyService) GetUserMachineMetrics(ctx context.Context, userID uint64, limit int, offset int) (*types.MachineMetricsData, error) {
data, err := getDummyStruct[types.MachineMetricsData]()
if err != nil {
return nil, err
}
data.SystemMetrics = slices.SortedFunc(slices.Values(data.SystemMetrics), func(i, j *t.MachineMetricSystem) int {
data.SystemMetrics = slices.SortedFunc(slices.Values(data.SystemMetrics), func(i, j *commontypes.MachineMetricSystem) int {
return int(i.Timestamp) - int(j.Timestamp)
})
data.ValidatorMetrics = slices.SortedFunc(slices.Values(data.ValidatorMetrics), func(i, j *t.MachineMetricValidator) int {
data.ValidatorMetrics = slices.SortedFunc(slices.Values(data.ValidatorMetrics), func(i, j *commontypes.MachineMetricValidator) int {
return int(i.Timestamp) - int(j.Timestamp)
})
data.NodeMetrics = slices.SortedFunc(slices.Values(data.NodeMetrics), func(i, j *t.MachineMetricNode) int {
data.NodeMetrics = slices.SortedFunc(slices.Values(data.NodeMetrics), func(i, j *commontypes.MachineMetricNode) int {
return int(i.Timestamp) - int(j.Timestamp)
})
return data, nil
}

func (d *DummyService) PostUserMachineMetrics(ctx context.Context, userID uint64, machine, process string, data []byte) error {
return nil
}

func (d *DummyService) GetValidatorDashboardMobileValidators(ctx context.Context, dashboardId t.VDBId, period enums.TimePeriod, cursor string, colSort t.Sort[enums.VDBMobileValidatorsColumn], search string, limit uint64) ([]t.MobileValidatorDashboardValidatorsTableRow, *t.Paging, error) {
return getDummyWithPaging[t.MobileValidatorDashboardValidatorsTableRow]()
}
51 changes: 47 additions & 4 deletions backend/pkg/api/data_access/machine_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,57 @@ package dataaccess

import (
"context"
"strings"

"github.com/gobitfly/beaconchain/pkg/api/types"
apiTypes "github.com/gobitfly/beaconchain/pkg/api/types"
"github.com/gobitfly/beaconchain/pkg/commons/db"
"github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

type MachineRepository interface {
GetUserMachineMetrics(context context.Context, userID uint64, limit uint64, offset uint64) (*types.MachineMetricsData, error)
GetUserMachineMetrics(context context.Context, userID uint64, limit int, offset int) (*apiTypes.MachineMetricsData, error)
PostUserMachineMetrics(context context.Context, userID uint64, machine, process string, data []byte) error
}

func (d *DataAccessService) GetUserMachineMetrics(ctx context.Context, userID uint64, limit uint64, offset uint64) (*types.MachineMetricsData, error) {
return d.dummy.GetUserMachineMetrics(ctx, userID, limit, offset)
func (d *DataAccessService) GetUserMachineMetrics(ctx context.Context, userID uint64, limit int, offset int) (*apiTypes.MachineMetricsData, error) {
data := &apiTypes.MachineMetricsData{}

g := errgroup.Group{}

g.Go(func() error {
var err error
data.SystemMetrics, err = d.bigtable.GetMachineMetricsSystem(types.UserId(userID), limit, offset)
return err
})

g.Go(func() error {
var err error
data.ValidatorMetrics, err = d.bigtable.GetMachineMetricsValidator(types.UserId(userID), limit, offset)
return err
})

g.Go(func() error {
var err error
data.NodeMetrics, err = d.bigtable.GetMachineMetricsNode(types.UserId(userID), limit, offset)
return err
})

if err := g.Wait(); err != nil {
return nil, errors.Wrap(err, "could not get stats")
}

return data, nil
}

func (d *DataAccessService) PostUserMachineMetrics(ctx context.Context, userID uint64, machine, process string, data []byte) error {
err := db.BigtableClient.SaveMachineMetric(process, types.UserId(userID), machine, data)
if err != nil {
if strings.HasPrefix(err.Error(), "rate limit") {
return err
}
return errors.Wrap(err, "could not save stats")
}
return nil
}
184 changes: 183 additions & 1 deletion backend/pkg/api/handlers/machine_metrics.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
package handlers

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"

"github.com/gobitfly/beaconchain/pkg/api/types"

"github.com/gobitfly/beaconchain/pkg/commons/db"
commontypes "github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"

"google.golang.org/protobuf/proto"
)

func (h *HandlerService) InternalGetUserMachineMetrics(w http.ResponseWriter, r *http.Request) {
Expand All @@ -17,14 +29,29 @@ func (h *HandlerService) PublicGetUserMachineMetrics(w http.ResponseWriter, r *h
handleErr(w, r, err)
return
}

userInfo, err := h.dai.GetUserInfo(r.Context(), userId)
if err != nil {
handleErr(w, r, err)
return
}

q := r.URL.Query()
offset := v.checkUint(q.Get("offset"), "offset")
limit := uint64(180)
if limitParam := q.Get("limit"); limitParam != "" {
limit = v.checkUint(limitParam, "limit")
}

data, err := h.dai.GetUserMachineMetrics(r.Context(), userId, limit, offset)
// validate limit and offset according to user's premium perks
maxDataPoints := userInfo.PremiumPerks.MachineMonitoringHistorySeconds / 60 // one entry per minute
timeframe := offset + limit
if timeframe > maxDataPoints {
limit = maxDataPoints
offset = 0
}

data, err := h.dai.GetUserMachineMetrics(r.Context(), userId, int(limit), int(offset))
if err != nil {
handleErr(w, r, err)
return
Expand All @@ -35,3 +62,158 @@ func (h *HandlerService) PublicGetUserMachineMetrics(w http.ResponseWriter, r *h

returnOk(w, r, response)
}

func (h *HandlerService) LegacyPostUserMachineMetrics(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
apiKey := q.Get("apikey")
machine := q.Get("machine")

if apiKey == "" {
apiKey = r.Header.Get("apikey")
}

if !h.isPostMachineMetricsEnabled {
returnError(w, r, http.StatusServiceUnavailable, fmt.Errorf("machine metrics pushing is temporarily disabled"))
return
}

userID, err := h.dai.GetUserIdByApiKey(r.Context(), apiKey)
if err != nil {
returnBadRequest(w, r, fmt.Errorf("no user found with api key"))
return
}

userInfo, err := h.dai.GetUserInfo(r.Context(), userID)
if err != nil {
handleErr(w, r, err)
return
}

if contentType := r.Header.Get("Content-Type"); !reJsonContentType.MatchString(contentType) {
returnBadRequest(w, r, fmt.Errorf("invalid content type, expected application/json"))
return
}

body, err := io.ReadAll(r.Body)
if err != nil {
returnBadRequest(w, r, fmt.Errorf("could not read request body"))
return
}

var jsonObjects []map[string]interface{}
err = json.Unmarshal(body, &jsonObjects)
if err != nil {
var jsonObject map[string]interface{}
err = json.Unmarshal(body, &jsonObject)
if err != nil {
returnBadRequest(w, r, errors.Wrap(err, "Invalid JSON format in request body"))
return
}
jsonObjects = []map[string]interface{}{jsonObject}
}

if len(jsonObjects) >= 10 {
returnBadRequest(w, r, fmt.Errorf("Max number of stat entries are 10"))
return
}

var rateLimitErrs = 0
for i := 0; i < len(jsonObjects); i++ {
err := h.internal_processMachine(r.Context(), machine, &jsonObjects[i], userInfo)
if err != nil {
if strings.HasPrefix(err.Error(), "rate limit") {
rateLimitErrs++
continue
}
handleErr(w, r, err)
return
}
}

if rateLimitErrs >= len(jsonObjects) {
returnTooManyRequests(w, r, fmt.Errorf("too many metric requests, max allowed is 1 per user per machine per process"))
return
}

returnOk(w, r, nil)
}

func (h *HandlerService) internal_processMachine(context context.Context, machine string, obj *map[string]interface{}, userInfo *types.UserInfo) error {
var parsedMeta *commontypes.StatsMeta
err := mapstructure.Decode(obj, &parsedMeta)
if err != nil {
return fmt.Errorf("%w: %w", errBadRequest, err)
}

parsedMeta.Machine = machine

if parsedMeta.Version > 2 || parsedMeta.Version <= 0 {
return newBadRequestErr("unsupported data format version")
}

if parsedMeta.Process != "validator" && parsedMeta.Process != "beaconnode" && parsedMeta.Process != "slasher" && parsedMeta.Process != "system" {
return newBadRequestErr("unknown process")
}

maxNodes := userInfo.PremiumPerks.MonitorMachines

count, err := db.BigtableClient.GetMachineMetricsMachineCount(commontypes.UserId(userInfo.Id))
if err != nil {
return errors.Wrap(err, "could not get machine count")
}

if count > maxNodes {
return newForbiddenErr("user has reached max machine count")
}

// protobuf encode
var data []byte
if parsedMeta.Process == "system" {
var parsedResponse *commontypes.MachineMetricSystem
err = DecodeMapStructure(obj, &parsedResponse)
if err != nil {
return fmt.Errorf("%w: %w could not parse stats (system stats)", errBadRequest, err)
}
data, err = proto.Marshal(parsedResponse)
if err != nil {
return errors.Wrap(err, "could not parse stats (system stats)")
}
} else if parsedMeta.Process == "validator" {
var parsedResponse *commontypes.MachineMetricValidator
err = DecodeMapStructure(obj, &parsedResponse)
if err != nil {
return fmt.Errorf("%w: %w could not parse stats (validator stats)", errBadRequest, err)
}
data, err = proto.Marshal(parsedResponse)
if err != nil {
return errors.Wrap(err, "could not parse stats (validator stats)")
}
} else if parsedMeta.Process == "beaconnode" {
var parsedResponse *commontypes.MachineMetricNode
err = DecodeMapStructure(obj, &parsedResponse)
if err != nil {
return fmt.Errorf("%w: %w could not parse stats (beaconnode stats)", errBadRequest, err)
}
data, err = proto.Marshal(parsedResponse)
if err != nil {
return errors.Wrap(err, "could not parse stats (beaconnode stats)")
}
}

return h.dai.PostUserMachineMetrics(context, userInfo.Id, machine, parsedMeta.Process, data)
}

func DecodeMapStructure(input interface{}, output interface{}) error {
config := &mapstructure.DecoderConfig{
Metadata: nil,
Result: output,
TagName: "json",
}

decoder, err := mapstructure.NewDecoder(config)
if err != nil {
return err
}

return decoder.Decode(input)
}
7 changes: 7 additions & 0 deletions backend/pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func NewApiRouter(dataAccessor dataaccess.DataAccessor, dummy dataaccess.DataAcc
router := mux.NewRouter()
apiRouter := router.PathPrefix("/api").Subrouter()
publicRouter := apiRouter.PathPrefix("/v2").Subrouter()
legacyRouter := apiRouter.PathPrefix("/v1").Subrouter()
internalRouter := apiRouter.PathPrefix("/i").Subrouter()
sessionManager := newSessionManager(cfg)
internalRouter.Use(sessionManager.LoadAndSave)
Expand All @@ -39,6 +40,7 @@ func NewApiRouter(dataAccessor dataaccess.DataAccessor, dummy dataaccess.DataAcc
internalRouter.Use(handlerService.StoreUserIdBySessionMiddleware)

addRoutes(handlerService, publicRouter, internalRouter, cfg)
addLegacyRoutes(handlerService, legacyRouter)

// serve static files
publicRouter.PathPrefix("/docs/").Handler(http.StripPrefix("/api/v2/docs/", http.FileServer(http.FS(docs.Files))))
Expand Down Expand Up @@ -243,6 +245,11 @@ func addRoutes(hs *handlers.HandlerService, publicRouter, internalRouter *mux.Ro
addEndpointsToRouters(endpoints, publicRouter, internalRouter)
}

// Legacy routes are available behind the /v1 prefix and guarantee backwards compatibility with the old API
func addLegacyRoutes(hs *handlers.HandlerService, publicRouter *mux.Router) {
publicRouter.HandleFunc("/client/metrics", hs.LegacyPostUserMachineMetrics).Methods(http.MethodPost, http.MethodOptions)
}

func addValidatorDashboardRoutes(hs *handlers.HandlerService, publicRouter, internalRouter *mux.Router, cfg *types.Config) {
vdbPath := "/validator-dashboards"
publicRouter.HandleFunc(vdbPath, hs.PublicPostValidatorDashboards).Methods(http.MethodPost, http.MethodOptions)
Expand Down
Loading
Loading