| From: | Andres Freund <andres(at)anarazel(dot)de> | 
|---|---|
| To: | "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com> | 
| Cc: | Jeff Davis <pgsql(at)j-davis(dot)com>, Robert Haas <robertmhaas(at)gmail(dot)com>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)2ndquadrant(dot)com>, Ibrar Ahmed <ibrar(dot)ahmad(at)gmail(dot)com>, Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com>, fabriziomello(at)gmail(dot)com, tushar <tushar(dot)ahuja(at)enterprisedb(dot)com>, Rahila Syed <rahila(dot)syed(at)2ndquadrant(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org> | 
| Subject: | Re: Minimal logical decoding on standbys | 
| Date: | 2023-03-30 07:04:31 | 
| Message-ID: | 20230330070431.tojeodaeo4zmpofu@awork3.anarazel.de | 
| Lists: | pgsql-hackers | 
Hi,
On 2023-03-04 12:19:57 +0100, Drouvot, Bertrand wrote:
> Subject: [PATCH v52 1/6] Add info in WAL records in preparation for logical
>  slot conflict handling.
> 
> Overall design:
> 
> 1. We want to enable logical decoding on standbys, but replay of WAL
> from the primary might remove data that is needed by logical decoding,
> causing error(s) on the standby. To prevent those errors, a new replication
> conflict scenario needs to be addressed (much as hot standby does for its existing conflicts).
> 
> 2. Our chosen strategy for dealing with this type of replication conflict
> is to invalidate logical slots for which needed data has been removed.
> 
> 3. To do this we need the latestRemovedXid for each change, just as we
> do for physical replication conflicts, but we also need to know
> whether any particular change was to data that logical replication
> might access. That way, during WAL replay, we know when there is a risk of
> conflict and, if so, whether there actually is a conflict.
> 
> 4. We can't rely on the standby's relcache entries for this purpose in
> any way, because the startup process can't access catalog contents.
> 
> 5. Therefore every WAL record that potentially removes data from the
> index or heap must carry a flag indicating whether or not it is one
> that might be accessed during logical decoding.
> 
> Why do we need this for logical decoding on standby?
> 
> First, let's forget about logical decoding on standby and recall that
> on a primary database, any catalog rows that may be needed by a logical
> decoding replication slot are not removed.
> 
> This is done thanks to the catalog_xmin associated with the logical
> replication slot.
> 
> But, with logical decoding on standby, in the following cases:
> 
> - hot_standby_feedback is off
> - hot_standby_feedback is on but there is no physical slot between
>   the primary and the standby. Then, hot_standby_feedback will work,
>   but only while the connection is alive (for example, a node restart
>   would break it)
> 
> Then, the primary may delete system catalog rows that could be needed
> by the logical decoding on the standby (as it does not know about the
> catalog_xmin on the standby).
> 
> So, it’s mandatory to identify those rows and invalidate the slots
> that may need them if any. Identifying those rows is the purpose of
> this commit.
This is a very nice commit message.
> Implementation:
> 
> When a WAL replay on standby indicates that a catalog table tuple is
> to be deleted by an xid that is greater than a logical slot's
> catalog_xmin, then that means the slot's catalog_xmin conflicts with
> the xid, and we need to handle the conflict. While subsequent commits
> will do the actual conflict handling, this commit adds a new field
> isCatalogRel in such WAL records (and a new bit set in the
> xl_heap_visible flags field), that is true for catalog tables, so as to
> arrange for conflict handling.
> 
> The affected WAL records are the ones that already contain the
> snapshotConflictHorizon field, namely:
> 
> - gistxlogDelete
> - gistxlogPageReuse
> - xl_hash_vacuum_one_page
> - xl_heap_prune
> - xl_heap_freeze_page
> - xl_heap_visible
> - xl_btree_reuse_page
> - xl_btree_delete
> - spgxlogVacuumRedirect
> 
> Due to this new field being added, xl_hash_vacuum_one_page and
> gistxlogDelete now contain the offsets to be deleted as a
> FLEXIBLE_ARRAY_MEMBER. This is needed to ensure correct alignment.
> It's not needed for the other structs where isCatalogRel has
> been added.
> 
> Author: Andres Freund (in an older version), Amit Khandekar, Bertrand
> Drouvot
I think you're first author on this one by now.
I think this commit is ready to go. Unless somebody thinks differently, I
think I might push it tomorrow.
> Subject: [PATCH v52 2/6] Handle logical slot conflicts on standby.
> @@ -6807,7 +6808,8 @@ CreateCheckPoint(int flags)
>  	 */
>  	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
>  	KeepLogSeg(recptr, &_logSegNo);
> -	if (InvalidateObsoleteReplicationSlots(_logSegNo))
> +	InvalidateObsoleteReplicationSlots(_logSegNo, &invalidated, InvalidOid, NULL);
> +	if (invalidated)
>  	{
>  		/*
>  		 * Some slots have been invalidated; recalculate the old-segment
I don't really understand why you changed InvalidateObsoleteReplicationSlots
to return void instead of bool, and then added an output boolean argument via
a pointer?
> @@ -7964,6 +7968,22 @@ xlog_redo(XLogReaderState *record)
>  		/* Update our copy of the parameters in pg_control */
>  		memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>  
> +		/*
> +		 * Invalidate logical slots if we are in hot standby and the primary does not
> +		 * have a WAL level sufficient for logical decoding. No need to search
> +		 * for potentially conflicting logically slots if standby is running
> +		 * with wal_level lower than logical, because in that case, we would
> +		 * have either disallowed creation of logical slots or invalidated existing
> +		 * ones.
> +		 */
> +		if (InRecovery && InHotStandby &&
> +			xlrec.wal_level < WAL_LEVEL_LOGICAL &&
> +			wal_level >= WAL_LEVEL_LOGICAL)
> +		{
> +			TransactionId ConflictHorizon = InvalidTransactionId;
> +			InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, NULL, InvalidOid, &ConflictHorizon);
> +		}
> +
Are there races around changing wal_level?
> @@ -855,8 +855,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
>  		SpinLockAcquire(&s->mutex);
>  		effective_xmin = s->effective_xmin;
>  		effective_catalog_xmin = s->effective_catalog_xmin;
> -		invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
> -					   XLogRecPtrIsInvalid(s->data.restart_lsn));
> +		invalidated = ((!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
> +						XLogRecPtrIsInvalid(s->data.restart_lsn))
> +					   || (!TransactionIdIsValid(s->data.xmin) &&
> +						   !TransactionIdIsValid(s->data.catalog_xmin)));
>  		SpinLockRelease(&s->mutex);
>  
>  		/* invalidated slots need not apply */
I still would like a wrapper function to determine whether a slot has been
invalidated. This is too complicated to be repeated in other places.
> @@ -1224,20 +1226,21 @@ ReplicationSlotReserveWal(void)
>  }
>  
>  /*
> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
> - * and mark it invalid, if necessary and possible.
> + * Helper for InvalidateObsoleteReplicationSlots
> + *
> + * Acquires the given slot and mark it invalid, if necessary and possible.
>   *
>   * Returns whether ReplicationSlotControlLock was released in the interim (and
>   * in that case we're not holding the lock at return, otherwise we are).
>   *
> - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
> + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.)
>   *
>   * This is inherently racy, because we release the LWLock
>   * for syscalls, so caller must restart if we return true.
>   */
>  static bool
> -InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> -							   bool *invalidated)
> +InvalidatePossiblyObsoleteOrConflictingLogicalSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> +												   bool *invalidated, TransactionId *xid)
This is too long a name. I'd probably just leave it at the old name.
> @@ -1261,18 +1267,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>  		 * Check if the slot needs to be invalidated. If it needs to be
>  		 * invalidated, and is not currently acquired, acquire it and mark it
>  		 * as having been invalidated.  We do this with the spinlock held to
> -		 * avoid race conditions -- for example the restart_lsn could move
> -		 * forward, or the slot could be dropped.
> +		 * avoid race conditions -- for example the restart_lsn (or the
> +		 * xmin(s) could) move forward or the slot could be dropped.
>  		 */
>  		SpinLockAcquire(&s->mutex);
>  
>  		restart_lsn = s->data.restart_lsn;
> +		slot_xmin = s->data.xmin;
> +		slot_catalog_xmin = s->data.catalog_xmin;
> +
> +		/* slot has been invalidated (logical decoding conflict case) */
> +		if ((xid &&
> +			 ((LogicalReplicationSlotIsInvalid(s))
> +			  ||
>  
Uh, huh?
That's very odd formatting.
>  		/*
> -		 * If the slot is already invalid or is fresh enough, we don't need to
> -		 * do anything.
> +		 * We are not forcing for invalidation because the xid is valid and
> +		 * this is a non conflicting slot.
>  		 */
> -		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
> +			  (TransactionIdIsValid(*xid) && !(
> +											   (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, *xid))
> +											   ||
> +											   (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, *xid))
> +											   ))
> +			  ))
> +			||
> +		/* slot has been invalidated (obsolete LSN case) */
> +			(!xid && (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)))
>  		{
>  			SpinLockRelease(&s->mutex);
>  			if (released_lock)
This needs some cleanup.
> @@ -1292,9 +1313,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>  		{
>  			MyReplicationSlot = s;
>  			s->active_pid = MyProcPid;
> -			s->data.invalidated_at = restart_lsn;
> -			s->data.restart_lsn = InvalidXLogRecPtr;
> -
> +			if (xid)
> +			{
> +				s->data.xmin = InvalidTransactionId;
> +				s->data.catalog_xmin = InvalidTransactionId;
> +			}
> +			else
> +			{
> +				s->data.invalidated_at = restart_lsn;
> +				s->data.restart_lsn = InvalidXLogRecPtr;
> +			}
>  			/* Let caller know */
>  			*invalidated = true;
>  		}
> @@ -1327,15 +1355,39 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>  			 */
>  			if (last_signaled_pid != active_pid)
>  			{
> -				ereport(LOG,
> -						errmsg("terminating process %d to release replication slot \"%s\"",
> -							   active_pid, NameStr(slotname)),
> -						errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> -								  LSN_FORMAT_ARGS(restart_lsn),
> -								  (unsigned long long) (oldestLSN - restart_lsn)),
> -						errhint("You might need to increase max_slot_wal_keep_size."));
> +				if (xid)
> +				{
> +					if (TransactionIdIsValid(*xid))
> +					{
> +						ereport(LOG,
> +								errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery",
> +									   active_pid, NameStr(slotname)),
> +								errdetail("The slot conflicted with xid horizon %u.",
> +										  *xid));
> +					}
> +					else
> +					{
> +						ereport(LOG,
> +								errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery",
> +									   active_pid, NameStr(slotname)),
> +								errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
> +					}
> +
> +					(void) SendProcSignal(active_pid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
> +				}
> +				else
> +				{
> +					ereport(LOG,
> +							errmsg("terminating process %d to release replication slot \"%s\"",
> +								   active_pid, NameStr(slotname)),
> +							errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> +									  LSN_FORMAT_ARGS(restart_lsn),
> +									  (unsigned long long) (oldestLSN - restart_lsn)),
> +							errhint("You might need to increase max_slot_wal_keep_size."));
> +
> +					(void) kill(active_pid, SIGTERM);
I think it ought to be possible to deduplicate this a fair bit. For one, two of
the errmsg()s above are identical.  But I think this could be consolidated
further, e.g. by using the same message style for the three cases, and passing
in a separately translated reason for the termination?
> +				}
>  
> -				(void) kill(active_pid, SIGTERM);
>  				last_signaled_pid = active_pid;
>  			}
>  
> @@ -1369,13 +1421,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>  			ReplicationSlotSave();
>  			ReplicationSlotRelease();
>  
> -			ereport(LOG,
> -					errmsg("invalidating obsolete replication slot \"%s\"",
> -						   NameStr(slotname)),
> -					errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> -							  LSN_FORMAT_ARGS(restart_lsn),
> -							  (unsigned long long) (oldestLSN - restart_lsn)),
> -					errhint("You might need to increase max_slot_wal_keep_size."));
> +			if (xid)
> +			{
> +				pgstat_drop_replslot(s);
Why is this done here now?
> +				if (TransactionIdIsValid(*xid))
> +				{
> +					ereport(LOG,
> +							errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)),
> +							errdetail("The slot conflicted with xid horizon %u.", *xid));
> +				}
> +				else
> +				{
> +					ereport(LOG,
> +							errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)),
> +							errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
> +				}
> +			}
> +			else
> +			{
> +				ereport(LOG,
> +						errmsg("invalidating obsolete replication slot \"%s\"",
> +							   NameStr(slotname)),
> +						errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> +								  LSN_FORMAT_ARGS(restart_lsn),
> +								  (unsigned long long) (oldestLSN - restart_lsn)),
> +						errhint("You might need to increase max_slot_wal_keep_size."));
> +			}
>
I don't like all these repeated elogs...
> @@ -3057,6 +3060,27 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
>  			case PROCSIG_RECOVERY_CONFLICT_LOCK:
>  			case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
>  			case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
> +			case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
> +
> +				/*
> +				 * For conflicts that require a logical slot to be
> +				 * invalidated, the requirement is for the signal receiver to
> +				 * release the slot, so that it could be invalidated by the
> +				 * signal sender. So for normal backends, the transaction
> +				 * should be aborted, just like for other recovery conflicts.
> +				 * But if it's walsender on standby, we don't want to go
> +				 * through the following IsTransactionOrTransactionBlock()
> +				 * check, so break here.
> +				 */
> +				if (am_cascading_walsender &&
> +					reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
> +					MyReplicationSlot && SlotIsLogical(MyReplicationSlot))
> +				{
> +					RecoveryConflictPending = true;
> +					QueryCancelPending = true;
> +					InterruptPending = true;
> +					break;
> +				}
>  
>  				/*
>  				 * If we aren't in a transaction any longer then ignore.
I can't see any reason for this to be mixed into the same case "body" as LOCK
etc?
> diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
> index 38c6f18886..290d4b45f4 100644
> --- a/src/backend/replication/slot.c
> +++ b/src/backend/replication/slot.c
> @@ -51,6 +51,7 @@
>  #include "storage/proc.h"
>  #include "storage/procarray.h"
>  #include "utils/builtins.h"
> +#include "access/xlogrecovery.h"
Add new includes in the "alphabetically" right place...
Greetings,
Andres Freund