Re: Minimal logical decoding on standbys

From: Andres Freund <andres(at)anarazel(dot)de>
To: Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com>
Cc: tushar <tushar(dot)ahuja(at)enterprisedb(dot)com>, Petr Jelinek <petr(dot)jelinek(at)2ndquadrant(dot)com>, Robert Haas <robertmhaas(at)gmail(dot)com>, Craig Ringer <craig(at)2ndquadrant(dot)com>, Petr Jelinek <petr(at)2ndquadrant(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Minimal logical decoding on standbys
Date: 2019-06-19 19:01:38
Message-ID: 20190619190138.rufzsld5xgcgshmh@alap3.anarazel.de
Lists: pgsql-hackers

Hi,

On 2019-06-12 17:30:02 +0530, Amit Khandekar wrote:
> On Tue, 11 Jun 2019 at 12:24, Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com> wrote:
> > On Mon, 10 Jun 2019 at 10:37, Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com> wrote:
> > > Since this requires the test to handle the
> > > fire-create-slot-and-then-fire-checkpoint-from-master actions, I was
> > > modifying the test file to do this. After doing that, I found that the
> > > slave gets an assertion failure in XLogReadRecord()=>XRecOffIsValid().
> > > This happens only when the restart_lsn is set to ReplayRecPtr.
> > > Somehow, this does not happen when I manually create the logical slot.
> > > It happens only while running testcase. Working on it ...
> >
> > Like I mentioned above, I get an assertion failure for
> > Assert(XRecOffIsValid(RecPtr)) while reading WAL records looking for a
> > start position (DecodingContextFindStartpoint()). This is because in
> > CreateInitDecodingContext()=>ReplicationSlotReserveWal(), I now set
> > the logical slot's restart_lsn to XLogCtl->lastReplayedEndRecPtr. And
> > just after bringing up slave, lastReplayedEndRecPtr's initial values
> > are in this order : 0/2000028, 0/2000060, 0/20000D8, 0/2000100,
> > 0/3000000, 0/3000060. You can see that 0/3000000 is not a valid value
> > because it points to the start of a WAL block, meaning it points to
> > the XLog page header (I think it's possible because it is 1 + endof
> > last replayed record, which can be start of next block). So when we
> > try to create a slot when it's in that position, then XRecOffIsValid()
> > fails while looking for a starting point.
> >
> > One option I considered was : If lastReplayedEndRecPtr points to XLog
> > page header, get a position of the first record on that WAL block,
> > probably with XLogFindNextRecord(). But it is not trivial because
> > while in ReplicationSlotReserveWal(), XLogReaderState is not created
> > yet.
>
> In the attached v6 version of the patch, I did the above. That is, I
> used XLogFindNextRecord() to bump up the restart_lsn of the slot to
> the first valid record. But since XLogReaderState is not available in
> ReplicationSlotReserveWal(), I did this in
> DecodingContextFindStartpoint(). And then updated the slot restart_lsn
> with this corrected position.

> Since XLogFindNextRecord() is currently disabled using #if 0, removed
> this directive.

Well, ifdef FRONTEND. I don't think that's a problem. It's a bit
overkill here, because I think we know the address has to be on a record
boundary (rather than being in the middle of a page spanning WAL
record). So we could just add the size of the header manually - but
I think that's not worth doing.

> > Or else, do you think we can just increment the record pointer by
> > doing something like (lastReplayedEndRecPtr % XLOG_BLCKSZ) +
> > SizeOfXLogShortPHD() ?
>
> I found out that we can't do this, because we don't know whether the
> xlog header is SizeOfXLogShortPHD or SizeOfXLogLongPHD. In fact, in
> our context, it is SizeOfXLogLongPHD. So we indeed need the
> XLogReaderState handle.

Well, we can determine whether a long or a short header is going to be
used, as that's solely dependent on the LSN:

    /*
     * If first page of an XLOG segment file, make it a long header.
     */
    if ((XLogSegmentOffset(NewPage->xlp_pageaddr, wal_segment_size)) == 0)
    {
        XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;

        NewLongPage->xlp_sysid = ControlFile->system_identifier;
        NewLongPage->xlp_seg_size = wal_segment_size;
        NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
        NewPage->xlp_info |= XLP_LONG_HEADER;
    }

but I don't think that's worth it.
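
For illustration only, a minimal standalone sketch of the "add the header size manually" alternative discussed above. It is not code from the patch. The `sketch_` names are hypothetical, and the constants are assumptions for a default 64-bit build (8 kB WAL pages, 24-byte short and 40-byte long page headers). It also assumes, as stated above, that the LSN sits on a record boundary rather than inside a record that continues across the page.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumed values for a default 64-bit build; not taken from the thread. */
#define SKETCH_XLOG_BLCKSZ    8192u
#define SKETCH_SHORT_PHD_SIZE 24u   /* stands in for SizeOfXLogShortPHD */
#define SKETCH_LONG_PHD_SIZE  40u   /* stands in for SizeOfXLogLongPHD */

/* Size of the page header of the WAL page that contains lsn. */
static uint32_t
sketch_page_header_size(uint64_t lsn, uint32_t wal_segment_size)
{
    uint64_t page_start = lsn - (lsn % SKETCH_XLOG_BLCKSZ);

    assert(wal_segment_size % SKETCH_XLOG_BLCKSZ == 0);

    /* Only the first page of a segment carries the long header. */
    if (page_start % wal_segment_size == 0)
        return SKETCH_LONG_PHD_SIZE;
    return SKETCH_SHORT_PHD_SIZE;
}

/* Same idea as XRecOffIsValid(): a record cannot start inside a page header. */
static bool
sketch_rec_off_is_valid(uint64_t lsn)
{
    return (lsn % SKETCH_XLOG_BLCKSZ) >= SKETCH_SHORT_PHD_SIZE;
}

/* If lsn is exactly at a page boundary, step over that page's header. */
static uint64_t
sketch_skip_page_header(uint64_t lsn, uint32_t wal_segment_size)
{
    if (lsn % SKETCH_XLOG_BLCKSZ != 0)
        return lsn;
    return lsn + sketch_page_header_size(lsn, wal_segment_size);
}
```

With a 16 MB segment size, the 0/3000000 position from the report is a segment start, so this sketch would move it past a long header; a page boundary elsewhere in the segment would only skip a short header.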

> > Do you think that we can solve this using some other approach ? I am
> > not sure whether it's only the initial conditions that cause
> > lastReplayedEndRecPtr value to *not* point to a valid record, or is it
> > just a coincidence and that lastReplayedEndRecPtr can also have such a
> > value any time afterwards.

It's always possible. All that means is that the last record filled the
entire last WAL page.

> > If it's only possible initially, we can
> > just use GetRedoRecPtr() instead of lastReplayedEndRecPtr if
> > lastReplayedEndRecPtr is invalid.

I don't think so? The redo pointer will point to something *much*
earlier, where we'll not yet have done all the necessary conflict
handling during recovery? So we'd not necessarily notice that a slot
is not actually usable for decoding.

We could instead just handle that by starting decoding at the redo
pointer, and just ignore all WAL records until they're after
lastReplayedEndRecPtr, but that has no advantages, and will read a lot
more WAL.

> static void _bt_cachemetadata(Relation rel, BTMetaPageData *input);
> @@ -773,6 +774,7 @@ _bt_log_reuse_page(Relation rel, BlockNumber blkno, TransactionId latestRemovedX
> */
>
> /* XLOG stuff */
> + xlrec_reuse.onCatalogTable = get_rel_logical_catalog(rel->rd_index->indrelid);
> xlrec_reuse.node = rel->rd_node;
> xlrec_reuse.block = blkno;
> xlrec_reuse.latestRemovedXid = latestRemovedXid;
> @@ -1140,6 +1142,7 @@ _bt_delitems_delete(Relation rel, Buffer buf,
> XLogRecPtr recptr;
> xl_btree_delete xlrec_delete;
>
> + xlrec_delete.onCatalogTable = get_rel_logical_catalog(rel->rd_index->indrelid);
> xlrec_delete.latestRemovedXid = latestRemovedXid;
> xlrec_delete.nitems = nitems;

Can we instead pass the heap rel down to here? I think there's only one
caller, and it has the heap relation available these days (it didn't at
the time of the prototype, possibly). There are a few other users of
get_rel_logical_catalog() where that might be harder, but it's easy
here.

> @@ -27,6 +27,7 @@
> #include "storage/indexfsm.h"
> #include "storage/lmgr.h"
> #include "utils/snapmgr.h"
> +#include "utils/lsyscache.h"
>
>
> /* Entry in pending-list of TIDs we need to revisit */
> @@ -502,6 +503,7 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
> OffsetNumber itemnos[MaxIndexTuplesPerPage];
> spgxlogVacuumRedirect xlrec;
>
> + xlrec.onCatalogTable = get_rel_logical_catalog(index->rd_index->indrelid);
> xlrec.nToPlaceholder = 0;
> xlrec.newestRedirectXid = InvalidTransactionId;

This one seems harder, but I'm not actually sure why we make it so
hard. It seems like we just ought to add the table to IndexVacuumInfo.

> /*
> + * Get the wal_level from the control file.
> + */
> +int
> +ControlFileWalLevel(void)
> +{
> + return ControlFile->wal_level;
> +}

Any reason not to return the type enum WalLevel instead? I'm not sure I
like the function name - perhaps something like GetActiveWalLevel() or
such? The fact that it's in the control file doesn't seem relevant
here. I think it should be close to DataChecksumsEnabled() etc, which
all return information from the control file.

> +/*
> * Initialization of shared memory for XLOG
> */
> Size
> @@ -9843,6 +9852,17 @@ xlog_redo(XLogReaderState *record)
> /* Update our copy of the parameters in pg_control */
> memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>
> + /*
> + * Drop logical slots if we are in hot standby and master does not have
> + * logical data. Don't bother to search for the slots if standby is
> + * running with wal_level lower than logical, because in that case,
> + * we would have disallowed creation of logical slots.
> + */

s/disallowed creation/disallowed creation or previously dropped/

> + if (InRecovery && InHotStandby &&
> + xlrec.wal_level < WAL_LEVEL_LOGICAL &&
> + wal_level >= WAL_LEVEL_LOGICAL)
> + ResolveRecoveryConflictWithSlots(InvalidOid, InvalidTransactionId);
> +
> LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
> ControlFile->MaxConnections = xlrec.MaxConnections;
> ControlFile->max_worker_processes =
> xlrec.max_worker_processes;

Not for this patch, but I kinda feel the individual replay routines
ought to be broken out of xlog_redo().

> /* ----------------------------------------
> * Functions for decoding the data and block references in a record.
> * ----------------------------------------
> diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
> index 151c3ef..c1bd028 100644
> --- a/src/backend/replication/logical/decode.c
> +++ b/src/backend/replication/logical/decode.c
> @@ -190,11 +190,23 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
> * can restart from there.
> */
> break;
> + case XLOG_PARAMETER_CHANGE:
> + {
> + xl_parameter_change *xlrec =
> + (xl_parameter_change *) XLogRecGetData(buf->record);
> +
> + /* Cannot proceed if master itself does not have logical data */
> + if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("logical decoding on standby requires "
> + "wal_level >= logical on master")));
> + break;
> + }

This should also HINT to drop the replication slot.

> + /*
> + * It is not guaranteed that the restart_lsn points to a valid
> + * record location. E.g. on standby, restart_lsn initially points to lastReplayedEndRecPtr,
> + * which is 1 + the end of last replayed record, which means it can point the next
> + * block header start. So bump it to the next valid record.
> + */

I'd rephrase this as something like:

restart_lsn initially may point one past the end of the record. If that
is an XLOG page boundary, it will not be a valid LSN for the start of a
record. If that's the case, look for the start of the first record.

> + if (!XRecOffIsValid(startptr))
> + {

Hm, could you before this add an Assert(startptr != InvalidXLogRecPtr)
or such?

> + elog(DEBUG1, "Invalid restart lsn %X/%X",
> + (uint32) (startptr >> 32), (uint32) startptr);
> + startptr = XLogFindNextRecord(ctx->reader, startptr);
> +
> + SpinLockAcquire(&slot->mutex);
> + slot->data.restart_lsn = startptr;
> + SpinLockRelease(&slot->mutex);
> + elog(DEBUG1, "Moved slot restart lsn to %X/%X",
> + (uint32) (startptr >> 32), (uint32) startptr);
> + }

Minor nit: normally debug messages don't start with upper case.

> /* Wait for a consistent starting point */
> for (;;)
> {
> diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
> index 55c306e..7ffd264 100644
> --- a/src/backend/replication/slot.c
> +++ b/src/backend/replication/slot.c
> @@ -1016,37 +1016,37 @@ ReplicationSlotReserveWal(void)
> /*
> * For logical slots log a standby snapshot and start logical decoding
> * at exactly that position. That allows the slot to start up more
> - * quickly.
> + * quickly. But on a standby we cannot do WAL writes, so just use the
> + * replay pointer; effectively, an attempt to create a logical slot on
> + * standby will cause it to wait for an xl_running_xact record so that
> + * a snapshot can be built using the record.

I'd add "to be logged independently on the primary" after "wait for an
xl_running_xact record".

> - * That's not needed (or indeed helpful) for physical slots as they'll
> - * start replay at the last logged checkpoint anyway. Instead return
> - * the location of the last redo LSN. While that slightly increases
> - * the chance that we have to retry, it's where a base backup has to
> - * start replay at.
> + * None of this is needed (or indeed helpful) for physical slots as
> + * they'll start replay at the last logged checkpoint anyway. Instead
> + * return the location of the last redo LSN. While that slightly
> + * increases the chance that we have to retry, it's where a base backup
> + * has to start replay at.
> */
> +
> + restart_lsn =
> + (SlotIsPhysical(slot) ? GetRedoRecPtr() :
> + (RecoveryInProgress() ? GetXLogReplayRecPtr(NULL) :
> + GetXLogInsertRecPtr()));

Please rewrite this to use normal if blocks. I'm also not convinced that
it's useful to have this if block, and then another if block that
basically tests the same conditions again.
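
For illustration only, a standalone sketch of what a single chain of plain if blocks could look like. It is not code from the patch. The `Sketch`/`sketch_` names are hypothetical, and the three LSN fields merely stand in for the results of GetRedoRecPtr(), GetXLogReplayRecPtr(NULL) and GetXLogInsertRecPtr(). The out-parameter reports whether the caller would still need to log and flush a standby snapshot, so the same conditions need not be tested a second time.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t SketchLsn;

/* Hypothetical container; each field stands in for one server call. */
typedef struct SketchWalPositions
{
    SketchLsn redo;    /* GetRedoRecPtr() */
    SketchLsn replay;  /* GetXLogReplayRecPtr(NULL) */
    SketchLsn insert;  /* GetXLogInsertRecPtr() */
} SketchWalPositions;

/*
 * Pick the restart LSN for a new slot with one if/else chain.
 * *need_standby_snapshot is set only for a logical slot on a primary,
 * the one case where WAL can be written to speed up slot startup.
 */
static SketchLsn
sketch_choose_restart_lsn(bool slot_is_physical, bool in_recovery,
                          const SketchWalPositions *pos,
                          bool *need_standby_snapshot)
{
    assert(pos != NULL && need_standby_snapshot != NULL);

    *need_standby_snapshot = false;

    if (slot_is_physical)
    {
        /* physical slots start at the last redo pointer */
        return pos->redo;
    }
    else if (in_recovery)
    {
        /* logical slot on a standby: no WAL writes, use replay pointer */
        return pos->replay;
    }
    else
    {
        /* logical slot on a primary: start at the insert position */
        *need_standby_snapshot = true;
        return pos->insert;
    }
}
```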

> + SpinLockAcquire(&slot->mutex);
> + slot->data.restart_lsn = restart_lsn;
> + SpinLockRelease(&slot->mutex);
> +
> if (!RecoveryInProgress() && SlotIsLogical(slot))
> {
> XLogRecPtr flushptr;
>
> - /* start at current insert position */
> - restart_lsn = GetXLogInsertRecPtr();
> - SpinLockAcquire(&slot->mutex);
> - slot->data.restart_lsn = restart_lsn;
> - SpinLockRelease(&slot->mutex);
> -
> /* make sure we have enough information to start */
> flushptr = LogStandbySnapshot();
>
> /* and make sure it's fsynced to disk */
> XLogFlush(flushptr);
> }
> - else
> - {
> - restart_lsn = GetRedoRecPtr();
> - SpinLockAcquire(&slot->mutex);
> - slot->data.restart_lsn = restart_lsn;
> - SpinLockRelease(&slot->mutex);
> - }

> /*
> + * Resolve recovery conflicts with slots.
> + *
> + * When xid is valid, it means it's a removed-xid kind of conflict, so need to
> + * drop the appropriate slots whose xmin conflicts with removed xid.

I don't think "removed-xid kind of conflict" is that descriptive. I'd
suggest something like "When xid is valid, it means that rows older than
xid might have been removed. Therefore we need to drop slots that depend
on seeing those rows."
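
For illustration only, a standalone sketch of the conflict test described above, mirroring the condition in the quoted patch: a slot conflicts when its xmin or catalog_xmin precedes or equals the xid up to which rows may have been removed. The `sketch_` names are hypothetical. The comparison re-implements, as an assumption, the usual modulo-2^32 ordering of TransactionIdPrecedesOrEquals(). The "invalid xid means drop all logical slots" case is deliberately left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t SketchXid;

#define SKETCH_INVALID_XID      0u
#define SKETCH_FIRST_NORMAL_XID 3u

static bool
sketch_xid_is_valid(SketchXid xid)
{
    return xid != SKETCH_INVALID_XID;
}

/* Wraparound-aware "a <= b" for normal xids; plain compare for special ones. */
static bool
sketch_xid_precedes_or_equals(SketchXid a, SketchXid b)
{
    if (a < SKETCH_FIRST_NORMAL_XID || b < SKETCH_FIRST_NORMAL_XID)
        return a <= b;
    return (int32_t) (a - b) <= 0;
}

/*
 * True if a slot depends on rows that may already be gone, i.e. either of
 * its horizons is at or before the removed-rows xid.
 */
static bool
sketch_slot_conflicts(SketchXid slot_xmin, SketchXid slot_catalog_xmin,
                      SketchXid removed_up_to)
{
    /* the "drop everything" case is handled elsewhere in this sketch */
    assert(sketch_xid_is_valid(removed_up_to));

    if (sketch_xid_is_valid(slot_xmin) &&
        sketch_xid_precedes_or_equals(slot_xmin, removed_up_to))
        return true;

    if (sketch_xid_is_valid(slot_catalog_xmin) &&
        sketch_xid_precedes_or_equals(slot_catalog_xmin, removed_up_to))
        return true;

    return false;
}
```

A slot with neither horizon set never conflicts here, and a slot xmin just before the 32-bit wraparound point still counts as older than a small post-wraparound xid.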

> + * When xid is invalid, drop all logical slots. This is required when the
> + * master wal_level is set back to replica, so existing logical slots need to
> + * be dropped.
> + */
> +void
> +ResolveRecoveryConflictWithSlots(Oid dboid, TransactionId xid)
> +{
> + int i;
> + bool found_conflict = false;
> +
> + if (max_replication_slots <= 0)
> + return;
> +
> +restart:
> + if (found_conflict)
> + {
> + CHECK_FOR_INTERRUPTS();
> + /*
> + * Wait awhile for them to die so that we avoid flooding an
> + * unresponsive backend when system is heavily loaded.
> + */
> + pg_usleep(100000);
> + found_conflict = false;
> + }

Hm, I wonder if we could use the condition variable the slot
infrastructure has these days for this instead.

> + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
> + for (i = 0; i < max_replication_slots; i++)
> + {
> + ReplicationSlot *s;
> + NameData slotname;
> + TransactionId slot_xmin;
> + TransactionId slot_catalog_xmin;
> +
> + s = &ReplicationSlotCtl->replication_slots[i];
> +
> + /* cannot change while ReplicationSlotCtlLock is held */
> + if (!s->in_use)
> + continue;
> +
> + /* Invalid xid means caller is asking to drop all logical slots */
> + if (!TransactionIdIsValid(xid) && SlotIsLogical(s))
> + found_conflict = true;

I'd just add

    if (!SlotIsLogical(s))
        continue;

because all of this doesn't need to happen for slots that aren't
logical.

> + else
> + {
> + /* not our database, skip */
> + if (s->data.database != InvalidOid && s->data.database != dboid)
> + continue;
> +
> + SpinLockAcquire(&s->mutex);
> + slotname = s->data.name;
> + slot_xmin = s->data.xmin;
> + slot_catalog_xmin = s->data.catalog_xmin;
> + SpinLockRelease(&s->mutex);
> +
> + if (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, xid))
> + {
> + found_conflict = true;
> +
> + ereport(LOG,
> + (errmsg("slot %s w/ xmin %u conflicts with removed xid %u",
> + NameStr(slotname), slot_xmin, xid)));
> + }

s/removed xid/xid horizon being increased to %u/

> + if (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))
> + {
> + found_conflict = true;
> +
> + ereport(LOG,
> + (errmsg("slot %s w/ catalog xmin %u conflicts with removed xid %u",
> + NameStr(slotname), slot_catalog_xmin, xid)));
> + }
> +
> + }
> + if (found_conflict)
> + {

Hm, as far as I can tell you just ignore that the slot might currently
be in use. You can't just drop a slot that somebody is using. I think
you need to send a recovery conflict to that backend.

I guess the easiest way to do that would be something roughly like:

    SetInvalidVirtualTransactionId(vxid);

    LWLockAcquire(ProcArrayLock, LW_SHARED);
    cancel_proc = BackendPidGetProcWithLock(active_pid);
    if (cancel_proc)
        vxid = GET_VXID_FROM_PGPROC(cancel_proc);
    LWLockRelease(ProcArrayLock);

    if (VirtualTransactionIdIsValid(vxid))
    {
        CancelVirtualTransaction(vxid);

        /* Wait here until we get signaled, and then restart */
        ConditionVariableSleep(&slot->active_cv,
                               WAIT_EVENT_REPLICATION_SLOT_DROP);
    }
    ConditionVariableCancelSleep();

when the slot is currently active. Part of this would need to be split
into a procarray.c helper function (mainly all the stuff dealing with
ProcArrayLock).

> + elog(LOG, "Dropping conflicting slot %s", s->data.name.data);

This definitely needs to be expanded, and follow the message style
guideline.

> + LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */

Instead of saying "deadlock" I'd just say that ReplicationSlotDropPtr()
will acquire that lock.

> + ReplicationSlotDropPtr(s);

But more importantly, I don't think this is
correct. ReplicationSlotDropPtr() assumes that the to-be-dropped slot is
acquired by the current backend - without that somebody else could
concurrently acquire that slot.

So I think you need to do something like ReplicationSlotsDropDBSlots()
does:

    /* acquire slot, so ReplicationSlotDropAcquired can be reused */
    SpinLockAcquire(&s->mutex);
    /* can't change while ReplicationSlotControlLock is held */
    slotname = NameStr(s->data.name);
    active_pid = s->active_pid;
    if (active_pid == 0)
    {
        MyReplicationSlot = s;
        s->active_pid = MyProcPid;
    }
    SpinLockRelease(&s->mutex);

Greetings,

Andres Freund
