Re: logical decoding and replication of sequences, take 2

From: Tomas Vondra <tomas(dot)vondra(at)enterprisedb(dot)com>
To: Andres Freund <andres(at)anarazel(dot)de>
Cc: Robert Haas <robertmhaas(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>, Heikki Linnakangas <heikki(dot)linnakangas(at)iki(dot)fi>
Subject: Re: logical decoding and replication of sequences, take 2
Date: 2023-01-11 21:46:26
Message-ID: d295927b-fcab-cb9f-0a0d-a846041fcf04@enterprisedb.com
Lists: pgsql-hackers

On 1/11/23 21:12, Andres Freund wrote:
> Hi,
>
>
> Heikki, CCed you due to the point about 2c03216d8311 below.
>
>
> On 2023-01-10 19:32:12 +0100, Tomas Vondra wrote:
>> 0001 is a fix for the pre-existing issue in logicalmsg_decode,
>> attempting to build a snapshot before getting into a consistent state.
>> AFAICS this only affects assert-enabled builds and is otherwise
>> harmless, because we are not actually using the snapshot (apply gets a
>> valid snapshot from the transaction).
>
> LGTM.
>
>
>> 0002 is a rebased version of the original approach, committed as
>> 0da92dc530 (and then reverted in 2c7ea57e56). This includes the same fix
>> as 0001 (for the sequence messages), the primary reason for the revert.
>>
>> The rebase was not quite straightforward, due to extensive changes in
>> how publications deal with tables/schemas, and so on. So this adopts
>> them, but other than that it behaves just like the original patch.
>
> This is a huge diff:
>> 72 files changed, 4715 insertions(+), 612 deletions(-)
>
> It'd be nice to split it to make review easier. Perhaps the sequence decoding
> support could be split from the whole publication rigamarole?
>
>
>> This does not include any changes to test_decoding and/or the built-in
>> replication - those will be committed in separate patches.
>
> Looks like that's not the case anymore?
>

Ah, right! I just realized that I originally committed this in chunks,
but the revert was a single commit, and I simply "reverted the revert"
to create this patch.

I'll definitely split this into smaller patches. This also explains the
obsolete commit message about test_decoding not being included, etc.

>
>> +/*
>> + * Update the sequence state by modifying the existing sequence data row.
>> + *
>> + * This keeps the same relfilenode, so the behavior is non-transactional.
>> + */
>> +static void
>> +SetSequence_non_transactional(Oid seqrelid, int64 last_value, int64 log_cnt, bool is_called)
>> +{
>> + SeqTable elm;
>> + Relation seqrel;
>> + Buffer buf;
>> + HeapTupleData seqdatatuple;
>> + Form_pg_sequence_data seq;
>> +
>> + /* open and lock sequence */
>> + init_sequence(seqrelid, &elm, &seqrel);
>> +
>> + /* lock page' buffer and read tuple */
>> + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
>> +
>> + /* check the comment above nextval_internal()'s equivalent call. */
>> + if (RelationNeedsWAL(seqrel))
>> + {
>> + GetTopTransactionId();
>> +
>> + if (XLogLogicalInfoActive())
>> + GetCurrentTransactionId();
>> + }
>> +
>> + /* ready to change the on-disk (or really, in-buffer) tuple */
>> + START_CRIT_SECTION();
>> +
>> + seq->last_value = last_value;
>> + seq->is_called = is_called;
>> + seq->log_cnt = log_cnt;
>> +
>> + MarkBufferDirty(buf);
>> +
>> + /* XLOG stuff */
>> + if (RelationNeedsWAL(seqrel))
>> + {
>> + xl_seq_rec xlrec;
>> + XLogRecPtr recptr;
>> + Page page = BufferGetPage(buf);
>> +
>> + XLogBeginInsert();
>> + XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
>> +
>> + xlrec.locator = seqrel->rd_locator;
>> + xlrec.created = false;
>> +
>> + XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
>> + XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
>> +
>> + recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
>> +
>> + PageSetLSN(page, recptr);
>> + }
>> +
>> + END_CRIT_SECTION();
>> +
>> + UnlockReleaseBuffer(buf);
>> +
>> + /* Clear local cache so that we don't think we have cached numbers */
>> + /* Note that we do not change the currval() state */
>> + elm->cached = elm->last;
>> +
>> + relation_close(seqrel, NoLock);
>> +}
>> +
>> +/*
>> + * Update the sequence state by creating a new relfilenode.
>> + *
>> + * This creates a new relfilenode, to allow transactional behavior.
>> + */
>> +static void
>> +SetSequence_transactional(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called)
>> +{
>> + SeqTable elm;
>> + Relation seqrel;
>> + Buffer buf;
>> + HeapTupleData seqdatatuple;
>> + Form_pg_sequence_data seq;
>> + HeapTuple tuple;
>> +
>> + /* open and lock sequence */
>> + init_sequence(seq_relid, &elm, &seqrel);
>> +
>> + /* lock page' buffer and read tuple */
>> + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
>> +
>> + /* Copy the existing sequence tuple. */
>> + tuple = heap_copytuple(&seqdatatuple);
>> +
>> + /* Now we're done with the old page */
>> + UnlockReleaseBuffer(buf);
>> +
>> + /*
>> + * Modify the copied tuple to update the sequence state (similar to what
>> + * ResetSequence does).
>> + */
>> + seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
>> + seq->last_value = last_value;
>> + seq->is_called = is_called;
>> + seq->log_cnt = log_cnt;
>> +
>> + /*
>> + * Create a new storage file for the sequence - this is needed for the
>> + * transactional behavior.
>> + */
>> + RelationSetNewRelfilenumber(seqrel, seqrel->rd_rel->relpersistence);
>> +
>> + /*
>> + * Ensure sequence's relfrozenxid is at 0, since it won't contain any
>> + * unfrozen XIDs. Same with relminmxid, since a sequence will never
>> + * contain multixacts.
>> + */
>> + Assert(seqrel->rd_rel->relfrozenxid == InvalidTransactionId);
>> + Assert(seqrel->rd_rel->relminmxid == InvalidMultiXactId);
>> +
>> + /*
>> + * Insert the modified tuple into the new storage file. This does all the
>> + * necessary WAL-logging etc.
>> + */
>> + fill_seq_with_data(seqrel, tuple);
>> +
>> + /* Clear local cache so that we don't think we have cached numbers */
>> + /* Note that we do not change the currval() state */
>> + elm->cached = elm->last;
>> +
>> + relation_close(seqrel, NoLock);
>> +}
>> +
>> +/*
>> + * Set a sequence to a specified internal state.
>> + *
>> + * The change is made transactionally, so that on failure of the current
>> + * transaction, the sequence will be restored to its previous state.
>> + * We do that by creating a whole new relfilenode for the sequence; so this
>> + * works much like the rewriting forms of ALTER TABLE.
>> + *
>> + * Caller is assumed to have acquired AccessExclusiveLock on the sequence,
>> + * which must not be released until end of transaction. Caller is also
>> + * responsible for permissions checking.
>> + */
>> +void
>> +SetSequence(Oid seq_relid, bool transactional, int64 last_value, int64 log_cnt, bool is_called)
>> +{
>> + if (transactional)
>> + SetSequence_transactional(seq_relid, last_value, log_cnt, is_called);
>> + else
>> + SetSequence_non_transactional(seq_relid, last_value, log_cnt, is_called);
>> +}
>
> That's a lot of duplication with existing code. There's no explanation why
> SetSequence() as well as do_setval() exists.
>

Thanks, I'll look into this.

>
>> /*
>> * Initialize a sequence's relation with the specified tuple as content
>> *
>> @@ -406,8 +560,13 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum)
>>
>> /* check the comment above nextval_internal()'s equivalent call. */
>> if (RelationNeedsWAL(rel))
>> + {
>> GetTopTransactionId();
>>
>> + if (XLogLogicalInfoActive())
>> + GetCurrentTransactionId();
>> + }
>
> Is it actually possible to reach this without an xid already having been
> assigned for the current xact?
>

I believe it is. That's probably how I found out this change is needed,
actually.

>
>
>> @@ -806,10 +966,28 @@ nextval_internal(Oid relid, bool check_permissions)
>> * It's sufficient to ensure the toplevel transaction has an xid, no need
>> * to assign xids subxacts, that'll already trigger an appropriate wait.
>> * (Have to do that here, so we're outside the critical section)
>> + *
>> + * We have to ensure we have a proper XID, which will be included in
>> + * the XLOG record by XLogRecordAssemble. Otherwise the first nextval()
>> + * in a subxact (without any preceding changes) would get XID 0, and it
>> + * would then be impossible to decide which top xact it belongs to.
>> + * It'd also trigger assert in DecodeSequence. We only do that with
>> + * wal_level=logical, though.
>> + *
>> + * XXX This might seem unnecessary, because if there's no XID the xact
>> + * couldn't have done anything important yet, e.g. it could not have
>> + * created a sequence. But that's incorrect, because of subxacts. The
>> + * current subtransaction might not have done anything yet (thus no XID),
>> + * but an earlier one might have created the sequence.
>> */
>
> What about restricting this to the case you're mentioning,
> i.e. subtransactions?
>

That might work, but I need to think about it a bit.

I don't think it'd save us much, though. I mean, the vast majority of
transactions (and subtransactions) calling nextval() will then do
something else which requires an XID anyway. This just moves the XID
assignment a bit earlier, that's all.
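
To make the subxact case from the patch comment concrete, here is a toy
model. It is not PostgreSQL code: the Toy* types and toy_* functions are
hypothetical stand-ins for the transaction state and for
GetTopTransactionId() / GetCurrentTransactionId(). It only illustrates why
a sequence change logged from a subxact without its own XID would carry
XID 0 in the record, and how assigning the XID first avoids that.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for TransactionId; 0 means "not assigned yet". */
typedef uint32_t ToyXid;
#define TOY_INVALID_XID ((ToyXid) 0)

/* Toy transaction state: one top-level xact and one current (sub)xact. */
typedef struct ToyXactState
{
    ToyXid top_xid;     /* XID of the top-level transaction */
    ToyXid cur_xid;     /* XID of the current (sub)transaction */
    bool   in_subxact;  /* are we inside a subtransaction? */
    ToyXid next_xid;    /* next XID to hand out */
} ToyXactState;

/* Models GetTopTransactionId(): make sure the top-level xact has an XID. */
static ToyXid
toy_assign_top_xid(ToyXactState *s)
{
    if (s->top_xid == TOY_INVALID_XID)
        s->top_xid = s->next_xid++;
    if (!s->in_subxact)
        s->cur_xid = s->top_xid;
    return s->top_xid;
}

/* Models GetCurrentTransactionId(): the current (sub)xact gets an XID too. */
static ToyXid
toy_assign_current_xid(ToyXactState *s)
{
    toy_assign_top_xid(s);
    if (s->cur_xid == TOY_INVALID_XID)
        s->cur_xid = s->next_xid++;
    return s->cur_xid;
}

/*
 * Returns the XID that would end up in the record for a sequence change.
 * With wal_level_logical the current (sub)xact is forced to have an XID
 * first, mirroring the hunk discussed above.
 */
static ToyXid
toy_log_sequence_change(ToyXactState *s, bool wal_level_logical)
{
    toy_assign_top_xid(s);
    if (wal_level_logical)
        toy_assign_current_xid(s);
    return s->cur_xid;
}
```

In this model, a first nextval() in a fresh subxact of top-level XID 100
yields XID 0 without the extra call, and a proper subxact XID with it.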

>
>> @@ -845,6 +1023,7 @@ nextval_internal(Oid relid, bool check_permissions)
>> seq->log_cnt = 0;
>>
>> xlrec.locator = seqrel->rd_locator;
>
> I realize this isn't from this patch, but:
>
> Why do we include the locator in the record? We already have it via
> XLogRegisterBuffer(), no? And afaict we don't even use it, as we read the page
> via XLogInitBufferForRedo() during recovery.
>
> Kinda looks like an oversight in 2c03216d8311
>

I don't know, it's what the code did.

>
>
>
>> +/*
>> + * Handle sequence decode
>> + *
>> + * Decoding sequences is a bit tricky, because while most sequence actions
>> + * are non-transactional (not subject to rollback), some need to be handled
>> + * as transactional.
>> + *
>> + * By default, a sequence increment is non-transactional - we must not queue
>> + * it in a transaction as other changes, because the transaction might get
>> + * rolled back and we'd discard the increment. The downstream would not be
>> + * notified about the increment, which is wrong.
>> + *
>> + * On the other hand, the sequence may be created in a transaction. In this
>> + * case we *should* queue the change as other changes in the transaction,
>> + * because we don't want to send the increments for unknown sequence to the
>> + * plugin - it might get confused about which sequence it's related to etc.
>> + */
>> +void
>> +sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
>> +{
>
>> + /* extract the WAL record, with "created" flag */
>> + xlrec = (xl_seq_rec *) XLogRecGetData(r);
>> +
>> + /* XXX how could we have sequence change without data? */
>> + if(!datalen || !tupledata)
>> + return;
>
> Yea, I think we should error out here instead, something has gone quite wrong
> if this happens.
>

OK

>
>> + tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
>> + DecodeSeqTuple(tupledata, datalen, tuplebuf);
>> +
>> + /*
>> + * Should we handle the sequence increment as transactional or not?
>> + *
>> + * If the sequence was created in a still-running transaction, treat
>> + * it as transactional and queue the increments. Otherwise it needs
>> + * to be treated as non-transactional, in which case we send it to
>> + * the plugin right away.
>> + */
>> + transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
>> + target_locator,
>> + xlrec->created);
>
> Why re-create this information during decoding, when we basically already have
> it available on the primary? I think we already pay the price for that
> tracking, which we e.g. use for doing a non-transactional truncate:
>
> /*
> * Normally, we need a transaction-safe truncation here. However, if
> * the table was either created in the current (sub)transaction or has
> * a new relfilenumber in the current (sub)transaction, then we can
> * just truncate it in-place, because a rollback would cause the whole
> * table or the current physical file to be thrown away anyway.
> */
> if (rel->rd_createSubid == mySubid ||
> rel->rd_newRelfilelocatorSubid == mySubid)
> {
> /* Immediate, non-rollbackable truncation is OK */
> heap_truncate_one_rel(rel);
> }
>
> Afaict we could do something similar for sequences, except that I think we
> would just check if the sequence was created in the current transaction
> (i.e. any of the fields are set).
>

Hmm, good point.
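
For reference, a minimal sketch of what such a check on the primary might
look like, assuming it follows the truncate example quoted above. ToyRelation
and toy_sequence_change_is_transactional() are hypothetical names, and the
struct only mirrors the two relcache fields mentioned in the quote; this is
an illustration of the idea, not a proposed implementation.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for SubTransactionId; 0 plays the "invalid" role. */
typedef uint32_t ToySubTransactionId;
#define TOY_INVALID_SUBXID ((ToySubTransactionId) 0)

/* Only the two fields from the quoted truncate code are modeled here. */
typedef struct ToyRelation
{
    ToySubTransactionId rd_createSubid;            /* created in this xact? */
    ToySubTransactionId rd_newRelfilelocatorSubid; /* new storage in this xact? */
} ToyRelation;

/*
 * A sequence change would be treated as transactional if the sequence (or
 * its current storage) was created anywhere in the current transaction,
 * i.e. if either field is set, regardless of which subxact set it.
 */
static bool
toy_sequence_change_is_transactional(const ToyRelation *rel)
{
    return rel->rd_createSubid != TOY_INVALID_SUBXID ||
           rel->rd_newRelfilelocatorSubid != TOY_INVALID_SUBXID;
}
```

The result could then be written into the WAL record, so that decoding
would not need to reconstruct it.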

>
>> +/*
>> + * A transactional sequence increment is queued to be processed upon commit
>> + * and a non-transactional increment gets processed immediately.
>> + *
>> + * A sequence update may be both transactional and non-transactional. When
>> + * created in a running transaction, treat it as transactional and queue
>> + * the change in it. Otherwise treat it as non-transactional, so that we
>> + * don't forget the increment in case of a rollback.
>> + */
>> +void
>> +ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
>> + Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
>> + RelFileLocator rlocator, bool transactional, bool created,
>> + ReorderBufferTupleBuf *tuplebuf)
>
>
>> + /*
>> + * Decoding needs access to syscaches et al., which in turn use
>> + * heavyweight locks and such. Thus we need to have enough state around to
>> + * keep track of those. The easiest way is to simply use a transaction
>> + * internally. That also allows us to easily enforce that nothing writes
>> + * to the database by checking for xid assignments.
>> + *
>> + * When we're called via the SQL SRF there's already a transaction
>> + * started, so start an explicit subtransaction there.
>> + */
>> + using_subtxn = IsTransactionOrTransactionBlock();
>
> This duplicates a lot of the code from ReorderBufferProcessTXN(). But only
> does so partially. It's hard to tell whether some of the differences are
> intentional. Could we de-duplicate that code with ReorderBufferProcessTXN()?
>
> Maybe something like
>
> void
> ReorderBufferSetupXactEnv(ReorderBufferXactEnv *, bool process_invals);
>
> void
> ReorderBufferTeardownXactEnv(ReorderBufferXactEnv *, bool is_error);
>

Thanks for the suggestion, I'll definitely consider that in the next
version of the patch.
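
A rough, self-contained sketch of the setup/teardown pairing, just to show
the shape of the idea. Everything here is a toy: ToyBackend and ToyXactEnv
are hypothetical, the real functions would start and abort actual
(sub)transactions, and the process_invals argument from the suggested
signature is left out.

```c
#include <stdbool.h>

/* Toy stand-in for the backend's transaction state. */
typedef struct ToyBackend
{
    bool in_transaction;  /* models IsTransactionOrTransactionBlock() */
    int  subxact_depth;   /* number of open internal subtransactions */
    bool last_was_error;  /* whether the last teardown followed an error */
} ToyBackend;

/* What setup has to remember so that teardown can undo exactly that. */
typedef struct ToyXactEnv
{
    bool using_subtxn;
    bool active;
} ToyXactEnv;

/* Start a subtransaction if already in a transaction, else a transaction. */
static void
toy_setup_xact_env(ToyBackend *be, ToyXactEnv *env)
{
    env->using_subtxn = be->in_transaction;
    if (env->using_subtxn)
        be->subxact_depth++;
    else
        be->in_transaction = true;
    env->active = true;
}

/* Undo whatever setup did; safe to call on an env that was never set up. */
static void
toy_teardown_xact_env(ToyBackend *be, ToyXactEnv *env, bool is_error)
{
    if (!env->active)
        return;
    if (env->using_subtxn)
        be->subxact_depth--;
    else
        be->in_transaction = false;
    be->last_was_error = is_error;
    env->active = false;
}
```

Both the per-transaction path and the non-transactional sequence path could
then call the same pair, which would make any remaining differences between
them explicit.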

regards

--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
