Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions

From: Dilip Kumar <dilipbalaut(at)gmail(dot)com>
To: Kuntal Ghosh <kuntalghosh(dot)2007(at)gmail(dot)com>
Cc: Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, Peter Eisentraut <peter(dot)eisentraut(at)2ndquadrant(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
Date: 2020-04-14 10:10:50
Message-ID: CAFiTN-vy3_KA-+kTff1=UvKasp0r1BDScDuJv5hN2SEw1PHn_A@mail.gmail.com
Lists: pgsql-hackers

On Mon, Apr 13, 2020 at 11:43 PM Kuntal Ghosh
<kuntalghosh(dot)2007(at)gmail(dot)com> wrote:
>
> On Mon, Apr 13, 2020 at 6:34 PM Dilip Kumar <dilipbalaut(at)gmail(dot)com> wrote:
> >
> Skipping 0003 for now. Review comments from 0004-Gracefully-handle-*.patch
>
> @@ -5490,6 +5523,14 @@ heap_finish_speculative(Relation relation,
> ItemPointer tid)
> ItemId lp = NULL;
> HeapTupleHeader htup;
>
> + /*
> + * We don't expect direct calls to heap_hot_search with
> + * valid CheckXidAlive for regular tables. Track that below.
> + */
> + if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
> + !(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation))))
> + elog(ERROR, "unexpected heap_hot_search call during logical decoding");
> The call is to heap_finish_speculative.

Fixed

> @@ -481,6 +482,19 @@ systable_getnext(SysScanDesc sysscan)
> }
> }
>
> + if (TransactionIdIsValid(CheckXidAlive) &&
> + !TransactionIdIsInProgress(CheckXidAlive) &&
> + !TransactionIdDidCommit(CheckXidAlive))
> + ereport(ERROR,
> + (errcode(ERRCODE_TRANSACTION_ROLLBACK),
> + errmsg("transaction aborted during system catalog scan")));
> s/transaction aborted/transaction aborted concurrently perhaps? Also,
> can we move this check at the begining of the function? If the
> condition fails, we can skip the sys scan.

We must do this check after we get the tuple, because our goal is to
avoid decoding based on a wrong tuple. If we moved the check to before
the scan, the transaction could still abort after the check. Once we
have fetched the tuple, and the transaction is still alive at that
point, it doesn't matter even if it aborts later, because we have
already got the right tuple.

>
> Some of the checks looks repetative in the same file. Should we
> declare them as inline functions?
>
> Review comments from 0005-Implement-streaming*.patch
>
> +static void
> +AssertChangeLsnOrder(ReorderBufferTXN *txn)
> +{
> +#ifdef USE_ASSERT_CHECKING
> + dlist_iter iter;
> ...
> +#endif
> +}
>
> We can implement the same as following:
> #ifdef USE_ASSERT_CHECKING
> static void
> AssertChangeLsnOrder(ReorderBufferTXN *txn)
> {
> dlist_iter iter;
> ...
> }
> #else
> #define AssertChangeLsnOrder(txn) ((void)true)
> #endif

I am not sure about this; it doesn't look clean to me. Moreover, the
other similar functions are defined in the same way, e.g.
AssertTXNLsnOrder.

>
> + * if it is aborted we will report an specific error which we can ignore. We
> s/an specific/a specific

Done

>
> + * Set the last last of the stream as the final lsn before calling
> + * stream stop.
> s/last last/last
>
> PG_CATCH();
> {
> + MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
> + ErrorData *errdata = CopyErrorData();
> When we don't re-throw, the errdata should be freed by calling
> FreeErrorData(errdata), right?

Done

>
> + /*
> + * Set the last last of the stream as the final lsn before
> + * calling stream stop.
> + */
> + txn->final_lsn = prev_lsn;
> + rb->stream_stop(rb, txn);
> +
> + FlushErrorState();
> + }
> stream_stop() can still throw some error, right? In that case, we
> should flush the error state before calling stream_stop().

Done

>
> + /*
> + * Remember the command ID and snapshot if transaction is streaming
> + * otherwise free the snapshot if we have copied it.
> + */
> + if (streaming)
> + {
> + txn->command_id = command_id;
> +
> + /* Avoid copying if it's already copied. */
> + if (snapshot_now->copied)
> + txn->snapshot_now = snapshot_now;
> + else
> + txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
> + txn, command_id);
> + }
> + else if (snapshot_now->copied)
> + ReorderBufferFreeSnap(rb, snapshot_now);
> Hmm, it seems this part needs an assumption that after copying the
> snapshot, no subsequent step can throw any error. If they do, then we
> can again create a copy of the snapshot in catch block, which will
> leak some memory. Is my understanding correct?

