From 0861f5bb3817fa6bfd9a4f34ab19bb0f662c3aa6 Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Thu, 30 Apr 2026 15:28:50 +0800 Subject: [PATCH v18 5/7] support 2PC This patch allows the PREPARE transaction to be applied in parallel. Parallel apply workers are assigned to a transaction when BEGIN_PREPARE is received. This part and the dependency-waiting mechanism are the same as a normal transaction. A parallel worker can be freed after it handles a PREPARE message. The prepared transaction can be deleted from parallelized_txns at that time; the upcoming transactions will wait until then. The leader apply worker resolves COMMIT PREPARED/ROLLBACK PREPARED. Since it can be serialized automatically, it does not add the transaction to parallelized_txns. --- src/backend/replication/logical/worker.c | 218 +++++++++++++++--- src/test/subscription/t/050_parallel_apply.pl | 60 +++++ 2 files changed, 250 insertions(+), 28 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 64dca6aa9ba..9b1962de840 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -806,6 +806,14 @@ delete_replica_identity_entries_for_txns(List *committed_xids) if (!committed_xids) return; + /* + * Skip if the replica_identity_table is not initialized yet. This can + * happen if the empty transaction was replicated and a parallel apply + * worker was launched. See comments in apply_handle_prepare(). + */ + if (!replica_identity_table) + return; + replica_identity_start_iterate(replica_identity_table, &i); while ((rientry = replica_identity_iterate(replica_identity_table, &i)) != NULL) { @@ -2144,6 +2152,8 @@ static void apply_handle_begin_prepare(StringInfo s) { LogicalRepPreparedTxnData begin_data; + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; /* Tablesync should never receive prepare. 
*/ if (am_tablesync_worker()) @@ -2155,12 +2165,54 @@ apply_handle_begin_prepare(StringInfo s) Assert(!TransactionIdIsValid(stream_xid)); logicalrep_read_begin_prepare(s, &begin_data); - set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn); + + remote_xid = begin_data.xid; + + set_apply_error_context_xact(remote_xid, begin_data.prepare_lsn); remote_final_lsn = begin_data.prepare_lsn; maybe_start_skipping_changes(begin_data.prepare_lsn); + pa_allocate_worker(remote_xid, false); + + apply_action = get_transaction_apply_action(remote_xid, &winfo); + + elog(DEBUG1, "new remote_xid %u", remote_xid); + switch (apply_action) + { + case TRANS_LEADER_APPLY: + break; + + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + if (pa_send_data(winfo, s->len, s->data)) + { + pa_set_stream_apply_worker(winfo); + break; + } + + /* + * TODO: Support switching to PARTIAL_SERIALIZE mode when the send + * buffer becomes full. + */ + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send data to the logical replication parallel apply worker")); + break; + + case TRANS_PARALLEL_APPLY: + /* Hold the lock until the end of the transaction. 
*/ + pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock); + pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED); + break; + + default: + elog(ERROR, "unexpected apply action: %d", (int) apply_action); + break; + } + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -2210,6 +2262,8 @@ static void apply_handle_prepare(StringInfo s) { LogicalRepPreparedTxnData prepare_data; + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; logicalrep_read_prepare(s, &prepare_data); @@ -2220,36 +2274,131 @@ apply_handle_prepare(StringInfo s) LSN_FORMAT_ARGS(prepare_data.prepare_lsn), LSN_FORMAT_ARGS(remote_final_lsn)))); - /* - * Unlike commit, here, we always prepare the transaction even though no - * change has happened in this transaction or all changes are skipped. It - * is done this way because at commit prepared time, we won't know whether - * we have skipped preparing a transaction because of those reasons. - * - * XXX, We can optimize such that at commit prepared time, we first check - * whether we have prepared the transaction or not but that doesn't seem - * worthwhile because such cases shouldn't be common. - */ - begin_replication_step(); + apply_action = get_transaction_apply_action(remote_xid, &winfo); - apply_handle_prepare_internal(&prepare_data); + switch (apply_action) + { + case TRANS_LEADER_APPLY: - end_replication_step(); - CommitTransactionCommand(); - pgstat_report_stat(false); + /* + * Unlike commit, here, we always prepare the transaction even + * though no change has happened in this transaction or all + * changes are skipped. It is done this way because at commit + * prepared time, we won't know whether we have skipped preparing + * a transaction because of those reasons. + * + * XXX, We can optimize such that at commit prepared time, we + * first check whether we have prepared the transaction or not but + * that doesn't seem worthwhile because such cases shouldn't be + * common. 
+ */ + begin_replication_step(); - /* - * It is okay not to set the local_end LSN for the prepare because we - * always flush the prepare record. So, we can send the acknowledgment of - * the remote_end LSN as soon as prepare is finished. - * - * XXX For the sake of consistency with commit, we could have set it with - * the LSN of prepare but as of now we don't track that value similar to - * XactLastCommitEnd, and adding it for this purpose doesn't seems worth - * it. - */ - store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr, - InvalidTransactionId); + /* Wait until the last transaction finishes */ + if (TransactionIdIsValid(last_parallelized_remote_xid)) + { + pa_wait_for_depended_transaction(last_parallelized_remote_xid); + last_parallelized_remote_xid = InvalidTransactionId; + } + + apply_handle_prepare_internal(&prepare_data); + + end_replication_step(); + CommitTransactionCommand(); + pgstat_report_stat(false); + + /* + * It is okay not to set the local_end LSN for the prepare because + * we always flush the prepare record. So, we can send the + * acknowledgment of the remote_end LSN as soon as prepare is + * finished. + * + * XXX For the sake of consistency with commit, we could have set + * it with the LSN of prepare but as of now we don't track that + * value similar to XactLastCommitEnd, and adding it for this + * purpose doesn't seem worth it. + */ + store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr, + InvalidTransactionId); + + break; + + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + /* + * Mark this transaction as parallelized. This ensures that + * upcoming transactions wait until this transaction is prepared. + */ + pa_add_parallelized_transaction(remote_xid); + + /* + * Build a dependency between this transaction and the last + * committed transaction to preserve the commit order. 
+ */ + build_dependency_with_last_committed_txn(winfo); + + if (pa_send_data(winfo, s->len, s->data)) + { + /* Cache the remote_xid */ + last_parallelized_remote_xid = remote_xid; + + /* Finish processing the transaction. */ + pa_xact_finish(winfo, prepare_data.end_lsn); + break; + } + + /* + * TODO: Support switching to PARTIAL_SERIALIZE mode when the send + * buffer becomes full. + */ + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send data to the logical replication parallel apply worker")); + break; + + case TRANS_PARALLEL_APPLY: + + /* + * If the parallel apply worker is applying spooled messages then + * close the file before preparing the transaction. + */ + if (stream_fd) + stream_close_file(); + + begin_replication_step(); + + INJECTION_POINT("parallel-worker-before-prepare", NULL); + + /* Mark the transaction as prepared. */ + apply_handle_prepare_internal(&prepare_data); + + end_replication_step(); + + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr, + InvalidTransactionId); + + /* + * It is okay not to set the local_end LSN for the prepare because + * we always flush the prepare record. See the TRANS_LEADER_APPLY case above. 
+ */ + MyParallelShared->last_commit_end = InvalidXLogRecPtr; + pa_commit_transaction(); + + pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock); + + pa_reset_subtrans(); + break; + + default: + elog(ERROR, "unexpected apply action: %d", (int) apply_action); + break; + } + + remote_xid = InvalidTransactionId; in_remote_transaction = false; @@ -2297,6 +2446,12 @@ apply_handle_commit_prepared(StringInfo s) /* There is no transaction when COMMIT PREPARED is called */ begin_replication_step(); + if (TransactionIdIsValid(last_parallelized_remote_xid)) + { + pa_wait_for_depended_transaction(last_parallelized_remote_xid); + last_parallelized_remote_xid = InvalidTransactionId; + } + /* * Update origin state so we can restart streaming from correct position * in case of crash. @@ -2365,6 +2520,13 @@ apply_handle_rollback_prepared(StringInfo s) /* There is no transaction when ABORT/ROLLBACK PREPARED is called */ begin_replication_step(); + + if (TransactionIdIsValid(last_parallelized_remote_xid)) + { + pa_wait_for_depended_transaction(last_parallelized_remote_xid); + last_parallelized_remote_xid = InvalidTransactionId; + } + FinishPreparedTransaction(gid, false); end_replication_step(); CommitTransactionCommand(); diff --git a/src/test/subscription/t/050_parallel_apply.pl b/src/test/subscription/t/050_parallel_apply.pl index ff6ce045fc8..6e27e672014 100644 --- a/src/test/subscription/t/050_parallel_apply.pl +++ b/src/test/subscription/t/050_parallel_apply.pl @@ -17,6 +17,8 @@ if ($ENV{enable_injection_points} ne 'yes') # Initialize publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + "max_prepared_transactions = 10"); $node_publisher->start; # Insert initial data @@ -35,6 +37,8 @@ $node_subscriber->init; $node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); $node_subscriber->append_conf('postgresql.conf', 
"max_logical_replication_workers = 10"); +$node_subscriber->append_conf('postgresql.conf', + "max_prepared_transactions = 10"); $node_subscriber->start; # Check if the extension injection_points is available, as it may be @@ -127,4 +131,60 @@ $result = "SELECT count(1) FROM regress_tab WHERE value = 'updated'"); is ($result, 5, 'updates are also replicated to subscriber'); +# Ensure PREPAREd transaction also affects the parallel apply + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_sub DISABLE;"); +$node_subscriber->poll_query_until('postgres', + "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'" +); +$node_subscriber->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION regress_sub SET (two_phase = on); + ALTER SUBSCRIPTION regress_sub ENABLE;"); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM pg_stat_activity WHERE backend_type = 'logical replication parallel worker'"); +is($result, '0', "no parallel apply workers exist after restart"); + +# Attach an injection_point. Parallel workers would wait before the prepare +$node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('parallel-worker-before-prepare','wait');" +); + +# PREPARE a transaction on publisher. It would be handled by a parallel apply +# worker. +$node_publisher->safe_psql('postgres', qq[ + BEGIN; + INSERT INTO regress_tab VALUES (generate_series(51, 60), 'prepare'); + PREPARE TRANSACTION 'regress_prepare'; +]); + +# Wait until the parallel worker enters the injection point. +$node_subscriber->wait_for_event('logical replication parallel worker', + 'parallel-worker-before-prepare'); + +$offset = -s $node_subscriber->logfile; + +# Insert tuples on publisher again. 
This transaction waits for the prepared +# transaction. +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (generate_series(61, 70), 'test');"); + +# Verify the parallel worker waits for the transaction +$str = $node_subscriber->wait_for_log(qr/wait for depended xid ([1-9][0-9]+)/, $offset); +$xid = $str =~ /wait for depended xid ([1-9][0-9]+)/; + +# Wake up the parallel worker +$node_subscriber->safe_psql('postgres', qq[ + SELECT injection_points_detach('parallel-worker-before-prepare'); + SELECT injection_points_wakeup('parallel-worker-before-prepare'); +]); + +$node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset); + +# COMMIT the prepared transaction. It is always handled by the leader. +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'regress_prepare';"); +$node_publisher->wait_for_catchup('regress_sub'); + done_testing(); -- 2.43.7