Re: Perform streaming logical transactions by background workers and parallel apply

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-06-21 01:41:20
Message-ID: CAHut+Ptu_eWOVWAKrwkUFdTAh_r-RZsbDFkFmKwEAmxws=Sh5w@mail.gmail.com
Lists: pgsql-hackers

Here are some review comments for the v11-0001 patch.

(I will review the remaining patches 0002-0005 and post any comments later)

======

1. General

I still feel that 'apply' seems like a meaningless enum value for this
feature because from a user point-of-view every replicated change gets
"applied". IMO something like 'streaming = parallel' or 'streaming =
background' (etc) might have more meaning for a user.

======

2. Commit message

We also need to allow stream_stop to complete by the
apply background worker to avoid deadlocks because T-1's current stream of
changes can update rows in conflicting order with T-2's next stream of changes.

Did this mean to say?
"allow stream_stop to complete by" -> "allow stream_stop to be performed by"

~~~

3. Commit message

This patch also extends the SUBSCRIPTION 'streaming' option so that the user
can control whether to apply the streaming transaction in an apply background
worker or spill the change to disk. User can set the streaming option to
'on/off', 'apply'. For now, 'apply' means the streaming will be applied via a
apply background worker if available. 'on' means the streaming transaction will
be spilled to disk.

3a.
"option" -> "parameter" (2x)

3b.
"User can" -> "The user can"

3c.
I think this part should also mention that the stream parameter
default is unchanged...

======

4. doc/src/sgml/config.sgml

+ <para>
+ Maximum number of apply background workers per subscription. This
+ parameter controls the amount of parallelism of the streaming of
+ in-progress transactions if we set subscription option
+ <literal>streaming</literal> to <literal>apply</literal>.
+ </para>

"if we set subscription option <literal>streaming</literal> to
<literal>apply</literal>." -> "when subscription parameter
<literal>streaming = apply</literal>."

======

5. doc/src/sgml/config.sgml

+ <para>
+ Setting streaming mode to <literal>apply</literal> could export invalid LSN
+ as finish LSN of failed transaction. Changing the streaming mode and making
+ the same conflict writes the finish LSN of the failed transaction in the
+ server log if required.
+ </para>

This text made no sense to me. Can you reword it?

IIUC it means something like this:
When the streaming mode is 'apply', the finish LSN of failed
transactions may not be logged. In that case, it may be necessary to
change the streaming mode and cause the same conflicts again so the
finish LSN of the failed transaction will be written to the server
log.

======

6. doc/src/sgml/protocol.sgml

Since there are protocol changes made here, shouldn’t there also be
some corresponding LOGICALREP_PROTO_XXX constants and special checking
added in the worker.c?

======

7. doc/src/sgml/ref/create_subscription.sgml

+ for this subscription. The default value is <literal>off</literal>,
+ all transactions are fully decoded on the publisher and only then
+ sent to the subscriber as a whole.
+ </para>

SUGGESTION
The default value is off, meaning all transactions are fully decoded
on the publisher and only then sent to the subscriber as a whole.

~~~

8. doc/src/sgml/ref/create_subscription.sgml

+ <para>
+ If set to <literal>on</literal>, the changes of transaction are
+ written to temporary files and then applied at once after the
+ transaction is committed on the publisher.
+ </para>

SUGGESTION
If set to on, the incoming changes are written to a temporary file and
then applied only after the transaction is committed on the publisher.

~~~

9. doc/src/sgml/ref/create_subscription.sgml

+ <para>
+ If set to <literal>apply</literal> incoming
+ changes are directly applied via one of the background workers, if
+ available. If no background worker is free to handle streaming
+ transaction then the changes are written to a file and applied after
+ the transaction is committed. Note that if an error happens when
+ applying changes in a background worker, it might not report the
+ finish LSN of the remote transaction in the server log.
</para>

SUGGESTION
If set to apply, the incoming changes are directly applied via one of
the apply background workers, if available. If no background worker is
free to handle streaming transactions then the changes are written to
a file and applied after the transaction is committed. Note that if an
error happens when applying changes in a background worker, the finish
LSN of the remote transaction might not be reported in the server log.

======

10. src/backend/access/transam/xact.c

@@ -1741,6 +1742,13 @@ RecordTransactionAbort(bool isSubXact)
elog(PANIC, "cannot abort transaction %u, it was already committed",
xid);

+ /*
+ * Are we using the replication origins feature? Or, in other words,
+ * are we replaying remote actions?
+ */
+ replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+ replorigin_session_origin != DoNotReplicateId);
+
/* Fetch the data we need for the abort record */
nrels = smgrGetPendingDeletes(false, &rels);
nchildren = xactGetCommittedChildren(&children);
@@ -1765,6 +1773,11 @@ RecordTransactionAbort(bool isSubXact)
MyXactFlags, InvalidTransactionId,
NULL);

+ if (replorigin)
+ /* Move LSNs forward for this replication origin */
+ replorigin_session_advance(replorigin_session_origin_lsn,
+ XactLastRecEnd);
+

I did not see any reason why the code assigning the 'replorigin' and
the code checking the 'replorigin' are separated like they are. I
thought these 2 new code fragments should be kept together. Perhaps it
was decided this assignment must be outside the critical section? But
if that’s the case maybe a comment explaining so would be good.

~~~

11. src/backend/access/transam/xact.c

+ if (replorigin)
+ /* Move LSNs forward for this replication origin */
+ replorigin_session_advance(replorigin_session_origin_lsn,
+

The positioning of that comment is unusual. Maybe better before the check?

======

12. src/backend/commands/subscriptioncmds.c - defGetStreamingMode

+ /*
+ * If no parameter given, assume "true" is meant.
+ */
+ if (def->arg == NULL)
+ return SUBSTREAM_ON;

SUGGESTION for comment
If the streaming parameter is given but no parameter value is
specified, then assume "true" is meant.

~~~

13. src/backend/commands/subscriptioncmds.c - defGetStreamingMode

+ /*
+ * Allow 0, 1, "true", "false", "on", "off" or "apply".
+ */

IMO these should be in an order consistent with the code.

SUGGESTION
Allow 0, 1, "false", "true", "off", "on", or "apply".
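
To illustrate comments 12 and 13, here is a minimal standalone sketch of the value parsing, with the checks written in the same order as the suggested comment. This is not the patch's code: a plain C string stands in for the DefElem argument, and parse_streaming_mode, ci_equal, SUBSTREAM_OFF and SUBSTREAM_INVALID are names I made up for the example (only SUBSTREAM_ON and SUBSTREAM_APPLY appear in the patch hunks quoted in this review).

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>

/* Stand-in for the streaming modes; OFF and INVALID are assumed names. */
typedef enum
{
    SUBSTREAM_OFF,
    SUBSTREAM_ON,
    SUBSTREAM_APPLY,
    SUBSTREAM_INVALID
} StreamingMode;

/* Case-insensitive string equality (hypothetical helper). */
static int
ci_equal(const char *a, const char *b)
{
    while (*a != '\0' && *b != '\0')
    {
        if (tolower((unsigned char) *a) != tolower((unsigned char) *b))
            return 0;
        a++;
        b++;
    }
    return *a == '\0' && *b == '\0';
}

StreamingMode
parse_streaming_mode(const char *value)
{
    /*
     * If the streaming parameter is given but no parameter value is
     * specified, then assume "true" is meant.
     */
    if (value == NULL)
        return SUBSTREAM_ON;

    /* Allow 0, 1, "false", "true", "off", "on", or "apply". */
    if (ci_equal(value, "0"))
        return SUBSTREAM_OFF;
    if (ci_equal(value, "1"))
        return SUBSTREAM_ON;
    if (ci_equal(value, "false"))
        return SUBSTREAM_OFF;
    if (ci_equal(value, "true"))
        return SUBSTREAM_ON;
    if (ci_equal(value, "off"))
        return SUBSTREAM_OFF;
    if (ci_equal(value, "on"))
        return SUBSTREAM_ON;
    if (ci_equal(value, "apply"))
        return SUBSTREAM_APPLY;

    /* The real function would raise an error here instead. */
    return SUBSTREAM_INVALID;
}
```

With the comment and the checks in the same order, a reader can tick off each accepted value against the code.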

======

14. src/backend/replication/logical/Makefile

- worker.o
+ worker.o \
+ applybgwroker.o

typo "applybgwroker" -> "applybgworker"

======

15. .../replication/logical/applybgwroker.c

+/*-------------------------------------------------------------------------
+ * applybgwroker.c
+ * Support routines for applying xact by apply background worker
+ *
+ * Copyright (c) 2016-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/applybgwroker.c

15a.
Typo in filename: "applybgwroker" -> "applybgworker"

15b.
Typo in file header comment: "applybgwroker" -> "applybgworker"

~~~

16. .../replication/logical/applybgwroker.c

+/*
+ * entry for a hash table we use to map from xid to our apply background worker
+ * state.
+ */
+typedef struct ApplyBgworkerEntry

Comment should start uppercase.

~~~

17. .../replication/logical/applybgwroker.c

+/*
+ * Fields to record the share informations between main apply worker and apply
+ * background worker.
+ */

SUGGESTION
Information shared between main apply worker and apply background worker.

~~~

18. .../replication/logical/applybgwroker.c

+/* apply background worker setup */
+static ApplyBgworkerState *apply_bgworker_setup(void);
+static void apply_bgworker_setup_dsm(ApplyBgworkerState *wstate);

IMO there was not really any need for this comment – these are just
forward declarations of functions.

~~~

19. .../replication/logical/applybgwroker.c - find_or_start_apply_bgworker

+ if (found)
+ {
+ entry->wstate->pstate->status = APPLY_BGWORKER_BUSY;
+ return entry->wstate;
+ }
+ else if (!start)
+ return NULL;

I felt this might be more readable without the else:

if (found)
{
entry->wstate->pstate->status = APPLY_BGWORKER_BUSY;
return entry->wstate;
}
Assert(!found);
if (!start)
return NULL;

~~~

20. .../replication/logical/applybgwroker.c - find_or_start_apply_bgworker

+ /*
+ * Now, we try to get a apply background worker. If there is at least one
+ * worker in the idle list, then take one. Otherwise, we try to start a
+ * new apply background worker.
+ */

20a.
"a apply" -> "an apply"

20b.
IMO it's better to call this the free list (not the idle list)

~~~

21. .../replication/logical/applybgwroker.c - find_or_start_apply_bgworker

+ /*
+ * If the apply background worker cannot be launched, remove entry
+ * in hash table.
+ */

"remove entry in hash table" -> "remove the entry from the hash table"

~~~

22. .../replication/logical/applybgwroker.c - apply_bgworker_free

+/*
+ * Add the worker to the free list and remove the entry from hash table.
+ */

"from hash table" -> "from the hash table"

~~~

23. .../replication/logical/applybgwroker.c - apply_bgworker_free

+ elog(DEBUG1, "adding finished apply worker #%u for xid %u to the idle list",
+ wstate->pstate->n, wstate->pstate->stream_xid);

IMO it's better to call this the free list (not the idle list)

~~~

24. .../replication/logical/applybgwroker.c - LogicalApplyBgwLoop

+/* Apply Background Worker main loop */
+static void
+LogicalApplyBgwLoop(shm_mq_handle *mqh, volatile ApplyBgworkerShared *pst)

Why is the name inconsistent with other function names in the file?
Should it be apply_bgworker_loop?

~~~

25. .../replication/logical/applybgwroker.c - LogicalApplyBgwLoop

+ /*
+ * Push apply error context callback. Fields will be filled during
+ * applying a change.
+ */

"during" -> "when"

~~~

26. .../replication/logical/applybgwroker.c - LogicalApplyBgwLoop

+ /*
+ * We use first byte of message for additional communication between
+ * main Logical replication worker and apply bgworkers, so if it
+ * differs from 'w', then process it first.
+ */

"bgworkers" -> "background workers"

~~~

27. .../replication/logical/applybgwroker.c - ApplyBgwShutdown

For consistency should it be called apply_bgworker_shutdown?

~~~

28. .../replication/logical/applybgwroker.c - LogicalApplyBgwMain

For consistency should it be called apply_bgworker_main?

~~~

29. .../replication/logical/applybgwroker.c - apply_bgworker_check_status

+ errdetail("Cannot handle streamed replication transaction by apply "
+ "bgworkers until all tables are synchronized")));

"bgworkers" -> "background workers"

======

30. src/backend/replication/logical/decode.c

@@ -651,9 +651,10 @@ DecodeCommit(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf,
{
for (i = 0; i < parsed->nsubxacts; i++)
{
- ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
+ ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr,
+ commit_time);
}
- ReorderBufferForget(ctx->reorder, xid, buf->origptr);
+ ReorderBufferForget(ctx->reorder, xid, buf->origptr, commit_time);

ReorderBufferForget was declared with an 'abort_time' param, so it is
a bit confusing to see these calls passing 'commit_time'.

Maybe better to do like below and pass 'forget_time' (inside that
'if') along with an explanatory comment:

TimestampTz forget_time = commit_time;
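
A standalone sketch of that idea, using stand-in types, in case it helps. Here reorder_buffer_forget only mimics the parameter shape of ReorderBufferForget as changed by the patch (a trailing 'abort_time'), while skip_committed_xact and last_forget_time are hypothetical names used just for the demonstration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins for the PostgreSQL typedefs. */
typedef int64_t TimestampTz;
typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;

/* Remembers the timestamp of the most recent call, for the demo only. */
static TimestampTz last_forget_time;

/* Same parameter shape as the patched function: last argument is 'abort_time'. */
void
reorder_buffer_forget(TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
{
    (void) xid;
    (void) lsn;
    last_forget_time = abort_time;
}

/* Forget a committed transaction (and its subxacts) that is being skipped. */
void
skip_committed_xact(TransactionId xid, const TransactionId *subxacts,
                    int nsubxacts, XLogRecPtr origptr, TimestampTz commit_time)
{
    /*
     * The transaction is being forgotten, not aborted, so give the
     * timestamp a name that says what it is used for rather than passing
     * 'commit_time' directly into an 'abort_time' parameter.
     */
    TimestampTz forget_time = commit_time;
    int         i;

    for (i = 0; i < nsubxacts; i++)
        reorder_buffer_forget(subxacts[i], origptr, forget_time);
    reorder_buffer_forget(xid, origptr, forget_time);
}
```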

======

31. src/backend/replication/logical/launcher.c - logicalrep_worker_find

+ /* We only need main apply worker or table sync worker here */

"need" -> "are interested in the"

~~~

32. src/backend/replication/logical/launcher.c - logicalrep_worker_launch

+ if (!is_subworker)
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+ else
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyBgworkerMain");

IMO better to reverse this and express the condition as 'if (is_subworker)'

~~~

33. src/backend/replication/logical/launcher.c - logicalrep_worker_launch

+ else if (!is_subworker)
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication worker for subscription %u", subid);
+ else
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication background apply worker for subscription %u ", subid);

33a.
Ditto. IMO better to reverse this and express the condition as 'if
(is_subworker)'

33b.
"background apply worker" -> "apply background worker"

~~~

34. src/backend/replication/logical/launcher.c - logicalrep_worker_stop

IMO this code logic should be rewritten to be simpler to have a common
LWLockRelease. This also makes the code more like
logicalrep_worker_detach, which seems like a good thing.

SUGGESTION
logicalrep_worker_stop(Oid subid, Oid relid)
{
LogicalRepWorker *worker;

LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

worker = logicalrep_worker_find(subid, relid, false);

if (worker)
logicalrep_worker_stop_internal(worker);

LWLockRelease(LogicalRepWorkerLock);
}

~~~

35. src/backend/replication/logical/launcher.c -
logicalrep_apply_background_worker_count

+/*
+ * Count the number of registered (not necessarily running) apply background
+ * worker for a subscription.
+ */

"worker" -> "workers"

~~~

36. src/backend/replication/logical/launcher.c -
logicalrep_apply_background_worker_count

+ int res = 0;
+

A better variable name here would be 'count', or even 'n'.

======

36. src/backend/replication/logical/origin.c

+ * However, If must_acquire is false, we allow process to get the slot which is
+ * already acquired by other process.

SUGGESTION
However, if the function parameter 'must_acquire' is false, we allow
the process to use the same slot already acquired by another process.

~~~

37. src/backend/replication/logical/origin.c

+ ereport(ERROR,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("could not find correct replication state slot for
replication origin with OID %u for apply background worker",
+ node),
+ errhint("There is no replication state slot set by its main apply worker.")));

37a.
Somehow, I felt the errmsg and the errhint could be clearer. Maybe like this?

" apply background worker could not find replication state slot for
replication origin with OID %u",

"There is no replication state slot set by the main apply worker."

37b.
Also, I think that generally the 'errhint' gives some advice or some
action that the user can take to fix the problem. But is this errhint
actually saying anything useful for the user? Perhaps you meant to say
'errdetail' here?

======

38. src/backend/replication/logical/proto.c - logicalrep_read_stream_abort

+ /*
+ * If the version of the publisher is lower than the version of the
+ * subscriber, it may not support sending these two fields, so only take
+ * these fields when include_abort_lsn is true.
+ */
+ if (include_abort_lsn)
+ {
+ abort_data->abort_lsn = pq_getmsgint64(in);
+ abort_data->abort_time = pq_getmsgint64(in);
+ }
+ else
+ {
+ abort_data->abort_lsn = InvalidXLogRecPtr;
+ abort_data->abort_time = 0;
+ }

This comment is documenting a decision that was made elsewhere.

But it somehow feels wrong to me that the decision to read or not read
the abort time/lsn is made by the caller of this function. IMO it
might make more sense if the server version was simply passed as a
param and then this function can be in control of its own destiny and
decide whether it needs to read those extra fields or not. An
extra member flag can be added to LogicalRepStreamAbortData to
indicate if abort_data read these values or not.
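
Roughly what I have in mind, as a standalone sketch rather than real proto.c code. A raw byte buffer stands in for the StringInfo, the field layout (xid, subxid, then the optional abort LSN and abort time) is inferred from the hunk above, the 150000 threshold is the one the patch uses in apply_handle_stream_abort, and the names StreamAbortData, has_abort_info, read_be and read_stream_abort are all made up for the example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef struct StreamAbortData
{
    uint32_t    xid;
    uint32_t    subxid;
    uint64_t    abort_lsn;
    int64_t     abort_time;
    bool        has_abort_info; /* were abort_lsn/abort_time read? */
} StreamAbortData;

/* Read 'nbytes' big-endian bytes from *p and advance the pointer. */
static uint64_t
read_be(const unsigned char **p, int nbytes)
{
    uint64_t    v = 0;
    int         i;

    for (i = 0; i < nbytes; i++)
        v = (v << 8) | *(*p)++;
    return v;
}

/*
 * The function itself decides, from the publisher version, whether the
 * two extra fields are present. Returns false if the buffer is too short.
 */
bool
read_stream_abort(const unsigned char *buf, size_t len, int server_version,
                  StreamAbortData *out)
{
    const unsigned char *p = buf;
    bool        has_extra = (server_version >= 150000);
    size_t      need = 8 + (has_extra ? 16 : 0);

    if (len < need)
        return false;

    out->xid = (uint32_t) read_be(&p, 4);
    out->subxid = (uint32_t) read_be(&p, 4);
    out->has_abort_info = has_extra;

    if (has_extra)
    {
        out->abort_lsn = read_be(&p, 8);
        out->abort_time = (int64_t) read_be(&p, 8);
    }
    else
    {
        out->abort_lsn = 0;     /* i.e. an invalid LSN */
        out->abort_time = 0;
    }
    return true;
}
```

The caller then only checks has_abort_info, instead of having to know in advance which fields the publisher sent.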

======

39. src/backend/replication/logical/worker.c

* Streamed transactions (large transactions exceeding a memory limit on the
- * upstream) are not applied immediately, but instead, the data is written
- * to temporary files and then applied at once when the final commit arrives.
+ * upstream) are applied via one of two approaches.

"via" -> "using"

~~~

40. src/backend/replication/logical/worker.c

+ * Assign a new apply background worker (if available) as soon as the xact's
+ * first stream is received and the main apply worker will send changes to this
+ * new worker via shared memory. We keep this worker assigned till the
+ * transaction commit is received and also wait for the worker to finish at
+ * commit. This preserves commit ordering and avoids writing to and reading
+ * from file in most cases. We still need to spill if there is no worker
+ * available. We also need to allow stream_stop to complete by the background
+ * worker to avoid deadlocks because T-1's current stream of changes can update
+ * rows in conflicting order with T-2's next stream of changes.

40a.
"and the main apply" -> ". The main apply"

40b.
"and avoids writing to and reading from file in most cases." -> "and
avoids file I/O in most cases."

40c.
"We still need to spill if" -> "We still need to spill to a file if"

40d.
"We also need to allow stream_stop to complete by the background
worker" -> "We also need to allow stream_stop to be performed by the
background worker"

~~~

41. src/backend/replication/logical/worker.c

-static ApplyErrorCallbackArg apply_error_callback_arg =
+ApplyErrorCallbackArg apply_error_callback_arg =
{
.command = 0,
.rel = NULL,
@@ -242,7 +246,7 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.origin_name = NULL,
};

Maybe it is still a good idea to at least keep the old comment here:
/* Struct for saving and restoring apply errcontext information */

~~~

42. src/backend/replication/logical/worker.c

+/* check if we are applying the transaction in apply background worker */
+#define apply_bgworker_active() (in_streamed_transaction &&
stream_apply_worker != NULL)

42a.
Uppercase comment.

42b.
"in apply background worker" -> "in an apply background worker"

~~~

43. src/backend/replication/logical/worker.c - handle_streamed_transaction

@@ -426,41 +437,76 @@ end_replication_step(void)
}

/*
- * Handle streamed transactions.
+ * Handle streamed transactions for both main apply worker and apply background
+ * worker.
*
- * If in streaming mode (receiving a block of streamed transaction), we
- * simply redirect it to a file for the proper toplevel transaction.
+ * In streaming case (receiving a block of streamed transaction), for
+ * SUBSTREAM_ON mode, we simply redirect it to a file for the proper toplevel
+ * transaction, and for SUBSTREAM_APPLY mode, we send the changes to background
+ * apply worker (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE changes will
+ * also be applied in main apply worker).
*
- * Returns true for streamed transactions, false otherwise (regular mode).
+ * For non-streamed transactions, returns false;
+ * For streamed transactions, returns true if in main apply worker (except we
+ * apply streamed transaction in "apply" mode and address
+ * LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE changes), false otherwise.
*/

Maybe it is accurate (I don’t know), but this header comment seems
excessively complicated with so many quirks about when to return
true/false. Can it be reworded into plainer language?

~~~

44. src/backend/replication/logical/worker.c - handle_streamed_transaction

Because there are so many returns for each of these conditions,
consider refactoring the logic to change all the if/else to just be
"if" and then you can comment each separate case better. I think it
may be clearer.

SUGGESTION

/* This is the apply background worker */
if (am_apply_bgworker())
{
...
return false;
}

/* This is the main apply, but there is an apply background worker */
if (apply_bgworker_active())
{
...
return true;
}

/* This is the main apply, and there is no apply background worker */
...
return true;

~~~

45. src/backend/replication/logical/worker.c - apply_handle_stream_prepare

+ /*
+ * This is the main apply worker. Check if we are processing this
+ * transaction in a apply background worker.
+ */
+ if (wstate)

I think the part that says "This is the main apply worker" should be
at the top of the 'else'

~~~

46. src/backend/replication/logical/worker.c - apply_handle_stream_prepare

+ /*
+ * This is the main apply worker and the transaction has been
+ * serialized to file, replay all the spooled operations.
+ */

SUGGESTION
The transaction has been serialized to file. Replay all the spooled operations.

~~~

47. src/backend/replication/logical/worker.c - apply_handle_stream_prepare

+ /* unlink the files with serialized changes and subxact info. */
+ stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);

Start comment with capital letter.

~~~

48. src/backend/replication/logical/worker.c - apply_handle_stream_start

+ /* If we are in a apply background worker, begin the transaction */
+ AcceptInvalidationMessages();
+ maybe_reread_subscription();

The "if we are" part of the comment is not needed because the fact the
code is inside am_apply_bgworker() makes this obvious anyway.

~~~

49. src/backend/replication/logical/worker.c - apply_handle_stream_start

+ /* open the spool file for this transaction */
+ stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
+

Start the comment uppercase.

+ /* if this is not the first segment, open existing subxact file */
+ if (!first_segment)
+ subxact_info_read(MyLogicalRepWorker->subid, stream_xid);

Start the comment uppercase.

~~~

50. src/backend/replication/logical/worker.c - apply_handle_stream_abort

+ /* Check whether the publisher sends abort_lsn and abort_time. */
+ if (am_apply_bgworker())
+ include_abort_lsn = MyParallelState->server_version >= 150000;
+
+ logicalrep_read_stream_abort(s, &abort_data, include_abort_lsn);

Here is where I felt maybe just the server version could be passed so
the logicalrep_read_stream_abort could decide itself what message
parts needed to be read. Basically it seems strange that the message
contains parts which might not be read. I felt it is better to always
read the whole message then later you can choose what parts you are
interested in.

~~~

51. src/backend/replication/logical/worker.c - apply_handle_stream_abort

+ /*
+ * This is the main apply worker. Check if we are processing this
+ * transaction in a apply background worker.
+ */

+ /*
+ * We are in main apply worker and the transaction has been serialized
+ * to file.
+ */

51a.
I thought the "This is the main apply worker" and "We are in main
apply worker" should just be a comment at the top of this "else".

51b.
"a apply background worker" -> "an apply background worker"

51c.
There seemed to be some missing comment to say this logic is telling
the bgworker to abort and then waiting for it to do so.

~~~

52. src/backend/replication/logical/worker.c - apply_handle_stream_commit

I did not really understand why the patch relocates this function to
another place in the file. Can't it be left in the same place?

~~~

53. src/backend/replication/logical/worker.c - apply_handle_stream_commit

+ /*
+ * This is the main apply worker. Check if we are processing this
+ * transaction in an apply background worker.
+ */

I thought the top of the else should just say "This is the main apply worker."

Then the if (wstate) part should say “Check if we are processing this
transaction in an apply background worker, and if so tell it to
comment the message”.

~~~

54. src/backend/replication/logical/worker.c - apply_handle_stream_commit

+ /*
+ * This is the main apply worker and the transaction has been
+ * serialized to file, replay all the spooled operations.
+ */

SUGGESTION
The transaction has been serialized to file, so replay all the spooled
operations.

~~~

55. src/backend/replication/logical/worker.c - apply_handle_stream_commit

+ /* unlink the files with serialized changes and subxact info */
+ stream_cleanup_files(MyLogicalRepWorker->subid, xid);

Uppercase comment.

======

56. src/backend/utils/misc/guc.c

@@ -3220,6 +3220,18 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},

+ {
+ {"max_apply_bgworkers_per_subscription",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Maximum number of apply backgrand workers per subscription."),
+ NULL,
+ },
+ &max_apply_bgworkers_per_subscription,
+ 3, 0, MAX_BACKENDS,
+ NULL, NULL, NULL
+ },
+

"backgrand" -> "background"

======

57. src/include/catalog/pg_subscription.h

@@ -109,7 +110,7 @@ typedef struct Subscription
bool enabled; /* Indicates if the subscription is enabled */
bool binary; /* Indicates if the subscription wants data in
* binary format */
- bool stream; /* Allow streaming in-progress transactions. */
+ char stream; /* Allow streaming in-progress transactions. */
char twophasestate; /* Allow streaming two-phase transactions */
bool disableonerr; /* Indicates if the subscription should be
* automatically disabled if a worker error

I felt probably this 'stream' comment should be the same as for 'substream'.

======

58. src/include/replication/worker_internal.h

+/*
+ * Shared information among apply workers.
+ */
+typedef struct ApplyBgworkerShared

SUGGESTION (maybe you can do better than this)
Struct for sharing information between apply main and apply background workers.

~~~

59. src/include/replication/worker_internal.h

+ /* Status for apply background worker. */
+ ApplyBgworkerStatus status;

"Status for" -> "Status of"

~~~

60. src/include/replication/worker_internal.h

+extern PGDLLIMPORT MemoryContext ApplyMessageContext;
+
+extern PGDLLIMPORT ApplyErrorCallbackArg apply_error_callback_arg;
+
+extern PGDLLIMPORT bool MySubscriptionValid;
+
+extern PGDLLIMPORT volatile ApplyBgworkerShared *MyParallelState;
+extern PGDLLIMPORT List *subxactlist;
+

I did not see why the last 2 externs are grouped together but the
others are not.

~~~

61. src/include/replication/worker_internal.h

+/* prototype needed because of stream_commit */
+extern void apply_dispatch(StringInfo s);

61a.
I was unsure if this comment is useful to anyone...

61b.
If you decide to keep it, please use uppercase.

~~~

62. src/include/replication/worker_internal.h

+/* apply background worker setup and interactions */
+extern ApplyBgworkerState *apply_bgworker_find_or_start(TransactionId xid,
+ bool start);

Uppercase comment.

======

63.

I also did a quick check of all the new debug logging added. Here is
everything from patch v11-0001.

apply_bgworker_free:
+ elog(DEBUG1, "adding finished apply worker #%u for xid %u to the idle list",
+ wstate->pstate->n, wstate->pstate->stream_xid);

LogicalApplyBgwLoop:
+ elog(DEBUG1, "[Apply BGW #%u] ended processing streaming chunk,"
+ "waiting on shm_mq_receive", pst->n);

+ elog(DEBUG1, "[Apply BGW #%u] exiting", pst->n);

ApplyBgworkerMain:
+ elog(DEBUG1, "[Apply BGW #%u] started", pst->n);

apply_bgworker_setup:
+ elog(DEBUG1, "setting up apply worker #%u",
list_length(ApplyWorkersList) + 1);

apply_bgworker_set_status:
+ elog(DEBUG1, "[Apply BGW #%u] set status to %d", MyParallelState->n, status);

apply_bgworker_subxact_info_add:
+ elog(DEBUG1, "[Apply BGW #%u] defining savepoint %s",
+ MyParallelState->n, spname);

apply_handle_stream_prepare:
+ elog(DEBUG1, "received prepare for streamed transaction %u",
+ prepare_data.xid);

apply_handle_stream_start:
+ elog(DEBUG1, "starting streaming of xid %u", stream_xid);

apply_handle_stream_stop:
+ elog(DEBUG1, "stopped streaming of xid %u, %u changes streamed",
stream_xid, nchanges);

apply_handle_stream_abort:
+ elog(DEBUG1, "[Apply BGW #%u] aborting current transaction xid=%u, subxid=%u",
+ MyParallelState->n, GetCurrentTransactionIdIfAny(),
+ GetCurrentSubTransactionId());

+ elog(DEBUG1, "[Apply BGW #%u] rolling back to savepoint %s",
+ MyParallelState->n, spname);

apply_handle_stream_commit:
+ elog(DEBUG1, "received commit for streamed transaction %u", xid);

Observations:

63a.
Every newly introduced message is at level DEBUG1 (not DEBUG). AFAIK
this is OK, because the messages are all protocol related and every
other existing debug message of the current replication worker.c was
also at the same DEBUG1 level.

63b.
The prefix "[Apply BGW #%u]" is used to indicate the bgworker is
executing the code, but it does not seem to be used 100% consistently
- e.g. there are some apply_bgworker_XXX functions not using this
prefix. Is that OK or a mistake?
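
One way to keep the prefix consistent would be to build every bgworker message through a single helper. A rough standalone sketch follows; format_bgw_message is a made-up name, and snprintf into a caller-supplied buffer stands in for whatever the patch would really do (presumably a thin wrapper around elog).

```c
#include <assert.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/*
 * Format a message with the "[Apply BGW #%u] " prefix into 'dst'.
 * Returns true if the whole message fit, false on truncation or error.
 */
bool
format_bgw_message(char *dst, size_t dstlen, unsigned int worker_no,
                   const char *fmt, ...)
{
    va_list     ap;
    int         prefix_len;
    int         body_len;

    prefix_len = snprintf(dst, dstlen, "[Apply BGW #%u] ", worker_no);
    if (prefix_len < 0 || (size_t) prefix_len >= dstlen)
        return false;

    va_start(ap, fmt);
    body_len = vsnprintf(dst + prefix_len, dstlen - (size_t) prefix_len,
                         fmt, ap);
    va_end(ap);

    return body_len >= 0 && (size_t) body_len < dstlen - (size_t) prefix_len;
}
```

For example, calling it with worker number 3 and the format "set status to %d" yields "[Apply BGW #3] set status to 1" for status 1, so each call site only supplies the part after the prefix.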

------
Kind Regards,
Peter Smith.
Fujitsu Australia
