Re: Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-08-30 11:51:07
Message-ID: CAA4eK1Kom70AmcKjyKy1K77yCK1ipSUUGcaxXOHuRAqsRiZQmA@mail.gmail.com
Lists: pgsql-hackers

On Tue, Aug 30, 2022 at 12:12 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> Few other comments on v25-0001*
> ============================
>

Some more comments on v25-0001*:
=============================
1.
+static void
+apply_handle_stream_abort(StringInfo s)
...
...
+ else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
+ {
+ if (subxid == xid)
+ parallel_apply_replorigin_reset();
+
+ /* Send STREAM ABORT message to the apply parallel worker. */
+ parallel_apply_send_data(winfo, s->len, s->data);
+
+ /*
+ * After sending the data to the apply parallel worker, wait for
+ * that worker to finish. This is necessary to maintain commit
+ * order which avoids failures due to transaction dependencies and
+ * deadlocks.
+ */
+ if (subxid == xid)
+ {
+ parallel_apply_wait_for_free(winfo);
...
...

From this code, it appears that we wait for rollbacks to finish but do
not do the same for rollback to savepoint. Is there a reason for this?
I think we need to wait for both kinds of rollback to avoid transaction
dependency and deadlock issues. Consider the following case:

Suppose table t1 (c1 primary key, c2, c3) has a row (1, 2, 3) on both
the publisher and the subscriber.

Publisher
Session-1
==========
Begin;
...
Delete from t1 where c1 = 1;

Session-2
Begin;
...
insert into t1 values(1, 4, 5); -- This will wait for Session-1's Delete to finish.

Session-1
Rollback;

Session-2
-- The wait will be finished and the insert will be successful.
Commit;

Now, assume both of these transactions get streamed. If we don't wait
for the rollback/rollback to savepoint to finish, it is possible that
the insert gets executed on the subscriber before the rollback has been
applied, leading to a constraint violation. This won't happen in
non-parallel mode, so we should wait for rollbacks to finish.
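A minimal sketch of the control flow proposed above, using hypothetical helpers (`send_abort_to_worker()`, `wait_for_worker_to_apply_abort()`, `release_worker()`) in place of the patch's real functions: the leader waits after forwarding STREAM ABORT both for a top-level abort (`subxid == xid`) and for a rollback to savepoint. It assumes that, for a subtransaction abort, the wait means "until the worker has applied this abort", because the worker still has the rest of the transaction to process and will not become free at that point.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;     /* 0 plays the role of InvalidTransactionId */

/* Hypothetical counters standing in for real IPC; they only record what
 * the leader did so that the control flow can be checked. */
static int aborts_sent = 0;
static int abort_waits = 0;
static int workers_released = 0;

/* Hypothetical: forward the STREAM ABORT message to the parallel worker. */
static void send_abort_to_worker(void) { aborts_sent++; }

/* Hypothetical: block until the worker reports it has applied the abort. */
static void wait_for_worker_to_apply_abort(void) { abort_waits++; }

/* Hypothetical: return the worker to the pool after a top-level abort. */
static void release_worker(void) { workers_released++; }

/*
 * Leader-side STREAM ABORT for a transaction handed to a parallel worker.
 * subxid == xid means the whole transaction is rolled back; otherwise it
 * is a ROLLBACK TO SAVEPOINT inside that transaction.
 */
static void
leader_stream_abort(TransactionId xid, TransactionId subxid)
{
    assert(xid != 0 && subxid != 0);

    send_abort_to_worker();

    /* Wait in both cases, so the leader dispatches no later change until
     * this rollback has been applied on the subscriber. */
    wait_for_worker_to_apply_abort();

    /* Only a top-level abort ends the worker's transaction. */
    if (subxid == xid)
        release_worker();
}
```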

2. I think we don't need to wait at Rollback Prepared/Commit Prepared
because we already wait for the prepare to finish in the
*_stream_prepare function. That ensures all the operations of that
transaction have been applied on the subscriber, so no concurrent
transaction can cause deadlock or transaction dependency issues. If so,
it would be better to explain this in the comments.

3.
+/* What action to take for the transaction. */
+typedef enum
{
- LogicalRepMsgType command; /* 0 if invalid */
- LogicalRepRelMapEntry *rel;
+ /* The action for non-streaming transactions. */
+ TA_APPLY_IN_LEADER_WORKER,

- /* Remote node information */
- int remote_attnum; /* -1 if invalid */
- TransactionId remote_xid;
- XLogRecPtr finish_lsn;
- char *origin_name;
-} ApplyErrorCallbackArg;
+ /* Actions for streaming transactions. */
+ TA_SERIALIZE_TO_FILE,
+ TA_APPLY_IN_PARALLEL_WORKER,
+ TA_SEND_TO_PARALLEL_WORKER
+} TransactionApplyAction;

I think each action needs an explanation atop this enum typedef.
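One possible shape for that header comment, written from how the names and the code quoted in this review use each action, so the exact wording should be checked against the patch. The name helper is hypothetical and only there to make the sketch compilable and testable.

```c
#include <assert.h>
#include <string.h>

/*
 * What action to take for the transaction.
 *
 * TA_APPLY_IN_LEADER_WORKER: the transaction is not streamed, so the
 * leader apply worker applies each change as it arrives.
 *
 * TA_SERIALIZE_TO_FILE: the transaction is streamed but has no parallel
 * apply worker; the leader spools its changes to a file and applies them
 * when the stream commit (or prepare) arrives.
 *
 * TA_APPLY_IN_PARALLEL_WORKER: we are a parallel apply worker that owns
 * this streamed transaction; apply the changes sent by the leader.
 *
 * TA_SEND_TO_PARALLEL_WORKER: the leader has handed this streamed
 * transaction to a parallel apply worker and forwards the changes to it.
 */
typedef enum
{
    TA_APPLY_IN_LEADER_WORKER,
    TA_SERIALIZE_TO_FILE,
    TA_APPLY_IN_PARALLEL_WORKER,
    TA_SEND_TO_PARALLEL_WORKER
} TransactionApplyAction;

/* Hypothetical helper for debug output; not part of the patch. */
static const char *
transaction_apply_action_name(TransactionApplyAction action)
{
    switch (action)
    {
        case TA_APPLY_IN_LEADER_WORKER:
            return "apply in leader";
        case TA_SERIALIZE_TO_FILE:
            return "serialize to file";
        case TA_APPLY_IN_PARALLEL_WORKER:
            return "apply in parallel worker";
        case TA_SEND_TO_PARALLEL_WORKER:
            return "send to parallel worker";
    }
    assert(!"unexpected TransactionApplyAction");
    return "unknown";
}
```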

4.
@@ -1149,24 +1315,14 @@ static void
apply_handle_stream_start(StringInfo s)
{
...
+ else if (apply_action == TA_SERIALIZE_TO_FILE)
+ {
+ /*
+ * For the first stream start, check if there is any free apply
+ * parallel worker we can use to process this transaction.
+ */
+ if (first_segment)
+ winfo = parallel_apply_start_worker(stream_xid);

- /* open the spool file for this transaction */
- stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
+ if (winfo)
+ {
+ /*
+ * If we have found a free worker, then we pass the data to that
+ * worker.
+ */
+ parallel_apply_send_data(winfo, s->len, s->data);

- /* if this is not the first segment, open existing subxact file */
- if (!first_segment)
- subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
+ nchanges = 0;

- pgstat_report_activity(STATE_RUNNING, NULL);
+ /* Cache the apply parallel worker for this transaction. */
+ stream_apply_worker = winfo;
+ }
...

This looks odd to me in the sense that even if the action is
TA_SERIALIZE_TO_FILE, we still send the information to the parallel
worker. Wouldn't it be better to call parallel_apply_start_worker()
for first_segment before checking apply_action with
get_transaction_apply_action()? That way we can avoid this special-case
handling.
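A rough sketch of the reordering suggested above, with hypothetical names (`start_worker()`, `get_action()`, a one-slot worker pool) standing in for `parallel_apply_start_worker()` and `get_transaction_apply_action()`: the leader tries to assign a worker on the first segment and only then classifies the transaction, so a successful assignment simply shows up as TA_SEND_TO_PARALLEL_WORKER and the TA_SERIALIZE_TO_FILE branch never has to forward data.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;     /* 0 stands in for InvalidTransactionId */

typedef enum
{
    TA_APPLY_IN_LEADER_WORKER,
    TA_SERIALIZE_TO_FILE,
    TA_APPLY_IN_PARALLEL_WORKER,
    TA_SEND_TO_PARALLEL_WORKER
} TransactionApplyAction;

/* Hypothetical one-slot pool of parallel apply workers. */
typedef struct WorkerInfo
{
    bool          in_use;
    TransactionId xid;
} WorkerInfo;

static WorkerInfo pool_slot;
static bool am_parallel_worker = false;
static WorkerInfo *stream_apply_worker = NULL;  /* cached for the current stream */

/* Hypothetical: assign the idle worker to xid, or return NULL if busy. */
static WorkerInfo *
start_worker(TransactionId xid)
{
    if (pool_slot.in_use)
        return NULL;
    pool_slot.in_use = true;
    pool_slot.xid = xid;
    return &pool_slot;
}

/* Hypothetical: classify the transaction from already-settled state. */
static TransactionApplyAction
get_action(TransactionId xid, WorkerInfo **winfo)
{
    *winfo = NULL;
    if (am_parallel_worker)
        return TA_APPLY_IN_PARALLEL_WORKER;
    if (xid == 0)
        return TA_APPLY_IN_LEADER_WORKER;       /* not a streamed transaction */
    if (pool_slot.in_use && pool_slot.xid == xid)
    {
        *winfo = &pool_slot;
        return TA_SEND_TO_PARALLEL_WORKER;
    }
    return TA_SERIALIZE_TO_FILE;
}

/* Leader-side STREAM START: assign a worker first, then ask for the action. */
static TransactionApplyAction
stream_start(TransactionId xid, bool first_segment)
{
    TransactionApplyAction action;
    WorkerInfo *winfo;

    assert(xid != 0);               /* streamed transactions have a valid xid */

    if (first_segment)
        (void) start_worker(xid);   /* get_action() will find it if assigned */

    action = get_action(xid, &winfo);

    /* SEND: forward the message to winfo; SERIALIZE: spool it to a file. */
    stream_apply_worker = winfo;
    return action;
}
```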

5.
+/*
+ * Struct for sharing information between apply leader apply worker and apply
+ * parallel workers.
+ */
+typedef struct ApplyParallelWorkerShared
+{
+ slock_t mutex;
+
+ bool in_use;
+
+ /* Logical protocol version. */
+ uint32 proto_version;
+
+ TransactionId stream_xid;

Are we using the stream_xid passed by the leader in the parallel
worker? If so, how? If not, can we do without it?

6.
+void
+HandleParallelApplyMessages(void)
{
...
+ /* OK to process messages. Reset the flag saying there are more to do. */
+ ParallelApplyMessagePending = false;

I don't understand the meaning of the second part of the comment.
Shouldn't it say: "Reset the flag saying there is nothing more to
do."? I know you copied it from another part of the code, but I am not
sure it is correct there either.
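For reference, a minimal sketch of the flag pattern this function follows, using a plain C `volatile sig_atomic_t` flag and hypothetical names rather than PostgreSQL's latch and shared-memory queue machinery: the signal handler only sets the flag, and the handler function clears it before draining the queue. Read that way, the flag means "messages may be pending", which may help decide the comment wording.

```c
#include <assert.h>
#include <signal.h>

/* Set from the signal handler; means "messages may be pending". */
static volatile sig_atomic_t message_pending = 0;

/* Hypothetical stand-in for the shared-memory message queue. */
static int queued = 0;
static int processed = 0;

/* Would be installed with signal(); it only records that work exists. */
static void
message_signal_handler(int signo)
{
    (void) signo;
    message_pending = 1;
}

/* Hypothetical sender: queue a message, then "signal" the receiver. */
static void
send_message(void)
{
    queued++;
    message_signal_handler(0);
}

/* Called from the main loop when message_pending is set. */
static void
handle_pending_messages(void)
{
    /*
     * Clear the flag before draining. A message queued while we drain
     * sets it again, so it is picked up either by this loop or by the
     * next call; clearing the flag after the loop could lose that wakeup.
     */
    message_pending = 0;

    while (queued > 0)
    {
        queued--;
        processed++;
    }
}
```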

7.
+static List *ApplyParallelWorkersFreeList = NIL;
+static List *ApplyParallelWorkersList = NIL;

Do we really need to maintain two different worker lists? If so, what
is the advantage? I think there won't be many parallel apply workers,
so even if we maintain one list and search it, there shouldn't be any
performance impact. Maintaining two lists for this purpose is a bit
complex and more prone to bugs, so we should try to avoid it if
possible.
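A minimal sketch of the single-list alternative, assuming a small fixed cap on workers and using a plain C array in place of PostgreSQL's `List` to stay self-contained; `ParallelWorkerSlot`, `MAX_PARALLEL_WORKERS` and the functions below are hypothetical. A free worker is found by a linear scan over the `in_use` flags, which is cheap for a handful of workers and leaves only one structure to keep consistent.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Hypothetical worker slot; the real patch keeps more state per worker. */
typedef struct ParallelWorkerSlot
{
    bool          in_use;
    TransactionId xid;
} ParallelWorkerSlot;

/* Hypothetical upper bound; the pool is expected to stay small. */
#define MAX_PARALLEL_WORKERS 4

static ParallelWorkerSlot workers[MAX_PARALLEL_WORKERS];
static int nworkers = 0;            /* slots launched so far */

/* Reuse an idle worker if any, else launch a new one below the cap. */
static ParallelWorkerSlot *
acquire_worker(TransactionId xid)
{
    for (int i = 0; i < nworkers; i++)
    {
        if (!workers[i].in_use)
        {
            workers[i].in_use = true;
            workers[i].xid = xid;
            return &workers[i];
        }
    }

    if (nworkers < MAX_PARALLEL_WORKERS)
    {
        ParallelWorkerSlot *w = &workers[nworkers++];

        w->in_use = true;
        w->xid = xid;
        return w;
    }

    return NULL;                    /* caller falls back to serializing */
}

/* Mark a worker free again; it stays in the same single list. */
static void
release_worker(ParallelWorkerSlot *w)
{
    assert(w->in_use);
    w->in_use = false;
}

/* Look up the worker currently applying xid, if any. */
static ParallelWorkerSlot *
find_worker_for_xid(TransactionId xid)
{
    for (int i = 0; i < nworkers; i++)
        if (workers[i].in_use && workers[i].xid == xid)
            return &workers[i];
    return NULL;
}
```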

--
With Regards,
Amit Kapila.
