Re: Logical decoding on standby

From: Andres Freund <andres(at)anarazel(dot)de>
To: Craig Ringer <craig(at)2ndquadrant(dot)com>
Cc: Simon Riggs <simon(dot)riggs(at)2ndquadrant(dot)com>, Thom Brown <thom(at)linux(dot)com>, Michael Paquier <michael(dot)paquier(at)gmail(dot)com>, Petr Jelinek <petr(dot)jelinek(at)2ndquadrant(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Logical decoding on standby
Date: 2017-03-20 09:33:29
Message-ID: 20170320093329.6lcrlvrmses652fw@alap3.anarazel.de
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

Hi,

Have you checked how high the overhead of XLogReadDetermineTimeline is?
A non-local function call, especially into a different translation-unit
(no partial inlining), for every single page might end up being
noticeable. That's fine in the cases it actually adds functionality,
but for a master streaming out data, that's not actually adding
anything.

Did you check whether you changes to read_local_xlog_page could cause
issues with twophase.c? Because that now also uses it.

Did you check whether ThisTimeLineID is actually always valid in the
processes logical decoding could run in? IIRC it's not consistently
update during recovery in any process but the startup process.

On 2017-03-19 21:12:23 +0800, Craig Ringer wrote:
> From 2fa891a555ea4fb200d75b8c906c6b932699b463 Mon Sep 17 00:00:00 2001
> From: Craig Ringer <craig(at)2ndquadrant(dot)com>
> Date: Thu, 1 Sep 2016 10:16:55 +0800
> Subject: [PATCH 2/3] Follow timeline switches in logical decoding

FWIW, the title doesn't really seem accurate to me.

> Logical slots cannot actually be created on a replica without use of
> the low-level C slot management APIs so this is mostly foundation work
> for subsequent changes to enable logical decoding on standbys.

Everytime I read references to anything like this my blood starts to
boil. I kind of regret not having plastered RecoveryInProgress() errors
all over this code.

> From 8854d44e2227b9d076b0a25a9c8b9df9270b2433 Mon Sep 17 00:00:00 2001
> From: Craig Ringer <craig(at)2ndquadrant(dot)com>
> Date: Mon, 5 Sep 2016 15:30:53 +0800
> Subject: [PATCH 3/3] Logical decoding on standby
>
> * Make walsender aware of ProcSignal and recovery conflicts, make walsender
> exit with recovery conflict on upstream drop database when it has an active
> logical slot on that database.
> * Allow GetOldestXmin to omit catalog_xmin, be called already locked.

"be called already locked"?

> * Send catalog_xmin separately in hot_standby_feedback messages.
> * Store catalog_xmin separately on a physical slot if received in hot_standby_feedback

What does separate mean?

> * Separate the catalog_xmin used by vacuum from ProcArray's replication_slot_catalog_xmin,
> requiring that xlog be emitted before vacuum can remove no longer needed catalogs, store
> it in checkpoints, make vacuum and bgwriter advance it.

I can't parse that sentence.

> * Add a new recovery conflict type for conflict with catalog_xmin. Abort
> in-progress logical decoding sessions with conflict with recovery where needed
> catalog_xmin is too old

Are we retaining WAL for slots broken in that way?

> * Make extra efforts to reserve master's catalog_xmin during decoding startup
> on standby.

What does that mean?

> * Remove checks preventing starting logical decoding on standby

To me that's too many different things in one commit. A bunch of them
seem like it'd be good if they'd get independent buildfarm cycles too.

> diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
> index d7f65a5..36bbb98 100644
> --- a/src/backend/access/heap/rewriteheap.c
> +++ b/src/backend/access/heap/rewriteheap.c
> @@ -812,7 +812,8 @@ logical_begin_heap_rewrite(RewriteState state)
> if (!state->rs_logical_rewrite)
> return;
>
> - ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
> + /* Use the catalog_xmin being retained by vacuum */
> + ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin, NULL);

What does that comment mean? Vacuum isn't the only thing that prunes old
records.

