Skip to content

Commit

Permalink
[apache#30083][prism] Factor out hold tracking to dedicated structures (
Browse files Browse the repository at this point in the history
apache#31105)

* [prism] Factor out hold tracking to dedicated structures

* review comment-reorder move code out of ladder.

---------

Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
  • Loading branch information
lostluck and lostluck authored Apr 26, 2024
1 parent 28a2682 commit 673da54
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 75 deletions.
77 changes: 12 additions & 65 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
outW := ss.OutputWatermark()
upPCol, upW := ss.UpstreamWatermark()
upS := em.pcolParents[upPCol]
stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHoldHeap, "holdCounts", ss.watermarkHoldsCounts))
stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", ss.watermarkHolds.counts))
}
panic(fmt.Sprintf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, "")))
}
Expand Down Expand Up @@ -706,18 +706,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
delete(stage.inprogressKeysByBundle, rb.BundleID)

for hold, v := range stage.inprogressHoldsByBundle[rb.BundleID] {
n := stage.watermarkHoldsCounts[hold] - v
if n == 0 {
delete(stage.watermarkHoldsCounts, hold)
for i, h := range stage.watermarkHoldHeap {
if hold == h {
heap.Remove(&stage.watermarkHoldHeap, i)
break
}
}
} else {
stage.watermarkHoldsCounts[hold] = n
}
stage.watermarkHolds.Drop(hold, v)
}
delete(stage.inprogressHoldsByBundle, rb.BundleID)

Expand Down Expand Up @@ -918,8 +907,7 @@ type stageState struct {
// We track the count of timers with the same hold, and clear it from
// the map and heap when the count goes to zero.
// This avoids scanning the heap to remove or access a hold for each element.
watermarkHoldsCounts map[mtime.Time]int
watermarkHoldHeap holdHeap
watermarkHolds *holdTracker
inprogressHoldsByBundle map[string]map[mtime.Time]int // bundle to associated holds.
}

Expand All @@ -940,37 +928,15 @@ type dataAndTimers struct {
timers map[timerKey]timerTimes
}

// holdHeap orders holds based on their timestamps
// so we can always find the minimum timestamp of pending holds.
type holdHeap []mtime.Time

func (h holdHeap) Len() int { return len(h) }
func (h holdHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h holdHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *holdHeap) Push(x any) {
// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
*h = append(*h, x.(mtime.Time))
}

func (h *holdHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

// makeStageState produces an initialized stageState.
func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *stageState {
ss := &stageState{
ID: ID,
outputIDs: outputIDs,
sides: sides,
strat: defaultStrat{},
state: map[LinkID]map[typex.Window]map[string]StateData{},
watermarkHoldsCounts: map[mtime.Time]int{},
ID: ID,
outputIDs: outputIDs,
sides: sides,
strat: defaultStrat{},
state: map[LinkID]map[typex.Window]map[string]StateData{},
watermarkHolds: newHoldTracker(),

input: mtime.MinTimestamp,
output: mtime.MinTimestamp,
Expand Down Expand Up @@ -1016,29 +982,13 @@ func (ss *stageState) AddPending(newPending []element) int {
// don't increase the count this time, as "this" timer is already pending.
count--
// clear out the existing hold for accounting purposes.
v := ss.watermarkHoldsCounts[lastSet.hold] - 1
if v == 0 {
delete(ss.watermarkHoldsCounts, lastSet.hold)
for i, hold := range ss.watermarkHoldHeap {
if hold == lastSet.hold {
heap.Remove(&ss.watermarkHoldHeap, i)
break
}
}
} else {
ss.watermarkHoldsCounts[lastSet.hold] = v
}
ss.watermarkHolds.Drop(lastSet.hold, 1)
}
// Update the last set time on the timer.
dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp}

// Mark the hold in the heap.
ss.watermarkHoldsCounts[e.holdTimestamp] = ss.watermarkHoldsCounts[e.holdTimestamp] + 1

if len(ss.watermarkHoldsCounts) != len(ss.watermarkHoldHeap) {
// The hold should not be in the heap, so we add it.
heap.Push(&ss.watermarkHoldHeap, e.holdTimestamp)
}
ss.watermarkHolds.Add(e.holdTimestamp, 1)
}
}
return count
Expand Down Expand Up @@ -1308,10 +1258,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
defer ss.mu.Unlock()

minPending := ss.minPendingTimestampLocked()
minWatermarkHold := mtime.MaxTimestamp
if ss.watermarkHoldHeap.Len() > 0 {
minWatermarkHold = ss.watermarkHoldHeap[0]
}
minWatermarkHold := ss.watermarkHolds.Min()

