RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Peter Smith <smithpb2250(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-08-11 07:47:59
Message-ID: OS0PR01MB57160A440C9821C48ABD4D9C94649@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Tuesday, August 9, 2022 7:00 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Thu, Aug 4, 2022 at 12:10 PM wangw(dot)fnst(at)fujitsu(dot)com
> <wangw(dot)fnst(at)fujitsu(dot)com> wrote:
> >
> > On Mon, Jul 25, 2022 at 21:50 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
> wrote:
> > > Few comments on 0001:
> > > ======================
> >
> > Thanks for your comments.
> >
>
> Review comments on
> v20-0001-Perform-streaming-logical-transactions-by-backgr
> ===================================================
> ============
> 1.
> + <para>
> + If set to <literal>on</literal>, the incoming changes are written to
> + temporary files and then applied only after the transaction is
> + committed on the publisher.
>
> It is not very clear that the transaction is applied when the commit is received
> by the subscriber. Can we slightly change it to: "If set to <literal>on</literal>,
> the incoming changes are written to temporary files and then applied only after
> the transaction is committed on the publisher and received by the subscriber."

Changed.

> 2.
> /* First time through, initialize apply workers hashtable */
> + if (ApplyBgworkersHash == NULL)
> + {
> + HASHCTL ctl;
> +
> + MemSet(&ctl, 0, sizeof(ctl));
> + ctl.keysize = sizeof(TransactionId);
> + ctl.entrysize = sizeof(ApplyBgworkerEntry);
> + ctl.hcxt = ApplyContext;
> +
> + ApplyBgworkersHash = hash_create("logical apply workers hash", 8, &ctl,
> + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
>
> I think it would be better if we start with probably 16 element hash table, 8
> seems to be on the lower side.

Changed.

> 3.
> +/*
> + * Try to look up worker assigned before (see function apply_bgworker_get_free)
> + * inside ApplyBgworkersHash for requested xid.
> + */
> +ApplyBgworkerState *
> +apply_bgworker_find(TransactionId xid)
>
> The above comment is not very clear. There doesn't seem to be any function
> named apply_bgworker_get_free in the patch. Can we write this comment as:
> "Find the previously assigned worker for the given transaction, if any."

Changed the comments.

> 4.
> /*
> + * Push apply error context callback. Fields will be filled applying a
> + * change.
> + */
>
> /Fields will be filled applying a change./Fields will be filled while applying a
> change.

Changed.

> 5.
> +void
> +ApplyBgworkerMain(Datum main_arg)
> +{
> ...
> ...
> + StartTransactionCommand();
> + oldcontext = MemoryContextSwitchTo(ApplyContext);
> +
> + MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
> + if (!MySubscription)
> + {
> + ereport(LOG,
> + (errmsg("logical replication apply worker for subscription %u will not "
> + "start because the subscription was removed during startup",
> + MyLogicalRepWorker->subid)));
> + proc_exit(0);
> + }
> +
> + MySubscriptionValid = true;
> + MemoryContextSwitchTo(oldcontext);
> +
> + /* Setup synchronous commit according to the user's wishes */
> + SetConfigOption("synchronous_commit", MySubscription->synccommit,
> + PGC_BACKEND, PGC_S_OVERRIDE);
> +
> + /* Keep us informed about subscription changes. */
> + CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
> + subscription_change_cb,
> + (Datum) 0);
> +
> + CommitTransactionCommand();
> ...
>
> This part of the code appears to be the same as we have in
> ApplyWorkerMain() except that the patch doesn't check whether the
> subscription is enabled. Is there a reason to not have that check here as well?
> Then in ApplyWorkerMain(), we do LOG the type of worker that is also missing
> here. Unless there is a specific reason to have a different code here, we should
> move this part to a common function and call it both from ApplyWorkerMain()
> and ApplyBgworkerMain().
>
> 6. I think the code in ApplyBgworkerMain() to set session_replication_role,
> search_path, and connect to the database also appears to be the same in
> ApplyWorkerMain(). If so, that can also be moved to the common function
> mentioned in the previous point.
>
> 7. I think we need to register for subscription rel map invalidation
> (invalidate_syncing_table_states) in ApplyBgworkerMain similar to
> ApplyWorkerMain. The reason is that we check the table state after processing
> a commit or similar change record via a call to process_syncing_tables.

Agreed and changed.

> 8. In apply_bgworker_setup_dsm(), we should have handling related to
> dsm_create failure due to max_segments reached as we have in
> InitializeParallelDSM(). We can follow the regular path of streaming
> transactions in case we are not able to create DSM instead of parallelizing it.

Changed.
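
To illustrate the fallback being asked for here, below is a toy model in plain C, not the patch code. As far as I know, InitializeParallelDSM() gets this behaviour by passing DSM_CREATE_NULL_IF_MAXSEGMENTS to dsm_create() and checking for a NULL result. Every name in the sketch (ToySegmentPool, toy_segment_create, choose_apply_path, the ApplyPath values) is hypothetical and only stands in for the real DSM machinery.

```c
#include <stddef.h>

/* How a streamed transaction ends up being applied in this toy model. */
typedef enum
{
    APPLY_VIA_BGWORKER,     /* hand the changes to a background worker */
    APPLY_VIA_SPOOL_FILE    /* regular streaming path: spool, apply at commit */
} ApplyPath;

/* Hypothetical pool standing in for the server-wide DSM segment limit. */
typedef struct
{
    int used;
    int max_segments;
} ToySegmentPool;

/*
 * Toy stand-in for a segment allocator that returns NULL, instead of
 * raising an error, when the segment limit has been reached.
 */
static void *
toy_segment_create(ToySegmentPool *pool, void *backing)
{
    if (pool->used >= pool->max_segments)
        return NULL;
    pool->used++;
    return backing;
}

/*
 * Try to set up shared memory for a background worker. If no segment is
 * available, fall back to the regular (spool to file) streaming path.
 */
static ApplyPath
choose_apply_path(ToySegmentPool *pool, void *backing)
{
    void *seg = toy_segment_create(pool, backing);

    if (seg == NULL)
        return APPLY_VIA_SPOOL_FILE;
    return APPLY_VIA_BGWORKER;
}
```

The point of the sketch is only that running out of segments is treated as a normal outcome that selects the serial path, rather than as an error.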

> 9.
> + shm_toc_initialize_estimator(&e);
> + shm_toc_estimate_chunk(&e, sizeof(ApplyBgworkerShared));
> + shm_toc_estimate_chunk(&e, (Size) queue_size);
> +
> + shm_toc_estimate_keys(&e, 1 + 1);
>
> Here, you can directly write 2 instead of (1 + 1) stuff. It is quite clear that we
> need two keys here.

Changed.

> 10.
> apply_bgworker_wait_for()
> {
> ...
> + /* Wait to be signalled. */
> + WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
> + WAIT_EVENT_LOGICAL_APPLY_BGWORKER_STATE_CHANGE);
> ...
> }
>
> Typecast with the void, if we don't care for the return value.

Changed.

> 11.
> +static void
> +apply_bgworker_shutdown(int code, Datum arg)
> +{
> + SpinLockAcquire(&MyParallelShared->mutex);
> + MyParallelShared->status = APPLY_BGWORKER_EXIT;
> + SpinLockRelease(&MyParallelShared->mutex);
>
> Is there a reason to not use apply_bgworker_set_status() directly?

No, changed to use that function.

> 12.
> + * Special case is if the first change comes from subtransaction, then
> + * we check that current_xid differs from stream_xid.
> + */
> +void
> +apply_bgworker_subxact_info_add(TransactionId current_xid)
> +{
> + if (current_xid != stream_xid &&
> + !list_member_int(subxactlist, (int) current_xid))
> ...
> ...
>
> I don't understand the above comment. Does that mean we don't need to
> define a savepoint if the first change is from a subtransaction? Also, keep an
> empty line before the above comment.

After checking, I agree that this comment is not very clear, so I have removed
it and improved the other comments.

> 13.
> +void
> +apply_bgworker_subxact_info_add(TransactionId current_xid)
> +{
> + if (current_xid != stream_xid &&
> + !list_member_int(subxactlist, (int) current_xid))
> + {
> + MemoryContext oldctx;
> + char spname[MAXPGPATH];
> +
> + snprintf(spname, MAXPGPATH, "savepoint_for_xid_%u", current_xid);
>
> To uniquely generate the savepoint name, it is better to append the
> subscription id as well? Something like pg_sp_<subid>_<xid>.

Changed.
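
For reference, the suggested naming scheme pg_sp_<subid>_<xid> can be sketched in standalone C as below. This is only an illustration and not necessarily the exact code in the patch: the Oid and TransactionId typedefs are stand-ins for the PostgreSQL types, and build_savepoint_name and SP_NAME_LEN are hypothetical names.

```c
#include <stdio.h>
#include <stdint.h>

/* Stand-ins for PostgreSQL's Oid and TransactionId (32-bit unsigned). */
typedef uint32_t Oid;
typedef uint32_t TransactionId;

#define SP_NAME_LEN 64          /* assumed buffer size for this sketch */

/*
 * Build a savepoint name of the form pg_sp_<subid>_<xid>.
 *
 * Returns the length of the name, or -1 if it did not fit in the buffer.
 */
static int
build_savepoint_name(char *buf, size_t buflen, Oid subid, TransactionId xid)
{
    int n = snprintf(buf, buflen, "pg_sp_%u_%u",
                     (unsigned int) subid, (unsigned int) xid);

    if (n < 0 || (size_t) n >= buflen)
        return -1;
    return n;
}
```

Including the subscription id means two subscriptions that happen to see the same remote xid still get different savepoint names.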

> 14. The CommitTransactionCommand() call in
> apply_bgworker_subxact_info_add looks a bit odd as that function neither
> seems to be starting the transaction command nor has any comments
> explaining it. Shall we do it in caller where it is more apparent to do the same?

I think the CommitTransactionCommand() here is needed to work together with
DefineSavepoint(), because we have to invoke CommitTransactionCommand() to
actually start the new subtransaction. I have added some comments to explain this.

> 15.
> else
> snprintf(bgw.bgw_name, BGW_MAXLEN,
> "logical replication worker for subscription %u", subid);
> +
> snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
>
> Spurious new line

Removed.

> 16.
> @@ -1153,7 +1162,14 @@ replorigin_session_setup(RepOriginId node)
>
> Assert(session_replication_state->roident != InvalidRepOriginId);
>
> - session_replication_state->acquired_by = MyProcPid;
> + if (must_acquire)
> + session_replication_state->acquired_by = MyProcPid;
> + else if (session_replication_state->acquired_by == 0)
> + ereport(ERROR,
> + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
> + errmsg("apply background worker could not find replication state slot for replication origin with OID %u",
> + node),
> + errdetail("There is no replication state slot set by its main apply worker.")));
>
> It is not a good idea to give apply workers specific messages from this API
> because I don't think we can assume this is used by only apply workers. It seems
> to me that if 'must_acquire' is false, then we should either give elog(ERROR, ..)
> or there should be an Assert for the same. I am not completely sure but maybe
> we can request the caller to supply the PID (which already has acquired this
> origin) in case must_acquire is false and then use it in Assert/elog to ensure the
> correct usage of API. What do you think?

Agreed. I think we can replace 'must_acquire' with the pid of the worker that
has already acquired this origin (a parameter called 'acquired_by'). We can use
this pid to check for incorrect usage and report an error if needed.
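
A rough standalone sketch of that check follows. It is a toy model under my own assumptions, not the patch code: ToyOriginState, SetupResult and toy_origin_session_setup are hypothetical names, and I assume that passing 0 for acquired_by means "the caller wants to acquire the origin itself".

```c
/* Toy stand-in for the shared replication origin state. */
typedef struct
{
    int acquired_by;            /* PID owning the origin, 0 if nobody does */
} ToyOriginState;

typedef enum
{
    SETUP_OK,
    SETUP_ALREADY_ACQUIRED,     /* caller wanted ownership, origin is taken */
    SETUP_OWNER_MISMATCH        /* origin is not owned by the given PID */
} SetupResult;

/*
 * acquired_by == 0: the caller acquires the origin for itself (my_pid).
 * acquired_by != 0: the caller shares an origin that the process with
 * that PID must already have acquired.
 */
static SetupResult
toy_origin_session_setup(ToyOriginState *state, int my_pid, int acquired_by)
{
    if (acquired_by == 0)
    {
        if (state->acquired_by != 0)
            return SETUP_ALREADY_ACQUIRED;
        state->acquired_by = my_pid;
        return SETUP_OK;
    }

    if (state->acquired_by != acquired_by)
        return SETUP_OWNER_MISMATCH;
    return SETUP_OK;
}
```

With this shape the function does not need to know anything about apply workers; a mismatch is simply incorrect usage of the API and can be reported with a generic elog(ERROR) or an Assert.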

> 17. The commit message can explain the abort-related new information this
> patch sends to the subscribers.

Added.

> 18.
> + * In streaming case (receiving a block of streamed transaction), for
> + * SUBSTREAM_ON mode, simply redirect it to a file for the proper toplevel
> + * transaction, and for SUBSTREAM_PARALLEL mode, send the changes to apply
> + * background workers (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE changes
> + * will also be applied in main apply worker).
>
> In this, part of the comment "(LOGICAL_REP_MSG_RELATION or
> LOGICAL_REP_MSG_TYPE changes will also be applied in main apply worker)" is
> not very clear. Do you mean to say that these messages are applied by both
> main and background apply workers, if so, then please state the same
> explicitly?

Changed.

> 19.
> - /* not in streaming mode */
> - if (!in_streamed_transaction)
> + /* Not in streaming mode */
> + if (!(in_streamed_transaction || am_apply_bgworker()))
> ...
> ...
> - /* write the change to the current file */
> + /* Write the change to the current file */
> stream_write_change(action, s);
>
> I don't see the need to change the above comments.

Removed these changes.

> 20.
> static bool
> handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
> {
> ...
> ...
> + if (am_apply_bgworker())
> + {
> + /* Define a savepoint for a subxact if needed. */
> + apply_bgworker_subxact_info_add(current_xid);
> +
> + return false;
> + }
> +
> + if (apply_bgworker_active())
>
> Isn't it better to use else if in the above code and probably else for the
> remaining part of code in this function?

Changed.
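
The resulting control flow is roughly the three-way branch sketched below. This is a simplified standalone model rather than the real function: StreamAction, its values and dispatch_streamed_change are hypothetical, and the two boolean arguments stand in for am_apply_bgworker() and apply_bgworker_active().

```c
#include <stdbool.h>

/* What happens to a streamed change in this simplified model. */
typedef enum
{
    APPLY_LOCALLY,      /* we are the background worker: apply it ourselves */
    SEND_TO_BGWORKER,   /* main apply worker forwards it to a bgworker */
    WRITE_TO_FILE       /* main apply worker spools it to a file */
} StreamAction;

/*
 * Exactly one branch runs for each change, which the if / else if / else
 * chain makes explicit.
 */
static StreamAction
dispatch_streamed_change(bool am_bgworker, bool bgworker_active)
{
    if (am_bgworker)
        return APPLY_LOCALLY;
    else if (bgworker_active)
        return SEND_TO_BGWORKER;
    else
        return WRITE_TO_FILE;
}
```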

Attached is the new version (v21) of the patch set, which addresses all the comments received so far.

Best regards,
Hou zj

Attachment Content-Type Size
v21-0003-Add-some-checks-before-using-apply-background-wo.patch application/octet-stream 55.4 KB
v21-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch application/octet-stream 30.9 KB
v21-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch application/octet-stream 6.2 KB
v21-0001-Perform-streaming-logical-transactions-by-backgr.patch application/octet-stream 113.2 KB
v21-0002-Test-streaming-parallel-option-in-tap-test.patch application/octet-stream 69.1 KB
