From 79efb925bd931358811689b23ec8aac631418420 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Thu, 4 Jun 2026 18:05:47 +1000 Subject: [PATCH v1] Avoid spurious update_origin_differs conflicts Spurious conflicts such as update_origin_differs can be reported on the subscriber when rows are modified after being initially copied during table synchronisation. Rows inserted by the tablesync worker carry its origin ID. However, this origin is dropped once the tablesync worker exits. As a result, when the apply worker later processes an update or delete for such rows, the origin recorded on the local tuple differs from the apply worker's own origin, which leads to spurious update_origin_differs or delete_origin_differs conflicts. To address this, record the tablesync origin IDs for the subscription and consult them during conflict detection. The pg_subscription_rel catalog is extended with a new field, srtablesyncoriginid, which stores the tablesync worker's origin ID for each relation; it is written when the relation reaches the FINISHEDCOPY state. The apply worker loads these origin IDs into an in-memory hash table at startup and rebuilds it whenever a relation transitions to READY, avoiding repeated catalog lookups. When the apply worker would otherwise report an update_origin_differs or delete_origin_differs conflict, it now skips the report if the origin recorded on the local tuple matches one of the recorded tablesync origins. 
--- src/backend/catalog/pg_subscription.c | 40 ++++++- src/backend/commands/subscriptioncmds.c | 2 +- .../replication/logical/sequencesync.c | 3 +- src/backend/replication/logical/tablesync.c | 22 +++- src/backend/replication/logical/worker.c | 111 +++++++++++++++++- src/include/catalog/pg_subscription_rel.h | 7 +- src/include/replication/worker_internal.h | 2 + 7 files changed, 172 insertions(+), 15 deletions(-) mode change 100644 => 100755 src/backend/replication/logical/worker.c diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 1f1fdc75af6..ba057968a84 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -25,6 +25,7 @@ #include "catalog/pg_type.h" #include "foreign/foreign.h" #include "miscadmin.h" +#include "replication/origin.h" #include "storage/lmgr.h" #include "storage/lock.h" #include "utils/acl.h" @@ -327,6 +328,16 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state, values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid); values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid); values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); + + /* + * No tablesync origin is known at start - the origin id is written + * later by UpdateSubscriptionRelState() when the tablesync worker + * transitions the relation to SUBREL_STATE_FINISHEDCOPY. 
+ */ + values[Anum_pg_subscription_rel_srtablesyncoriginid - 1] = + Int16GetDatum((int16) InvalidReplOriginId); + nulls[Anum_pg_subscription_rel_srtablesyncoriginid - 1] = false; + if (XLogRecPtrIsValid(sublsn)) values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); else @@ -356,7 +367,8 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state, */ void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn, bool already_locked) + XLogRecPtr sublsn, bool already_locked, + ReplOriginId originid) { Relation rel; HeapTuple tup; @@ -405,6 +417,30 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, else nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; + + + /* + * Store the tablesync origin ID used during the initial COPY phase so + * that the apply worker can suppress false update_origin_differs conflicts + * on rows stamped with this origin after crash recovery. If the caller + * passes InvalidReplOriginId, preserve the existing value; all state + * transitions after FINISHEDCOPY have no origin to contribute and pass + * in InvalidReplOriginId and that should not overwrite the one recorded + * during COPY. 
+ */ + + if (originid == InvalidReplOriginId) + { + replaces[Anum_pg_subscription_rel_srtablesyncoriginid - 1] = false; + } + else + { + values[Anum_pg_subscription_rel_srtablesyncoriginid - 1] = + Int16GetDatum((int16) originid); + replaces[Anum_pg_subscription_rel_srtablesyncoriginid - 1] = true; + nulls[Anum_pg_subscription_rel_srtablesyncoriginid - 1] = false; + } + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces); @@ -655,6 +691,8 @@ GetSubscriptionRelations(Oid subid, bool tables, bool sequences, relstate = palloc_object(SubscriptionRelState); relstate->relid = subrel->srrelid; relstate->state = subrel->srsubstate; + relstate->originid = subrel->srtablesyncoriginid; + d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, Anum_pg_subscription_rel_srsublsn, &isnull); if (isnull) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 523959ba0ce..8e5dbd7defc 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1300,7 +1300,7 @@ AlterSubscription_refresh_seq(Subscription *sub) Oid relid = subrel->relid; UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT, - InvalidXLogRecPtr, false); + InvalidXLogRecPtr, false, InvalidReplOriginId); ereport(DEBUG1, errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state", get_namespace_name(get_rel_namespace(relid)), diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c index e2ff8d77b16..d83b106a835 100644 --- a/src/backend/replication/logical/sequencesync.c +++ b/src/backend/replication/logical/sequencesync.c @@ -59,6 +59,7 @@ #include "pgstat.h" #include "postmaster/interrupt.h" #include "replication/logicalworker.h" +#include "replication/origin.h" #include "replication/worker_internal.h" #include "storage/lwlock.h" #include "utils/acl.h" @@ -372,7 +373,7 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner) * sequence 
as READY. */ UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY, - seqinfo->page_lsn, false); + seqinfo->page_lsn, false, InvalidReplOriginId); return COPYSEQ_SUCCESS; } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index a04b84ebc1d..ab2d54d5825 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -269,7 +269,8 @@ ProcessSyncingTablesForSync(XLogRecPtr current_lsn) MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn, - false); + false, + InvalidReplOriginId); /* * End streaming so that LogRepWorkerWalRcvConn can be used to drop @@ -469,7 +470,17 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, - rstate->lsn, true); + rstate->lsn, true, + InvalidReplOriginId); + + /* + * Rebuild the tablesync origins cache now that this relation + * has transitioned to READY. The srtablesyncoriginid written + * at FINISHEDCOPY is now stable and needs to be loaded into + * the cache before the apply worker starts processing WAL + * changes for this relation. 
+ */ + rebuild_tablesync_origins_cache(); } } else @@ -1375,7 +1386,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn, - false); + false, + InvalidReplOriginId); /* * Create the replication origin in a separate transaction from the one @@ -1504,8 +1516,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relid, SUBREL_STATE_FINISHEDCOPY, MyLogicalRepWorker->relstate_lsn, - false); - + false, + originid); CommitTransactionCommand(); copy_table_done: diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c old mode 100644 new mode 100755 index a3f2406ed83..7062d89a5b1 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -290,6 +290,7 @@ #include "tcop/tcopprot.h" #include "utils/acl.h" #include "utils/guc.h" +#include "utils/hsearch.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -484,6 +485,21 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL; Subscription *MySubscription = NULL; static bool MySubscriptionValid = false; +/* + * Hash table used as a set of the ReplOriginIds that were used by tablesync + * workers during initial COPY. Built from pg_subscription_rel at apply + * worker startup and refreshed whenever a relation transitions to READY. + * Lets the apply worker suppress false update/delete_origin_differs + * conflicts on rows that were re-stamped with the tablesync origin ID + * during WAL replay after a crash. 
+ */ +typedef struct TablesyncOriginEntry +{ + ReplOriginId originid; /* hash key - must be first */ +} TablesyncOriginEntry; + +static HTAB *tablesync_origins = NULL; + static List *on_commit_wakeup_workers_subids = NIL; bool in_remote_transaction = false; @@ -722,6 +738,75 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) return false; /* dummy for compiler */ } +/* + * Rebuild the in-memory hash table of tablesync origin IDs from + * pg_subscription_rel. Called at apply worker startup and whenever a + * relation transitions to SUBREL_STATE_READY, so newly finished tablesync + * workers are always reflected in the cache. + */ +void +rebuild_tablesync_origins_cache(void) +{ + List *subrels; + ListCell *lc; + HASHCTL ctl; + + /* Destroy the old table if it exists */ + if (tablesync_origins != NULL) + { + hash_destroy(tablesync_origins); + tablesync_origins = NULL; + } + + /* + * Fetch this subscription's tables (sequences are excluded) from + * pg_subscription_rel and add every valid tablesync origin ID to the set. + * The hash table is created lazily, so it stays NULL if none is recorded. + */ + subrels = GetSubscriptionRelations(MySubscription->oid, true, false, false); + + foreach(lc, subrels) + { + SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); + + if (relstate->originid == InvalidReplOriginId) + continue; + + if (tablesync_origins == NULL) + { + memset(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(ReplOriginId); + ctl.entrysize = sizeof(TablesyncOriginEntry); + ctl.hcxt = ApplyContext; + tablesync_origins = hash_create("tablesync origins", + 16, + &ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + } + + hash_search(tablesync_origins, &relstate->originid, HASH_ENTER, NULL); + } + + list_free_deep(subrels); +} + +/* + * is_tablesync_origin + * + * Returns true if the given origin ID is recorded in pg_subscription_rel + * as the tablesync origin for any relation in this subscription. Used to + * suppress false update/delete_origin_differs conflicts on rows that were + * stamped with the tablesync origin ID during WAL replay after a crash. 
+ */ +static inline bool +is_tablesync_origin(ReplOriginId originid) +{ + if (tablesync_origins == NULL || originid == InvalidReplOriginId) + return false; + + return hash_search(tablesync_origins, &originid, HASH_FIND, NULL) != NULL; +} + /* * Begin one step (one INSERT, UPDATE, etc) of a replication transaction. * @@ -2958,11 +3043,13 @@ apply_handle_update_internal(ApplyExecutionData *edata, { /* * Report the conflict if the tuple was modified by a different - * origin. + * origin. Skip if the origin is recorded in pg_subscription_rel + * as a known tablesync origin for this subscription. */ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, &conflicttuple.origin, &conflicttuple.ts) && - conflicttuple.origin != replorigin_xact_state.origin) + conflicttuple.origin != replorigin_xact_state.origin && + !is_tablesync_origin(conflicttuple.origin)) { TupleTableSlot *newslot; @@ -2971,7 +3058,6 @@ apply_handle_update_internal(ApplyExecutionData *edata, slot_store_data(newslot, relmapentry, newtup); conflicttuple.slot = localslot; - ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, remoteslot, newslot, list_make1(&conflicttuple)); @@ -3153,11 +3239,13 @@ apply_handle_delete_internal(ApplyExecutionData *edata, { /* * Report the conflict if the tuple was modified by a different - * origin. + * origin. Skip if the origin is recorded in pg_subscription_rel + * as a known tablesync origin. 
*/ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, &conflicttuple.origin, &conflicttuple.ts) && - conflicttuple.origin != replorigin_xact_state.origin) + conflicttuple.origin != replorigin_xact_state.origin && + !is_tablesync_origin(conflicttuple.origin)) { conflicttuple.slot = localslot; ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS, @@ -3525,7 +3613,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, &conflicttuple.origin, &conflicttuple.ts) && - conflicttuple.origin != replorigin_xact_state.origin) + conflicttuple.origin != replorigin_xact_state.origin && + !is_tablesync_origin(conflicttuple.origin)) { TupleTableSlot *newslot; @@ -5714,6 +5803,16 @@ run_apply_worker(void) origin_startpos = replorigin_session_get_progress(false); CommitTransactionCommand(); + /* + * Build the tablesync origins cache from pg_subscription_rel. This + * lets the apply worker recognise rows that were stamped with a (now + * dropped) tablesync origin ID while applying updates and deletes, and + * suppress false update/delete_origin_differs conflicts for them. + */ + StartTransactionCommand(); + rebuild_tablesync_origins_cache(); + CommitTransactionCommand(); + /* Is the use of a password mandatory? 
*/ must_use_password = MySubscription->passwordrequired && !MySubscription->ownersuperuser; diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 502640d3018..81621ebe464 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -35,6 +35,8 @@ CATALOG(pg_subscription_rel,6102,SubscriptionRelRelationId) Oid srsubid BKI_LOOKUP(pg_subscription); /* Oid of subscription */ Oid srrelid BKI_LOOKUP(pg_class); /* Oid of relation */ char srsubstate; /* state of the relation in subscription */ + int16 srtablesyncoriginid; /* tablesync origin used during COPY, + * InvalidReplOriginId if not applicable */ /* * Although srsublsn is a fixed-width type, it is allowed to be NULL, so @@ -84,6 +86,8 @@ typedef struct SubscriptionRelState Oid relid; XLogRecPtr lsn; char state; + ReplOriginId originid; /* tablesync origin from srtablesyncoriginid, + * InvalidReplOriginId if not set */ } SubscriptionRelState; /* @@ -112,7 +116,8 @@ typedef struct LogicalRepSequenceInfo extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock); extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn, bool already_locked); + XLogRecPtr sublsn, bool already_locked, + ReplOriginId originid); extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 745b7d9e969..6b9233c7e09 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -305,6 +305,8 @@ extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, extern void apply_dispatch(StringInfo s); +extern void rebuild_tablesync_origins_cache(void); + extern void maybe_reread_subscription(void); extern void stream_cleanup_files(Oid 
subid, TransactionId xid); -- 2.47.3