Actually, in the CATCH block we copy the snapshot only if the error is
ERRCODE_TRANSACTION_ROLLBACK, and that can occur only during a
systable scan. In the TRY block we copy the snapshot after we have
streamed all the changes, i.e. after the systable scans are done, so
any error raised after that point will not be
ERRCODE_TRANSACTION_ROLLBACK and we will not copy again.

>
> + }
> + else
> + {
> + ReorderBufferCleanupTXN(rb, txn);
> + PG_RE_THROW();
> + }
> Shouldn't we switch back to previously created error memory context
> before re-throwing?

Fixed.

>
> +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
> + XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
> + TimestampTz commit_time,
> + RepOriginId origin_id, XLogRecPtr origin_lsn)
> +{
> + ReorderBufferTXN *txn;
> + volatile Snapshot snapshot_now;
> + volatile CommandId command_id = FirstCommandId;
> In the modified ReorderBufferCommit(), why is it necessary to declare
> the above two variable as volatile? There is no try-catch block here.

Fixed
>
> @@ -1946,6 +2284,13 @@ ReorderBufferAbort(ReorderBuffer *rb,
> TransactionId xid, XLogRecPtr lsn)
> if (txn == NULL)
> return;
>
> + /*
> + * When the (sub)transaction was streamed, notify the remote node
> + * about the abort only if we have sent any data for this transaction.
> + */
> + if (rbtxn_is_streamed(txn) && txn->any_data_sent)
> + rb->stream_abort(rb, txn, lsn);
> +
> s/When/If
>
> + /*
> + * When the (sub)transaction was streamed, notify the remote node
> + * about the abort.
> + */
> + if (rbtxn_is_streamed(txn))
> + rb->stream_abort(rb, txn, lsn);
> s/When/If. And, in this case, if we've not sent any data, why should
> we send the abort message (similar to the previous one)?

Fixed

>
> + * Note: We never do both stream and serialize a transaction (we only spill
> + * to disk when streaming is not supported by the plugin), so only one of
> + * those two flags may be set at any given time.
> + */
> +#define rbtxn_is_streamed(txn) \
> +( \
> + ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
> +)
> Should we put any assert (not necessarily here) to validate the above comment?

Because of toast handling, this assumption has changed now, so I will
remove this note in that patch (0010).

>
> + txn = ReorderBufferLargestTopTXN(rb);
> +
> + /* we know there has to be one, because the size is not zero */
> + Assert(txn && !txn->toptxn);
> + Assert(txn->size > 0);
> + Assert(rb->size >= txn->size);
> The same three assertions are already there in ReorderBufferLargestTopTXN().
>
> +static bool
> +ReorderBufferCanStream(ReorderBuffer *rb)
> +{
> + LogicalDecodingContext *ctx = rb->private_data;
> +
> + return ctx->streaming;
> +}
> Potential inline function.

Done

> +static void
> +ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
> +{
> + volatile Snapshot snapshot_now;
> + volatile CommandId command_id;
> Here also, do we need to declare these two variables as volatile?

Done

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

Attachment Content-Type Size
v14-0001-Immediately-WAL-log-assignments.patch application/octet-stream 10.5 KB
v14-0003-Extend-the-output-plugin-API-with-stream-methods.patch application/octet-stream 34.8 KB
v14-0002-Issue-individual-invalidations-with.patch application/octet-stream 16.7 KB
v14-0005-Implement-streaming-mode-in-ReorderBuffer.patch application/octet-stream 37.9 KB
v14-0004-Gracefully-handle-concurrent-aborts-of-uncommitt.patch application/octet-stream 12.3 KB
v14-0006-Add-support-for-streaming-to-built-in-replicatio.patch application/octet-stream 90.9 KB
v14-0008-Enable-streaming-for-all-subscription-TAP-tests.patch application/octet-stream 14.7 KB
v14-0009-Add-TAP-test-for-streaming-vs.-DDL.patch application/octet-stream 4.4 KB
v14-0007-Track-statistics-for-streaming.patch application/octet-stream 11.8 KB
v14-0010-Bugfix-handling-of-incomplete-toast-tuple.patch application/octet-stream 15.3 KB