> +/*
> + * Set the global oldest catalog_xmin used to determine when tuples
> + * may be removed from catalogs and user-catalogs accessible from logical
> + * decoding.
> + *
> + * Only to be called from the startup process or by UpdateOldestCatalogXmin(),
> + * which ensures the update is properly written to xlog first.
> + */
> +void
> +SetOldestCatalogXmin(TransactionId oldestCatalogXmin)
> +{
> + Assert(InRecovery || !IsUnderPostmaster || AmStartupProcess() || LWLockHeldByMe(ProcArrayLock));

Uh, that's long-ish. And doesn't agree with the comment above
(s/startup process/process performing recovery/?).

This is a long enough list that I'd consider just dropping the assert.

> + else if (info == XLOG_XACT_CATALOG_XMIN_ADV)
> + {
> + xl_xact_catalog_xmin_advance *xlrec = (xl_xact_catalog_xmin_advance *) XLogRecGetData(record);
> +
> + /*
> + * Unless logical decoding is possible on this node, we don't care about
> + * this record.
> + */
> + if (!XLogLogicalInfoActive() || max_replication_slots == 0)
> + return;

Too many negatives for my taste, but whatever.

> + /*
> + * Apply the new catalog_xmin limit immediately. New decoding sessions
> + * will refuse to start if their slot is past it, and old ones will
> + * notice when we signal them with a recovery conflict. There's no
> + * effect on the catalogs themselves yet, so it's safe for backends
> + * with older catalog_xmins to still exist.
> + *
> + * We don't have to take ProcArrayLock since only the startup process
> + * is allowed to change oldestCatalogXmin when we're in recovery.
> + */
> + SetOldestCatalogXmin(xlrec->new_catalog_xmin);

Which seems to rely on ResolveRecoveryConflictWithLogicalDecoding's
lwlock acquisition for barriers?

> +/*
> + * Record when we advance the catalog_xmin used for tuple removal
> + * so standbys find out before we remove catalog tuples they might
> + * need for logical decoding.
> + */
> +XLogRecPtr
> +XactLogCatalogXminUpdate(TransactionId new_catalog_xmin)
> +{
> + XLogRecPtr ptr = InvalidXLogRecPtr;
> +
> + if (XLogInsertAllowed())
> + {
> + xl_xact_catalog_xmin_advance xlrec;
> +
> + xlrec.new_catalog_xmin = new_catalog_xmin;
> +
> + XLogBeginInsert();
> + XLogRegisterData((char *) &xlrec, SizeOfXactCatalogXminAdvance);
> +
> + ptr = XLogInsert(RM_XACT_ID, XLOG_XACT_CATALOG_XMIN_ADV);
> + }

Huh, why is this test needed and ok?

> @@ -9449,6 +9456,16 @@ XLogReportParameters(void)
> XLogFlush(recptr);
> }
>
> + /*
> + * If wal_level was lowered from WAL_LEVEL_LOGICAL we no longer
> + * require oldestCatalogXmin in checkpoints and it no longer
> + * makes sense, so update shmem and xlog the change. This will
> + * get written out in the next checkpoint.
> + */
> + if (ControlFile->wal_level >= WAL_LEVEL_LOGICAL &&
> + wal_level < WAL_LEVEL_LOGICAL)
> + UpdateOldestCatalogXmin(true);

What if we crash before this happens?

> diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
> index ff633fa..2d16bf0 100644
> --- a/src/backend/commands/vacuum.c
> +++ b/src/backend/commands/vacuum.c
> @@ -518,6 +518,15 @@ vacuum_set_xid_limits(Relation rel,
> MultiXactId safeMxactLimit;
>
> /*
> + * When logical decoding is enabled, we must write any advance of
> + * catalog_xmin to xlog before we allow VACUUM to remove those tuples.
> + * This ensures that any standbys doing logical decoding can cancel
> + * decoding sessions and invalidate slots if we remove tuples they
> + * still need.
> + */
> + UpdateOldestCatalogXmin(false);

I'm on a first read-through through this, but it appears you don't do
anything similar in heap_page_prune()? And we can't just start emitting
loads of additional records there, because it's called much more often...

> /*
> * Make sure the current settings & environment are capable of doing logical
> * decoding.
> @@ -87,23 +95,53 @@ CheckLogicalDecodingRequirements(void)
> (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> errmsg("logical decoding requires a database connection")));
>
> - /* ----
> - * TODO: We got to change that someday soon...
> - *
> - * There's basically three things missing to allow this:
> - * 1) We need to be able to correctly and quickly identify the timeline a
> - * LSN belongs to
> - * 2) We need to force hot_standby_feedback to be enabled at all times so
> - * the primary cannot remove rows we need.
> - * 3) support dropping replication slots referring to a database, in
> - * dbase_redo. There can't be any active ones due to HS recovery
> - * conflicts, so that should be relatively easy.
> - * ----
> - */
> if (RecoveryInProgress())
> - ereport(ERROR,
> - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> - errmsg("logical decoding cannot be used while in recovery")));
> + {
> + bool walrcv_running, walrcv_has_slot;
> +
> + SpinLockAcquire(&WalRcv->mutex);
> + walrcv_running = WalRcv->pid != 0;
> + walrcv_has_slot = WalRcv->slotname[0] != '\0';
> + SpinLockRelease(&WalRcv->mutex);
> +
> + /*
> + * The walreceiver should be running when we try to create a slot. If
> + * we're unlucky enough to catch the walreceiver just as it's
> + * restarting after an error, well, the client can just retry. We don't
> + * bother to sleep and re-check.
> + */
> + if (!walrcv_running)
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("streaming replication is not active"),
> + errhint("Logical decoding on standby requires that streaming replication be configured and active. Ensure that primary_conninfo is correct in recovery.conf and check for streaming replication errors in the logs.")));

That seems quite problematic. What if there's a momentaneous connection
failure? This also has the issue that just because you checked that
walrcv_running at some point, doesn't guarantee anything by the time you
actually check. Seems like life were easier if recovery.conf were
guc-ified already - checking for primary_conninfo/primary_slot_name etc
wouldn't have that issue (and can't be changed while running).

