| From: | Peter Smith <smithpb2250(at)gmail(dot)com> |
|---|---|
| To: | "Zhijie Hou (Fujitsu)" <houzj(dot)fnst(at)fujitsu(dot)com> |
| Cc: | shveta malik <shveta(dot)malik(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>, Tomas Vondra <tomas(at)vondra(dot)me>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Andrei Lepikhov <lepihov(at)gmail(dot)com>, wenhui qiu <qiuwenhuifx(at)gmail(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> |
| Subject: | Re: Parallel Apply |
| Date: | 2026-06-09 04:14:27 |
| Message-ID: | CAHut+Pt=C9Z5PSkELKaTOh3AfjoUVf__FU2sQCFs8L60Qvfxhw@mail.gmail.com |
| Lists: | pgsql-hackers |
Hi Hou-San.
Some review comments for v19-0001 and v19-0002
//////////
v19-0001
======
.../replication/logical/applyparallelworker.c
1.
+/* An entry in the parallelized_txns shared hash table */
+typedef struct ParallelizedTxnEntry
+{
+ TransactionId xid; /* Hash key, remote transaction ID */
+} ParallelizedTxnEntry;
+
/*
* A hash table used to cache the state of streaming transactions being applied
- * by the parallel apply workers.
+ * by the parallel apply workers. Entries are of type ParallelApplyWorkerEntry.
*/
static HTAB *ParallelApplyTxnHash = NULL;
+/*
+ * A hash table used to track the parallelized remote transactions that could be
+ * depended on by other transactions. Entries are of type ParallelizedTxnEntry.
+ *
+ * dshash is used to enable dynamic shared memory allocation based on the number
+ * of transactions being applied in parallel.
+ */
+static dsa_area *parallel_apply_dsa_area = NULL;
+static dshash_table *parallelized_txns = NULL;
+
+/* parameters for the parallelized_txns shared hash table */
+static const dshash_parameters dsh_params = {
+ sizeof(TransactionId),
+ sizeof(ParallelizedTxnEntry),
+ dshash_memcmp,
+ dshash_memhash,
+ dshash_memcpy,
+ LWTRANCHE_PARALLEL_APPLY_DSA
+};
+
1a.
Maybe the ParallelizedTxnEntry typedef should be moved to immediately
above 'dshash_parameters', because it belongs with that. Currently it
separates the ParallelApplyWorkerEntry from the ParallelApplyTxnHash.
~
1b.
/parameters for/Parameters for/
~~~
pa_attach_parallelized_txn_hash:
2.
+ MemoryContext oldctx;
+
+ if (parallelized_txns)
+ {
+ Assert(parallel_apply_dsa_area);
+ *pa_dsa_handle = dsa_get_handle(parallel_apply_dsa_area);
+ *pa_dshash_handle = dshash_get_hash_table_handle(parallelized_txns);
+ return;
+ }
+
+ /* Be sure any local memory allocated by DSA routines is persistent. */
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ if (am_leader_apply_worker())
+ {
+ /* Initialize dynamic shared hash table for parallelized transactions */
+ parallel_apply_dsa_area = dsa_create(LWTRANCHE_PARALLEL_APPLY_DSA);
+ dsa_pin(parallel_apply_dsa_area);
+ dsa_pin_mapping(parallel_apply_dsa_area);
+ parallelized_txns = dshash_create(parallel_apply_dsa_area, &dsh_params, NULL);
+
+ /* Store handles in shared memory for other backends to use. */
+ *pa_dsa_handle = dsa_get_handle(parallel_apply_dsa_area);
+ *pa_dshash_handle = dshash_get_hash_table_handle(parallelized_txns);
+ }
+ else if (am_parallel_apply_worker())
+ {
+ /* Attach to existing dynamic shared hash table. */
+ parallel_apply_dsa_area = dsa_attach(MyParallelShared->parallel_apply_dsa_handle);
+ dsa_pin_mapping(parallel_apply_dsa_area);
+ parallelized_txns = dshash_attach(parallel_apply_dsa_area, &dsh_params,
+ MyParallelShared->parallelized_txns_handle,
+ NULL);
+ }
+
+ MemoryContextSwitchTo(oldctx);
2a.
This might be easier to read if rearranged to use if/else instead of
having the early return.
SUGGESTION
if (parallelized_txns)
{
/* Hashtable is already available */
...
}
else
{
/* Create or attach... */
MemoryContext oldctx = ...
if (am_leader_apply_worker())
{
/* Create... */
...
}
else if (am_parallel_apply_worker())
{
/* Attach... */
...
}
MemoryContextSwitchTo(oldctx);
}
~~~
2b.
Can it be anything other than
am_leader_apply_worker/am_parallel_apply_worker here? Should there be
an Assert?
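
To illustrate 2a and 2b together, here is a minimal standalone sketch of
the control flow I have in mind. It is not PostgreSQL code: WorkerKind,
DemoTable, demo_table and demo_attach() are hypothetical stand-ins for
the worker-type checks, the shared hash table, and the attach function,
and the real DSA/dshash calls are omitted.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for am_leader_apply_worker() and friends. */
typedef enum
{
	WORKER_LEADER,
	WORKER_PARALLEL,
	WORKER_OTHER
} WorkerKind;

/* Hypothetical stand-in for the shared hash table. */
typedef struct DemoTable
{
	int			created;
	int			attached;
} DemoTable;

static DemoTable demo_table_storage;
static DemoTable *demo_table = NULL;	/* plays the role of parallelized_txns */

/* Returns 0 if the table was already available, 1 if created, 2 if attached. */
static int
demo_attach(WorkerKind kind)
{
	int			result;

	if (demo_table)
	{
		/* Hashtable is already available */
		result = 0;
	}
	else
	{
		/* Only these two worker kinds are expected to get here. */
		assert(kind == WORKER_LEADER || kind == WORKER_PARALLEL);

		demo_table = &demo_table_storage;

		if (kind == WORKER_LEADER)
		{
			/* Create... */
			demo_table->created++;
			result = 1;
		}
		else
		{
			/* Attach... */
			demo_table->attached++;
			result = 2;
		}
	}

	return result;
}
```

With the assertion in place, the final branch can be a plain 'else',
and a caller of an unexpected type fails loudly in assert-enabled
builds instead of silently leaving the table pointer NULL.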
~~~
2c.
Since the `dsh_params` are already set up, shouldn't this code be using them?
BEFORE
dsa_create(LWTRANCHE_PARALLEL_APPLY_DSA);
SUGGESTION
dsa_create(dsh_params.tranche_id);
//////////
v19-0002
======
.../replication/logical/applyparallelworker.
1.
+void
+pa_wait_for_depended_transaction(TransactionId xid)
+{
+ ParallelizedTxnEntry *txn_entry;
The declaration of `txn_entry` can be done later within the loop where
it is used.
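
For example, something like the standalone sketch below. This is only
an illustration of the scoping: DemoEntry, demo_find() and demo_wait()
are hypothetical names, the lookup is a plain array scan rather than a
dshash lookup, and the "finish_after" parameter just simulates the
depended-on transaction completing after some number of polls.

```c
#include <stddef.h>

/* Hypothetical stand-in for ParallelizedTxnEntry. */
typedef struct DemoEntry
{
	unsigned int xid;
} DemoEntry;

/* Hypothetical lookup: returns the entry for xid, or NULL if not present. */
static DemoEntry *
demo_find(DemoEntry *entries, size_t n, unsigned int xid)
{
	for (size_t i = 0; i < n; i++)
	{
		if (entries[i].xid == xid)
			return &entries[i];
	}
	return NULL;
}

/*
 * Poll until xid is no longer present, returning the number of polls that
 * found it. The entry is cleared after 'finish_after' polls to simulate the
 * transaction finishing.
 */
static int
demo_wait(DemoEntry *entries, size_t n, unsigned int xid, int finish_after)
{
	int			polls = 0;

	for (;;)
	{
		/* Declared inside the loop, which is the only place it is used. */
		DemoEntry  *txn_entry = demo_find(entries, n, xid);

		if (txn_entry == NULL)
			break;

		polls++;
		if (polls >= finish_after)
			txn_entry->xid = 0; /* simulate completion */
	}

	return polls;
}
```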
======
Kind Regards,
Peter Smith.
Fujitsu Australia