Skip to content

Optimize ReadN in CDC by collecting records in batches #284

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,14 @@ func (s *Source) Open(ctx context.Context, pos opencdc.Position) error {
fallthrough
case source.CDCModeLogrepl:
i, err := logrepl.NewCombinedIterator(ctx, s.pool, logrepl.Config{
Position: pos,
SlotName: s.config.LogreplSlotName,
PublicationName: s.config.LogreplPublicationName,
Tables: s.config.Tables,
TableKeys: s.tableKeys,
WithSnapshot: s.config.SnapshotMode == source.SnapshotModeInitial,
WithAvroSchema: s.config.WithAvroSchema,
SnapshotFetchSize: s.config.SnapshotFetchSize,
Position: pos,
SlotName: s.config.LogreplSlotName,
PublicationName: s.config.LogreplPublicationName,
Tables: s.config.Tables,
TableKeys: s.tableKeys,
WithSnapshot: s.config.SnapshotMode == source.SnapshotModeInitial,
WithAvroSchema: s.config.WithAvroSchema,
BatchSize: *s.config.BatchSize,
})
if err != nil {
return fmt.Errorf("failed to create logical replication iterator: %w", err)
Expand Down
159 changes: 125 additions & 34 deletions source/logrepl/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-postgres/source/logrepl/internal"
Expand All @@ -35,15 +36,28 @@ type CDCConfig struct {
Tables []string
TableKeys map[string]string
WithAvroSchema bool
// BatchSize is the maximum size of a batch that will be read from the DB
// in one go and processed by the CDCHandler.
BatchSize int
}

// CDCIterator asynchronously listens for events from the logical replication
// slot and returns them to the caller through NextN.
type CDCIterator struct {
config CDCConfig
records chan opencdc.Record
config CDCConfig
sub *internal.Subscription

sub *internal.Subscription
// batchesCh is a channel shared between this iterator and a CDCHandler,
// to which the CDCHandler is sending batches of records.
// Using a shared queue here would be the fastest option. However,
// we also need to watch for a context that can get cancelled,
// and for the subscription that can end, so using a channel is
// the best option at the moment.
batchesCh chan []opencdc.Record

// recordsForNextRead contains records from the previous batch (returned by the CDCHandler),
// that weren't returned by this iterator's NextN method.
recordsForNextRead []opencdc.Record
}

// NewCDCIterator initializes logical replication by creating the publication and subscription manager.
Expand All @@ -64,8 +78,22 @@ func NewCDCIterator(ctx context.Context, pool *pgxpool.Pool, c CDCConfig) (*CDCI
Msgf("Publication %q already exists.", c.PublicationName)
}

records := make(chan opencdc.Record)
handler := NewCDCHandler(internal.NewRelationSet(), c.TableKeys, records, c.WithAvroSchema)
// Using a buffered channel here so that the handler can send a batch
// to the channel and start building a new batch.
// This is useful when the first batch in the channel didn't reach BatchSize (which is sdk.batch.size).
// The handler can prepare the next batch, and the CDCIterator can use them
// to return the maximum number of records.
batchesCh := make(chan []opencdc.Record, 1)
handler := NewCDCHandler(
ctx,
internal.NewRelationSet(),
c.TableKeys,
batchesCh,
c.WithAvroSchema,
c.BatchSize,
// todo make configurable
time.Second,
)

sub, err := internal.CreateSubscription(
ctx,
Expand All @@ -81,9 +109,9 @@ func NewCDCIterator(ctx context.Context, pool *pgxpool.Pool, c CDCConfig) (*CDCI
}

return &CDCIterator{
config: c,
records: records,
sub: sub,
config: c,
batchesCh: batchesCh,
sub: sub,
}, nil
}

Expand Down Expand Up @@ -113,8 +141,9 @@ func (i *CDCIterator) StartSubscriber(ctx context.Context) error {
return nil
}

// NextN takes and returns up to n records from the queue. NextN is allowed to
// block until either at least one record is available or the context gets canceled.
// NextN returns up to n records from the internal channel with records.
// NextN is allowed to block until either at least one record is available
// or the context gets canceled.
func (i *CDCIterator) NextN(ctx context.Context, n int) ([]opencdc.Record, error) {
if !i.subscriberReady() {
return nil, errors.New("logical replication has not been started")
Expand All @@ -124,9 +153,45 @@ func (i *CDCIterator) NextN(ctx context.Context, n int) ([]opencdc.Record, error
return nil, fmt.Errorf("n must be greater than 0, got %d", n)
}

var recs []opencdc.Record
// First, we check if there are any records from the previous batch
// that we can start with.
recs := make([]opencdc.Record, len(i.recordsForNextRead), n)
copy(recs, i.recordsForNextRead)
i.recordsForNextRead = nil

// NextN needs to wait until at least 1 record is available.
if len(recs) == 0 {
batch, err := i.nextRecordsBatchBlocking(ctx)
if err != nil {
return nil, fmt.Errorf("failed to fetch next batch of records (blocking): %w", err)
}
recs = batch
}

// We add any already available batches (i.e., we're not blocking waiting for any new batches to arrive)
// to return at most n records.
for len(recs) < n {
batch, err := i.nextRecordsBatch(ctx)
if err != nil {
return nil, fmt.Errorf("failed to fetch next batch of records: %w", err)
}
if batch == nil {
break
}
recs = i.appendRecordsWithLimit(recs, batch, n)
}

sdk.Logger(ctx).Trace().
Int("records", len(recs)).
Int("records_for_next_read", len(i.recordsForNextRead)).
Msg("CDCIterator.NextN returning records")
return recs, nil
}

// Block until at least one record is received or context is canceled
// nextRecordsBatchBlocking waits for the next batch of records to arrive,
// or for the context to be done, or for the subscription to be done,
// whichever comes first.
func (i *CDCIterator) nextRecordsBatchBlocking(ctx context.Context) ([]opencdc.Record, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand All @@ -142,33 +207,59 @@ func (i *CDCIterator) NextN(ctx context.Context, n int) ([]opencdc.Record, error
// subscription stopped without an error and the context is still
// open, this is a strange case, shouldn't actually happen
return nil, fmt.Errorf("subscription stopped, no more data to fetch (this smells like a bug)")
case rec := <-i.records:
recs = append(recs, rec)
case batch := <-i.batchesCh:
sdk.Logger(ctx).Trace().
Int("records", len(batch)).
Msg("CDCIterator.NextN received batch of records (blocking)")
return batch, nil
}
}

for len(recs) < n {
select {
case rec := <-i.records:
recs = append(recs, rec)
case <-ctx.Done():
return nil, ctx.Err()
case <-i.sub.Done():
if err := i.sub.Err(); err != nil {
return recs, fmt.Errorf("logical replication error: %w", err)
}
if err := ctx.Err(); err != nil {
// Return what we have with context error
return recs, err
}
// Return what we have with subscription stopped error
return recs, fmt.Errorf("subscription stopped, no more data to fetch (this smells like a bug)")
default:
// No more records currently available
return recs, nil
// nextRecordsBatch performs a non-blocking receive on the batches channel.
// It returns (nil, nil) when no batch is currently available. Context
// cancellation and subscription termination are reported as errors.
func (i *CDCIterator) nextRecordsBatch(ctx context.Context) ([]opencdc.Record, error) {
	select {
	case recs := <-i.batchesCh:
		sdk.Logger(ctx).Trace().
			Int("records", len(recs)).
			Msg("CDCIterator.NextN received batch of records")

		return recs, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-i.sub.Done():
		err := i.sub.Err()
		switch {
		case err != nil:
			return nil, fmt.Errorf("logical replication error: %w", err)
		case ctx.Err() != nil:
			return nil, ctx.Err()
		default:
			// The subscription ended cleanly while the context is still open,
			// which should not happen in practice.
			return nil, fmt.Errorf("subscription stopped, no more data to fetch (this smells like a bug)")
		}
	default:
		// No batch is ready right now; signal the caller that there is
		// nothing to add by returning a nil batch and no error.
		return nil, nil
	}
}

return recs, nil
// appendRecordsWithLimit moves records from src to dst until the given limit
// is reached or all records from src have been moved.
// Records from src that don't fit into dst within the limit are saved to
// recordsForNextRead, so that a later NextN call can return them.
func (i *CDCIterator) appendRecordsWithLimit(dst []opencdc.Record, src []opencdc.Record, limit int) []opencdc.Record {
	if len(src) == 0 {
		// Nothing to move; dst stays as is.
		return dst
	}
	if len(dst) >= limit {
		// dst is already full; keep the whole batch for the next read
		// instead of dropping either slice.
		i.recordsForNextRead = src
		return dst
	}

	needed := limit - len(dst)
	if needed > len(src) {
		needed = len(src)
	}

	dst = append(dst, src[:needed]...)
	i.recordsForNextRead = src[needed:]

	return dst
}

// Ack forwards the acknowledgment to the subscription.
Expand Down
103 changes: 93 additions & 10 deletions source/logrepl/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,93 @@ func TestCDCIterator_Ack(t *testing.T) {
})
}
}

// TestCDCIterator_NextN_InternalBatching checks that NextN correctly stitches
// together records coming from multiple internal batches and carries leftover
// records over to the next call.
func TestCDCIterator_NextN_InternalBatching(t *testing.T) {
	ctx := test.Context(t)
	pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
	table := test.SetupEmptyTestTable(ctx, t, pool)

	is := is.New(t)
	underTest := testCDCIterator(ctx, t, pool, table, true)
	<-underTest.sub.Ready()

	// Insert a single row, then give the CDCHandler enough time to flush it,
	// which forces the CDCIterator to wait for another batch afterwards.
	insertTestRows(ctx, is, pool, table, 1, 1)
	time.Sleep(time.Second * 2)
	insertTestRows(ctx, is, pool, table, 2, 5)

	// First request for 2 records should yield records 1 and 2.
	recs, err := underTest.NextN(ctx, 2)
	is.NoErr(err)
	verifyOpenCDCRecords(is, recs, table, 1, 2)
	time.Sleep(200 * time.Millisecond)

	// Second request for 2 records should yield records 3 and 4.
	recs, err = underTest.NextN(ctx, 2)
	is.NoErr(err)
	verifyOpenCDCRecords(is, recs, table, 3, 4)
	time.Sleep(200 * time.Millisecond)

	// Third request for 2 records should yield only record 5.
	recs, err = underTest.NextN(ctx, 2)
	is.NoErr(err)
	verifyOpenCDCRecords(is, recs, table, 5, 5)
}

// insertTestRows inserts one row per id in [from, to] into the given table.
// The actual primary key is offset by 10 (id column gets i+10).
func insertTestRows(ctx context.Context, is *is.I, pool *pgxpool.Pool, table string, from int, to int) {
	for row := from; row <= to; row++ {
		query := fmt.Sprintf(
			`INSERT INTO %s (id, column1, column2, column3, column4, column5)
				VALUES (%d, 'test-%d', %d, false, 12.3, 14)`, table, row+10, row, row*100,
		)
		_, err := pool.Exec(ctx, query)
		is.NoErr(err)
	}
}

// verifyOpenCDCRecords asserts that got contains exactly the create records
// for ids in [fromID, toID] (stored in the table with a +10 offset),
// ignoring record positions and metadata other than structure.
func verifyOpenCDCRecords(is *is.I, got []opencdc.Record, tableName string, fromID, toID int) {
	want := make([]opencdc.Record, 0, toID-fromID+1)
	for n := fromID; n <= toID; n++ {
		rowID := int64(n + 10)
		want = append(want, opencdc.Record{
			Operation: opencdc.OperationCreate,
			Key: opencdc.StructuredData{
				"id": rowID,
			},
			Payload: opencdc.Change{
				After: opencdc.StructuredData{
					"id":               rowID,
					"key":              nil,
					"column1":          fmt.Sprintf("test-%d", n),
					"column2":          int32(n) * 100, //nolint:gosec // fine, we know the value is small enough
					"column3":          false,
					"column4":          12.3,
					"column5":          int64(14),
					"column6":          nil,
					"column7":          nil,
					"UppercaseColumn1": nil,
				},
			},
			Metadata: opencdc.Metadata{
				opencdc.MetadataCollection: tableName,
			},
		})
	}

	diffOpts := []cmp.Option{
		cmpopts.IgnoreUnexported(opencdc.Record{}),
		cmpopts.IgnoreFields(opencdc.Record{}, "Position", "Metadata"),
	}
	is.Equal("", cmp.Diff(want, got, diffOpts...)) // mismatch (-want +got)
}

func TestCDCIterator_NextN(t *testing.T) {
ctx := test.Context(t)
pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
Expand Down Expand Up @@ -575,17 +662,12 @@ func TestCDCIterator_NextN(t *testing.T) {
VALUES (30, 'test-1', 100, false, 12.3, 14)`, table))
is.NoErr(err)

go func() {
time.Sleep(100 * time.Millisecond)
is.NoErr(i.Teardown(ctx))
}()
time.Sleep(100 * time.Millisecond)
is.NoErr(i.Teardown(ctx))

records, err := i.NextN(ctx, 5)
if err != nil {
is.True(strings.Contains(err.Error(), "logical replication error"))
} else {
is.True(len(records) > 0)
}
Comment on lines -583 to -588
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I read this part of the test, I wasn't sure what the expected behavior was (i.e., whether we expected an error or not). Based on the test description ("subscription termination"), it appeared to me that an error is what we should always expect.

_, err = i.NextN(ctx, 5)
is.True(err != nil)
is.True(strings.Contains(err.Error(), "logical replication error"))
})
}

Expand All @@ -597,6 +679,7 @@ func testCDCIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, tabl
PublicationName: table, // table is random, reuse for publication name
SlotName: table, // table is random, reuse for slot name
WithAvroSchema: true,
BatchSize: 2,
}

i, err := NewCDCIterator(ctx, pool, config)
Expand Down
Loading