RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-08-31 09:55:45
Message-ID: OS0PR01MB5716546863340CA19EC5F8C394789@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Tuesday, August 30, 2022 7:51 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Tue, Aug 30, 2022 at 12:12 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
> wrote:
> >
> > A few other comments on v25-0001*
> > ============================
> >
>
> Some more comments on v25-0001*:
> =============================
> 1.
> +static void
> +apply_handle_stream_abort(StringInfo s)
> ...
> ...
> + else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
> + {
> + if (subxid == xid)
> + parallel_apply_replorigin_reset();
> +
> + /* Send STREAM ABORT message to the apply parallel worker. */
> + parallel_apply_send_data(winfo, s->len, s->data);
> +
> + /*
> + * After sending the data to the apply parallel worker, wait for
> + * that worker to finish. This is necessary to maintain commit
> + * order which avoids failures due to transaction dependencies and
> + * deadlocks.
> + */
> + if (subxid == xid)
> + {
> + parallel_apply_wait_for_free(winfo);
> ...
> ...
>
> From this code, it appears that we are waiting for rollbacks to finish but not
> doing the same in the rollback to savepoint cases. Is there a reason for that?
> I think we need to wait for rollbacks to avoid transaction dependency
> and deadlock issues. Consider the case below:
>
> Consider a table t1 (c1 primary key, c2, c3) that has a row (1, 2, 3) on both the
> publisher and the subscriber.
>
> Publisher
> Session-1
> ==========
> Begin;
> ...
> Delete from t1 where c1 = 1;
>
> Session-2
> Begin;
> ...
> insert into t1 values(1, 4, 5); --This will wait for Session-1's Delete to finish.
>
> Session-1
> Rollback;
>
> Session-2
> -- The wait will be finished and the insert will be successful.
> Commit;
>
> Now, assume both of these transactions get streamed. If we didn't wait for
> rollback/rollback to savepoint, the insert could get executed before the
> rollback has finished, leading to a constraint violation. This won't happen in
> non-parallel mode, so we should wait for rollbacks to finish.

Agreed and changed.
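For illustration only, the behaviour agreed on here (the leader waits after forwarding STREAM ABORT, whether the whole transaction is aborted or only a savepoint is rolled back) can be modelled as the toy single-threaded C sketch below. SketchWorker, sketch_send(), sketch_wait_for_apply() and sketch_handle_stream_abort() are hypothetical stand-ins rather than the patch's parallel_apply_* functions, and "waiting" is simulated by draining a counter.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Hypothetical stand-in for a parallel apply worker handle. */
typedef struct SketchWorker
{
    int  pending_msgs;   /* messages forwarded but not yet applied */
    bool in_use;         /* still assigned to a streamed transaction */
} SketchWorker;

/* Hypothetical: forward one message to the worker's queue. */
static void
sketch_send(SketchWorker *w)
{
    w->pending_msgs++;
}

/*
 * Hypothetical: block until the worker has applied everything sent so far.
 * In this single-threaded model the queue is simply drained.
 */
static void
sketch_wait_for_apply(SketchWorker *w)
{
    w->pending_msgs = 0;
}

/*
 * Leader-side handling of STREAM ABORT.  subxid == xid means the whole
 * transaction is rolled back; otherwise only a subtransaction is
 * (ROLLBACK TO SAVEPOINT).  The leader waits in both cases, so a later
 * transaction is never applied while these changes are still in place.
 */
static void
sketch_handle_stream_abort(SketchWorker *w, TransactionId xid,
                           TransactionId subxid)
{
    sketch_send(w);
    sketch_wait_for_apply(w);

    /* Only a top-level abort ends the transaction and frees the worker. */
    if (subxid == xid)
        w->in_use = false;
}
```

A savepoint rollback leaves the worker assigned to the transaction, while a top-level abort releases it; both paths go through the same wait.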

> 2. I think we don't need to wait at Rollback Prepared/Commit Prepared
> because we wait for prepare to finish in the *_stream_prepare function.
> That will ensure all the operations in that transaction have happened in the
> subscriber, so no concurrent transaction can create deadlock or transaction
> dependency issues. If so, I think it is better to explain this in the comments.

Added some comments about this.
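One rough way to summarise points 1 and 2 together is a predicate telling the leader when it must wait after forwarding a message. This is a hypothetical sketch: the SketchMsg values are illustrative labels for the protocol messages mentioned above, and treating STREAM COMMIT as a waiting point is an assumption based on the commit-order comment quoted in point 1.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical labels for the messages discussed in points 1 and 2. */
typedef enum SketchMsg
{
    SKETCH_STREAM_ABORT,
    SKETCH_STREAM_COMMIT,
    SKETCH_STREAM_PREPARE,
    SKETCH_COMMIT_PREPARED,
    SKETCH_ROLLBACK_PREPARED
} SketchMsg;

/*
 * Must the leader wait for the parallel worker after this message?
 * STREAM PREPARE already waits until every change of the transaction has
 * been applied, so by the time COMMIT PREPARED or ROLLBACK PREPARED
 * arrives nothing from that transaction can still conflict with a
 * concurrently applied transaction.
 */
static bool
sketch_leader_must_wait(SketchMsg msg)
{
    switch (msg)
    {
        case SKETCH_STREAM_ABORT:
        case SKETCH_STREAM_COMMIT:
        case SKETCH_STREAM_PREPARE:
            return true;
        case SKETCH_COMMIT_PREPARED:
        case SKETCH_ROLLBACK_PREPARED:
            return false;
    }
    return false;               /* not reached; keeps compilers quiet */
}
```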

> 3.
> +/* What action to take for the transaction. */
> +typedef enum
> {
> - LogicalRepMsgType command; /* 0 if invalid */
> - LogicalRepRelMapEntry *rel;
> + /* The action for non-streaming transactions. */
> + TA_APPLY_IN_LEADER_WORKER,
>
> - /* Remote node information */
> - int remote_attnum; /* -1 if invalid */
> - TransactionId remote_xid;
> - XLogRecPtr finish_lsn;
> - char *origin_name;
> -} ApplyErrorCallbackArg;
> + /* Actions for streaming transactions. */
> + TA_SERIALIZE_TO_FILE,
> + TA_APPLY_IN_PARALLEL_WORKER,
> + TA_SEND_TO_PARALLEL_WORKER
> +} TransactionApplyAction;
>
> I think each action needs an explanation atop this enum typedef.

Added.
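As an illustration of the kind of per-action explanation requested in point 3, the enum could be documented along the lines below. The comments are inferred from this discussion rather than copied from v26, and sketch_get_action() is a hypothetical selector, not the patch's get_transaction_apply_action().

```c
#include <assert.h>
#include <stdbool.h>

/* What action to take for a change in the current transaction. */
typedef enum
{
    /* Non-streaming transaction: the leader applies the change itself. */
    TA_APPLY_IN_LEADER_WORKER,

    /*
     * Streaming transaction with no parallel apply worker available: the
     * leader spools the change to a file and replays it when the
     * transaction finishes.
     */
    TA_SERIALIZE_TO_FILE,

    /* Running inside a parallel apply worker: apply the change directly. */
    TA_APPLY_IN_PARALLEL_WORKER,

    /* Streaming transaction with an assigned worker: forward the change. */
    TA_SEND_TO_PARALLEL_WORKER
} TransactionApplyAction;

/* Hypothetical selector; the real decision may consult other state. */
static TransactionApplyAction
sketch_get_action(bool am_parallel_worker, bool in_streamed_xact,
                  bool have_worker)
{
    if (am_parallel_worker)
        return TA_APPLY_IN_PARALLEL_WORKER;
    if (!in_streamed_xact)
        return TA_APPLY_IN_LEADER_WORKER;
    return have_worker ? TA_SEND_TO_PARALLEL_WORKER : TA_SERIALIZE_TO_FILE;
}
```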

> 4.
> @@ -1149,24 +1315,14 @@ static void
> apply_handle_stream_start(StringInfo s)
> {
> ...
> + else if (apply_action == TA_SERIALIZE_TO_FILE)
> + {
> + /*
> + * For the first stream start, check if there is any free apply
> + * parallel worker we can use to process this transaction.
> + */
> + if (first_segment)
> + winfo = parallel_apply_start_worker(stream_xid);
>
> - /* open the spool file for this transaction */
> - stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
> + if (winfo)
> + {
> + /*
> + * If we have found a free worker, then we pass the data to that
> + * worker.
> + */
> + parallel_apply_send_data(winfo, s->len, s->data);
>
> - /* if this is not the first segment, open existing subxact file */
> - if (!first_segment)
> - subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
> + nchanges = 0;
>
> - pgstat_report_activity(STATE_RUNNING, NULL);
> + /* Cache the apply parallel worker for this transaction. */
> + stream_apply_worker = winfo;
> + }
> ...
>
> This looks odd to me in the sense that even if the action is
> TA_SERIALIZE_TO_FILE, we still send the information to the parallel
> worker. Wouldn't it be better to call parallel_apply_start_worker()
> for first_segment before checking apply_action with
> get_transaction_apply_action()? That way we can avoid this special-case
> handling.

Changed as suggested.
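The reordering suggested in point 4 can be sketched as follows: try to assign a worker on the first segment, then decide the action, so that the TA_SERIALIZE_TO_FILE branch never has to forward data. The fixed-size pool, sketch_start_worker() and sketch_stream_start() are hypothetical simplifications of parallel_apply_start_worker() and apply_handle_stream_start().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum
{
    TA_APPLY_IN_LEADER_WORKER,
    TA_SERIALIZE_TO_FILE,
    TA_APPLY_IN_PARALLEL_WORKER,
    TA_SEND_TO_PARALLEL_WORKER
} TransactionApplyAction;

typedef struct SketchWorker
{
    bool in_use;
} SketchWorker;

#define SKETCH_POOL_SIZE 2

static SketchWorker pool[SKETCH_POOL_SIZE];

/* Worker cached for the streamed transaction currently being received. */
static SketchWorker *stream_worker = NULL;

/* Hypothetical: claim a free worker, or return NULL if all are busy. */
static SketchWorker *
sketch_start_worker(void)
{
    for (int i = 0; i < SKETCH_POOL_SIZE; i++)
    {
        if (!pool[i].in_use)
        {
            pool[i].in_use = true;
            return &pool[i];
        }
    }
    return NULL;
}

static TransactionApplyAction
sketch_stream_start(bool first_segment)
{
    /* Step 1: on the first segment, try to assign a worker. */
    if (first_segment)
        stream_worker = sketch_start_worker();

    /* Step 2: only now pick the action, which reflects the result. */
    return stream_worker ? TA_SEND_TO_PARALLEL_WORKER : TA_SERIALIZE_TO_FILE;
}
```

Once all workers are busy, the first segment of a new transaction falls back to TA_SERIALIZE_TO_FILE, and later segments keep that decision because the worker pointer is cached per transaction.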

> 5.
> +/*
> + * Struct for sharing information between apply leader apply worker and apply
> + * parallel workers.
> + */
> +typedef struct ApplyParallelWorkerShared
> +{
> + slock_t mutex;
> +
> + bool in_use;
> +
> + /* Logical protocol version. */
> + uint32 proto_version;
> +
> + TransactionId stream_xid;
>
> Are we using stream_xid passed by the leader in parallel worker? If
> so, how? If not, then can we do without this?

No, it seems we don't need this. Removed.

> 6.
> +void
> +HandleParallelApplyMessages(void)
> {
> ...
> + /* OK to process messages. Reset the flag saying there are more to do. */
> + ParallelApplyMessagePending = false;
>
> I don't understand the meaning of the second part of the comment.
> Shouldn't we say: "Reset the flag saying there is nothing more to
> do."? I know you have copied this from another part of the code, but I
> am not sure it is correct there either.

I feel the comment here is not very helpful, so I removed it.
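For context on point 6: the usual reason for clearing such a flag before, not after, draining the queue is to avoid losing a wakeup. A minimal single-threaded sketch, assuming the flag is set by a signal handler when a message arrives (simulated here by calling sketch_on_message() directly; all names are hypothetical):

```c
#include <assert.h>
#include <signal.h>
#include <stdbool.h>

/* Set asynchronously (normally by a signal handler) when work arrives. */
static volatile sig_atomic_t message_pending = false;

/* Hypothetical counters standing in for a real message queue. */
static int queued = 0;
static int processed = 0;

/* Simulates the arrival of one message plus the handler setting the flag. */
static void
sketch_on_message(void)
{
    queued++;
    message_pending = true;
}

static void
sketch_handle_messages(void)
{
    /*
     * Clear the flag first.  A message that arrives while we drain the
     * queue sets it again, so we are called once more instead of leaving
     * that message unprocessed.  Clearing it afterwards could wipe out
     * such a late notification.
     */
    message_pending = false;

    while (queued > 0)
    {
        queued--;
        processed++;
    }
}
```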

> 7.
> +static List *ApplyParallelWorkersFreeList = NIL;
> +static List *ApplyParallelWorkersList = NIL;
>
> Do we really need to maintain two different worker lists? If so,
> what is the advantage? I think there won't be many parallel apply
> workers, so even if we maintain one list and search it, there shouldn't
> be any performance impact. I feel maintaining two lists for this
> purpose is a bit complex and has more chances of bugs, so we should
> try to avoid it if possible.

Agreed. I removed the ApplyParallelWorkersFreeList and reused
ApplyParallelWorkersList in other places.
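A minimal sketch of the single-list approach from point 7, assuming a small fixed upper bound on workers: a "free" worker is simply one whose in_use flag is clear, found by a linear scan. The array, SKETCH_MAX_WORKERS and the sketch_* functions are hypothetical; the patch keeps its workers in a PostgreSQL List.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct SketchWorker
{
    int  id;
    bool in_use;
} SketchWorker;

#define SKETCH_MAX_WORKERS 4

/* One list of all launched workers; there is no separate free list. */
static SketchWorker workers[SKETCH_MAX_WORKERS];
static int nworkers = 0;

/* Reuse an idle worker if there is one, else launch a new one if allowed. */
static SketchWorker *
sketch_get_free_worker(void)
{
    for (int i = 0; i < nworkers; i++)
    {
        if (!workers[i].in_use)
        {
            workers[i].in_use = true;
            return &workers[i];
        }
    }

    if (nworkers < SKETCH_MAX_WORKERS)
    {
        SketchWorker *w = &workers[nworkers];

        w->id = nworkers++;
        w->in_use = true;
        return w;
    }

    return NULL;                /* caller falls back to serializing to file */
}

/* Releasing a worker only clears its flag; it stays in the same list. */
static void
sketch_release_worker(SketchWorker *w)
{
    w->in_use = false;
}
```

With only a handful of workers the scan is cheap, and there is no second list to keep in sync when a worker is acquired or released.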

Attached is the new version of the patch set, which addresses the above
comments and the comments from [1].

[1] https://www.postgresql.org/message-id/CAA4eK1%2Be8JsiC8uMZPU25xQRyxNvVS24M4%3DZy-xD18jzX%2BvrmA%40mail.gmail.com

Best regards,
Hou zj

Attachment Content-Type Size
v26-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch application/octet-stream 7.6 KB
v26-0001-Perform-streaming-logical-transactions-by-parall.patch application/octet-stream 129.2 KB
v26-0002-Test-streaming-parallel-option-in-tap-test.patch application/octet-stream 75.2 KB
v26-0003-Add-some-checks-before-using-parallel-apply-work.patch application/octet-stream 51.3 KB
v26-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch application/octet-stream 60.1 KB
