Re: Minimal logical decoding on standbys

From: Andres Freund <andres(at)anarazel(dot)de>
To: Robert Haas <robertmhaas(at)gmail(dot)com>, "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>
Cc: Alvaro Herrera <alvherre(at)2ndquadrant(dot)com>, Ibrar Ahmed <ibrar(dot)ahmad(at)gmail(dot)com>, Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com>, fabriziomello(at)gmail(dot)com, tushar <tushar(dot)ahuja(at)enterprisedb(dot)com>, Rahila Syed <rahila(dot)syed(at)2ndquadrant(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Minimal logical decoding on standbys
Date: 2023-01-06 03:40:36
Message-ID: 20230106034036.2m4qnn7ep7b5ipet@awork3.anarazel.de
Lists: pgsql-hackers

Hi,

Thomas, there's one point at the bottom wrt ConditionVariables that'd be
interesting for you to comment on.

On 2023-01-05 16:15:39 -0500, Robert Haas wrote:
> On Tue, Jan 3, 2023 at 2:42 AM Drouvot, Bertrand
> <bertranddrouvot(dot)pg(at)gmail(dot)com> wrote:
> > Please find attached v36, tiny rebase due to 1de58df4fe.
>
> 0001 looks committable to me now, though we probably shouldn't do that
> unless we're pretty confident about shipping enough of the rest of
> this to accomplish something useful.

Cool!

ISTM that the ordering of the patches isn't quite right later on: it doesn't
make sense to introduce working logical decoding without first fixing
WalSndWaitForWal() (i.e. patch 0006). What made you order the patches that
way?

0001:
> 4. We can't rely on the standby's relcache entries for this purpose in
> any way, because the WAL record that causes the problem might be
> replayed before the standby even reaches consistency.

The startup process can't access catalog contents in the first place, so the
consistency issue is secondary.

ISTM that the commit message omits a fairly significant portion of the change:
The introduction of indisusercatalog / the reason for its introduction.

Why is indisusercatalog stored as a "full" column, whereas we store the fact
of a table being used as a catalog table in a reloption? I'm not averse to
moving to a full column, but then I think we should do the same for tables.

Earlier versions of the patches IIRC sourced the "catalogness" from the
relation. What led you to change that? I'm not saying it's wrong, just not
sure it's right either.

It'd be good to introduce cross-checks that indisusercatalog is set
correctly. RelationGetIndexList() seems like a good candidate.
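
As a sketch (assuming the patch's indisusercatalog column; the variable names
here are illustrative), something along these lines in the pg_index scan loop:

	Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);

	/* the stored flag must agree with the owning table's reloption */
	Assert(index->indisusercatalog ==
		   RelationIsUsedAsCatalogTable(relation));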

I'd probably split the introduction of indisusercatalog into a separate patch.

Why was HEAP_DEFAULT_USER_CATALOG_TABLE introduced in this patch?

I wonder if we instead should compute a relation's "catalogness" in the
relcache. That would have the advantage of making
RelationIsUsedAsCatalogTable() cheaper and working for all kinds of
relations.
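
A minimal sketch of what I mean (the rd_isusercatalog field name is an
assumption): compute the flag once when the relcache entry is built, so the
check becomes a plain struct access:

	/* when building the relcache entry, roughly: */
	relation->rd_isusercatalog =
		(relation->rd_options != NULL &&
		 ((StdRdOptions *) relation->rd_options)->user_catalog_table);

	/* ... and then: */
	#define RelationIsUsedAsCatalogTable(relation) \
		((relation)->rd_isusercatalog)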

VISIBILITYMAP_ON_CATALOG_ACCESSIBLE_IN_LOGICAL_DECODING is a very long
identifier. Given that the field in the xlog records is just named
isCatalogRel, any reason to not just name it correspondingly?

0002:

> +/*
> + * Helper for InvalidateConflictingLogicalReplicationSlot -- acquires the given slot
> + * and mark it invalid, if necessary and possible.
> + *
> + * Returns whether ReplicationSlotControlLock was released in the interim (and
> + * in that case we're not holding the lock at return, otherwise we are).
> + *
> + * This is inherently racy, because we release the LWLock
> + * for syscalls, so caller must restart if we return true.
> + */
> +static bool
> +InvalidatePossiblyConflictingLogicalReplicationSlot(ReplicationSlot *s, TransactionId xid)

This appears to be a near-complete copy of InvalidatePossiblyObsoleteSlot(). I
don't think we should have two versions of that non-trivial code. Seems we
could just have an additional argument for InvalidatePossiblyObsoleteSlot()?
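
I.e. something like this sketch (the parameter names are assumptions; passing
InvalidTransactionId would select the existing restart_lsn-based check):

	static bool
	InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
								   Oid dboid,
								   TransactionId snapshotConflictHorizon);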

> + ereport(LOG,
> + (errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname))));
> +

I think this should report more details, similar to what
InvalidateObsoleteReplicationSlots() does.

> --- a/src/backend/replication/logical/logicalfuncs.c
> +++ b/src/backend/replication/logical/logicalfuncs.c
> @@ -216,11 +216,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
>
> /*
> * After the sanity checks in CreateDecodingContext, make sure the
> - * restart_lsn is valid. Avoid "cannot get changes" wording in this
> + * restart_lsn is valid or both xmin and catalog_xmin are valid.
> + * Avoid "cannot get changes" wording in this
> * errmsg because that'd be confusingly ambiguous about no changes
> * being available.
> */
> - if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
> + if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)
> + || (!TransactionIdIsValid(MyReplicationSlot->data.xmin)
> + && !TransactionIdIsValid(MyReplicationSlot->data.catalog_xmin)))
> ereport(ERROR,
> (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> errmsg("can no longer get changes from replication slot \"%s\"",

Hm. Feels like we should introduce a helper like SlotIsInvalidated() instead
of having this condition in a bunch of places.

> + if (!TransactionIdIsValid(MyReplicationSlot->data.xmin)
> + && !TransactionIdIsValid(MyReplicationSlot->data.catalog_xmin))
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("cannot read from logical replication slot \"%s\"",
> + cmd->slotname),
> + errdetail("This slot has been invalidated because it was conflicting with recovery.")));
> +

This is a more precise error than the one in
pg_logical_slot_get_changes_guts().

I think both places should output the same error. ISTM that the relevant code
should be in CreateDecodingContext(). Imo the code to deal with the WAL
version of this has been misplaced...
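
As a sketch of the helper suggested above (placement in slot.h is an
assumption), mirroring the condition quoted from
pg_logical_slot_get_changes_guts(); CreateDecodingContext() could then use it
to raise one common error:

	static inline bool
	SlotIsInvalidated(ReplicationSlot *s)
	{
		return XLogRecPtrIsInvalid(s->data.restart_lsn) ||
			(!TransactionIdIsValid(s->data.xmin) &&
			 !TransactionIdIsValid(s->data.catalog_xmin));
	}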

> --- a/src/backend/storage/ipc/procarray.c
> +++ b/src/backend/storage/ipc/procarray.c
> @@ -3477,6 +3477,10 @@ SignalVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode,
>
> GET_VXID_FROM_PGPROC(procvxid, *proc);
>
> + /*
> + * Note: vxid.localTransactionId can be invalid, which means the
> + * request is to signal the pid that is not running a transaction.
> + */
> if (procvxid.backendId == vxid.backendId &&
> procvxid.localTransactionId == vxid.localTransactionId)
> {

I can't really parse the comment.

> @@ -500,6 +502,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
> PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
> WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
> true);
> +
> + if (isCatalogRel)
> + InvalidateConflictingLogicalReplicationSlots(locator.dbOid, snapshotConflictHorizon);
> }

Might be worth checking whether wal_level >= logical before the somewhat
expensive InvalidateConflictingLogicalReplicationSlots().
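
I.e. roughly (a sketch; wal_level here is the standby's local setting, which
has to be logical anyway for logical slots to exist on it):

	if (isCatalogRel && wal_level >= WAL_LEVEL_LOGICAL)
		InvalidateConflictingLogicalReplicationSlots(locator.dbOid,
													 snapshotConflictHorizon);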

> @@ -3051,6 +3054,25 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
> case PROCSIG_RECOVERY_CONFLICT_LOCK:
> case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
> case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
> + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
> + /*
> + * For conflicts that require a logical slot to be invalidated, the
> + * requirement is for the signal receiver to release the slot,
> + * so that it could be invalidated by the signal sender. So for
> + * normal backends, the transaction should be aborted, just
> + * like for other recovery conflicts. But if it's walsender on
> + * standby, then it has to be killed so as to release an
> + * acquired logical slot.
> + */
> + if (am_cascading_walsender &&
> + reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
> + MyReplicationSlot && SlotIsLogical(MyReplicationSlot))
> + {
> + RecoveryConflictPending = true;
> + QueryCancelPending = true;
> + InterruptPending = true;
> + break;
> + }
>
> /*
> * If we aren't in a transaction any longer then ignore.

Why does the walsender need to be killed? I think it might just be that
IsTransactionOrTransactionBlock() might return false, even though we want to
cancel. The code actually seems to only cancel (QueryCancelPending is set
rather than ProcDiePending), but the comment talks about killing?

0003:

> Allow a logical slot to be created on standby. Restrict its usage
> or its creation if wal_level on primary is less than logical.
> During slot creation, it's restart_lsn is set to the last replayed
> LSN. Effectively, a logical slot creation on standby waits for an
> xl_running_xact record to arrive from primary. Conflicting slots
> would be handled in next commits.

I think the commit message might be outdated; the next commit is a test.

> + /*
> + * Replay pointer may point one past the end of the record. If that
> + * is a XLOG page boundary, it will not be a valid LSN for the
> + * start of a record, so bump it up past the page header.
> + */
> + if (!XRecOffIsValid(restart_lsn))
> + {
> + if (restart_lsn % XLOG_BLCKSZ != 0)
> + elog(ERROR, "invalid replay pointer");
> +
> + /* For the first page of a segment file, it's a long header */
> + if (XLogSegmentOffset(restart_lsn, wal_segment_size) == 0)
> + restart_lsn += SizeOfXLogLongPHD;
> + else
> + restart_lsn += SizeOfXLogShortPHD;
> + }

Is this actually needed? Supposedly xlogreader can work just fine with an
address at the start of a page?

/*
* Caller supplied a position to start at.
*
* In this case, NextRecPtr should already be pointing either to a
* valid record starting position or alternatively to the beginning of
* a page. See the header comments for XLogBeginRead.
*/
Assert(RecPtr % XLOG_BLCKSZ == 0 || XRecOffIsValid(RecPtr));

> /*
> - * Since logical decoding is only permitted on a primary server, we know
> - * that the current timeline ID can't be changing any more. If we did this
> - * on a standby, we'd have to worry about the values we compute here
> - * becoming invalid due to a promotion or timeline change.
> + * Since logical decoding is also permitted on a standby server, we need
> + * to check if the server is in recovery to decide how to get the current
> + * timeline ID (so that it also cover the promotion or timeline change cases).
> */
> + if (!RecoveryInProgress())
> + currTLI = GetWALInsertionTimeLine();
> + else
> + GetXLogReplayRecPtr(&currTLI);
> +

This seems to remove some content from the !recovery case.

It's a bit odd that here RecoveryInProgress() is used, whereas further down
am_cascading_walsender is used.

> @@ -3074,10 +3078,12 @@ XLogSendLogical(void)
> * If first time through in this session, initialize flushPtr. Otherwise,
> * we only need to update flushPtr if EndRecPtr is past it.
> */
> - if (flushPtr == InvalidXLogRecPtr)
> - flushPtr = GetFlushRecPtr(NULL);
> - else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
> - flushPtr = GetFlushRecPtr(NULL);
> + if (flushPtr == InvalidXLogRecPtr ||
> + logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
> + {
> + flushPtr = (am_cascading_walsender ?
> + GetStandbyFlushRecPtr(NULL) : GetFlushRecPtr(NULL));
> + }
>
> /* If EndRecPtr is still past our flushPtr, it means we caught up. */
> if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)

A short if (the ternary) inside a normal if seems ugly to me.

0004:

> @@ -3037,6 +3037,43 @@ $SIG{TERM} = $SIG{INT} = sub {
>
> =pod
>
> +=item $node->create_logical_slot_on_standby(self, master, slot_name, dbname)
> +
> +Create logical replication slot on given standby
> +
> +=cut
> +
> +sub create_logical_slot_on_standby
> +{

Any reason this has to be standby-specific?

> + # Now arrange for the xl_running_xacts record for which pg_recvlogical
> + # is waiting.
> + $master->safe_psql('postgres', 'CHECKPOINT');
> +

Hm, that's quite expensive. Perhaps worth adding a C helper that can do that
for us instead? This will likely also be needed in real applications after all.
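
A minimal sketch of such a helper (the function name is an invented example,
shown extension-style), just logging an xl_running_xacts record without the
cost of a full checkpoint:

	#include "postgres.h"
	#include "fmgr.h"
	#include "storage/standby.h"

	PG_FUNCTION_INFO_V1(pg_log_standby_snapshot);

	Datum
	pg_log_standby_snapshot(PG_FUNCTION_ARGS)
	{
		/* emits an xl_running_xacts record; its LSN is not needed here */
		(void) LogStandbySnapshot();
		PG_RETURN_VOID();
	}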

> + print "starting pg_recvlogical\n";

I don't think tests should just print somewhere. Either diag() or note()
should be used.

> + if ($wait)
> + # make sure activeslot is in use
> + {
> + $node_standby->poll_query_until('testdb',
> + "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'activeslot' AND active_pid IS NOT NULL)"
> + ) or die "slot never became active";
> + }

That comment placement imo is quite odd.

> +# test if basic decoding works
> +is(scalar(my @foobar = split /^/m, $result),
> + 14, 'Decoding produced 14 rows');

Maybe mention that it's 2 transactions + 10 rows?

> +$node_primary->wait_for_catchup($node_standby, 'replay', $node_primary->lsn('flush'));

There are enough copies of this that I wonder if we shouldn't introduce a
Cluster.pm-level helper for this.

> +print "waiting to replay $endpos\n";

See above.

> +my $stdout_recv = $node_standby->pg_recvlogical_upto(
> + 'testdb', 'activeslot', $endpos, 180,
> + 'include-xids' => '0',
> + 'skip-empty-xacts' => '1');

I don't think this should use a hardcoded 180 but
$PostgreSQL::Test::Utils::timeout_default.

> +# One way to reproduce recovery conflict is to run VACUUM FULL with
> +# hot_standby_feedback turned off on the standby.
> +$node_standby->append_conf('postgresql.conf',q[
> +hot_standby_feedback = off
> +]);
> +$node_standby->restart;

IIRC a reload should suffice.

> +# This should trigger the conflict
> +$node_primary->safe_psql('testdb', 'VACUUM FULL');

Can we do something cheaper than rewriting the entire database? Seems
rewriting a single table ought to be sufficient?

I think it'd also be good to test that rewriting a non-catalog table doesn't
trigger an issue.

> +##################################################
> +# Recovery conflict: Invalidate conflicting slots, including in-use slots
> +# Scenario 2: conflict due to row removal with hot_standby_feedback off.
> +##################################################
> +
> +# get the position to search from in the standby logfile
> +my $logstart = -s $node_standby->logfile;
> +
> +# drop the logical slots
> +$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('inactiveslot')]);
> +$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('activeslot')]);
> +
> +create_logical_slots();
> +
> +# One way to produce recovery conflict is to create/drop a relation and launch a vacuum
> +# with hot_standby_feedback turned off on the standby.
> +$node_standby->append_conf('postgresql.conf',q[
> +hot_standby_feedback = off
> +]);
> +$node_standby->restart;
> +# ensure walreceiver feedback off by waiting for expected xmin and
> +# catalog_xmin on primary. Both should be NULL since hs_feedback is off
> +wait_for_xmins($node_primary, $primary_slotname,
> + "xmin IS NULL AND catalog_xmin IS NULL");
> +
> +$handle = make_slot_active(1);

This is a fair bit of repeated setup, maybe put it into a function?

I think it'd be good to test that ongoing decoding via the SQL interface also
gets handled correctly. But it might be too hard to do reliably.

> +##################################################
> +# Test standby promotion and logical decoding behavior
> +# after the standby gets promoted.
> +##################################################
> +

I think this also should test the streaming / walsender case.

0006:

> diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
> index bc3c3eb3e7..98c96eb864 100644
> --- a/src/backend/access/transam/xlogrecovery.c
> +++ b/src/backend/access/transam/xlogrecovery.c
> @@ -358,6 +358,9 @@ typedef struct XLogRecoveryCtlData
> RecoveryPauseState recoveryPauseState;
> ConditionVariable recoveryNotPausedCV;
>
> + /* Replay state (see getReplayedCV() for more explanation) */
> + ConditionVariable replayedCV;
> +
> slock_t info_lck; /* locks shared variables shown above */
> } XLogRecoveryCtlData;
>

getReplayedCV() doesn't seem to fit into any of the naming schemes in use in
xlogrecovery.h.

> - * Sleep until something happens or we time out. Also wait for the
> - * socket becoming writable, if there's still pending output.
> + * When not in recovery, sleep until something happens or we time out.
> + * Also wait for the socket becoming writable, if there's still pending output.

Hm. Is there a problem with not handling the becoming-writable case in the
in-recovery case?

> + else
> + /*
> + * We are in the logical decoding on standby case.
> + * We are waiting for the startup process to replay wal record(s) using
> + * a timeout in case we are requested to stop.
> + */
> + {

I don't think pgindent will like that formatting....

> + ConditionVariablePrepareToSleep(replayedCV);
> + ConditionVariableTimedSleep(replayedCV, 1000,
> + WAIT_EVENT_WAL_SENDER_WAIT_REPLAY);
> + }

I think this is racy, see ConditionVariablePrepareToSleep()'s comment:

* Caution: "before entering the loop" means you *must* test the exit
* condition between calling ConditionVariablePrepareToSleep and calling
* ConditionVariableSleep. If that is inconvenient, omit calling
* ConditionVariablePrepareToSleep.

Basically, the ConditionVariablePrepareToSleep() should be before the loop
body.

I don't think the fixed timeout here makes sense. For one, we need to wake up
based on WalSndComputeSleeptime(), otherwise we're ignoring wal_sender_timeout
(which can be quite small). It's also just way too frequent - we're trying to
avoid constantly waking up unnecessarily.
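
Putting those together, I'd expect roughly this shape (a sketch only, reusing
the patch's getReplayedCV() and WAIT_EVENT_WAL_SENDER_WAIT_REPLAY; loc is
WalSndWaitForWal()'s target LSN, and the exit conditions are illustrative):

	ConditionVariable *replayedCV = getReplayedCV();

	ConditionVariablePrepareToSleep(replayedCV);
	while (GetXLogReplayRecPtr(NULL) < loc && !got_STOPPING)
	{
		/* base the timeout on wal_sender_timeout etc. */
		long		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

		(void) ConditionVariableTimedSleep(replayedCV, sleeptime,
										   WAIT_EVENT_WAL_SENDER_WAIT_REPLAY);
	}
	ConditionVariableCancelSleep();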

Perhaps we could deal with the pq_is_send_pending() issue by having a version
of ConditionVariableTimedSleep() that accepts a WaitEventSet?
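
Something like this hypothetical signature (entirely an assumption, just to
illustrate the idea):

	extern bool ConditionVariableEventSleep(ConditionVariable *cv,
											WaitEventSet *waitset,
											long timeout,
											uint32 wait_event_info);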

Greetings,

Andres Freund
