From: | "Zhijie Hou (Fujitsu)" <houzj(dot)fnst(at)fujitsu(dot)com> |
---|---|
To: | Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> |
Cc: | PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org> |
Subject: | RE: Parallel Apply |
Date: | 2025-08-13 10:46:45 |
Message-ID: | OS0PR01MB5716D43CB68DB8FFE73BF65D942AA@OS0PR01MB5716.jpnprd01.prod.outlook.com |
Lists: | pgsql-hackers |
On Monday, August 11, 2025 12:46 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
> Background and Motivation
> -------------------------------------
> In high-throughput systems, where hundreds of sessions generate data
> on the publisher, the subscriber's apply process often becomes a
> bottleneck due to the single apply worker model. While users can
> mitigate this by creating multiple publication-subscription pairs,
> this approach has scalability and usability limitations.
>
> Currently, PostgreSQL supports parallel apply only for large streaming
> transactions (streaming=parallel). This proposal aims to extend
> parallelism to non-streaming transactions, thereby improving
> replication performance in workloads dominated by smaller, frequent
> transactions.
>
> Design Overview
> ------------------------
> To safely parallelize non-streaming transactions, we must ensure that
> transaction dependencies are respected to avoid failures and
> deadlocks. Consider the following scenarios to understand it better:
> (a) Transaction failures: Say, if we insert a row in the first
> transaction and update it in the second transaction on the publisher,
> then allowing the subscriber to apply both in parallel can lead to
> failure in the update; (b) Deadlocks - allowing transactions that
> update the same set of rows in a table in the opposite order in
> parallel can lead to deadlocks.
>
> The core idea is that the leader apply worker ensures the following:
> a. Identifies dependencies between transactions. b. Coordinates
> parallel workers to apply independent transactions concurrently. c.
> Ensures correct ordering for dependent transactions.
>
> Dependency Detection
> --------------------------------
> 1. Basic Dependency Tracking: Maintain a hash table keyed by
> (RelationId, ReplicaIdentity) with the value as the transaction XID.
> Before dispatching a change to a parallel worker, the leader checks
> for existing entries: (a) If no match: add the entry and proceed; (b)
> If match: instruct the worker to wait until the dependent transaction
> completes.
>
> 2. Unique Keys
> In addition to RI, track unique keys to detect conflicts. Example:
> CREATE TABLE tab1(a INT PRIMARY KEY, b INT UNIQUE);
> Transactions on publisher:
> Txn1: INSERT (1,1)
> Txn2: INSERT (2,2)
> Txn3: DELETE (2,2)
> Txn4: UPDATE (1,1) → (1,2)
>
> If Txn4 is applied before Txn2 and Txn3, it will fail due to a unique
> constraint violation. To prevent this, track both RI and unique keys
> in the hash table. Compare keys of both old and new tuples to detect
> dependencies. The old tuple's RI, and both the unique key and RI of
> the new tuple (the new tuple's RI is required to detect a prior
> insertion with the same key), need to be compared with existing hash
> table entries to identify transaction dependencies.
>
> 3. Foreign Keys
> Consider FK constraints between tables. Example:
>
> TABLE owner(user_id INT PRIMARY KEY);
> TABLE car(car_name TEXT, user_id INT REFERENCES owner);
>
> Transactions:
> Txn1: INSERT INTO owner(1)
> Txn2: INSERT INTO car('bz', 1)
>
> Applying Txn2 before Txn1 will fail. To avoid this, check if FK values
> in new tuples match any RI or unique key in the hash table. If
> matched, treat the transaction as dependent.
>
> 4. Triggers and Constraints
> For the initial version, exclude tables with user-defined triggers or
> constraints from parallel apply due to complexity in dependency
> detection. We may need some parallel-apply-safe marking to allow this.
>
> Replication Progress Tracking
> -----------------------------------------
> Parallel apply introduces out-of-order commit application,
> complicating replication progress tracking. To handle restarts and
> ensure consistency:
>
> Track Three Key Metrics:
> lowest_remote_lsn: Starting point for applying transactions.
> highest_remote_lsn: Highest LSN that has been applied.
> list_remote_lsn: List of commit LSNs applied between the lowest and highest.
>
> Mechanism:
> Store these in ReplicationState: lowest_remote_lsn,
> highest_remote_lsn, list_remote_lsn. Flush these to disk during
> checkpoints similar to CheckPointReplicationOrigin.
>
> After restart, start from lowest_remote_lsn and, for each transaction,
> skip it if its commit LSN is in list_remote_lsn; otherwise, apply it.
> Once commit LSN > highest_remote_lsn, apply without checking the list.
>
> During apply, the leader maintains list_in_progress_xacts in the
> increasing commit order. On commit, update highest_remote_lsn. If
> commit LSN matches the first in-progress xact of
> list_in_progress_xacts, update lowest_remote_lsn, otherwise, add to
> list_remote_lsn. After commit, also remove it from the
> list_in_progress_xacts. We need to clean up entries below
> lowest_remote_lsn in list_remote_lsn while updating its value.
>
> To illustrate how this mechanism works, consider the following four
> transactions:
>
> Transaction ID Commit LSN
> 501 1000
> 502 1100
> 503 1200
> 504 1300
>
> Assume:
> Transactions 501 and 502 take longer to apply whereas transactions 503
> and 504 finish earlier. Parallel apply workers are assigned as
> follows:
> pa-1 → 501
> pa-2 → 502
> pa-3 → 503
> pa-4 → 504
>
> Initial state: list_in_progress_xacts = [501, 502, 503, 504]
>
> Step 1: Transaction 503 commits first and in RecordTransactionCommit,
> it updates highest_remote_lsn to 1200. In apply_handle_commit, since
> 503 is not the first in list_in_progress_xacts, add 1200 to
> list_remote_lsn. Remove 503 from list_in_progress_xacts.
> Step 2: Transaction 504 commits. Update highest_remote_lsn to 1300.
> Add 1300 to list_remote_lsn. Remove 504 from list_in_progress_xacts.
> ReplicationState now:
> lowest_remote_lsn = 0
> list_remote_lsn = [1200, 1300]
> highest_remote_lsn = 1300
> list_in_progress_xacts = [501, 502]
>
> Step 3: Transaction 501 commits. Since 501 is now the first in
> list_in_progress_xacts, update lowest_remote_lsn to 1000. Remove 501
> from list_in_progress_xacts. Clean up list_remote_lsn to remove
> entries < lowest_remote_lsn (none in this case).
> ReplicationState now:
> lowest_remote_lsn = 1000
> list_remote_lsn = [1200, 1300]
> highest_remote_lsn = 1300
> list_in_progress_xacts = [502]
>
> Step 4: System crash and restart
> Upon restart, start replication from lowest_remote_lsn = 1000. The first
> transaction encountered is 502 with commit LSN 1100; since it is not
> present in list_remote_lsn, apply it. As the commit LSNs of transactions
> 503 and 504 (1200 and 1300) are present in list_remote_lsn, we skip
> them. Note that the value compared is each transaction's
> end_lsn/commit_lsn, which the apply worker receives along with the
> transaction's first command, BEGIN. This ensures correctness and avoids
> duplicate application of already committed transactions.
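
The progress-tracking steps quoted above can be sketched roughly in Python as follows. This is only an illustration of the described bookkeeping, not code from the patch: the class and method names are hypothetical, LSNs are plain integers, and treating a commit LSN at or below lowest_remote_lsn as already applied is my assumption.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class ReplicationProgress:
    """Hypothetical model of the three metrics plus the in-progress list."""
    lowest_remote_lsn: int = 0
    highest_remote_lsn: int = 0
    list_remote_lsn: List[int] = field(default_factory=list)
    # (xid, commit_lsn) pairs, kept in increasing commit order by the leader
    in_progress: List[Tuple[int, int]] = field(default_factory=list)

    def begin(self, xid: int, commit_lsn: int) -> None:
        self.in_progress.append((xid, commit_lsn))

    def commit(self, xid: int) -> None:
        idx = next(i for i, (x, _) in enumerate(self.in_progress) if x == xid)
        _, commit_lsn = self.in_progress.pop(idx)
        self.highest_remote_lsn = max(self.highest_remote_lsn, commit_lsn)
        if idx == 0:
            # Oldest in-progress xact committed: advance the restart point
            # and drop list entries that fall below it.
            self.lowest_remote_lsn = commit_lsn
            self.list_remote_lsn = [
                lsn for lsn in self.list_remote_lsn
                if lsn >= self.lowest_remote_lsn
            ]
        else:
            # Committed out of order: remember it so a restart can skip it.
            self.list_remote_lsn.append(commit_lsn)

    def should_apply_after_restart(self, commit_lsn: int) -> bool:
        if commit_lsn <= self.lowest_remote_lsn:
            return False  # assumption: at or below the restart point
        if commit_lsn > self.highest_remote_lsn:
            return True   # beyond anything applied before the crash
        return commit_lsn not in self.list_remote_lsn


# Replay the four-transaction example: 503, 504 and 501 commit, then a crash.
progress = ReplicationProgress()
for xid, lsn in [(501, 1000), (502, 1100), (503, 1200), (504, 1300)]:
    progress.begin(xid, lsn)
for xid in (503, 504, 501):
    progress.commit(xid)
decisions = {lsn: progress.should_apply_after_restart(lsn)
             for lsn in (1100, 1200, 1300)}
```

With the example's state at the time of the crash, only the transaction with commit LSN 1100 (502) is applied again, while 1200 and 1300 are skipped.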
>
> Now, it is possible that some users may want to parallelize the
> transactions but still maintain commit order, because they don't
> explicitly declare FKs or PKs on columns and instead maintain
> integrity in the application. In such cases we won't be able to detect
> transaction dependencies, so it would be better to make out-of-order
> commits optional.
>
> Thoughts?

Here is the initial POC patch for this idea.

The basic implementation is outlined below. Please note that there are several
TODO items remaining, which we are actively working on; these are also detailed
further down.

The leader worker assigns each non-streaming transaction to a parallel apply
worker. Before dispatching changes to a parallel worker, the leader verifies if
the current modification affects the same row (identified by replica identity
key) as another ongoing transaction. If so, the leader sends a list of dependent
transaction IDs to the parallel worker, indicating that the parallel apply
worker must wait for these transactions to commit before proceeding. Parallel
apply workers do not maintain commit order; transactions can be committed at any
time provided there are no dependencies.

Each parallel apply worker records the local end LSN of the transaction it
applies in shared memory. Subsequently, the leader gathers these local end LSNs
and logs them in the local 'lsn_mapping' for verifying whether they have been
flushed to disk (following the logic in get_flush_position()).

If no parallel apply worker is available, the leader will apply the transaction
independently.

For further details, please refer to the following:

The leader maintains a local hash table, using the remote change's replica
identity column values and relid as keys, with remote transaction IDs as values.
Before sending changes to the parallel apply worker, the leader computes a hash
using RI key values and the relid of the current change to search the hash
table. If an existing entry is found, the leader tells the parallel worker
to wait for the remote xid in the hash entry, after which the leader updates the
hash entry with the current xid.
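
As a rough illustration of this lookup-and-update step (not the patch code, which is C), a minimal Python sketch could look like this. The class name, method name, relation OID 16384 and the xids are all hypothetical.

```python
from typing import Dict, Hashable, Optional, Tuple

# (relid, replica identity column values) -> remote xid of the last writer
DepKey = Tuple[int, Tuple[Hashable, ...]]


class DependencyTracker:
    """Hypothetical leader-local map from (relid, RI values) to a remote xid."""

    def __init__(self) -> None:
        self.last_writer: Dict[DepKey, int] = {}

    def check_and_record(self, relid: int, ri_values: Tuple[Hashable, ...],
                         xid: int) -> Optional[int]:
        """Return the xid the worker must wait for (or None), then
        overwrite the entry with the current xid."""
        key = (relid, ri_values)
        prev = self.last_writer.get(key)
        self.last_writer[key] = xid
        # A transaction never needs to wait for itself.
        return prev if prev is not None and prev != xid else None


tracker = DependencyTracker()
first = tracker.check_and_record(16384, (1,), 501)   # new row key: no wait
second = tracker.check_and_record(16384, (1,), 502)  # same row: wait for 501
third = tracker.check_and_record(16384, (2,), 503)   # different row: no wait
```

After these calls the entry for row (1,) points at xid 502, so a later change to the same row would be told to wait for 502 rather than 501.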
If the remote relation lacks a replica identity (RI), it indicates that only
INSERT can be replicated for this table. In such cases, the leader skips
dependency checks, allowing the parallel apply worker to proceed with applying
changes without delay. This is because the only potential conflict would
involve a local unique key or foreign key, handling of which is yet to be
implemented (see TODO - dependency on local unique key, foreign key).

In cases of TRUNCATE or remote schema changes affecting the entire table, the
leader retrieves all remote xids touching the same table (via sequential scans
of the hash table) and tells the parallel worker to wait for those transactions
to commit.
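
The table-wide case can be sketched in the same style. Again, this is a hypothetical Python illustration rather than the patch code: the function name and OIDs are made up, and excluding the current xid from the result is my assumption.

```python
from typing import Dict, Hashable, Set, Tuple

# (relid, replica identity column values) -> remote xid of the last writer
DepKey = Tuple[int, Tuple[Hashable, ...]]


def xids_touching_relation(last_writer: Dict[DepKey, int], relid: int,
                           current_xid: int) -> Set[int]:
    """Collect every remote xid recorded for the given relation.

    The table is keyed by (relid, RI values), so there is no per-relation
    index and a full scan of the entries is needed.
    """
    return {
        xid
        for (entry_relid, _ri_values), xid in last_writer.items()
        if entry_relid == relid and xid != current_xid
    }


entries = {
    (16384, (1,)): 501,
    (16384, (2,)): 502,
    (16390, (1,)): 503,  # a different table, not affected
    (16384, (3,)): 504,  # written by the truncating transaction itself
}
waits = xids_touching_relation(entries, 16384, current_xid=504)
```

Here a TRUNCATE of relation 16384 in xid 504 would have to wait for xids 501 and 502.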
Hash entries are cleaned up once the transaction corresponding to the remote xid
in the entry has been committed. Clean-up typically occurs when collecting the
flush position of each transaction, but is forced if the hash table exceeds a
set threshold.

If a transaction is relied upon by others, the leader adds its xid to a shared
hash table. The shared hash table entry is cleared by the parallel apply worker
upon completing the transaction. Workers needing to wait for a transaction check
the shared hash table entry; if present, they lock the transaction ID (using
pa_lock_transaction). If absent, it indicates the transaction has been
committed, negating the need to wait.
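
The wait protocol can be modelled with threads in Python, as a sketch only. The class and method names are hypothetical, and a threading.Event stands in for the transaction lock, so this shows the present/absent check rather than the actual locking in the patch.

```python
import threading
from typing import Dict, List, Optional


class SharedXactTable:
    """Hypothetical stand-in for the shared hash table of depended-upon xids."""

    def __init__(self) -> None:
        self._mutex = threading.Lock()
        self._pending: Dict[int, threading.Event] = {}

    def register(self, xid: int) -> None:
        """Leader: record that other transactions depend on xid."""
        with self._mutex:
            self._pending.setdefault(xid, threading.Event())

    def finish(self, xid: int) -> None:
        """Worker that applied xid: clear the entry and wake any waiters."""
        with self._mutex:
            done = self._pending.pop(xid, None)
        if done is not None:
            done.set()

    def wait_for(self, xid: int, timeout: Optional[float] = None) -> bool:
        """Dependent worker: block until xid has finished."""
        with self._mutex:
            done = self._pending.get(xid)
        if done is None:
            return True  # entry absent: xid already committed, no wait
        return done.wait(timeout)


table = SharedXactTable()
table.register(501)            # 502 depends on 501
order: List[int] = []


def apply_502() -> None:
    table.wait_for(501)
    order.append(502)


worker = threading.Thread(target=apply_502)
worker.start()
order.append(501)              # 501 "commits" ...
table.finish(501)              # ... and clears its shared entry
worker.join()
```

Whether the dependent worker reaches wait_for before or after 501 finishes, it appends 502 only after 501, because an absent entry means the transaction has already committed.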
--
TODO - replication progress tracking for out of order commit.
TODO - dependency on local unique key, foreign key.
TODO - restrict user-defined triggers and constraints.
TODO - enable the parallel apply optionally.
TODO - potential improvement to use shared hash table for tracking dependencies.
--
The above TODO items are also included in the initial email[1].
Best Regards,
Hou zj
Attachment | Content-Type | Size |
---|---|---|
v1-0001-Parallel-apply-non-streaming-transactions.patch | application/octet-stream | 73.1 KB |