RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-09-01 11:23:22
Message-ID: OS0PR01MB57164A2F6DB3B24987F48571947B9@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Wednesday, August 31, 2022 5:56 PM houzj(dot)fnst(at)fujitsu(dot)com wrote:
>
> On Tuesday, August 30, 2022 7:51 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
> wrote:
> >
> > On Tue, Aug 30, 2022 at 12:12 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
> > wrote:
> > >
> > > Few other comments on v25-0001*
> > > ============================
> > >
> >
> > Some more comments on v25-0001*:
> > =============================
> > 1.
> > +static void
> > +apply_handle_stream_abort(StringInfo s)
> > ...
> > ...
> > + else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
> > + {
> > + if (subxid == xid)
> > + parallel_apply_replorigin_reset();
> > +
> > + /* Send STREAM ABORT message to the apply parallel worker. */
> > + parallel_apply_send_data(winfo, s->len, s->data);
> > +
> > + /*
> > + * After sending the data to the apply parallel worker, wait for
> > + * that worker to finish. This is necessary to maintain commit
> > + * order which avoids failures due to transaction dependencies and
> > + * deadlocks.
> > + */
> > + if (subxid == xid)
> > + {
> > + parallel_apply_wait_for_free(winfo);
> > ...
> > ...
> >
> > From this code, it appears that we wait for a full rollback to finish
> > but do not do the same in the rollback-to-savepoint case. Is there a
> > reason for that? I think we need to wait for rollbacks to avoid
> > transaction dependency and deadlock issues. Consider the case below:
> >
> > Consider that table t1 (c1 primary key, c2, c3) has a row (1, 2, 3) on
> > both the publisher and the subscriber.
> >
> > Publisher
> > Session-1
> > ==========
> > Begin;
> > ...
> > Delete from t1 where c1 = 1;
> >
> > Session-2
> > Begin;
> > ...
> > insert into t1 values(1, 4, 5); -- This will wait for Session-1's Delete to finish.
> >
> > Session-1
> > Rollback;
> >
> > Session-2
> > -- The wait finishes and the insert succeeds.
> > Commit;
> >
> > Now, assume both of these transactions get streamed. If we didn't wait
> > for the rollback/rollback to savepoint, it is possible that the insert
> > gets executed before the rollback is applied and leads to a constraint
> > violation. This won't happen in non-parallel mode, so we should wait
> > for rollbacks to finish.
>
> Agreed and changed.
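>
> To illustrate the change, here is a rough sketch of the STREAM ABORT
> branch after the fix (the helper names are taken from the quoted v25
> code; the wait helper used for the rollback-to-savepoint case is an
> illustrative name only, the exact code is in the attached v27-0001
> patch):
>
>     else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
>     {
>         if (subxid == xid)
>             parallel_apply_replorigin_reset();
>
>         /* Send STREAM ABORT message to the apply parallel worker. */
>         parallel_apply_send_data(winfo, s->len, s->data);
>
>         /*
>          * Wait for the worker to actually apply the abort, for both a full
>          * rollback and a rollback to savepoint, so that a concurrent
>          * transaction cannot be applied ahead of the abort (see the
>          * constraint-violation scenario above).
>          */
>         if (subxid == xid)
>             parallel_apply_wait_for_free(winfo);
>         else
>             parallel_apply_wait_for_subxact_finish(winfo); /* illustrative name */
>     }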
>
> > 2. I think we don't need to wait at Rollback Prepared/Commit Prepared
> > because we wait for the prepare to finish in the *_stream_prepare
> > function. That will ensure all the operations in that transaction have
> > happened on the subscriber, so no concurrent transaction can create
> > deadlock or transaction dependency issues. If so, I think it is better
> > to explain this in the comments.
>
> Added some comments about this.
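>
> The added comments are roughly along the following lines in the
> rollback-prepared/commit-prepared paths (the wording may differ
> slightly in the patch):
>
>     /*
>      * We do not need to wait for the parallel apply worker here: the
>      * preceding STREAM PREPARE already waited for the worker to finish,
>      * so all changes of this transaction have been applied on the
>      * subscriber and no concurrent transaction can create a transaction
>      * dependency or deadlock.
>      */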
>
> > 3.
> > +/* What action to take for the transaction. */
> > +typedef enum
> > {
> > - LogicalRepMsgType command; /* 0 if invalid */
> > - LogicalRepRelMapEntry *rel;
> > + /* The action for non-streaming transactions. */
> > + TA_APPLY_IN_LEADER_WORKER,
> >
> > - /* Remote node information */
> > - int remote_attnum; /* -1 if invalid */
> > - TransactionId remote_xid;
> > - XLogRecPtr finish_lsn;
> > - char *origin_name;
> > -} ApplyErrorCallbackArg;
> > + /* Actions for streaming transactions. */
> > + TA_SERIALIZE_TO_FILE,
> > + TA_APPLY_IN_PARALLEL_WORKER,
> > + TA_SEND_TO_PARALLEL_WORKER
> > +} TransactionApplyAction;
> >
> > I think each action needs explanation atop this enum typedef.
>
> Added.
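>
> With the per-action explanations added, the enum reads roughly like
> this (the comments here are paraphrased; see the patch for the exact
> wording):
>
>     /* What action to take for the transaction. */
>     typedef enum
>     {
>         /* The non-streaming transaction is applied by the leader apply worker. */
>         TA_APPLY_IN_LEADER_WORKER,
>
>         /* The streamed changes are serialized to a file and applied at commit time. */
>         TA_SERIALIZE_TO_FILE,
>
>         /* The changes are applied in an apply parallel worker (used by the worker itself). */
>         TA_APPLY_IN_PARALLEL_WORKER,
>
>         /* The leader forwards the changes to an apply parallel worker. */
>         TA_SEND_TO_PARALLEL_WORKER
>     } TransactionApplyAction;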
>
> > 4.
> > @@ -1149,24 +1315,14 @@ static void
> > apply_handle_stream_start(StringInfo s)
> > {
> > ...
> > + else if (apply_action == TA_SERIALIZE_TO_FILE)
> > + {
> > + /*
> > + * For the first stream start, check if there is any free apply
> > + * parallel worker we can use to process this transaction.
> > + */
> > + if (first_segment)
> > + winfo = parallel_apply_start_worker(stream_xid);
> >
> > - /* open the spool file for this transaction */
> > - stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
> > + if (winfo)
> > + {
> > + /*
> > + * If we have found a free worker, then we pass the data to that
> > + * worker.
> > + */
> > + parallel_apply_send_data(winfo, s->len, s->data);
> >
> > - /* if this is not the first segment, open existing subxact file */
> > - if (!first_segment)
> > - subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
> > + nchanges = 0;
> >
> > - pgstat_report_activity(STATE_RUNNING, NULL);
> > + /* Cache the apply parallel worker for this transaction. */
> > + stream_apply_worker = winfo;
> > + }
> > ...
> >
> > This looks odd to me in the sense that even if the action is
> > TA_SERIALIZE_TO_FILE, we still send the information to the parallel
> > worker. Won't it be better if we call parallel_apply_start_worker()
> > for first_segment before checking apply_action with
> > get_transaction_apply_action()? That way we can avoid this special
> > case handling.
>
> Changed as suggested.
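>
> After the change, the flow in apply_handle_stream_start() is roughly as
> follows (a simplified sketch; the exact signatures, in particular how
> get_transaction_apply_action() returns the worker info, are as in the
> attached patch):
>
>     /* Try to start a parallel apply worker before deciding on the action. */
>     if (first_segment)
>         parallel_apply_start_worker(stream_xid);
>
>     apply_action = get_transaction_apply_action(stream_xid, &winfo);
>
>     if (apply_action == TA_SERIALIZE_TO_FILE)
>     {
>         /* No free worker was found, so spool the changes to a file. */
>         stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
>
>         if (!first_segment)
>             subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
>
>         pgstat_report_activity(STATE_RUNNING, NULL);
>     }
>     else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
>     {
>         /* A free worker was found, so pass the data to that worker. */
>         parallel_apply_send_data(winfo, s->len, s->data);
>         nchanges = 0;
>
>         /* Cache the apply parallel worker for this transaction. */
>         stream_apply_worker = winfo;
>     }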
>
> > 5.
> > +/*
> > + * Struct for sharing information between the leader apply worker and
> > + * the apply parallel workers.
> > + */
> > +typedef struct ApplyParallelWorkerShared
> > +{
> > + slock_t mutex;
> > +
> > + bool in_use;
> > +
> > + /* Logical protocol version. */
> > + uint32 proto_version;
> > +
> > + TransactionId stream_xid;
> >
> > Are we using the stream_xid passed by the leader in the parallel
> > worker? If so, how? If not, can we do without it?
>
> No, it seems we don't need this. Removed.
>
> > 6.
> > +void
> > +HandleParallelApplyMessages(void)
> > {
> > ...
> > + /* OK to process messages. Reset the flag saying there are more to do. */
> > + ParallelApplyMessagePending = false;
> >
> > I don't understand the meaning of the second part of the comment.
> > Shouldn't we say: "Reset the flag saying there is nothing more to
> > do."? I know you have copied from the other part of the code but there
> > also I am not sure if it is correct.
>
> I feel the comment here is not very helpful, so I removed this.
>
> > 7.
> > +static List *ApplyParallelWorkersFreeList = NIL;
> > +static List *ApplyParallelWorkersList = NIL;
> >
> > Do we really need to maintain two different workers' lists? If so,
> > what is the advantage? I think there won't be many parallel apply
> > workers, so even if we maintain one list and search it, there shouldn't
> > be any performance impact. I feel maintaining two lists for this
> > purpose is a bit complex and has more chances of bugs, so we should
> > try to avoid it if possible.
>
> Agreed. I removed the ApplyParallelWorkersFreeList and reused
> ApplyParallelWorkersList in the other places.
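>
> With a single list, looking up a free worker is just a linear scan over
> the registered workers, e.g. (a simplified sketch; the struct and field
> names are illustrative, and the real lookup is done inside the helper
> functions in the attached patch):
>
>     ListCell   *lc;
>
>     foreach(lc, ApplyParallelWorkersList)
>     {
>         ApplyParallelWorkerInfo *winfo = (ApplyParallelWorkerInfo *) lfirst(lc);
>
>         /* in_use is cleared when the worker's transaction finishes. */
>         if (!winfo->shared->in_use)
>             return winfo;
>     }
>
>     /* No free worker; fall back to serializing to a file. */
>     return NULL;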
>
> Attached the new version of the patch set, which addresses the above
> comments and the comments from [1].
>
> [1] https://www.postgresql.org/message-id/CAA4eK1%2Be8JsiC8uMZPU25xQRyxNvVS24M4%3DZy-xD18jzX%2BvrmA%40mail.gmail.com

Attached a new version of the patch set which fixes some typos and some cosmetic issues.

Best regards,
Hou zj

Attachment Content-Type Size
v27-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch application/octet-stream 7.7 KB
v27-0001-Perform-streaming-logical-transactions-by-parall.patch application/octet-stream 128.5 KB
v27-0002-Test-streaming-parallel-option-in-tap-test.patch application/octet-stream 75.2 KB
v27-0003-Add-some-checks-before-using-parallel-apply-work.patch application/octet-stream 49.7 KB
v27-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch application/octet-stream 60.8 KB