Usage of a slot doesn't actually guarantee much in cascased setups, does
it?

> @@ -266,7 +306,9 @@ CreateInitDecodingContext(char *plugin,
> * xmin horizons by other backends, get the safe decoding xid, and inform
> * the slot machinery about the new limit. Once that's done the
> * ProcArrayLock can be released as the slot machinery now is
> - * protecting against vacuum.
> + * protecting against vacuum - if we're on the master. If we're running on
> + * a replica, we have to wait until hot_standby_feedback locks in our
> + * needed catalogs, per details on WaitForMasterCatalogXminReservation().
> * ----
> */
> LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
> @@ -276,6 +318,12 @@ CreateInitDecodingContext(char *plugin,
>
> ReplicationSlotsComputeRequiredXmin(true);
>
> + if (RecoveryInProgress())
> + WaitForMasterCatalogXminReservation(slot);
> +
> + Assert(TransactionIdPrecedesOrEquals(ShmemVariableCache->oldestCatalogXmin,
> + slot->data.catalog_xmin));
> +
> LWLockRelease(ProcArrayLock);

I think it's quite a bad idea to do a blocking operation like
WaitForMasterCatalogXminReservation while holding ProcArrayLock.

> +/*
> + * Wait until the master's catalog_xmin is set, advancing our catalog_xmin
> + * if needed. Caller must hold exclusive ProcArrayLock, which this function will
> + * temporarily release while sleeping but will re-acquire.

Ah. I see. Hm :(.

> + * We're pretty much just hoping that, if someone else already has a
> + * catalog_xmin reservation affecting the master, it stays where we want it
> + * until our own hot_standby_feedback can pin it down.

Hm.

> + * When we're creating a slot on a standby we can't directly set the
> + * master's catalog_xmin; the catalog_xmin is set locally, then relayed
> + * over hot_standby_feedback. The master may remove the catalogs we
> + * asked to reserve between when we set a local catalog_xmin and when
> + * hs feedback makes that take effect on the master. We need a feedback
> + * reply mechanism here, where:
> + *
> + * - we tentatively reserve catalog_xmin locally

Will that already trigger recovery conflicts?

> + * - we wake the walreceiver by setting its latch
> + * - walreceiver sends hs_feedback
> + * - upstream walsender sends a new 'hs_feedback reply' message with
> + * actual (xmin, catalog_xmin) reserved.
> + * - walreceiver sees reply and updates ShmemVariableCache or some other
> + * handy bit of shmem with hs feedback reservations from reply

"or some other handy bit"?

> + * - we poll the reservations while we wait
> + * - we set our catalog_xmin to that value, which might be later if
> + * we missed our requested reservation, or might be earlier if
> + * someone else is holding down catalog_xmin on master. We got a hs
> + * feedback reply so we know it's reserved.
> + *
> + * For cascading, the actual reservation will need to cascade up the
> + * chain by walsender setting its own walreceiver's latch in turn, etc.
> + *
> + * For now, we just set the local slot catalog_xmin and sleep until
> + * oldestCatalogXmin equals or passes our reservation. This is fine if we're
> + * the only decoding session, but it is vulnerable to races if slots on the
> + * master or other decoding sessions on other standbys connected to the same
> + * master exist. They might advance their reservation before our hs_feedback
> + * locks it down, allowing vacuum to remove tuples we need. So we might start
> + * decoding on our slot then error with a conflict with recovery when we see
> + * catalog_xmin advance.
> + */

I was about to list some of these issues. That's a bit unsatisfying.

Pondering this for a bit, but I'm ~9h into a flight, so maybe not
tonight^Wthis morning^Wwhaaaa.

> +static void
> +WaitForMasterCatalogXminReservation(ReplicationSlot *slot)
> +{

This comment seems to duplicate some of the function header
comment. Such duplication usually leads to either or both getting out of
date rather quickly.

Not commenting line-by-line on the code here, but I'm extremely doubtful
that this approach is stable enough, and that the effect of holding
ProcArrayLock exclusively over prolonged amounts of time is acceptable.

> + ReplicationSlotsComputeRequiredXmin(true);
>
Why do we need this? The caller does it too, no?

> + /* Tell the master what catalog_xmin we settled on */
> + WalRcvForceReply();
> +
> + /* Reset ps display if we changed it */
> + if (new_status)
> + {
> + set_ps_display(new_status, false);
> + pfree(new_status);
> + }

We really shouldn't do stuff like this while holding ProcArrayLock.

> +/*
> + * Test to see if the active logical slot is usable.
> + */
> +static void
> +EnsureActiveLogicalSlotValid()
> +{

Missing (void).

> +/*
> + * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
> + * passed database oid. The caller should hold an exclusive lock on the database
> + * to ensure no replication slots on the database are in use.

Stuff like this really should be it's own commit. It can trivially be
tested on its own, is useful on its own (just have DROP DATABASE do it),
...

> + * If we fail here we'll leave the in-memory state of replication slots
> + * inconsistent with its on-disk state, so we need to PANIC.

We worked quite hard to make it extremely unlikely for that to happen in
practice. I also don't see why there should be any new PANICs in this
code.

> + * This routine isn't as efficient as it could be - but we don't drop databases
> + * often, especially databases with lots of slots.

That seems fine.

> +void
> +ReplicationSlotsDropDBSlots(Oid dboid)
> +{
> + int i;
> +
> + if (max_replication_slots <= 0)
> + return;
> +
> + /*
> + * We only need a shared lock here even though we activate slots,
> + * because we have an exclusive lock on the database we're dropping
> + * slots on and don't touch other databases' slots.
> + */
> + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

Hm? Acquiring a slot always only takes a shared lock, no?

I don't really see how "database is locked" guarantees enough for your
logic - it's already possible to drop slots from other databases, and
dropping a slot acquires it temporarily?

> + for (i = 0; i < max_replication_slots; i++)
> + {
> + ReplicationSlot *s;
> + NameData slotname;
> + int active_pid;
> +
> + s = &ReplicationSlotCtl->replication_slots[i];
> +
> + /* cannot change while ReplicationSlotCtlLock is held */
> + if (!s->in_use)
> + continue;
> +
> + /* only logical slots are database specific, skip */
> + if (!SlotIsLogical(s))
> + continue;
> +
> + /* not our database, skip */
> + if (s->data.database != dboid)
> + continue;
> +
> + /* Claim the slot, as if ReplicationSlotAcquire()ing */
> + SpinLockAcquire(&s->mutex);
> + strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN);
> + NameStr(slotname)[NAMEDATALEN-1] = '\0';
> + active_pid = s->active_pid;
> + if (active_pid == 0)
> + {
> + MyReplicationSlot = s;
> + s->active_pid = MyProcPid;
> + }
> + SpinLockRelease(&s->mutex);
> +
> + /*
> + * The caller should have an exclusive lock on the database so
> + * we'll never have any in-use slots, but just in case...
> + */
> + if (active_pid)
> + elog(PANIC, "replication slot %s is in use by pid %d",
> + NameStr(slotname), active_pid);

So, yea, this doesn't seem ok. Why don't we just ERROR out, instead of
PANICing? There seems to be absolutely no correctness reason for a PANIC
here?

> + /*
> + * To avoid largely duplicating ReplicationSlotDropAcquired() or
> + * complicating it with already_locked flags for ProcArrayLock,
> + * ReplicationSlotControlLock and ReplicationSlotAllocationLock, we
> + * just release our ReplicationSlotControlLock to drop the slot.
> + *
> + * There's no race here: we acquired this slot, and no slot "behind"
> + * our scan can be created or become active with our target dboid due
> + * to our exclusive lock on the DB.
> + */
> + LWLockRelease(ReplicationSlotControlLock);
> + ReplicationSlotDropAcquired();
> + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

I don't see much problem with this, but I'd change the code so you
simply do a goto restart; if you released the slot. Then there's a lot
less chance / complications around temporarily releasing
ReplicationSlotControlLock.

> + *
> + * If logical decoding information is enabled, we also
> + * send immediate hot standby feedback so as to reduce
> + * the delay before our needed catalogs are locked in.

"logical decoding information ... enabled" and "catalogs are locked in"
are a bit too imprecise descriptions for my taste.

> @@ -1175,8 +1181,8 @@ XLogWalRcvSendHSFeedback(bool immed)
> {
> TimestampTz now;
> TransactionId nextXid;
> - uint32 nextEpoch;
> - TransactionId xmin;
> + uint32 xmin_epoch, catalog_xmin_epoch;
> + TransactionId xmin, catalog_xmin;
> static TimestampTz sendTime = 0;
> /* initially true so we always send at least one feedback message */
> static bool master_has_standby_xmin = true;
> @@ -1221,29 +1227,57 @@ XLogWalRcvSendHSFeedback(bool immed)
> * everything else has been checked.
> */
> if (hot_standby_feedback)
> - xmin = GetOldestXmin(NULL, false);
> + {
> + /*
> + * Usually GetOldestXmin() would include the catalog_xmin in its
> + * calculations, but we don't want to hold upstream back from vacuuming
> + * normal user table tuples just because they're within the
> + * catalog_xmin horizon of logical replication slots on this standby.
> + * Instead we report the catalog_xmin to the upstream separately.
> + */

I again don't think it's good to refer to vacuum as it's not the only
thing that can remove tuple versions.

> + xmin = GetOldestXmin(NULL,
> + false, /* don't ignore vacuum */
> + true /* ignore catalog xmin */);
> +
> + /*
> + * The catalog_Xmin reported by GetOldestXmin is the effective
> + * catalog_xmin used by vacuum, as set by xl_xact_catalog_xmin_advance
> + * records from the master. Sending it back to the master would be
> + * circular and prevent its catalog_xmin ever advancing once set.
> + * We should only send the catalog_xmin we actually need for slots.
> + */
> + ProcArrayGetReplicationSlotXmin(NULL, NULL, &catalog_xmin);

Given that you don't have catalog_xmin set by GetOldestXmin that comment
is a bit misleading.

> @@ -1427,19 +1436,93 @@ GetOldestXmin(Relation rel, bool ignoreVacuum)
> NormalTransactionIdPrecedes(replication_slot_xmin, result))
> result = replication_slot_xmin;
>
> + if (!ignoreCatalogXmin && (rel == NULL || RelationIsAccessibleInLogicalDecoding(rel)))
> + {
> + /*
> + * After locks have been released and defer_cleanup_age has been applied,
> + * check whether we need to back up further to make logical decoding
> + * safe. We need to do so if we're computing the global limit (rel =
> + * NULL) or if the passed relation is a catalog relation of some kind.
> + */
> + if (TransactionIdIsValid(replication_slot_catalog_xmin) &&
> + NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
> + result = replication_slot_catalog_xmin;
> + }

The nesting of these checks, and the comments about them, is a bit
weird.

> +/*
> + * Return true if ShmemVariableCache->oldestCatalogXmin needs to be updated
> + * to reflect an advance in procArray->replication_slot_catalog_xmin or
> + * it becoming newly set or unset.
> + *
> + */
> +static bool
> +CatalogXminNeedsUpdate(TransactionId vacuum_catalog_xmin, TransactionId slots_catalog_xmin)
> +{
> + return (TransactionIdPrecedes(vacuum_catalog_xmin, slots_catalog_xmin)
> + || (TransactionIdIsValid(vacuum_catalog_xmin) != TransactionIdIsValid(slots_catalog_xmin)));
> +}

Your lines are really long - pgindent (which you really should run) will
much this. I think it'd be better to rephrase this.

> +/*
> + * If necessary, copy the current catalog_xmin needed by repliation slots to

Typo: repliation

> + * the effective catalog_xmin used for dead tuple removal.
> + *
> + * When logical decoding is enabled we write a WAL record before advancing the
> + * effective value so that standbys find out if catalog tuples they still need
> + * get removed, and can properly cancel decoding sessions and invalidate slots.
> + *
> + * The 'force' option is used when we're turning WAL_LEVEL_LOGICAL off
> + * and need to clear the shmem state, since we want to bypass the wal_level
> + * check and force xlog writing.
> + */
> +void
> +UpdateOldestCatalogXmin(bool force)

I'm a bit confused by this function and variable name. What does

+ TransactionId oldestCatalogXmin; /* oldest xid where complete catalog state
+ * is guaranteed to still exist */

mean? I complained about the overall justification in the commit
already, but looking at this commit alone, the justification for this
part of the change is quite hard to understand.

> +{
> + TransactionId vacuum_catalog_xmin;
> + TransactionId slots_catalog_xmin;
> +
> + /*
> + * If we're not recording logical decoding information, catalog_xmin
> + * must be unset and we don't need to do any work here.

If we don't need to do any work, shouldn't we return early?

> + if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin) || force)
> + {
> + XactLogCatalogXminUpdate(slots_catalog_xmin);
> +
> + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
> + /*
> + * A concurrent updater could've changed these values so we need to re-check
> + * under ProcArrayLock before updating.
> + */
> + vacuum_catalog_xmin = *((volatile TransactionId*)&ShmemVariableCache->oldestCatalogXmin);
> + slots_catalog_xmin = *((volatile TransactionId*)&procArray->replication_slot_catalog_xmin);

why are there volatile reads here?

> + if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin))
> + SetOldestCatalogXmin(slots_catalog_xmin);

Why don't we check force here, but above?

> @@ -2167,14 +2250,20 @@ GetOldestSafeDecodingTransactionId(void)
> oldestSafeXid = ShmemVariableCache->nextXid;
>
> /*
> - * If there's already a slot pegging the xmin horizon, we can start with
> - * that value, it's guaranteed to be safe since it's computed by this
> - * routine initially and has been enforced since.
> + * If there's already an effectiveCatalogXmin held down by vacuum
> + * it's definitely safe to start there, and it can't advance
> + * while we hold ProcArrayLock.

What does "held down by vacuum" mean?

> /*
> + * Notify a logical decoding session that it conflicts with a
> + * newly set catalog_xmin from the master.
> + */
> +void
> +CancelLogicalDecodingSessionWithRecoveryConflict(pid_t session_pid)
> +{
> + ProcArrayStruct *arrayP = procArray;
> + int index;
> +
> + /*
> + * We have to scan ProcArray to find the process and set a pending recovery
> + * conflict even though we know the pid. At least we can get the BackendId
> + * and void a ProcSignal scan later.
> + *
> + * The pid might've gone away, in which case we got the desired
> + * outcome anyway.
> + */
> + LWLockAcquire(ProcArrayLock, LW_SHARED);
> +
> + for (index = 0; index < arrayP->numProcs; index++)
> + {
> + int pgprocno = arrayP->pgprocnos[index];
> + volatile PGPROC *proc = &allProcs[pgprocno];
> +
> + if (proc->pid == session_pid)
> + {
> + VirtualTransactionId procvxid;
> +
> + GET_VXID_FROM_PGPROC(procvxid, *proc);
> +
> + proc->recoveryConflictPending = true;
> +
> + /*
> + * Kill the pid if it's still here. If not, that's what we
> + * wanted so ignore any errors.
> + */
> + (void) SendProcSignal(session_pid,
> + PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN, procvxid.backendId);
> +
> + break;
> + }
> + }
> +
> + LWLockRelease(ProcArrayLock);

Doesn't seem ok to do this while holding ProcArrayLock.

> +/*
> + * Scan to see if any clients are using replication slots that are below the
> + * new catalog_xmin theshold and sigal them to terminate with a recovery
> + * conflict.
> + *
> + * We already applied the new catalog_xmin record and updated the shmem
> + * catalog_xmin state, so new clients that try to use a replication slot
> + * whose on-disk catalog_xmin is below the new threshold will ERROR, and we
> + * don't have to guard against them here.
> + *
> + * Replay can only continue safely once every slot that needs the catalogs
> + * we're going to free for removal is gone. So if any conflicting sessions
> + * exist, wait for any standby conflict grace period then signal them to exit.
> + *
> + * The master might clear its reserved catalog_xmin if all upstream slots are
> + * removed or clear their feedback reservations, sending us
> + * InvalidTransactionId. If we're concurrently trying to create a new slot and
> + * reserve catalogs the InvalidXid reservation report might come in while we
> + * have a slot waiting for hs_feedback confirmation of its reservation. That
> + * would cause the waiting process to get canceled with a conflict with
> + * recovery here since its tentative reservation conflicts with the master's
> + * report of 'nothing reserved'. To allow it to continue to seek a startpoint
> + * we ignore slots whose catalog_xmin is >= nextXid, indicating that they're
> + * still looking for where to start. We'll sometimes notice a conflict but the
> + * slot will advance its catalog_xmin to a more recent nextXid and cease to
> + * conflict when we re-check. (The alternative is to track slots being created
> + * differently to slots actively decoding in shmem, which seems unnecessary. Or
> + * to separate the 'tentative catalog_xmin reservation' of a slot from its
> + * actual needed catalog_xmin.)
> + *
> + * We can't use ResolveRecoveryConflictWithVirtualXIDs() here because
> + * walsender-based logical decoding sessions won't have any virtualxid for much
> + * of their life and the end of their virtualxids doesn't mean the end of a
> + * potential conflict. It would also cancel too aggressively, since it cares
> + * about the backend's xmin and logical decoding only needs the catalog_xmin.
> + */

The use of "we" seems confusing here, because it's not the same process.

Generally I think your comments need to be edited a bit for brevity and
preciseness.

> +void
> +ResolveRecoveryConflictWithLogicalDecoding(TransactionId new_catalog_xmin)
> +{
> + int i;
> +
> + if (!InHotStandby)
> + /* nobody can be actively using logical slots */
> + return;
> +
> + /* Already applied new limit, can't have replayed later one yet */
> + Assert(ShmemVariableCache->oldestCatalogXmin == new_catalog_xmin);
> +
> + /*
> + * Find the first conflicting active slot and wait for it to be free,
> + * signalling it if necessary, then repeat until there are no more
> + * conflicts.
> + */
> + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
> + for (i = 0; i < max_replication_slots; i++)
> + {

I'm pretty strongly against any code outside of slot.c doing this.

> @@ -2789,12 +2797,13 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
> Assert(RecoveryConflictPending && (QueryCancelPending || ProcDiePending));
>
> /*
> - * All conflicts apart from database cause dynamic errors where the
> - * command or transaction can be retried at a later point with some
> - * potential for success. No need to reset this, since non-retryable
> - * conflict errors are currently FATAL.
> + * All conflicts apart from database and catalog_xmin cause dynamic
> + * errors where the command or transaction can be retried at a later
> + * point with some potential for success. No need to reset this, since
> + * non-retryable conflict errors are currently FATAL.
> */
> - if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE)
> + if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE ||
> + reason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN)
> RecoveryConflictRetryable = false;
> }

Hm. Why is this a non-retryable error?

Ok, landing soon. Gotta finish here.

0002 should be doable as a whole this release, I have severe doubts that
0003 as a whole has a chance for 10 - the code is in quite a raw shape,
there's a significant number of open ends. I'd suggest breaking of bits
that are independently useful, and work on getting those committed.

- Andres

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Vitaly Burovoy 2017-03-20 09:43:24 Re: identity columns
Previous Message Alexander Korotkov 2017-03-20 09:33:26 Re: [PATCH] Incremental sort (was: PoC: Partial sort)