Re: Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-11-29 12:33:53
Message-ID: CAA4eK1Kx76kakoz-YWxH1We_Ck_x7OAYPkmS2x5-sDeYkR1gAg@mail.gmail.com
Lists: pgsql-hackers

On Tue, Nov 29, 2022 at 10:18 AM houzj(dot)fnst(at)fujitsu(dot)com
<houzj(dot)fnst(at)fujitsu(dot)com> wrote:
>
> Attach the new version patch which addressed all comments.
>

Review comments on v53-0001*
==========================
1.
Subscription *MySubscription = NULL;
-static bool MySubscriptionValid = false;
+bool MySubscriptionValid = false;

It seems this variable is still used in worker.c, so why was its scope changed?

2.
/* fields valid only when processing streamed transaction */
-static bool in_streamed_transaction = false;
+bool in_streamed_transaction = false;

Is it really required to change the scope of this variable? Can we
think of exposing a macro or inline function to check it in
applyparallelworker.c?
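
As an illustration of the accessor idea (a minimal standalone sketch,
not code from the patch): the flag stays file-local and other files
read it through a small function. The names handle_stream_start(),
handle_stream_stop() and is_in_streamed_transaction() are hypothetical.

```c
#include <stdbool.h>

/* Stays private to this file, as it was before the patch. */
static bool in_streamed_transaction = false;

/* Hypothetical stand-ins for the handlers that flip the flag. */
void
handle_stream_start(void)
{
	in_streamed_transaction = true;
}

void
handle_stream_stop(void)
{
	in_streamed_transaction = false;
}

/*
 * Hypothetical read-only accessor. Only its prototype would need to be
 * visible to other files; the variable itself keeps static scope.
 */
bool
is_in_streamed_transaction(void)
{
	return in_streamed_transaction;
}
```

With this shape, callers outside the file can test the state but cannot
modify it.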

3.
should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
{
if (am_tablesync_worker())
return MyLogicalRepWorker->relid == rel->localreloid;
+ else if (am_parallel_apply_worker())
+ {
+ if (rel->state != SUBREL_STATE_READY)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",

Is this check sufficient? What if rel->state is
SUBREL_STATE_UNKNOWN? I think that is possible when refresh
publication has not yet been performed after adding a new relation to
the publication. If that is true, then won't we need to simply ignore
that change and continue instead of erroring out? Can you please test
and check this case?
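
To make the question concrete, here is a minimal standalone sketch of
the three-way decision being asked about. It is only an illustration
under assumptions: the ParallelApplyDecision type and the function name
are hypothetical, the state macros are local stand-ins for the catalog
definitions, and whether UNKNOWN should really be skipped is exactly
what needs testing.

```c
/* Local stand-ins for the relation state codes used in the check. */
#define SUBREL_STATE_UNKNOWN	'\0'
#define SUBREL_STATE_INIT		'i'
#define SUBREL_STATE_READY		'r'

/* Hypothetical result type for this sketch. */
typedef enum
{
	APPLY_CHANGE,				/* relation is ready: apply the change */
	SKIP_CHANGE,				/* relation unknown to the subscription:
								 * ignore the change and continue */
	RAISE_ERROR					/* relation is still being synchronized:
								 * report the error and stop */
} ParallelApplyDecision;

/* Hypothetical helper showing READY / UNKNOWN / anything else. */
ParallelApplyDecision
pa_decide_for_rel_state(char state)
{
	if (state == SUBREL_STATE_READY)
		return APPLY_CHANGE;
	if (state == SUBREL_STATE_UNKNOWN)
		return SKIP_CHANGE;
	return RAISE_ERROR;
}
```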

4.
+
+ case TRANS_PARALLEL_APPLY:
+ list_free(subxactlist);
+ subxactlist = NIL;
+
+ apply_handle_commit_internal(&commit_data);

I don't think we need to retail pfree subxactlist, as it is allocated
in TopTransactionContext and will be freed at commit/prepare. Freeing
it this way looks a bit ad hoc to me, and it requires exposing this
list outside applyparallelworker.c, which doesn't seem like a good idea
to me either.
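
The reasoning rests on how context-based allocation works: memory
belongs to a context and is released in bulk when that context is
reset, so individual frees are unnecessary. The toy allocator below is
a standalone sketch of that idea only; ToyContext, toy_alloc() and
toy_reset() are hypothetical names and this is not PostgreSQL's memory
context implementation.

```c
#include <stddef.h>
#include <stdlib.h>

/* Header placed before each allocation; the union keeps user data aligned. */
typedef union Chunk
{
	union Chunk *next;
	max_align_t align;
} Chunk;

/* Toy context: remembers every chunk handed out since the last reset. */
typedef struct
{
	Chunk	   *head;
	int			live_chunks;
} ToyContext;

/* Allocate from the context; the caller never frees the result itself. */
void *
toy_alloc(ToyContext *cxt, size_t size)
{
	Chunk	   *c = malloc(sizeof(Chunk) + size);

	if (c == NULL)
		return NULL;
	c->next = cxt->head;
	cxt->head = c;
	cxt->live_chunks++;
	return (void *) (c + 1);
}

/* Release everything at once, analogous to cleanup at transaction end. */
void
toy_reset(ToyContext *cxt)
{
	while (cxt->head != NULL)
	{
		Chunk	   *next = cxt->head->next;

		free(cxt->head);
		cxt->head = next;
	}
	cxt->live_chunks = 0;
}
```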

5.
+ apply_handle_commit_internal(&commit_data);
+
+ pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
+ pa_unlock_transaction(xid, AccessShareLock);
+
+ elog(DEBUG1, "finished processing the transaction finish command");

I think in this and similar DEBUG logs, we can tell the exact command
instead of writing 'finish'.

6.
apply_handle_stream_commit()
{
...
+ /*
+ * After sending the data to the parallel apply worker, wait for
+ * that worker to finish. This is necessary to maintain commit
+ * order which avoids failures due to transaction dependencies and
+ * deadlocks.
+ */
+ pa_wait_for_xact_finish(winfo);
+
+ pgstat_report_stat(false);
+ store_flush_position(commit_data.end_lsn);
+ stop_skipping_changes();
+
+ (void) pa_free_worker(winfo, xid);
...
}

apply_handle_stream_prepare(StringInfo s)
{
+
+ /*
+ * After sending the data to the parallel apply worker, wait for
+ * that worker to finish. This is necessary to maintain commit
+ * order which avoids failures due to transaction dependencies and
+ * deadlocks.
+ */
+ pa_wait_for_xact_finish(winfo);
+ (void) pa_free_worker(winfo, prepare_data.xid);

- /* unlink the files with serialized changes and subxact info. */
- stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
+ in_remote_transaction = false;
+
+ store_flush_position(prepare_data.end_lsn);

Both of the above functions should call pa_free_worker() at the same
point, which I think should be immediately after
pa_wait_for_xact_finish(). Is there a reason for not being consistent
here?

7.
+ res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
+
+ /*
+ * The leader will detach from the error queue and set it to NULL
+ * before preparing to stop all parallel apply workers, so we don't
+ * need to handle error messages anymore.
+ */
+ if (!winfo->error_mq_handle)
+ continue;

This check must be done before calling shm_mq_receive(). So, I changed
it in the attached patch.
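
The ordering can be sketched in isolation as follows. This is a
standalone illustration with stand-in types, not the code from the
attached patch: ToyQueue, ToyWorkerInfo, toy_receive() and
drain_error_queues() are hypothetical, and toy_receive() merely plays
the role of shm_mq_receive().

```c
#include <stddef.h>

/* Stand-in for a queue handle; counts the messages still waiting. */
typedef struct
{
	int			pending;
} ToyQueue;

/* Stand-in for the per-worker info; NULL handle means already detached. */
typedef struct
{
	ToyQueue   *error_mq_handle;
} ToyWorkerInfo;

/* Plays the role of the receive call: dereferences the handle. */
static int
toy_receive(ToyQueue *q)
{
	if (q->pending > 0)
	{
		q->pending--;
		return 1;
	}
	return 0;
}

/* Returns the number of messages read across all attached queues. */
int
drain_error_queues(ToyWorkerInfo *workers, int nworkers)
{
	int			received = 0;

	for (int i = 0; i < nworkers; i++)
	{
		/* Check for a detached queue before the handle is ever used. */
		if (workers[i].error_mq_handle == NULL)
			continue;

		while (toy_receive(workers[i].error_mq_handle))
			received++;
	}
	return received;
}
```

Doing the NULL test after the receive call would mean the handle is
dereferenced first, which is what the reordering avoids.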

8.
@@ -2675,6 +3156,10 @@ store_flush_position(XLogRecPtr remote_lsn)
{
FlushPosition *flushpos;

+ /* Skip for parallel apply workers. */
+ if (am_parallel_apply_worker())
+ return;

It is okay to always update the flush position in the leader apply
worker, but I think the leader won't have an updated value for
XactLastCommitEnd, as the local transaction is committed by the
parallel apply worker.

9.
@@ -3831,11 +4366,11 @@ ApplyWorkerMain(Datum main_arg)

ereport(DEBUG1,
(errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
- MySubscription->name,
- MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
- MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
- MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
- "?")));
+ MySubscription->name,
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
+ "?")));

Is this change related to this patch?

10. What is the reason to expose ApplyErrorCallbackArg via worker_internal.h?

11. The order in which pa_set_stream_apply_worker() is declared in
worker_internal.h and defined in applyparallelworker.c is not the same.
Similarly, please check all other functions.

12. Apart from the above, I have made a few changes in the comments
and some other cosmetic changes in the attached patch.

--
With Regards,
Amit Kapila.

Attachment Content-Type Size
changes_amit_v53_1.patch application/octet-stream 6.7 KB
