Skip to content

Commit

Permalink
[#22489] CDC: Address YB_TODOs in logical replication related files
Browse files Browse the repository at this point in the history
Summary:
This diff addresses the YB_TODOs left in the logical replication related files during PG 15 merge. Following files contained these YB_TODOs:

  - **reorderbuffer.c**
PG15 introduced two-phase transactions, but these are not yet supported in YB. Hence kept the code related to streaming of partially streamed two-phase transactions inside `!IsYugaByteEnabled()` block.

  - **yb_decode.c**
The signature of the function `ReorderBufferQueueChange()` has changed in PG15. A new argument called `toast_insert` has been added. Since YB does not support on disk TOASTing, the value of this argument will be always be false.

  - **walsender.c**
A new check has been introduced in PG15 to verify that `restart_lsn` is not invalid (0) at the time of starting logical replication. We simply enable this check here.
Jira: DB-11416

Test Plan: Jenkins: test regex: .*(CDC|ReplicationSlot).*

Reviewers: skumar, siddharth.shah, stiwary, utkarsh.munjal

Reviewed By: siddharth.shah, stiwary

Subscribers: yql, ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D42112
  • Loading branch information
Sumukh-Phalgaonkar committed Feb 25, 2025
1 parent 1be9e85 commit b2e06d6
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 13 deletions.
9 changes: 4 additions & 5 deletions src/postgres/src/backend/replication/logical/reorderbuffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -2669,7 +2669,10 @@ ReorderBufferReplay(ReorderBufferTXN *txn,
txn->origin_lsn = origin_lsn;

/*
* YB note: Snapshot is used to read the catalog table entries at the time of
* YB note:
* 1. YB does not support two-phase transactions yet. So we disable the code
* relating to partially streamed transactions.
* 2. Snapshot is used to read the catalog table entries at the time of
* transaction start. This mechanism is not yet applicable to YB. So we
* disable the snapshot related code here.
*/
Expand All @@ -2683,10 +2686,6 @@ ReorderBufferReplay(ReorderBufferTXN *txn,
* Called after everything (origin ID, LSN, ...) is stored in the
* transaction to avoid passing that information directly.
*/
/*
* YB_TODO(stiwary): evaluate whether this code is applicable for
* ysql.
*/
if (rbtxn_is_streamed(txn))
{
ReorderBufferStreamCommit(rb, txn);
Expand Down
10 changes: 4 additions & 6 deletions src/postgres/src/backend/replication/logical/yb_decode.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ YBDecodeInsert(LogicalDecodingContext *ctx, XLogReaderState *record)
change->data.tp.yb_table_oid = yb_record->table_oid;

change->data.tp.clear_toast_afterwards = true;
/* YB_TODO: Do the right thing for the last parameter, toast_insert. */
/* YB does not support on-disk TOASTing, so toast_insert will always be false. */
ReorderBufferQueueChange(ctx->reorder, yb_record->xid,
ctx->reader->ReadRecPtr, change, false);
ctx->reader->ReadRecPtr, change, false /* toast_insert */ );
}

/*
Expand Down Expand Up @@ -351,9 +351,8 @@ YBDecodeUpdate(LogicalDecodingContext *ctx, XLogReaderState *record)
change->data.tp.yb_table_oid = yb_record->table_oid;

change->data.tp.clear_toast_afterwards = true;
/* YB_TODO: Do the right thing for the last parameter, toast_insert. */
ReorderBufferQueueChange(ctx->reorder, yb_record->xid,
ctx->reader->ReadRecPtr, change, false);
ctx->reader->ReadRecPtr, change, false /* toast_insert */ );

RelationClose(relation);
}
Expand Down Expand Up @@ -393,9 +392,8 @@ YBDecodeDelete(LogicalDecodingContext *ctx, XLogReaderState *record)
change->data.tp.yb_table_oid = yb_record->table_oid;

change->data.tp.clear_toast_afterwards = true;
/* YB_TODO: Do the right thing for the last parameter, toast_insert. */
ReorderBufferQueueChange(ctx->reorder, yb_record->xid,
ctx->reader->ReadRecPtr, change, false);
ctx->reader->ReadRecPtr, change, false /* toast_insert */ );
}

/*
Expand Down
2 changes: 0 additions & 2 deletions src/postgres/src/backend/replication/walsender.c
Original file line number Diff line number Diff line change
Expand Up @@ -1505,14 +1505,12 @@ StartLogicalReplication(StartReplicationCmd *cmd)

ReplicationSlotAcquire(cmd->slotname, true);

#ifdef YB_TODO
if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot read from logical replication slot \"%s\"",
cmd->slotname),
errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
#endif

/*
* Force a disconnect, so that the decoding code doesn't need to care
Expand Down

0 comments on commit b2e06d6

Please sign in to comment.