Re: Minimal logical decoding on standbys

From: Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com>
To: Andres Freund <andres(at)anarazel(dot)de>
Cc: tushar <tushar(dot)ahuja(at)enterprisedb(dot)com>, Petr Jelinek <petr(dot)jelinek(at)2ndquadrant(dot)com>, Robert Haas <robertmhaas(at)gmail(dot)com>, Craig Ringer <craig(at)2ndquadrant(dot)com>, Petr Jelinek <petr(at)2ndquadrant(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Minimal logical decoding on standbys
Date: 2019-06-24 18:28:50
Message-ID: CAJ3gD9eamOYUUveVoY2ootRHObJh2=kip=KPR-_riOPEP7UJFg@mail.gmail.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

On Thu, 20 Jun 2019 at 00:31, Andres Freund <andres(at)anarazel(dot)de> wrote:
>
> > > Or else, do you think we can just increment the record pointer by
> > > doing something like (lastReplayedEndRecPtr % XLOG_BLCKSZ) +
> > > SizeOfXLogShortPHD() ?
> >
> > I found out that we can't do this, because we don't know whether the
> > xlog header is SizeOfXLogShortPHD or SizeOfXLogLongPHD. In fact, in
> > our context, it is SizeOfXLogLongPHD. So we indeed need the
> > XLogReaderState handle.
>
> Well, we can determine whether a long or a short header is going to be
> used, as that's solely dependent on the LSN:

Discussion of this point (plus some more points) is in a separate
reply. You can reply to my comments there :
https://www.postgresql.org/message-id/CAJ3gD9f_HjQ6qP%3D%2B1jwzwy77fwcbT4-M3UvVsqpAzsY-jqM8nw%40mail.gmail.com

>
>
> > /*
> > + * Get the wal_level from the control file.
> > + */
> > +int
> > +ControlFileWalLevel(void)
> > +{
> > + return ControlFile->wal_level;
> > +}
>
> Any reason not to return the type enum WalLevel instead? I'm not sure I
> like the function name - perhaps something like GetActiveWalLevel() or
> such? The fact that it's in the control file doesn't seem relevant
> here. I think it should be close to DataChecksumsEnabled() etc, which
> all return information from the control file.

Done.

>
>
> > +/*
> > * Initialization of shared memory for XLOG
> > */
> > Size
> > @@ -9843,6 +9852,17 @@ xlog_redo(XLogReaderState *record)
> > /* Update our copy of the parameters in pg_control */
> > memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
> >
> > + /*
> > + * Drop logical slots if we are in hot standby and master does not have
> > + * logical data. Don't bother to search for the slots if standby is
> > + * running with wal_level lower than logical, because in that case,
> > + * we would have disallowed creation of logical slots.
> > + */
>
> s/disallowed creation/disallowed creation or previously dropped/

Did this :
* we would have either disallowed creation of logical slots or dropped
* existing ones.

>
> > + if (InRecovery && InHotStandby &&
> > + xlrec.wal_level < WAL_LEVEL_LOGICAL &&
> > + wal_level >= WAL_LEVEL_LOGICAL)
> > + ResolveRecoveryConflictWithSlots(InvalidOid, InvalidTransactionId);
> > +
> > LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
> > ControlFile->MaxConnections = xlrec.MaxConnections;
> > ControlFile->max_worker_processes =
> > xlrec.max_worker_processes;
>
> Not for this patch, but I kinda feel the individual replay routines
> ought to be broken out of xlog_redo().
Yeah, agree.

>
>
> > /* ----------------------------------------
> > * Functions for decoding the data and block references in a record.
> > * ----------------------------------------
> > diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
> > index 151c3ef..c1bd028 100644
> > --- a/src/backend/replication/logical/decode.c
> > +++ b/src/backend/replication/logical/decode.c
> > @@ -190,11 +190,23 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
> > * can restart from there.
> > */
> > break;
> > + case XLOG_PARAMETER_CHANGE:
> > + {
> > + xl_parameter_change *xlrec =
> > + (xl_parameter_change *) XLogRecGetData(buf->record);
> > +
> > + /* Cannot proceed if master itself does not have logical data */
> > + if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
> > + ereport(ERROR,
> > + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> > + errmsg("logical decoding on standby requires "
> > + "wal_level >= logical on master")));
> > + break;
> > + }
>
> This should also HINT to drop the replication slot.

