Re: Parallel Apply

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "Zhijie Hou (Fujitsu)" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: shveta malik <shveta(dot)malik(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>, Tomas Vondra <tomas(at)vondra(dot)me>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Andrei Lepikhov <lepihov(at)gmail(dot)com>, wenhui qiu <qiuwenhuifx(at)gmail(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Subject: Re: Parallel Apply
Date: 2026-06-09 04:14:27
Message-ID: CAHut+Pt=C9Z5PSkELKaTOh3AfjoUVf__FU2sQCFs8L60Qvfxhw@mail.gmail.com
Lists: pgsql-hackers

Hi Hou-San.

Some review comments for v19-0001 and v19-0002

//////////
v19-0001

======
.../replication/logical/applyparallelworker.c

1.
+/* An entry in the parallelized_txns shared hash table */
+typedef struct ParallelizedTxnEntry
+{
+ TransactionId xid; /* Hash key, remote transaction ID */
+} ParallelizedTxnEntry;
+
/*
* A hash table used to cache the state of streaming transactions being applied
- * by the parallel apply workers.
+ * by the parallel apply workers. Entries are of type ParallelApplyWorkerEntry.
*/
static HTAB *ParallelApplyTxnHash = NULL;

+/*
+ * A hash table used to track the parallelized remote transactions that could be
+ * depended on by other transactions. Entries are of type ParallelizedTxnEntry.
+ *
+ * dshash is used to enable dynamic shared memory allocation based on the number
+ * of transactions being applied in parallel.
+ */
+static dsa_area *parallel_apply_dsa_area = NULL;
+static dshash_table *parallelized_txns = NULL;
+
+/* parameters for the parallelized_txns shared hash table */
+static const dshash_parameters dsh_params = {
+ sizeof(TransactionId),
+ sizeof(ParallelizedTxnEntry),
+ dshash_memcmp,
+ dshash_memhash,
+ dshash_memcpy,
+ LWTRANCHE_PARALLEL_APPLY_DSA
+};
+

1a.
Maybe ParallelizedTxnEntry should be moved to immediately above the
'dshash_parameters' declaration, because it belongs with that. Where it
is now, it separates ParallelApplyWorkerEntry from ParallelApplyTxnHash.

~

1b.
/parameters for/Parameters for/

~~~

pa_attach_parallelized_txn_hash:

2.
+ MemoryContext oldctx;
+
+ if (parallelized_txns)
+ {
+ Assert(parallel_apply_dsa_area);
+ *pa_dsa_handle = dsa_get_handle(parallel_apply_dsa_area);
+ *pa_dshash_handle = dshash_get_hash_table_handle(parallelized_txns);
+ return;
+ }
+
+ /* Be sure any local memory allocated by DSA routines is persistent. */
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ if (am_leader_apply_worker())
+ {
+ /* Initialize dynamic shared hash table for parallelized transactions */
+ parallel_apply_dsa_area = dsa_create(LWTRANCHE_PARALLEL_APPLY_DSA);
+ dsa_pin(parallel_apply_dsa_area);
+ dsa_pin_mapping(parallel_apply_dsa_area);
+ parallelized_txns = dshash_create(parallel_apply_dsa_area, &dsh_params, NULL);
+
+ /* Store handles in shared memory for other backends to use. */
+ *pa_dsa_handle = dsa_get_handle(parallel_apply_dsa_area);
+ *pa_dshash_handle = dshash_get_hash_table_handle(parallelized_txns);
+ }
+ else if (am_parallel_apply_worker())
+ {
+ /* Attach to existing dynamic shared hash table. */
+ parallel_apply_dsa_area = dsa_attach(MyParallelShared->parallel_apply_dsa_handle);
+ dsa_pin_mapping(parallel_apply_dsa_area);
+ parallelized_txns = dshash_attach(parallel_apply_dsa_area, &dsh_params,
+ MyParallelShared->parallelized_txns_handle,
+ NULL);
+ }
+
+ MemoryContextSwitchTo(oldctx);

2a.
This might be easier to read if rearranged to use if/else instead of
having the early return.

SUGGESTION
if (parallelized_txns)
{
/* Hashtable is already available */
...
}
else
{
/* Create or attach... */

MemoryContext oldctx = ...

if (am_leader_apply_worker())
{
/* Create... */
...
}
else if (am_parallel_apply_worker())
{
/* Attach... */
...
}

MemoryContextSwitchTo(oldctx);
}

~~~

2b.
Can it be anything other than
am_leader_apply_worker/am_parallel_apply_worker here? Should there be
an Assert?
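
For illustration only, here is a standalone sketch of what 2a and 2b could look like together. It is not the patch code: `WorkerRole`, `AttachResult`, `table_available` and `attach_table` are hypothetical stand-ins for the am_leader_apply_worker()/am_parallel_apply_worker() checks and the dsa/dshash calls, and plain assert() stands in for Assert().

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the two worker kinds expected here. */
typedef enum WorkerRole
{
	ROLE_LEADER_APPLY,
	ROLE_PARALLEL_APPLY
} WorkerRole;

/* Hypothetical result codes, used only to make the sketch observable. */
typedef enum AttachResult
{
	ATTACH_ALREADY_AVAILABLE,
	ATTACH_CREATED,
	ATTACH_ATTACHED
} AttachResult;

/* Models whether the static 'parallelized_txns' pointer is already set. */
static bool table_available = false;

static AttachResult
attach_table(WorkerRole role)
{
	AttachResult result;

	if (table_available)
	{
		/* Hashtable is already available */
		result = ATTACH_ALREADY_AVAILABLE;
	}
	else
	{
		if (role == ROLE_LEADER_APPLY)
		{
			/* Create... */
			result = ATTACH_CREATED;
		}
		else
		{
			/* Only a parallel apply worker is expected to reach this. */
			assert(role == ROLE_PARALLEL_APPLY);

			/* Attach... */
			result = ATTACH_ATTACHED;
		}

		table_available = true;
	}

	return result;
}
```

With the assertion in the final else branch, an unexpected caller fails loudly in assert-enabled builds instead of silently leaving the hash table unset.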

~~~

2c.
Since the `dsh_params` are already set up, shouldn't this code be using them?

BEFORE
dsa_create(LWTRANCHE_PARALLEL_APPLY_DSA);

SUGGESTION
dsa_create(dsh_params.tranche_id);
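
The idea behind 2c, sketched as a standalone model: keep the tranche ID in one place and derive the other use from it. Everything here is hypothetical (`TableParams`, `Area`, `area_create`, `create_area_for_table`, and the value 42); it only mimics the shape of dshash_parameters and dsa_create(), it does not call them.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, reduced stand-in for dshash_parameters. */
typedef struct TableParams
{
	size_t		key_size;
	size_t		entry_size;
	int			tranche_id;
} TableParams;

/* Hypothetical stand-in for a DSA area; it records its tranche. */
typedef struct Area
{
	int			tranche_id;
} Area;

/* Arbitrary value chosen for this sketch only. */
#define SKETCH_TRANCHE_ID 42

static const TableParams sketch_params = {
	sizeof(unsigned int),
	sizeof(unsigned int),
	SKETCH_TRANCHE_ID
};

/* Models dsa_create(): the area remembers the tranche it was given. */
static Area
area_create(int tranche_id)
{
	Area		area = {tranche_id};

	return area;
}

/* Take the tranche from the parameters rather than repeating the constant. */
static Area
create_area_for_table(const TableParams *params)
{
	return area_create(params->tranche_id);
}
```

If the tranche in the parameters is ever changed, the area creation follows automatically.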

//////////
v19-0002

======
.../replication/logical/applyparallelworker.

1.
+void
+pa_wait_for_depended_transaction(TransactionId xid)
+{
+ ParallelizedTxnEntry *txn_entry;

The declaration of `txn_entry` can be done later within the loop where
it is used.
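
A minimal standalone sketch of the narrower scope I mean. The real loop body is not quoted above, so this does not reproduce it: `SketchXid`, `SketchEntry`, `sketch_find` and `sketch_wait_for_xid` are hypothetical, and the "table" is a single slot that disappears after a fixed number of lookups.

```c
#include <assert.h>
#include <stddef.h>

typedef unsigned int SketchXid;

typedef struct SketchEntry
{
	SketchXid	xid;			/* key */
} SketchEntry;

/* Hypothetical one-slot table; the entry vanishes after some lookups. */
static SketchEntry slot = {0};
static int	polls_until_removed = 0;

static SketchEntry *
sketch_find(SketchXid xid)
{
	if (polls_until_removed > 0 && slot.xid == xid)
	{
		polls_until_removed--;
		return &slot;
	}

	return NULL;
}

/* Returns how many times the entry was found before it went away. */
static int
sketch_wait_for_xid(SketchXid xid)
{
	int			waits = 0;

	for (;;)
	{
		/* Declared here because it is only used inside the loop. */
		SketchEntry *txn_entry = sketch_find(xid);

		if (txn_entry == NULL)
			break;

		waits++;
	}

	return waits;
}
```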

======
Kind Regards,
Peter Smith.
Fujitsu Australia
