From 3891223e662574347c461106bde2a1f1b18734ca Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Mon, 18 May 2026 11:25:52 +0000 Subject: [PATCH v35 6/9] Review comment fixes for Implement the conflict insertion infrastructure for the conflict log table Review comment fixes for Implement the conflict insertion infrastructure for the conflict log table --- src/backend/replication/logical/conflict.c | 59 ++++++++++------------ src/backend/replication/logical/worker.c | 2 +- src/test/subscription/t/030_origin.pl | 4 +- src/test/subscription/t/035_conflicts.pl | 4 +- 4 files changed, 32 insertions(+), 37 deletions(-) diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index ed2ebae76a4..9764f2a5aaa 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -73,6 +73,17 @@ const ConflictLogColumnDef ConflictLogSchema[] = { StaticAssertDecl(lengthof(ConflictLogSchema) == NUM_CONFLICT_ATTRS, "ConflictLogSchema length mismatch"); +/* Schema for the elements within the 'local_conflicts' JSON array */ +static const ConflictLogColumnDef LocalConflictSchema[] = +{ + { .attname = "xid", .atttypid = XIDOID }, + { .attname = "commit_ts", .atttypid = TIMESTAMPTZOID }, + { .attname = "origin", .atttypid = TEXTOID }, + { .attname = "key", .atttypid = JSONOID }, + { .attname = "tuple", .atttypid = JSONOID } +}; + +#define NUM_LOCAL_CONFLICT_ATTRS lengthof(LocalConflictSchema) static const char *const ConflictTypeNames[] = { [CT_INSERT_EXISTS] = "insert_exists", @@ -85,17 +96,7 @@ static const char *const ConflictTypeNames[] = { [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts" }; -/* Schema for the elements within the 'local_conflicts' JSON array */ -static const ConflictLogColumnDef LocalConflictSchema[] = -{ - { .attname = "xid", .atttypid = XIDOID }, - { .attname = "commit_ts", .atttypid = TIMESTAMPTZOID }, - { .attname = "origin", .atttypid = TEXTOID }, - { .attname = "key", .atttypid = JSONOID }, - { .attname = "tuple", .atttypid = JSONOID } -}; -#define MAX_LOCAL_CONFLICT_INFO_ATTRS lengthof(LocalConflictSchema) static int errcode_apply_conflict(ConflictType type); static void errdetail_apply_conflict(EState *estate, @@ -345,10 +346,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, */ conflictlogrel = GetConflictLogDestAndTable(&dest); - if (dest == CONFLICT_LOG_DEST_TABLE || dest == CONFLICT_LOG_DEST_ALL) - log_dest_clt = true; - if (dest == CONFLICT_LOG_DEST_LOG || dest == CONFLICT_LOG_DEST_ALL) - log_dest_logfile = true; + log_dest_clt = CONFLICTS_LOGGED_TO_TABLE(dest); + log_dest_logfile = CONFLICTS_LOGGED_TO_FILE(dest); /* Insert to table if requested. */ if (log_dest_clt) @@ -380,9 +379,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, */ ereport(elevel, errcode_apply_conflict(type), - errmsg("conflict detected on relation \"%s.%s\": conflict=%s", - get_namespace_name(RelationGetNamespace(localrel)), - RelationGetRelationName(localrel), + errmsg("conflict detected on relation \"%s\": conflict=%s", + RelationGetQualifiedRelationName(localrel), ConflictTypeNames[type]), errdetail("Conflict details are logged to the conflict log table: %s", RelationGetRelationName(conflictlogrel))); @@ -411,9 +409,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, /* Standard reporting with full internal details. */ ereport(elevel, errcode_apply_conflict(type), - errmsg("conflict detected on relation \"%s.%s\": conflict=%s", - get_namespace_name(RelationGetNamespace(localrel)), - RelationGetRelationName(localrel), + errmsg("conflict detected on relation \"%s\": conflict=%s", + RelationGetQualifiedRelationName(localrel), ConflictTypeNames[type]), errdetail_internal("%s", err_detail.data)); } @@ -469,7 +466,7 @@ GetConflictLogDestAndTable(ConflictLogDest *log_dest) *log_dest = GetLogDestination(MySubscription->conflictlogdest); /* Quick exit if a conflict log table was not requested. */ - if (*log_dest == CONFLICT_LOG_DEST_LOG) + if (!CONFLICTS_LOGGED_TO_TABLE(*log_dest)) return NULL; conflictlogrelid = MySubscription->conflictlogrelid; @@ -489,13 +486,11 @@ GetConflictLogDestAndTable(ConflictLogDest *log_dest) void InsertConflictLogTuple(Relation conflictlogrel) { - int options = HEAP_INSERT_NO_LOGICAL; - /* A valid tuple must be prepared and stored in MyLogicalRepWorker. */ Assert(MyLogicalRepWorker->conflict_log_tuple != NULL); heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple, - GetCurrentCommandId(true), options, NULL); + GetCurrentCommandId(true), HEAP_INSERT_NO_LOGICAL, NULL); /* Free conflict log tuple. */ heap_freetuple(MyLogicalRepWorker->conflict_log_tuple); @@ -1077,9 +1072,9 @@ build_conflict_tupledesc(void) { TupleDesc tupdesc; - tupdesc = CreateTemplateTupleDesc(MAX_LOCAL_CONFLICT_INFO_ATTRS); + tupdesc = CreateTemplateTupleDesc(NUM_LOCAL_CONFLICT_ATTRS); - for (int i = 0; i < MAX_LOCAL_CONFLICT_INFO_ATTRS; i++) + for (int i = 0; i < NUM_LOCAL_CONFLICT_ATTRS; i++) TupleDescInitEntry(tupdesc, (AttrNumber) (i + 1), LocalConflictSchema[i].attname, LocalConflictSchema[i].atttypid, @@ -1120,8 +1115,8 @@ build_local_conflicts_json_array(EState *estate, Relation rel, /* Process local conflict tuple list and prepare an array of JSON. */ foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) { - Datum values[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0}; - bool nulls[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0}; + Datum values[NUM_LOCAL_CONFLICT_ATTRS] = {0}; + bool nulls[NUM_LOCAL_CONFLICT_ATTRS] = {0}; char *origin_name = NULL; HeapTuple tuple; Datum json_datum; @@ -1171,7 +1166,7 @@ build_local_conflicts_json_array(EState *estate, Relation rel, else nulls[attno] = true; - Assert(attno + 1 == MAX_LOCAL_CONFLICT_INFO_ATTRS); + Assert(attno + 1 == NUM_LOCAL_CONFLICT_ATTRS); tuple = heap_form_tuple(tupdesc, values, nulls); @@ -1230,8 +1225,8 @@ prepare_conflict_log_tuple(EState *estate, Relation rel, List *conflicttuples, TupleTableSlot *remoteslot) { - Datum values[MAX_CONFLICT_ATTR_NUM] = {0}; - bool nulls[MAX_CONFLICT_ATTR_NUM] = {0}; + Datum values[NUM_CONFLICT_ATTRS] = {0}; + bool nulls[NUM_CONFLICT_ATTRS] = {0}; int attno; char *remote_origin = NULL; MemoryContext oldctx; @@ -1297,7 +1292,7 @@ prepare_conflict_log_tuple(EState *estate, Relation rel, conflict_type, conflicttuples); - Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM); + Assert(attno + 1 == NUM_CONFLICT_ATTRS); oldctx = MemoryContextSwitchTo(ApplyContext); MyLogicalRepWorker->conflict_log_tuple = diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 469451c736a..ba74222f921 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -5688,7 +5688,7 @@ start_apply(XLogRecPtr origin_startpos) /* Open conflict log table and insert the tuple. */ conflictlogrel = GetConflictLogDestAndTable(&dest); - Assert(dest != CONFLICT_LOG_DEST_LOG); + Assert(CONFLICTS_LOGGED_TO_TABLE(dest)); InsertConflictLogTuple(conflictlogrel); table_close(conflictlogrel, RowExclusiveLock); diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl index 6bc6b7874c2..5f4d00bdd33 100644 --- a/src/test/subscription/t/030_origin.pl +++ b/src/test/subscription/t/030_origin.pl @@ -166,7 +166,7 @@ is($result, qq(32), 'The node_A data replicated to node_B'); $node_C->safe_psql('postgres', "UPDATE $tab SET a = 33 WHERE a = 32;"); $node_B->wait_for_log( - qr/conflict detected on relation "public.$tab_unquoted": conflict=update_origin_differs.*\n.*DETAIL:.* Updating the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(32\), remote row \(33\), replica identity \(a\)=\(32\)./ + qr/conflict detected on relation "public.$tab": conflict=update_origin_differs.*\n.*DETAIL:.* Updating the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(32\), remote row \(33\), replica identity \(a\)=\(32\)./ ); $node_B->safe_psql('postgres', "DELETE FROM $tab;"); @@ -182,7 +182,7 @@ is($result, qq(33), 'The node_A data replicated to node_B'); $node_C->safe_psql('postgres', "DELETE FROM $tab WHERE a = 33;"); $node_B->wait_for_log( - qr/conflict detected on relation "public.$tab_unquoted": conflict=delete_origin_differs.*\n.*DETAIL:.* Deleting the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(33\), replica identity \(a\)=\(33\).*/ + qr/conflict detected on relation "public.$tab": conflict=delete_origin_differs.*\n.*DETAIL:.* Deleting the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(33\), replica identity \(a\)=\(33\).*/ ); # The remaining tests no longer test conflict detection. diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index 05c2179b9a8..4f3880e5b83 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -85,11 +85,11 @@ $node_subscriber->wait_for_log( $log_offset); # Verify the contents of the Conflict Log Table (CLT) -# This section ensures that the clt contains the expected +# This section ensures that the CLT contains the expected # type and specific key data. my $subid = $node_subscriber->safe_psql('postgres', "SELECT oid FROM pg_subscription WHERE subname = 'sub_tab';"); -my $clt = "pg_conflict.pg_conflict_log_$subid"; +my $clt = "pg_conflict.pg_conflict_log_for_subid_$subid"; # Wait for the conflict to be logged in the CLT my $log_check = $node_subscriber->poll_query_until( -- 2.53.0