From c3ca4e9253f3573cb22d828916a14d86dc4754fd Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Thu, 30 Apr 2026 16:52:41 +0800 Subject: [PATCH v18 10/10] Support serializing changes to disk when the send buffer is full This commit adds support for the leader apply worker to serialize changes to disk when the send buffer to parallel apply workers becomes full. If the leader apply worker cannot send changes to a parallel apply worker due to a send timeout, it switches to serialization mode and writes the remaining changes of the remote transaction to a file. The parallel apply worker later reads and applies the serialized changes once it has caught up. --- .../replication/logical/applyparallelworker.c | 52 ++++- src/backend/replication/logical/worker.c | 184 +++++++++++++----- src/test/subscription/t/050_parallel_apply.pl | 30 +++ 3 files changed, 209 insertions(+), 57 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index b249ff2bad6..2550681e800 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -976,6 +976,38 @@ pa_get_last_commit_end(TransactionId xid, bool delete_entry, bool *skipped_write return entry->local_end; } +/* + * Wait for the remote transaction associated with the specified remote xid to + * complete. + */ +static void +pa_wait_for_transaction(TransactionId wait_for_xid) +{ + if (!am_leader_apply_worker()) + return; + + if (!TransactionIdIsValid(wait_for_xid)) + return; + + elog(DEBUG1, "plan to wait for remote_xid %u to finish", + wait_for_xid); + + for (;;) + { + if (pa_transaction_committed(wait_for_xid)) + break; + + pa_lock_transaction(wait_for_xid, AccessShareLock); + pa_unlock_transaction(wait_for_xid, AccessShareLock); + + /* An interrupt may have occurred while we were waiting. 
*/ + CHECK_FOR_INTERRUPTS(); + } + + elog(DEBUG1, "finished wait for remote_xid %u to finish", + wait_for_xid); +} + /* * Interrupt handler for main loop of parallel apply worker. */ @@ -1595,6 +1627,7 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) void pa_distribute_schema_changes_to_workers(LogicalRepRelation *rel) { + List *workers_stopped = NIL; StringInfoData out; /* Only leader apply workers can distribute schema changes */ @@ -1631,13 +1664,22 @@ pa_distribute_schema_changes_to_workers(LogicalRepRelation *rel) continue; /* - * TODO: Support switching to PARTIAL_SERIALIZE mode when the send - * buffer becomes full. + * Distribution to this worker failed due to a sending timeout. Wait + * for the worker to complete its transaction and then stop it. This + * is consistent with the handling of workers in serialize mode (see + * pa_free_worker() for details). */ - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not send data to the logical replication parallel apply worker for subscription")); + pa_wait_for_transaction(winfo->shared->xid); + + pa_get_last_commit_end(winfo->shared->xid, false, NULL); + + logicalrep_pa_worker_stop(winfo); + + workers_stopped = lappend(workers_stopped, winfo); } + + foreach_ptr(ParallelApplyWorkerInfo, winfo, workers_stopped) + pa_free_worker_info(winfo); } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 92b44093ebf..ff2e64686b6 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -733,9 +733,9 @@ static void set_wal_receiver_timeout(void); static void on_exit_clear_xact_state(int code, Datum arg); -static void send_internal_dependencies(ParallelApplyWorkerInfo *winfo, +static bool send_internal_dependencies(ParallelApplyWorkerInfo *winfo, List *depends_on_xids); -static void build_dependency_with_last_committed_txn(ParallelApplyWorkerInfo *winfo); +static 
bool build_dependency_with_last_committed_txn(ParallelApplyWorkerInfo *winfo); /* * Compute the hash value for entries in the replica_identity_table. @@ -1904,6 +1904,9 @@ handle_parallelized_transaction(LogicalRepMsgType action, StringInfo s) ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; + /* * Dependency checking for non-streaming transactions is only required in * the leader apply worker during a remote transaction. @@ -1924,6 +1927,12 @@ handle_parallelized_transaction(LogicalRepMsgType action, StringInfo s) handle_dependency_on_change(action, s, remote_xid, winfo); + /* + * Re-fetch the latest apply action as it might have been changed during + * dependency check. + */ + apply_action = get_transaction_apply_action(remote_xid, &winfo); + switch (apply_action) { case TRANS_LEADER_SEND_TO_PARALLEL: @@ -1935,13 +1944,18 @@ handle_parallelized_transaction(LogicalRepMsgType action, StringInfo s) action != LOGICAL_REP_MSG_TYPE); /* - * TODO: Support switching to PARTIAL_SERIALIZE mode when the send - * buffer becomes full. + * Switch to serialize mode when we are not able to send the + * change to parallel apply worker. */ - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not send data to the logical replication parallel apply worker")); - return false; /* silence compiler warning */ + pa_switch_to_partial_serialize(winfo, false); + + pg_fallthrough; + case TRANS_LEADER_PARTIAL_SERIALIZE: + stream_write_change(action, &original_msg); + + /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */ + return (action != LOGICAL_REP_MSG_RELATION && + action != LOGICAL_REP_MSG_TYPE); default: elog(ERROR, "unexpected apply action: %d", (int) apply_action); @@ -2303,6 +2317,9 @@ apply_handle_begin(StringInfo s) ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + /* Save the message before it is consumed. 
*/ + StringInfoData original_msg = *s; + /* There must not be an active streaming transaction. */ Assert(!TransactionIdIsValid(stream_xid)); @@ -2336,12 +2353,19 @@ apply_handle_begin(StringInfo s) } /* - * TODO: Support switching to PARTIAL_SERIALIZE mode when the send - * buffer becomes full. + * Switch to serialize mode when we are not able to send the + * change to parallel apply worker. */ - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not send data to the logical replication parallel apply worker")); + pa_switch_to_partial_serialize(winfo, true); + + pg_fallthrough; + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + stream_write_change(LOGICAL_REP_MSG_BEGIN, &original_msg); + + /* Cache the parallel apply worker for this transaction. */ + pa_set_stream_apply_worker(winfo); break; case TRANS_PARALLEL_APPLY: @@ -2363,7 +2387,7 @@ apply_handle_begin(StringInfo s) /* * Send an INTERNAL_DEPENDENCY message to a parallel apply worker. */ -static void +static bool send_internal_dependencies(ParallelApplyWorkerInfo *winfo, List *depends_on_xids) { StringInfoData dependencies; @@ -2377,14 +2401,22 @@ send_internal_dependencies(ParallelApplyWorkerInfo *winfo, List *depends_on_xids foreach_xid(xid, depends_on_xids) pq_sendint32(&dependencies, xid); + if (!winfo->serialize_changes) + { + if (pa_send_data(winfo, dependencies.len, dependencies.data)) + return true; + + pa_switch_to_partial_serialize(winfo, true); + } + /* - * TODO: Support switching to PARTIAL_SERIALIZE mode when the send - * buffer becomes full. + * Skip writing the first internal message flag because + * stream_write_change() accepts it as the argument. 
*/ - if (!pa_send_data(winfo, dependencies.len, dependencies.data)) - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not send data to the logical replication parallel apply worker")); + dependencies.cursor++; + stream_write_change(LOGICAL_REP_MSG_INTERNAL_MESSAGE, &dependencies); + + return false; } /* @@ -2394,14 +2426,14 @@ send_internal_dependencies(ParallelApplyWorkerInfo *winfo, List *depends_on_xids * instructing it to wait for the last committed transaction to finish before * committing its own, thereby preserving commit order. */ -static void +static bool build_dependency_with_last_committed_txn(ParallelApplyWorkerInfo *winfo) { /* Skip if transactions have not been applied yet */ if (!TransactionIdIsValid(last_parallelized_remote_xid)) - return; + return true; - send_internal_dependencies(winfo, list_make1_xid(last_parallelized_remote_xid)); + return send_internal_dependencies(winfo, list_make1_xid(last_parallelized_remote_xid)); } /* @@ -2416,6 +2448,9 @@ apply_handle_commit(StringInfo s) ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; + logicalrep_read_commit(s, &commit_data); if (commit_data.commit_lsn != remote_final_lsn) @@ -2459,11 +2494,11 @@ apply_handle_commit(StringInfo s) /* * Build a dependency between this transaction and the lastly - * committed transaction to preserve the commit order. + * committed transaction to preserve the commit order. Then try to + * send a COMMIT message if succeeded. */ - build_dependency_with_last_committed_txn(winfo); - - if (pa_send_data(winfo, s->len, s->data)) + if (build_dependency_with_last_committed_txn(winfo) && + pa_send_data(winfo, s->len, s->data)) { /* Cache the remote_xid */ last_parallelized_remote_xid = remote_xid; @@ -2474,12 +2509,29 @@ apply_handle_commit(StringInfo s) } /* - * TODO: Support switching to PARTIAL_SERIALIZE mode when the send - * buffer becomes full. 
+ * Switch to serialize mode when we are not able to send the + * change to parallel apply worker. */ - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not send data to the logical replication parallel apply worker")); + pa_switch_to_partial_serialize(winfo, true); + + pg_fallthrough; + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + /* + * Build a dependency with the last committed transaction if not + * already done. + */ + if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL) + build_dependency_with_last_committed_txn(winfo); + + stream_open_and_write_change(remote_xid, LOGICAL_REP_MSG_COMMIT, + &original_msg); + + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); + + /* Finish processing the transaction. */ + pa_xact_finish(winfo, commit_data.end_lsn); break; case TRANS_PARALLEL_APPLY: @@ -2532,6 +2584,9 @@ apply_handle_begin_prepare(StringInfo s) ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; + /* Tablesync should never receive prepare. */ if (am_tablesync_worker()) ereport(ERROR, @@ -2571,12 +2626,19 @@ apply_handle_begin_prepare(StringInfo s) } /* - * TODO: Support switching to PARTIAL_SERIALIZE mode when the send - * buffer becomes full. + * Switch to serialize mode when we are not able to send the + * change to parallel apply worker. */ - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not send data to the logical replication parallel apply worker")); + pa_switch_to_partial_serialize(winfo, true); + + pg_fallthrough; + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + stream_write_change(LOGICAL_REP_MSG_BEGIN_PREPARE, &original_msg); + + /* Cache the parallel apply worker for this transaction. 
*/ + pa_set_stream_apply_worker(winfo); break; case TRANS_PARALLEL_APPLY: @@ -2642,6 +2704,9 @@ apply_handle_prepare(StringInfo s) ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; + logicalrep_read_prepare(s, &prepare_data); if (prepare_data.prepare_lsn != remote_final_lsn) @@ -2711,11 +2776,11 @@ apply_handle_prepare(StringInfo s) /* * Build a dependency between this transaction and the lastly - * committed transaction to preserve the commit order. + * committed transaction to preserve the commit order. Then try to + * send a PREPARE message if succeeded. */ - build_dependency_with_last_committed_txn(winfo); - - if (pa_send_data(winfo, s->len, s->data)) + if (build_dependency_with_last_committed_txn(winfo) && + pa_send_data(winfo, s->len, s->data)) { /* Cache the remote_xid */ last_parallelized_remote_xid = remote_xid; @@ -2726,12 +2791,29 @@ apply_handle_prepare(StringInfo s) } /* - * TODO: Support switching to PARTIAL_SERIALIZE mode when the send - * buffer becomes full. + * Switch to serialize mode when we are not able to send the + * change to parallel apply worker. */ - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not send data to the logical replication parallel apply worker")); + pa_switch_to_partial_serialize(winfo, true); + pg_fallthrough; + + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + /* + * Build a dependency with the last committed transaction if not + * already done. + */ + if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL) + build_dependency_with_last_committed_txn(winfo); + + stream_open_and_write_change(remote_xid, LOGICAL_REP_MSG_PREPARE, + &original_msg); + + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); + + /* Finish processing the transaction. 
*/ + pa_xact_finish(winfo, prepare_data.end_lsn); break; case TRANS_PARALLEL_APPLY: @@ -3006,9 +3088,8 @@ apply_handle_stream_prepare(StringInfo s) * Build a dependency between this transaction and the lastly * committed transaction to preserve the commit order. */ - build_dependency_with_last_committed_txn(winfo); - - if (pa_send_data(winfo, s->len, s->data)) + if (build_dependency_with_last_committed_txn(winfo) && + pa_send_data(winfo, s->len, s->data)) { /* Finish processing the streaming transaction. */ pa_xact_finish(winfo, prepare_data.end_lsn); @@ -3886,9 +3967,8 @@ apply_handle_stream_commit(StringInfo s) * Build a dependency between this transaction and the lastly * committed transaction to preserve the commit order. */ - build_dependency_with_last_committed_txn(winfo); - - if (pa_send_data(winfo, s->len, s->data)) + if (build_dependency_with_last_committed_txn(winfo) && + pa_send_data(winfo, s->len, s->data)) { /* Finish processing the streaming transaction. */ pa_xact_finish(winfo, commit_data.end_lsn); diff --git a/src/test/subscription/t/050_parallel_apply.pl b/src/test/subscription/t/050_parallel_apply.pl index d347bd67d64..a87d4134d02 100644 --- a/src/test/subscription/t/050_parallel_apply.pl +++ b/src/test/subscription/t/050_parallel_apply.pl @@ -335,4 +335,34 @@ $node_subscriber->safe_psql('postgres', qq[ $node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset); +# Cleanup +$node_publisher->safe_psql('postgres', "DELETE FROM regress_tab;"); +$node_publisher->wait_for_catchup('regress_sub'); + +# Force the leader apply worker to serialize changes to disk +$node_subscriber->append_conf('postgresql.conf', + "debug_logical_replication_streaming = immediate"); +$node_subscriber->reload; + +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (generate_series(1, 10), 'test');"); + +$node_publisher->wait_for_catchup('regress_sub'); + +# Reset to buffered mode so that subsequent tests send changes via shared +# 
memory. +$node_subscriber->append_conf('postgresql.conf', + "debug_logical_replication_streaming = buffered"); +$node_subscriber->reload; + +# Verify that changes have been serialized +$node_subscriber->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/, + $offset); + +# Verify that the parallel apply worker can restore serialized changes correctly +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(1) FROM regress_tab"); +is ($result, 10, 'inserts are replicated to subscriber'); + done_testing(); -- 2.54.0.windows.1