Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions

From: Dilip Kumar <dilipbalaut(at)gmail(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: Kuntal Ghosh <kuntalghosh(dot)2007(at)gmail(dot)com>, Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, Peter Eisentraut <peter(dot)eisentraut(at)2ndquadrant(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
Date: 2020-05-15 09:17:57
Message-ID: CAFiTN-v_ydjaCksAA3obA67LaC5imN4mbH4J+vr+NBb6YPvmrA@mail.gmail.com
Lists: pgsql-hackers

On Wed, May 13, 2020 at 4:50 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Wed, May 13, 2020 at 11:35 AM Dilip Kumar <dilipbalaut(at)gmail(dot)com> wrote:
> >
> > On Tue, May 12, 2020 at 4:39 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
> > >
> > >
> > > v20-0003-Extend-the-output-plugin-API-with-stream-methods
> > > ----------------------------------------------------------------------------------------
> > > 1.
> > > +static void
> > > +pg_decode_stream_change(LogicalDecodingContext *ctx,
> > > + ReorderBufferTXN *txn,
> > > + Relation relation,
> > > + ReorderBufferChange *change)
> > > +{
> > > + OutputPluginPrepareWrite(ctx, true);
> > > + appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
> > > + OutputPluginWrite(ctx, true);
> > > +}
> > > +
> > > +static void
> > > +pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> > > + int nrelations, Relation relations[],
> > > + ReorderBufferChange *change)
> > > +{
> > > + OutputPluginPrepareWrite(ctx, true);
> > > + appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
> > > + OutputPluginWrite(ctx, true);
> > > +}
> > >
> > > In the above and similar APIs, there are parameters like relation
> > > which are not used. I think you should add some comments atop these
> > > APIs to explain why that is so. I guess it is because we want to keep
> > > them similar to the non-stream versions of the APIs, and we can't
> > > display the relation or other information while the transaction is
> > > still in progress.
> >
> > I think the interfaces are designed that way because other decoding
> > plugins might need those parameters, e.g., in pgoutput we need the
> > change and the relation, but not here. We have other similar examples
> > too, e.g., pg_decode_message has the parameter txn but does not use
> > it. Do you think we still need to add comments?
> >
>
> In that case, we can leave it as is, but let's ensure that we are not
> exposing any parameter which is not used, and if there is one for some
> reason, we should document it. I will also look into this.

Ok
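
For the record, the kind of note we could add atop these callbacks
would be something like this (wording is only a sketch):

/*
 * Some parameters (e.g. relation, change) are unused here. The stream
 * callbacks deliberately keep the same signatures as their non-stream
 * counterparts because other output plugins (e.g. pgoutput) do need
 * them; test_decoding simply has nothing to print for them while the
 * transaction is still in progress.
 */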

> > > 4.
> > > +static void
> > > +stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
> > > +{
> > > + LogicalDecodingContext *ctx = cache->private_data;
> > > + LogicalErrorCallbackState state;
> > > + ErrorContextCallback errcallback;
> > > +
> > > + Assert(!ctx->fast_forward);
> > > +
> > > + /* We're only supposed to call this when streaming is supported. */
> > > + Assert(ctx->streaming);
> > > +
> > > + /* Push callback + info on the error context stack */
> > > + state.ctx = ctx;
> > > + state.callback_name = "stream_start";
> > > + /* state.report_location = apply_lsn; */
> > >
> > > Why can't we supply the report_location here? I think here we need to
> > > report txn->first_lsn if this is the very first stream and
> > > txn->final_lsn if it is any consecutive one.
> >
> > I am not sure about this, because for the very first stream we would
> > report the location of the first lsn of the stream, and for a
> > consecutive stream we would report the last lsn in the stream.
> >
>
> Yeah, that doesn't seem to be consistent. How about if we get it as an
> additional parameter? The caller can pass the lsn of the very first
> change it is trying to decode in this stream.

Done
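
To illustrate, the wrapper now takes the lsn of the first change
decoded in this stream and uses it as the report location, roughly
like this (a sketch; the exact parameter name in the patch may
differ):

static void
stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                        XLogRecPtr first_lsn)
{
    LogicalDecodingContext *ctx = cache->private_data;
    LogicalErrorCallbackState state;
    ErrorContextCallback errcallback;

    Assert(!ctx->fast_forward);

    /* We're only supposed to call this when streaming is supported. */
    Assert(ctx->streaming);

    /* Push callback + info on the error context stack */
    state.ctx = ctx;
    state.callback_name = "stream_start";
    state.report_location = first_lsn;
    errcallback.callback = output_plugin_error_callback;
    errcallback.arg = (void *) &state;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;

    /* Invoke the output plugin's stream_start callback. */
    ctx->callbacks.stream_start_cb(ctx, txn);

    /* Pop the error context stack */
    error_context_stack = errcallback.previous;
}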

> > > 11.
> > > - * HeapTupleSatisfiesHistoricMVCC.
> > > + * tqual.c's HeapTupleSatisfiesHistoricMVCC.
> > > + *
> > > + * We do build the hash table even if there are no CIDs. That's
> > > + * because when streaming in-progress transactions we may run into
> > > + * tuples with the CID before actually decoding them. Think e.g. about
> > > + * INSERT followed by TRUNCATE, where the TRUNCATE may not be decoded
> > > + * yet when applying the INSERT. So we build a hash table so that
> > > + * ResolveCminCmaxDuringDecoding does not segfault in this case.
> > > + *
> > > + * XXX We might limit this behavior to streaming mode, and just bail
> > > + * out when decoding transaction at commit time (at which point it's
> > > + * guaranteed to see all CIDs).
> > > */
> > > static void
> > > ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
> > > @@ -1350,9 +1498,6 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer
> > > *rb, ReorderBufferTXN *txn)
> > > dlist_iter iter;
> > > HASHCTL hash_ctl;
> > >
> > > - if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
> > > - return;
> > > -
> > >
> > > I don't understand this change. Why could "INSERT followed by
> > > TRUNCATE" lead to a tuple that comes up for decoding before its
> > > CID?
> >
> > Actually, even if we haven't decoded the DDL operation yet, the tuple
> > might already have been deleted from the system table by a later
> > operation. E.g., while we are streaming the INSERT, it is possible
> > that the TRUNCATE has already deleted that tuple and set the cmax for
> > it. Before the streaming patch, we were replaying the INSERT only on
> > commit, so by that time we had seen all the operations which did DDL,
> > and we would have already prepared the tuple CID hash.
> >
>
> Okay, but for that case, how good is it that we always allow the CID
> hash table to be built even if there are no catalog changes in the TXN
> (see changes in ReorderBufferBuildTupleCidHash)? Can't we detect that
> while resolving the cmin/cmax?

Done
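
Concretely, instead of always building the hash table up front,
ResolveCminCmaxDuringDecoding() now bails out when the transaction has
built no tuple CID hash, along these lines (a sketch of the guard at
the top of the function):

/*
 * If the transaction has no catalog changes decoded so far, there is
 * no tuplecid hash to consult, so report that we could not resolve
 * the cmin/cmax instead of requiring an (empty) hash to exist.
 */
if (tuplecid_data == NULL)
    return false;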

>
> Few more comments for v20-0005-Implement-streaming-mode-in-ReorderBuffer:
> ----------------------------------------------------------------------------------------------------------------
> 1.
> /*
> - * Binary heap comparison function.
> + * Binary heap comparison function (regular non-streaming iterator).
> */
> static int
> ReorderBufferIterCompare(Datum a, Datum b, void *arg)
>
> It seems to me the above comment change is not required as per the latest patch.

Done

> 2.
> * For subtransactions, we only mark them as streamed when there are
> + * any changes in them.
> + *
> + * We do it this way because of aborts - we don't want to send aborts
> + * for XIDs the downstream is not aware of. And of course, it always
> + * knows about the toplevel xact (we send the XID in all messages),
> + * but we never stream XIDs of empty subxacts.
> + */
> + if ((!txn->toptxn) || (txn->nentries_mem != 0))
> + txn->txn_flags |= RBTXN_IS_STREAMED;
>
> /when there are any changes in them/when there are changes in them. I
> think we don't need 'any' in the above sentence.

Done

> 3.
> And, during catalog scan we can check the status of the xid and
> + * if it is aborted we will report a specific error that we can ignore. We
> + * might have already streamed some of the changes for the aborted
> + * (sub)transaction, but that is fine because when we decode the abort we will
> + * stream abort message to truncate the changes in the subscriber.
> + */
> +static inline void
> +SetupCheckXidLive(TransactionId xid)
>
> In the above comment, I don't think it is right to say that we ignore
> the error raised due to the aborted transaction. We need to say that
> we discard the already streamed changes on such an error.

Done.

> 4.
> +static inline void
> +SetupCheckXidLive(TransactionId xid)
> +{
> /*
> - * If this transaction has no snapshot, it didn't make any changes to the
> - * database, so there's nothing to decode. Note that
> - * ReorderBufferCommitChild will have transferred any snapshots from
> - * subtransactions if there were any.
> + * setup CheckXidAlive if it's not committed yet. We don't check if the xid
> + * aborted. That will happen during catalog access. Also reset the
> + * sysbegin_called flag.
> */
> - if (txn->base_snapshot == NULL)
> + if (!TransactionIdDidCommit(xid))
> {
> - Assert(txn->ninvalidations == 0);
> - ReorderBufferCleanupTXN(rb, txn);
> - return;
> + CheckXidAlive = xid;
> + bsysscan = false;
> }
>
> I think this function is inline as it needs to be called for each
> change. If that is the case (and even otherwise), isn't it better to
> check whether the passed xid is the same as CheckXidAlive before
> checking TransactionIdDidCommit? TransactionIdDidCommit can be costly,
> and calling it for each change might not be a good idea.

Done. Also, I think it is better to check TransactionIdIsInProgress
instead of !TransactionIdDidCommit, so I have changed that as well.
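
The function now looks roughly like this (a sketch):

static inline void
SetupCheckXidLive(TransactionId xid)
{
    /*
     * Avoid the (potentially costly) transaction-status check if we
     * have already set up CheckXidAlive for this xid.
     */
    if (TransactionIdEquals(CheckXidAlive, xid))
        return;

    /*
     * Set up CheckXidAlive while the xid is still in progress; whether
     * it has aborted is checked later, during catalog access. Also
     * reset the bsysscan flag.
     */
    if (TransactionIdIsInProgress(xid))
    {
        CheckXidAlive = xid;
        bsysscan = false;
    }
}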

> 5.
> setup CheckXidAlive if it's not committed yet. We don't check if the xid
> + * aborted. That will happen during catalog access. Also reset the
> + * sysbegin_called flag.
>
> /if the xid aborted/if the xid is aborted. missing comma after Also.

Done

> 6.
> ReorderBufferProcessTXN()
> {
> ..
> - /* build data to be able to lookup the CommandIds of catalog tuples */
> + /*
> + * build data to be able to lookup the CommandIds of catalog tuples
> + */
> ReorderBufferBuildTupleCidHash(rb, txn);
> ..
> }
>
> Is there a need to change the formatting of the comment?

No need; changed it back.

>
> 7.
> ReorderBufferProcessTXN()
> {
> ..
> if (using_subtxn)
> - BeginInternalSubTransaction("replay");
> + BeginInternalSubTransaction("stream");
> else
> StartTransactionCommand();
> ..
> }
>
> I am not sure unconditionally changing "replay" to "stream" is a good
> idea. How about something like
> BeginInternalSubTransaction(streaming ? "stream" : "replay");?

Done
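
i.e., it now reads:

if (using_subtxn)
    BeginInternalSubTransaction(streaming ? "stream" : "replay");
else
    StartTransactionCommand();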

> 8.
> @@ -1588,8 +1766,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
> * use as a normal record. It'll be cleaned up at the end
> * of INSERT processing.
> */
> - if (specinsert == NULL)
> - elog(ERROR, "invalid ordering of speculative insertion changes");
>
> You have removed this check but all other handling of specinsert is
> same as far as this patch is concerned. Why so?

Seems like a merge issue, or a leftover from the old design of the
TOAST handling where we were streaming with the partial tuple. Fixed
now.

> 9.
> @@ -1676,8 +1860,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
> * freed/reused while restoring spooled data from
> * disk.
> */
> - Assert(change->data.tp.newtuple != NULL);
> -
> dlist_delete(&change->node);
>
> Why is this Assert removed?

Same cause as above; fixed.

> 10.
> @@ -1753,7 +1935,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
> relations[nrelations++] = relation;
> }
>
> - rb->apply_truncate(rb, txn, nrelations, relations, change);
> + if (streaming)
> + {
> + rb->stream_truncate(rb, txn, nrelations, relations, change);
> +
> + /* Remember that we have sent some data. */
> + change->txn->any_data_sent = true;
> + }
> + else
> + rb->apply_truncate(rb, txn, nrelations, relations, change);
>
> Can we encapsulate this in a separate function like
> ReorderBufferApplyTruncate or something like that? Basically, rather
> than having the streaming check in this function, let's do it in some
> other internal function. And we can likewise do it for all the
> streaming checks in this function, or at least wherever it is
> feasible. That will make this function look clean.

Done for truncate and change. I think we can create a few more such
functions for the start/stop and the error-cleanup handling. I will
work on that.
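
For truncate, for example, the wrapper looks roughly like this (a
sketch based on the hunk above; the final name and signature may
differ):

static void
ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn,
                           int nrelations, Relation *relations,
                           ReorderBufferChange *change, bool streaming)
{
    if (streaming)
    {
        rb->stream_truncate(rb, txn, nrelations, relations, change);

        /* Remember that we have sent some data. */
        change->txn->any_data_sent = true;
    }
    else
        rb->apply_truncate(rb, txn, nrelations, relations, change);
}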

> 11.
> + * We currently can only decode a transaction's contents when its commit
> + * record is read because that's the only place where we know about cache
> + * invalidations. Thus, once a toplevel commit is read, we iterate over the top
> + * and subtransactions (using a k-way merge) and replay the changes in lsn
> + * order.
> + */
> +void
> +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
> {
> ..
>
> I think the above comment needs to be updated after this patch. This
> API can now be used while decoding both an in-progress and a committed
> transaction.

Done
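
The updated comment now says something along these lines (wording is a
sketch, not the final text):

/*
 * Replay the transaction's changes in lsn order, iterating over the
 * top- and subtransactions using a k-way merge. Historically this was
 * only called once the commit record was read (the only point where
 * all cache invalidations are known); with streaming it is also used
 * to replay the changes of an in-progress transaction decoded so far.
 */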

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
