Re: Parallel Apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>
Cc: PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>, Tomas Vondra <tomas(at)vondra(dot)me>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Andrei Lepikhov <lepihov(at)gmail(dot)com>, "Zhijie Hou (Fujitsu)" <houzj(dot)fnst(at)fujitsu(dot)com>, wenhui qiu <qiuwenhuifx(at)gmail(dot)com>
Subject: Re: Parallel Apply
Date: 2026-04-14 08:51:08
Message-ID: CAA4eK1+jXOyZYeDmc1LVsgLpsWyKusis_y0ix_Ubv3vA3iFXuQ@mail.gmail.com
Lists: pgsql-hackers

On Mon, Apr 13, 2026 at 4:41 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> Few comments:
> =============
> 1.
> index 058a955e20c..9042470f500 100644
> --- a/src/include/replication/logicalproto.h
> +++ b/src/include/replication/logicalproto.h
> @@ -75,6 +75,8 @@ typedef enum LogicalRepMsgType
> LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
> LOGICAL_REP_MSG_STREAM_ABORT = 'A',
> LOGICAL_REP_MSG_STREAM_PREPARE = 'p',
> + LOGICAL_REP_MSG_INTERNAL_DEPENDENCY = 'd',
> + LOGICAL_REP_MSG_INTERNAL_RELATION = 'i',
> } LogicalRepMsgType;
>
> I don't think it is good to classify these as logical_rep messages, as
> they would probably be used to communicate between the leader and
> parallel workers.
>

On checking more, I noticed that the above is required because we are
using apply_dispatch() even to handle internal messages in the following
code:

+ if (c == PqReplMsg_WALData)
+ {
+ /*
+ * Ignore statistics fields that have been updated by the
+ * leader apply worker.
+ *
+ * XXX We can avoid sending the statistics fields from the
+ * leader apply worker but for that, it needs to rebuild the
+ * entire message by removing these fields which could be more
+ * work than simply ignoring these fields in the parallel
+ * apply worker.
+ */
+ s.cursor += SIZE_STATS_MESSAGE;

- apply_dispatch(&s);
+ apply_dispatch(&s);
+ }
+ else if (c == PARALLEL_APPLY_INTERNAL_MESSAGE)
+ {
+ apply_dispatch(&s);
+ }

Isn't it better to invent a new apply_internal_message(), or something
like it, to handle internal messages? If we do that, then my previous
point about not classifying LOGICAL_REP_MSG_INTERNAL_DEPENDENCY and the
other internal messages as LogicalRepMsgType would make more sense.

Few other comments:
==================
1. In 0004, the file header and the commit message explain how
dependency tracking is achieved, but not why we need it when we are
maintaining commit order anyway.

2.
- if (winfo->serialize_changes ||
- list_length(ParallelApplyWorkerPool) >
- (max_parallel_apply_workers_per_subscription / 2))
+ if (winfo->serialize_changes)
{
logicalrep_pa_worker_stop(winfo);

Did you make this change because we will now assign transactions to
parallel workers anyway, even when they are dependent transactions, so
this optimization is no longer useful? If so, let's add a comment
saying so.

3.
@@ -921,6 +1036,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)

if (rc & WL_LATCH_SET)
ResetLatch(MyLatch);
+
+ if (!IsTransactionState())
+ pgstat_report_stat(true);

Why is this change required?

4.
/*
* Wait for the transaction lock to be released. This is required to
- * detect deadlock among leader and parallel apply workers. Refer to the
- * comments atop this file.
+ * detect detect deadlock among leader and parallel apply workers. Refer
+ * to the comments atop this file.
*/

This change doesn't make sense, as its only effect is that the word
'detect' now appears twice in the second sentence.

5.
void
pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
{
+ if (!TransactionIdIsValid(top_xid))
+ return;

Why is this change required for this patch? It would be better to add
a comment explaining it so that it is easier to understand.

6.
/*
+ * Build a dependency between this transaction and the lastly
+ * committed transaction to preserve the commit order. Then try to
+ * send a COMMIT message if succeeded.
+ */
+ if (build_dependency_with_last_committed_txn(winfo)

I think this is the key part of the patch that helps with parallel
apply. We should mention that somewhere in the comments.

7.
+
+ if (am_leader_apply_worker())
+ pa_distribute_schema_changes_to_workers(rel);

It would be better to move the above if check inside
pa_distribute_schema_changes_to_workers(), as that function already
has some parallel-worker-related checks.

--
With Regards,
Amit Kapila.
