diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 494b8de9ef9..74facd24a61 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -1195,6 +1195,13 @@ ApplyLauncherMain(Datum main_arg) if (!sub->enabled) { + /* + * This is required to ensure that we don't advance the xmin + * of CONFLICT_DETECTION_SLOT even if one of the subscriptions + * is not enabled. Otherwise, we won't be able to detect + * conflicts reliably for such a subscription even though it + * has set the retain_conflict_info option. + */ compute_min_nonremovable_xid(NULL, sub->retainconflictinfo, &xmin, &can_advance_xmin); continue; @@ -1204,6 +1211,10 @@ ApplyLauncherMain(Datum main_arg) w = logicalrep_worker_find(sub->oid, InvalidOid, false); LWLockRelease(LogicalRepWorkerLock); + /* + * Compute the minimum xmin required to protect deleted tuples + * required for conflict detection. + */ compute_min_nonremovable_xid(w, sub->retainconflictinfo, &xmin, &can_advance_xmin); @@ -1243,9 +1254,10 @@ ApplyLauncherMain(Datum main_arg) } /* - * Maintain the xmin value of the replication slot for conflict - * detection if needed. Otherwise, drop the slot if we're no longer - * retaining information useful for conflict detection. + * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription + * that requires us to retain the conflict information. Otherwise, if + * required, advance the slot's xmin to protect deleted tuples + * required for the conflict detection. */ if (!retain_conflict_info) drop_conflict_slot_if_exists(); @@ -1280,11 +1292,13 @@ ApplyLauncherMain(Datum main_arg) } /* - * Compute the minimum non-removable transaction ID from all apply workers for - * subscriptions with retain_conflict_info enabled. Store the result in *xmin. + * Determine the minimum non-removable transaction ID across all apply workers + * for subscriptions that have retain_conflict_info enabled. 
Store the result + * in *xmin. * - * If the slot cannot be advanced during this cycle, due to either a disabled - * subscription or an inactive worker, *can_advance_xmin is set to false. + * If the replication slot cannot be advanced during this cycle, due to either + * a disabled subscription or an inactive worker, set *can_advance_xmin to + * false. */ static void compute_min_nonremovable_xid(LogicalRepWorker *worker, @@ -1299,8 +1313,8 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId nonremovable_xid; /* - * Assume the replication slot for conflict detection is created - * before the worker starts. + * The replication slot for conflict detection must be created before + * the worker starts. */ Assert(MyReplicationSlot); @@ -1321,21 +1335,23 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, else { /* - * Create a replication slot to retain information (e.g., dead tuples, - * commit timestamps, and origins) useful for conflict detection if - * any subscription requests it. + * Create a replication slot to retain information necessary for + * conflict detection such as dead tuples, commit timestamps, and + * origins if requested by any subscription. * * The slot is created before starting the apply worker to prevent it - * from unnecessarily maintaining its oldest_nonremovable_xid. It is - * created even for a disabled subscription to ensure information is - * available for detecting conflicts during the application of remote - * changes that occur before the subscription is enabled. + * from unnecessarily maintaining its oldest_nonremovable_xid. + * + * The slot is created even for a disabled subscription to ensure that + * conflict-related information is available when applying remote + * changes that occurred before the subscription was enabled. */ create_conflict_slot_if_not_exists(); /* - * Only collect xmin when all workers for subscriptions with - * retain_conflict_info enabled are running. 
+ * Can't advance xmin of the slot unless all the subscriptions with + * retain_conflict_info are enabled and the corresponding workers are + * running. */ *can_advance_xmin = false; } @@ -1350,19 +1366,19 @@ create_conflict_slot_if_not_exists(void) { TransactionId xmin_horizon; - /* Exit early if the replication slot is already created and acquired */ + /* Exit early, if the replication slot is already created and acquired */ if (MyReplicationSlot) return; - /* If the replication slot exists, acquire it and exit */ + /* If the replication slot exists, acquire it, and exit */ if (SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true)) { ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false); return; } - ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, - RS_PERSISTENT, false, false, false); + ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, + false, false); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); @@ -1385,23 +1401,21 @@ create_conflict_slot_if_not_exists(void) } /* - * Attempt to advance the xmin value of the replication slot used to retain - * information useful for conflict detection. + * Advance the xmin of the replication slot used to retain information required + * for conflict detection. 
*/ static void advance_conflict_slot_xmin(TransactionId new_xmin) { Assert(MyReplicationSlot); Assert(TransactionIdIsValid(new_xmin)); - Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, - new_xmin)); + Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); SpinLockAcquire(&MyReplicationSlot->mutex); MyReplicationSlot->data.xmin = new_xmin; SpinLockRelease(&MyReplicationSlot->mutex); /* first write new xmin to disk, so we know what's up after a crash */ - ReplicationSlotMarkDirty(); ReplicationSlotSave(); elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin); @@ -1428,10 +1442,6 @@ advance_conflict_slot_xmin(TransactionId new_xmin) static void drop_conflict_slot_if_exists(void) { - /* - * Avoid the overhead of scanning shared memory for a replication slot - * that is known to have been dropped. - */ if (conflict_slot_dropped) return;