In this case, DecodeXLogOp() is being called because somebody is using
the slot itself. Not sure if it makes sense to hint the user to drop
the very slot that he/she is using. It would have made better sense to
hint about dropping the slot if something else was being done that
does not require a slot, but because the slot is becoming a nuisance,
we hint to drop the slot so as to avoid the error. What do you say ?
Probably the error message itself hints at setting the wal-level back
to logical.

>
>
> > + /*
> > + * It is not guaranteed that the restart_lsn points to a valid
> > + * record location. E.g. on standby, restart_lsn initially points to lastReplayedEndRecPtr,
> > + * which is 1 + the end of last replayed record, which means it can point the next
> > + * block header start. So bump it to the next valid record.
> > + */
>
> I'd rephrase this as something like:
>
> restart_lsn initially may point one past the end of the record. If that
> is a XLOG page boundary, it will not be a valid LSN for the start of a
> record. If that's the case, look for the start of the first record.

Done.

>
>
> > + if (!XRecOffIsValid(startptr))
> > + {
>
> Hm, could you before this add an Assert(startptr != InvalidXLogRecPtr)
> or such?

Yeah, done

>
>
> > + elog(DEBUG1, "Invalid restart lsn %X/%X",
> > + (uint32) (startptr >> 32), (uint32) startptr);
> > + startptr = XLogFindNextRecord(ctx->reader, startptr);
> > +
> > + SpinLockAcquire(&slot->mutex);
> > + slot->data.restart_lsn = startptr;
> > + SpinLockRelease(&slot->mutex);
> > + elog(DEBUG1, "Moved slot restart lsn to %X/%X",
> > + (uint32) (startptr >> 32), (uint32) startptr);
> > + }
>
> Minor nit: normally debug messages don't start with upper case.

Done.

