diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index bbc7cdaf50..7d5884789c 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -518,10 +518,9 @@ CreateDecodingContext(XLogRecPtr start_lsn, errmsg("cannot use physical replication slot for logical decoding"))); /* - * Do not allow decoding if the replication slot belongs to a different - * database unless we are in fast-forward mode. In fast-forward mode, we - * ignore storage-level changes and do not need to access the database - * object. + * We need to access the system tables during decoding to build the logical + * changes unless we are in fast-forward mode, where no changes are + * generated. */ if (slot->data.database != MyDatabaseId && !fast_forward) ereport(ERROR, @@ -530,9 +529,9 @@ CreateDecodingContext(XLogRecPtr start_lsn, NameStr(slot->data.name)))); /* - * Do not allow consumption of a "synchronized" slot until the standby gets - * promoted unless we are syncing replication slots, in which case we need - * to advance the LSN and xmin of the slot during decoding. + * The slots being synced from the primary can't be used for decoding as + * they are used after failover. However, we do allow advancing the LSNs + * during the synchronization of slots. See update_local_synced_slot. */ if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots()) ereport(ERROR, @@ -2054,8 +2053,8 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal) * WAL and removal of old catalog tuples. As decoding is done in fast_forward * mode, no changes are generated anyway. * - * *ready_for_decoding will be set to true if the logical decoding reaches - * the consistent point; Otherwise, it will be set to false. + * *ready_for_decoding will be true if the initial decoding snapshot has + * been built; otherwise, it will be false. 
*/ XLogRecPtr LogicalSlotAdvanceAndCheckReadynessForDecoding(XLogRecPtr moveto, diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 886a9fcc7e..9f6b83d486 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -26,10 +26,13 @@ * pg_sync_replication_slots() periodically to perform the syncs. * * If synchronized slots fail to build a consistent snapshot from the - * restart_lsn, they would become unreliable after promotion due to potential - * data loss from changes before reaching a consistent point. So, we mark such - * slots as RS_TEMPORARY. Once they successfully reach the consistent point, - * they will be marked to RS_PERSISTENT. + * restart_lsn before reaching confirmed_flush_lsn, they would become + * unreliable after promotion due to potential data loss from changes + * before reaching a consistent point. This can happen because the slots can + * be synced at an arbitrary time, and we may not reach the consistent point + * at the same WAL location as the primary. So, we mark such slots as + * RS_TEMPORARY. Once decoding from the corresponding LSNs can reach a + * consistent point, they will be marked as RS_PERSISTENT. * * The slot sync worker waits for some time before the next synchronization, * with the duration varying based on whether any slots were updated during @@ -155,9 +158,9 @@ static void slotsync_failure_callback(int code, Datum arg); * If no update was needed (the data of the remote slot is the same as the * local slot) return false, otherwise true. * - * If the LSN of the slot is modified, the ready_for_decoding will be set to - * true if the slot can reach a consistent point; otherwise, it will be set to - * false. + * If ready_for_decoding is not NULL, *ready_for_decoding will be set to + * true iff the remote slot's LSN or xmin is modified and decoding from + * the corresponding LSNs can reach a consistent snapshot. 
*/ static bool update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, @@ -168,10 +171,21 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, Assert(slot->data.invalidated == RS_INVAL_NONE); + if (ready_for_decoding) + *ready_for_decoding = false; + if (remote_slot->confirmed_lsn != slot->data.confirmed_flush || remote_slot->restart_lsn != slot->data.restart_lsn || remote_slot->catalog_xmin != slot->data.catalog_xmin) { + /* + * We can't directly copy the remote slot's LSN or xmin unless there + * exists a consistent snapshot at that point. Otherwise, after + * promotion, the slots may not reach a consistent point before the + * confirmed_flush_lsn, which can lead to data loss. To avoid data + * loss, we let the slot machinery advance the slot, which ensures that + * snapbuilder/slot statuses are updated properly. + */ if (SnapBuildSnapshotExists(remote_slot->restart_lsn)) { /* @@ -191,15 +205,6 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, } else { - /* - * By advancing the restart_lsn, confirmed_lsn, and xmin using - * fast-forward logical decoding, we can verify whether a - * consistent snapshot can be built. This process also involves - * saving necessary snapshots to disk during decoding, ensuring - * that logical decoding efficiently reaches a consistent point at - * the restart_lsn without the potential loss of data during - * snapshot creation. - */ LogicalSlotAdvanceAndCheckReadynessForDecoding(remote_slot->confirmed_lsn, ready_for_decoding); } @@ -489,11 +494,11 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) /* * Don't persist the slot if it cannot reach the consistent point from the - * restart_lsn. + * restart_lsn. See comments atop this file. 
*/ if (!ready_for_decoding) { - elog(DEBUG1, "The synced slot could not find consistent point from %X/%X", + elog(DEBUG1, "could not find consistent point for synced slot; restart_lsn = %X/%X", LSN_FORMAT_ARGS(slot->data.restart_lsn)); return false; }