Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: Dilip Kumar <dilipbalaut(at)gmail(dot)com>
Cc: Kuntal Ghosh <kuntalghosh(dot)2007(at)gmail(dot)com>, Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, Peter Eisentraut <peter(dot)eisentraut(at)2ndquadrant(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
Date: 2020-05-12 11:09:35
Message-ID: CAA4eK1JMx9aWZiEqfAFdavj7YxriDstR0rdkazM5b6eV0zoMLQ@mail.gmail.com
Lists: pgsql-hackers

On Thu, May 7, 2020 at 6:17 PM Dilip Kumar <dilipbalaut(at)gmail(dot)com> wrote:
>
> On Tue, May 5, 2020 at 7:13 PM Dilip Kumar <dilipbalaut(at)gmail(dot)com> wrote:
>
> I have fixed one more issue in 0010 patch. The issue was that once
> the transaction is serialized due to the incomplete toast after
> streaming the serialized store was not cleaned up so it was streaming
> the same tuple multiple times.
>

I have reviewed a few patches (003, 004, and 005) and below are my comments.

v20-0003-Extend-the-output-plugin-API-with-stream-methods
----------------------------------------------------------------------------------------
1.
+static void
+pg_decode_stream_change(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change)
+{
+ OutputPluginPrepareWrite(ctx, true);
+ appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
+ OutputPluginWrite(ctx, true);
+}
+
+static void
+pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ int nrelations, Relation relations[],
+ ReorderBufferChange *change)
+{
+ OutputPluginPrepareWrite(ctx, true);
+ appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
+ OutputPluginWrite(ctx, true);
+}

In the above and similar APIs, parameters like relation are unused. I
think you should add a comment atop each of these APIs to explain why.
I guess it is because we want to keep them parallel to the non-stream
versions of the APIs, and we can't display the relation or other
details while the transaction is still in progress.

2.
+ <para>
+ Similar to spill-to-disk behavior, streaming is triggered when the total
+ amount of changes decoded from the WAL (for all in-progress transactions)
+ exceeds limit defined by
<varname>logical_decoding_work_mem</varname> setting.
+ At that point the largest toplevel transaction (measured by
amount of memory
+ currently used for decoded changes) is selected and streamed.
+ </para>

I think we need to explain here the cases where we still need to spill
to disk even when streaming is enabled. Also, please check whether this
paragraph matches the latest implementation, and update it if not.

3.
+ * To support streaming, we require change/commit/abort callbacks. The
+ * message callback is optional, similarly to regular output plugins.

/similarly/similar

4.
+static void
+stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_start";
+ /* state.report_location = apply_lsn; */

Why can't we supply the report_location here? I think we need to
report txn->first_lsn if this is the very first stream of the
transaction and txn->final_lsn for any subsequent one.

5.
+static void
+stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_stop";
+ /* state.report_location = apply_lsn; */

Can't we report txn->final_lsn here?

6. I think it would be good to provide an example of streaming
changes via test_decoding at
https://www.postgresql.org/docs/devel/test-decoding.html. We can
also explain there why the user is not expected to see the actual
data in the stream.

v20-0004-Gracefully-handle-concurrent-aborts-of-uncommitt
----------------------------------------------------------------------------------------
7.
+ /*
+ * We don't expect direct calls to table_tuple_get_latest_tid with valid
+ * CheckXidAlive for catalog or regular tables.

There is an extra space between 'CheckXidAlive' and 'for'. I can see
similar problems in other places where this comment is used; please
fix those as well.

8.
+/*
+ * CheckXidAlive is a xid value pointing to a possibly ongoing (sub)
+ * transaction. Currently, it is used in logical decoding. It's possible
+ * that such transactions can get aborted while the decoding is ongoing in
+ * which case we skip decoding that particular transaction. To ensure that we
+ * check whether the CheckXidAlive is aborted after fetching the tuple from
+ * system tables. We also ensure that during logical decoding we never
+ * directly access the tableam or heap APIs because we are checking for the
+ * concurrent aborts only in systable_* APIs.
+ */

In this comment, the spacing after sentences is inconsistent: in the
part "transaction. To", a single space is used, whereas the other
sentences use two spaces after the full stop.

v20-0005-Implement-streaming-mode-in-ReorderBuffer
-----------------------------------------------------------------------------
9.
Implement streaming mode in ReorderBuffer

Instead of serializing the transaction to disk after reaching the
maximum number of changes in memory (4096 changes), we consume the
changes we have in memory and invoke new stream API methods. This
happens in ReorderBufferStreamTXN() using about the same logic as
in ReorderBufferCommit() logic.

I think the above part of the commit message needs to be updated.

10.
Theoretically, we could get rid of the k-way merge, and append the
changes to the toplevel xact directly (and remember the position
in the list in case the subxact gets aborted later).

I don't think this part of the commit message is correct, as we
sometimes need to spill even during streaming. Please check the
entire commit message and update it according to the latest
implementation.

11.
- * HeapTupleSatisfiesHistoricMVCC.
+ * tqual.c's HeapTupleSatisfiesHistoricMVCC.
+ *
+ * We do build the hash table even if there are no CIDs. That's
+ * because when streaming in-progress transactions we may run into
+ * tuples with the CID before actually decoding them. Think e.g. about
+ * INSERT followed by TRUNCATE, where the TRUNCATE may not be decoded
+ * yet when applying the INSERT. So we build a hash table so that
+ * ResolveCminCmaxDuringDecoding does not segfault in this case.
+ *
+ * XXX We might limit this behavior to streaming mode, and just bail
+ * out when decoding transaction at commit time (at which point it's
+ * guaranteed to see all CIDs).
*/
static void
ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
@@ -1350,9 +1498,6 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer
*rb, ReorderBufferTXN *txn)
dlist_iter iter;
HASHCTL hash_ctl;

- if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
- return;
-

I don't understand this change. Why could "INSERT followed by
TRUNCATE" lead to a tuple arriving for decoding before its CID? The
patch has made changes based on this assumption in
HeapTupleSatisfiesHistoricMVCC, which appears risky, as the behavior
could depend on whether we are streaming the changes of an
in-progress xact or decoding at the commit of a transaction. We might
want to write a test to validate this behavior.

Also, the comment refers to tqual.c which is wrong as this API is now
in heapam_visibility.c.

12.
+ * setup CheckXidAlive if it's not committed yet. We don't check if the xid
+ * aborted. That will happen during catalog access. Also reset the
+ * sysbegin_called flag.
*/
- if (txn->base_snapshot == NULL)
+ if (!TransactionIdDidCommit(xid))
{
- Assert(txn->ninvalidations == 0);
- ReorderBufferCleanupTXN(rb, txn);
- return;
+ CheckXidAlive = xid;
+ bsysscan = false;
}

In the comment, the flag name 'sysbegin_called' should be bsysscan.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
