RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-11-30 13:40:39
Message-ID: OS0PR01MB57161603FC9CB886F2CEE53194159@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Tuesday, November 29, 2022 8:34 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Tue, Nov 29, 2022 at 10:18 AM houzj(dot)fnst(at)fujitsu(dot)com
> <houzj(dot)fnst(at)fujitsu(dot)com> wrote:
> >
> > Attach the new version patch which addressed all comments.
> >
>
> Review comments on v53-0001*

Thanks for the comments!

> ==========================
> 1.
> Subscription *MySubscription = NULL;
> -static bool MySubscriptionValid = false;
> +bool MySubscriptionValid = false;
>
> It seems still this variable is used in worker.c, so why it's scope changed?

I think it's not needed. Removed.

> 2.
> /* fields valid only when processing streamed transaction */ -static bool
> in_streamed_transaction = false;
> +bool in_streamed_transaction = false;
>
> Is it really required to change the scope of this variable? Can we think of
> exposing a macro or inline function to check it in applyparallelworker.c?

Introduced a new function.
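
As a rough sketch of that pattern (not the patch's actual code; every function
name below is hypothetical), the flag can stay static in its own file while
other files query it through a small accessor:

```c
#include <assert.h>
#include <stdbool.h>

/* File-local flag; stands in for in_streamed_transaction in worker.c. */
static bool in_streamed_transaction = false;

/* Hypothetical accessor: other files call this instead of reading the flag. */
bool
sketch_in_streamed_transaction(void)
{
	return in_streamed_transaction;
}

/* Hypothetical helper: mark the start of a streamed chunk. */
void
sketch_stream_start(void)
{
	assert(!in_streamed_transaction);	/* chunks must not nest */
	in_streamed_transaction = true;
}

/* Hypothetical helper: mark the end of a streamed chunk. */
void
sketch_stream_stop(void)
{
	assert(in_streamed_transaction);
	in_streamed_transaction = false;
}
```

This keeps write access inside the owning file, so a caller such as
applyparallelworker.c can only read the state.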

> 3.
> should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) {
> if (am_tablesync_worker())
> return MyLogicalRepWorker->relid == rel->localreloid;
> + else if (am_parallel_apply_worker())
> + {
> + if (rel->state != SUBREL_STATE_READY)
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("logical replication parallel apply worker for subscription
> \"%s\" will stop",
>
> Is this check sufficient? What if the rel->state is SUBREL_STATE_UNKNOWN? I
> think that will be possible when the refresh publication has not been yet
> performed after adding a new relation to the publication. If that is true then
> won't we need to simply ignore that change and continue instead of erroring
> out? Can you please once test and check this case?

You are right. Changed to not report an ERROR for SUBREL_STATE_UNKNOWN.
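
A minimal sketch of the resulting decision, assuming three outcomes (apply,
skip, error). The state macros are local stand-ins for the catalog constants,
and the enum and function names are hypothetical, not taken from the patch:

```c
#include <assert.h>

/* Illustrative stand-ins for the relation sync state codes. */
#define SUBREL_STATE_UNKNOWN '\0'
#define SUBREL_STATE_INIT    'i'
#define SUBREL_STATE_READY   'r'

/* Hypothetical result type for this sketch. */
typedef enum
{
	SKETCH_APPLY,				/* relation is synced: apply the change */
	SKETCH_SKIP,				/* relation not known yet: ignore the change */
	SKETCH_ERROR				/* sync in progress: worker must stop */
} SketchRelDecision;

/* Decide what a parallel apply worker does with a change for a relation. */
SketchRelDecision
sketch_decide_for_rel(char state)
{
	if (state == SUBREL_STATE_READY)
		return SKETCH_APPLY;

	/* Added to the publication, but the subscription was not refreshed. */
	if (state == SUBREL_STATE_UNKNOWN)
		return SKETCH_SKIP;

	return SKETCH_ERROR;
}
```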

> 4.
> +
> + case TRANS_PARALLEL_APPLY:
> + list_free(subxactlist);
> + subxactlist = NIL;
> +
> + apply_handle_commit_internal(&commit_data);
>
> I don't think we need to retail pfree subxactlist as this is allocated in
> TopTransactionContext and will be freed at commit/prepare. This way freeing
> looks a bit adhoc to me and you need to expose this list outside
> applyparallelworker.c which doesn't seem like a good idea to me either.

Removed the list_free.

> 5.
> + apply_handle_commit_internal(&commit_data);
> +
> + pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
> + pa_unlock_transaction(xid, AccessShareLock);
> +
> + elog(DEBUG1, "finished processing the transaction finish command");
>
> I think in this and similar DEBUG logs, we can tell the exact command instead of
> writing 'finish'.

Changed.

> 6.
> apply_handle_stream_commit()
> {
> ...
> + /*
> + * After sending the data to the parallel apply worker, wait for
> + * that worker to finish. This is necessary to maintain commit
> + * order which avoids failures due to transaction dependencies and
> + * deadlocks.
> + */
> + pa_wait_for_xact_finish(winfo);
> +
> + pgstat_report_stat(false);
> + store_flush_position(commit_data.end_lsn);
> + stop_skipping_changes();
> +
> + (void) pa_free_worker(winfo, xid);
> ...
> }
>
> apply_handle_stream_prepare(StringInfo s) {
> +
> + /*
> + * After sending the data to the parallel apply worker, wait for
> + * that worker to finish. This is necessary to maintain commit
> + * order which avoids failures due to transaction dependencies and
> + * deadlocks.
> + */
> + pa_wait_for_xact_finish(winfo);
> + (void) pa_free_worker(winfo, prepare_data.xid);
>
> - /* unlink the files with serialized changes and subxact info. */
> - stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
> + in_remote_transaction = false;
> +
> + store_flush_position(prepare_data.end_lsn);
>
>
> In both of the above functions, we should be consistent in calling
> pa_free_worker() function which I think should be immediately after
> pa_wait_for_xact_finish(). Is there a reason for not being consistent here?

Changed the order to make them consistent.

> 7.
> + res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
> +
> + /*
> + * The leader will detach from the error queue and set it to NULL
> + * before preparing to stop all parallel apply workers, so we don't
> + * need to handle error messages anymore.
> + */
> + if (!winfo->error_mq_handle)
> + continue;
>
> This check must be done before calling shm_mq_receive. So, changed it in the
> attached patch.

Thanks, merged.
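
The ordering point can be illustrated with a self-contained sketch. The queue
and worker types here are simplified stand-ins (not shm_mq or the patch's
structures), and all names are hypothetical:

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for a message queue handle: just counts queued messages. */
typedef struct SketchQueue
{
	int			pending;
} SketchQueue;

/* Stand-in for the per-worker info kept by the leader. */
typedef struct SketchWorkerInfo
{
	SketchQueue *error_mq_handle;	/* NULL once the leader has detached */
} SketchWorkerInfo;

/* Stand-in for a non-blocking receive; must never see a NULL handle. */
int
sketch_receive(SketchQueue *queue)
{
	assert(queue != NULL);
	if (queue->pending == 0)
		return 0;
	queue->pending--;
	return 1;
}

/* Poll each worker once; return how many messages were received. */
int
sketch_poll_error_queues(SketchWorkerInfo *workers, int nworkers)
{
	int			received = 0;

	for (int i = 0; i < nworkers; i++)
	{
		/* Check the handle first, so a detached queue is never read. */
		if (workers[i].error_mq_handle == NULL)
			continue;

		received += sketch_receive(workers[i].error_mq_handle);
	}
	return received;
}
```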

> 8.
> @@ -2675,6 +3156,10 @@ store_flush_position(XLogRecPtr remote_lsn) {
> FlushPosition *flushpos;
>
> + /* Skip for parallel apply workers. */ if (am_parallel_apply_worker())
> + return;
>
> It is okay to always update the flush position by leader apply worker but I think
> the leader won't have updated value for XactLastCommitEnd as the local
> transaction is committed by parallel apply worker.

I added a field in shared memory so that the parallel apply worker can pass
its XactLastCommitEnd to the leader, which then stores it.
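
A rough sketch of such a hand-off, assuming a single shared slot per worker.
The struct, field, and function names are hypothetical, and a C11 atomic is
used here only to keep the example self-contained; the patch may protect the
field differently:

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;	/* stand-in for the server's LSN type */
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical slice of the memory shared by leader and worker. */
typedef struct SketchParallelShared
{
	_Atomic XLogRecPtr last_commit_end;
} SketchParallelShared;

/* Leader: initialize the slot before handing it to a worker. */
void
sketch_shared_init(SketchParallelShared *shared)
{
	atomic_init(&shared->last_commit_end, InvalidXLogRecPtr);
}

/* Worker: publish the end LSN of the local commit it just performed. */
void
sketch_report_commit_end(SketchParallelShared *shared, XLogRecPtr commit_end)
{
	assert(commit_end != InvalidXLogRecPtr);
	atomic_store(&shared->last_commit_end, commit_end);
}

/* Leader: read the value after the worker has finished the transaction. */
XLogRecPtr
sketch_read_commit_end(SketchParallelShared *shared)
{
	return atomic_load(&shared->last_commit_end);
}
```

The leader would read the slot only after waiting for the worker to finish,
and use the result as the local end position when recording the flush position.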

> 9.
> @@ -3831,11 +4366,11 @@ ApplyWorkerMain(Datum main_arg)
>
> ereport(DEBUG1,
> (errmsg_internal("logical replication apply worker for subscription \"%s\"
> two_phase is %s",
> - MySubscription->name,
> - MySubscription->twophasestate ==
> LOGICALREP_TWOPHASE_STATE_DISABLED
> ? "DISABLED" :
> - MySubscription->twophasestate ==
> LOGICALREP_TWOPHASE_STATE_PENDING ?
> "PENDING" :
> - MySubscription->twophasestate ==
> LOGICALREP_TWOPHASE_STATE_ENABLED ?
> "ENABLED" :
> - "?")));
> + MySubscription->name,
> + MySubscription->twophasestate ==
> LOGICALREP_TWOPHASE_STATE_DISABLED
> ? "DISABLED" :
> + MySubscription->twophasestate ==
> LOGICALREP_TWOPHASE_STATE_PENDING ?
> "PENDING" :
> + MySubscription->twophasestate ==
> LOGICALREP_TWOPHASE_STATE_ENABLED ?
> "ENABLED" :
> + "?")));
>
> Is this change related to this patch?

I think this was changed accidentally by pgindent. Reverted.

> 10. What is the reason to expose ApplyErrorCallbackArg via worker_internal.h?

The parallel apply worker needs to set the origin name in this struct. I
introduced another function to set it, so the struct is no longer exposed.

> 11. The order to declare pa_set_stream_apply_worker() in worker_internal.h and
> define in applyparallelworker.c is not the same.
> Similarly, please check all other functions.

Changed.

> 12. Apart from the above, I have made a few changes in the comments and
> some other cosmetic changes in the attached patch.

Thanks, I have checked and merged them.

Attached is the new version of the patch set.

I haven't addressed comments #1 and #2 from [1] yet; I need to think about them
and will handle them soon. Besides, I haven't renamed serialize_stream_start/stop
or finished the word consistency check for the comments. I will handle those
soon as well.

[1] https://www.postgresql.org/message-id/CAA4eK1LGKYUDFZ_jFPrU497wQf2HNvt5a%2BtCTpqSeWSG6kfpSA%40mail.gmail.com

Best regards,
Hou zj

Attachment Content-Type Size
v54-0001-Perform-streaming-logical-transactions-by-parall.patch application/octet-stream 195.1 KB
v54-0002-Serialize-partial-changes-to-a-file-when-the-att.patch application/octet-stream 38.3 KB
v54-0003-Test-streaming-parallel-option-in-tap-test.patch application/octet-stream 80.1 KB
v54-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch application/octet-stream 22.7 KB
v54-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch application/octet-stream 8.7 KB
