From 1cfb644578a89975710ec05e21faa15f48710876 Mon Sep 17 00:00:00 2001
From: Zhijie Hou
Date: Tue, 21 Apr 2026 15:16:59 +0800
Subject: [PATCH v18 6/7] Track dependencies for streamed transactions

This commit allows tracking dependencies of streamed transactions.
Regarding the streaming=on case, dependency tracking is enabled while
applying spooled changes from files. In the streaming=parallel case,
dependency tracking is performed when the leader sends changes to
parallel workers.

Unlike non-streamed transactions, the leader waits for parallel workers
until the assigned transactions are finished at COMMIT/PREPARE/ABORT;
thus, the XID of streamed transactions is not cached as the last handled
one. Also, streamed transactions are not recorded as parallelized
transactions because upcoming workers do not have to wait for them.

---
 .../replication/logical/applyparallelworker.c |  8 ++-
 src/backend/replication/logical/worker.c      | 64 +++++++++++++++----
 src/include/replication/worker_internal.h     |  1 -
 src/test/subscription/t/050_parallel_apply.pl | 47 ++++++++++++++
 4 files changed, 104 insertions(+), 16 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 1e8adbd0b87..b249ff2bad6 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -165,7 +165,9 @@
  * key) as another ongoing transaction (see handle_dependency_on_change for
  * details). If so, the leader sends a list of dependent transaction IDs to the
  * parallel worker, indicating that the parallel apply worker must wait for
- * these transactions to commit before proceeding.
+ * these transactions to commit before proceeding. If transactions are streamed
+ * but the leader decides not to assign parallel apply workers, dependencies
+ * are verified when the transaction is committed.
  *
 * Tracking dependencies is necessary even when commit order is preserved. 
* Consider two transactions: TX-1 (INSERT row 1) and TX-2 (DELETE row 1). If @@ -1855,6 +1857,10 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data) TransactionId xid = abort_data->xid; TransactionId subxid = abort_data->subxid; + /* Streamed transactions won't be registered */ + Assert(!dshash_find(parallelized_txns, &xid, false) && + !dshash_find(parallelized_txns, &subxid, false)); + /* * Update origin state so we can restart streaming from correct position * in case of crash. diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 9b1962de840..fb9cd104ca0 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1260,8 +1260,6 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, */ if (winfo == NULL) { - Assert(!TransactionIdIsValid(new_depended_xid)); - foreach_xid(xid, depends_on_xids) pa_wait_for_depended_transaction(xid); } @@ -1468,13 +1466,22 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); + /* + * Pass InvalidTransactionId to skip dependency recording for this + * change. Streaming transactions are assumed not to conflict with + * other transactions, so subsequent transactions do not need to + * wait for them to finish. + */ + handle_dependency_on_change(action, s, InvalidTransactionId, winfo); + /* * XXX The publisher side doesn't always send relation/type update * messages after the streaming transaction, so also update the * relation/type in leader apply worker. See function * cleanup_rel_sync_cache. 
*/ - if (pa_send_data(winfo, s->len, s->data)) + if (!winfo->serialize_changes && + pa_send_data(winfo, s->len, s->data)) return (action != LOGICAL_REP_MSG_RELATION && action != LOGICAL_REP_MSG_TYPE); @@ -2595,6 +2602,13 @@ apply_handle_stream_prepare(StringInfo s) apply_spooled_messages(MyLogicalRepWorker->stream_fileset, prepare_data.xid, prepare_data.prepare_lsn); + /* Wait until the last transaction finishes */ + if (TransactionIdIsValid(last_parallelized_remote_xid)) + { + pa_wait_for_depended_transaction(last_parallelized_remote_xid); + last_parallelized_remote_xid = InvalidTransactionId; + } + /* Mark the transaction as prepared. */ apply_handle_prepare_internal(&prepare_data); @@ -2618,6 +2632,12 @@ apply_handle_stream_prepare(StringInfo s) case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); + /* + * Build a dependency between this transaction and the lastly + * committed transaction to preserve the commit order. + */ + build_dependency_with_last_committed_txn(winfo); + if (pa_send_data(winfo, s->len, s->data)) { /* Finish processing the streaming transaction. */ @@ -2684,6 +2704,11 @@ apply_handle_stream_prepare(StringInfo s) pgstat_report_stat(false); + /* + * No need to update the last_parallelized_remote_xid here because leader + * worker always wait until streamed transactions finish. + */ + /* * Process any tables that are being synchronized in parallel, as well as * any newly added tables or sequences. @@ -2797,17 +2822,8 @@ apply_handle_stream_start(StringInfo s) set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr); - /* - * Try to allocate a worker for the streaming transaction. - * - * TODO: Support assigning streaming transactions to parallel apply workers - * even when non-streaming transactions are running and better dependency - * handling between streaming and non-streaming transactions. 
- */ - if (first_segment && - am_leader_apply_worker() && - (!TransactionIdIsValid(last_parallelized_remote_xid) || - pa_transaction_committed(last_parallelized_remote_xid))) + /* Try to allocate a worker for the streaming transaction. */ + if (first_segment) pa_allocate_worker(stream_xid, true); apply_action = get_transaction_apply_action(stream_xid, &winfo); @@ -3471,6 +3487,13 @@ apply_handle_stream_commit(StringInfo s) apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid, commit_data.commit_lsn); + /* Wait until the last transaction finishes */ + if (TransactionIdIsValid(last_parallelized_remote_xid)) + { + pa_wait_for_depended_transaction(last_parallelized_remote_xid); + last_parallelized_remote_xid = InvalidTransactionId; + } + apply_handle_commit_internal(&commit_data); /* Unlink the files with serialized changes and subxact info. */ @@ -3482,6 +3505,19 @@ apply_handle_stream_commit(StringInfo s) case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); + /* + * Apart from non-streaming case, no need to mark this transaction + * as parallelized. Because the leader waits until the streamed + * transaction is committed thus commit ordering is always + * preserved. + */ + + /* + * Build a dependency between this transaction and the lastly + * committed transaction to preserve the commit order. + */ + build_dependency_with_last_committed_txn(winfo); + if (pa_send_data(winfo, s->len, s->data)) { /* Finish processing the streaming transaction. 
 */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 02a165ccc0d..3dde899f465 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -370,7 +370,6 @@ extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
 extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
							  ParallelTransState xact_state);
 extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);
