diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 31a92d1a24a..13e5fc218d8 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -254,6 +298,9 @@ static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
 /* A list to maintain subtransactions, if any. */
 static List *subxactlist = NIL;
 
+/* GUC: commit parallel-applied transactions in publisher commit order */
+bool		preserve_commit_order = true;
+
 static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
 static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
 static PartialFileSetState pa_get_fileset_state(void);
@@ -2117,2 +2120,134 @@ write_internal_relation(StringInfo s, LogicalRepRelation *rel)
 	}
 }
+
+#include "postmaster/bgworker_internals.h"
+
+/*
+ * Shared-memory queue that records the order in which commit messages were
+ * handed to parallel apply workers.
+ *
+ * "ring" is a circular buffer indexed modulo MAX_PARALLEL_WORKER_LIMIT.
+ * "head" counts the entries appended by pa_commit() and "tail" counts the
+ * entries retired by pa_after_apply_commit(); both are protected by "mutex".
+ * "cv" is broadcast whenever either counter changes.
+ *
+ * NOTE(review): the queue is only initialized when the shared memory segment
+ * is created, and an entry is retired only by pa_after_apply_commit().  If a
+ * worker fails between pa_commit() and pa_after_apply_commit() its entry
+ * appears to stay at the tail and block later waiters -- confirm how that
+ * case is cleaned up.
+ */
+typedef struct
+{
+	ConditionVariable cv;
+	slock_t		mutex;
+	size_t		head;
+	size_t		tail;
+	TransactionId ring[MAX_PARALLEL_WORKER_LIMIT];
+} ParallelApplyShmem;
+
+static ParallelApplyShmem *pa_shmem;
+
+/*
+ * Append xid to the commit-order queue and wake up waiting workers.
+ *
+ * Raises an ERROR rather than overwriting an entry that has not been retired
+ * yet.
+ */
+void
+pa_commit(TransactionId xid)
+{
+	bool		full;
+
+	SpinLockAcquire(&pa_shmem->mutex);
+	full = (pa_shmem->head - pa_shmem->tail >= MAX_PARALLEL_WORKER_LIMIT);
+	if (!full)
+		pa_shmem->ring[pa_shmem->head++ % MAX_PARALLEL_WORKER_LIMIT] = xid;
+	SpinLockRelease(&pa_shmem->mutex);
+
+	/* Report only after the spinlock has been released. */
+	if (full)
+		elog(ERROR, "parallel apply commit-order queue is full");
+
+	ConditionVariableBroadcast(&pa_shmem->cv);
+}
+
+/*
+ * Wait until the current transaction (MyParallelShared->xid) is the oldest
+ * entry in the commit-order queue.
+ *
+ * Returns immediately when preserve_commit_order is off.
+ *
+ * NOTE(review): pa_commit() and pa_after_apply_commit() run regardless of the
+ * setting, so while it is off the retired entry is not necessarily the
+ * caller's own.  Turning it on while transactions are in flight may therefore
+ * leave a worker waiting for an entry that was already retired -- confirm.
+ */
+void
+pa_before_apply_commit(void)
+{
+	TransactionId xid = MyParallelShared->xid;
+
+	if (!preserve_commit_order)
+		return;
+
+	for (;;)
+	{
+		bool		my_turn;
+
+		SpinLockAcquire(&pa_shmem->mutex);
+		my_turn = pa_shmem->head > pa_shmem->tail &&
+			pa_shmem->ring[pa_shmem->tail % MAX_PARALLEL_WORKER_LIMIT] == xid;
+		SpinLockRelease(&pa_shmem->mutex);
+
+		if (my_turn)
+			break;
+
+		ConditionVariableSleep(&pa_shmem->cv,
+							   WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
+	}
+	ConditionVariableCancelSleep();
+}
+
+/*
+ * Retire the oldest entry of the commit-order queue and wake up waiting
+ * workers.  The tail is never advanced past the head.
+ */
+void
+pa_after_apply_commit(void)
+{
+	SpinLockAcquire(&pa_shmem->mutex);
+	if (pa_shmem->tail < pa_shmem->head)
+		pa_shmem->tail += 1;
+	SpinLockRelease(&pa_shmem->mutex);
+	ConditionVariableBroadcast(&pa_shmem->cv);
+}
+
+/*
+ * Report the amount of shared memory needed for the commit-order queue.
+ */
+Size
+ParallelApplyShmemSize(void)
+{
+	return sizeof(ParallelApplyShmem);
+}
+
+/*
+ * Allocate the commit-order queue in shared memory, or attach to it if it
+ * already exists.  Only the process that creates it initializes its fields.
+ */
+void
+ParallelApplyShmemInit(void)
+{
+	bool		found;
+
+	pa_shmem = (ParallelApplyShmem *)
+		ShmemInitStruct("Parallel worker shmem",
+						sizeof(ParallelApplyShmem), &found);
+	if (!found)
+	{
+		pa_shmem->head = pa_shmem->tail = 0;
+		ConditionVariableInit(&pa_shmem->cv);
+		SpinLockInit(&pa_shmem->mutex);
+	}
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 22ad9051db3..bf8bfcbdd3b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1911,40 +1911,49 @@ apply_handle_commit(StringInfo s)
 			if (pa_send_data(winfo, s->len, s->data))
 			{
+				pa_commit(winfo->shared->xid);
 				/* Finish processing the transaction. */
 				pa_xact_finish(winfo, commit_data.end_lsn);
 				break;
 			}
 
 			/*
 			 * Switch to serialize mode when we are not able to send the
 			 * change to parallel apply worker.
 			 */
 			pa_switch_to_partial_serialize(winfo, true);
 
 			/* fall through */
 		case TRANS_LEADER_PARTIAL_SERIALIZE:
 			Assert(winfo);
 
 			stream_open_and_write_change(remote_xid, LOGICAL_REP_MSG_COMMIT,
 										 &original_msg);
 
 			pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
 
+			/*
+			 * The worker replaying the spooled changes also goes through
+			 * pa_before_apply_commit(), so its xid must be queued here too.
+			 */
+			pa_commit(winfo->shared->xid);
+
 			/* Finish processing the transaction. */
 			pa_xact_finish(winfo, commit_data.end_lsn);
 			break;
 
 		case TRANS_PARALLEL_APPLY:
 
 			/*
 			 * If the parallel apply worker is applying spooled messages then
 			 * close the file before committing.
 			 */
 			if (stream_fd)
 				stream_close_file();
 
+			pa_before_apply_commit();
 			apply_handle_commit_internal(&commit_data);
+			pa_after_apply_commit();
 
 			MyParallelShared->last_commit_end = XactLastCommitEnd;
 
 			pa_commit_transaction();
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2fa045e6b0f..71b8abc4337 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -150,6 +150,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, InjectionPointShmemSize());
 	size = add_size(size, SlotSyncShmemSize());
 	size = add_size(size, AioShmemSize());
+	size = add_size(size, ParallelApplyShmemSize());
 
 	/* include additional requested shmem from preload libraries */
 	size = add_size(size, total_addin_request);
@@ -332,6 +333,7 @@ CreateOrAttachShmemStructs(void)
 	PgArchShmemInit();
 	ApplyLauncherShmemInit();
 	SlotSyncShmemInit();
+	ParallelApplyShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index f137129209f..5b60a4c6655 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -73,6 +73,7 @@
 #include "postmaster/walsummarizer.h"
 #include "postmaster/walwriter.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/slot.h"
 #include "replication/slotsync.h"
 #include "replication/syncrep.h"
@@ -976,6 +977,16 @@ struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"preserve_commit_order", PGC_SIGHUP, REPLICATION_SUBSCRIBERS,
+			gettext_noop("Commit LR transactions at subscriber in the same order as at publisher."),
+			NULL
+		},
+		&preserve_commit_order,
+		true,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"enable_parallel_hash", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enables the planner's use of parallel hash plans."),
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index b29453e8e4f..2efeff720f2 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -34,4 +34,7 @@ extern bool IsLogicalLauncher(void);
 
 extern pid_t GetLeaderApplyWorkerPid(pid_t pid);
 
+extern Size ParallelApplyShmemSize(void);
+extern void ParallelApplyShmemInit(void);
+
 #endif							/* LOGICALLAUNCHER_H */
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 88912606e4d..cb030eea402 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,6 +14,8 @@
 
 #include <signal.h>
 
+extern PGDLLIMPORT bool preserve_commit_order;
+
 extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
 
 extern void ApplyWorkerMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 7c0204dd6f4..e48a2219131 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -363,4 +385,10 @@ am_parallel_apply_worker(void)
 	return isParallelApplyWorker(MyLogicalRepWorker);
 }
 
+extern void pa_before_apply_commit(void);
+
+extern void pa_after_apply_commit(void);
+
+extern void pa_commit(TransactionId xid);
+
 #endif							/* WORKER_INTERNAL_H */