Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize TargetPaginationKeys in constructor #378

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions data_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type DataIterator struct {
StateTracker *StateTracker
TableSorter DataIteratorSorter

targetPaginationKeys *sync.Map
TargetPaginationKeys *sync.Map
batchListeners []func(*RowBatch) error
doneListeners []func() error
logger *logrus.Entry
Expand All @@ -33,7 +33,6 @@ type TableMaxPaginationKey struct {

func (d *DataIterator) Run(tables []*TableSchema) {
d.logger = logrus.WithField("tag", "data_iterator")
d.targetPaginationKeys = &sync.Map{}

// If a state tracker is not provided, then the caller doesn't care about
// tracking state. However, some methods are still useful so we initialize
Expand All @@ -59,7 +58,7 @@ func (d *DataIterator) Run(tables []*TableSchema) {
// We don't need to reiterate those tables as it has already been done.
delete(tablesWithData, table)
} else {
d.targetPaginationKeys.Store(tableName, maxPaginationKey)
d.TargetPaginationKeys.Store(tableName, maxPaginationKey)
}
}

Expand All @@ -79,9 +78,9 @@ func (d *DataIterator) Run(tables []*TableSchema) {

logger := d.logger.WithField("table", table.String())

targetPaginationKeyInterface, found := d.targetPaginationKeys.Load(table.String())
targetPaginationKeyInterface, found := d.TargetPaginationKeys.Load(table.String())
if !found {
err := fmt.Errorf("%s not found in targetPaginationKeys, this is likely a programmer error", table.String())
err := fmt.Errorf("%s not found in TargetPaginationKeys, this is likely a programmer error", table.String())
logger.WithError(err).Error("this is definitely a bug")
d.ErrorHandler.Fatal("data_iterator", err)
return
Expand Down
3 changes: 2 additions & 1 deletion ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (f *Ferry) NewDataIterator() *DataIterator {
ReadRetries: f.Config.DBReadRetries,
},
StateTracker: f.StateTracker,
TargetPaginationKeys: &sync.Map{},
}

if f.CopyFilter != nil {
Expand Down Expand Up @@ -993,7 +994,7 @@ func (f *Ferry) Progress() *Progress {

s.Tables = make(map[string]TableProgress)
targetPaginationKeys := make(map[string]uint64)
f.DataIterator.targetPaginationKeys.Range(func(k, v interface{}) bool {
f.DataIterator.TargetPaginationKeys.Range(func(k, v interface{}) bool {
targetPaginationKeys[k.(string)] = v.(uint64)
return true
})
Expand Down
2 changes: 2 additions & 0 deletions test/go/data_iterator_sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package test
import (
"fmt"
"sort"
"sync"
"testing"

"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -58,6 +59,7 @@ func (t *DataIteratorSorterTestSuite) SetupTest() {
t.dataIterator = &ghostferry.DataIterator{
DB: t.Ferry.SourceDB,
ErrorHandler: t.Ferry.ErrorHandler,
TargetPaginationKeys: &sync.Map{},
}
}

Expand Down
1 change: 1 addition & 0 deletions test/go/data_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (this *DataIteratorTestSuite) SetupTest() {
ReadRetries: config.DBReadRetries,
},
StateTracker: ghostferry.NewStateTracker(config.DataIterationConcurrency * 10),
TargetPaginationKeys: &sync.Map{},
}

this.receivedRows = make(map[string][]ghostferry.RowData, 0)
Expand Down
Loading