From 34ba1abca653de871ca2bc095e1535f1cb975835 Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Thu, 30 Apr 2026 15:24:20 +0800 Subject: [PATCH v18 4/7] Parallel apply non-streaming transactions -- Basic design -- The leader worker assigns each non-streaming transaction to a parallel apply worker. Before dispatching changes to a parallel worker, the leader verifies if the current modification affects the same row (identified by replica identity key) as another ongoing transaction. If so, the leader sends a list of dependent transaction IDs to the parallel worker, indicating that the parallel apply worker must wait for these transactions to commit before proceeding. The leader preserves publisher commit order for all transactions by instructing the parallel worker to wait for the last transaction to commit before committing the current transaction. (Note that the leader itself does not wait for the parallelized transaction to be committed unless partial serialization mode is active, where the parallel apply worker can no longer be reused.) See the "commit order" and "worker interaction" sections below for details. Tracking dependencies is necessary even when commit order is preserved. Consider two transactions: TX-1 (INSERT row 1) and TX-2 (DELETE row 1). If both are allowed to apply in parallel, TX-2's DELETE could be applied before TX-1's INSERT, resulting in a delete_missing conflict. Each parallel apply worker records the local end LSN of the transaction it applies in shared memory. Subsequently, the leader gathers these local end LSNs and logs them in the local 'lsn_mapping' for verifying whether they have been flushed to disk (following the logic in get_flush_position()). If no parallel apply worker is available, the leader will apply the transaction independently. 
For further details, please refer to the following: -- dependency tracking -- The leader maintains a local hash table, using the remote change's replica identity column values and relid as keys, with remote transaction IDs as values. Before sending changes to the parallel apply worker, the leader computes a hash using RI key values and the relid of the current change to search the hash table. If an existing entry is found, the leader first updates the hash entry with the receiving remote xid then tells the parallel worker to wait for it. If the remote relation lacks a replica identity (RI), it indicates that only INSERT can be replicated for this table. In such cases, the leader skips dependency checks, allowing the parallel apply worker to proceed with applying changes without delay. This is because the only potential conflict that could happen is related to the local unique key or foreign key, which is yet to be implemented. In cases of TRUNCATE or remote schema changes affecting the entire table, the leader retrieves all remote xids touching the same table (via sequential scans of the hash table) and tells the parallel worker to wait for those transactions to commit. Hash entries are cleaned up once the transaction corresponding to the remote xid in the entry has been committed. Clean-up typically occurs when collecting the flush position of each transaction, but is forced if the hash table exceeds a set threshold. -- dependency waiting -- If a transaction is relied upon by others, the leader adds its xid to a shared hash table. The shared hash table entry is cleared by the parallel apply worker upon completing the transaction. Workers needing to wait for a transaction check the shared hash table entry; if present, they lock the transaction ID (using pa_lock_transaction). If absent, it indicates the transaction has been committed, negating the need to wait. 
-- commit order -- We preserve publisher commit order for all transactions for two reasons: 1) User-visible consistency Out-of-order commits can expose states on the subscriber that were never visible on the publisher. For example, suppose a user updates table A and then updates table B on the publisher. If the subscriber commits those transactions out of order, a query that sees the latest row in B might still see stale data in A. Although eventual consistency would still be reached, that behavior may be unacceptable for some users. In the future, we could provide a GUC to allow out-of-order commits for users who prefer higher parallelism. 2) Replication progress tracking We currently track replication progress using transaction commit LSNs. With out-of-order commits, this becomes ambiguous after failures. For example, if TX-2 is applied before TX-1 and replication stops due to an error, we cannot reliably determine whether TX-1 was applied before restart. As a result, transactions that were already committed on the subscriber may be replayed. -- worker interaction -- After sending the COMMIT message for a transaction, the leader apply worker does not wait for the parallel apply worker to finish applying that transaction. Instead, it sends a PA_MSG_XACT_DEPENDENCY message to the parallel apply worker, instructing it to wait for the last transaction to commit. This allows the leader to remain busy receiving and dispatching changes to more parallel apply workers, enabling greater parallelism in transaction application. 
Author: Zhijie Hou Author: Hayato Kuroda --- .../replication/logical/applyparallelworker.c | 348 +++++++++++++++-- src/backend/replication/logical/proto.c | 38 ++ src/backend/replication/logical/relation.c | 31 ++ src/backend/replication/logical/worker.c | 358 ++++++++++++++++-- src/include/replication/logicalproto.h | 2 + src/include/replication/logicalrelation.h | 2 + src/include/replication/worker_internal.h | 10 +- src/test/subscription/meson.build | 1 + src/test/subscription/t/001_rep_changes.pl | 2 + src/test/subscription/t/010_truncate.pl | 2 +- src/test/subscription/t/015_stream.pl | 8 +- src/test/subscription/t/026_stats.pl | 1 + src/test/subscription/t/027_nosuperuser.pl | 1 + src/test/subscription/t/050_parallel_apply.pl | 130 +++++++ src/tools/pgindent/typedefs.list | 2 + 15 files changed, 858 insertions(+), 78 deletions(-) create mode 100644 src/test/subscription/t/050_parallel_apply.pl diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index c290e15b75d..1e8adbd0b87 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -14,6 +14,9 @@ * ParallelApplyWorkerInfo which is required so the leader worker and parallel * apply workers can communicate with each other. * + * Streaming transactions + * ====================== + * * The parallel apply workers are assigned (if available) as soon as xact's * first stream is received for subscriptions that have set their 'streaming' * option as parallel. The leader apply worker will send changes to this new @@ -36,12 +39,9 @@ * its information is added to the ParallelApplyWorkerPool. Once the worker * finishes applying the transaction, it is marked as available for re-use. * Now, before starting a new worker to apply the streaming transaction, we - * check the list for any available worker. 
Note that we retain a maximum of - * half the max_parallel_apply_workers_per_subscription workers in the pool and - * after that, we simply exit the worker after applying the transaction. - * - * XXX This worker pool threshold is arbitrary and we can provide a GUC - * variable for this in the future if required. + * check the list for any available worker. We do not stop workers in the pool + * unless part of the changes of the transaction are serialized due to a send + * timeout. * * The leader apply worker will create a separate dynamic shared memory segment * when each parallel apply worker starts. The reason for this design is that @@ -152,6 +152,75 @@ * session-level locks because both locks could be acquired outside the * transaction, and the stream lock in the leader needs to persist across * transaction boundaries i.e. until the end of the streaming transaction. + * + * Non-streaming transactions + * ====================== + * The handling is similar to streaming transactions, but includes a few + * differences: + * + * Transaction dependency + * ---------------------- + * Before dispatching changes to a parallel worker, the leader verifies if the + * current modification affects the same row (identified by replica identity + * key) as another ongoing transaction (see handle_dependency_on_change for + * details). If so, the leader sends a list of dependent transaction IDs to the + * parallel worker, indicating that the parallel apply worker must wait for + * these transactions to commit before proceeding. + * + * Tracking dependencies is necessary even when commit order is preserved. + * Consider two transactions: TX-1 (INSERT row 1) and TX-2 (DELETE row 1). If + * both are allowed to apply in parallel, TX-2's DELETE could be applied before + * TX-1's INSERT, resulting in a delete_missing conflict. 
+ * + * Commit order + * ------------ + * We preserve publisher commit order for all transactions for two reasons: + * + * 1) User-visible consistency + * + * Out-of-order commits can expose states on the subscriber that were never + * visible on the publisher. + * + * For example, suppose a user updates table A and then updates table B on the + * publisher. If the subscriber commits those transactions out of order, a + * query that sees the latest row in B might still see stale data in A. + * Although eventual consistency would still be reached, that behavior may be + * unacceptable for some users. In the future, we could provide a subscription + * option to allow out-of-order commits for users who prefer higher parallelism. + * + * 2) Replication progress tracking + * + * We currently track replication progress using the last transaction's commit + * LSNs. With out-of-order commits, this becomes ambiguous after failures. + * + * For example, if TX-2 is applied before TX-1 and replication stops due to an + * error, we cannot reliably determine whether TX-1 was applied before restart. + * As a result, transactions that were already committed on the subscriber may + * be replayed. + * + * Worker interaction + * ------------ + * After sending the COMMIT message for a transaction, the leader apply worker + * does not wait for the parallel apply worker to finish applying that + * transaction. Instead, it sends a PA_MSG_XACT_DEPENDENCY message to + * the parallel apply worker, instructing it to wait for the last transaction to + * commit. This allows the leader to remain busy receiving and dispatching + * changes to more parallel apply workers, enabling greater parallelism in + * transaction application. 
+ * + * Locking considerations + * ---------------------- + * When handling a PA_MSG_RELMAP message, the worker attempts + * to acquire the transaction lock of the depended transaction and releases it + * immediately after acquisition (see pa_wait_for_depended_transaction). This + * allows deadlock detection when one worker (either leader or parallel apply + * worker) is waiting for a dependency on a transaction being applied by another + * worker, while that other worker is also blocked by a lock held by the first + * worker. + * + * The lock graph for the above example will look as follows: Worker_1 (waiting + * for depended transaction to finish) -> Worker_2 (waiting to acquire a + * relation lock) -> Worker_1 *------------------------------------------------------------------------- */ @@ -307,6 +376,7 @@ static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared); static PartialFileSetState pa_get_fileset_state(void); static void pa_attach_parallelized_txn_hash(dsa_handle *pa_dsa_handle, dshash_table_handle *pa_dshash_handle); +static void write_internal_relation(StringInfo s, LogicalRepRelation *rel); /* * Returns true if it is OK to start a parallel apply worker, false otherwise. @@ -424,6 +494,7 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo) shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared)); SpinLockInit(&shared->mutex); + shared->xid = InvalidTransactionId; shared->xact_state = PARALLEL_TRANS_UNKNOWN; pg_atomic_init_u32(&(shared->pending_stream_count), 0); shared->last_commit_end = InvalidXLogRecPtr; @@ -467,6 +538,8 @@ pa_launch_parallel_worker(void) MemoryContext oldcontext; bool launched; ParallelApplyWorkerInfo *winfo; + dsa_handle pa_dsa_handle; + dshash_table_handle pa_dshash_handle; ListCell *lc; /* Try to get an available parallel apply worker from the worker pool. 
*/ @@ -476,8 +549,36 @@ pa_launch_parallel_worker(void) if (!winfo->in_use) return winfo; + + /* + * The leader does not explicitly free workers that handle non-streaming + * transactions, so we check whether the transaction has committed, and + * if so, reuse the worker. + */ + if (!winfo->stream_txn && + pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED) + { + /* + * Save the local commit LSN of the last transaction applied by + * this worker before reusing it for another transaction. This WAL + * position is crucial for determining the flush position in + * responses to the publisher (see get_flush_position()). + */ + (void) pa_get_last_commit_end(winfo->shared->xid, false, NULL); + return winfo; + } } + pa_attach_parallelized_txn_hash(&pa_dsa_handle, &pa_dshash_handle); + + /* + * Return if the number of parallel apply workers has reached the maximum + * limit. + */ + if (list_length(ParallelApplyWorkerPool) == + max_parallel_apply_workers_per_subscription) + return NULL; + /* * Start a new parallel apply worker. * @@ -505,18 +606,50 @@ pa_launch_parallel_worker(void) dsm_segment_handle(winfo->dsm_seg), false); - if (launched) - { - ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo); - } - else + if (!launched) { + MemoryContextSwitchTo(oldcontext); pa_free_worker_info(winfo); - winfo = NULL; + return NULL; } + ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo); + MemoryContextSwitchTo(oldcontext); + /* + * Send all existing remote relation information to the parallel apply + * worker. This allows the parallel worker to initialize the + * LogicalRepRelMapEntry locally before applying remote changes. This is + * needed since the walsender does not send remote relation information + * with every transaction. 
+ */ + if (logicalrep_get_num_rels()) + { + StringInfoData out; + shm_mq_result result; + + initStringInfo(&out); + + write_internal_relation(&out, NULL); + + /* + * Send relation information synchronously rather than using + * non-blocking send (pa_send_data). This avoids the need to serialize + * the data, which would add complexity for the parallel apply worker to + * deserialize outside of transaction application. + * + * This is safe from deadlocks because a newly started worker does not + * hold any strong locks when processing relation information. + */ + result = shm_mq_send(winfo->mq_handle, out.len, out.data, false, true); + + if (result != SHM_MQ_SUCCESS) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send remote relation information to the logical replication parallel apply worker")); + } + return winfo; } @@ -621,25 +754,27 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo) { Assert(!am_parallel_apply_worker()); Assert(winfo->in_use); - Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED); + Assert(!winfo->stream_txn || + pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED); if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL)) elog(ERROR, "hash table corrupted"); /* - * Stop the worker if there are enough workers in the pool. + * XXX we stop the worker if the leader apply worker serialize part of the + * transaction data due to a send timeout. This is because the message could + * be partially written to the queue and there is no way to clean the queue + * other than resending the message until it succeeds. Instead of trying to + * send the data which anyway would have been serialized and then letting + * the parallel apply worker deal with the spurious message, we stop the + * worker. * - * XXX Additionally, we also stop the worker if the leader apply worker - * serialize part of the transaction data due to a send timeout. 
This is - * because the message could be partially written to the queue and there - * is no way to clean the queue other than resending the message until it - * succeeds. Instead of trying to send the data which anyway would have - * been serialized and then letting the parallel apply worker deal with - * the spurious message, we stop the worker. + * For other cases, we do not stop workers once started. All transactions + * (whether streamed or not) are assigned to parallel apply workers, so + * restarting workers frequently would only increase CPU overhead and slow + * down the leader's ability to dispatch changes to workers. */ - if (winfo->serialize_changes || - list_length(ParallelApplyWorkerPool) > - (max_parallel_apply_workers_per_subscription / 2)) + if (winfo->serialize_changes) { logicalrep_pa_worker_stop(winfo); pa_free_worker_info(winfo); @@ -772,15 +907,17 @@ pa_process_spooled_messages_if_required(void) /* * Get the local end LSN for a transaction applied by a parallel apply worker. * + * Set delete_entry to true if you intend to remove the transaction from the + * ParallelApplyTxnHash after collecting its LSN. Otherwise, the local end LSN + * is copied from shared memory into the local entry of ParallelApplyTxnHash for + * later collection by the leader. + * * If the parallel apply worker did not write any changes during the transaction * application due to situations like update/delete_missing or a before trigger, * the *skipped_write will be set to true. - * - * Once the LSN is retrieved, the transaction entry is removed from - * ParallelApplyTxnHash. 
*/ XLogRecPtr -pa_get_last_commit_end(TransactionId xid, bool *skipped_write) +pa_get_last_commit_end(TransactionId xid, bool delete_entry, bool *skipped_write) { bool found; ParallelApplyWorkerEntry *entry; @@ -806,7 +943,8 @@ pa_get_last_commit_end(TransactionId xid, bool *skipped_write) winfo = entry->winfo; if (winfo == NULL) { - if (!hash_search(ParallelApplyTxnHash, &xid, HASH_REMOVE, NULL)) + if (delete_entry && + !hash_search(ParallelApplyTxnHash, &xid, HASH_REMOVE, NULL)) elog(ERROR, "hash table corrupted"); if (skipped_write) @@ -829,7 +967,8 @@ pa_get_last_commit_end(TransactionId xid, bool *skipped_write) elog(DEBUG1, "store local commit %X/%X end to txn entry: %u", LSN_FORMAT_ARGS(entry->local_end), xid); - if (!hash_search(ParallelApplyTxnHash, &xid, HASH_REMOVE, NULL)) + if (delete_entry && + !hash_search(ParallelApplyTxnHash, &xid, HASH_REMOVE, NULL)) elog(ERROR, "hash table corrupted"); return entry->local_end; @@ -1082,6 +1221,9 @@ pa_shutdown(int code, Datum arg) INVALID_PROC_NUMBER); dsm_detach((dsm_segment *) DatumGetPointer(arg)); + + if (parallel_apply_dsa_area) + dsa_detach(parallel_apply_dsa_area); } /* @@ -1393,7 +1535,6 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) shm_mq_result result; TimestampTz startTime = 0; - Assert(!IsTransactionState()); Assert(!winfo->serialize_changes); /* @@ -1445,6 +1586,58 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) } } +/* + * Distribute remote relation information to all active parallel apply workers + * that require it. 
+ */ +void +pa_distribute_schema_changes_to_workers(LogicalRepRelation *rel) +{ + StringInfoData out; + + /* Only leader apply workers can distribute schema changes */ + if (!am_leader_apply_worker()) + return; + + /* Quick exit if there are no parallel apply workers */ + if (!ParallelApplyWorkerPool) + return; + + initStringInfo(&out); + + write_internal_relation(&out, rel); + + foreach_ptr(ParallelApplyWorkerInfo, winfo, ParallelApplyWorkerPool) + { + /* + * Skip the worker responsible for the current transaction, as the + * relation information has already been sent to it. + */ + if (winfo == stream_apply_worker) + continue; + + /* + * Skip the worker that is in serialize mode, as they will soon stop + * once they finish applying the transaction. + */ + if (winfo->serialize_changes) + continue; + + elog(DEBUG1, "distributing schema changes to pa workers"); + + if (pa_send_data(winfo, out.len, out.data)) + continue; + + /* + * TODO: Support switching to PARTIAL_SERIALIZE mode when the send + * buffer becomes full. + */ + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send data to the logical replication parallel apply worker for subscription")); + } +} + /* * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means * that the current data and any subsequent data for this transaction will be @@ -1861,25 +2054,41 @@ pa_decr_and_wait_stream_block(void) void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn) { + XLogRecPtr local_lsn = InvalidXLogRecPtr; + TransactionId pa_remote_xid = winfo->shared->xid; + Assert(am_leader_apply_worker()); /* - * Unlock the shared object lock so that parallel apply worker can - * continue to receive and apply changes. + * Unlock the shared object lock taken for streaming transactions so that + * parallel apply worker can continue to receive and apply changes. 
*/ - pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock); + if (winfo->stream_txn) + pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock); /* - * Wait for that worker to finish. This is necessary to maintain commit - * order which avoids failures due to transaction dependencies and - * deadlocks. + * Wait for that worker for streaming transaction to finish. This is + * necessary to maintain commit order which avoids failures due to + * transaction dependencies and deadlocks. + * + * For transactions in partial serialize mode, wait for stop as well as the + * worker cannot be reused anymore anyway (see pa_free_worker() for + * details). */ - pa_wait_for_xact_finish(winfo); + if (winfo->serialize_changes || winfo->stream_txn) + { + pa_wait_for_xact_finish(winfo); + + local_lsn = winfo->shared->last_commit_end; + pa_remote_xid = InvalidTransactionId; + + pa_free_worker(winfo); + } if (XLogRecPtrIsValid(remote_lsn)) - store_flush_position(remote_lsn, winfo->shared->last_commit_end); + store_flush_position(remote_lsn, local_lsn, pa_remote_xid); - pa_free_worker(winfo); + pa_set_stream_apply_worker(NULL); } /* @@ -2013,3 +2222,62 @@ pa_wait_for_depended_transaction(TransactionId xid) elog(DEBUG1, "finish waiting for depended xid %u", xid); } + +/* + * Mark the transaction state as finished and remove the shared hash entry. + */ +void +pa_commit_transaction(void) +{ + TransactionId xid = MyParallelShared->xid; + + SpinLockAcquire(&MyParallelShared->mutex); + MyParallelShared->xact_state = PARALLEL_TRANS_FINISHED; + SpinLockRelease(&MyParallelShared->mutex); + + dshash_delete_key(parallelized_txns, &xid); + elog(DEBUG1, "xid %u committed", xid); +} + +/* + * Write internal relation description to the output stream. 
+ */ +static void +write_internal_relation(StringInfo s, LogicalRepRelation *rel) +{ + pq_sendbyte(s, LOGICAL_REP_MSG_INTERNAL_MESSAGE); + pq_sendbyte(s, PA_MSG_RELMAP); + + if (rel) + { + pq_sendint(s, 1, 4); + logicalrep_write_internal_rel(s, rel); + } + else + { + pq_sendint(s, logicalrep_get_num_rels(), 4); + logicalrep_write_all_rels(s); + } +} + +/* + * Register a transaction to the shared hash table. + * + * This function is called by the leader during the commit phase of non-streamed + * transactions. The parallel apply worker that applies the transaction will + * remove it from the hash table upon completion. + */ +void +pa_add_parallelized_transaction(TransactionId xid) +{ + bool found; + ParallelizedTxnEntry *txn_entry; + + Assert(parallelized_txns); + Assert(TransactionIdIsValid(xid)); + Assert(am_leader_apply_worker()); + + txn_entry = dshash_find_or_insert(parallelized_txns, &xid, &found); + + dshash_release_lock(parallelized_txns, txn_entry); +} diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index a0cac05cdcd..eb1a846ce0c 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -691,6 +691,44 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, logicalrep_write_attrs(out, rel, columns, include_gencols_type); } +/* + * Write internal relation description to the output stream. + */ +void +logicalrep_write_internal_rel(StringInfo out, LogicalRepRelation *rel) +{ + pq_sendint32(out, rel->remoteid); + + /* Write relation name */ + pq_sendstring(out, rel->nspname); + pq_sendstring(out, rel->relname); + + /* Write the replica identity. 
*/ + pq_sendbyte(out, rel->replident); + + /* Write attribute description */ + pq_sendint16(out, rel->natts); + + for (int i = 0; i < rel->natts; i++) + { + uint8 flags = 0; + + if (bms_is_member(i, rel->attkeys)) + flags |= LOGICALREP_IS_REPLICA_IDENTITY; + + pq_sendbyte(out, flags); + + /* attribute name */ + pq_sendstring(out, rel->attnames[i]); + + /* attribute type id */ + pq_sendint32(out, rel->atttyps[i]); + + /* ignore attribute mode for now */ + pq_sendint32(out, 0); + } +} + /* * Read the relation info from stream and return as LogicalRepRelation. */ diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index ab6313eb1bc..e1ce183dfd3 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -961,6 +961,37 @@ FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel, return InvalidOid; } +/* + * Get the number of entries in the LogicalRepRelMap. + */ +int +logicalrep_get_num_rels(void) +{ + if (LogicalRepRelMap == NULL) + return 0; + + return hash_get_num_entries(LogicalRepRelMap); +} + +/* + * Write all the remote relation information from the LogicalRepRelMapEntry to + * the output stream. + */ +void +logicalrep_write_all_rels(StringInfo out) +{ + LogicalRepRelMapEntry *entry; + HASH_SEQ_STATUS status; + + if (LogicalRepRelMap == NULL) + return; + + hash_seq_init(&status, LogicalRepRelMap); + + while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + logicalrep_write_internal_rel(out, &entry->remoterel); +} + /* * Get the LogicalRepRelMapEntry corresponding to the given relid without * opening the local relation. 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 58920b746f7..64dca6aa9ba 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -290,6 +290,7 @@ #include "tcop/tcopprot.h" #include "utils/acl.h" #include "utils/guc.h" +#include "utils/injection_point.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -514,6 +515,8 @@ static List *on_commit_wakeup_workers_subids = NIL; bool in_remote_transaction = false; static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +static TransactionId remote_xid = InvalidTransactionId; +static TransactionId last_parallelized_remote_xid = InvalidTransactionId; /* fields valid only when processing streamed transaction */ static bool in_streamed_transaction = false; @@ -722,6 +725,7 @@ static void on_exit_clear_xact_state(int code, Datum arg); static void send_internal_dependencies(ParallelApplyWorkerInfo *winfo, List *depends_on_xids); +static void build_dependency_with_last_committed_txn(ParallelApplyWorkerInfo *winfo); /* * Compute the hash value for entries in the replica_identity_table. 
@@ -837,7 +841,7 @@ cleanup_committed_replica_identity_entries(void) XLogRecPtrIsValid(pos->local_end)) continue; - pos->local_end = pa_get_last_commit_end(pos->pa_remote_xid, + pos->local_end = pa_get_last_commit_end(pos->pa_remote_xid, false, &skipped_write); elog(DEBUG1, @@ -1411,14 +1415,14 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) TransApplyAction apply_action; StringInfoData original_msg; + if (!in_streamed_transaction) + return false; + apply_action = get_transaction_apply_action(stream_xid, &winfo); /* not in streaming mode */ if (apply_action == TRANS_LEADER_APPLY) - { - handle_dependency_on_change(action, s, InvalidTransactionId, winfo); return false; - } Assert(TransactionIdIsValid(stream_xid)); @@ -1493,6 +1497,73 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) } } +/* + * Handle non-streaming transactions when parallel apply is in use. + * + * This function runs only in the leader apply worker while processing a remote + * transaction. It checks whether the current change has dependencies on + * preceding parallelized transactions and decides whether to send the change to + * a parallel apply worker. + * + * Returns true if the change has been dispatched to a parallel worker, + * indicating the leader does not need to apply it directly. Returns false + * otherwise. + * + * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION or + * LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent to a + * parallel apply worker. + */ +static bool +handle_parallelized_transaction(LogicalRepMsgType action, StringInfo s) +{ + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; + + /* + * Dependency checking for non-streaming transactions is only required in + * the leader apply worker during a remote transaction. 
+ */ + if (!in_remote_transaction || !am_leader_apply_worker()) + return false; + + apply_action = get_transaction_apply_action(remote_xid, &winfo); + + /* not assigned to parallel apply worker, apply in leader */ + if (apply_action == TRANS_LEADER_APPLY) + { + handle_dependency_on_change(action, s, InvalidTransactionId, winfo); + return false; + } + + Assert(TransactionIdIsValid(remote_xid)); + + handle_dependency_on_change(action, s, remote_xid, winfo); + + switch (apply_action) + { + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + /* Always update relation and type cache in leader apply worker */ + if (pa_send_data(winfo, s->len, s->data)) + return (action != LOGICAL_REP_MSG_RELATION && + action != LOGICAL_REP_MSG_TYPE); + + /* + * TODO: Support switching to PARTIAL_SERIALIZE mode when the send + * buffer becomes full. + */ + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send data to the logical replication parallel apply worker")); + return false; /* silence compiler warning */ + + default: + elog(ERROR, "unexpected apply action: %d", (int) apply_action); + return false; /* silence compiler warning */ + } +} + /* * Executor state preparation for evaluation of constraint expressions, * indexes and triggers for the specified relation. @@ -1844,17 +1915,61 @@ static void apply_handle_begin(StringInfo s) { LogicalRepBeginData begin_data; + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; /* There must not be an active streaming transaction. 
*/ Assert(!TransactionIdIsValid(stream_xid)); logicalrep_read_begin(s, &begin_data); - set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn); + + remote_xid = begin_data.xid; + + set_apply_error_context_xact(remote_xid, begin_data.final_lsn); remote_final_lsn = begin_data.final_lsn; maybe_start_skipping_changes(begin_data.final_lsn); + pa_allocate_worker(remote_xid, false); + + apply_action = get_transaction_apply_action(remote_xid, &winfo); + + elog(DEBUG1, "new remote_xid %u", remote_xid); + switch (apply_action) + { + case TRANS_LEADER_APPLY: + break; + + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + if (pa_send_data(winfo, s->len, s->data)) + { + pa_set_stream_apply_worker(winfo); + break; + } + + /* + * TODO: Support switching to PARTIAL_SERIALIZE mode when the send + * buffer becomes full. + */ + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send data to the logical replication parallel apply worker")); + break; + + case TRANS_PARALLEL_APPLY: + /* Hold the lock until the end of the transaction. */ + pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock); + pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED); + break; + + default: + elog(ERROR, "unexpected apply action: %d", (int) apply_action); + break; + } + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -1877,12 +1992,33 @@ send_internal_dependencies(ParallelApplyWorkerInfo *winfo, List *depends_on_xids foreach_xid(xid, depends_on_xids) pq_sendint32(&dependencies, xid); - if (pa_send_data(winfo, dependencies.len, dependencies.data)) + /* + * TODO: Support switching to PARTIAL_SERIALIZE mode when the send + * buffer becomes full. 
+ */ + if (!pa_send_data(winfo, dependencies.len, dependencies.data)) ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("could not send data to the logical replication parallel apply worker")); } +/* + * Make a dependency between this and the lastly committed transaction. + * + * Sends an INTERNAL_DEPENDENCY message to the parallel apply worker, + * instructing it to wait for the last committed transaction to finish before + * committing its own, thereby preserving commit order. + */ +static void +build_dependency_with_last_committed_txn(ParallelApplyWorkerInfo *winfo) +{ + /* Skip if transactions have not been applied yet */ + if (!TransactionIdIsValid(last_parallelized_remote_xid)) + return; + + send_internal_dependencies(winfo, list_make1_xid(last_parallelized_remote_xid)); +} + /* * Handle COMMIT message. * @@ -1892,6 +2028,8 @@ static void apply_handle_commit(StringInfo s) { LogicalRepCommitData commit_data; + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; logicalrep_read_commit(s, &commit_data); @@ -1902,7 +2040,92 @@ apply_handle_commit(StringInfo s) LSN_FORMAT_ARGS(commit_data.commit_lsn), LSN_FORMAT_ARGS(remote_final_lsn)))); - apply_handle_commit_internal(&commit_data); + apply_action = get_transaction_apply_action(remote_xid, &winfo); + + switch (apply_action) + { + case TRANS_LEADER_APPLY: + + /* + * Apart from parallelized transactions, we do not have to + * register this transaction to parallelized_txns. The commit + * ordering is always preserved. + */ + + /* Wait until the last transaction finishes */ + if (TransactionIdIsValid(last_parallelized_remote_xid)) + { + pa_wait_for_depended_transaction(last_parallelized_remote_xid); + last_parallelized_remote_xid = InvalidTransactionId; + } + + apply_handle_commit_internal(&commit_data); + + break; + + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + /* + * Mark this transaction as parallelized. 
This ensures that + * upcoming transactions wait until this transaction is committed. + */ + pa_add_parallelized_transaction(remote_xid); + + /* + * Build a dependency between this transaction and the lastly + * committed transaction to preserve the commit order. + */ + build_dependency_with_last_committed_txn(winfo); + + if (pa_send_data(winfo, s->len, s->data)) + { + /* Cache the remote_xid */ + last_parallelized_remote_xid = remote_xid; + + /* Finish processing the transaction. */ + pa_xact_finish(winfo, commit_data.end_lsn); + break; + } + + /* + * TODO: Support switching to PARTIAL_SERIALIZE mode when the send + * buffer becomes full. + */ + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send data to the logical replication parallel apply worker")); + break; + + case TRANS_PARALLEL_APPLY: + + /* + * If the parallel apply worker is applying spooled messages then + * close the file before committing. + */ + if (stream_fd) + stream_close_file(); + + INJECTION_POINT("parallel-worker-before-commit", NULL); + + apply_handle_commit_internal(&commit_data); + + MyParallelShared->last_commit_end = XactLastCommitEnd; + + pa_commit_transaction(); + + pa_unlock_transaction(remote_xid, AccessExclusiveLock); + break; + + default: + elog(ERROR, "unexpected apply action: %d", (int) apply_action); + break; + } + + remote_xid = InvalidTransactionId; + in_remote_transaction = false; + + elog(DEBUG1, "reset remote_xid %u", remote_xid); /* * Process any tables that are being synchronized in parallel, as well as @@ -2025,7 +2248,8 @@ apply_handle_prepare(StringInfo s) * XactLastCommitEnd, and adding it for this purpose doesn't seems worth * it. 
*/ - store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr); + store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr, + InvalidTransactionId); in_remote_transaction = false; @@ -2085,7 +2309,8 @@ apply_handle_commit_prepared(StringInfo s) CommitTransactionCommand(); pgstat_report_stat(false); - store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); + store_flush_position(prepare_data.end_lsn, XactLastCommitEnd, + InvalidTransactionId); in_remote_transaction = false; /* @@ -2154,7 +2379,8 @@ apply_handle_rollback_prepared(StringInfo s) * transaction because we always flush the WAL record for it. See * apply_handle_prepare. */ - store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr); + store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr, + InvalidTransactionId); in_remote_transaction = false; /* @@ -2216,7 +2442,8 @@ apply_handle_stream_prepare(StringInfo s) * It is okay not to set the local_end LSN for the prepare because * we always flush the prepare record. See apply_handle_prepare. */ - store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr); + store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr, + InvalidTransactionId); in_remote_transaction = false; @@ -2408,8 +2635,17 @@ apply_handle_stream_start(StringInfo s) set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr); - /* Try to allocate a worker for the streaming transaction. */ - if (first_segment) + /* + * Try to allocate a worker for the streaming transaction. + * + * TODO: Support assigning streaming transactions to parallel apply workers + * even when non-streaming transactions are running and better dependency + * handling between streaming and non-streaming transactions. 
+ */ + if (first_segment && + am_leader_apply_worker() && + (!TransactionIdIsValid(last_parallelized_remote_xid) || + pa_transaction_committed(last_parallelized_remote_xid))) pa_allocate_worker(stream_xid, true); apply_action = get_transaction_apply_action(stream_xid, &winfo); @@ -2924,8 +3160,7 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, int fileno; pgoff_t offset; - if (!am_parallel_apply_worker()) - maybe_start_skipping_changes(lsn); + maybe_start_skipping_changes(lsn); /* Make sure we have an open transaction */ begin_replication_step(); @@ -3195,7 +3430,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data) pgstat_report_stat(false); - store_flush_position(commit_data->end_lsn, XactLastCommitEnd); + store_flush_position(commit_data->end_lsn, XactLastCommitEnd, + InvalidTransactionId); } else { @@ -3220,7 +3456,8 @@ apply_handle_relation(StringInfo s) { LogicalRepRelation *rel; - if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s)) + if (handle_parallelized_transaction(LOGICAL_REP_MSG_RELATION, s) || + handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s)) return; rel = logicalrep_read_rel(s); @@ -3228,6 +3465,8 @@ apply_handle_relation(StringInfo s) /* Also reset all entries in the partition map that refer to remoterel. */ logicalrep_partmap_reset_relmap(rel); + + pa_distribute_schema_changes_to_workers(rel); } /* @@ -3243,7 +3482,8 @@ apply_handle_type(StringInfo s) { LogicalRepTyp typ; - if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s)) + if (handle_parallelized_transaction(LOGICAL_REP_MSG_TYPE, s) || + handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s)) return; logicalrep_read_typ(s, &typ); @@ -3303,6 +3543,7 @@ apply_handle_insert(StringInfo s) * streamed transactions. 
*/ if (is_skipping_changes() || + handle_parallelized_transaction(LOGICAL_REP_MSG_INSERT, s) || handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) return; @@ -3463,6 +3704,7 @@ apply_handle_update(StringInfo s) * streamed transactions. */ if (is_skipping_changes() || + handle_parallelized_transaction(LOGICAL_REP_MSG_UPDATE, s) || handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s)) return; @@ -3682,6 +3924,7 @@ apply_handle_delete(StringInfo s) * streamed transactions. */ if (is_skipping_changes() || + handle_parallelized_transaction(LOGICAL_REP_MSG_DELETE, s) || handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s)) return; @@ -4318,6 +4561,7 @@ apply_handle_truncate(StringInfo s) * streamed transactions. */ if (is_skipping_changes() || + handle_parallelized_transaction(LOGICAL_REP_MSG_TRUNCATE, s) || handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s)) return; @@ -4548,6 +4792,10 @@ apply_dispatch(StringInfo s) * check which entries on it are already locally flushed. Those we can report * as having been flushed. * + * For non-streaming transactions managed by a parallel apply worker, we will + * get the local commit end from the shared parallel apply worker info once the + * transaction has been committed by the worker. + * * The have_pending_txes is true if there are outstanding transactions that * need to be flushed. */ @@ -4557,6 +4805,7 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, { dlist_mutable_iter iter; XLogRecPtr local_flush = GetFlushRecPtr(NULL); + List *committed_pa_xid = NIL; *write = InvalidXLogRecPtr; *flush = InvalidXLogRecPtr; @@ -4566,6 +4815,40 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, FlushPosition *pos = dlist_container(FlushPosition, node, iter.cur); + /* + * If the transaction was assigned to a parallel apply worker, attempt + * to retrieve its commit end LSN from that worker. 
+ */ + if (TransactionIdIsValid(pos->pa_remote_xid) && + !XLogRecPtrIsValid(pos->local_end)) + { + bool skipped_write; + + pos->local_end = pa_get_last_commit_end(pos->pa_remote_xid, true, + &skipped_write); + + elog(DEBUG1, + "got commit end from parallel apply worker, " + "txn: %u, remote_end %X/%X, local_end %X/%X", + pos->pa_remote_xid, LSN_FORMAT_ARGS(pos->remote_end), + LSN_FORMAT_ARGS(pos->local_end)); + + /* + * Break the loop if the worker has not finished applying the + * transaction. There's no need to check subsequent transactions, + * as they must commit after the current transaction being + * examined and thus won't have their commit end available yet. + */ + if (!skipped_write && XLogRecPtrIsInvalid(pos->local_end)) + break; + + committed_pa_xid = lappend_xid(committed_pa_xid, pos->pa_remote_xid); + } + + /* + * Worker has finished applying or the transaction was applied in the + * leader apply worker. + */ *write = pos->remote_end; if (pos->local_end <= local_flush) @@ -4574,29 +4857,22 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, dlist_delete(iter.cur); pfree(pos); } - else - { - /* - * Don't want to uselessly iterate over the rest of the list which - * could potentially be long. Instead get the last element and - * grab the write position from there. - */ - pos = dlist_tail_element(FlushPosition, node, - &lsn_mapping); - *write = pos->remote_end; - *have_pending_txes = true; - return; - } } *have_pending_txes = !dlist_is_empty(&lsn_mapping); + + delete_replica_identity_entries_for_txns(committed_pa_xid); } /* * Store current remote/local lsn pair in the tracking list. + * + * pa_remote_xid should be a valid transaction ID only when the transaction was + * assigned to a parallel apply worker; otherwise, pass InvalidTransactionId. 
*/ void -store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn) +store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn, + TransactionId pa_remote_xid) { FlushPosition *flushpos; @@ -4614,7 +4890,7 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn) flushpos = palloc_object(FlushPosition); flushpos->local_end = local_lsn; flushpos->remote_end = remote_lsn; - flushpos->pa_remote_xid = InvalidTransactionId; + flushpos->pa_remote_xid = pa_remote_xid; dlist_push_tail(&lsn_mapping, &flushpos->node); MemoryContextSwitchTo(ApplyMessageContext); @@ -6739,6 +7015,22 @@ static void maybe_start_skipping_changes(XLogRecPtr finish_lsn) { Assert(!is_skipping_changes()); + + /* + * For streaming transactions applied in a parallel apply worker, the remote + * end LSN is unknown, so skipping is not possible. For non-streaming + * transactions, the leader determines whether to skip the transaction. If + * skipping is needed, the leader simply does not send the transaction to a + * parallel apply worker. + */ + if (am_parallel_apply_worker()) + return; + + /* + * These assertions apply only to leader apply and table sync workers. + * Parallel workers may apply spooled BEGIN or STREAM_START messages, which + * can set these flags to true. 
+ */ Assert(!in_remote_transaction); Assert(!in_streamed_transaction); diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index cb0a8f440c0..d9f375295de 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -259,6 +259,8 @@ extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecP extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type); +extern void logicalrep_write_internal_rel(StringInfo out, + LogicalRepRelation *rel); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid); diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 09890cef8d4..cd852337165 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -58,6 +58,8 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode); extern bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap); extern Oid GetRelationIdentityOrPK(Relation rel); +extern int logicalrep_get_num_rels(void); +extern void logicalrep_write_all_rels(StringInfo out); extern LogicalRepRelMapEntry *logicalrep_get_relentry(LogicalRepRelId remoteid); #endif /* LOGICALRELATION_H */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 3c88d9e1027..02a165ccc0d 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -345,7 +345,8 @@ extern void SetupApplyOrSyncWorker(int worker_slot); extern void DisableSubscriptionAndExit(void); -extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn); +extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn, + TransactionId pa_remote_xid); /* Function 
for apply error callback */ extern void apply_error_callback(void *arg); @@ -354,13 +355,15 @@ extern void set_apply_error_context_origin(char *originname); /* Parallel apply worker setup and interactions */ extern void pa_allocate_worker(TransactionId xid, bool stream_txn); extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid); -extern XLogRecPtr pa_get_last_commit_end(TransactionId xid, bool *skipped_write); +extern XLogRecPtr pa_get_last_commit_end(TransactionId xid, bool delete_entry, + bool *skipped_write); extern void pa_detach_all_error_mq(void); extern void apply_handle_internal_message(StringInfo s); extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data); +extern void pa_distribute_schema_changes_to_workers(LogicalRepRelation *rel); extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked); @@ -386,8 +389,9 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); extern bool pa_transaction_committed(TransactionId xid); - +extern void pa_commit_transaction(void); extern void pa_wait_for_depended_transaction(TransactionId xid); +extern void pa_add_parallelized_transaction(TransactionId xid); #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index e71e95c6297..48ae698e786 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -48,6 +48,7 @@ tests += { 't/036_sequences.pl', 't/037_except.pl', 't/038_walsnd_shutdown_timeout.pl', + 't/050_parallel_apply.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 7d41715ed81..c863b430bec 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -16,6 +16,8 @@ 
$node_publisher->start; # Create subscriber node my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', + "max_logical_replication_workers = 10"); $node_subscriber->start; # Create some preexisting content on publisher diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl index 945505d0239..e15a6bb2a03 100644 --- a/src/test/subscription/t/010_truncate.pl +++ b/src/test/subscription/t/010_truncate.pl @@ -17,7 +17,7 @@ $node_publisher->start; my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init; $node_subscriber->append_conf('postgresql.conf', - qq(max_logical_replication_workers = 6)); + qq(max_logical_replication_workers = 7)); $node_subscriber->start; my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl index ac96bc3f009..705bc83f5ea 100644 --- a/src/test/subscription/t/015_stream.pl +++ b/src/test/subscription/t/015_stream.pl @@ -232,6 +232,12 @@ $node_subscriber->wait_for_log( $node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 values(1)"); +# FIXME: Currently, non-streaming transactions are applied in parallel by +default. So, the first transaction is handled by a parallel apply worker. To +# trigger the deadlock, initiate one more transaction to be applied by the +# leader. +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 values(1)"); + $h->query_safe('COMMIT'); $h->quit; @@ -247,7 +253,7 @@ $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); -is($result, qq(5001), 'data replicated to subscriber after dropping index'); +is($result, qq(5002), 'data replicated to subscriber after dropping index'); # Clean up test data from the environment. 
$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2"); diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl index 5d457060a02..911005bde20 100644 --- a/src/test/subscription/t/026_stats.pl +++ b/src/test/subscription/t/026_stats.pl @@ -16,6 +16,7 @@ $node_publisher->start; # Create subscriber node. my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', "max_logical_replication_workers = 10"); $node_subscriber->start; diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl index 322f5b4cc6a..fdfc44ac729 100644 --- a/src/test/subscription/t/027_nosuperuser.pl +++ b/src/test/subscription/t/027_nosuperuser.pl @@ -86,6 +86,7 @@ $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_publisher->init(allows_streaming => 'logical'); $node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', "max_logical_replication_workers = 10"); $node_publisher->start; $node_subscriber->start; $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; diff --git a/src/test/subscription/t/050_parallel_apply.pl b/src/test/subscription/t/050_parallel_apply.pl new file mode 100644 index 00000000000..ff6ce045fc8 --- /dev/null +++ b/src/test/subscription/t/050_parallel_apply.pl @@ -0,0 +1,130 @@ + +# Copyright (c) 2025, PostgreSQL Global Development Group + +# This tests that dependency tracking between transactions can work well + +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +if ($ENV{enable_injection_points} ne 'yes') +{ + plan skip_all => 'Injection points not supported by this build'; +} + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Insert initial data +$node_publisher->safe_psql('postgres', + "CREATE TABLE regress_tab (id int PRIMARY KEY, value text);"); +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (generate_series(1, 10), 'test');"); + +# Create a publication +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION regress_pub FOR ALL TABLES;"); + +# Initialize subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_subscriber->append_conf('postgresql.conf', + "max_logical_replication_workers = 10"); +$node_subscriber->start; + +# Check if the extension injection_points is available, as it may be +# possible that this script is run with installcheck, where the module +# would not be installed by default. +if (!$node_subscriber->check_extension('injection_points')) +{ + plan skip_all => 'Extension injection_points not installed'; +} + +$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;'); + +# Create a subscription +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_subscriber->safe_psql('postgres', + "CREATE TABLE regress_tab (id int PRIMARY KEY, value text);"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION regress_sub CONNECTION '$publisher_connstr' PUBLICATION regress_pub;"); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub'); + +# Insert tuples on publisher +# +# XXX This may not be enough to launch a parallel apply worker, because +# table_states_not_ready is not discarded yet. +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (generate_series(11, 20), 'test');"); +$node_publisher->wait_for_catchup('regress_sub'); + +# Insert tuples again +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (generate_series(21, 30), 'test');"); +$node_publisher->wait_for_catchup('regress_sub'); + +# Verify the parallel apply worker is launched +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM pg_stat_activity WHERE backend_type = 'logical replication parallel worker'"); +is($result, '1', "parallel apply worker is launched by a non-streamed transaction"); + +# Attach an injection_point. Parallel workers would wait before the commit +$node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('parallel-worker-before-commit','wait');" +); + +# Insert tuples on publisher +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (generate_series(31, 40), 'test');"); + +# Wait until the parallel worker enters the injection point. +$node_subscriber->wait_for_event('logical replication parallel worker', + 'parallel-worker-before-commit'); + +my $offset = -s $node_subscriber->logfile; + +# Insert tuples on publisher again. 
This transaction is independent from the +# previous one, but the parallel worker would wait till it finishes +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (generate_series(41, 50), 'test');"); + +# Verify the parallel worker waits for the transaction +my $str = $node_subscriber->wait_for_log(qr/wait for depended xid ([1-9][0-9]+)/, $offset); +my $xid = $str =~ /wait for depended xid ([1-9][0-9]+)/; + +# Update tuples which have not been applied yet on subscriber because the +# parallel worker stops at the injection point. Newly assigned worker also +# waits for the same transactions as above. +$node_publisher->safe_psql('postgres', + "UPDATE regress_tab SET value = 'updated' WHERE id BETWEEN 31 AND 35;"); + +# Verify the parallel worker waits for the same transaction +$node_subscriber->wait_for_log(qr/wait for depended xid $xid/, $offset); + +# Wakeup the parallel worker. We detach first so as not to stop other parallel workers +$node_subscriber->safe_psql('postgres', qq[ + SELECT injection_points_detach('parallel-worker-before-commit'); + SELECT injection_points_wakeup('parallel-worker-before-commit'); +]); + +# Verify the parallel worker wakes up +$node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(1) FROM regress_tab"); +is ($result, 50, 'inserts are replicated to subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM regress_tab WHERE value = 'updated'"); +is ($result, 5, 'updates are also replicated to subscriber'); + +done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 5f3fe168574..447d60dbb7f 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2150,6 +2150,7 @@ ParallelHashGrowth ParallelHashJoinBatch ParallelHashJoinBatchAccessor ParallelHashJoinState +ParallelizedTxnEntry ParallelIndexScanDesc 
ParallelizedTxnEntry ParallelSlot @@ -4234,6 +4235,7 @@ rendezvousHashEntry rep replace_rte_variables_callback replace_rte_variables_context +replica_identity_hash report_error_fn ret_type rewind_source -- 2.43.7