RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-11-27 04:15:26
Message-ID: OS0PR01MB57161E105EBD94EAACF2A40A94109@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Friday, November 25, 2022 10:54 AM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
>
> Here are some review comments for v51-0001.

Thanks for the comments!
> ======
>
> .../replication/logical/applyparallelworker.c
>
> 1. General - Error messages, get_worker_name()
>
> I previously wrote a comment to ask if the get_worker_name() should be used
> in more places but the reply [1, #2b] was:
>
> > 2b.
> > Consider if maybe all of these ought to be calling get_worker_name()
> > which is currently static in worker.c. Doing this means any future
> > changes to get_worker_name won't cause more inconsistencies.
>
> Most error messages in applyparallelxx.c can only use "xx parallel worker",
> so I think it's fine not to call get_worker_name()
>
> ~
>
> I thought the reply missed the point I was trying to make -- I meant that if it
> were arranged now so that *every* message went via get_worker_name(), then if
> somebody in future wanted to change the names (e.g. from "logical replication
> parallel apply worker" to "LR PA worker"), it would only need to be changed in
> one central place instead of hunting down every hardwired error message.
>

Thanks for the suggestion. I understand your point, but I feel that using
get_worker_name() in places where the worker type is already known could make
developers think that all kinds of workers can enter this code, which I am not
sure is better. So I didn't change this.
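For illustration only, here is a minimal standalone sketch of the centralized-name approach discussed above. The names WorkerKind, worker_name() and format_stop_message() are hypothetical; the real get_worker_name() in worker.c takes no argument and instead checks am_tablesync_worker() / am_parallel_apply_worker(), and real messages go through ereport() with translation.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical worker kinds; the real code asks am_tablesync_worker() etc. */
typedef enum WorkerKind
{
	WORKER_APPLY,
	WORKER_TABLESYNC,
	WORKER_PARALLEL_APPLY
} WorkerKind;

/* The single place where user-visible worker names are spelled out. */
static const char *
worker_name(WorkerKind kind)
{
	switch (kind)
	{
		case WORKER_TABLESYNC:
			return "logical replication table synchronization worker";
		case WORKER_PARALLEL_APPLY:
			return "logical replication parallel apply worker";
		case WORKER_APPLY:
		default:
			return "logical replication apply worker";
	}
}

/* Messages build on worker_name(), so a rename touches only one function. */
static int
format_stop_message(char *buf, size_t len, WorkerKind kind, const char *subname)
{
	assert(buf != NULL && len > 0);
	return snprintf(buf, len, "%s for subscription \"%s\" will stop",
					worker_name(kind), subname);
}
```

The trade-off raised in the reply still applies: when a message is only reachable from one worker type, routing it through a kind-agnostic helper can suggest that other worker types reach that code too.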

>
> 2. HandleParallelApplyMessage
>
> + case 'X': /* Terminate, indicating clean exit. */
> + shm_mq_detach(winfo->error_mq_handle);
> + winfo->error_mq_handle = NULL;
> + break;
> + default:
> + elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
> + msgtype, msg->len);
>
> The case 'X' code indentation is too much.

Changed.
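The leader's handling of the 'X' (clean exit) message quoted above can be modelled as a small dispatch function. This is a simplified sketch under stated assumptions: WorkerInfo, HandleResult and handle_parallel_apply_message() are hypothetical stand-ins, a boolean flag replaces the real shm_mq handle, and fprintf() replaces elog(ERROR, ...).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-in for ParallelApplyWorkerInfo: only the queue state. */
typedef struct WorkerInfo
{
	bool		error_queue_attached;
} WorkerInfo;

typedef enum HandleResult
{
	MSG_CLEAN_EXIT,
	MSG_UNRECOGNIZED
} HandleResult;

static HandleResult
handle_parallel_apply_message(WorkerInfo *winfo, char msgtype, int msglen)
{
	assert(winfo != NULL);

	switch (msgtype)
	{
		case 'X':				/* Terminate, indicating clean exit. */
			/* The real code calls shm_mq_detach() and clears the handle. */
			winfo->error_queue_attached = false;
			return MSG_CLEAN_EXIT;
		default:
			/* The real code raises elog(ERROR, ...) here. */
			fprintf(stderr,
					"unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)\n",
					msgtype, msglen);
			return MSG_UNRECOGNIZED;
	}
}
```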

> ======
>
> src/backend/replication/logical/origin.c
>
> 3. replorigin_session_setup(RepOriginId node, int acquired_by)
>
> @@ -1075,12 +1075,20 @@ ReplicationOriginExitCleanup(int code, Datum arg)
> * array doesn't have to be searched when calling
> * replorigin_session_advance().
> *
> - * Obviously only one such cached origin can exist per process and the current
> + * Normally only one such cached origin can exist per process and the current
> * cached value can only be set again after the previous value is torn down
> * with replorigin_session_reset().
> + *
> + * However, we do allow multiple processes to point to the same origin slot if
> + * requested by the caller by passing PID of the process that has already
> + * acquired it as acquired_by. This is to allow multiple parallel apply
> + * processes to use the same origin, provided they maintain commit order, for
> + * example, by allowing only one process to commit at a time. For the first
> + * process requesting this origin, the acquired_by parameter needs to be set to
> + * 0.
> */
> void
> -replorigin_session_setup(RepOriginId node)
> +replorigin_session_setup(RepOriginId node, int acquired_by)
>
> I think the meaning of the acquired_by=0 is not fully described here:
> "For the first process requesting this origin, the acquired_by parameter needs
> to be set to 0."
> IMO that seems to be describing it only from the POV that you are always going to
> want to allow multiple processes. But really this is an optional feature so you
> might pass acquired_by=0, not just because this is the first of multiple, but also
> because you *never* want to allow multiple at all. The comment does not
> convey this meaning.
>
> Maybe something worded like below is better?
>
> SUGGESTION
> Normally only one such cached origin can exist per process so the cached value
> can only be set again after the previous value is torn down with
> replorigin_session_reset(). For this normal case pass
> acquired_by=0 (meaning the slot is not allowed to be already acquired by
> another process).
>
> However, sometimes multiple processes can safely re-use the same origin slot
> (for example, multiple parallel apply processes can safely use the same origin,
> provided they maintain commit order by allowing only one process to commit
> at a time). For this case the first process must pass acquired_by=0, and then the
> other processes sharing that same origin can pass acquired_by=PID of the first
> process.

Changed as suggested.

> ======
>
> src/backend/replication/logical/worker.c
>
> 4. GENERAL - get_worker_name()
>
> If you decide it is OK to hardwire some error messages instead of
> unconditionally calling get_worker_name() -- see my #1 review comment in
> this post -- then there are some other messages in this file that could also
> be hardwired because the type of worker is already known.
>
> Here are some examples:
>
> 4a.
>
> + else if (am_parallel_apply_worker())
> + {
> + if (rel->state != SUBREL_STATE_READY)
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + /* translator: first %s is the name of logical replication worker */
> + errmsg("%s for subscription \"%s\" will stop", get_worker_name(),
> + MySubscription->name),
> + errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
> +
> + return true;
> + }
>
> In the above code from should_apply_changes_for_rel we already know this is a
> parallel apply worker.
>
> ~
>
> 4b.
>
> + if (am_parallel_apply_worker())
> + ereport(LOG,
> + /* translator: first %s is the name of logical replication worker */
> + (errmsg("%s for subscription \"%s\" will stop because of a parameter change",
> + get_worker_name(), MySubscription->name)));
> + else
>
> In the above code from maybe_reread_subscription we already know this is a
> parallel apply worker.
>
> 4c.
>
> if (am_tablesync_worker())
> ereport(LOG,
> - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
> - MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
> + /* translator: first %s is the name of logical replication worker */
> + (errmsg("%s for subscription \"%s\", table \"%s\" has started",
> + get_worker_name(), MySubscription->name,
> + get_rel_name(MyLogicalRepWorker->relid))));
>
> In the above code from ApplyWorkerMain we already know this is a tablesync
> worker.

Thanks for checking these, changed.

> ~~~
>
> 5. get_transaction_apply_action
>
> +
> +/*
> + * Return the action to take for the given transaction. *winfo is assigned to
> + * the destination parallel worker info (if the action is
> + * TRANS_LEADER_SEND_TO_PARALLEL, otherwise *winfo is assigned NULL.
> + */
> +static TransApplyAction
> +get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
>
> There is no closing ')' in the function comment.

Added.
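The contract in that function comment (return an action, and set *winfo only when the change is sent to a parallel worker) can be sketched as follows. This is a hypothetical, simplified model: get_action(), find_worker() and the fixed workers[] table are illustrative, the enum contains only the three actions needed here (other cases in the patch are omitted), and the real code uses its own lookup structures rather than a static array.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int TransactionId;

/* Hypothetical subset of actions; the patch's enum has more cases. */
typedef enum TransApplyAction
{
	TRANS_LEADER_APPLY,
	TRANS_LEADER_SEND_TO_PARALLEL,
	TRANS_PARALLEL_APPLY
} TransApplyAction;

typedef struct ParallelApplyWorkerInfo
{
	TransactionId xid;			/* transaction this worker is applying */
} ParallelApplyWorkerInfo;

/* Hypothetical lookup: a tiny fixed table instead of the real structures. */
static ParallelApplyWorkerInfo workers[2] = {{742}, {743}};

static ParallelApplyWorkerInfo *
find_worker(TransactionId xid)
{
	for (size_t i = 0; i < sizeof(workers) / sizeof(workers[0]); i++)
		if (workers[i].xid == xid)
			return &workers[i];
	return NULL;
}

/*
 * Return the action to take for the given transaction. *winfo is assigned to
 * the destination parallel worker info (if the action is
 * TRANS_LEADER_SEND_TO_PARALLEL), otherwise *winfo is assigned NULL.
 */
static TransApplyAction
get_action(TransactionId xid, bool am_parallel_worker,
		   ParallelApplyWorkerInfo **winfo)
{
	assert(winfo != NULL);
	*winfo = NULL;

	if (am_parallel_worker)
		return TRANS_PARALLEL_APPLY;

	*winfo = find_worker(xid);
	return (*winfo != NULL) ? TRANS_LEADER_SEND_TO_PARALLEL : TRANS_LEADER_APPLY;
}
```

Resetting *winfo to NULL up front guarantees every return path honours the "otherwise NULL" part of the comment.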

> ~~~
>
> 6. apply_worker_clean_exit
>
> + /* Notify the leader apply worker that we have exited cleanly. */
> + if (am_parallel_apply_worker())
> + pq_putmessage('X', NULL, 0);
>
> IMO the comment would be better inside the if block
>
> SUGGESTION
> if (am_parallel_apply_worker())
> {
> /* Notify the leader apply worker that we have exited cleanly. */
> pq_putmessage('X', NULL, 0);
> }

Changed.

Best regards,
Hou zj
