From e510507b036dc655c28a8df780e8081677f6f48b Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Wed, 29 Apr 2026 16:41:44 +0800 Subject: [PATCH v18 2/7] Introduce internal messages to track dependencies This patch introduces a new set of internal worker message types (see PAWorkerMsgType). These types of messages are generated by the leader apply worker and sent to parallel apply workers as needed. For now, two types of messages exist: PA_MSG_XACT_DEPENDENCY and PA_MSG_RELMAP. PA_MSG_XACT_DEPENDENCY ensures that dependent transactions are committed in the correct order. It carries a list of transaction IDs that parallel workers must wait for. This type of message is generated when the leader detects a dependency between the current transaction and other transactions, or just before the COMMIT message. The latter is used to preserve the commit ordering between the publisher and the subscriber. PA_MSG_RELMAP is used to synchronize the remote relation information between the leader and parallel workers. It carries a list of relations that the leader already knows, and parallel workers update their relmap in response to the message. This type of message is generated when the leader allocates a new parallel worker to the transaction, or when the publisher sends additional RELATION messages. This synchronization is necessary for parallel apply workers to map local replication target relations to their remote counterparts during change application. Since the walsender does not send remote relation information with every transaction, the parallel apply worker may not have up-to-date relation info unless synchronized by the leader. In the logical replication protocol, the above messages are encapsulated within the LOGICAL_REP_MSG_INTERNAL_MESSAGE type. Later patches will use a uniform format (LOGICAL_REP_MSG_INTERNAL_MESSAGE + PAWorkerMsgType + internal data) for sending to parallel apply workers or serializing to disk. 
Author: Zhijie Hou Author: Hayato Kuroda --- .../replication/logical/applyparallelworker.c | 187 ++++++++++++++++-- src/backend/replication/logical/proto.c | 2 + src/backend/replication/logical/worker.c | 4 + src/include/replication/logicalproto.h | 8 + src/include/replication/worker_internal.h | 17 ++ src/tools/pgindent/typedefs.list | 1 + 6 files changed, 202 insertions(+), 17 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 50eb574a1bf..4ccf6a81b14 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -772,6 +772,73 @@ ProcessParallelApplyInterrupts(void) } } +/* + * Handle internal dependency information. + * + * Wait for all transactions listed in the message to commit. + */ +static void +apply_handle_internal_dependency(StringInfo s) +{ + int nxids = pq_getmsgint(s, 4); + + for (int i = 0; i < nxids; i++) + { + TransactionId xid = pq_getmsgint(s, 4); + + pa_wait_for_depended_transaction(xid); + } +} + +/* + * Handle internal relation information. + * + * Update all relation details in the relation map cache. + */ +static void +apply_handle_internal_relation(StringInfo s) +{ + int nrels = pq_getmsgint(s, 4); + + for (int i = 0; i < nrels; i++) + { + LogicalRepRelation *rel = logicalrep_read_rel(s); + + logicalrep_relmap_update(rel); + + /* Also reset all entries in the partition map that refer to remoterel. */ + logicalrep_partmap_reset_relmap(rel); + + elog(DEBUG1, "parallel apply worker init relmap for %s", + rel->relname); + } +} + +/* + * Handle an internal message generated by the leader apply worker. 
+ */ +void +apply_handle_internal_message(StringInfo s) +{ + PAWorkerMsgType action = pq_getmsgbyte(s); + + Assert(am_parallel_apply_worker()); + + switch (action) + { + case PA_MSG_XACT_DEPENDENCY: + apply_handle_internal_dependency(s); + break; + case PA_MSG_RELMAP: + apply_handle_internal_relation(s); + break; + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid worker internal message type \"??? (%d)\"", action))); + } +} + /* Parallel apply worker main loop. */ static void LogicalParallelApplyLoop(shm_mq_handle *mqh) @@ -780,6 +847,14 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) ErrorContextCallback errcallback; MemoryContext oldcxt = CurrentMemoryContext; + /* + * Ensure LOGICAL_REP_MSG_INTERNAL_MESSAGE does not conflict with + * PqReplMsg_WALData ('w'), as parallel apply workers may receive both types + * of messages. + */ + StaticAssertDecl(LOGICAL_REP_MSG_INTERNAL_MESSAGE != PqReplMsg_WALData, + "LOGICAL_REP_MSG_INTERNAL_MESSAGE conflicts with PqReplMsg_WALData"); + /* * Init the ApplyMessageContext which we clean up after each replication * protocol message. @@ -818,26 +893,46 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) initReadOnlyStringInfo(&s, data, len); - /* - * The first byte of messages sent from leader apply worker to - * parallel apply workers can only be PqReplMsg_WALData. - */ c = pq_getmsgbyte(&s); - if (c != PqReplMsg_WALData) - elog(ERROR, "unexpected message \"%c\"", c); - /* - * Ignore statistics fields that have been updated by the leader - * apply worker. - * - * XXX We can avoid sending the statistics fields from the leader - * apply worker but for that, it needs to rebuild the entire - * message by removing these fields which could be more work than - * simply ignoring these fields in the parallel apply worker. - */ - s.cursor += SIZE_STATS_MESSAGE; + if (c == PqReplMsg_WALData) + { + /* + * Ignore statistics fields that have been updated by the + * leader apply worker. 
+ * + * XXX We can avoid sending the statistics fields from the + * leader apply worker but for that, it needs to rebuild the + * entire message by removing these fields which could be more + * work than simply ignoring these fields in the parallel apply + * worker. + */ + s.cursor += SIZE_STATS_MESSAGE; + + apply_dispatch(&s); + } + else if (c == LOGICAL_REP_MSG_INTERNAL_MESSAGE) + { + /* + * Rewind the cursor so that apply_dispatch can re-read the + * first byte (LOGICAL_REP_MSG_INTERNAL_MESSAGE) to correctly + * handle the internal message. Alternatively, we could call + * apply_handle_internal_message directly, but using + * apply_dispatch uniformly across all message types is cleaner + * and more consistent. + */ + s.cursor--; - apply_dispatch(&s); + apply_dispatch(&s); + } + else + { + /* + * The first byte of messages sent from leader apply worker to + * parallel apply workers can only be 'w' or 'i'. + */ + elog(ERROR, "unexpected message \"%c\"", c); + } } else if (shmq_res == SHM_MQ_WOULD_BLOCK) { @@ -1744,3 +1839,61 @@ pa_attach_parallelized_txn_hash(dsa_handle *pa_dsa_handle, MemoryContextSwitchTo(oldctx); } + +/* + * Wait for a parallel apply worker to finish applying the given remote + * transaction. + * + * Both leader and parallel apply workers can call this function to wait for a + * parallelized transaction to finish. + */ +void +pa_wait_for_depended_transaction(TransactionId xid) +{ + ParallelizedTxnEntry *txn_entry; + + /* + * Quick exit if parallelized_txns has not been initialized yet. This can + * happen when the leader worker calls this function before any parallel + * apply workers have been launched. 
+ */ + if (!parallelized_txns) + return; + + elog(DEBUG1, "wait for depended xid %u", xid); + + for (;;) + { + txn_entry = dshash_find(parallelized_txns, &xid, false); + + /* The entry is removed only if the transaction is committed */ + if (txn_entry == NULL) + break; + + dshash_release_lock(parallelized_txns, txn_entry); + + /* + * Wait for the parallel apply worker processing the given remote + * transaction to finish applying and release its lock. + */ + pa_lock_transaction(xid, AccessShareLock); + pa_unlock_transaction(xid, AccessShareLock); + + CHECK_FOR_INTERRUPTS(); + + /* + * Acquiring the lock successfully does not guarantee we can proceed. + * The worker may have errored out and released the lock while leaving + * its shared hash entry intact, or it may not have acquired the lock + * yet because it hasn't processed the BEGIN message. In either case, we + * must continue waiting in the loop until the parallel apply worker + * finishes applying the transaction, or until the leader notifies us of + * a failure and restarts all workers. + * + * The above race window is small and infrequent, so no WaitLatch is + * added. 
+ */ + } + + elog(DEBUG1, "finish waiting for depended xid %u", xid); +} diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 86ad97cd937..a0cac05cdcd 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -1253,6 +1253,8 @@ logicalrep_message_type(LogicalRepMsgType action) return "STREAM ABORT"; case LOGICAL_REP_MSG_STREAM_PREPARE: return "STREAM PREPARE"; + case LOGICAL_REP_MSG_INTERNAL_MESSAGE: + return "INTERNAL MESSAGE"; } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index dd6fc38a41e..1e73f8c5c83 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3875,6 +3875,10 @@ apply_dispatch(StringInfo s) apply_handle_stream_prepare(s); break; + case LOGICAL_REP_MSG_INTERNAL_MESSAGE: + apply_handle_internal_message(s); + break; + default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 058a955e20c..cb0a8f440c0 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -53,6 +53,13 @@ * in logical replication protocol, which uses a single byte to identify a * message type. Hence the values should be single-byte wide and preferably * human-readable characters. + * + * LOGICAL_REP_MSG_INTERNAL_MESSAGE ('i') is reserved for internal messages (see + * PAWorkerMsgType for sub-message types) sent from the leader apply + * worker to parallel apply workers. Centralizing this definition here allows + * all message types to be handled together and avoids the maintenance burden of + * ensuring sub-message types do not conflict with regular LogicalRepMsgType + * values. 
*/ typedef enum LogicalRepMsgType { @@ -75,6 +82,7 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_STREAM_COMMIT = 'c', LOGICAL_REP_MSG_STREAM_ABORT = 'A', LOGICAL_REP_MSG_STREAM_PREPARE = 'p', + LOGICAL_REP_MSG_INTERNAL_MESSAGE = 'i', } LogicalRepMsgType; /* diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index e60e0c3d6d7..43dcae43a62 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -242,6 +242,19 @@ typedef struct ParallelApplyWorkerInfo ParallelApplyWorkerShared *shared; } ParallelApplyWorkerInfo; +/* + * Parallel apply worker internal message types. + * + * These types of messages are generated by the leader apply worker and sent to + * the parallel apply worker, encapsulated within the + * LOGICAL_REP_MSG_INTERNAL_MESSAGE type. + */ +typedef enum PAWorkerMsgType +{ + PA_MSG_XACT_DEPENDENCY = 'd', + PA_MSG_RELMAP = 'r', +} PAWorkerMsgType; + /* Main memory context for apply worker. Permanent during worker lifetime. 
*/ extern PGDLLIMPORT MemoryContext ApplyContext; @@ -341,6 +354,8 @@ extern void pa_allocate_worker(TransactionId xid); extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid); extern void pa_detach_all_error_mq(void); +extern void apply_handle_internal_message(StringInfo s); + extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data); extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, @@ -368,6 +383,8 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +extern void pa_wait_for_depended_transaction(TransactionId xid); + #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) #define isTableSyncWorker(worker) ((worker)->in_use && \ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 28b9eaeb7ae..72f9d22215d 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1914,6 +1914,7 @@ OverridingKind PACE_HEADER PACL PATH +PAWorkerMsgType PCtxtHandle PERL_CONTEXT PERL_SI -- 2.43.7