Re: Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-12-01 12:39:54
Message-ID: CAA4eK1Jwh7j86Egk1cye=x2R_yrTjzXGj7Fx12wVybBAEq91kA@mail.gmail.com
Lists: pgsql-hackers

On Wed, Nov 30, 2022 at 4:23 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> 2.
> + /*
> + * The stream lock is released when processing changes in a
> + * streaming block, so the leader needs to acquire the lock here
> + * before entering PARTIAL_SERIALIZE mode to ensure that the
> + * parallel apply worker will wait for the leader to release the
> + * stream lock.
> + */
> + if (in_streamed_transaction &&
> + action != LOGICAL_REP_MSG_STREAM_STOP)
> + {
> + pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
>
> This comment is not completely correct because we can even acquire the
> lock for the very streaming chunk. This check will work but doesn't
> appear future-proof or at least not very easy to understand though I
> don't have a better suggestion at this stage. Can we think of a better
> check here?
>

One idea is that we acquire this lock every time, and callers like
stream_commit are responsible for releasing it. Also, we can handle
closing the stream file in the respective callers. I think that will
make this part of the patch easier to follow.
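To make the proposed pairing concrete, here is a toy model in plain C. It is a sketch under stated assumptions, not patch code: toy_lock_stream(), toy_unlock_stream(), toy_enter_partial_serialize() and the two caller functions are hypothetical stand-ins (the real code uses pa_lock_stream() with AccessExclusiveLock). The point is that the helper acquires the lock unconditionally, and every caller visibly releases it, so there is no in_streamed_transaction / STREAM_STOP condition to reason about.

```c
#include <assert.h>

/*
 * Toy model, not PostgreSQL code: lock_count stands in for the stream
 * lock that the leader takes with pa_lock_stream(..., AccessExclusiveLock).
 */
static int lock_count = 0;

static void
toy_lock_stream(void)
{
	assert(lock_count == 0);	/* the leader never holds it twice */
	lock_count++;
}

static void
toy_unlock_stream(void)
{
	assert(lock_count == 1);	/* only the holder may release it */
	lock_count--;
}

/*
 * Hypothetical helper: entering PARTIAL_SERIALIZE mode always takes the
 * lock, with no in_streamed_transaction / STREAM_STOP special case.
 */
static void
toy_enter_partial_serialize(void)
{
	toy_lock_stream();
}

/* Hypothetical caller handling a change inside a streaming block. */
static void
toy_serialize_stream_change(void)
{
	toy_enter_partial_serialize();
	/* ... write the change to the stream file ... */
	toy_unlock_stream();		/* this caller releases what it took */
}

/* Hypothetical caller modelled loosely on stream_commit. */
static void
toy_serialize_stream_commit(void)
{
	toy_enter_partial_serialize();
	/* ... write the commit, then close the stream file here ... */
	toy_unlock_stream();
}
```

With this shape, a reader can check lock balance by looking at each caller alone; the assertions in the toy lock functions catch a missing or duplicated release.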

Some other comments:
=====================
1. The handling of buffile inside pa_stream_abort() looks a bit ugly to
me. I think you primarily needed it because the buffile opened by the
parallel apply worker belongs to CurrentResourceOwner. Can we consider
having a new resource owner for applying spooled messages? I think that
will avoid the need for special-purpose code to handle buffiles in the
parallel apply worker.
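A minimal sketch of why a dedicated owner removes the special-case code, assuming a toy owner type. ToyResourceOwner, toy_open_spool_file(), toy_release_owner() and toy_apply_spooled_with_owner() are hypothetical and are not PostgreSQL's ResourceOwnerCreate()/CurrentResourceOwner machinery. They only model the idea that files opened while an owner is current are closed when that owner is released, so the success path and the abort path share one cleanup step.

```c
#include <assert.h>
#include <stdio.h>

#define TOY_MAX_FILES 8

/*
 * Toy resource owner, not PostgreSQL's ResourceOwner: it remembers the
 * files opened while it is current and closes them all on release.
 */
typedef struct ToyResourceOwner
{
	FILE	   *files[TOY_MAX_FILES];
	int			nfiles;
} ToyResourceOwner;

static ToyResourceOwner *toy_current_owner = NULL;

/* Open a temporary spool file and register it with the current owner. */
static FILE *
toy_open_spool_file(void)
{
	FILE	   *f;

	assert(toy_current_owner != NULL);
	assert(toy_current_owner->nfiles < TOY_MAX_FILES);
	f = tmpfile();
	if (f != NULL)
		toy_current_owner->files[toy_current_owner->nfiles++] = f;
	return f;
}

/* Close every file the owner still tracks. */
static void
toy_release_owner(ToyResourceOwner *owner)
{
	for (int i = 0; i < owner->nfiles; i++)
		fclose(owner->files[i]);
	owner->nfiles = 0;
}

/*
 * Replay spooled changes under a dedicated owner.  Whether replay finishes
 * or bails out early (failed != 0), cleanup is the same single release;
 * no file-specific code is needed on the abort path.  Returns the number
 * of files still tracked afterwards.
 */
static int
toy_apply_spooled_with_owner(int failed)
{
	ToyResourceOwner spool_owner = {.nfiles = 0};
	ToyResourceOwner *saved_owner = toy_current_owner;
	FILE	   *f;

	toy_current_owner = &spool_owner;
	f = toy_open_spool_file();
	if (f != NULL && !failed)
		fputs("change\n", f);	/* ... replay changes here ... */

	toy_release_owner(&spool_owner);
	toy_current_owner = saved_owner;
	return spool_owner.nfiles;
}
```

The design choice being illustrated is ownership scoping: the abort path no longer needs to know which files exist, only which owner to release.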

2.
@@ -564,6 +571,7 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
TransactionId current_xid;
ParallelApplyWorkerInfo *winfo;
TransApplyAction apply_action;
+ StringInfoData original_msg;

apply_action = get_transaction_apply_action(stream_xid, &winfo);

@@ -573,6 +581,8 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)

Assert(TransactionIdIsValid(stream_xid));

+ original_msg = *s;
+
/*
* We should have received XID of the subxact as the first part of the
* message, so extract it.
@@ -596,10 +606,14 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
stream_write_change(action, s);
return true;

+ case TRANS_LEADER_PARTIAL_SERIALIZE:
case TRANS_LEADER_SEND_TO_PARALLEL:
Assert(winfo);

- pa_send_data(winfo, s->len, s->data);
+ if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL)
+ pa_send_data(winfo, s->len, s->data);
+ else
+ stream_write_change(action, &original_msg);

Please add a comment explaining why the original string needs to be remembered.
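For reference, the mechanics behind `original_msg = *s;` can be shown with a small standalone model. This is a sketch, assuming a cut-down ToyStringInfo that carries only the data/len/cursor fields and a toy_getmsgint32() that reads a 4-byte network-order integer and advances the cursor in the manner of pq_getmsgint(s, 4); both names are hypothetical. The struct copy is shallow (it shares the buffer) but keeps the cursor from before the subxact XID was extracted, so the saved message still starts at the XID.

```c
#include <assert.h>
#include <stdint.h>

/* Minimal stand-in for PostgreSQL's StringInfoData (data, len, cursor). */
typedef struct ToyStringInfo
{
	char	   *data;
	int			len;
	int			cursor;
} ToyStringInfo;

/*
 * Read a 4-byte network-order integer and advance the cursor, in the
 * manner of pq_getmsgint(s, 4).
 */
static uint32_t
toy_getmsgint32(ToyStringInfo *s)
{
	const unsigned char *p;

	assert(s->cursor + 4 <= s->len);
	p = (const unsigned char *) s->data + s->cursor;
	s->cursor += 4;
	return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
		((uint32_t) p[2] << 8) | (uint32_t) p[3];
}

/*
 * Mirrors the patch: copy *s before extracting the subxact XID.  The copy
 * is shallow (it shares data) but keeps the cursor from before the read.
 */
static uint32_t
toy_extract_xid(ToyStringInfo *s, ToyStringInfo *original_msg)
{
	*original_msg = *s;
	return toy_getmsgint32(s);	/* moves s->cursor past the XID */
}
```

For a buffer `{0, 0, 0, 42, 'x'}`, the live message ends up with cursor 4 while the saved copy still has cursor 0 and points at the same data.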

3.
@@ -1797,8 +1907,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
changes_filename(path, MyLogicalRepWorker->subid, xid);
elog(DEBUG1, "replaying changes from file \"%s\"", path);

- fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
- false);
+ stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
+ stream_xid = xid;

Why do we need stream_xid here? I think we can avoid having a global
stream_fd if comment #1 is feasible.
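If cleanup on error is handled by a dedicated resource owner, the file handle no longer has to be reachable from an abort path, and it can stay local to the replay function. The sketch below only illustrates that local-handle shape; toy_replay_changes() and toy_apply_spooled_local_fd() are hypothetical names, and a stdio temporary file stands in for the BufFile opened with BufFileOpenFileSet().

```c
#include <assert.h>
#include <stdio.h>

/* Stand-in for applying each serialized change: here it just counts lines. */
static int
toy_replay_changes(FILE *spool)
{
	char		line[64];
	int			nchanges = 0;

	while (fgets(line, sizeof(line), spool) != NULL)
		nchanges++;
	return nchanges;
}

/*
 * Hypothetical sketch: the spool file handle is a local variable that is
 * opened and closed in the same function, instead of a file-scope global
 * such as stream_fd.  Returns the number of changes replayed, or -1 if the
 * temporary file could not be created.
 */
static int
toy_apply_spooled_local_fd(void)
{
	FILE	   *fd = tmpfile();
	int			n;

	if (fd == NULL)
		return -1;
	fputs("insert\nupdate\ndelete\n", fd);	/* pretend these were spooled */
	rewind(fd);
	n = toy_replay_changes(fd);
	fclose(fd);
	return n;
}
```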

4.
+ * TRANS_LEADER_APPLY:
+ * The action means that we

s/The/This/. Please make a similar change for the other actions.

5. Apart from the above, please find attached a few comment changes for
the 0001 and 0002 patches.

--
With Regards,
Amit Kapila.

Attachment Content-Type Size
changes_amit_v54_0001.patch application/octet-stream 3.3 KB
changes_amit_v54_0002.patch application/octet-stream 3.7 KB