// PCollection watermarks are based on their parents's output watermark.
_, newIn := ss.UpstreamWatermark()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func TestStageState_updateWatermarks(t *testing.T) {
ss.output = test.initOutput
ss.updateUpstreamWatermark(inputCol, test.upstream)
ss.pending = append(ss.pending, element{timestamp: test.minPending})
ss.watermarkHoldHeap = append(ss.watermarkHoldHeap, test.minStateHold)
ss.watermarkHolds.Add(test.minStateHold, 1)
ss.updateWatermarks(em)
if got, want := ss.input, test.wantInput; got != want {
pcol, up := ss.UpstreamWatermark()
Expand Down
105 changes: 105 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/holds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package engine

import (
"container/heap"
"fmt"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
)

// holdHeap orders holds based on their timestamps
// so we can always find the minimum timestamp of pending holds.
type holdHeap []mtime.Time

func (h holdHeap) Len() int { return len(h) }
func (h holdHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h holdHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *holdHeap) Push(x any) {
// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
*h = append(*h, x.(mtime.Time))
}

func (h *holdHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

// holdTracker track the watermark holds for a stage.
//
// Timers hold back the watermark until they fire, but multiple
// timers may set the same watermark hold.
// To track when the watermark may advance further this structure maintains
// counts for each set watermark hold.
// As timers are processed, their associated holds are removed, reducing the counts.
//
// A heap of the hold times is kept so we have quick access to the minimum hold, for calculating
// how to advance the watermark.
type holdTracker struct {
heap holdHeap
counts map[mtime.Time]int
}

func newHoldTracker() *holdTracker {
return &holdTracker{
counts: map[mtime.Time]int{},
}
}

// Drop the given hold count. When the count of a hold time reaches zero, it's
// removed from the heap. Drop panics if holds become negative.
func (ht *holdTracker) Drop(hold mtime.Time, v int) {
n := ht.counts[hold] - v
if n > 0 {
ht.counts[hold] = n
return
} else if n < 0 {
panic(fmt.Sprintf("prism error: negative watermark hold count %v for time %v", n, hold))
}
delete(ht.counts, hold)
for i, h := range ht.heap {
if hold == h {
heap.Remove(&ht.heap, i)
break
}
}
}

// Add a hold a number of times to heap. If the hold time isn't already present in the heap, it is added.
func (ht *holdTracker) Add(hold mtime.Time, v int) {
// Mark the hold in the heap.
ht.counts[hold] = ht.counts[hold] + v

if len(ht.counts) != len(ht.heap) {
// Since there's a difference, the hold should not be in the heap, so we add it.
heap.Push(&ht.heap, hold)
}
}

// Min returns the earliest hold in the heap. Returns [mtime.MaxTimestamp] if the heap is empty.
func (ht *holdTracker) Min() mtime.Time {
minWatermarkHold := mtime.MaxTimestamp
if len(ht.heap) > 0 {
minWatermarkHold = ht.heap[0]
}
return minWatermarkHold
}
115 changes: 115 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/holds_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package engine

import (
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
)

func TestHoldTracker(t *testing.T) {

type op func(*holdTracker)
add := func(hold mtime.Time, count int) op {
return func(ht *holdTracker) {
ht.Add(hold, count)
}
}

drop := func(hold mtime.Time, count int) op {
return func(ht *holdTracker) {
ht.Drop(hold, count)
}
}

tests := []struct {
name string
ops []op
wantMin mtime.Time
wantLen int
}{
{
name: "zero-max",
wantMin: mtime.MaxTimestamp,
wantLen: 0,
}, {

name: "one-min",
ops: []op{
add(mtime.MinTimestamp, 1),
},
wantMin: mtime.MinTimestamp,
wantLen: 1,
}, {

name: "cleared-max",
ops: []op{
add(mtime.MinTimestamp, 1),
drop(mtime.MinTimestamp, 1),
},
wantMin: mtime.MaxTimestamp,
wantLen: 0,
}, {
name: "cleared-non-eogw",
ops: []op{
add(mtime.MinTimestamp, 1),
add(mtime.EndOfGlobalWindowTime, 1),
drop(mtime.MinTimestamp, 1),
},
wantMin: mtime.EndOfGlobalWindowTime,
wantLen: 1,
}, {
name: "uncleared-non-min",
ops: []op{
add(mtime.MinTimestamp, 2),
add(mtime.EndOfGlobalWindowTime, 1),
drop(mtime.MinTimestamp, 1),
},
wantMin: mtime.MinTimestamp,
wantLen: 2,
}, {
name: "uncleared-non-min",
ops: []op{
add(1, 1),
add(2, 1),
add(3, 1),
drop(2, 1),
add(4, 1),
add(3, 1),
drop(1, 1),
add(2, 1),
drop(4, 1),
},
wantMin: 2,
wantLen: 2,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
tracker := newHoldTracker()
for _, op := range test.ops {
op(tracker)
}
if got, want := tracker.Min(), test.wantMin; got != want {
t.Errorf("tracker.heap.Min() = %v, want %v", got, want)
}
if got, want := tracker.heap.Len(), test.wantLen; got != want {
t.Errorf("tracker.heap.Len() = %v, want %v", got, want)
}
})
}
}
12 changes: 3 additions & 9 deletions sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package engine

import (
"container/heap"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
Expand Down Expand Up @@ -139,13 +138,9 @@ func (ts *testStreamHandler) UpdateHold(em *ElementManager, newHold mtime.Time)
ss.mu.Lock()
defer ss.mu.Unlock()

if ss.watermarkHoldsCounts[ts.currentHold] > 0 {
heap.Pop(&ss.watermarkHoldHeap)
ss.watermarkHoldsCounts[ts.currentHold] = ss.watermarkHoldsCounts[ts.currentHold] - 1
}
ss.watermarkHolds.Drop(ts.currentHold, 1)
ts.currentHold = newHold
heap.Push(&ss.watermarkHoldHeap, ts.currentHold)
ss.watermarkHoldsCounts[ts.currentHold] = 1
ss.watermarkHolds.Add(ts.currentHold, 1)

// kick the TestStream and Impulse stages too.
kick := singleSet(ts.ID)
Expand Down Expand Up @@ -281,8 +276,7 @@ func (tsi *testStreamImpl) initHandler(id string) {
tsi.em.addPending(1) // We subtrack a pending after event execution, so add one now for the final event to avoid a race condition.

// Arrest the watermark initially to prevent terminal advancement.
heap.Push(&ss.watermarkHoldHeap, tsi.em.testStreamHandler.currentHold)
ss.watermarkHoldsCounts[tsi.em.testStreamHandler.currentHold] = 1
ss.watermarkHolds.Add(tsi.em.testStreamHandler.currentHold, 1)
}
}

Expand Down

0 comments on commit 673da54

Please sign in to comment.