Re: Perform streaming logical transactions by background workers and parallel apply

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-10-12 10:10:47
Message-ID: CAHut+PvxL8tJ2ZUpEjkbRFe6qKSH+r54BQ7wM8p=335tUbuXbg@mail.gmail.com
Lists: pgsql-hackers

Here are some review comments for v36-0001.

======

1. GENERAL

Houzj wrote ([1] #3a):
The word 'streaming' is the same as the actual option name, so
personally I think it's fine. But if others also agreed that the name
can be improved, I can change it.

~

Sure, I was not really complaining that the name is "wrong". I just did
not think it was a good idea to have multiple struct members called
'streaming' when they don't have the same meaning, e.g. one is the
internal character-mode equivalent of the parameter, and the other is
the parameter value as a string. That's why I thought they should have
different names, e.g. make the 2nd one 'streaming_valstr' or
something.
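To make the naming point concrete, here is a minimal sketch of a struct that holds both forms of the setting side by side under distinct names. The struct, the `streaming_valstr` member, the helper functions and the 'f'/'t'/'p' mode characters are all hypothetical stand-ins for illustration, not the patch's actual definitions.

```c
#include <stddef.h>

/*
 * Hypothetical options struct: both members describe the same setting,
 * but in different forms, so they get different names instead of both
 * being called 'streaming'.
 */
typedef struct StreamingOptionsSketch
{
    char        streaming;          /* internal mode: 'f', 't' or 'p' (assumed) */
    const char *streaming_valstr;   /* parameter value as a string, or NULL */
} StreamingOptionsSketch;

/* Derive the string form of the parameter from the internal mode character. */
static const char *
streaming_mode_to_valstr(char mode)
{
    switch (mode)
    {
        case 't':
            return "on";
        case 'p':
            return "parallel";
        default:
            return NULL;        /* streaming disabled: no value to send */
    }
}

static StreamingOptionsSketch
make_streaming_options(char mode)
{
    StreamingOptionsSketch opts;

    opts.streaming = mode;
    opts.streaming_valstr = streaming_mode_to_valstr(mode);
    return opts;
}
```

With distinct names, a reader can tell at a glance which member is the mode character and which is the string value.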

======

2. doc/src/sgml/config.sgml

Previously I suggested there should be xrefs to the "Configuration
Settings" page, but Houzj wrote ([1] #4):
Not sure about this as we don't have similar thing in the document of
max_logical_replication_workers and max_sync_workers_per_subscription.

~

Fair enough, but IMO perhaps all those others should also xref to the
"Configuration Settings" chapter. So if such a change does not belong
in this patch, how about if I start a separate thread to post this
suggestion?

======

.../replication/logical/applyparallelworker.c

3. parallel_apply_find_worker

+parallel_apply_find_worker(TransactionId xid)
+{
+ bool found;
+ ParallelApplyWorkerEntry *entry = NULL;
+
+ if (!TransactionIdIsValid(xid))
+ return NULL;
+
+ if (ParallelApplyWorkersHash == NULL)
+ return NULL;
+
+ /* Return the cached parallel apply worker if valid. */
+ if (stream_apply_worker != NULL)
+ return stream_apply_worker;
+
+ /*
+ * Find entry for requested transaction.
+ */
+ entry = hash_search(ParallelApplyWorkersHash, &xid, HASH_FIND, &found);

In function parallel_apply_start_worker() you removed the assignment
of NULL to 'entry' because it is never needed. You can do the same here
too.

~~~

4. parallel_apply_free_worker

+/*
+ * Remove the parallel apply worker entry from the hash table. And stop the
+ * worker if there are enough workers in the pool. For more information about
+ * the worker pool, see comments atop worker.c.
+ */
+void
+parallel_apply_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid)

"And stop" -> "Stop"

~~~

5. parallel_apply_free_worker

+ * Although some error messages may be lost in rare scenarios, but
+ * since the parallel apply worker has finished processing the
+ * transaction, and error messages may be lost even if we detach the
+ * error queue after terminating the process. So it should be ok.
+ */

SUGGESTION (minor rewording)
Some error messages may be lost in rare scenarios, but it should be OK
because the parallel apply worker has finished processing the
transaction, and error messages may be lost even if we detached the
error queue after terminating the process.

~~~

6. LogicalParallelApplyLoop

+ for (;;)
+ {
+ void *data;
+ Size len;
+ int c;
+ StringInfoData s;
+ MemoryContext oldctx;
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* Ensure we are reading the data into our memory context. */
+ oldctx = MemoryContextSwitchTo(ApplyMessageContext);
+
...
+
+ MemoryContextSwitchTo(oldctx);
+ MemoryContextReset(ApplyMessageContext);
+ }

Do those memory context switches need to happen inside the for(;;)
loop like that? I thought perhaps they could be done *outside* the
loop, instead of switching and then switching back again on every
iteration.
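To make the question concrete, here is a toy model of the alternative: switch once before the loop, reset the per-message context at the end of every iteration, and switch back once afterwards. This is not PostgreSQL's MemoryContext API; ToyContext, toy_switch_to(), toy_alloc() and toy_reset() are hypothetical stand-ins. The rearrangement is only equivalent to the patch's version if nothing in the loop body leaves a different context current, which would need checking.

```c
#include <stddef.h>

/* Toy stand-in for a memory context: a bump arena that resets in one step. */
typedef struct ToyContext
{
    char        buf[1024];
    size_t      used;
} ToyContext;

static ToyContext TopToyContext;
static ToyContext MessageToyContext;
static ToyContext *CurrentToyContext = &TopToyContext;

/* Make ctx current and return the previously current context. */
static ToyContext *
toy_switch_to(ToyContext *ctx)
{
    ToyContext *old = CurrentToyContext;

    CurrentToyContext = ctx;
    return old;
}

/* Allocate from the current context; NULL if the toy arena is full. */
static void *
toy_alloc(size_t size)
{
    void       *p;

    if (CurrentToyContext->used + size > sizeof(CurrentToyContext->buf))
        return NULL;
    p = CurrentToyContext->buf + CurrentToyContext->used;
    CurrentToyContext->used += size;
    return p;
}

/* Release everything allocated in ctx. */
static void
toy_reset(ToyContext *ctx)
{
    ctx->used = 0;
}

static void
process_messages(int nmessages)
{
    /* Switch once, outside the loop. */
    ToyContext *oldctx = toy_switch_to(&MessageToyContext);

    for (int i = 0; i < nmessages; i++)
    {
        char       *data = toy_alloc(100);  /* per-message scratch data */

        if (data != NULL)
            data[0] = (char) i;

        /* Free this message's allocations before the next iteration. */
        toy_reset(&MessageToyContext);
    }

    /* Switch back once, after the loop. */
    toy_switch_to(oldctx);
}
```

In the real for(;;) loop the trailing switch-back only matters if the loop can exit normally. The per-iteration reset is what actually bounds memory use.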

~~~

7. LogicalParallelApplyLoop

Previously I suggested that maybe the name (i.e. the 2nd param) should be
changed to "ParallelApplyMessageContext". Houzj wrote ([1] #13): Not
sure about this, because ApplyMessageContext is used in both worker.c
and applyparallelworker.c.

~

But I thought those are completely independent ApplyMessageContexts
in different processes that just happen to have the same name.
Shouldn't they have names appropriate to whoever owns them?

~~~

8. ParallelApplyWorkerMain

+ /*
+ * Allocate the origin name in a long-lived context for error context
+ * message.
+ */
+ snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);

Now that the ReplicationOriginNameForLogicalRep patch has been pushed
[2], please make use of this common function.
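For reference, here is a sketch of what the common function centralizes. It assumes that the helper from commit [2] has the signature `ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)` and is called with `InvalidOid` as the relid for apply workers; please check this against the commit. To keep the snippet self-contained, `origin_name_for_logicalrep_sketch` is a hypothetical stand-in that mimics the assumed naming scheme.

```c
#include <stdio.h>

typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)
#define NAMEDATALEN 64          /* PostgreSQL's default identifier length */

/*
 * Hypothetical stand-in mirroring the assumed behavior of
 * ReplicationOriginNameForLogicalRep(): a single place decides the
 * origin name format, instead of each caller repeating snprintf("pg_%u").
 */
static void
origin_name_for_logicalrep_sketch(Oid suboid, Oid relid,
                                  char *originname, size_t szoriginname)
{
    if (relid != InvalidOid)
        snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid); /* tablesync */
    else
        snprintf(originname, szoriginname, "pg_%u", suboid);          /* apply workers */
}

/*
 * In ParallelApplyWorkerMain() the snprintf() call would then become
 * something like:
 *
 *     ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
 *                                        originname, sizeof(originname));
 */
```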

~~~

9. HandleParallelApplyMessage

+ case 'X': /* Terminate, indicating clean exit */
+ {
+ shm_mq_detach(winfo->error_mq_handle);
+ winfo->error_mq_handle = NULL;
+ break;
+ }
+
+ /*
+ * Don't need to do anything about NoticeResponse and
+ * NotifyResponse as the logical replication worker doesn't need
+ * to send messages to the client.
+ */
+ case 'N':
+ case 'A':
+ break;
+ default:
+ {
+ elog(ERROR, "unrecognized message type received from parallel apply worker: %c (message length %d bytes)",
+ msgtype, msg->len);
+ }

9a. case 'X':
There are no variable declarations here, so the statement block {} is not needed.

~

9b. default:
There are no variable declarations here, so the statement block {} is not needed.
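For illustration, here is a self-contained sketch of the same switch arms without the unneeded braces. The real code detaches the error queue and calls elog(ERROR). In this sketch those side effects are replaced by hypothetical MsgResult codes and a stderr message, so only the case layout is meant to match.

```c
#include <stdio.h>

/* Hypothetical result codes standing in for the real side effects. */
typedef enum
{
    MSG_DETACHED,
    MSG_IGNORED,
    MSG_UNRECOGNIZED
} MsgResult;

static MsgResult
handle_parallel_apply_msgtype(char msgtype, int msglen)
{
    MsgResult   result;

    switch (msgtype)
    {
        case 'X':               /* Terminate, indicating clean exit */
            /* real code: shm_mq_detach(...); winfo->error_mq_handle = NULL; */
            result = MSG_DETACHED;
            break;

            /*
             * Don't need to do anything about NoticeResponse and
             * NotifyResponse as the logical replication worker doesn't need
             * to send messages to the client.
             */
        case 'N':
        case 'A':
            result = MSG_IGNORED;
            break;

        default:
            /* real code: elog(ERROR, ...) */
            fprintf(stderr, "unrecognized message type received from parallel apply worker: %c (message length %d bytes)\n",
                    msgtype, msglen);
            result = MSG_UNRECOGNIZED;
            break;
    }

    return result;
}
```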

~~~

10. parallel_apply_stream_abort

+ int i;
+ bool found = false;
+ char spname[MAXPGPATH];
+
+ parallel_apply_savepoint_name(MySubscription->oid, subxid, spname,
+ sizeof(spname));

I posted about using NAMEDATALEN in a previous review ([3] #21) but I
think only one place was fixed and this one was missed.
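Because a savepoint name is an identifier, a NAMEDATALEN-sized buffer is the natural bound, and passing sizeof() keeps the buffer and the size argument in sync. This minimal sketch assumes a "pg_sp_%u_%u" name format (please check it against the patch's parallel_apply_savepoint_name()) and PostgreSQL's default NAMEDATALEN of 64. `savepoint_name_sketch` is a hypothetical stand-in.

```c
#include <stdio.h>

typedef unsigned int Oid;
typedef unsigned int TransactionId;

#define NAMEDATALEN 64          /* PostgreSQL's default identifier length */

/* Hypothetical stand-in for parallel_apply_savepoint_name(). */
static void
savepoint_name_sketch(Oid suboid, TransactionId xid, char *spname, size_t szsp)
{
    snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
}

/*
 * In parallel_apply_stream_abort() the declaration would become:
 *
 *     char spname[NAMEDATALEN];
 *
 * The longest possible name under this format,
 * "pg_sp_4294967295_4294967295", is 27 characters, so it always fits.
 */
```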

~~~

11. parallel_apply_replorigin_setup

+ snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
+ originid = replorigin_by_name(originname, false);
+ replorigin_session_setup(originid);
+ replorigin_session_origin = originid;

Same as #8. Please call the new ReplicationOriginNameForLogicalRep function.

======

src/backend/replication/logical/launcher.c

12. logicalrep_worker_launch

Previously I suggested that maybe the apply process name should change

FROM
"logical replication worker for subscription %u"
TO
"logical replication apply worker for subscription %u"

and Houzj wrote ([1] #13):
I am not sure if it's a good idea to change existing process description.

~

But that seems inconsistent to me, because elsewhere this patch is
already exposing the name to the user (e.g. when it says "logical
replication apply worker for subscription \"%s\" has started").
Shouldn't the process name match these log messages?

======

src/backend/replication/logical/worker.c

13. apply_handle_stream_start

+ *
+ * XXX We can avoid sending pairs of the START messages to the parallel worker
+ * because unlike apply worker it will process only one transaction-at-a-time.
+ * However, it is not clear whether that is worth the effort because it is sent
+ * after logical_decoding_work_mem changes.
*/
static void
apply_handle_stream_start(StringInfo s)

13a.
"transaction-at-a-time." -> "transaction at a time."

~

13b.
I was not sure what that last sentence means. Does it mean something like:
"However, it is not clear whether doing this is worth the effort
because pairs of START messages occur only after
logical_decoding_work_mem changes."

~~~

14. apply_handle_stream_start

+ ParallelApplyWorkerInfo *winfo = NULL;

Initializing winfo to NULL in the declaration is not needed, because
get_transaction_apply_action() will always do this anyway.

~~~

15. apply_handle_stream_start

+
+ case TRANS_PARALLEL_APPLY:
+ break;

I had previously suggested that this should include a comment explaining
why there is nothing to do ([3] #44), but I think there was no reply.

~~~

16. apply_handle_stream_stop

apply_handle_stream_stop(StringInfo s)
{
+ ParallelApplyWorkerInfo *winfo = NULL;
+ TransApplyAction apply_action;

Initializing winfo to NULL in the declaration is not needed, because
get_transaction_apply_action() will always do this anyway.

~~~

17. serialize_stream_abort

+ ParallelApplyWorkerInfo *winfo = NULL;
+ TransApplyAction apply_action;

Initializing winfo to NULL in the declaration is not needed, because
get_transaction_apply_action() will always do this anyway.

~~~

18. apply_handle_stream_commit

LogicalRepCommitData commit_data;
+ ParallelApplyWorkerInfo *winfo = NULL;
+ TransApplyAction apply_action;

Initializing winfo to NULL in the declaration is not needed, because
get_transaction_apply_action() will always do this anyway.

~~~

19. ApplyWorkerMain

+
+/* Logical Replication Apply worker entry point */
+void
+ApplyWorkerMain(Datum main_arg)

Previously I suggested changing "Apply worker" to "apply worker", and
Houzj ([1] #48) replied:
Since it's the existing comment, I feel we can leave this.

~

Normally I agree that we should not change original code unrelated to
the patch, but in practice I think no patch would be accepted that just
changes "A" to "a". So if you don't change it here in this patch for
consistency, then it will never happen. That's why I think it should
be part of this patch.

~~~

20. ApplyWorkerMain

+ /*
+ * We don't currently need any ResourceOwner in a walreceiver process, but
+ * if we did, we could call CreateAuxProcessResourceOwner here.
+ */

Previously I suggested prefixing this as "XXX" and Houzj replied ([1] #48):
I am not sure as this comment is just a reminder.

~

OK, then since it is a reminder (i.e. a note), maybe it should be changed:
"We don't currently..." -> "Note: We don't currently..."

~~~

21. ApplyWorkerMain

+ if (server_version >= 160000 &&
+ MySubscription->stream == SUBSTREAM_PARALLEL)
+ {
+ options.proto.logical.streaming = pstrdup("parallel");
+ MyLogicalRepWorker->parallel_apply = true;
+ }
+ else if (server_version >= 140000 &&
+ MySubscription->stream != SUBSTREAM_OFF)
+ {
+ options.proto.logical.streaming = pstrdup("on");
+ MyLogicalRepWorker->parallel_apply = false;
+ }
+ else
+ {
+ options.proto.logical.streaming = NULL;
+ MyLogicalRepWorker->parallel_apply = false;
+ }

I think this if/else block exists only to assign the
streaming/parallel members, so it should have a comment saying that:

SUGGESTION
Assign the appropriate streaming flag according to the 'streaming'
mode and the publisher's ability to support that mode.
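Here is a self-contained sketch of how the block would read with the suggested comment. The SUBSTREAM_* character values and the StreamingChoice struct are assumptions that stand in for the patch's definitions, `options` and `MyLogicalRepWorker`. String literals replace pstrdup() so the snippet has no PostgreSQL dependencies.

```c
#include <stdbool.h>
#include <stddef.h>

/* Assumed values of the subscription's stream mode. */
#define SUBSTREAM_OFF      'f'
#define SUBSTREAM_ON       't'
#define SUBSTREAM_PARALLEL 'p'

/* Hypothetical container for the two values the if/else block assigns. */
typedef struct StreamingChoice
{
    const char *streaming;      /* value sent as the streaming option, or NULL */
    bool        parallel_apply; /* whether parallel apply workers are used */
} StreamingChoice;

static StreamingChoice
choose_streaming(int server_version, char stream)
{
    StreamingChoice c;

    /*
     * Assign the appropriate streaming flag according to the 'streaming'
     * mode and the publisher's ability to support that mode.
     */
    if (server_version >= 160000 && stream == SUBSTREAM_PARALLEL)
    {
        c.streaming = "parallel";
        c.parallel_apply = true;
    }
    else if (server_version >= 140000 && stream != SUBSTREAM_OFF)
    {
        c.streaming = "on";
        c.parallel_apply = false;
    }
    else
    {
        c.streaming = NULL;
        c.parallel_apply = false;
    }

    return c;
}
```

Note that, as written in the patch, a 'parallel' subscription connected to a 14 or 15 publisher falls back to plain "on" streaming.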

~~~

22. get_transaction_apply_action

+static TransApplyAction
+get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
+{
+ *winfo = NULL;
+
+ if (am_parallel_apply_worker())
+ {
+ return TRANS_PARALLEL_APPLY;
+ }
+ else if (in_remote_transaction)
+ {
+ return TRANS_LEADER_APPLY;
+ }
+
+ /*
+ * Check if we are processing this transaction using a parallel apply
+ * worker and if so, send the changes to that worker.
+ */
+ else if ((*winfo = parallel_apply_find_worker(xid)))
+ {
+ return TRANS_LEADER_SEND_TO_PARALLEL;
+ }
+ else
+ {
+ return TRANS_LEADER_SERIALIZE;
+ }
+}

22a.

Previously I suggested the statement blocks are overkill and all the
{} should be removed, and Houzj ([1] #52a) wrote:
I feel this style is fine.

~

Sure, it is fine, but FWIW I thought it is not the normal PG coding
convention to use unnecessary {} unless it would seem strange to omit
them.

~~

22b.
Also, previously I had suggested:

> Can a tablesync worker ever get here? It might be better to
> Assert(!am_tablesync_worker()); at top of this function?

and Houzj ([1] #52b) replied:
Not sure if it's necessary or not.

~

OTOH you could say no Assert is ever really necessary, but IMO adding
one here would at least be a sanity check and help to document the
function better.
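For illustration, here is a self-contained sketch that combines 22a and 22b: the function without the extra braces, with an assertion at the top. It also shows why the NULL initializations in #14 to #18 are redundant. The stand-ins (is_parallel_apply_worker, is_tablesync_worker, in_remote_transaction, cached_worker, find_worker() and the stripped-down ParallelApplyWorkerInfo) are hypothetical substitutes for am_parallel_apply_worker(), am_tablesync_worker() and parallel_apply_find_worker(). assert() stands in for PostgreSQL's Assert().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int TransactionId;

/* Hypothetical, stripped-down stand-in for the patch's struct. */
typedef struct ParallelApplyWorkerInfo
{
    TransactionId xid;
} ParallelApplyWorkerInfo;

typedef enum TransApplyAction
{
    TRANS_LEADER_APPLY,
    TRANS_LEADER_SERIALIZE,
    TRANS_LEADER_SEND_TO_PARALLEL,
    TRANS_PARALLEL_APPLY
} TransApplyAction;

/* Hypothetical process state standing in for the real predicates. */
static bool is_parallel_apply_worker = false;
static bool is_tablesync_worker = false;
static bool in_remote_transaction = false;
static ParallelApplyWorkerInfo *cached_worker = NULL;

/* Hypothetical stand-in for parallel_apply_find_worker(). */
static ParallelApplyWorkerInfo *
find_worker(TransactionId xid)
{
    if (cached_worker != NULL && cached_worker->xid == xid)
        return cached_worker;
    return NULL;
}

static TransApplyAction
get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
{
    /* 22b: sanity check that also documents who may call this function. */
    assert(!is_tablesync_worker);

    /* Always set, so callers need not initialize their winfo. */
    *winfo = NULL;

    /* 22a: single-statement branches without {}. */
    if (is_parallel_apply_worker)
        return TRANS_PARALLEL_APPLY;
    else if (in_remote_transaction)
        return TRANS_LEADER_APPLY;

    /*
     * Check if we are processing this transaction using a parallel apply
     * worker and if so, send the changes to that worker.
     */
    else if ((*winfo = find_worker(xid)) != NULL)
        return TRANS_LEADER_SEND_TO_PARALLEL;
    else
        return TRANS_LEADER_SERIALIZE;
}
```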

======

23. src/test/regress/sql/subscription.sql

Previously I mentioned testing the 'streaming' option with no value.
Houzj replied ([1]):
I didn't find similar tests for no value explicitly specified cases,
so I didn't add this for now.

But as I already responded to Amit ([4] #58):
IMO this one is a bit different because it's not really a boolean
option anymore - it's a kind of a hybrid boolean/enum. That's why I
thought this ought to be tested regardless if there are existing tests
for the (normal) boolean options.

Anyway, you can decide what you want.

------
[1] Houzj replies to my v35 review
https://www.postgresql.org/message-id/OS0PR01MB5716B400CD81565E868616DB945F9%40OS0PR01MB5716.jpnprd01.prod.outlook.com
[2] ReplicationOriginNameForLogicalRep
https://github.com/postgres/postgres/commit/776e1c8a5d1494e345e5e1b16a5eba5e98aaddca
[3] My review v35
https://www.postgresql.org/message-id/CAHut%2BPvFENKb5fcMko5HHtNEAaZyNwGhu3PASrcBt%2BHFoFL%3DFw%40mail.gmail.com
[4] Explaining some v35 review comments
https://www.postgresql.org/message-id/CAHut%2BPscac%2BipFSFx89ACmacjPe4Dn%3DqVq8T0V%3DnQkv38QgnBw%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia
