improve logging, bump version
sduchesneau committed Jan 29, 2025
1 parent b7e1e86 commit 1e6ea8b
Showing 9 changed files with 171 additions and 110 deletions.
3 changes: 2 additions & 1 deletion docs/release-notes/change-log.md
@@ -9,11 +9,12 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
## v1.12.3

### Server

* Improve noop-mode: will now only send one signal per bundle, without any data
* Improve logging

### Client

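The noop-mode entry above corresponds to the response-handler change further down in service/tier1.go, where block-scoped payloads are stripped before being sent so the client only receives a lightweight signal. A rough, self-contained Go sketch of that behaviour, using hypothetical stand-in types instead of the real pbsubstreamsrpc messages:

package main

import "fmt"

// Hypothetical stand-ins for the real pbsubstreamsrpc messages, reduced to the
// fields that noop mode touches.
type MapModuleOutput struct{ Payload []byte }

type BlockScopedData struct {
	Output            *MapModuleOutput
	DebugMapOutputs   []string
	DebugStoreOutputs []string
}

// stripForNoop keeps the response as a signal but drops every data payload,
// which is what the tier1 response handler now does when noop mode is set.
func stripForNoop(data *BlockScopedData, noop bool) {
	if data == nil || !noop {
		return
	}
	data.DebugMapOutputs = nil
	data.DebugStoreOutputs = nil
	data.Output = &MapModuleOutput{} // an empty output still acts as the signal
}

func main() {
	d := &BlockScopedData{Output: &MapModuleOutput{Payload: []byte("module output")}}
	stripForNoop(d, true)
	fmt.Println("payload bytes after strip:", len(d.Output.Payload)) // prints 0
}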
25 changes: 22 additions & 3 deletions metrics/stats.go
@@ -20,9 +20,10 @@ type Stats struct {

blockRate *dmetrics.AvgRateCounter

startTime time.Time
stages []*pbsubstreamsrpc.Stage
initDuration time.Duration
startTime time.Time
stages []*pbsubstreamsrpc.Stage
initDuration time.Duration
timeToFirstData time.Duration

// moduleStats only contain stats from local execution
modulesStats map[string]*extendedStats
@@ -37,6 +38,7 @@ type Stats struct {
// counter is used to get the next jobIdx
counter uint64

error error
logger *zap.Logger
}

@@ -144,6 +146,10 @@ func NewReqStats(config *Config, logger *zap.Logger) *Stats {
}
}

func (s *Stats) SetError(err error) {
s.error = err
}

type extendedStats struct {
*pbssinternal.ModuleStats
merging bool
@@ -209,6 +215,13 @@ func (s *Stats) RecordInitializationComplete() {
s.initDuration = time.Since(s.startTime)
}

func (s *Stats) RecordDataSent() {
// this is always sent linearly, no need to lock
if s.timeToFirstData == 0 {
s.timeToFirstData = time.Since(s.startTime)
}
}

func (s *Stats) RecordStages(stages []*pbsubstreamsrpc.Stage) {
s.Lock()
defer s.Unlock()
@@ -601,6 +614,10 @@ func (s *Stats) getZapFields() []zap.Field {
if s.config.Tier2 {
tier = "tier2"
}
errorText := ""
if s.error != nil {
errorText = s.error.Error()
}

out := []zap.Field{
zap.String("user_id", s.config.UserID),
@@ -614,6 +631,8 @@ func (s *Stats) getZapFields() []zap.Field {
zap.Duration("parallel_duration", s.initDuration),
zap.Duration("module_exec_duration", s.moduleExecDuration()),
zap.Duration("module_wasm_ext_duration", s.moduleWasmExtDuration()),
zap.Duration("time_to_first_data", s.timeToFirstData),
zap.String("error", errorText),
}

return out
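The two Stats additions above, timeToFirstData and the terminal error, both surface as fields on the final request log built by getZapFields. A minimal sketch of how they behave, assuming a trimmed-down stand-in for metrics.Stats and logging through zap directly:

package main

import (
	"errors"
	"time"

	"go.uber.org/zap"
)

// reqStats is a hypothetical, trimmed-down analogue of metrics.Stats, kept to
// the two fields this commit adds.
type reqStats struct {
	startTime       time.Time
	timeToFirstData time.Duration
	err             error
}

// recordDataSent mirrors Stats.RecordDataSent: responses are sent linearly,
// so only the first call records a duration and later calls are no-ops.
func (s *reqStats) recordDataSent() {
	if s.timeToFirstData == 0 {
		s.timeToFirstData = time.Since(s.startTime)
	}
}

func (s *reqStats) setError(err error) { s.err = err }

// logAndClose shows how the new values end up as zap fields, as in getZapFields.
func (s *reqStats) logAndClose(logger *zap.Logger) {
	errorText := ""
	if s.err != nil {
		errorText = s.err.Error()
	}
	logger.Info("substreams request stats",
		zap.Duration("time_to_first_data", s.timeToFirstData),
		zap.String("error", errorText),
	)
}

func main() {
	logger := zap.NewExample()
	defer logger.Sync()

	s := &reqStats{startTime: time.Now()}
	s.recordDataSent()                      // first data sent to the client
	s.recordDataSent()                      // already recorded, ignored
	s.setError(errors.New("stream closed")) // final stream error, if any
	s.logAndClose(logger)
}

The real Stats guards its other fields with a mutex; RecordDataSent can skip the lock because, as the added comment notes, data responses are sent linearly.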
2 changes: 1 addition & 1 deletion orchestrator/scheduler/scheduler.go
@@ -193,7 +193,7 @@ func (s *Scheduler) cmdShutdownWhenComplete() loop.Cmd {
return nil
}
}
s.logger.Info("scheduler: stores and cached_outputs stream completed, switching to live", fields...)
s.logger.Info("scheduler: stores and cached_outputs stream completed", fields...)
return func() loop.Msg {
err := s.Stages.WaitAsyncWork()
return loop.Quit(err)()
8 changes: 0 additions & 8 deletions pipeline/terminate.go
@@ -34,14 +34,6 @@ func (p *Pipeline) OnStreamTerminated(ctx context.Context, err error) error {
return err
}

logger.Info("stream of blocks ended",
zap.Uint64("stop_block_num", reqDetails.StopBlockNum),
zap.Bool("eof", errors.Is(err, io.EOF)),
zap.Bool("stop_block_reached", errors.Is(err, stream.ErrStopBlockReached)),
//zap.Uint64("total_bytes_written", bytesMeter.BytesWritten()),
//zap.Uint64("total_bytes_read", bytesMeter.BytesRead()),
)

// TODO(abourget): check, in the tier1, there might not be a `lastFinalClock`
// if we just didn't run the `streamFactoryFunc`
if err := p.execOutputCache.EndOfStream(p.lastFinalClock); err != nil {
149 changes: 91 additions & 58 deletions service/tier1.go
@@ -228,60 +228,37 @@ func (s *Tier1Service) Blocks(
ctx, span := reqctx.WithSpan(ctx, "substreams/tier1/request")
defer span.EndWithErr(&err)

request := req.Msg
var compressed bool
if matchHeader(req.Header()) {
compressed = true
}
if s.enforceCompression && !compressed {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("your client does not accept gzip- or zstd-compressed streams. Check how to enable it on your gRPC or ConnectRPC client"))
}

request := req.Msg
if request.Modules == nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("missing modules in request"))
}

execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, request.ProductionMode, request.Modules, bstream.GetProtocolFirstStreamableBlock)
if err != nil {
return bsstream.NewErrInvalidArg(err.Error())
}
outputModuleHash := execGraph.ModuleHashes().Get(request.OutputModule)
ctx = reqctx.WithOutputModuleHash(ctx, outputModuleHash)

// We need to ensure that the response function is NEVER used after this Blocks handler has returned.
// We use a context that will be canceled on defer, and a lock to prevent races. The respFunc is used in various threads
mut := sync.Mutex{}
respContext, cancel := context.WithCancel(ctx)
defer func() {
mut.Lock()
cancel()
mut.Unlock()
}()

respFunc := tier1ResponseHandler(respContext, &mut, logger, stream, request.NoopMode)

span.SetAttributes(attribute.Int64("substreams.tier", 1))

if err := ValidateTier1Request(request, s.blockType); err != nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err))
}

moduleNames := make([]string, len(request.Modules.Modules))
for i := 0; i < len(moduleNames); i++ {
moduleNames[i] = request.Modules.Modules[i].Name
}
fields := []zap.Field{
zap.Int64("start_block", request.StartBlockNum),
zap.Uint64("stop_block", request.StopBlockNum),
zap.String("cursor", request.StartCursor),
zap.Strings("modules", moduleNames),
zap.String("output_module", request.OutputModule),
zap.String("output_module_hash", outputModuleHash),
zap.Bool("compressed", compressed),
zap.Bool("final_blocks_only", request.FinalBlocksOnly),
zap.Bool("production_mode", request.ProductionMode),
zap.Bool("noop_mode", request.NoopMode),
}

if s.enforceCompression && !compressed {
err := connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("your client does not accept gzip- or zstd-compressed streams. Check how to enable it on your gRPC or ConnectRPC client"))
fields = append(fields, zap.Error(err))
logger.Info("refusing Substreams Blocks request", fields...)
return err
}

if request.Modules == nil {
err := connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("missing modules in request"))
fields = append(fields, zap.Error(err))
logger.Info("refusing Substreams Blocks request", fields...)
return err
}
fields = append(fields, zap.Bool("production_mode", request.ProductionMode))
fields = append(fields, zap.Bool("noop_mode", request.NoopMode))

if auth := dauth.FromContext(ctx); auth != nil {
fields = append(fields,
zap.String("user_id", auth.UserID()),
@@ -311,13 +288,70 @@ func (s *Tier1Service) Blocks(

// Refuse the request if the hard limit is currently reached by this instance
if status.hardLimitReached() {
s.logger.Info("refusing request, hard limit reached ()",
append(fields, zap.Int("active_request_count", status.activeRequestCount), zap.Int("hard_limit", status.hardLimit))...,
)
return connect.NewError(connect.CodeUnavailable, fmt.Errorf("service under heavy load, please try connecting again"))
err := connect.NewError(connect.CodeUnavailable, fmt.Errorf("service under heavy load, please try connecting again"))
fields = append(fields, zap.Error(err), zap.Int("active_request_count", status.activeRequestCount), zap.Int("hard_limit", status.hardLimit))
logger.Info("refusing Substreams Blocks request", fields...)
return err
}

execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, request.ProductionMode, request.Modules, bstream.GetProtocolFirstStreamableBlock)
if err != nil {
err := bsstream.NewErrInvalidArg(err.Error())
fields = append(fields, zap.Error(err))
logger.Info("refusing Substreams Blocks request", fields...)
return err
}

outputModuleHash := execGraph.ModuleHashes().Get(request.OutputModule)
ctx = reqctx.WithOutputModuleHash(ctx, outputModuleHash)
fields = append(fields, zap.String("output_module_hash", outputModuleHash))

usedModules := execGraph.UsedModules()

var hasStores bool
var hasFilter bool
moduleNames := make([]string, len(usedModules))
for i, module := range usedModules {
moduleNames[i] = module.Name
if module.GetKindStore() != nil {
hasStores = true
}
if module.BlockFilter != nil {
hasFilter = true
}
}
fields = append(fields,
zap.Strings("modules", moduleNames),
zap.Bool("with_stores", hasStores),
zap.Bool("with_blockfilter", hasFilter),
zap.Int("module_count", len(usedModules)),
)

// We need to ensure that the response function is NEVER used after this Blocks handler has returned.
// We use a context that will be canceled on defer, and a lock to prevent races. The respFunc is used in various threads
mut := sync.Mutex{}
respContext, cancel := context.WithCancel(ctx)
defer func() {
mut.Lock()
cancel()
mut.Unlock()
}()

span.SetAttributes(attribute.Int64("substreams.tier", 1))

if err := ValidateTier1Request(request, s.blockType); err != nil {
err := connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err))
fields = append(fields, zap.Error(err))
logger.Info("refusing Substreams Blocks request", fields...)
return err
}

logger.Info("incoming Substreams Blocks request", fields...)

var reqStats *metrics.Stats
ctx, reqStats = setupRequestStats(ctx, request.OutputModule, outputModuleHash, true, true)
defer reqStats.LogAndClose()

metrics.SubstreamsCounter.Inc()
metrics.ActiveRequests.Inc()
defer func() {
@@ -356,7 +390,9 @@ func (s *Tier1Service) Blocks(
}
}()

respFunc := tier1ResponseHandler(respContext, &mut, logger, stream, request.NoopMode, reqStats)
err = s.blocks(runningContext, request, execGraph, respFunc)
reqStats.SetError(err)

if connectError := toConnectError(runningContext, err); connectError != nil {
switch connect.CodeOf(connectError) {
@@ -366,13 +402,12 @@ func (s *Tier1Service) Blocks(
logger.Debug("recording failure on request", zap.String("request_id", requestID))
s.recordFailure(requestID, connectError)
case connect.CodeCanceled:
logger.Info("Blocks request canceled by user", zap.Error(connectError))
logger.Debug("Blocks request canceled by user", zap.Error(connectError))
default:
logger.Warn("Blocks request completed with error", zap.Error(connectError))
}
return connectError
}

logger.Debug("Blocks request completed without error")
return nil
}
@@ -447,10 +482,6 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ

}

var requestStats *metrics.Stats
ctx, requestStats = setupRequestStats(ctx, requestDetails, execGraph.ModuleHashes().Get(requestDetails.OutputModule), false)
defer requestStats.LogAndClose()

traceId := tracing.GetTraceID(ctx).String()
respFunc(&pbsubstreamsrpc.Response{
Message: &pbsubstreamsrpc.Response_Session{
@@ -642,7 +673,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
return pipe.OnStreamTerminated(ctx, streamErr)
}

func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logger, streamSrv *connect.ServerStream[pbsubstreamsrpc.Response], noop bool) substreams.ResponseFunc {
func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logger, streamSrv *connect.ServerStream[pbsubstreamsrpc.Response], noop bool, stats *metrics.Stats) substreams.ResponseFunc {
auth := dauth.FromContext(ctx)
userID := auth.UserID()
apiKeyID := auth.APIKeyID()
@@ -664,13 +695,15 @@ func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logg
return ctx.Err()
}

if noop {
if data := resp.GetBlockScopedData(); data != nil {
if data := resp.GetBlockScopedData(); data != nil {
stats.RecordDataSent()
if noop {
data.DebugMapOutputs = nil
data.DebugStoreOutputs = nil
data.Output = &pbsubstreamsrpc.MapModuleOutput{}
}
}

if err := streamSrv.Send(resp); err != nil {
logger.Info("unable to send block probably due to client disconnecting", zap.Error(err))
return connect.NewError(connect.CodeUnavailable, err)
Expand All @@ -681,16 +714,16 @@ func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logg
}
}

func setupRequestStats(ctx context.Context, requestDetails *reqctx.RequestDetails, outputModuleGraph string, tier2 bool) (context.Context, *metrics.Stats) {
func setupRequestStats(ctx context.Context, outputModuleName, outputModuleHash string, productionMode, tier2 bool) (context.Context, *metrics.Stats) {
logger := reqctx.Logger(ctx)
auth := dauth.FromContext(ctx)
stats := metrics.NewReqStats(&metrics.Config{
UserID: auth.UserID(),
ApiKeyID: auth.APIKeyID(),
Tier2: tier2,
OutputModule: requestDetails.OutputModule,
OutputModuleHash: outputModuleGraph,
ProductionMode: requestDetails.ProductionMode,
OutputModule: outputModuleName,
OutputModuleHash: outputModuleHash,
ProductionMode: productionMode,
}, logger)
return reqctx.WithReqStats(ctx, stats), stats
}
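Most of the tier1.go reshuffling above follows one pattern: request attributes are accumulated into a single slice of zap fields as the request is inspected, and every early-return branch appends the error and emits one "refusing Substreams Blocks request" line with that full context. A minimal sketch of the pattern, using hypothetical field values and a stand-in validation error:

package main

import (
	"errors"
	"fmt"

	"go.uber.org/zap"
)

// refuse appends the error to the already-accumulated request fields and logs
// a single refusal line, mirroring the early-return branches of Blocks.
func refuse(logger *zap.Logger, fields []zap.Field, err error) error {
	fields = append(fields, zap.Error(err))
	logger.Info("refusing Substreams Blocks request", fields...)
	return err
}

func main() {
	logger := zap.NewExample()
	defer logger.Sync()

	// Fields are collected up front so refusals and accepted requests carry
	// the same context in the logs. Values here are purely illustrative.
	fields := []zap.Field{
		zap.Int64("start_block", 12_000_000),
		zap.Uint64("stop_block", 12_000_100),
		zap.String("output_module", "map_events"),
		zap.Bool("production_mode", true),
	}

	// Stand-in for the real checks (compression, missing modules, hard limit,
	// graph construction, request validation, ...).
	missingModules := true
	if missingModules {
		err := errors.New("missing modules in request")
		fmt.Println("request rejected:", refuse(logger, fields, err))
		return
	}

	logger.Info("incoming Substreams Blocks request", fields...)
}

The accepted path reuses the same slice for the "incoming Substreams Blocks request" line, so refused and accepted requests are directly comparable in the logs.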
[Diffs for the remaining 4 changed files are not shown here.]
