Re: Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Peter Smith <smithpb2250(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-08-09 11:00:03
Message-ID: CAA4eK1LDFKaWsBwr6wsaXEAgjaxhqmEzX4JtVspyPyroX6o38g@mail.gmail.com
Lists: pgsql-hackers

On Thu, Aug 4, 2022 at 12:10 PM wangw(dot)fnst(at)fujitsu(dot)com
<wangw(dot)fnst(at)fujitsu(dot)com> wrote:
>
> On Mon, Jul 25, 2022 at 21:50 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
> > Few comments on 0001:
> > ======================
>
> Thanks for your comments.
>

Review comments on v20-0001-Perform-streaming-logical-transactions-by-backgr
===============================================================
1.
+ <para>
+ If set to <literal>on</literal>, the incoming changes are written to
+ temporary files and then applied only after the transaction is
+ committed on the publisher.

It is not very clear that the transaction is applied when the commit
is received by the subscriber. Can we slightly change it to: "If set
to <literal>on</literal>, the incoming changes are written to
temporary files and then applied only after the transaction is
committed on the publisher and received by the subscriber."

2.
/* First time through, initialize apply workers hashtable */
+ if (ApplyBgworkersHash == NULL)
+ {
+ HASHCTL ctl;
+
+ MemSet(&ctl, 0, sizeof(ctl));
+ ctl.keysize = sizeof(TransactionId);
+ ctl.entrysize = sizeof(ApplyBgworkerEntry);
+ ctl.hcxt = ApplyContext;
+
+ ApplyBgworkersHash = hash_create("logical apply workers hash", 8, &ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

I think it would be better to start with a 16-element hash table; 8
seems to be on the lower side.

3.
+/*
+ * Try to look up worker assigned before (see function apply_bgworker_get_free)
+ * inside ApplyBgworkersHash for requested xid.
+ */
+ApplyBgworkerState *
+apply_bgworker_find(TransactionId xid)

The above comment is not very clear. There doesn't seem to be any
function named apply_bgworker_get_free in the patch. Can we write this
comment as: "Find the previously assigned worker for the given
transaction, if any."

4.
/*
+ * Push apply error context callback. Fields will be filled applying a
+ * change.
+ */

/Fields will be filled applying a change./Fields will be filled while
applying a change.

5.
+void
+ApplyBgworkerMain(Datum main_arg)
+{
...
...
+ StartTransactionCommand();
+ oldcontext = MemoryContextSwitchTo(ApplyContext);
+
+ MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+ if (!MySubscription)
+ {
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription %u will not "
+ "start because the subscription was removed during startup",
+ MyLogicalRepWorker->subid)));
+ proc_exit(0);
+ }
+
+ MySubscriptionValid = true;
+ MemoryContextSwitchTo(oldcontext);
+
+ /* Setup synchronous commit according to the user's wishes */
+ SetConfigOption("synchronous_commit", MySubscription->synccommit,
+ PGC_BACKEND, PGC_S_OVERRIDE);
+
+ /* Keep us informed about subscription changes. */
+ CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
+ subscription_change_cb,
+ (Datum) 0);
+
+ CommitTransactionCommand();
...

This part of the code appears to be the same as what we have in
ApplyWorkerMain(), except that the patch doesn't check whether the
subscription is enabled. Is there a reason not to have that check here
as well? Also, ApplyWorkerMain() LOGs the type of worker, which is
missing here too. Unless there is a specific reason to have different
code here, we should move this part to a common function and call it
from both ApplyWorkerMain() and ApplyBgworkerMain().

6. I think the code in ApplyBgworkerMain() to set
session_replication_role and search_path, and to connect to the
database, also appears to be the same as in ApplyWorkerMain(). If so,
that can also be moved to the common function mentioned in the previous
point.

7. I think we need to register for subscription rel map invalidation
(invalidate_syncing_table_states) in ApplyBgworkerMain similar to
ApplyWorkerMain. The reason is that we check the table state after
processing a commit or similar change record via a call to
process_syncing_tables.

8. In apply_bgworker_setup_dsm(), we should handle dsm_create failing
because max_segments has been reached, as we do in
InitializeParallelDSM(). If we are not able to create the DSM, we can
follow the regular path for streaming transactions instead of
parallelizing the apply.
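
As a rough illustration of that fallback: InitializeParallelDSM() passes
DSM_CREATE_NULL_IF_MAXSEGMENTS to dsm_create() so that it returns NULL
instead of raising an error, and then continues without workers. The
standalone sketch below only mimics that shape. try_create_segment(),
choose_apply_mode(), MAX_SEGMENTS and the ApplyMode enum are all
hypothetical stand-ins, not names from the patch or from PostgreSQL.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

#define MAX_SEGMENTS 2			/* arbitrary tiny limit for this sketch */

static int	segments_in_use = 0;

/*
 * Stand-in for dsm_create(size, DSM_CREATE_NULL_IF_MAXSEGMENTS): returns
 * NULL instead of failing hard once the segment limit is reached.
 */
void *
try_create_segment(size_t size)
{
	void	   *seg;

	if (segments_in_use >= MAX_SEGMENTS)
		return NULL;

	seg = malloc(size);
	if (seg != NULL)
		segments_in_use++;
	return seg;
}

typedef enum
{
	APPLY_IN_PARALLEL,			/* hand the transaction to a bgworker */
	APPLY_VIA_SPOOL_FILE		/* regular streaming path: spool to file */
} ApplyMode;

/*
 * Try to set up shared memory for a background worker. If no segment is
 * available, fall back to the regular streaming path instead of erroring.
 */
ApplyMode
choose_apply_mode(size_t segsize, void **seg_out)
{
	assert(seg_out != NULL);

	*seg_out = try_create_segment(segsize);
	return (*seg_out != NULL) ? APPLY_IN_PARALLEL : APPLY_VIA_SPOOL_FILE;
}
```

The point is only that the caller decides the mode from the NULL return,
so running out of segments degrades to the existing behavior.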

9.
+ shm_toc_initialize_estimator(&e);
+ shm_toc_estimate_chunk(&e, sizeof(ApplyBgworkerShared));
+ shm_toc_estimate_chunk(&e, (Size) queue_size);
+
+ shm_toc_estimate_keys(&e, 1 + 1);

Here, you can directly write 2 instead of (1 + 1). It is quite clear
that we need two keys here.

10.
apply_bgworker_wait_for()
{
...
+ /* Wait to be signalled. */
+ WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
+ WAIT_EVENT_LOGICAL_APPLY_BGWORKER_STATE_CHANGE);
...
}

Cast the result to (void) if we don't care about the return value.

11.
+static void
+apply_bgworker_shutdown(int code, Datum arg)
+{
+ SpinLockAcquire(&MyParallelShared->mutex);
+ MyParallelShared->status = APPLY_BGWORKER_EXIT;
+ SpinLockRelease(&MyParallelShared->mutex);

Is there a reason to not use apply_bgworker_set_status() directly?

12.
+ * Special case is if the first change comes from subtransaction, then
+ * we check that current_xid differs from stream_xid.
+ */
+void
+apply_bgworker_subxact_info_add(TransactionId current_xid)
+{
+ if (current_xid != stream_xid &&
+ !list_member_int(subxactlist, (int) current_xid))
...
...

I don't understand the above comment. Does that mean we don't need to
define a savepoint if the first change is from a subtransaction? Also,
keep an empty line before the above comment.

13.
+void
+apply_bgworker_subxact_info_add(TransactionId current_xid)
+{
+ if (current_xid != stream_xid &&
+ !list_member_int(subxactlist, (int) current_xid))
+ {
+ MemoryContext oldctx;
+ char spname[MAXPGPATH];
+
+ snprintf(spname, MAXPGPATH, "savepoint_for_xid_%u", current_xid);

To generate a unique savepoint name, wouldn't it be better to include
the subscription id as well? Something like pg_sp_<subid>_<xid>.
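
A minimal standalone sketch of that naming scheme follows. The helper
make_savepoint_name() and the SPNAME_LEN buffer size are hypothetical
(the patch formats into a MAXPGPATH-sized buffer inline), and plain
unsigned ints stand in for Oid and TransactionId.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical buffer size for this sketch; the patch uses MAXPGPATH. */
#define SPNAME_LEN 64

/*
 * Build a savepoint name of the form pg_sp_<subid>_<xid>.
 *
 * Returns 0 on success, or -1 if the name did not fit into the buffer
 * (in which case the buffer holds a truncated name).
 */
int
make_savepoint_name(char *buf, size_t buflen,
					unsigned int subid, unsigned int xid)
{
	int			n;

	assert(buf != NULL && buflen > 0);

	n = snprintf(buf, buflen, "pg_sp_%u_%u", subid, xid);
	return (n < 0 || (size_t) n >= buflen) ? -1 : 0;
}
```

Both values are printed with %u, as the patch already does for the xid,
so the name stays a valid identifier.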

14. The CommitTransactionCommand() call in
apply_bgworker_subxact_info_add looks a bit odd, as that function
neither starts the transaction command nor has any comments explaining
the call. Shall we do it in the caller, where the need for it is more
apparent?

15.
else
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication worker for subscription %u", subid);
+
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");

Spurious new line

16.
@@ -1153,7 +1162,14 @@ replorigin_session_setup(RepOriginId node)

Assert(session_replication_state->roident != InvalidRepOriginId);

- session_replication_state->acquired_by = MyProcPid;
+ if (must_acquire)
+ session_replication_state->acquired_by = MyProcPid;
+ else if (session_replication_state->acquired_by == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("apply background worker could not find replication state slot for replication origin with OID %u",
+ node),
+ errdetail("There is no replication state slot set by its main apply worker.")));

It is not a good idea to emit apply-worker-specific messages from this
API because I don't think we can assume it is used only by apply
workers. It seems to me that if 'must_acquire' is false, then we
should either use elog(ERROR, ..) or have an Assert for the same. I am
not completely sure, but maybe we can require the caller to supply the
PID (of the process that has already acquired this origin) when
must_acquire is false, and then use it in the Assert/elog to ensure
correct usage of the API. What do you think?

17. The commit message can explain the abort-related new information
this patch sends to the subscribers.

18.
+ * In streaming case (receiving a block of streamed transaction), for
+ * SUBSTREAM_ON mode, simply redirect it to a file for the proper toplevel
+ * transaction, and for SUBSTREAM_PARALLEL mode, send the changes to apply
+ * background workers (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE changes
+ * will also be applied in main apply worker).

In this comment, the part "(LOGICAL_REP_MSG_RELATION or
LOGICAL_REP_MSG_TYPE changes will also be applied in main apply
worker)" is not very clear. Do you mean that these messages are
applied by both the main and the background apply workers? If so,
please state that explicitly.

19.
- /* not in streaming mode */
- if (!in_streamed_transaction)
+ /* Not in streaming mode */
+ if (!(in_streamed_transaction || am_apply_bgworker()))
...
...
- /* write the change to the current file */
+ /* Write the change to the current file */
stream_write_change(action, s);

I don't see the need to change the above comments.

20.
static bool
handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
{
...
...
+ if (am_apply_bgworker())
+ {
+ /* Define a savepoint for a subxact if needed. */
+ apply_bgworker_subxact_info_add(current_xid);
+
+ return false;
+ }
+
+ if (apply_bgworker_active())

Isn't it better to use else if in the above code, and probably else for
the remaining part of the code in this function?

--
With Regards,
Amit Kapila.
