Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions

From: Dilip Kumar <dilipbalaut(at)gmail(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: Kuntal Ghosh <kuntalghosh(dot)2007(at)gmail(dot)com>, Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, Peter Eisentraut <peter(dot)eisentraut(at)2ndquadrant(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
Date: 2020-05-25 14:37:36
Message-ID: CAFiTN-tW35oa_xNNMTKJ7LPLtaanjSwaEC15LrAixHi-i=DBmw@mail.gmail.com
Lists: pgsql-hackers

On Fri, May 22, 2020 at 11:54 AM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Tue, May 19, 2020 at 6:01 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
> >
> > On Fri, May 15, 2020 at 2:48 PM Dilip Kumar <dilipbalaut(at)gmail(dot)com> wrote:
> > >
>
> I have further reviewed v22 and below are my comments:
>
> v22-0005-Implement-streaming-mode-in-ReorderBuffer
> --------------------------------------------------------------------------
> 1.
> + * Note: We never do both stream and serialize a transaction (we only spill
> + * to disk when streaming is not supported by the plugin), so only one of
> + * those two flags may be set at any given time.
> + */
> +#define rbtxn_is_streamed(txn) \
> +( \
> + ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
> +)
>
> The above 'Note' is not correct as per the latest implementation.

That is removed in 0010; in the latest version you can see it in 0006.

> v22-0006-Add-support-for-streaming-to-built-in-replicatio
> ----------------------------------------------------------------------------
> 2.
> --- a/src/backend/replication/logical/launcher.c
> +++ b/src/backend/replication/logical/launcher.c
> @@ -14,7 +14,6 @@
> *
> *-------------------------------------------------------------------------
> */
> -
> #include "postgres.h"
>
> Spurious line removal.

Fixed

> 3.
> +void
> +logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
> + XLogRecPtr commit_lsn)
> +{
> + uint8 flags = 0;
> +
> + pq_sendbyte(out, 'c'); /* action STREAM COMMIT */
> +
> + Assert(TransactionIdIsValid(txn->xid));
> +
> + /* transaction ID (we're starting to stream, so must be valid) */
> + pq_sendint32(out, txn->xid);
>
> The part of the comment "we're starting to stream, so must be valid"
> is not correct as we are not at the start of the stream here. The
> patch has used the same incorrect sentence in a few places; kindly fix
> those as well.

I have removed that part of the comment.

> 4.
> + * XXX Do we need to allocate it in TopMemoryContext?
> + */
> +static void
> +subxact_info_add(TransactionId xid)
> {
> ..
>
> For this and other places in a patch like in function
> stream_open_file(), instead of using TopMemoryContext, can we consider
> using a new memory context LogicalStreamingContext or something like
> that. We can create LogicalStreamingContext under TopMemoryContext. I
> don't see any need of using TopMemoryContext here.

But when will we delete/reset the LogicalStreamingContext?  We are
planning to keep this memory as long as the worker is alive, so it is
supposed to live in the top memory context.  If we create another
context with the same lifespan as TopMemoryContext, then what is the
point?  Am I missing something?

> 5.
> +static void
> +subxact_info_add(TransactionId xid)
>
> This function has assumed a valid value for global variables like
> stream_fd and stream_xid. I think it is better to have Assert for
> those in this function before using them. The Assert for those are
> present in handle_streamed_transaction but I feel they should be in
> subxact_info_add.

Done

> 6.
> +subxact_info_add(TransactionId xid)
> /*
> + * In most cases we're checking the same subxact as we've already seen in
> + * the last call, so make ure just ignore it (this change comes later).
> + */
> + if (subxact_last == xid)
> + return;
>
> Typo and minor correction, /ure just/sure to

Done

> 7.
> +subxact_info_write(Oid subid, TransactionId xid)
> {
> ..
> + /*
> + * But we free the memory allocated for subxact info. There might be one
> + * exceptional transaction with many subxacts, and we don't want to keep
> + * the memory allocated forewer.
> + *
> + */
>
> a. Typo, /forewer/forever
> b. The extra line at the end of the comment is not required.

Done

> 8.
> + * XXX Maybe we should only include the checksum when the cluster is
> + * initialized with checksums?
> + */
> +static void
> +subxact_info_write(Oid subid, TransactionId xid)
>
> Do we really need to have the checksum for temporary files? I have
> checked a few other similar cases like SharedFileSet stuff for
> parallel hash join but didn't find them using checksums. Can you also
> once see other usages of temporary files and then let us decide if we
> see any reason to have checksums for this?

Yeah, I can also see that a checksum is not used in other similar places.

>
> Another point is we don't seem to be doing this for the 'changes' file;
> see stream_write_change. So I am not sure there is any sense in writing
> a checksum for the subxact file.

I can see there is a comment atop this function:

* XXX The subxact file includes CRC32C of the contents. Maybe we should
* include something like that here too, but doing so will not be as
* straighforward, because we write the file in chunks.

>
> Tomas, do you see any reason for the same?

> 9.
> +subxact_filename(char *path, Oid subid, TransactionId xid)
> +{
> + char tempdirpath[MAXPGPATH];
> +
> + TempTablespacePath(tempdirpath, DEFAULTTABLESPACE_OID);
> +
> + /*
> + * We might need to create the tablespace's tempfile directory, if no
> + * one has yet done so.
> + */
> + if ((MakePGDirectory(tempdirpath) < 0) && errno != EEXIST)
> + ereport(ERROR,
> + (errcode_for_file_access(),
> + errmsg("could not create directory \"%s\": %m",
> + tempdirpath)));
> +
> + snprintf(path, MAXPGPATH, "%s/logical-%u-%u.subxacts",
> + tempdirpath, subid, xid);
> +}
>
> Temporary files created in PGDATA/base/pgsql_tmp follow a certain
> naming convention (see docs[1]) which is not followed here. You can
> also refer SharedFileSetPath and OpenTemporaryFile. I think we can
> just try to follow that convention and then additionally append subid,
> xid and .subxacts. Also, a similar change is required for
> changes_filename. I would like to know if there is a reason why we
> want to use different naming convention here?

I have changed it to this: pgsql_tmpPID-subid-xid.subxacts.

> 10.
> + * This can only be called at the beginning of a "streaming" block, i.e.
> + * between stream_start/stream_stop messages from the upstream.
> + */
> +static void
> +stream_close_file(void)
>
> The comment seems to be wrong. I think this can be only called at
> stream end, so it should be "This can only be called at the end of a
> "streaming" block, i.e. at stream_stop message from the upstream."

Right, I have fixed it.

> 11.
> + * the order the transactions are sent in. So streamed trasactions are
> + * handled separately by using schema_sent flag in ReorderBufferTXN.
> + *
> * For partitions, 'pubactions' considers not only the table's own
> * publications, but also those of all of its ancestors.
> */
> typedef struct RelationSyncEntry
> {
> Oid relid; /* relation oid */
> -
> + TransactionId xid; /* transaction that created the record */
> /*
> * Did we send the schema? If ancestor relid is set, its schema must also
> * have been sent for this to be true.
> */
> bool schema_sent;
> + List *streamed_txns; /* streamed toplevel transactions with this
> + * schema */
>
> The part of comment "So streamed trasactions are handled separately by
> using schema_sent flag in ReorderBufferTXN." doesn't seem to match
> with what we are doing in the latest version of the patch.

Yeah, it's wrong, I have fixed it.

> 12.
> maybe_send_schema()
> {
> ..
> + if (in_streaming)
> + {
> + /*
> + * TOCHECK: We have to send schema after each catalog change and it may
> + * occur when streaming already started, so we have to track new catalog
> + * changes somehow.
> + */
> + schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
> ..
> ..
> }
>
> I think it is good to once verify/test what this comment says but as
> per code we should be sending the schema after each catalog change as
> we invalidate the streamed_txns list in rel_sync_cache_relation_cb
> which must be called during relcache invalidation. Do we see any
> problem with that mechanism?

I have tested this; I think we are already sending the schema after
each catalog change.

> 13.
> +/*
> + * Notify downstream to discard the streamed transaction (along with all
> + * it's subtransactions, if it's a toplevel transaction).
> + */
> +static void
> +pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
> + ReorderBufferTXN *txn,
> + XLogRecPtr commit_lsn)
>
> This comment is copied from pgoutput_stream_abort, so doesn't match
> what this function is doing.

Done

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

Attachment Content-Type Size
v24.tar application/x-tar 577.0 KB
