diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 1d42ca91ea3..ed6ba5b2d7e 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -1267,10 +1267,10 @@ ApplyLauncherMain(Datum main_arg) /* * Compute the minimum xmin required to protect dead tuples * required for conflict detection among all running apply - * workers. This computation is performed under - * LogicalRepWorkerLock to avoid accessing invalid worker - * information in scenarios where a worker may exit and reset - * data concurrently. + * workers. This computation is performed while holding + * LogicalRepWorkerLock to prevent accessing invalid worker + * data, in scenarios where a worker might exit and reset + * its state concurrently. */ if (sub->retaindeadtuples && sub->retentionactive && diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 6d9ff6828a4..722a3678e7f 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4723,12 +4723,12 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) /* * Reaching this point implies should_stop_conflict_info_retention() - * returned false earlier, indicating that the most recent duration for + * returned false earlier, meaning that the most recent duration for * advancing the non-removable transaction ID is within the * max_retention_duration or max_retention_duration is set to 0. * * Therefore, if conflict info retention was previously stopped due to a - * timeout, proceed to resume retention now. + * timeout, it is now safe to resume retention. */ if (!MySubscription->retentionactive) { @@ -4803,8 +4803,9 @@ static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) { /* - * Return if unable to update subretentionactive (see - * update_retention_status). 
+ * If the retention status cannot be updated (e.g., due to an active + * transaction), skip further processing to avoid inconsistent retention + * behavior. */ if (!update_retention_status(false)) return; @@ -4828,10 +4829,7 @@ stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data) { - /* - * Return if unable to update subretentionactive (see - * update_retention_status). - */ + /* We can't resume retention without updating retention status. */ if (!update_retention_status(true)) return; @@ -4844,27 +4842,30 @@ resume_conflict_info_retention(RetainDeadTuplesData *rdt_data) : errdetail("Retention of information used for conflict detection is now indefinite.")); /* - * Restart the worker to allow the launcher to initialize - * oldest_nonremovable_xid value at startup. + * Restart the worker to let the launcher initialize + * oldest_nonremovable_xid at startup. + * + * While it's technically possible to derive this value on-the-fly using + * the conflict detection slot's xmin, doing so risks a race condition: the + * launcher might invalidate slot.xmin just after retention resumes. This would + * make oldest_nonremovable_xid unreliable, especially during xid + * wraparound. * - * An alternative approach is using the conflict detection slot.xmin to - * initialize the oldest_nonremovable_xid on-the-fly, without restarting - * the worker. However, this could create a race condition where the - * launcher invalidates slot.xmin immediately after the worker resumes - * retention, making oldest_nonremovable_xid unreliable if xid wraparound - * occurs. While implementing a heavy lock to prevent concurrent slot - * updates by the launcher is feasible, given that resuming is an - * infrequent operation, it may not be worthwhile to handle it. 
+ * Although this can be prevented by introducing heavyweight locking, the + * complexity it will bring doesn't seem worthwhile given how rarely + * retention is resumed. */ apply_worker_exit(); } /* - * Update pg_subscription.subretentionactive to the given value within a new - * transaction. + * Updates pg_subscription.subretentionactive to the given value within a + * new transaction. + * + * If already inside an active transaction, skips the update and returns + * false. * - * Returns true upon successful update; however, if currently within an active - * transaction, skip the update and return false. + * Returns true if the update is successfully performed. */ static bool update_retention_status(bool active) @@ -4959,10 +4960,10 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) !MySubscription->retentionactive) { /* - * Retention has been stopped, so double the interval, but not beyond - * 3 minutes. The wal_receiver_status_interval is not considered as a - * maximum, since the chance of retention resuming is less than that - * of activity resuming. + * Retention has been stopped, so double the interval, capped at a + * maximum of 3 minutes. The wal_receiver_status_interval is + * intentionally not used as an upper bound, since the likelihood of + * retention resuming is lower than that of general activity resuming. */ rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2, MAX_XID_ADVANCE_INTERVAL); @@ -4977,8 +4978,8 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) } /* - * Ensure the wait time remains within the maximum limit when retention is - * active. + * Ensure the wait time remains within the maximum retention time limit + * when retention is active. */ if (MySubscription->retentionactive) rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,