Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions

From: Dilip Kumar <dilipbalaut(at)gmail(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: Michael Paquier <michael(at)paquier(dot)xyz>, Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)2ndquadrant(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
Date: 2020-02-11 03:12:29
Message-ID: CAFiTN-t1YNmoBf7k1kUrUue4q1Tf3GXjGwZTseNLivKmr9sz1Q@mail.gmail.com

On Wed, Feb 5, 2020 at 4:05 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Wed, Feb 5, 2020 at 9:46 AM Dilip Kumar <dilipbalaut(at)gmail(dot)com> wrote:
> I think we don't need to maintain
> v8-0007-Support-logical_decoding_work_mem-set-from-create as per
> discussion in one of the above emails [1] as its usage is not clear.

Done

> v8-0008-Add-support-for-streaming-to-built-in-replication
> 1.
> - information. The allowed options are <literal>slot_name</literal> and
> - <literal>synchronous_commit</literal>
> + information. The allowed options are <literal>slot_name</literal>,
> + <literal>synchronous_commit</literal>, <literal>work_mem</literal>
> + and <literal>streaming</literal>.
>
> As per the discussion above [1], I don't think we need work_mem here.
> You might want to remove the other usage from the patch as well.

Done

> 2.
> @@ -59,7 +59,8 @@ parse_subscription_options(List *options, bool
> *connect, bool *enabled_given,
> bool *slot_name_given, char **slot_name,
> bool *copy_data, char **synchronous_commit,
> bool *refresh, int *logical_wm,
> - bool *logical_wm_given)
> + bool *logical_wm_given, bool *streaming,
> + bool *streaming_given)
>
> It is not clear to me why we need two parameters 'streaming' and
> 'streaming_given' in this API. Can't we handle similar to parameter
> 'refresh'?

The streaming option has to be stored in the system catalog, so if we
don't remember whether the user specified a value, how would we know
whether to update that column? Or are you suggesting that we always
mark the column as updated? IMHO that is not a good idea.
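
To illustrate the point, here is a minimal standalone sketch (not code
from the patch; SubRow, ParsedOpts and apply_options are hypothetical
names) of why the value and the "given" flag are tracked separately.
Without the flag, "option omitted" and "streaming = false" would be
indistinguishable, and the stored column would be rewritten either way.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for one subscription catalog row. */
typedef struct SubRow
{
    bool streaming;   /* stored value of the streaming option */
    int  updates;     /* how many times the column was rewritten */
} SubRow;

/* Hypothetical parsed option: the value plus "did the user specify it". */
typedef struct ParsedOpts
{
    bool streaming;
    bool streaming_given;
} ParsedOpts;

/* Rewrite the column only when the user actually supplied the option. */
void
apply_options(SubRow *row, const ParsedOpts *opts)
{
    assert(row != NULL && opts != NULL);

    if (opts->streaming_given)
    {
        row->streaming = opts->streaming;
        row->updates++;
    }
}
```

With streaming_given unset the row is left untouched, even though the
parsed value itself defaults to false.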

> 3.
> diff --git a/src/backend/replication/logical/launcher.c
> b/src/backend/replication/logical/launcher.c
> index aec885e..e80d00c 100644
> --- a/src/backend/replication/logical/launcher.c
> +++ b/src/backend/replication/logical/launcher.c
> @@ -14,6 +14,8 @@
> *
> *-------------------------------------------------------------------------
> */
> +#include <sys/types.h>
> +#include <unistd.h>
>
> #include "postgres.h"
>
> I see only the above change in launcher.c. Why we need to include
> these if there is no other change (at least not in this patch).

Removed

> 4.
> stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
> /* Push callback + info on the error context stack */
> state.ctx = ctx;
> state.callback_name = "stream_start";
> - /* state.report_location = apply_lsn; */
> + state.report_location = InvalidXLogRecPtr;
> errcallback.callback = output_plugin_error_callback;
> errcallback.arg = (void *) &state;
> errcallback.previous = error_context_stack;
> @@ -1194,7 +1194,7 @@ stream_stop_cb_wrapper(ReorderBuffer *cache,
> ReorderBufferTXN *txn)
> /* Push callback + info on the error context stack */
> state.ctx = ctx;
> state.callback_name = "stream_stop";
> - /* state.report_location = apply_lsn; */
> + state.report_location = InvalidXLogRecPtr;
> errcallback.callback = output_plugin_error_callback;
> errcallback.arg = (void *) &state;
> errcallback.previous = error_context_stack;
>
> Don't we want to set txn->final_lsn in report location as we do at few
> other places?

Fixed

> 5.
> -logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
> +logicalrep_write_delete(StringInfo out, TransactionId xid,
> + Relation rel, HeapTuple oldtuple)
> {
> + pq_sendbyte(out, 'D'); /* action DELETE */
> +
> Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
> rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
> rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
>
> - pq_sendbyte(out, 'D'); /* action DELETE */
>
> Why this patch need to change the above code?

Fixed

> 6.
> +void
> +logicalrep_write_stream_start(StringInfo out,
> + TransactionId xid, bool first_segment)
> +{
> + pq_sendbyte(out, 'S'); /* action STREAM START */
> +
> + Assert(TransactionIdIsValid(xid));
> +
> + /* transaction ID (we're starting to stream, so must be valid) */
> + pq_sendint32(out, xid);
> +
> + /* 1 if this is the first streaming segment for this xid */
> + pq_sendint32(out, first_segment ? 1 : 0);
> +}
> +
> +TransactionId
> +logicalrep_read_stream_start(StringInfo in, bool *first_segment)
> +{
> + TransactionId xid;
> +
> + Assert(first_segment);
> +
> + xid = pq_getmsgint(in, 4);
> + *first_segment = (pq_getmsgint(in, 4) == 1);
> +
> + return xid;
> +}
>
> In these functions for sending bool, pq_sendint32 is used. Can't we
> use pq_sendbyte similar to what we do in boolsend?

Done
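
For reference, a rough standalone sketch of the resulting message
layout, with the flag sent as a single byte instead of a 4-byte
integer. It does not use the real pqformat/StringInfo API; MsgBuf and
the put_*/get_* helpers are hypothetical stand-ins, and the exact code
in the patch may differ.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical fixed-size message buffer standing in for StringInfo. */
typedef struct MsgBuf
{
    uint8_t data[64];
    size_t  len;      /* bytes written so far */
    size_t  cursor;   /* next byte to read */
} MsgBuf;

void
put_byte(MsgBuf *b, uint8_t v)
{
    assert(b->len < sizeof(b->data));
    b->data[b->len++] = v;
}

/* Append a 32-bit value in network (big-endian) byte order. */
void
put_uint32(MsgBuf *b, uint32_t v)
{
    put_byte(b, (uint8_t) (v >> 24));
    put_byte(b, (uint8_t) (v >> 16));
    put_byte(b, (uint8_t) (v >> 8));
    put_byte(b, (uint8_t) v);
}

uint8_t
get_byte(MsgBuf *b)
{
    assert(b->cursor < b->len);
    return b->data[b->cursor++];
}

uint32_t
get_uint32(MsgBuf *b)
{
    uint32_t v = 0;
    int      i;

    for (i = 0; i < 4; i++)
        v = (v << 8) | get_byte(b);
    return v;
}

/* Action byte, 4-byte xid, then the boolean flag as one byte. */
void
write_stream_start(MsgBuf *out, uint32_t xid, bool first_segment)
{
    put_byte(out, 'S');
    put_uint32(out, xid);
    put_byte(out, first_segment ? 1 : 0);
}

/* Caller is assumed to have consumed the action byte already. */
uint32_t
read_stream_start(MsgBuf *in, bool *first_segment)
{
    uint32_t xid;

    assert(first_segment != NULL);
    xid = get_uint32(in);
    *first_segment = (get_byte(in) != 0);
    return xid;
}
```

The message body shrinks from 8 bytes to 5 (plus the action byte), and
the reader treats any non-zero byte as true.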

> 7.
> +void
> +logicalrep_write_stream_stop(StringInfo out, TransactionId xid)
> +{
> + pq_sendbyte(out, 'E'); /* action STREAM END */
> +
> + Assert(TransactionIdIsValid(xid));
> +
> + /* transaction ID (we're starting to stream, so must be valid) */
> + pq_sendint32(out, xid);
> +}
>
> In comments, 'starting to stream' is mentioned whereas this function
> is to stop it.

Fixed

> 8.
> +void
> +logicalrep_write_stream_stop(StringInfo out, TransactionId xid)
> +{
> + pq_sendbyte(out, 'E'); /* action STREAM END */
> +
> + Assert(TransactionIdIsValid(xid));
> +
> + /* transaction ID (we're starting to stream, so must be valid) */
> + pq_sendint32(out, xid);
> +}
> +
> +TransactionId
> +logicalrep_read_stream_stop(StringInfo in)
> +{
> + TransactionId xid;
> +
> + xid = pq_getmsgint(in, 4);
> +
> + return xid;
> +}
>
> Is there a reason to send xid on stopping stream? I don't see any use
> of function logicalrep_read_stream_stop.

Removed

> 9.
> + * XXX Add calls to pgstat_report_wait_start/pgstat_report_wait_end.
> + */
> +static void
> +subxact_info_write(Oid subid, TransactionId xid)
> {
> ..
> + pgstat_report_wait_start(WAIT_EVENT_LOGICAL_SUBXACT_WRITE);
> ..
> + pgstat_report_wait_end();
> ..
> }
>
> I see the calls to pgstat_report_wait_start/pgstat_report_wait_end in
> this function, so not sure if the above comment makes sense.

Fixed

> 10.
> + * The files are placed in /tmp by default, and the filenames include both
> + * the XID of the toplevel transaction and OID of the subscription.
>
> Are we keeping files in /tmp or pg's temp tablespace dir. Seeing
> below code, it doesn't seem that we place them in /tmp. If I am
> correct, then can you update the comment.
> +static void
> +subxact_filename(char *path, Oid subid, TransactionId xid)
> +{
> + char tempdirpath[MAXPGPATH];
> +
> + TempTablespacePath(tempdirpath, DEFAULTTABLESPACE_OID);

Done

> 11.
> + * The change is serialied in a simple format, with length (not including
> + * the length), action code (identifying the message type) and message
> + * contents (without the subxact TransactionId value).
> + *
> ..
> + */
> +static void
> +stream_write_change(char action, StringInfo s)
>
> The part of the comment which says "with length (not including the
> length) .." is not clear to me. What does "not including the length"
> mean?

Basically, it says that the 4 bytes used to store the total data
length are not themselves counted in that length.
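
A small standalone sketch of that framing, assuming the layout is
[length][action][contents] as the comment describes. frame_change and
frame_length are hypothetical helpers, not functions from the patch,
and the length is written in native byte order on the assumption that
the file is only read back on the same machine.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/*
 * Serialize one change as [uint32 len][char action][payload].
 * len counts the action byte plus the payload, but not the 4 bytes
 * that hold len itself. Returns the total number of bytes written.
 */
size_t
frame_change(uint8_t *dst, size_t dstcap, char action,
             const void *payload, uint32_t payload_len)
{
    uint32_t len = (uint32_t) sizeof(char) + payload_len;
    size_t   total = sizeof(len) + len;

    assert(total <= dstcap);

    memcpy(dst, &len, sizeof(len));
    dst[sizeof(len)] = (uint8_t) action;
    memcpy(dst + sizeof(len) + 1, payload, payload_len);

    return total;
}

/* Read back the stored length: the number of bytes that follow it. */
uint32_t
frame_length(const uint8_t *src)
{
    uint32_t len;

    memcpy(&len, src, sizeof(len));
    return len;
}
```

So for a 3-byte payload the stored length is 4, while the record
occupies 8 bytes on disk; a reader can fetch the length and then read
exactly that many further bytes.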

> 12.
> + * TODO: Add missing_ok flag to specify in which cases it's OK not to
> + * find the files, and when it's an error.
> + */
> +static void
> +stream_cleanup_files(Oid subid, TransactionId xid)
>
> I think we can implement this TODO. It is clear when this function is
> called from apply_handle_stream_commit, the file must exist. We can
> similarly analyze other callers of this API.

Done
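
The idea, as a simplified standalone sketch (cleanup_file is a
hypothetical helper using plain unlink() and stderr rather than the
backend's file and error-reporting facilities): a missing file is
tolerated only when the caller says so.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/*
 * Remove one file. Returns 0 on success, or when the file does not
 * exist and missing_ok is true; returns -1 on any other failure.
 */
int
cleanup_file(const char *path, bool missing_ok)
{
    assert(path != NULL);

    if (unlink(path) == 0)
        return 0;

    /* A missing file is acceptable only if the caller allows it. */
    if (errno == ENOENT && missing_ok)
        return 0;

    fprintf(stderr, "could not remove file \"%s\": %s\n",
            path, strerror(errno));
    return -1;
}
```

A caller that knows the file must exist, such as the stream commit
path mentioned above, would pass missing_ok = false so that an absent
file is reported as an error.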

> 13.
> +apply_handle_stream_abort(StringInfo s)
> {
> ..
> + /* FIXME optimize the search by bsearch on sorted data */
> + for (i = nsubxacts; i > 0; i--)
> ..
>
> I am not sure how important this optimization is, so instead of FIXME,
> it is better to keep it as a XXX comment. In the future, if we hit
> any performance issue due to this, we can revisit our decision.

Done
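
If this ever needs revisiting, the lookup could be sketched roughly as
below. This is only an illustration, not part of the patch: it assumes
the subxact XIDs are kept in a sorted array and ignores XID
wraparound; find_subxact and cmp_xid are hypothetical names.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Plain numeric ordering; real XID comparison must handle wraparound. */
static int
cmp_xid(const void *a, const void *b)
{
    uint32_t x = *(const uint32_t *) a;
    uint32_t y = *(const uint32_t *) b;

    return (x > y) - (x < y);
}

/* Return the index of xid in the sorted array xids, or -1 if absent. */
long
find_subxact(const uint32_t *xids, size_t n, uint32_t xid)
{
    const uint32_t *hit;

    assert(xids != NULL || n == 0);

    if (n == 0)
        return -1;

    hit = bsearch(&xid, xids, n, sizeof(uint32_t), cmp_xid);
    return hit ? (long) (hit - xids) : -1;
}
```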

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

Attachment Content-Type Size
v10-0002-Issue-individual-invalidations-with-wal_level-lo.patch application/octet-stream 16.4 KB
v10-0003-Extend-the-output-plugin-API-with-stream-methods.patch application/octet-stream 34.8 KB
v10-0005-Implement-streaming-mode-in-ReorderBuffer.patch application/octet-stream 37.8 KB
v10-0004-Gracefully-handle-concurrent-aborts-of-uncommitt.patch application/octet-stream 12.6 KB
v10-0008-Enable-streaming-for-all-subscription-TAP-tests.patch application/octet-stream 14.7 KB
v10-0009-Add-TAP-test-for-streaming-vs.-DDL.patch application/octet-stream 4.4 KB
v10-0006-Add-support-for-streaming-to-built-in-replicatio.patch application/octet-stream 89.9 KB
v10-0007-Track-statistics-for-streaming.patch application/octet-stream 11.7 KB
v10-0010-Bugfix-handling-of-incomplete-toast-tuple.patch application/octet-stream 13.3 KB
v10-0001-Immediately-WAL-log-assignments.patch application/octet-stream 10.3 KB
