Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: Dilip Kumar <dilipbalaut(at)gmail(dot)com>
Cc: Kuntal Ghosh <kuntalghosh(dot)2007(at)gmail(dot)com>, Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, Peter Eisentraut <peter(dot)eisentraut(at)2ndquadrant(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
Date: 2020-05-22 06:24:42
Message-ID: CAA4eK1+aHfEafWOtjsbx8M6LacPCZtPTyfFrsk7kjZG854yB3Q@mail.gmail.com

On Tue, May 19, 2020 at 6:01 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Fri, May 15, 2020 at 2:48 PM Dilip Kumar <dilipbalaut(at)gmail(dot)com> wrote:
> >

I have further reviewed v22 and below are my comments:

v22-0005-Implement-streaming-mode-in-ReorderBuffer
--------------------------------------------------------------------------
1.
+ * Note: We never do both stream and serialize a transaction (we only spill
+ * to disk when streaming is not supported by the plugin), so only one of
+ * those two flags may be set at any given time.
+ */
+#define rbtxn_is_streamed(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
+)

The above 'Note' is not correct as per the latest implementation.
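
To illustrate, here is a minimal standalone sketch of the two flags as
independent bits, so nothing in the representation prevents both from
being set on one transaction. It assumes the remark above means a
transaction can now be both serialized and streamed. The bit values,
the SketchTXN struct and sketch_mark() are hypothetical and not taken
from the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Bit values are assumptions for illustration, not copied from the patch. */
#define RBTXN_IS_SERIALIZED 0x0004
#define RBTXN_IS_STREAMED   0x0008

/* Hypothetical stand-in for ReorderBufferTXN; only the flags word is kept. */
typedef struct SketchTXN
{
    uint32_t    txn_flags;
} SketchTXN;

#define rbtxn_is_serialized(txn) \
    (((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0)

#define rbtxn_is_streamed(txn) \
    (((txn)->txn_flags & RBTXN_IS_STREAMED) != 0)

/* Set one flag bit without disturbing the others. */
static void
sketch_mark(SketchTXN *txn, uint32_t flag)
{
    assert(txn != NULL);
    txn->txn_flags |= flag;
}
```

If both states can indeed coexist, the comment above the macro should
say so rather than claim the flags are mutually exclusive.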

v22-0006-Add-support-for-streaming-to-built-in-replicatio
----------------------------------------------------------------------------
2.
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -14,7 +14,6 @@
*
*-------------------------------------------------------------------------
*/
-
#include "postgres.h"

Spurious line removal.

3.
+void
+logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, 'c'); /* action STREAM COMMIT */
+
+ Assert(TransactionIdIsValid(txn->xid));
+
+ /* transaction ID (we're starting to stream, so must be valid) */
+ pq_sendint32(out, txn->xid);

The part of the comment "we're starting to stream, so must be valid"
is not correct, as we are not at the start of the stream here. The
patch uses the same incorrect sentence in a few other places; kindly
fix those as well.

4.
+ * XXX Do we need to allocate it in TopMemoryContext?
+ */
+static void
+subxact_info_add(TransactionId xid)
{
..

For this and other places in the patch, such as the function
stream_open_file(), can we consider using a new memory context, say
LogicalStreamingContext or something like that, instead of
TopMemoryContext? We can create LogicalStreamingContext under
TopMemoryContext. I don't see any need to use TopMemoryContext here.

5.
+static void
+subxact_info_add(TransactionId xid)

This function assumes valid values for global variables like
stream_fd and stream_xid. I think it is better to have Asserts for
those in this function before using them. The Asserts for those are
present in handle_streamed_transaction, but I feel they should be in
subxact_info_add.
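
A minimal standalone sketch of the suggested placement follows. It
uses the standard assert() in place of Assert, and it assumes that
stream_fd is an int with -1 meaning "not open" and that a counter is
enough to stand in for the per-subxact state; those details and the
nsubxacts variable are hypothetical, not taken from the patch.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define InvalidTransactionId      ((TransactionId) 0)
#define TransactionIdIsValid(xid) ((xid) != InvalidTransactionId)

/* Stand-ins for the apply worker's globals; types and sentinels are assumed. */
static int           stream_fd = -1;
static TransactionId stream_xid = InvalidTransactionId;
static TransactionId subxact_last = InvalidTransactionId;
static int           nsubxacts = 0;    /* hypothetical counter */

static void
subxact_info_add(TransactionId xid)
{
    /* Check the preconditions in the function that relies on them. */
    assert(stream_fd >= 0);
    assert(TransactionIdIsValid(stream_xid));

    /* Same subxact as in the previous call: nothing new to record. */
    if (subxact_last == xid)
        return;

    subxact_last = xid;
    nsubxacts++;        /* placeholder for recording the subxact */
}
```

With the checks here, any future caller that forgets to open the
stream first fails in an assert-enabled build, regardless of what
handle_streamed_transaction verifies.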

6.
+subxact_info_add(TransactionId xid)
/*
+ * In most cases we're checking the same subxact as we've already seen in
+ * the last call, so make ure just ignore it (this change comes later).
+ */
+ if (subxact_last == xid)
+ return;

Typo and minor correction: /ure just/sure to/

7.
+subxact_info_write(Oid subid, TransactionId xid)
{
..
+ /*
+ * But we free the memory allocated for subxact info. There might be one
+ * exceptional transaction with many subxacts, and we don't want to keep
+ * the memory allocated forewer.
+ *
+ */

a. Typo, /forewer/forever
b. The extra line at the end of the comment is not required.

8.
+ * XXX Maybe we should only include the checksum when the cluster is
+ * initialized with checksums?
+ */
+static void
+subxact_info_write(Oid subid, TransactionId xid)

Do we really need a checksum for temporary files? I have checked a
few other similar cases, like the SharedFileSet stuff for parallel
hash join, but didn't find them using checksums. Can you also look at
other usages of temporary files, and then we can decide whether there
is any reason to have checksums for this?

Another point is that we don't seem to be doing this for the 'changes'
file; see stream_write_change. So I am not sure it makes sense to
write a checksum for the subxact file.

Tomas, do you see any reason for the same?

9.
+subxact_filename(char *path, Oid subid, TransactionId xid)
+{
+ char tempdirpath[MAXPGPATH];
+
+ TempTablespacePath(tempdirpath, DEFAULTTABLESPACE_OID);
+
+ /*
+ * We might need to create the tablespace's tempfile directory, if no
+ * one has yet done so.
+ */
+ if ((MakePGDirectory(tempdirpath) < 0) && errno != EEXIST)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create directory \"%s\": %m",
+ tempdirpath)));
+
+ snprintf(path, MAXPGPATH, "%s/logical-%u-%u.subxacts",
+ tempdirpath, subid, xid);
+}

Temporary files created in PGDATA/base/pgsql_tmp follow a certain
naming convention (see docs[1]), which is not followed here. You can
also refer to SharedFileSetPath and OpenTemporaryFile. I think we can
just follow that convention and then additionally append subid, xid
and .subxacts. A similar change is also required for
changes_filename. I would like to know if there is a reason why we
want to use a different naming convention here.
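
As a rough standalone sketch of such a name: the documented convention
for temporary files is pgsql_tmpPPP.NNN, where PPP is the PID of the
owning backend. The exact way subid and xid are appended below, and
the sketch_subxact_filename() helper with its parameters, are
assumptions for illustration only.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Prefix used for temporary files, per the file layout documentation. */
#define PG_TEMP_FILE_PREFIX "pgsql_tmp"

/*
 * Hypothetical helper: build "<tempdir>/pgsql_tmp<pid>.<subid>-<xid>.subxacts".
 * Returns what snprintf returns, so callers can detect truncation.
 */
static int
sketch_subxact_filename(char *path, size_t len, const char *tempdirpath,
                        int pid, unsigned int subid, unsigned int xid)
{
    assert(path != NULL && len > 0);

    return snprintf(path, len, "%s/%s%d.%u-%u.subxacts",
                    tempdirpath, PG_TEMP_FILE_PREFIX, pid, subid, xid);
}
```

Keeping the pgsql_tmp prefix matters because code that cleans up
leftover temporary files typically recognizes them by that prefix.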

10.
+ * This can only be called at the beginning of a "streaming" block, i.e.
+ * between stream_start/stream_stop messages from the upstream.
+ */
+static void
+stream_close_file(void)

The comment seems to be wrong. I think this can only be called at
stream end, so it should be "This can only be called at the end of a
"streaming" block, i.e. at stream_stop message from the upstream."

11.
+ * the order the transactions are sent in. So streamed trasactions are
+ * handled separately by using schema_sent flag in ReorderBufferTXN.
+ *
* For partitions, 'pubactions' considers not only the table's own
* publications, but also those of all of its ancestors.
*/
typedef struct RelationSyncEntry
{
Oid relid; /* relation oid */
-
+ TransactionId xid; /* transaction that created the record */
/*
* Did we send the schema? If ancestor relid is set, its schema must also
* have been sent for this to be true.
*/
bool schema_sent;
+ List *streamed_txns; /* streamed toplevel transactions with this
+ * schema */

The part of comment "So streamed trasactions are handled separately by
using schema_sent flag in ReorderBufferTXN." doesn't seem to match
with what we are doing in the latest version of the patch.

12.
maybe_send_schema()
{
..
+ if (in_streaming)
+ {
+ /*
+ * TOCHECK: We have to send schema after each catalog change and it may
+ * occur when streaming already started, so we have to track new catalog
+ * changes somehow.
+ */
+ schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
..
..
}

I think it is good to verify/test once what this comment says, but as
per the code we should be sending the schema after each catalog
change, because we invalidate the streamed_txns list in
rel_sync_cache_relation_cb, which must be called during relcache
invalidation. Do we see any problem with that mechanism?
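
The mechanism can be sketched standalone as follows. This is a
simplification under stated assumptions: a fixed-size array stands in
for the List, SketchRelSyncEntry and sketch_invalidate() are
hypothetical stand-ins for RelationSyncEntry and the relcache
callback, and the setter's name and signature are assumed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define MAX_STREAMED_TXNS 8     /* arbitrary bound for the sketch */

/* Hypothetical, cut-down version of RelationSyncEntry. */
typedef struct SketchRelSyncEntry
{
    bool          schema_sent;
    int           nstreamed;
    TransactionId streamed_txns[MAX_STREAMED_TXNS];
} SketchRelSyncEntry;

/* Has the schema already been sent within this streamed toplevel xact? */
static bool
get_schema_sent_in_streamed_txn(const SketchRelSyncEntry *entry,
                                TransactionId xid)
{
    for (int i = 0; i < entry->nstreamed; i++)
    {
        if (entry->streamed_txns[i] == xid)
            return true;
    }
    return false;
}

/* Remember that the schema was sent for this streamed toplevel xact. */
static void
set_schema_sent_in_streamed_txn(SketchRelSyncEntry *entry, TransactionId xid)
{
    assert(entry->nstreamed < MAX_STREAMED_TXNS);
    entry->streamed_txns[entry->nstreamed++] = xid;
}

/* Stand-in for the invalidation callback: forget everything sent so far. */
static void
sketch_invalidate(SketchRelSyncEntry *entry)
{
    entry->schema_sent = false;
    entry->nstreamed = 0;
}
```

After an invalidation the lookup returns false for every transaction,
so the next change for that relation in a streamed transaction would
send the schema again.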

13.
+/*
+ * Notify downstream to discard the streamed transaction (along with all
+ * it's subtransactions, if it's a toplevel transaction).
+ */
+static void
+pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)

This comment is copied from pgoutput_stream_abort, so it doesn't match
what this function is doing.

[1] - https://www.postgresql.org/docs/devel/storage-file-layout.html

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
