RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>
Cc: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-11-10 15:14:37
Message-ID: OS0PR01MB5716C10E35097026E12BB98A94019@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Monday, November 7, 2022 7:43 PM Kuroda, Hayato/黒田 隼人 <kuroda(dot)hayato(at)fujitsu(dot)com> wrote:
>
> Dear Hou,
>
> The following are my comments. I want to consider the patch more, but I am
> sending these now.

Thanks for the comments.

>
> ===
> worker.c
>
> 01. typedef enum TransApplyAction
>
> ```
> /*
> * What action to take for the transaction.
> *
> * TRANS_LEADER_APPLY means that we are in the leader apply worker and changes
> * of the transaction are applied directly in the worker.
> *
> * TRANS_LEADER_SERIALIZE means that we are in the leader apply worker or table
> * sync worker. Changes are written to temporary files and then applied when
> * the final commit arrives.
> *
> * TRANS_LEADER_SEND_TO_PARALLEL means that we are in the leader apply worker
> * and need to send the changes to the parallel apply worker.
> *
> * TRANS_PARALLEL_APPLY means that we are in the parallel apply worker and
> * changes of the transaction are applied directly in the worker.
> */
> ```
>
> TRANS_LEADER_PARTIAL_SERIALIZE should be listed here as well.
>

Added.
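
For reference, here is roughly how the enum reads with the new value included
(a sketch based on the current patch; the exact comment wording may still change):

```
typedef enum
{
	/* The leader apply worker applies the changes directly. */
	TRANS_LEADER_APPLY,

	/*
	 * The leader apply worker (or table sync worker) serializes changes to
	 * temporary files and applies them when the final commit arrives.
	 */
	TRANS_LEADER_SERIALIZE,

	/* The leader sends the changes to a parallel apply worker. */
	TRANS_LEADER_SEND_TO_PARALLEL,

	/*
	 * The leader was sending changes to a parallel apply worker but has
	 * switched to serializing the remaining changes to a file, which the
	 * same worker will read and apply at the end of the transaction.
	 */
	TRANS_LEADER_PARTIAL_SERIALIZE,

	/* A parallel apply worker applies the changes directly. */
	TRANS_PARALLEL_APPLY
} TransApplyAction;
```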

> 02. handle_streamed_transaction()
>
> ```
> + StringInfoData origin_msg;
> ...
> + origin_msg = *s;
> ...
> + /* Write the change to the current file */
> + stream_write_change(action,
> +                     apply_action == TRANS_LEADER_SERIALIZE ?
> +                     s : &origin_msg);
> ```
>
> I'm not sure why origin_msg is needed. Can we remove the conditional
> operator?

Currently, the parallel apply worker needs the transaction xid of this change to
define a savepoint, so the leader has to write the original message to the file.
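
To illustrate the point, here is a minimal, self-contained sketch in plain C
(not the actual worker.c code; the struct and helper are invented for the
example): the xid field is consumed from the message when the leader parses it,
so for the partial-serialize case we keep a copy of the message taken before
parsing and write that copy to the file, which lets the parallel apply worker
re-read the xid later.

```
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef struct
{
	char	   *data;
	int			len;
	int			cursor;			/* read position; parsing advances this */
} Msg;

static uint32_t
read_xid(Msg *s)
{
	uint32_t	xid;

	memcpy(&xid, s->data + s->cursor, sizeof(xid));
	s->cursor += sizeof(xid);	/* the xid bytes are now consumed */
	return xid;
}

int
main(void)
{
	uint32_t	xid_in = 746;
	char		buf[8];
	uint32_t	xid;

	memcpy(buf, &xid_in, sizeof(xid_in));	/* xid field */
	memcpy(buf + 4, "INS", 4);				/* rest of the change */

	Msg			s = {buf, (int) sizeof(buf), 0};
	Msg			origin_msg = s;	/* copy taken before any field is consumed */

	xid = read_xid(&s);			/* leader parses the xid to route the change */

	/*
	 * TRANS_LEADER_SERIALIZE only needs the remaining payload, so writing
	 * from &s is enough.  TRANS_LEADER_PARTIAL_SERIALIZE must write the
	 * unconsumed copy, because the parallel apply worker reading the file
	 * later still needs the xid (e.g. to define a savepoint).
	 */
	printf("xid=%u, cursor after parse=%d, cursor in original copy=%d\n",
		   xid, s.cursor, origin_msg.cursor);
	return 0;
}
```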

>
> 03. apply_handle_stream_start()
>
> ```
> + * XXX We can avoid sending pairs of the START/STOP messages to the parallel
> + * worker because unlike apply worker it will process only one transaction at a
> + * time. However, it is not clear whether any optimization is worthwhile
> + * because these messages are sent only when the logical_decoding_work_mem
> + * threshold is exceeded.
> ```
>
> This comment should be modified because PA must acquire and release locks at
> that time.
>
>
> 04. apply_handle_stream_prepare()
>
> ```
> + /*
> +  * After sending the data to the parallel apply worker, wait for
> +  * that worker to finish. This is necessary to maintain commit
> +  * order which avoids failures due to transaction dependencies and
> +  * deadlocks.
> +  */
> +
> + parallel_apply_wait_for_xact_finish(winfo->shared);
> ```
>
> This seems not to be correct: LA may not send the data but instead spill the changes to a file.

Changed.
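
As background for other readers, here is a rough standalone sketch of what that
wait achieves (simplified; the real code sleeps on a latch and checks shared
memory state rather than a process-local atomic): the leader does not move on
until the parallel apply worker reports that it has finished the transaction,
which preserves commit order.

```
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <sched.h>

static atomic_int xact_finished;	/* stands in for the shared xact state */

/* Parallel apply worker: applies the streamed transaction, then signals. */
static void *
parallel_apply_worker(void *arg)
{
	(void) arg;
	/* ... apply spooled/streamed changes and commit ... */
	atomic_store(&xact_finished, 1);
	return NULL;
}

int
main(void)
{
	pthread_t	worker;

	pthread_create(&worker, NULL, parallel_apply_worker, NULL);

	/*
	 * Leader: wait for the worker to finish before proceeding, so commits
	 * happen in the same order as on the publisher (the real code waits on
	 * a latch instead of spinning).
	 */
	while (!atomic_load(&xact_finished))
		sched_yield();

	printf("worker finished; leader can proceed to the next transaction\n");
	pthread_join(worker, NULL);
	return 0;
}
```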

> 05. apply_handle_stream_commit()
>
> ```
> + if (apply_action == TRANS_LEADER_PARTIAL_SERIALIZE)
> +     stream_cleanup_files(MyLogicalRepWorker->subid, xid);
> ```
>
> I'm not sure whether the stream files should be removed by LA or PAs. Could
> you tell me the reason why you chose LA?

I think it is more natural for only the LA to create/write/delete the file, with the
PA only needing to read from it.
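
In other words, the intended division of responsibility looks roughly like the
sketch below (plain stdio, not the actual fileset code; the file name and helper
names are made up for the example):

```
#include <stdio.h>

/* Leader apply worker: owns the file's whole lifecycle. */
static void
leader_spill_change(const char *path, const char *change)
{
	FILE	   *f = fopen(path, "a");	/* create/append */

	if (f)
	{
		fputs(change, f);
		fclose(f);
	}
}

static void
leader_cleanup(const char *path)
{
	remove(path);				/* only the leader deletes the file */
}

/* Parallel apply worker: read-only access. */
static void
parallel_apply_spilled(const char *path)
{
	FILE	   *f = fopen(path, "r");	/* never created or removed here */
	char		line[256];

	if (!f)
		return;
	while (fgets(line, sizeof(line), f))
		printf("applying: %s", line);
	fclose(f);
}

int
main(void)
{
	leader_spill_change("xid_746.changes", "INSERT ...\n");
	parallel_apply_spilled("xid_746.changes");
	leader_cleanup("xid_746.changes");
	return 0;
}
```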

> ===
> applyparallelworker.c
>
> 05. parallel_apply_can_start()
>
> ```
> + if (switching_to_serialize)
> + return false;
> ```
>
> Could you add a comment like:
> Don't start a new parallel apply worker if the leader apply worker has been
> spilling changes to the disk temporarily.

This code has been removed.

> 06. parallel_apply_start_worker()
>
> ```
> + /*
> +  * Set the xact_state flag in the leader instead of the parallel apply
> +  * worker to avoid the race condition where the leader has already
> +  * started waiting for the parallel apply worker to finish processing
> +  * the transaction while the child process has not yet processed the
> +  * first STREAM_START and has not set the xact_state to true.
> +  */
> ```
>
> I think the word "flag" should only be used for a boolean, so the comment should be
> modified.
> (There are so many such code-comments, all of them should be modified.)

Changed.

>
> 07. parallel_apply_get_unique_id()
>
> ```
> +/*
> + * Returns the unique id among all parallel apply workers in the subscriber.
> + */
> +static uint16
> +parallel_apply_get_unique_id()
> ```
>
> I think this function is inefficient: the computational complexity will be increased
> linearly when the number of PAs is increased. I think the Bitmapset data
> structure may be used.

This function has been removed.

> 08. parallel_apply_send_data()
>
> ```
> #define CHANGES_THRESHOLD 1000
> #define SHM_SEND_TIMEOUT_MS 10000
> ```
>
> I think the timeout may be too long. Could you tell me the background about it?

Serializing data to a file would hurt performance, so I tried to make that fallback hard
to hit unless the PA is really blocked by another PA or BA.
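
Conceptually the fallback works like the sketch below (simplified and
self-contained; the real code uses shm_mq_send and latch waits, and the exact
way the timeout and retry counter combine in the patch may differ):

```
#include <stdbool.h>
#include <stdio.h>

#define SHM_SEND_TIMEOUT_MS	10000	/* values quoted above; how they combine */
#define CHANGES_THRESHOLD	1000	/* here is a simplification */

/* Stub: pretend the shared queue stays full (worker blocked). */
static bool
try_send(const void *data, int nbytes, int timeout_ms)
{
	(void) data;
	(void) nbytes;
	(void) timeout_ms;
	return false;
}

static void
send_or_serialize(const void *data, int nbytes)
{
	for (int retry = 0; retry < CHANGES_THRESHOLD; retry++)
	{
		if (try_send(data, nbytes, SHM_SEND_TIMEOUT_MS))
			return;				/* queued successfully */
	}

	/* Queue stayed full for too long: spill the remaining changes to a file. */
	printf("switching to partial serialization (%d bytes pending)\n", nbytes);
}

int
main(void)
{
	send_or_serialize("INSERT ...", 10);
	return 0;
}
```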

> 09. parallel_apply_send_data()
>
> ```
> /*
> * Close the stream file if not in a streaming block, the
> file will
> * be reopened later.
> */
> if (!stream_apply_worker)
> serialize_stream_stop(winfo->shared->xid);
> ```
>
> a.
> IIUC the timings when LA tries to send data but stream_apply_worker is NULL
> are:
> * apply_handle_stream_prepare,
> * apply_handle_stream_start,
> * apply_handle_stream_abort, and
> * apply_handle_stream_commit.
> And at that time the state of TransApplyAction may be
> TRANS_LEADER_SEND_TO_PARALLEL. When should be close the file?

Changed to check a different condition.

> b.
> Even if this is needed, I think the name of the called function should be modified.
> Here LA may not handle STREAM_STOP message. close_stream_file() or
> something?
>
>
> 10. parallel_apply_send_data()
>
> ```
> /* Initialize the stream fileset. */
> serialize_stream_start(winfo->shared->xid, true);
> ```
>
> I think the name of the called function should be modified. Here LA may not
> handle STREAM_START message. open_stream_file() or something?
>
> 11. parallel_apply_send_data()
>
> ```
> if (++retry >= CHANGES_THRESHOLD)
> {
> MemoryContext oldcontext;
> StringInfoData msg;
> ...
> initStringInfo(&msg);
> appendBinaryStringInfo(&msg, data, nbytes);
> ...
> switching_to_serialize = true;
> apply_dispatch(&msg);
> switching_to_serialize = false;
>
> break;
> }
> ```
>
> pfree(msg.data) may be needed.
>
> ===
> 12. worker_internal.h
>
> ```
> + pg_atomic_uint32 left_message;
> ```
>
>
> ParallelApplyWorkerShared has been already controlled by mutex locks. Why
> did you add an atomic variable to the data structure?

I personally feel this value is modified more frequently, so I used an atomic
variable here.
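
The idea is simply to keep the per-message hot path lock-free while the rarer
state changes still take the mutex. A rough standalone sketch using C11 atomics
and a pthread mutex (the server code uses pg_atomic_uint32 and the existing
mutex; the field names below are illustrative, not the patch's):

```
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

typedef struct
{
	pthread_mutex_t mutex;		/* protects the rarely changed fields */
	int			xact_state;		/* changed once per transaction */
	atomic_uint pending_message_count;	/* changed for every message */
} SharedState;

static void
on_message_sent(SharedState *shared)
{
	/* Hot path: no lock needed for the counter. */
	atomic_fetch_add(&shared->pending_message_count, 1);
}

static void
on_transaction_started(SharedState *shared)
{
	/* Cold path: fine to take the mutex. */
	pthread_mutex_lock(&shared->mutex);
	shared->xact_state = 1;
	pthread_mutex_unlock(&shared->mutex);
}

int
main(void)
{
	SharedState shared = {PTHREAD_MUTEX_INITIALIZER, 0, 0};

	on_transaction_started(&shared);
	for (int i = 0; i < 5; i++)
		on_message_sent(&shared);

	printf("pending messages: %u\n",
		   atomic_load(&shared.pending_message_count));
	return 0;
}
```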

> ===
> 13. typedefs.list
>
> ParallelTransState should be added.

Added.

> ===
> 14. General
>
> I have already mentioned this to you directly, but I point it out again to notify
> other members.
> I have caused a deadlock with two PAs. Indeed it was resolved by the lmgr, but the
> output did not seem very helpful. The following was copied from the log, and we can
> see that the commands executed by the apply workers were not output. Can we
> extend this, or is it out of scope?
>
>
> ```
> 2022-11-07 11:11:27.449 UTC [11262] ERROR: deadlock detected
> 2022-11-07 11:11:27.449 UTC [11262] DETAIL: Process 11262 waits for
> AccessExclusiveLock on object 16393 of class 6100 of database 0; blocked by
> process 11320.
> Process 11320 waits for ShareLock on transaction 742; blocked by
> process 11266.
> Process 11266 waits for AccessShareLock on object 16393 of class 6100 of
> database 0; blocked by process 11262.
> Process 11262: <command string not enabled>
> Process 11320: <command string not enabled>
> Process 11266: <command string not enabled>
> ```

On HEAD, an apply worker can also cause a deadlock with a user backend. For example:
Tx1 (backend):
  begin;
  insert into tbl1 values (100);

Tx2 (replaying streaming transaction):
  begin;
  insert into tbl1 values (1);
  delete from tbl2;
  insert into tbl1 values (1);
  insert into tbl1 values (100);

logical replication worker ERROR: deadlock detected
logical replication worker DETAIL: Process 2158391 waits for ShareLock on transaction 749; blocked by process 2158410.
Process 2158410 waits for ShareLock on transaction 750; blocked by process 2158391.
Process 2158391: <command string not enabled>
Process 2158410: insert into tbl1 values (1);

So, this matches the existing behavior. I agree that it would be better to
show the command string, but maybe we can do that as a separate patch.

Best regards,
Hou zj
