Skip to content

Commit

Permalink
fix: mint and burn func signatures, adjustable batch size for pool an…
Browse files Browse the repository at this point in the history
…d backfill buffer
  • Loading branch information
kamikazechaser committed Sep 5, 2024
1 parent 591518b commit 22ffc22
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 29 deletions.
8 changes: 5 additions & 3 deletions cmd/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func main() {
poolOpts := pool.PoolOpts{
Logg: lo,
WorkerCount: ko.Int("core.pool_size"),
BatchSize: ko.MustInt("core.batch_size"),
Processor: blockProcessor,
}
if ko.Int("core.pool_size") <= 0 {
Expand Down Expand Up @@ -136,9 +137,10 @@ func main() {
}

backfill := backfill.New(backfill.BackfillOpts{
DB: db,
Logg: lo,
Pool: workerPool,
BatchSize: ko.MustInt("core.batch_size"),
DB: db,
Logg: lo,
Pool: workerPool,
})

apiServer := &http.Server{
Expand Down
1 change: 1 addition & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ db_type = "bolt"
# Defaults to (nproc * 3)
pool_size = 0
# If you are using an archive node, set this to true
batch_size = 100

[redis]
dsn = "127.0.0.1:6379"
Expand Down
39 changes: 20 additions & 19 deletions internal/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,35 @@ import (

type (
BackfillOpts struct {
DB db.DB
Logg *slog.Logger
Pool *pool.Pool
BatchSize int
DB db.DB
Logg *slog.Logger
Pool *pool.Pool
}

Backfill struct {
db db.DB
logg *slog.Logger
pool *pool.Pool
stopCh chan struct{}
ticker *time.Ticker
batchSize int
db db.DB
logg *slog.Logger
pool *pool.Pool
stopCh chan struct{}
ticker *time.Ticker
}
)

const (
idleCheckInterval = 60 * time.Second
busyCheckInterval = 1 * time.Second

maxPoolSizePush = 100
)

func New(o BackfillOpts) *Backfill {
return &Backfill{
db: o.DB,
logg: o.Logg,
pool: o.Pool,
stopCh: make(chan struct{}),
ticker: time.NewTicker(idleCheckInterval),
batchSize: o.BatchSize,
db: o.DB,
logg: o.Logg,
pool: o.Pool,
stopCh: make(chan struct{}),
ticker: time.NewTicker(idleCheckInterval),
}
}

Expand Down Expand Up @@ -90,25 +91,25 @@ func (b *Backfill) Run(skipLatest bool) error {
if missingBlocksCount > 0 {
b.logg.Info("found missing blocks", "skip_latest", skipLatest, "missing_blocks_count", missingBlocksCount)

buffer := make([]uint, maxPoolSizePush)
buffer := make([]uint, b.batchSize)
j := uint(0)
pushedCount := 0
j, buffer = missingBlocks.NextSetMany(j, buffer)
for ; len(buffer) > 0; j, buffer = missingBlocks.NextSetMany(j, buffer) {
for k := range buffer {
if pushedCount >= maxPoolSizePush {
if pushedCount >= b.batchSize {
break
}

b.pool.Push(uint64(buffer[k]))
b.logg.Debug("pushed block from backfill", "block", buffer[k])
pushedCount++
}
j += 1
j++
}
}

if missingBlocksCount > maxPoolSizePush {
if missingBlocksCount > uint(b.batchSize) {
b.ticker.Reset(busyCheckInterval)
} else {
b.ticker.Reset(idleCheckInterval)
Expand Down
5 changes: 2 additions & 3 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

type (
PoolOpts struct {
BatchSize int
Logg *slog.Logger
WorkerCount int
Processor *processor.Processor
Expand All @@ -23,14 +24,12 @@ type (
}
)

const blocksBuffer = 100

func New(o PoolOpts) *Pool {
return &Pool{
logg: o.Logg,
workerPool: pond.New(
o.WorkerCount,
blocksBuffer,
o.BatchSize,
pond.Strategy(pond.Balanced()),
pond.PanicHandler(panicHandler(o.Logg)),
),
Expand Down
4 changes: 2 additions & 2 deletions internal/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (p *Processor) ProcessBlock(ctx context.Context, blockNumber uint64) error
Timestamp: block.Time(),
},
); err != nil && !errors.Is(err, context.Canceled) {
return err
return fmt.Errorf("route success transaction error: tx %s: %v", receipt.TxHash.Hex(), err)
}
}
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func (p *Processor) ProcessBlock(ctx context.Context, blockNumber uint64) error
TxHash: receipt.TxHash.Hex(),
},
); err != nil && !errors.Is(err, context.Canceled) {
return err
return fmt.Errorf("route revert transaction error: tx %s: %v", receipt.TxHash.Hex(), err)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/router/token_burn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var (
_ Handler = (*tokenBurnHandler)(nil)

tokenBurnEvent = w3.MustNewEvent("Burn(address indexed _tokenBurner, uint256 _value)")
tokenBurnToSig = w3.MustNewFunc("Burn(uint256)", "bool")
tokenBurnToSig = w3.MustNewFunc("burn(uint256)", "bool")
)

func (h *tokenBurnHandler) Name() string {
Expand Down
2 changes: 1 addition & 1 deletion internal/router/token_mint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var (
_ Handler = (*tokenMintHandler)(nil)

tokenMintEvent = w3.MustNewEvent("Mint(address indexed _tokenMinter, address indexed _beneficiary, uint256 _value)")
tokenMintToSig = w3.MustNewFunc("MintTo(address, uint256)", "bool")
tokenMintToSig = w3.MustNewFunc("mintTo(address, uint256)", "bool")
)

func (h *tokenMintHandler) Name() string {
Expand Down

0 comments on commit 22ffc22

Please sign in to comment.