RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-12-04 11:17:28
Message-ID: OS0PR01MB571690D6FF24E9D000ECBDBD94199@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

Thursday, December 1, 2022 8:40 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Wed, Nov 30, 2022 at 4:23 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
> wrote:
> >
> > 2.
> > + /*
> > + * The stream lock is released when processing changes in a
> > + * streaming block, so the leader needs to acquire the lock here
> > + * before entering PARTIAL_SERIALIZE mode to ensure that the
> > + * parallel apply worker will wait for the leader to release the
> > + * stream lock.
> > + */
> > + if (in_streamed_transaction &&
> > + action != LOGICAL_REP_MSG_STREAM_STOP) {
> > + pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
> >
> > This comment is not completely correct because we can even acquire the
> > lock for the very streaming chunk. This check will work but doesn't
> > appear future-proof or at least not very easy to understand though I
> > don't have a better suggestion at this stage. Can we think of a better
> > check here?
> >
>
> One idea is that we acquire this lock every time and callers like stream_commit
> are responsible for releasing it. Also, we can handle the close of the stream file in the
> respective callers. I think that will make this part of the patch easier to follow.

Changed.
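To make the agreed ownership rule concrete, here is a minimal single-process model. The leader takes the stream lock unconditionally whenever it enters partial-serialize mode, and the code that finishes the stream (stream commit/abort handling) closes the spool file and releases the lock. StreamLock, StreamState, enter_partial_serialize(), finish_stream() and worker_may_apply() are hypothetical names used only for illustration. The patch itself uses pa_lock_stream() with AccessExclusiveLock on a heavyweight lock shared between processes, which this sketch does not model.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the per-transaction stream lock. */
typedef struct StreamLock
{
    bool held_by_leader;
} StreamLock;

/* Hypothetical per-transaction state kept by the leader. */
typedef struct StreamState
{
    StreamLock lock;
    bool       file_open;   /* spool file for partially serialized changes */
} StreamState;

/*
 * Entering partial-serialize mode always takes the lock; there is no
 * special-casing of which streaming chunk we are currently in.
 */
static void
enter_partial_serialize(StreamState *st)
{
    assert(!st->lock.held_by_leader);
    st->lock.held_by_leader = true;
    st->file_open = true;
}

/*
 * Called by whichever code finishes the stream (e.g. stream commit or
 * abort handling): close the spool file first, then release the lock so
 * the parallel apply worker can proceed.
 */
static void
finish_stream(StreamState *st)
{
    if (st->file_open)
        st->file_open = false;
    if (st->lock.held_by_leader)
        st->lock.held_by_leader = false;
}

/* In this model the worker may read the spooled changes only after release. */
static bool
worker_may_apply(const StreamState *st)
{
    return !st->lock.held_by_leader && !st->file_open;
}
```

The point of the design is that the acquire site needs no condition at all, and the release responsibility sits with the few callers that actually end a stream.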

> Some other comments:
> =====================
> 1. The handling of buffile inside pa_stream_abort() looks a bit ugly to me. I think
> you primarily required it because the buffile opened by parallel apply worker is
> in CurrentResourceOwner.

Changed to use the top-level transaction's resource owner.

> Can we think of having a new resource owner to
> apply spooled messages? I think that will avoid the need to have a special
> purpose code to handle buffiles in parallel apply worker.

I am still thinking about this and will address it in the next version.
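The benefit of a dedicated resource owner can be shown with a much-simplified model: every file opened while applying spooled messages is registered with one owner, and releasing that owner closes all of them in one step, so no special-purpose buffile cleanup is needed elsewhere. MiniResourceOwner, owner_open_temp() and owner_release() are hypothetical, and stdio temporary files stand in for BufFiles. This is an analogy under those assumptions, not the ResourceOwner API.

```c
#include <assert.h>
#include <stdio.h>

#define MAX_TRACKED_FILES 8

/* Hypothetical, much-simplified owner that only tracks open files. */
typedef struct MiniResourceOwner
{
    FILE *files[MAX_TRACKED_FILES];
    int   nfiles;
} MiniResourceOwner;

/* Open a temporary file and register it with the owner; NULL on failure. */
static FILE *
owner_open_temp(MiniResourceOwner *owner)
{
    FILE *f;

    if (owner->nfiles >= MAX_TRACKED_FILES)
        return NULL;
    f = tmpfile();
    if (f != NULL)
        owner->files[owner->nfiles++] = f;
    return f;
}

/*
 * Close everything the owner tracks in one step, as would happen when
 * the owner used for applying spooled messages is released.  Returns
 * the number of files closed successfully.
 */
static int
owner_release(MiniResourceOwner *owner)
{
    int closed = 0;

    for (int i = 0; i < owner->nfiles; i++)
    {
        if (fclose(owner->files[i]) == 0)
            closed++;
    }
    owner->nfiles = 0;
    return closed;
}
```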

> 2.
> @@ -564,6 +571,7 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
> TransactionId current_xid;
> ParallelApplyWorkerInfo *winfo;
> TransApplyAction apply_action;
> + StringInfoData original_msg;
>
> apply_action = get_transaction_apply_action(stream_xid, &winfo);
>
> @@ -573,6 +581,8 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
>
> Assert(TransactionIdIsValid(stream_xid));
>
> + original_msg = *s;
> +
> /*
> * We should have received XID of the subxact as the first part of the
> * message, so extract it.
> @@ -596,10 +606,14 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
> stream_write_change(action, s);
> return true;
>
> + case TRANS_LEADER_PARTIAL_SERIALIZE:
> case TRANS_LEADER_SEND_TO_PARALLEL:
> Assert(winfo);
>
> - pa_send_data(winfo, s->len, s->data);
> + if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL)
> + pa_send_data(winfo, s->len, s->data);
> + else
> + stream_write_change(action, &original_msg);
>
> Please add the comment to specify the reason to remember the original string.

Added.
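The reason for remembering the original string can be shown in isolation. Reading the subxact XID with pq_getmsgint() advances s->cursor, and, as I read worker.c, stream_write_change() writes only the bytes from s->cursor onward. Copying the struct by value (original_msg = *s) keeps a cursor that still points at the start of the message, while both copies share the same data buffer. MiniStringInfo, mini_getmsgint32() and mini_unread_len() are hypothetical stand-ins for StringInfoData, pq_getmsgint() and the length computation in stream_write_change().

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical minimal stand-in for PostgreSQL's StringInfoData. */
typedef struct MiniStringInfo
{
    const char *data;
    int         len;
    int         cursor;     /* offset of the next byte to read */
} MiniStringInfo;

/* Read a 4-byte big-endian integer and advance the cursor. */
static uint32_t
mini_getmsgint32(MiniStringInfo *s)
{
    const unsigned char *p = (const unsigned char *) s->data + s->cursor;
    uint32_t    v = ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
                    ((uint32_t) p[2] << 8) | (uint32_t) p[3];

    s->cursor += 4;
    return v;
}

/* Number of bytes a cursor-based writer would spool: cursor to end. */
static int
mini_unread_len(const MiniStringInfo *s)
{
    return s->len - s->cursor;
}
```

Because only the struct is copied, this costs nothing beyond a few words; the saved copy is just a second read position into the same buffer.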

> 3.
> @@ -1797,8 +1907,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
> changes_filename(path, MyLogicalRepWorker->subid, xid);
> elog(DEBUG1, "replaying changes from file \"%s\"", path);
>
> - fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
> - false);
> + stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
> + stream_xid = xid;
>
> Why do we need stream_xid here? I think we can avoid having global stream_fd
> if the comment #1 is feasible.

I think we don't need it anymore, so I have removed it.
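Without stream_xid and a global stream_fd, the replay loop only needs a file handle passed in by its caller. The sketch below assumes the spool record layout is a length (covering the action byte plus payload), then the action byte, then the payload; that is my reading of stream_write_change() and should be treated as an assumption. spool_write_change() and replay_spooled() are hypothetical names, and a stdio FILE stands in for BufFile.

```c
#include <assert.h>
#include <stdio.h>

/* Append one record: length (action byte + payload), action, payload. */
static int
spool_write_change(FILE *f, char action, const char *payload, int payload_len)
{
    int len = payload_len + (int) sizeof(char);

    if (fwrite(&len, sizeof(len), 1, f) != 1 ||
        fwrite(&action, sizeof(action), 1, f) != 1)
        return -1;
    if (payload_len > 0 &&
        fwrite(payload, 1, (size_t) payload_len, f) != (size_t) payload_len)
        return -1;
    return 0;
}

/*
 * Replay all records from an already-open file.  The handle is a
 * parameter rather than a global, so nothing has to remember which
 * transaction's file is open.  Returns the number of records replayed,
 * or -1 on a malformed or truncated file.
 */
static int
replay_spooled(FILE *f, char *actions_out, int max_actions)
{
    int  nreplayed = 0;
    int  len;
    char buf[256];

    rewind(f);
    while (fread(&len, sizeof(len), 1, f) == 1)
    {
        if (len < 1 || len > (int) sizeof(buf) || nreplayed >= max_actions)
            return -1;
        if (fread(buf, 1, (size_t) len, f) != (size_t) len)
            return -1;
        /* buf[0] is the action; buf[1..len-1] would go to the dispatcher. */
        actions_out[nreplayed++] = buf[0];
    }
    return nreplayed;
}
```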

> 4.
> + * TRANS_LEADER_APPLY:
> + * The action means that we
>
> /The/This. Please make a similar change for other actions.
>
> 5. Apart from the above, please find a few changes to the comments for
> 0001 and 0002 patches in the attached patches.

Merged.

Attached is the new version of the patch set, which addresses most of the comments
received so far except for some that are still being discussed [1].

[1] https://www.postgresql.org/message-id/OS0PR01MB57167BF64FC0891734C8E81A94149%40OS0PR01MB5716.jpnprd01.prod.outlook.com

Best regards,
Hou zj

Attachment Content-Type Size
v55-0003-Test-streaming-parallel-option-in-tap-test.patch application/octet-stream 80.1 KB
v55-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch application/octet-stream 22.7 KB
v55-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch application/octet-stream 8.7 KB
v55-0001-Perform-streaming-logical-transactions-by-parall.patch application/octet-stream 195.1 KB
v55-0002-Serialize-partial-changes-to-a-file-when-the-att.patch application/octet-stream 39.6 KB
