Re: Perform streaming logical transactions by background workers and parallel apply

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-09-30 08:26:37
Message-ID: CAHut+PvFENKb5fcMko5HHtNEAaZyNwGhu3PASrcBt+HFoFL=Fw@mail.gmail.com
Lists: pgsql-hackers

Here are my review comments for the v35-0001 patch:

======

1. Commit message

Currently, for large transactions, the publisher sends the data in multiple
streams (changes divided into chunks depending upon logical_decoding_work_mem),
and then on the subscriber-side, the apply worker writes the changes into
temporary files and once it receives the commit, it reads from the file and
applies the entire transaction.

~

There is a mix of plural and singular.

"reads from the file" -> "reads from those files" ?

~~~

2.

This preserves commit ordering and avoids
writing to and reading from file in most cases. We still need to spill if there
is no worker available.

2a.
"file" => "files"

2b.
"in most cases. We still need to spill" -> "in most cases, although we
still need to spill"

======

3. GENERAL

(this comment was written after I wrote all the other ones below so
there might be some unintended overlaps...)

I found the mixed use of the same member names having different
meanings to be quite confusing.

e.g.1
PGOutputData 'streaming' is now a single-char internal representation
of the subscription parameter streaming mode ('f','t','p')
- bool streaming;
+ char streaming;

e.g.2
WalRcvStreamOptions 'streaming' is a C string version of the
subscription streaming mode ("on", "parallel")
- bool streaming; /* Streaming of large transactions */
+ char *streaming; /* Streaming of large transactions */

e.g.3
SubOpts 'streaming' is again like the first example - a single char
for the mode.
- bool streaming;
+ char streaming;

IMO everything would become much simpler if you did:

3a.
Rename "char streaming;" -> "char streaming_mode;"

3b.
Re-design the "char *streaming;" code to also use the single-char
notation, and then also call that member 'streaming_mode'. Then
everything will be consistent.
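
For example, a rough sketch of how 3a/3b could end up (this assumes the
single-char representation is adopted everywhere; the member name is
just my suggestion from above):

/* PGOutputData and SubOpts */
char    streaming_mode;     /* 'f', 't', or 'p' */

/* WalRcvStreamOptions */
char    streaming_mode;     /* 'f', 't', or 'p' instead of the "on"/"parallel" strings */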

======

doc/src/sgml/config.sgml

4. - max_parallel_apply_workers_per_subscription

+ <varlistentry
id="guc-max-parallel-apply-workers-per-subscription"
xreflabel="max_parallel_apply_workers_per_subscription">
+ <term><varname>max_parallel_apply_workers_per_subscription</varname>
(<type>integer</type>)
+ <indexterm>
+ <primary><varname>max_parallel_apply_workers_per_subscription</varname>
configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Maximum number of parallel apply workers per subscription. This
+ parameter controls the amount of parallelism for streaming of
+ in-progress transactions with subscription parameter
+ <literal>streaming = parallel</literal>.
+ </para>
+ <para>
+ The parallel apply workers are taken from the pool defined by
+ <varname>max_logical_replication_workers</varname>.
+ </para>
+ <para>
+ The default value is 2. This parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command
+ line.
+ </para>
+ </listitem>
+ </varlistentry>

I felt that maybe this should also xref to the
doc/src/sgml/logical-replication.sgml section where you say
"max_logical_replication_workers should be increased according to the
desired number of parallel apply workers."

======

5. doc/src/sgml/protocol.sgml

+ <para>
+ Version <literal>4</literal> is supported only for server version 16
+ and above, and it allows applying streams of large in-progress
+ transactions in parallel.
+ </para>

SUGGESTION
... and it allows streams of large in-progress transactions to be
applied in parallel.

======

6. doc/src/sgml/ref/create_subscription.sgml

+ <para>
+ If set to <literal>parallel</literal>, incoming changes are directly
+ applied via one of the parallel apply workers, if available. If no
+ parallel worker is free to handle streaming transactions then the
+ changes are written to temporary files and applied after the
+ transaction is committed. Note that if an error happens when
+ applying changes in a parallel worker, the finish LSN of the
+ remote transaction might not be reported in the server log.
</para>

6a.
"parallel worker is free" -> "parallel apply worker is free"

~

6b.
"Note that if an error happens when applying changes in a parallel
worker," --> "Note that if an error happens in a parallel apply
worker,"

======

7. src/backend/access/transam/xact.c - RecordTransactionAbort

+ /*
+ * Are we using the replication origins feature? Or, in other words, are
+ * we replaying remote actions?
+ */
+ replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+ replorigin_session_origin != DoNotReplicateId);

"Or, in other words," -> "In other words,"

======

src/backend/replication/logical/applyparallelworker.c

8. - file header comment

+ * Refer to the comments in file header of logical/worker.c to see more
+ * information about parallel apply worker.

8a.
"in file header" -> "in the file header"

~

8b.
"about parallel apply worker." -> "about parallel apply workers."

~~~

9. - parallel_apply_can_start

+/*
+ * Returns true, if it is allowed to start a parallel apply worker, false,
+ * otherwise.
+ */
+static bool
+parallel_apply_can_start(TransactionId xid)

(The commas are strange)

SUGGESTION
Returns true if it is OK to start a parallel apply worker, false otherwise.

or just SUGGESTION
Returns true if it is OK to start a parallel apply worker.

~~~

10.

+ /*
+ * Don't start a new parallel worker if not in parallel streaming mode or
+ * the publisher does not support parallel apply.
+ */
+ if (!MyLogicalRepWorker->parallel_apply)
+ return false;

10a.
SUGGESTION
Don't start a new parallel apply worker if the subscription is not
using parallel streaming mode, or if the publisher does not support
parallel apply.

~

10b.
IMO this flag might be better to be called 'parallel_apply_enabled' or
something similar.
(see also review comment #55b.)

~~~

11. - parallel_apply_start_worker

+ /* Try to start a new parallel apply worker. */
+ if (winfo == NULL)
+ winfo = parallel_apply_setup_worker();
+
+ /* Failed to start a new parallel apply worker. */
+ if (winfo == NULL)
+ return;

IMO it might be cleaner to write that code like below. Then the 2nd
comment is not really adding anything, so it can be removed too.

SUGGESTION
if (winfo == NULL)
{
/* Try to start a new parallel apply worker. */
winfo = parallel_apply_setup_worker();

if (winfo == NULL)
return;
}

~~~

12. - parallel_apply_free_worker

+ SpinLockAcquire(&winfo->shared->mutex);
+ slot_no = winfo->shared->logicalrep_worker_slot_no;
+ generation = winfo->shared->logicalrep_worker_generation;
+ SpinLockRelease(&winfo->shared->mutex);

I know there are not many places doing this, but do you think it might
be worth introducing some new set/get function to encapsulate the
set/get of the generation/slot so it does the mutex spin-locks in
common code?
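
e.g. Something like the sketch below is what I had in mind (the
function name and the exact field types are just my guesses):

/* Hypothetical helper: read the slot/generation under the mutex. */
static void
parallel_apply_get_slot_and_generation(ParallelApplyWorkerShared *shared,
                                       int *slot_no, uint16 *generation)
{
    SpinLockAcquire(&shared->mutex);
    *slot_no = shared->logicalrep_worker_slot_no;
    *generation = shared->logicalrep_worker_generation;
    SpinLockRelease(&shared->mutex);
}

Then parallel_apply_free_worker() (and any other similar code) could
just call that instead of repeating the spin-lock dance.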

~~~

13. - LogicalParallelApplyLoop

+ /*
+ * Init the ApplyMessageContext which we clean up after each replication
+ * protocol message.
+ */
+ ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+ "ApplyMessageContext",
+ ALLOCSET_DEFAULT_SIZES);

Because this is in the parallel apply worker, should the name (i.e. the
2nd param) be changed to "ParallelApplyMessageContext"?

~~~

14.

+ else if (shmq_res == SHM_MQ_DETACHED)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("lost connection to the leader apply worker")));
+ }
+ /* SHM_MQ_WOULD_BLOCK is purposefully ignored */

Instead of that comment sort of floating in space, I wonder if this
code would be better written as a switch, so then you can write this
comment in the 'default' case.

OR, maybe the "else if (shmq_res == SHM_MQ_DETACHED)" should be changed to
SUGGESTION
else if (shmq_res != SHM_MQ_WOULD_BLOCK)

OR, just having an empty code block would be better than just a code
comment all by itself.
SUGGESTION
else
{
/* SHM_MQ_WOULD_BLOCK is purposefully ignored */
}
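
e.g. The switch form might look something like this (just a sketch; the
SHM_MQ_SUCCESS arm stands for the existing message processing code):

switch (shmq_res)
{
    case SHM_MQ_SUCCESS:
        /* ... process the message, same as now ... */
        break;

    case SHM_MQ_DETACHED:
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("lost connection to the leader apply worker")));
        break;

    default:
        /* SHM_MQ_WOULD_BLOCK is purposefully ignored */
        break;
}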

~~~

15. - ParallelApplyWorkerMain

+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message.
+ */
+ snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);

15a.
"in long-lived" -> "in a long-lived"

~

15b.
Please watch my other thread [1] where I am hoping to push a patch that
will replace these snprintf's with a common function to do the same.
If/when my patch is pushed then this code needs to be changed to call
that new function.

~~~

16. - HandleParallelApplyMessages

+ res = shm_mq_receive(winfo->error_mq_handle, &nbytes,
+ &data, true);

Seems to have unnecessary wrapping.

~~~

17. - parallel_apply_setup_dsm

+/*
+ * Set up a dynamic shared memory segment.
+ *
+ * We set up a control region that contains a fixed worker info
+ * (ParallelApplyWorkerShared), a message queue, and an error queue.
+ *
+ * Returns true on success, false on failure.
+ */
+static bool
+parallel_apply_setup_dsm(ParallelApplyWorkerInfo *winfo)

"fixed worker info" -> "fixed size worker info" ?

~~~

18.

+ * We need one key to register the location of the header, and we need two
+ * other keys to track the locations of the message queue and the error
+ * message queue.

"and we need two other" -> "and two other"

~~~

19. - parallel_apply_wait_for_xact_finish

+void
+parallel_apply_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
+{
+ for (;;)
+ {
+ if (!parallel_apply_get_in_xact(winfo->shared))
+ break;

Should that condition have a comment? All the others do.
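
e.g. Something like this is what I meant (my comment wording is only a
guess at the intent of that flag):

/* Done waiting once the worker is no longer in the transaction. */
if (!parallel_apply_get_in_xact(winfo->shared))
    break;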

~~~

20. - parallel_apply_savepoint_name

The only callers that I could find are from
parallel_apply_start_subtrans and parallel_apply_stream_abort so...

20a.
Why is there an extern in worker_internal.h?

~

20b.
Why is this not declared static?

~~~

21.
The callers of parallel_apply_start_subtrans are both allocating a
name buffer like:

char spname[MAXPGPATH];

Is that right?

I thought that PG names were limited by NAMEDATALEN.

~~~

22. - parallel_apply_replorigin_setup

+ snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);

Please watch my other thread [1] where I am hoping to push a patch that
will replace these snprintf's with a common function to do the same.
If/when my patch is pushed then this code needs to be changed to call
that new function.

======

src/backend/replication/logical/launcher.c

23. - GUCs

@@ -54,6 +54,7 @@

int max_logical_replication_workers = 4;
int max_sync_workers_per_subscription = 2;
+int max_parallel_apply_workers_per_subscription = 2;

Please watch my other thread [2] where I am hoping to push a patch to
clean up some of these GUC C variable declarations. It is not really
recommended to assign default values to the C variables like this -
they are kind of misleading because they will be overwritten by the
GUC default value when the GUC mechanism starts up.

~~~

24. - logicalrep_worker_launch

+ /* Sanity check: we don't support table sync in subworker. */
+ Assert(!(is_subworker && OidIsValid(relid)));

IMO "we don't support" makes it sound like this is something that
maybe is intended for the future. In fact, I think just this
combination is not possible so it is just a plain sanity check. I
think might be better just say like below

/* Sanity check - tablesync worker cannot be a subworker */

~~~

25.

+ worker->parallel_apply = is_subworker;

It seems kind of strange to assign one boolean to another when they
have completely different names. I wonder if 'is_subworker' should be
called 'is_parallel_apply_worker'?

~~~

26.

if (OidIsValid(relid))
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication worker for subscription %u sync %u", subid, relid);
+ else if (is_subworker)
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication parallel apply worker for subscription %u", subid);
else
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication worker for subscription %u", subid);

I think that *last* text should now be changed like below:

BEFORE
"logical replication worker for subscription %u"
AFTER
"logical replication apply worker for subscription %u"

~~~

27. - logicalrep_worker_stop_internal

+/*
+ * Workhorse for logicalrep_worker_stop(), logicalrep_worker_detach() and
+ * logicalrep_worker_stop_by_slot(). Stop the worker and wait for it to die.
+ */
+static void
+logicalrep_worker_stop_internal(LogicalRepWorker *worker)

IMO it would be better to define this static function *before* all the
callers of it.

~~~

28. - logicalrep_worker_detach

+ /* Stop the parallel apply workers. */
+ if (am_leader_apply_worker())
+ {

Should that comment rather say something like below?

/* If this is the leader apply worker then stop all of its parallel
apply workers. */

~~~

29. - pg_stat_get_subscription

+ /* Skip if this is parallel apply worker */
+ if (worker.apply_leader_pid != InvalidPid)
+ continue;

29a.
"is parallel apply" -> "is a parallel apply"

~

29b.
IMO this condition should be using your macro isParallelApplyWorker(worker).

======

30. src/backend/replication/logical/proto.c - logicalrep_read_stream_abort

+ *
+ * If read_abort_info is true, try to read the abort_lsn and abort_time fields,
+ * otherwise don't.
*/
void
-logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
- TransactionId *subxid)
+logicalrep_read_stream_abort(StringInfo in,
+ LogicalRepStreamAbortData *abort_data,
+ bool read_abort_info)

"try to read" -> "read"

======

31. src/backend/replication/logical/tablesync.c - process_syncing_tables

process_syncing_tables(XLogRecPtr current_lsn)
{
+ if (am_parallel_apply_worker())
+ return;
+

Maybe there should be some comment here like:

/* Skip for parallel apply workers. */

======

src/backend/replication/logical/worker.c

32. - file header comment

+ * the list for any available worker. Note that we maintain a maximum of half
+ * the max_parallel_apply_workers_per_subscription workers in the pool and
+ * after that, we simply exit the worker after applying the transaction. This
+ * worker pool threshold is a bit arbitrary and we can provide a guc for this
+ * in the future if required.

IMO that sentence beginning with "This worker pool" should be written
as an XXX-style comment.

Also "guc" -> "GUC variable"

e.g.

* the list for any available worker. Note that we maintain a maximum of half
* the max_parallel_apply_workers_per_subscription workers in the pool and
* after that, we simply exit the worker after applying the transaction.
*
* XXX This worker pool threshold is a bit arbitrary and we can provide a GUC
* variable for this in the future if required.

~~~

33.

* we cannot count how many workers will be started. It may be possible to
* allocate enough shared memory in one segment based on the maximum number of
* parallel apply workers
(max_parallel_apply_workers_per_subscription), but this
* may waste some memory if no process is actually started.

"may waste some memory" -> "would waste memory"

~~~

34.

+ * In case, no worker is available to handle the streamed transaction, we
+ * follow approach 2.

SUGGESTION
If no parallel apply worker is available to handle the streamed
transaction we follow approach 2.

~~~

35. - TransApplyAction

+ * TRANS_LEADER_SERIALIZE means that we are in leader apply worker and changes
+ * are written to temporary files and then applied when the final commit
+ * arrives.

"in leader apply" -> "in the leader apply"

~~~

36 - should_apply_changes_for_rel

should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
{
if (am_tablesync_worker())
return MyLogicalRepWorker->relid == rel->localreloid;
+ else if (am_parallel_apply_worker())
+ {
+ if (rel->state != SUBREL_STATE_READY)
+ ereport(ERROR,
+ (errmsg("logical replication apply workers for subscription \"%s\"
will restart",
+ MySubscription->name),
+ errdetail("Cannot handle streamed replication transaction using parallel "
+ "apply workers until all tables are synchronized.")));
+
+ return true;
+ }
else
return (rel->state == SUBREL_STATE_READY ||
(rel->state == SUBREL_STATE_SYNCDONE &&
@@ -427,43 +519,87 @@ end_replication_step(void)

This function can be made tidier just by removing all the 'else' ...

SUGGESTION
if (am_tablesync_worker())
return ...
if (am_parallel_apply_worker())
{
...
return true;
}

Assert(am_leader_apply_worker());
return ...

~~~

37. - handle_streamed_transaction

+ /*
+ * XXX The publisher side doesn't always send relation/type update
+ * messages after the streaming transaction, so also update the
+ * relation/type in leader apply worker here. See function
+ * cleanup_rel_sync_cache.
+ */
+ if (action == LOGICAL_REP_MSG_RELATION ||
+ action == LOGICAL_REP_MSG_TYPE)
+ return false;
+ return true;

37a.
"so also update the relation/type in leader apply worker here"

Is that comment worded correctly? There is nothing being updated "here".

~

37b.
That code is the same as:

return (action != LOGICAL_REP_MSG_RELATION && action != LOGICAL_REP_MSG_TYPE);

~~~

38. - apply_handle_commit_prepared

+ *
+ * Note that we don't need to wait here if the transaction was prepared in a
+ * parallel apply worker. Because we have already waited for the prepare to
+ * finish in apply_handle_stream_prepare() which will ensure all the operations
+ * in that transaction have happened in the subscriber and no concurrent
+ * transaction can create deadlock or transaction dependency issues.
*/
static void
apply_handle_commit_prepared(StringInfo s)

"worker. Because" -> "worker because"

~~~

39. - apply_handle_rollback_prepared

+ *
+ * Note that we don't need to wait here if the transaction was prepared in a
+ * parallel apply worker. Because we have already waited for the prepare to
+ * finish in apply_handle_stream_prepare() which will ensure all the operations
+ * in that transaction have happened in the subscriber and no concurrent
+ * transaction can create deadlock or transaction dependency issues.
*/
static void
apply_handle_rollback_prepared(StringInfo s)

See previous review comment #38 above.

~~~

40. - apply_handle_stream_prepare

+ case TRANS_LEADER_SERIALIZE:

- /* Mark the transaction as prepared. */
- apply_handle_prepare_internal(&prepare_data);
+ /*
+ * The transaction has been serialized to file, so replay all the
+ * spooled operations.
+ */

Spurious blank line after the 'case'.

FYI - this same blank line is also in all the other switch/case that
looked like this one, so if you will fix it then please check all
those other places too...

~~~

41. - apply_handle_stream_start

+ *
+ * XXX We can avoid sending pair of the START/STOP messages to the parallel
+ * worker because unlike apply worker it will process only one
+ * transaction-at-a-time. However, it is not clear whether that is worth the
+ * effort because it is sent after logical_decoding_work_mem changes.
*/
static void
apply_handle_stream_start(StringInfo s)

"sending pair" -> "sending pairs"

~~~

42.

- /* notify handle methods we're processing a remote transaction */
+ /* Notify handle methods we're processing a remote transaction. */
in_streamed_transaction = true;

Changing this comment seemed unrelated to this patch, so maybe don't do this.

~~~

43.

/*
- * Initialize the worker's stream_fileset if we haven't yet. This will be
- * used for the entire duration of the worker so create it in a permanent
- * context. We create this on the very first streaming message from any
- * transaction and then use it for this and other streaming transactions.
- * Now, we could create a fileset at the start of the worker as well but
- * then we won't be sure that it will ever be used.
+ * For the first stream start, check if there is any free parallel apply
+ * worker we can use to process this transaction.
*/
- if (MyLogicalRepWorker->stream_fileset == NULL)
+ if (first_segment)
+ parallel_apply_start_worker(stream_xid);

This comment update seems misleading. The
parallel_apply_start_worker() isn't just checking if there is a free
worker. All that free worker logic stuff is *inside* the
parallel_apply_start_worker() function, so maybe there is no need to
mention it here at the caller.

~~~

44.

+ case TRANS_PARALLEL_APPLY:
+ break;

Should this include a comment explaining why there is nothing to do?

~~~

39. - apply_handle_stream_abort

+ /* We receive abort information only when we can apply in parallel. */
+ if (MyLogicalRepWorker->parallel_apply)
+ read_abort_info = true;

44a.
SUGGESTION
We receive abort information only when the publisher can support parallel apply.

~

44b.
Why not remove the assignment in the declaration, and just write this code as:
read_abort_info = MyLogicalRepWorker->parallel_apply;

~~~

45.

+ /*
+ * We are in leader apply worker and the transaction has been
+ * serialized to file.
+ */
+ serialize_stream_abort(xid, subxid);

"in leader apply worker" -> "in the leader apply worker"

~~~

46. - store_flush_position

/* Skip if not the leader apply worker */
if (am_parallel_apply_worker())
return;

I previously wrote something about this and Hou-san gave a reason [3]
why not to change the condition.

But the comment still does not match the code, because a tablesync
worker would get past here.

Maybe the comment is wrong?
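
e.g. Perhaps it should just say something like:

/* Skip for parallel apply workers. */
if (am_parallel_apply_worker())
    return;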

~~~

47. - InitializeApplyWorker

+/*
+ * The common initialization for leader apply worker and parallel apply worker.
+ *
+ * Initialize the database connection, in-memory subscription and necessary
+ * config options.
+ */
void
-ApplyWorkerMain(Datum main_arg)
+InitializeApplyWorker(void)

"The common initialization" -> "Common initialization"

~~~

48. - ApplyWorkerMain

+/* Logical Replication Apply worker entry point */
+void
+ApplyWorkerMain(Datum main_arg)

"Apply worker" -> "apply worker"

~~~

49.

+ /*
+ * We don't currently need any ResourceOwner in a walreceiver process, but
+ * if we did, we could call CreateAuxProcessResourceOwner here.
+ */

I think this comment should have "XXX" prefix.

~~~

50.

+ if (server_version >= 160000 &&
+ MySubscription->stream == SUBSTREAM_PARALLEL)
+ {
+ options.proto.logical.streaming = pstrdup("parallel");
+ MyLogicalRepWorker->parallel_apply = true;
+ }
+ else if (server_version >= 140000 &&
+ MySubscription->stream != SUBSTREAM_OFF)
+ options.proto.logical.streaming = pstrdup("on");
+ else
+ options.proto.logical.streaming = NULL;

IMO it might make more sense for these conditions to be checking the
'options.proto.logical.proto_version' here instead of checking the
hardwired server versions. Also, I suggest it may be better (for
clarity) to always assign the parallel_apply member.

SUGGESTION

if (options.proto.logical.proto_version >=
LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM &&
MySubscription->stream == SUBSTREAM_PARALLEL)
{
options.proto.logical.streaming = pstrdup("parallel");
MyLogicalRepWorker->parallel_apply = true;
}
else if (options.proto.logical.proto_version >=
LOGICALREP_PROTO_STREAM_VERSION_NUM &&
MySubscription->stream != SUBSTREAM_OFF)
{
options.proto.logical.streaming = pstrdup("on");
MyLogicalRepWorker->parallel_apply = false;
}
else
{
options.proto.logical.streaming = NULL;
MyLogicalRepWorker->parallel_apply = false;
}

~~~

51. - clear_subscription_skip_lsn

- if (likely(XLogRecPtrIsInvalid(myskiplsn)))
+ if (likely(XLogRecPtrIsInvalid(myskiplsn)) ||
+ am_parallel_apply_worker())
return;

Unnecessary wrapping.

~~~

52. - get_transaction_apply_action

+static TransApplyAction
+get_transaction_apply_action(TransactionId xid,
ParallelApplyWorkerInfo **winfo)
+{
+ *winfo = NULL;
+
+ if (am_parallel_apply_worker())
+ {
+ return TRANS_PARALLEL_APPLY;
+ }
+ else if (in_remote_transaction)
+ {
+ return TRANS_LEADER_APPLY;
+ }
+
+ /*
+ * Check if we are processing this transaction using a parallel apply
+ * worker and if so, send the changes to that worker.
+ */
+ else if ((*winfo = parallel_apply_find_worker(xid)))
+ {
+ return TRANS_LEADER_SEND_TO_PARALLEL;
+ }
+ else
+ {
+ return TRANS_LEADER_SERIALIZE;
+ }
+}

52a.
All these if/else and code blocks seem excessive. It can be simplified
as follows:

SUGGESTION

static TransApplyAction
get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
{
*winfo = NULL;

if (am_parallel_apply_worker())
return TRANS_PARALLEL_APPLY;

if (in_remote_transaction)
return TRANS_LEADER_APPLY;

/*
* Check if we are processing this transaction using a parallel apply
* worker and if so, send the changes to that worker.
*/
if ((*winfo = parallel_apply_find_worker(xid)))
return TRANS_LEADER_SEND_TO_PARALLEL;

return TRANS_LEADER_SERIALIZE;
}

~

52b.
Can a tablesync worker ever get here? It might be better to add
Assert(!am_tablesync_worker()); at the top of this function.

======

src/backend/replication/pgoutput/pgoutput.c

53. - pgoutput_startup

ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("requested proto_version=%d does not support streaming, need
%d or higher",
data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
+ else if (data->streaming == SUBSTREAM_PARALLEL &&
+ data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("requested proto_version=%d does not support parallel
streaming mode, need %d or higher",
+ data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));

The previous error message just says "streamimg", not "streaming mode"
so for consistency better to remove that word "mode" IMO.

~~~

54. - pgoutput_stream_abort

- logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
+ logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid,
abort_lsn, txn->xact_time.abort_time, write_abort_info);
+

Wrapping is needed here.

======

src/include/replication/worker_internal.h

55. - LogicalRepWorker

+ /* Indicates whether apply can be performed parallelly. */
+ bool parallel_apply;
+

55a.
"parallelly" - ?? is there a better way to phrase this? IMO that is an
uncommon word.

~

55b.
IMO this member name should be named slightly different to give a
better feel for what it really means.

Maybe something like one of:
"parallel_apply_ok"
"parallel_apply_enabled"
"use_parallel_apply"
etc?
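
e.g. A sketch of what I had in mind (using one of those suggested
names, and rewording the comment from 55a at the same time):

/* Can streamed transactions be applied using a parallel apply worker? */
bool    parallel_apply_enabled;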

~~~

56. - ParallelApplyWorkerInfo

+ /*
+ * Indicates whether the worker is available to be used for parallel apply
+ * transaction?
+ */
+ bool in_use;

As previously posted [4], this member comment is describing the
opposite of the member name. (e.g. the comment would be correct if the
member was called 'is_available', but it isn't)

SUGGESTION
True if the worker is being used to process a parallel apply
transaction. False indicates this worker is available for re-use.

~~~

57. - am_leader_apply_worker

+static inline bool
+am_leader_apply_worker(void)
+{
+ return (!OidIsValid(MyLogicalRepWorker->relid) &&
+ !isParallelApplyWorker(MyLogicalRepWorker));
+}

I wondered if it would be tidier/easier to define this function like
below. The others are inline functions anyhow so it should end up as
the same thing, right?

static inline bool
am_leader_apply_worker(void)
{
return (!am_tablesync_worker() && !am_parallel_apply_worker());
}

======

58.

--- fail - streaming must be boolean
+-- fail - streaming must be boolean or 'parallel'
CREATE SUBSCRIPTION regress_testsub CONNECTION
'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect =
false, streaming = foo);

I think there are already tests that explicitly create/set the
subscription parameter streaming = on/off/parallel.

But what about when there is no value explicitly specified? Shouldn't
there also be tests like below to check that *implied* boolean true
still works for this enum?

CREATE SUBSCRIPTION ... WITH (streaming)
ALTER SUBSCRIPTION ... SET (streaming)

------
[1] My patch snprintfs -
https://www.postgresql.org/message-id/flat/CAHut%2BPsB9hEEU-JHqTUBL3bv--vesUvThYr1-95ZyG5PkF9PQQ%40mail.gmail.com#17abe65e826f48d3d5a1cf5b83ce5271
[2] My patch GUC C vars -
https://www.postgresql.org/message-id/flat/CAHut%2BPsWxJgmrAvPsw9smFVAvAoyWstO7ttAkAq8NKDhsVNa3Q%40mail.gmail.com#1526a180383a3374ae4d701f25799926
[3] Houz reply comment #41 -
https://www.postgresql.org/message-id/OS0PR01MB5716E7E5798625AE9437CD6F94439%40OS0PR01MB5716.jpnprd01.prod.outlook.com
[4] Previous review comment #13 -
https://www.postgresql.org/message-id/CAHut%2BPuVjRgGr4saN7qwq0oB8DANHVR7UfDiciB1Q3cYN54F6A%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia
