Skip to content

Commit

Permalink
op-node/rollup/derive: Implement Holocene Frame Queue (ethereum-optim…
Browse files Browse the repository at this point in the history
…ism#12069)

* op-node/rollup/derive: Implement Holocene Frame Queue

* add FrameQueue test

* use non-nil context

* address reviews, refactor frame loading and pruning
  • Loading branch information
sebastianst authored Sep 27, 2024
1 parent b0a4c11 commit 6b2a3fe
Show file tree
Hide file tree
Showing 5 changed files with 328 additions and 12 deletions.
91 changes: 82 additions & 9 deletions op-node/rollup/derive/frame_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (

"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
)

var _ NextFrameProvider = &FrameQueue{}

//go:generate mockery --name NextDataProvider --case snake
type NextDataProvider interface {
NextData(context.Context) ([]byte, error)
Origin() eth.L1BlockRef
Expand All @@ -20,12 +22,14 @@ type FrameQueue struct {
log log.Logger
frames []Frame
prev NextDataProvider
cfg *rollup.Config
}

func NewFrameQueue(log log.Logger, prev NextDataProvider) *FrameQueue {
func NewFrameQueue(log log.Logger, cfg *rollup.Config, prev NextDataProvider) *FrameQueue {
return &FrameQueue{
log: log,
prev: prev,
cfg: cfg,
}
}

Expand All @@ -34,18 +38,15 @@ func (fq *FrameQueue) Origin() eth.L1BlockRef {
}

func (fq *FrameQueue) NextFrame(ctx context.Context) (Frame, error) {
// Find more frames if we need to
// TODO(12157): reset frame queue once at Holocene L1 origin block

// Only load more frames if necessary
if len(fq.frames) == 0 {
if data, err := fq.prev.NextData(ctx); err != nil {
if err := fq.loadNextFrames(ctx); err != nil {
return Frame{}, err
} else {
if new, err := ParseFrames(data); err == nil {
fq.frames = append(fq.frames, new...)
} else {
fq.log.Warn("Failed to parse frames", "origin", fq.prev.Origin(), "err", err)
}
}
}

// If we did not add more frames but still have more data, retry this function.
if len(fq.frames) == 0 {
return Frame{}, NotEnoughData
Expand All @@ -56,6 +57,78 @@ func (fq *FrameQueue) NextFrame(ctx context.Context) (Frame, error) {
return ret, nil
}

func (fq *FrameQueue) loadNextFrames(ctx context.Context) error {
data, err := fq.prev.NextData(ctx)
if err != nil {
return err
}

if frames, err := ParseFrames(data); err == nil {
fq.frames = append(fq.frames, frames...)
} else {
fq.log.Warn("Failed to parse frames", "origin", fq.prev.Origin(), "err", err)
return nil
}

// Note: this implementation first parses all frames from the next L1 transaction and only then
// prunes all frames that were parsed. An even more memory-efficient implementation could prune
// the frame queue each time after pulling out only a single frame.

if fq.cfg.IsHolocene(fq.Origin().Time) {
// We only need to prune the queue after adding more frames to it.
// Moving frames out of the queue to the next stage cannot invalidate any frames in
// the queue.
fq.prune()
}

return nil
}

func (fq *FrameQueue) prune() {
fq.frames = pruneFrameQueue(fq.frames)
}

// pruneFrameQueue prunes the frame queue to only hold contiguous and ordered
// frames, conforming to Holocene frame queue rules.
func pruneFrameQueue(frames []Frame) []Frame {
for i := 0; i < len(frames)-1; {
current, next := frames[i], frames[i+1]
discard := func(d int) {
frames = append(frames[0:i+d], frames[i+1+d:]...)
}
// frames for the same channel ID must arrive in order
if current.ID == next.ID {
if current.IsLast {
discard(1) // discard next
continue
}
if next.FrameNumber != current.FrameNumber+1 {
discard(1) // discard next
continue
}
} else {
// first frames discard previously unclosed channels
if next.FrameNumber == 0 && !current.IsLast {
discard(0) // discard current
// make sure we backwards invalidate more frames of unclosed channel
if i > 0 {
i--
}
continue
}
// non-first frames of new channels are dropped
if next.FrameNumber != 0 {
discard(1) // discard next
continue
}
}
// We only update the cursor if we didn't remove any frame, so if any frame got removed, the
// checks are applied to the new pair in the queue at the same position.
i++
}
return frames
}

func (fq *FrameQueue) Reset(_ context.Context, _ eth.L1BlockRef, _ eth.SystemConfig) error {
fq.frames = fq.frames[:0]
return io.EOF
Expand Down
159 changes: 159 additions & 0 deletions op-node/rollup/derive/frame_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package derive

import (
"bytes"
"context"
"io"
"log/slog"
"testing"

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive/mocks"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestPruneFrameQueue(t *testing.T) {
for _, tt := range []struct {
desc string
frames []testFrame
expected []testFrame
}{
{
desc: "empty",
frames: []testFrame{},
expected: []testFrame{},
},
{
desc: "one",
frames: []testFrame{"a:2:"},
expected: []testFrame{"a:2:"},
},
{
desc: "one-last",
frames: []testFrame{"a:2:!"},
expected: []testFrame{"a:2:!"},
},
{
desc: "last-new",
frames: []testFrame{"a:2:!", "b:0:"},
expected: []testFrame{"a:2:!", "b:0:"},
},
{
desc: "last-ooo",
frames: []testFrame{"a:2:!", "b:1:"},
expected: []testFrame{"a:2:!"},
},
{
desc: "middle-lastooo",
frames: []testFrame{"b:1:", "a:2:!"},
expected: []testFrame{"b:1:"},
},
{
desc: "middle-first",
frames: []testFrame{"b:1:", "a:0:"},
expected: []testFrame{"a:0:"},
},
{
desc: "last-first",
frames: []testFrame{"b:1:!", "a:0:"},
expected: []testFrame{"b:1:!", "a:0:"},
},
{
desc: "last-ooo",
frames: []testFrame{"b:1:!", "b:2:"},
expected: []testFrame{"b:1:!"},
},
{
desc: "ooo",
frames: []testFrame{"b:1:", "b:3:"},
expected: []testFrame{"b:1:"},
},
{
desc: "other-ooo",
frames: []testFrame{"b:1:", "c:3:"},
expected: []testFrame{"b:1:"},
},
{
desc: "other-ooo-last",
frames: []testFrame{"b:1:", "c:3:", "b:2:!"},
expected: []testFrame{"b:1:", "b:2:!"},
},
{
desc: "ooo-resubmit",
frames: []testFrame{"b:1:", "b:3:!", "b:2:", "b:3:!"},
expected: []testFrame{"b:1:", "b:2:", "b:3:!"},
},
{
desc: "first-discards-multiple",
frames: []testFrame{"c:0:", "c:1:", "c:2:", "d:0:", "c:3:!"},
expected: []testFrame{"d:0:"},
},
{
desc: "complex",
frames: []testFrame{"b:1:", "b:2:!", "a:0:", "c:1:!", "a:1:", "a:2:!", "c:0:", "c:1:", "d:0:", "c:2:!", "e:0:"},
expected: []testFrame{"b:1:", "b:2:!", "a:0:", "a:1:", "a:2:!", "e:0:"},
},
} {
t.Run(tt.desc, func(t *testing.T) {
pfs := pruneFrameQueue(testFramesToFrames(tt.frames...))
require.Equal(t, testFramesToFrames(tt.expected...), pfs)
})
}
}

func TestFrameQueue_NextFrame(t *testing.T) {
t.Run("pre-holocene", func(t *testing.T) { testFrameQueue_NextFrame(t, false) })
t.Run("holocene", func(t *testing.T) { testFrameQueue_NextFrame(t, true) })
}

func testFrameQueue_NextFrame(t *testing.T, holocene bool) {
lgr := testlog.Logger(t, slog.LevelWarn)
cfg := &rollup.Config{}
dp := mocks.NewNextDataProvider(t)
fq := NewFrameQueue(lgr, cfg, dp)

inFrames := testFramesToFrames("b:1:", "b:2:!", "a:0:", "c:1:!", "a:1:", "a:2:!", "c:0:", "c:1:", "d:0:", "c:2:!", "e:0:")
var expFrames []Frame
if holocene {
cfg.HoloceneTime = ptr(uint64(0))
// expect pruned frames with Holocene
expFrames = testFramesToFrames("b:1:", "b:2:!", "a:0:", "a:1:", "a:2:!", "e:0:")
} else {
expFrames = inFrames
}

var inBuf bytes.Buffer
inBuf.WriteByte(DerivationVersion0)
for _, f := range inFrames {
require.NoError(t, f.MarshalBinary(&inBuf))
}

dp.On("Origin").Return(eth.L1BlockRef{})
dp.On("NextData", mock.Anything).Return(inBuf.Bytes(), nil).Once()
dp.On("NextData", mock.Anything).Return(nil, io.EOF)

gotFrames := make([]Frame, 0, len(expFrames))
for i := 0; i <= len(inFrames); i++ { // make sure we hit EOF case
frame, err := fq.NextFrame(context.Background())
if err != nil {
require.ErrorIs(t, err, io.EOF)
break
}
require.NoError(t, err)
gotFrames = append(gotFrames, frame)
}
require.Equal(t, expFrames, gotFrames)
}

func ptr[T any](t T) *T { return &t }

func testFramesToFrames(tfs ...testFrame) []Frame {
fs := make([]Frame, 0, len(tfs))
for _, f := range tfs {
fs = append(fs, f.ToFrame())
}
return fs
}
6 changes: 6 additions & 0 deletions op-node/rollup/derive/frame_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ func TestParseFramesInvalidVer(t *testing.T) {
require.Error(t, err)
}

func TestParseFramesOnlyVersion(t *testing.T) {
frames, err := ParseFrames([]byte{DerivationVersion0})
require.Empty(t, frames)
require.Error(t, err)
}

func TestParseFrames(t *testing.T) {
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
numFrames := rng.Intn(16) + 1
Expand Down
78 changes: 78 additions & 0 deletions op-node/rollup/derive/mocks/next_data_provider.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 6b2a3fe

Please sign in to comment.