diff --git a/src/backend/replication/logical/logicalctl.c b/src/backend/replication/logical/logicalctl.c index 72f68ec58ef..a78c8d18be1 100644 --- a/src/backend/replication/logical/logicalctl.c +++ b/src/backend/replication/logical/logicalctl.c @@ -256,33 +256,20 @@ write_logical_decoding_status_update_record(bool status) } /* - * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding, resetting - * the shared flags to revert the logical decoding activation process. + * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding. + * + * Rather than directly reverting xlog_logical_info here, we request + * that the checkpointer handle it via the normal disable path. This + * avoids race conditions when multiple backends attempt concurrent + * activation: the checkpointer will only reset xlog_logical_info when + * no valid logical slots exist, which naturally protects any other + * in-progress activation. */ static void abort_logical_decoding_activation(int code, Datum arg) { - Assert(MyReplicationSlot); - Assert(!LogicalDecodingCtl->logical_decoding_enabled); - elog(DEBUG1, "aborting logical decoding activation process"); - - /* - * Abort the change to xlog_logical_info. We don't need to check - * CheckLogicalSlotExists() as we're still holding a logical slot. - */ - LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); - LogicalDecodingCtl->xlog_logical_info = false; - LWLockRelease(LogicalDecodingControlLock); - - /* - * Some processes might have already started logical info WAL logging, so - * tell all running processes to update their caches. We don't need to - * wait for all processes to disable xlog_logical_info locally as it's - * always safe to write logical information to WAL records, even when not - * strictly required. - */ - EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO); + RequestDisableLogicalDecoding(); } /* @@ -489,16 +476,19 @@ void DisableLogicalDecoding(void) { bool in_recovery = RecoveryInProgress(); + bool was_enabled; LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); /* * Check if we can disable logical decoding. * - * Skip CheckLogicalSlotExists() check during recovery because the - * existing slots will be invalidated after disabling logical decoding. + * Nothing to do if both flags are already off, or if valid slots exist + * (skip the slot check during recovery because the existing slots will be + * invalidated after disabling logical decoding.) */ - if (!LogicalDecodingCtl->logical_decoding_enabled || + if ((!LogicalDecodingCtl->logical_decoding_enabled && + !LogicalDecodingCtl->xlog_logical_info) || (!in_recovery && CheckLogicalSlotExists())) { LogicalDecodingCtl->pending_disable = false; @@ -506,7 +496,7 @@ DisableLogicalDecoding(void) return; } - START_CRIT_SECTION(); + was_enabled = LogicalDecodingCtl->logical_decoding_enabled; /* * We need to disable logical decoding first and then disable logical @@ -515,22 +505,27 @@ DisableLogicalDecoding(void) */ LogicalDecodingCtl->logical_decoding_enabled = false; - /* Write the WAL to disable logical decoding on standbys too */ - if (!in_recovery) - write_logical_decoding_status_update_record(false); + if (was_enabled) + { + START_CRIT_SECTION(); + + /* Write the WAL to disable logical decoding on standbys too */ + if (!in_recovery) + write_logical_decoding_status_update_record(false); + + END_CRIT_SECTION(); + } /* Now disable logical information WAL logging */ LogicalDecodingCtl->xlog_logical_info = false; LogicalDecodingCtl->pending_disable = false; - END_CRIT_SECTION(); + LWLockRelease(LogicalDecodingControlLock); - if (!in_recovery) + if (!in_recovery && was_enabled) ereport(LOG, errmsg("logical decoding is disabled because there are no valid logical replication slots")); - LWLockRelease(LogicalDecodingControlLock); - /* * Tell all running processes to reflect the xlog_logical_info update. * Unlike when enabling logical decoding, we don't need to wait for all diff --git a/src/test/recovery/t/051_effective_wal_level.pl b/src/test/recovery/t/051_effective_wal_level.pl index c862073c34e..416acac9fac 100644 --- a/src/test/recovery/t/051_effective_wal_level.pl +++ b/src/test/recovery/t/051_effective_wal_level.pl @@ -410,8 +410,46 @@ select pg_cancel_backend(pid) from pg_stat_activity where query ~ 'slot_canceled # Verify that the backend aborted the activation process. $primary->wait_for_log("aborting logical decoding activation process"); - test_wal_level($primary, "replica|replica", - "the activation process aborted"); + wait_for_logical_decoding_disabled($primary); + pass("the activation process aborted"); + + # Test concurrent activation processes run and one is interrupted. + $psql_create_slot = $primary->background_psql('postgres'); + + # Start a psql session and stops in the middle of the activation + # process. + $psql_create_slot->query_until( + qr/create_slot_canceled/, + q(\echo create_slot_canceled +select injection_points_set_local(); +select injection_points_attach('logical-decoding-activation', 'wait'); +select pg_create_logical_replication_slot('slot_canceled2', 'pgoutput'); +\q +)); + $primary->wait_for_event('client backend', 'logical-decoding-activation'); + note("injection_point 'logical-decoding-activation' is reached"); + + # Another backend concurrently enables the logical decoding. + $primary->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('test_slot2', 'pgoutput')]); + + # Concurrent slot creation should not be blocked. So wait until + # test_slot2 is created and logical decoding is enabled. + $primary->wait_for_log("logical decoding is enabled upon creating a new logical replication slot"); + test_wal_level($primary, "replica|logical", + "the concurrent activation has done properly"); + + # Cancel the backend initiated by $psql_create_slot, aborting its activation + # process. + $primary->safe_psql( + 'postgres', + qq[ +select pg_cancel_backend(pid) from pg_stat_activity where query ~ 'slot_canceled2' and pid <> pg_backend_pid() +]); + # Canceling the backend should not affect the concurrent slot creation. + $primary->wait_for_log("canceling statement due to user request"); + test_wal_level($primary, "replica|logical", + "the concurrent activation interrupt is handled properly"); } $primary->stop;