-
 extern void pa_start_subtrans(TransactionId current_xid,
							  TransactionId top_xid);
 extern void pa_reset_subtrans(void);
diff --git a/src/test/subscription/t/050_parallel_apply.pl b/src/test/subscription/t/050_parallel_apply.pl
index 6e27e672014..92b4e5d6c70 100644
--- a/src/test/subscription/t/050_parallel_apply.pl
+++ b/src/test/subscription/t/050_parallel_apply.pl
@@ -187,4 +187,51 @@ $node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset
 $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'regress_prepare';");
 $node_publisher->wait_for_catchup('regress_sub');
 
+# Ensure streamed transactions wait for the previous transaction
+
+$node_publisher->append_conf('postgresql.conf',
+	"logical_decoding_work_mem = 64kB");
+$node_publisher->reload;
+# Run a query to make sure that the reload has taken effect.
+$node_publisher->safe_psql('postgres', "SELECT 1");
+
+# Attach the injection_point again
+$node_subscriber->safe_psql('postgres',
+	"SELECT injection_points_attach('parallel-worker-before-commit','wait');"
+);
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO regress_tab VALUES (generate_series(71, 80), 'test');");
+
+# Wait until the parallel worker enters the injection point. 
+$node_subscriber->wait_for_event('logical replication parallel worker',
+	'parallel-worker-before-commit');
+
+# Run a transaction which will be streamed
+my $h = $node_publisher->background_psql('postgres', on_error_stop => 0);
+
+$offset = -s $node_subscriber->logfile;
+
+$h->query_safe(
+	q{
+BEGIN;
+UPDATE regress_tab SET value = 'streamed-updated' WHERE id BETWEEN 71 AND 80;
+INSERT INTO regress_tab VALUES (generate_series(100, 5100), 'streamed');
+});
+
+# Verify the parallel worker waits for the transaction
+$str = $node_subscriber->wait_for_log(qr/wait for depended xid ([1-9][0-9]+)/, $offset);
+($xid) = $str =~ /wait for depended xid ([1-9][0-9]+)/;
+
+# Wake up the parallel worker
+$node_subscriber->safe_psql('postgres', qq[
+	SELECT injection_points_detach('parallel-worker-before-commit');
+	SELECT injection_points_wakeup('parallel-worker-before-commit');
+]);
+
+# Verify the streamed transaction can be applied
+$node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset);
+
+$h->query_safe("COMMIT;");
+
 done_testing();
-- 
2.43.7