From 767e11c048606508fd0ef251f4f2f3783c0cd677 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 29 Sep 2025 17:40:18 -0700 Subject: [PATCH v1501 2/6] POC change. --- src/backend/access/transam/xlog.c | 28 ++ src/backend/postmaster/checkpointer.c | 6 - src/backend/replication/logical/logicalctl.c | 263 +++--------------- src/backend/replication/slot.c | 8 +- src/include/access/xlog.h | 2 + src/include/replication/logicalctl.h | 2 +- .../recovery/t/049_effective_wal_level.pl | 25 +- 7 files changed, 76 insertions(+), 258 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index ec43557152a..1729302a8ed 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6224,6 +6224,10 @@ StartupXLOG(void) /* * Update logical decoding status in shared memory and write an * XLOG_LOGICAL_DECODING_STATUS_CHANGE, if necessary. + * + * Note that processes are allowed to change the logical decoding status + * and write the XLOG_LOGICAL_DECODING_STATUS_CHANGE records from this + * point. */ UpdateLogicalDecodingStatusEndOfRecovery(); @@ -7509,6 +7513,30 @@ CreateEndOfRecoveryRecord(void) END_CRIT_SECTION(); } +/* + * Writes a XLOG_LOGICAL_DECODING_STATUS_CHANGE record with the given + * status. + * + * Note that this function temporarily grants itself an exemption to write + * the WAL record even during recovery, to deal with the race condition + * (see comments in start_logical_decoding_status_change() for details). + */ +void +CreateLogicalDecodingStatusChangeRecord(bool status) +{ + int oldXLogAllowed; + XLogRecPtr recptr; + + oldXLogAllowed = LocalSetXLogInsertAllowed(); + + XLogBeginInsert(); + XLogRegisterData(&status, sizeof(bool)); + recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE); + XLogFlush(recptr); + + LocalXLogInsertAllowed = oldXLogAllowed; +} + /* * Write an OVERWRITE_CONTRECORD message.
* diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c index 4edce058f9d..e188a02e2db 100644 --- a/src/backend/postmaster/checkpointer.c +++ b/src/backend/postmaster/checkpointer.c @@ -378,12 +378,6 @@ CheckpointerMain(const void *startup_data, size_t startup_data_len) */ AbsorbSyncRequests(); - /* - * Disable logical decoding if someone requested to disable logical - * decoding. See comments atop logicalctl.c. - */ - DisableLogicalDecodingIfNecessary(true); - ProcessCheckpointerInterrupts(); if (ShutdownXLOGPending || ShutdownRequestPending) break; diff --git a/src/backend/replication/logical/logicalctl.c b/src/backend/replication/logical/logicalctl.c index 9113b1cf13f..ebd34234231 100644 --- a/src/backend/replication/logical/logicalctl.c +++ b/src/backend/replication/logical/logicalctl.c @@ -21,15 +21,6 @@ * decoding. Deactivation follows a similar careful, multi-step process * in reverse order. * - * Activation and deactivation typically occur synchronously after the first - * logical slot is created and after the last logical slot is dropped, - * respectively. However, if deactivation is required when a process is exiting, - * it is delegated to the checkpointer and performed lazily. This is because - * deactivation involves waits and writing a WAL record, which could be - * problematic during process exit when the process is holding interrupts. - * This situation might arise when a process cleans up temporary or ephemeral - * slots on error or at process exit without releasing them explicitly. - * * Standby servers inherit the logical decoding and logical WAL writing status * from the primary server. 
Unlike normal activation and deactivation, these * are updated simultaneously without status change coordination, solely by @@ -91,22 +82,11 @@ typedef struct LogicalDecodingCtlData bool status_change_inprogress; /* - * Logical decoding is normally disabled after dropping the last logical - * slot, but if it happens during process exit time for example when - * releasing a temporary logical slot on an error, the process sets this - * flag to true, delegating the checkpointer to disable logical decoding - * asynchronously. - */ - bool pending_disable; - - /* - * This flag is set to true by the startup process during recovery, to - * delay any logical decoding status change attempts until the recovery - * actually completes. The flag is set to true only during the recovery by - * the startup process. See comments in - * start_logical_decoding_status_change() for details. + * True if the status change is allowed even during recovery. This flag + * starts with false and is set to true at end of the recovery by the + * startup process. 
*/ - bool delay_status_change; + bool allow_status_change; /* Condition variable signaled when a status change completes */ ConditionVariable cv; @@ -124,8 +104,6 @@ bool XLogLogicalInfo = false; static void update_xlog_logical_info(void); static void abort_logical_decoding_activation(int code, Datum arg); static bool start_logical_decoding_status_change(bool new_status); -static bool check_wait_for_recovery_completion(void); -static void wait_for_recovery_completion(void); Size LogicalDecodingCtlShmemSize(void) @@ -147,8 +125,7 @@ LogicalDecodingCtlShmemInit(void) LogicalDecodingCtl->xlog_logical_info = false; LogicalDecodingCtl->logical_decoding_enabled = false; LogicalDecodingCtl->status_change_inprogress = false; - LogicalDecodingCtl->pending_disable = false; - LogicalDecodingCtl->delay_status_change = false; + LogicalDecodingCtl->allow_status_change = false; ConditionVariableInit(&LogicalDecodingCtl->cv); } } @@ -300,13 +277,31 @@ abort_logical_decoding_activation(int code, Datum arg) * the process of logical decoding status change. If the status change is * required, it ensures we can change logical decoding status, setting * LogicalDecodingCtl->status_change_inprogress on, and returns true. - * Otherwise, if it's not required or not allowed (e.g., during recovery - * or wal_level = 'logical'), it returns false. + * Otherwise (not required, or not yet allowed during recovery), returns false. */ static bool start_logical_decoding_status_change(bool new_status) { - Assert(!RecoveryInProgress()); + if (RecoveryInProgress()) + { + bool allow_status_change; + + /* + * During recovery, there is a window between the point where the + * startup process updates the logical decoding status at the end of + * recovery and the point where recovery actually completes (i.e., + * RecoveryInProgress() returns false). In this window WAL insertion is + * normally not allowed, yet a logical decoding status change could be + * requested. Therefore, check whether we are in this window, and if so + * proceed with the status change.
+ */ + LWLockAcquire(LogicalDecodingControlLock, LW_SHARED); + allow_status_change = LogicalDecodingCtl->allow_status_change; + LWLockRelease(LogicalDecodingControlLock); + + if (!allow_status_change) + return false; + } retry: LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); @@ -317,7 +312,6 @@ retry: */ if (!new_status && CheckLogicalSlotExists()) { - LogicalDecodingCtl->pending_disable = false; LWLockRelease(LogicalDecodingControlLock); return false; } @@ -336,11 +330,11 @@ retry: goto retry; } - /* Return if we don't need to change the status */ if (LogicalDecodingCtl->logical_decoding_enabled == new_status) { - LogicalDecodingCtl->pending_disable = false; LWLockRelease(LogicalDecodingControlLock); + + /* no need to change the status */ return false; } @@ -352,56 +346,6 @@ retry: return true; } -/* - * Returns true if the caller needs to wait for the recovery to complete - * before proceeding with the status change process. - */ -static bool -check_wait_for_recovery_completion(void) -{ - bool delay_status_change; - - /* - * During recovery, there is a race condition with the startup process's - * end-of-recovery action; after the startup process updates logical - * decoding status at the end of recovery, it's possible that other - * processes try to enable or disable logical decoding status before the - * recovery completes but are unable to write WAL records. Therefore, if - * the startup process has done its end-of-recovery work, we need to wait - * for the recovery to actually finish. - */ - LWLockAcquire(LogicalDecodingControlLock, LW_SHARED); - delay_status_change = LogicalDecodingCtl->delay_status_change; - LWLockRelease(LogicalDecodingControlLock); - - return delay_status_change; -} - -/* - * Wait for the recovery to complete, i.e., until RecoveryInProgress() - * return false. 
- */ -static void -wait_for_recovery_completion(void) -{ - elog(DEBUG1, - "waiting for recovery completion to change logical decoding status"); - - /* - * The startup process already updated logical decoding status at the end - * of recovery but it might not be allowed to write WAL records yet. Wait - * for the recovery to complete and check the status again. - */ - while (RecoveryInProgress()) - { - CHECK_FOR_INTERRUPTS(); - - pgstat_report_wait_start(WAIT_EVENT_LOGICAL_DECODING_STATUS_CHANGE_DELAY); - pg_usleep(100000L); /* wait for 100 msec */ - pgstat_report_wait_end(); - } -} - /* * Enable logical decoding if disabled. * @@ -418,19 +362,6 @@ EnsureLogicalDecodingEnabled(void) if (wal_level != WAL_LEVEL_REPLICA) return; - if (RecoveryInProgress()) - { - /* - * Check if we need to wait for the recovery completion. See the - * comments in check_wait_for_recovery_completion() for the reason why - * we check it here. - */ - if (!check_wait_for_recovery_completion()) - return; - - wait_for_recovery_completion(); - } - /* Prepare and start the activation process if it's disabled */ if (!start_logical_decoding_status_change(true)) return; @@ -480,18 +411,9 @@ EnsureLogicalDecodingEnabled(void) LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); LogicalDecodingCtl->logical_decoding_enabled = true; LogicalDecodingCtl->status_change_inprogress = false; - LogicalDecodingCtl->pending_disable = false; LWLockRelease(LogicalDecodingControlLock); - { - XLogRecPtr recptr; - bool logical_decoding = true; - - XLogBeginInsert(); - XLogRegisterData(&logical_decoding, sizeof(bool)); - recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE); - XLogFlush(recptr); - } + CreateLogicalDecodingStatusChangeRecord(true); END_CRIT_SECTION(); @@ -506,100 +428,16 @@ EnsureLogicalDecodingEnabled(void) * Disable logical decoding if necessary. 
* * This function expects to be called after dropping a possibly-last logical - * replication slot, or to complete the pending logical decoding deactivation. - * It disable logical decoding only if it was the last remaining logical slot - * and wal_level = 'replica'. Otherwise, it performs no action. - - * If complete_pending is true, it disables logical decoding only if there - * is a pending disable request. + * replication slot. It disables logical decoding only if it was the last + * remaining logical slot and wal_level = 'replica'. Otherwise, it performs + * no action. */ void -DisableLogicalDecodingIfNecessary(bool complete_pending) +DisableLogicalDecodingIfNecessary(void) { - bool need_wait = false; - - /* - * Both complete_pending and proc_exit_inprogress must not be true at the - * same time. - */ - Assert(!(complete_pending && proc_exit_inprogress)); - if (wal_level != WAL_LEVEL_REPLICA) return; - /* Check if there is a pending disable request */ - if (complete_pending) - { - bool pending_disable; - - LWLockAcquire(LogicalDecodingControlLock, LW_SHARED); - pending_disable = LogicalDecodingCtl->pending_disable; - LWLockRelease(LogicalDecodingControlLock); - - if (!pending_disable) - return; - - elog(DEBUG1, - "start completing pending logical decoding disable request"); - } - - if (RecoveryInProgress()) - { - /* - * Check if we need to wait for the recovery completion. See the - * comments in check_wait_for_recovery_completion() for the reason why - * we check it here. - */ - need_wait = check_wait_for_recovery_completion(); - - if (!need_wait) - return; - - /* - * We might be in the process of process exit, in which case we need - * to delegate to the checkpointer the deactivation. We will wait for - * the recovery completion after checking that.
- */ - } - - /* - * If this function is called at process exit time for example when - * cleaning up temporary logical slots, we skip to disabling logical - * decoding and delegate to the checkpointer to do that since it could - * involve waits and writing a WAL record even though the process is - * holding any interrupts. - */ - if (proc_exit_inprogress) - { - volatile PROC_HDR *procglobal = ProcGlobal; - ProcNumber checkpointerProc = procglobal->checkpointerProc; - - /* - * It's possible that we might not actually need to disable logical - * decoding if someone creates a new logical slot concurrently. We set - * the flag anyway and the checkpointer will check it and disable - * logical decoding if necessary. - */ - LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); - LogicalDecodingCtl->pending_disable = true; - LWLockRelease(LogicalDecodingControlLock); - - if (checkpointerProc != INVALID_PROC_NUMBER) - SetLatch(&GetPGProcByNumber(checkpointerProc)->procLatch); - - ereport(LOG, - (errmsg("skip disabling logical decoding during process exit"))); - - return; - } - - /* - * Wait for the recovery to complete before entering the deactivation - * process - */ - if (need_wait) - wait_for_recovery_completion(); - /* Prepare and start the deactivation process if it's enabled */ if (!start_logical_decoding_status_change(false)) return; @@ -623,16 +461,7 @@ DisableLogicalDecodingIfNecessary(bool complete_pending) LWLockRelease(LogicalDecodingControlLock); /* Write the WAL to disable logical decoding on standbys too */ - if (XLogStandbyInfoActive()) - { - bool logical_decoding = false; - XLogRecPtr recptr; - - XLogBeginInsert(); - XLogRegisterData(&logical_decoding, sizeof(bool)); - recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE); - XLogFlush(recptr); - } + CreateLogicalDecodingStatusChangeRecord(false); /* Now disable logical information WAL logging */ LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); @@ -652,7 +481,6 @@ 
DisableLogicalDecodingIfNecessary(bool complete_pending) /* Complete the transition */ LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); LogicalDecodingCtl->status_change_inprogress = false; - LogicalDecodingCtl->pending_disable = false; LWLockRelease(LogicalDecodingControlLock); END_CRIT_SECTION(); @@ -700,29 +528,16 @@ UpdateLogicalDecodingStatusEndOfRecovery(void) UpdateLogicalDecodingStatus(new_status, false); /* - * We disallow any logical decoding status change until we actually - * completes the recovery, i.e., RecoveryInProgress() returns false. This - * is necessary to deal with the race condition that could happen after - * this point; processes are able to create or drop logical replication - * slots and tries to enable or disable logical decoding accordingly, but - * they are not allowed to write any WAL records until the recovery - * completes. + * Now that we have completed the status change in shared memory, allow + * processes to write XLOG_LOGICAL_DECODING_STATUS_CHANGE records prior to + * completing all end-of-recovery actions. */ - LogicalDecodingCtl->delay_status_change = true; + LogicalDecodingCtl->allow_status_change = true; LWLockRelease(LogicalDecodingControlLock); if (need_wal) - { - XLogRecPtr recptr; - - Assert(XLogStandbyInfoActive()); - - XLogBeginInsert(); - XLogRegisterData(&new_status, sizeof(bool)); - recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE); - XLogFlush(recptr); - } + CreateLogicalDecodingStatusChangeRecord(new_status); /* * Ensure all running processes have the updated status.
We don't need to diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 66de1a30cec..7978e5894c3 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -753,7 +753,7 @@ ReplicationSlotRelease(void) ReplicationSlotDropAcquired(); if (is_logical) - DisableLogicalDecodingIfNecessary(false); + DisableLogicalDecodingIfNecessary(); } /* @@ -864,7 +864,7 @@ restart: LWLockRelease(ReplicationSlotControlLock); if (dropped_logical && nlogicalslots == 0) - DisableLogicalDecodingIfNecessary(false); + DisableLogicalDecodingIfNecessary(); } /* @@ -894,7 +894,7 @@ ReplicationSlotDrop(const char *name, bool nowait) ReplicationSlotDropAcquired(); if (is_logical) - DisableLogicalDecodingIfNecessary(false); + DisableLogicalDecodingIfNecessary(); } /* @@ -1519,7 +1519,7 @@ restart: LWLockRelease(ReplicationSlotControlLock); if (dropped && nlogicalslots == 0) - DisableLogicalDecodingIfNecessary(false); + DisableLogicalDecodingIfNecessary(); } /* diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index e90438a4111..02aefeddc41 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -268,6 +268,8 @@ extern XLogRecPtr GetLastImportantRecPtr(void); extern void SetWalWriterSleeping(bool sleeping); +extern void CreateLogicalDecodingStatusChangeRecord(bool status); + extern Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli); diff --git a/src/include/replication/logicalctl.h b/src/include/replication/logicalctl.h index 907bd562c7d..85ec7217805 100644 --- a/src/include/replication/logicalctl.h +++ b/src/include/replication/logicalctl.h @@ -22,7 +22,7 @@ extern bool ProcessBarrierUpdateXLogLogicalInfo(void); extern bool IsLogicalDecodingEnabled(void); extern bool IsXLogLogicalInfoEnabled(void); extern void EnsureLogicalDecodingEnabled(void); -extern void DisableLogicalDecodingIfNecessary(bool complete_pending); +extern void DisableLogicalDecodingIfNecessary(void); extern 
void UpdateLogicalDecodingStatus(bool new_status, bool need_lock); extern void UpdateLogicalDecodingStatusEndOfRecovery(void); diff --git a/src/test/recovery/t/049_effective_wal_level.pl b/src/test/recovery/t/049_effective_wal_level.pl index 64c748b7c18..83051f9cbf7 100644 --- a/src/test/recovery/t/049_effective_wal_level.pl +++ b/src/test/recovery/t/049_effective_wal_level.pl @@ -43,20 +43,10 @@ $primary->safe_psql('postgres', qq[select pg_drop_replication_slot('test_phy_slot')]); # Create a temporarly logical slot and exits without releasing it explicitely. -# This enables logical decoding but skips disabling it and delegetes to the -# checkpointer. $primary->safe_psql('postgres', qq[select pg_create_logical_replication_slot('test_tmp_slot', 'test_decoding', true)] ); -# Check if the process skipped to disable logical decoding. -$primary->wait_for_log( - "skip disabling logical decoding during process exit"); - -# Wait for the checkpointer to disable logical decoding. -$primary->poll_query_until('postgres', - qq[select current_setting('effective_wal_level') = 'replica';]); - # Create a new logical slot and check if effective_wal_level must be increased # to 'logical'. $primary->safe_psql('postgres', @@ -327,20 +317,12 @@ if ( $ENV{enable_injection_points} eq 'yes' "injection_point 'startup-logical-decoding-status-change-end-of-recovery' is reached" ); - # Drop the logical slot in background. We can drop the logical replication slot - # but have to wait for the recovery to complete before disabling logical decoding. 
- my $psql = $standby5->background_psql('postgres'); - $psql->query_until( - qr/drop_slot/, - q(\echo drop_slot -select pg_drop_replication_slot('standby5_slot'); -\q -)); + $standby5->safe_psql('postgres', + qq[select pg_drop_replication_slot('standby5_slot');]); $standby5->safe_psql('postgres', qq[select injection_points_wakeup('startup-logical-decoding-status-change-end-of-recovery')] ); - $psql->quit; # Check if logical decoding got disabled after the recovery. test_wal_level($standby5, "replica|replica", @@ -391,9 +373,6 @@ select pg_create_logical_replication_slot('test_slot', 'pgoutput'); select pg_cancel_backend(pid) from pg_stat_activity where query ~ 'slot_cancelled' and pid <> pg_backend_pid() ]); - # Check if the backend aborted the activation process. - $primary->wait_for_log("aborting logical decoding activation process"); - # Wait for the logical slot 'test_slot' to be created. $primary->poll_query_until('postgres', qq[select exists (select 1 from pg_replication_slots where slot_name = 'test_slot')] -- 2.47.3