>
>
> > /* Wait for a consistent starting point */
> > for (;;)
> > {
> > diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
> > index 55c306e..7ffd264 100644
> > --- a/src/backend/replication/slot.c
> > +++ b/src/backend/replication/slot.c
> > @@ -1016,37 +1016,37 @@ ReplicationSlotReserveWal(void)
> > /*
> > * For logical slots log a standby snapshot and start logical decoding
> > * at exactly that position. That allows the slot to start up more
> > - * quickly.
> > + * quickly. But on a standby we cannot do WAL writes, so just use the
> > + * replay pointer; effectively, an attempt to create a logical slot on
> > + * standby will cause it to wait for an xl_running_xact record so that
> > + * a snapshot can be built using the record.
>
> I'd add "to be logged independently on the primary" after "wait for an
> xl_running_xact record".

Done.

>
>
> > - * That's not needed (or indeed helpful) for physical slots as they'll
> > - * start replay at the last logged checkpoint anyway. Instead return
> > - * the location of the last redo LSN. While that slightly increases
> > - * the chance that we have to retry, it's where a base backup has to
> > - * start replay at.
> > + * None of this is needed (or indeed helpful) for physical slots as
> > + * they'll start replay at the last logged checkpoint anyway. Instead
> > + * return the location of the last redo LSN. While that slightly
> > + * increases the chance that we have to retry, it's where a base backup
> > + * has to start replay at.
> > */
> > +
> > + restart_lsn =
> > + (SlotIsPhysical(slot) ? GetRedoRecPtr() :
> > + (RecoveryInProgress() ? GetXLogReplayRecPtr(NULL) :
> > + GetXLogInsertRecPtr()));
>
> Please rewrite this to use normal if blocks. I'm also not convinced that
> it's useful to have this if block, and then another if block that
> basically tests the same conditions again.

Will check and get back on this one.

>
>
> > /*
> > + * Resolve recovery conflicts with slots.
> > + *
> > + * When xid is valid, it means it's a removed-xid kind of conflict, so need to
> > + * drop the appropriate slots whose xmin conflicts with removed xid.
>
> I don't think "removed-xid kind of conflict" is that descriptive. I'd
> suggest something like "When xid is valid, it means that rows older than
> xid might have been removed. Therefore we need to drop slots that depend
> on seeing those rows."

Done.

>
>
> > + * When xid is invalid, drop all logical slots. This is required when the
> > + * master wal_level is set back to replica, so existing logical slots need to
> > + * be dropped.
> > + */
> > +void
> > +ResolveRecoveryConflictWithSlots(Oid dboid, TransactionId xid)
> > +{
> > + int i;
> > + bool found_conflict = false;
> > +
> > + if (max_replication_slots <= 0)
> > + return;
> > +
> > +restart:
> > + if (found_conflict)
> > + {
> > + CHECK_FOR_INTERRUPTS();
> > + /*
> > + * Wait awhile for them to die so that we avoid flooding an
> > + * unresponsive backend when system is heavily loaded.
> > + */
> > + pg_usleep(100000);
> > + found_conflict = false;
> > + }
>
> Hm, I wonder if we could use the condition variable the slot
> infrastructure has these days for this instead.

Removed the pg_usleep, since in the attached patch, we now sleep on
the condition variable just after sending a recovery conflict signal
is sent. Details down below.

>
>
> > + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
> > + for (i = 0; i < max_replication_slots; i++)
> > + {
> > + ReplicationSlot *s;
> > + NameData slotname;
> > + TransactionId slot_xmin;
> > + TransactionId slot_catalog_xmin;
> > +
> > + s = &ReplicationSlotCtl->replication_slots[i];
> > +
> > + /* cannot change while ReplicationSlotCtlLock is held */
> > + if (!s->in_use)
> > + continue;
> > +
> > + /* Invalid xid means caller is asking to drop all logical slots */
> > + if (!TransactionIdIsValid(xid) && SlotIsLogical(s))
> > + found_conflict = true;
>
> I'd just add
>
> if (!SlotIsLogical(s))
> continue;
>
> because all of this doesn't need to happen for slots that aren't
> logical.

Yeah right. Done. Also renamed the function to
ResolveRecoveryConflictWithLogicalSlots() to emphasize that it is only
for logical slots.

>
> > + else
> > + {
> > + /* not our database, skip */
> > + if (s->data.database != InvalidOid && s->data.database != dboid)
> > + continue;
> > +
> > + SpinLockAcquire(&s->mutex);
> > + slotname = s->data.name;
> > + slot_xmin = s->data.xmin;
> > + slot_catalog_xmin = s->data.catalog_xmin;
> > + SpinLockRelease(&s->mutex);
> > +
> > + if (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, xid))
> > + {
> > + found_conflict = true;
> > +
> > + ereport(LOG,
> > + (errmsg("slot %s w/ xmin %u conflicts with removed xid %u",
> > + NameStr(slotname), slot_xmin, xid)));
> > + }
>
> s/removed xid/xid horizon being increased to %u/

BTW, this message belongs to an older patch. Check v7 onwards for
latest way I used for generating the message. Anyway, I have used the
above suggestion. Now the message detail will look like :
slot xmin: 1234, slot catalog_xmin: 5678, conflicts with xid horizon
being increased to 9012"

>
>
> > + if (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))
> > + {
> > + found_conflict = true;
> > +
> > + ereport(LOG,
> > + (errmsg("slot %s w/ catalog xmin %u conflicts with removed xid %u",
> > + NameStr(slotname), slot_catalog_xmin, xid)));
> > + }
> > +
> > + }
> > + if (found_conflict)
> > + {
>
>
> Hm, as far as I can tell you just ignore that the slot might currently
> be in use. You can't just drop a slot that somebody is using.

Yeah, I missed that.

> I think
> you need to send a recovery conflict to that backend.
>
> I guess the easiest way to do that would be something roughly like:
>
> SetInvalidVirtualTransactionId(vxid);
>
> LWLockAcquire(ProcArrayLock, LW_SHARED);
> cancel_proc = BackendPidGetProcWithLock(active_pid);
> if (cancel_proc)
> vxid = GET_VXID_FROM_PGPROC(cancel_proc);
> LWLockRelease(ProcArrayLock);
>
> if (VirtualTransactionIdIsValid(vixd))
> {
> CancelVirtualTransaction(vxid);
>
> /* Wait here until we get signaled, and then restart */
> ConditionVariableSleep(&slot->active_cv,
> WAIT_EVENT_REPLICATION_SLOT_DROP);
> }
> ConditionVariableCancelSleep();
>
> when the slot is currently active.

Did that now. Check the new function ReplicationSlotDropConflicting().

Also the below code is something that I added :
* Note: Even if vxid.localTransactionId is invalid, we need to cancel
* that backend, because there is no other way to make it release the
* slot. So don't bother to validate vxid.localTransactionId.
*/
if (vxid.backendId == InvalidBackendId)
continue;

This was done so that we could kill walsender in case pg_recvlogical
made it acquire the slot that we want to drop. walsender does not have
a local transaction id it seems. But CancelVirtualTransaction() works
also if vxid.localTransactionId is invalid. I have added comments to
explain this in CancelVirtualTransaction().

> Part of this would need to be split
> into a procarray.c helper function (mainly all the stuff dealing with
> ProcArrayLock).

I didn't have to split it, by the way.

>
>
> > + elog(LOG, "Dropping conflicting slot %s", s->data.name.data);
>
> This definitely needs to be expanded, and follow the message style
> guideline.

v7 patch onvwards, the message looks :
ereport(LOG,
(errmsg("Dropping conflicting slot %s", NameStr(slotname)),
errdetail("%s", conflict_reason)));
Does that suffice ?

>
>
> > + LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
>
> Instead of saying "deadlock" I'd just say that ReplicationSlotDropPtr()
> will acquire that lock.

Done

>
>
> > + ReplicationSlotDropPtr(s);
>
> But more importantly, I don't think this is
> correct. ReplicationSlotDropPtr() assumes that the to-be-dropped slot is
> acquired by the current backend - without that somebody else could
> concurrently acquire that slot.
>
> SO I think you need to do something like ReplicationSlotsDropDBSlots()
> does:
>
> /* acquire slot, so ReplicationSlotDropAcquired can be reused */
> SpinLockAcquire(&s->mutex);
> /* can't change while ReplicationSlotControlLock is held */
> slotname = NameStr(s->data.name);
> active_pid = s->active_pid;
> if (active_pid == 0)
> {
> MyReplicationSlot = s;
> s->active_pid = MyProcPid;
> }
> SpinLockRelease(&s->mutex);

I have now done this in ReplicationSlotDropConflicting() itself.

>
>
> Greetings,
>
> Andres Freund

I have also removed the code inside #ifdef NOT_ANYMORE that errors out
with "logical decoding cannot be used while in recovery".

I have introduced a new procsignal reason
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT so that when the conflicting
logical slot is dropped, a new error detail will be shown : "User was
using the logical slot that must be dropped".
Accordingly, added PgStat_StatDBEntry.n_conflict_logicalslot field.

Also, in RecoveryConflictInterrupt(), had to do some special handling
for am_cascading_walsender, so that a conflicting walsender on standby
will be terminated irrespective of the transaction status.

Attached v9 patch.

--
Thanks,
-Amit Khandekar
EnterpriseDB Corporation
The Postgres Database Company

Attachment Content-Type Size
logical-decoding-on-standby_v9.patch application/octet-stream 61.3 KB

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Stephen Frost 2019-06-24 18:52:02 Re: [PATCH] Stop ALTER SYSTEM from making bad assumptions
Previous Message Simon Riggs 2019-06-24 18:05:24 Re: [PATCH] Incremental sort (was: PoC: Partial sort)