Re: Perform streaming logical transactions by background workers and parallel apply

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-05-05 05:45:36
Message-ID: CAHut+PsjTc2EHXp_OWK+hOyN6pckDJJccSgz_Hfu=228WpT=Gw@mail.gmail.com
Lists: pgsql-hackers

Here are my review comments for v5-0001.

I will take a look at the v5-0002 (TAP) patch another time.

======

1. Commit message

The message still refers to "apply background". Should that say "apply
background worker"?

Other parts just call this the "worker". Personally, I think it might
be better to coin some new term for this thing (e.g. "apply-bgworker"
or something like that of your choosing) so then you can just
concisely *always* refer to it everywhere without any ambiguity. The
same applies to every comment and every message in this patch. They
should all use identical terminology (e.g. "apply-bgworker").

~~~

2. Commit message

"We also need to allow stream_stop to complete by the apply background
to finish it to..."

Wording: ???

~~~

3. Commit message

This patch also extends the subscription streaming option so that user
can control whether apply the streaming transaction in a apply
background or spill the change to disk.

Wording: "user" -> "the user"
Typo: "whether apply" -> "whether to apply"
Typo: "a apply" -> "an apply"

~~~

4. Commit message

User can set the streaming option to 'on/off', 'apply'. For now,
'apply' means the streaming will be applied via a apply background if
available. 'on' means the streaming transaction will be spilled to
disk.

I think "apply" might not be the best choice of values for this
meaning, but I think Hou-san already said [1] that this was being
reconsidered.

~~~

5. doc/src/sgml/catalogs.sgml - formatting

@@ -7863,11 +7863,15 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l

<row>
<entry role="catalog_table_entry"><para role="column_definition">
- <structfield>substream</structfield> <type>bool</type>
+ <structfield>substream</structfield> <type>char</type>
</para>
<para>
- If true, the subscription will allow streaming of in-progress
- transactions
+ Controls how to handle the streaming of in-progress transactions.
+ <literal>f</literal> = disallow streaming of in-progress transactions
+ <literal>o</literal> = spill the changes of in-progress transactions to
+ disk and apply at once after the transaction is committed on the
+ publisher.
+ <literal>a</literal> = apply changes directly using a background worker
</para></entry>
</row>

Needs to be consistent with other value lists on this page.

5a. The first sentence to end with ":"

5b. List items to end with ","

~~~

6. doc/src/sgml/ref/create_subscription.sgml

+ <para>
+ If set to <literal>apply</literal> incoming
+ changes are directly applied via one of the background worker, if
+ available. If no background worker is free to handle streaming
+ transaction then the changes are written to a file and applied after
+ the transaction is committed. Note that if error happen when applying
+ changes in background worker, it might not report the finish LSN of
+ the remote transaction in server log.
</para>

6a. Typo: "one of the background worker," -> "one of the background workers,"

6b. Wording
BEFORE
Note that if error happen when applying changes in background worker,
it might not report the finish LSN of the remote transaction in server
log.
SUGGESTION
Note that if an error happens when applying changes in a background
worker, it might not report the finish LSN of the remote transaction
in the server log.

~~~

7. src/backend/commands/subscriptioncmds.c - defGetStreamingMode

+static char
+defGetStreamingMode(DefElem *def)
+{
+ /*
+ * If no parameter given, assume "true" is meant.
+ */
+ if (def->arg == NULL)
+ return SUBSTREAM_ON;

But is that right? IIUC all the docs said that the default is OFF.

~~~

8. src/backend/commands/subscriptioncmds.c - defGetStreamingMode

+ /*
+ * The set of strings accepted here should match up with the
+ * grammar's opt_boolean_or_string production.
+ */
+ if (pg_strcasecmp(sval, "true") == 0 ||
+ pg_strcasecmp(sval, "on") == 0)
+ return SUBSTREAM_ON;
+ if (pg_strcasecmp(sval, "apply") == 0)
+ return SUBSTREAM_APPLY;
+ if (pg_strcasecmp(sval, "false") == 0 ||
+ pg_strcasecmp(sval, "off") == 0)
+ return SUBSTREAM_OFF;

Perhaps these should be re-ordered as OFF/ON/APPLY to be consistent
with the T_Integer case above.
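e.g. something like below. This is only a standalone sketch of the
string branch: strcasecmp() from <strings.h> stands in for
pg_strcasecmp(), the function name is hypothetical, and it returns
'\0' where the real code would ereport an error.

```c
#include <assert.h>
#include <strings.h>    /* strcasecmp(); stand-in for pg_strcasecmp() */

/* Values as defined by the v5 patch in pg_subscription.h. */
#define SUBSTREAM_OFF   'f'
#define SUBSTREAM_ON    'o'
#define SUBSTREAM_APPLY 'a'

/*
 * Hypothetical standalone version of the string branch of
 * defGetStreamingMode(), with the tests ordered OFF/ON/APPLY.
 * Returns '\0' for an unrecognized value.
 */
static char
streaming_mode_from_string(const char *sval)
{
    if (strcasecmp(sval, "false") == 0 ||
        strcasecmp(sval, "off") == 0)
        return SUBSTREAM_OFF;
    if (strcasecmp(sval, "true") == 0 ||
        strcasecmp(sval, "on") == 0)
        return SUBSTREAM_ON;
    if (strcasecmp(sval, "apply") == 0)
        return SUBSTREAM_APPLY;
    return '\0';
}
```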

~~~

9. src/backend/replication/logical/launcher.c - logicalrep_worker_launch

The "start new apply background worker ..." function comment feels a
bit misleading now that "apply background worker" seems to be what you
are calling this new kind of worker. E.g. this is also called to start
the sync worker, and also the apply worker (which we are not really
calling a "background worker" in other places). This comment is the
same as [PSv4] #19.

~~~

10. src/backend/replication/logical/launcher.c - logicalrep_worker_launch

@@ -275,6 +280,9 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
int nsyncworkers;
TimestampTz now;

+ /* We don't support table sync in subworker */
+ Assert(!((subworker_dsm != DSM_HANDLE_INVALID) && OidIsValid(relid)));

I think you should declare a new variable like:
bool is_subworker = subworker_dsm != DSM_HANDLE_INVALID;

Then this Assert can be simplified, and also you can re-use the
'is_subworker' later multiple times in this same function to simplify
lots of other code also.
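e.g. something like below. This is a standalone sketch, not patch
code: the typedefs and macros are simplified stand-ins for the
PostgreSQL ones, assert() stands in for Assert(), and
launch_request_is_subworker() is a hypothetical wrapper just so the
snippet compiles on its own.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-ins for the PostgreSQL definitions. */
typedef uint32_t dsm_handle;
typedef unsigned int Oid;
#define DSM_HANDLE_INVALID ((dsm_handle) 0)
#define InvalidOid ((Oid) 0)
#define OidIsValid(objectId) ((bool) ((objectId) != InvalidOid))

/*
 * Hypothetical helper mirroring the start of logicalrep_worker_launch():
 * compute is_subworker once, then reuse it in the Assert and in the
 * later checks of the same function.
 */
static bool
launch_request_is_subworker(dsm_handle subworker_dsm, Oid relid)
{
    bool        is_subworker = (subworker_dsm != DSM_HANDLE_INVALID);

    /* We don't support table sync in a subworker. */
    assert(!(is_subworker && OidIsValid(relid)));

    return is_subworker;
}
```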

~~~

11. src/backend/replication/logical/launcher.c - logicalrep_worker_stop_internal

+/*
+ * Workhorse for logicalrep_worker_stop() and logicalrep_worker_detach(). Stop
+ * the worker and wait for wait for it to die.
+ */
+static void
+logicalrep_worker_stop_internal(LogicalRepWorker *worker)

Typo: "wait for" is repeated 2x.

~~~

12. src/backend/replication/logical/origin.c - replorigin_session_setup

@@ -1110,7 +1110,11 @@ replorigin_session_setup(RepOriginId node)
if (curstate->roident != node)
continue;

- else if (curstate->acquired_by != 0)
+ /*
+ * We allow the apply worker to get the slot which is acquired by its
+ * leader process.
+ */
+ else if (curstate->acquired_by != 0 && acquire)

I still feel this is overly confusing. Shouldn't the comment say "Allow
the apply bgworker to get the slot..."?

Also the parameter name 'acquire' is hard to reconcile with the
comment. E.g. I feel all this would be easier to understand if the
param was refactored with a name like 'bgworker' and the code was
changed to:
else if (curstate->acquired_by != 0 && !bgworker)

Of course, the value true/false would need to be flipped on calls too.
This is the same as my previous comment [PSv4] #26.
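To show what I mean, here is a reduced standalone sketch of just that
check. OriginSlot is a hypothetical stand-in for the two
ReplicationState fields used, and origin_slot_usable() is a
hypothetical wrapper; note that the meaning of the flag is flipped
compared to 'acquire'.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint16_t RepOriginId;   /* stand-in */

/* Minimal stand-in for the ReplicationState fields used here. */
typedef struct OriginSlot
{
    RepOriginId roident;
    int         acquired_by;    /* PID of the acquiring process, or 0 */
} OriginSlot;

/*
 * Hypothetical reduction of the slot check in replorigin_session_setup(),
 * with the parameter renamed from 'acquire' to 'bgworker'.
 * Returns true if the caller may use the slot.
 */
static bool
origin_slot_usable(const OriginSlot *slot, RepOriginId node, bool bgworker)
{
    if (slot->roident != node)
        return false;

    /*
     * Allow the apply bgworker to get the slot which is acquired by its
     * leader process.
     */
    else if (slot->acquired_by != 0 && !bgworker)
        return false;           /* real code reports "already active" */

    return true;
}
```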

~~~

13. src/backend/replication/logical/proto.c

@@ -1138,14 +1138,11 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
/*
* Read STREAM COMMIT from the output stream.
*/
-TransactionId
+void
logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
{
- TransactionId xid;
uint8 flags;

- xid = pq_getmsgint(in, 4);
-
/* read flags (unused for now) */
flags = pq_getmsgbyte(in);

There is something incompatible with the read/write functions here.
The write writes the txid before the flags, but the read_commit does
not read it at all; it only reads the flags (???). If this is really
correct, then I think there need to be some comments to explain WHY it
is correct.

NOTE: See also review comment 28 where I proposed another way to write
this code.

~~~

14. src/backend/replication/logical/worker.c - comment

The whole comment is similar to the commit message so any changes
there should be made here also.

~~~

15. src/backend/replication/logical/worker.c - ParallelState

+/*
+ * Shared information among apply workers.
+ */
+typedef struct ParallelState

It looks like there is already another typedef called "ParallelState"
because it is already in the typedefs.list. Maybe this name should be
changed or maybe make it static or something?

~~~

16. src/backend/replication/logical/worker.c - defines

+/*
+ * States for apply background worker.
+ */
+#define APPLY_BGWORKER_ATTACHED 'a'
+#define APPLY_BGWORKER_READY 'r'
+#define APPLY_BGWORKER_BUSY 'b'
+#define APPLY_BGWORKER_FINISHED 'f'
+#define APPLY_BGWORKER_EXIT 'e'

Those char states all look independent. So wouldn’t this be
represented better as an enum to reinforce that fact?
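e.g. something like below (the type name ApplyBgworkerState and the
name-lookup helper are hypothetical). A side benefit is that a switch
with no default lets the compiler warn (-Wswitch) when a new state is
added but not handled. The "set state to %c" log in #44 would then
need to print a name or number instead of a char.

```c
#include <assert.h>

/* Hypothetical enum replacing the APPLY_BGWORKER_* char defines. */
typedef enum ApplyBgworkerState
{
    APPLY_BGWORKER_ATTACHED,
    APPLY_BGWORKER_READY,
    APPLY_BGWORKER_BUSY,
    APPLY_BGWORKER_FINISHED,
    APPLY_BGWORKER_EXIT
} ApplyBgworkerState;

/* Hypothetical name lookup, e.g. for log messages. */
static const char *
apply_bgworker_state_name(ApplyBgworkerState state)
{
    switch (state)
    {
        case APPLY_BGWORKER_ATTACHED:
            return "attached";
        case APPLY_BGWORKER_READY:
            return "ready";
        case APPLY_BGWORKER_BUSY:
            return "busy";
        case APPLY_BGWORKER_FINISHED:
            return "finished";
        case APPLY_BGWORKER_EXIT:
            return "exit";
    }
    return "unknown";           /* only reached for an invalid value */
}
```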

~~~

17. src/backend/replication/logical/worker.c - functions

+/* Worker setup and interactions */
+static WorkerState *apply_bgworker_setup(void);
+static WorkerState *find_or_start_apply_bgworker(TransactionId xid,
+ bool start);

Maybe rename to apply_bgworker_find_or_start() to match the pattern of
the others?

~~~

18. src/backend/replication/logical/worker.c - macros

+#define am_apply_bgworker() (MyLogicalRepWorker->subworker)
+#define applying_changes_in_bgworker() (in_streamed_transaction && stream_apply_worker != NULL)

18a. Somehow I felt these are not in the best place.
- Maybe am_apply_bgworker() should be in worker_internal.h?
- Maybe the applying_changes_in_bgworker() should be nearby the
stream_apply_worker declaration

18b. Maybe applying_changes_in_bgworker should be renamed to something
else to match the pattern of the others (e.g. "apply_bgworker_active"
or something)

~~~

19. src/backend/replication/logical/worker.c - handle_streamed_transaction

+ /*
+ * If we decided to apply the changes of this transaction in a apply
+ * background worker, pass the data to the worker.
+ */

Typo: "in a apply" -> "in an apply"

~~~

20. src/backend/replication/logical/worker.c - handle_streamed_transaction

+ /*
+ * XXX The publisher side doesn't always send relation update message
+ * after the streaming transaction, so update the relation in main
+ * apply worker here.
+ */

Wording: "doesn't always send relation update message" -> "doesn't
always send relation update messages" ??

~~~

21. src/backend/replication/logical/worker.c - apply_handle_commit_prepared

+ apply_bgworker_set_state(APPLY_BGWORKER_FINISHED);

It seems somewhat confusing to see calls to apply_bgworker_set_state()
when we may or may not even be an apply bgworker.

I know it adds more code, but I somehow feel it is more readable if
all these calls were changed to look like below. Please consider it.

SUGGESTION
if (am_apply_bgworker())
apply_bgworker_set_state(XXX);

Then you can also change the apply_bgworker_set_state to
Assert(am_apply_bgworker());
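A standalone sketch of the resulting pattern. my_subworker and
my_state are hypothetical stand-ins for MyLogicalRepWorker->subworker
and MyParallelState->state, finish_remote_transaction() is a
hypothetical stand-in for the tail of a handler such as
apply_handle_commit_prepared(), and assert() stands in for Assert().

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for MyLogicalRepWorker->subworker and
 * MyParallelState->state. */
static bool my_subworker = false;
static char my_state = '\0';

#define am_apply_bgworker() (my_subworker)
#define APPLY_BGWORKER_FINISHED 'f'

static void
apply_bgworker_set_state(char state)
{
    /* Callers now guarantee this; catch any caller that forgets. */
    assert(am_apply_bgworker());
    my_state = state;           /* real code sets it under pstate->mutex */
}

/* Hypothetical tail of a handler such as apply_handle_commit_prepared(). */
static void
finish_remote_transaction(void)
{
    /* ... commit work elided ... */
    if (am_apply_bgworker())
        apply_bgworker_set_state(APPLY_BGWORKER_FINISHED);
}
```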

~~~

22. src/backend/replication/logical/worker.c - find_or_start_apply_bgworker

+
+ if (!start && ApplyWorkersHash == NULL)
+ return NULL;
+

IIUC maybe this extra check is not really necessary. I see no harm in
creating the HashTable even if it was called in this state. If the
'start' flag is false then nothing is going to be found anyway, so it
will return NULL. So you might as well make the code a few lines
shorter/simpler by removing this check.

~~~

23. src/backend/replication/logical/worker.c - apply_bgworker_free

+/*
+ * Add the worker to the freelist and remove the entry from hash table.
+ */
+static void
+apply_bgworker_free(WorkerState *wstate)
+{
+ bool found;
+ MemoryContext oldctx;
+ TransactionId xid = wstate->pstate->stream_xid;

If you are not going to check the value of 'found' then why bother to
pass this param at all? Can't you just pass NULL?

~~~

24. src/backend/replication/logical/worker.c - apply_bgworker_free

Should there be an Assert that the bgworker state really was FINISHED?
I think I asked this already [PSv4] #48.

~~~

24. src/backend/replication/logical/worker.c - apply_handle_stream_prepare

@@ -1088,24 +1416,71 @@ apply_handle_stream_prepare(StringInfo s)
logicalrep_read_stream_prepare(s, &prepare_data);
set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);

- elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
+ /*
+ * If we are in a bgworker, just prepare the transaction.
+ */
+ if (am_apply_bgworker())

No need to say "If we are..." because the am_apply_bgworker()
condition makes it clear this is true.

~~~

25. src/backend/replication/logical/worker.c - apply_handle_stream_start

- if (MyLogicalRepWorker->stream_fileset == NULL)
+ stream_apply_worker = find_or_start_apply_bgworker(stream_xid, first_segment);
+
+ if (applying_changes_in_bgworker())
{

IIUC this condition seems overkill. I think you can just say if
(stream_apply_worker)

~~~

26. src/backend/replication/logical/worker.c - apply_handle_stream_abort

+ if (found)
+ {
+ elog(LOG, "rolled back to savepoint %s", spname);
+ RollbackToSavepoint(spname);
+ CommitTransactionCommand();
+ subxactlist = list_truncate(subxactlist, i + 1);
+ }

Should that elog use the "[Apply BGW #%u]" format like the others for BGW?

~~~

27. src/backend/replication/logical/worker.c - apply_handle_stream_abort

Should this function be setting stream_apply_worker = NULL somewhere
when all is done?

~~~

28. src/backend/replication/logical/worker.c - apply_handle_stream_commit

+/*
+ * Handle STREAM COMMIT message.
+ */
+static void
+apply_handle_stream_commit(StringInfo s)
+{
+ LogicalRepCommitData commit_data;
+ TransactionId xid;
+
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM COMMIT message without STREAM STOP")));
+
+ xid = pq_getmsgint(s, 4);
+ logicalrep_read_stream_commit(s, &commit_data);
+ set_apply_error_context_xact(xid, commit_data.commit_lsn);

There is something a bit odd about this code. I think the
logicalrep_read_stream_commit() should take another param and the Txid
be extracted/read only INSIDE that logicalrep_read_stream_commit
function. See also review comment #13.
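To illustrate #13 and #28 together, here is a minimal standalone
sketch (not PostgreSQL code) of a write/read pair where the reader
itself consumes the xid and hands it back through an extra output
parameter, so the two sides stay visibly symmetric. MsgBuf,
put_bytes()/get_bytes() and the other names are hypothetical stand-ins
for StringInfo and the pq_* functions. The field order (xid, flags,
commit_lsn, end_lsn, committime) is my assumption based on the
existing stream commit message, and the leading message-type byte is
left out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId; /* stand-in */
typedef uint64_t XLogRecPtr;    /* stand-in */

/* Hypothetical stand-in for StringInfo: a byte buffer plus a cursor. */
typedef struct MsgBuf
{
    unsigned char data[64];
    size_t      len;
    size_t      cursor;
} MsgBuf;

/* Hypothetical stand-in for LogicalRepCommitData. */
typedef struct CommitData
{
    XLogRecPtr  commit_lsn;
    XLogRecPtr  end_lsn;
    int64_t     committime;
} CommitData;

/* Big-endian helpers, in the spirit of pq_sendint*() / pq_getmsgint*(). */
static void
put_bytes(MsgBuf *b, uint64_t v, int nbytes)
{
    for (int i = nbytes - 1; i >= 0; i--)
        b->data[b->len++] = (unsigned char) (v >> (8 * i));
}

static uint64_t
get_bytes(MsgBuf *b, int nbytes)
{
    uint64_t    v = 0;

    for (int i = 0; i < nbytes; i++)
        v = (v << 8) | b->data[b->cursor++];
    return v;
}

/* Writer: xid first, then flags, then the commit fields. */
static void
write_stream_commit(MsgBuf *out, TransactionId xid, const CommitData *cd)
{
    put_bytes(out, xid, 4);
    put_bytes(out, 0, 1);       /* flags (unused for now) */
    put_bytes(out, cd->commit_lsn, 8);
    put_bytes(out, cd->end_lsn, 8);
    put_bytes(out, (uint64_t) cd->committime, 8);
}

/*
 * Reader: reads the xid itself, then the remaining fields in exactly the
 * order the writer produced them. Returns 0 on success, or -1 for
 * unexpected flags (real code would elog(ERROR)).
 */
static int
read_stream_commit(MsgBuf *in, TransactionId *xid, CommitData *cd)
{
    uint8_t     flags;

    *xid = (TransactionId) get_bytes(in, 4);
    flags = (uint8_t) get_bytes(in, 1);
    if (flags != 0)
        return -1;
    cd->commit_lsn = get_bytes(in, 8);
    cd->end_lsn = get_bytes(in, 8);
    cd->committime = (int64_t) get_bytes(in, 8);
    return 0;
}
```

Keeping the TransactionId return value (as in the removed lines of the
proto.c diff) would work equally well; the point is only that the xid
is read in the same function that reads the rest of the message.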

~~~

29. src/backend/replication/logical/worker.c - apply_handle_stream_commit

I am unsure, but should something be setting the stream_apply_worker =
NULL somewhere when all is done?

~~~

30. src/backend/replication/logical/worker.c - LogicalApplyBgwLoop

30a.
+ if (shmq_res != SHM_MQ_SUCCESS)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("lost connection to the main apply worker")));

30b.
+ default:
+ elog(ERROR, "unexpected message");
+ break;

Should both those error messages have the "[Apply BGW #%u]" prefix
like the other BGW messages?

~~~

31. src/backend/replication/logical/worker.c - ApplyBgwShutdown

+/*
+ * Set the failed flag so that the main apply worker can realize we have
+ * shutdown.
+ */
+static void
+ApplyBgwShutdown(int code, Datum arg)

The comment does not seem to be in sync with the code. E.g.
Wording: "failed flag" -> "exit state" ??

~~~

32. src/backend/replication/logical/worker.c - ApplyBgwShutdown

+/*
+ * Set the failed flag so that the main apply worker can realize we have
+ * shutdown.
+ */
+static void
+ApplyBgwShutdown(int code, Datum arg)

If the 'code' param is deliberately unused it might be better to say
so in the comment...

~~~

33. src/backend/replication/logical/worker.c - LogicalApplyBgwMain

33a.
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("unable to map dynamic shared memory segment")));

33b.
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("bad magic number in dynamic shared memory segment")));
+

33c.
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription %u will not "
+ "start because the subscription was removed during startup",
+ MyLogicalRepWorker->subid)));

Should all these messages have "[Apply BGW ?]" prefix even though they
are not yet attached?

~~~

34. src/backend/replication/logical/worker.c - setup_dsm

+ * We need one key to register the location of the header, and we need
+ * nworkers keys to track the locations of the message queues.
+ */

This comment about 'nworkers' seems stale because that variable no
longer exists.

~~~

35. src/backend/replication/logical/worker.c - apply_bgworker_setup

+/*
+ * Start apply worker background worker process and allocat shared memory for
+ * it.
+ */
+static WorkerState *
+apply_bgworker_setup(void)

typo: "allocat" -> "allocate"

~~~

36. src/backend/replication/logical/worker.c - apply_bgworker_setup

+ elog(LOG, "setting up apply worker #%u", list_length(ApplyWorkersList) + 1)

Should this message have the standard "[Apply BGW %u]" pattern?

~~~

37. src/backend/replication/logical/worker.c - apply_bgworker_setup

+ if (launched)
+ {
+ /* Wait for worker to become ready. */
+ apply_bgworker_wait_for(wstate, APPLY_BGWORKER_ATTACHED);
+
+ ApplyWorkersList = lappend(ApplyWorkersList, wstate);
+ }

Since there is a state APPLY_BGWORKER_READY I think either this
comment is wrong or this passed parameter ATTACHED must be wrong.

~~~

38. src/backend/replication/logical/worker.c - apply_bgworker_send_data

+ if (result != SHM_MQ_SUCCESS)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not send tuples to shared-memory queue")));
+}

Wording: Is it right to call these "tuples", or is it better to just
say "data"? I am not sure. Already asked this in [PSv4] #68.

~~~

39. src/backend/replication/logical/worker.c - apply_bgworker_wait_for

+/*
+ * Wait until the state of apply background worker reach the 'wait_for_state'
+ */
+static void
+apply_bgworker_wait_for(WorkerState *wstate, char wait_for_state)

typo: "reach" -> "reaches"

~~~

40. src/backend/replication/logical/worker.c - apply_bgworker_wait_for

+ /* If the worker is ready, we have succeeded. */
+ SpinLockAcquire(&wstate->pstate->mutex);
+ status = wstate->pstate->state;
+ SpinLockRelease(&wstate->pstate->mutex);
+
+ if (status == wait_for_state)
+ break;

40a. Why does this mention "ready"? This function might be waiting
for a different state than that.

40b. Anyway, I think this comment should be a few lines lower, above
the if (status == wait_for_state)

~~~

41. src/backend/replication/logical/worker.c - apply_bgworker_wait_for

+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("Background worker %u failed to apply transaction %u",
+ wstate->pstate->n, wstate->pstate->stream_xid)));

Should this message have the standard "[Apply BGW %u]" pattern?

~~~

42. src/backend/replication/logical/worker.c - check_workers_status

+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("Background worker %u exited unexpectedly",
+ wstate->pstate->n)));

Should this message have the standard "[Apply BGW %u]" pattern? Or if
this is just from Apply worker maybe it should be clearer like "Apply
worker detected apply bgworker %u exited unexpectedly".

~~~

43. src/backend/replication/logical/worker.c - check_workers_status

+ ereport(LOG,
+ (errmsg("logical replication apply workers for subscription \"%s\" will restart",
+ MySubscription->name),
+ errdetail("Cannot start table synchronization while bgworkers are "
+ "handling streamed replication transaction")));

I am not sure, but isn't the message backwards? e.g. Should it say more like:
"Cannot handle streamed transactions using bgworkers while table
synchronization is still in progress".

~~~

44. src/backend/replication/logical/worker.c - apply_bgworker_set_state

+ elog(LOG, "[Apply BGW #%u] set state to %c",
+ MyParallelState->n, state);

The line wrapping seemed overkill here.

~~~

45. src/backend/utils/activity/wait_event.c

@@ -388,6 +388,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT:
event_name = "HashGrowBucketsReinsert";
break;
+ case WAIT_EVENT_LOGICAL_APPLY_WORKER_READY:
+ event_name = "LogicalApplyWorkerReady";
+ break;

I am not sure this is the best name for this event since the only
place it is used (in apply_bgworker_wait_for) is not only waiting for
READY state. Maybe a name like WAIT_EVENT_LOGICAL_APPLY_BGWORKER or
WAIT_EVENT_LOGICAL_APPLY_WORKER_SYNC would be more appropriate? Need
to change the wait_event.h also.

~~~

46. src/include/catalog/pg_subscription.h

+/* Disallow streaming in-progress transactions */
+#define SUBSTREAM_OFF 'f'
+
+/*
+ * Streaming transactions are written to a temporary file and applied only
+ * after the transaction is committed on upstream.
+ */
+#define SUBSTREAM_ON 'o'
+
+/* Streaming transactions are appied immediately via a background worker */
+#define SUBSTREAM_APPLY 'a'

46a. There is not really any overarching comment that associates these
#defines back to the new 'stream' field so you are just supposed to
guess that's what they are for?

46b. I also feel that using 'o' for ON is not consistent with the 'f'
of OFF. IMO better to use 't/f' for true/false instead of 'o/f'. Also,
don't forget to update the docs, pg_dump.c, etc.

46c. Typo: "appied" -> "applied"
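For 46a/46b/46c together, something like below. The overarching
comment wording is only a suggestion, and substream_option_value() is
a hypothetical helper showing the kind of catalog-value-to-option
mapping that pg_dump etc. would need to follow the 'o' -> 't' change.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Possible values of pg_subscription.substream, which controls how the
 * streaming of in-progress transactions is handled.
 */

/* Disallow streaming in-progress transactions. */
#define SUBSTREAM_OFF   'f'

/*
 * Streaming transactions are written to a temporary file and applied only
 * after the transaction is committed on upstream.
 */
#define SUBSTREAM_ON    't'

/* Streaming transactions are applied immediately via a background worker. */
#define SUBSTREAM_APPLY 'a'

/*
 * Hypothetical helper mapping the catalog value back to the CREATE
 * SUBSCRIPTION streaming option value. Returns NULL if unrecognized.
 */
static const char *
substream_option_value(char substream)
{
    switch (substream)
    {
        case SUBSTREAM_OFF:
            return "off";
        case SUBSTREAM_ON:
            return "on";
        case SUBSTREAM_APPLY:
            return "apply";
        default:
            return NULL;
    }
}
```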

~~~~

47. src/test/regress/expected/subscription.out - missing tests

Are some test cases missing for the new option values? E.g. where is
the test where the streaming value is set to 'apply'? Same comment as
[PSv4] #81.

------
[1] https://www.postgresql.org/message-id/OS0PR01MB5716E8D536552467EFB512EF94FC9%40OS0PR01MB5716.jpnprd01.prod.outlook.com
[PSv4] https://www.postgresql.org/message-id/CAHut%2BPuqYP5eD5wcSCtk%3Da6KuMjat2UCzqyGoE7sieCaBsVskQ%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia
