From 4028b33496732bf7ee4e3140134b49e98d7cffcc Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Thu, 2 Jul 2026 09:17:51 +0530 Subject: [PATCH v63 1/3] Implement the conflict insertion infrastructure for the conflict log table This patch introduces the core logic to populate the conflict log table whenever a logical replication conflict is detected. It captures the remote transaction details along with the corresponding local state at the time of the conflict. Handling Multi-row Conflicts: A single remote tuple may conflict with multiple local tuples (e.g., in the case of multiple_unique_conflicts). To handle this, the infrastructure creates a single row in the conflict log table for each remote tuple. The details of all conflicting local rows are aggregated into a single JSON array in the local_conflicts column. The JSON array uses the following structured format: [ { "xid": "1001", "commit_ts": "2025-12-25 10:00:00+05:30", "origin": "node_1", "key": {"id": 1}, "tuple": {"id": 1, "val": "old_data"} }, ... ] Example of querying the structured conflict data: SELECT remote_xid, relname, remote_origin, local_conflicts[1] ->> 'xid' AS local_xid, local_conflicts[1] ->> 'tuple' AS local_tuple FROM pg_conflict.pg_conflict_log_16396; remote_xid | relname | remote_origin | local_xid | local_tuple ------------+----------+---------------+-----------+--------------------- 760 | test | pg_16406 | 771 | {"a":1,"b":10} 765 | conf_tab | pg_16406 | 775 | {"a":2,"b":2,"c":2} --- doc/src/sgml/logical-replication.sgml | 659 +++++++++------ doc/src/sgml/ref/alter_subscription.sgml | 3 +- doc/src/sgml/ref/create_subscription.sgml | 17 +- .../replication/logical/applyparallelworker.c | 85 +- src/backend/replication/logical/conflict.c | 777 +++++++++++++++++- src/backend/replication/logical/worker.c | 67 +- src/include/replication/conflict.h | 4 + src/include/replication/worker_internal.h | 7 + src/include/utils/rel.h | 13 +- src/test/subscription/meson.build | 1 + src/test/subscription/t/030_origin.pl | 4 +- src/test/subscription/t/035_conflicts.pl | 49 +- .../t/039_pa_conflict_log_lock_wait.pl | 203 +++++ src/tools/pgindent/typedefs.list | 1 + 14 files changed, 1554 insertions(+), 336 deletions(-) create mode 100644 src/test/subscription/t/039_pa_conflict_log_lock_wait.pl diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index f21239af6b8..e0399eed6b7 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -2024,120 +2024,230 @@ Included in publications: operations will simply be skipped. - - Additional logging is triggered, and the conflict statistics are collected (displayed in the - pg_stat_subscription_stats view) - in the following conflict cases: - - - insert_exists - - - Inserting a row that violates a NOT DEFERRABLE - unique constraint. Note that to log the origin and commit - timestamp details of the conflicting key, - track_commit_timestamp - should be enabled on the subscriber. In this case, an error will be - raised until the conflict is resolved manually. - - - - - update_origin_differs - - - Updating a row that was previously modified by another origin. - Note that this conflict can only be detected when - track_commit_timestamp - is enabled on the subscriber. Currently, the update is always applied - regardless of the origin of the local row. - - - - - update_exists - - - The updated value of a row violates a NOT DEFERRABLE - unique constraint. Note that to log the origin and commit - timestamp details of the conflicting key, - track_commit_timestamp - should be enabled on the subscriber. In this case, an error will be - raised until the conflict is resolved manually. Note that when updating a - partitioned table, if the updated row value satisfies another partition - constraint resulting in the row being inserted into a new partition, the - insert_exists conflict may arise if the new row - violates a NOT DEFERRABLE unique constraint. - - - - - update_deleted - - - The tuple to be updated was concurrently deleted by another origin. The - update will simply be skipped in this scenario. Note that this conflict - can only be detected when - track_commit_timestamp - and retain_dead_tuples - are enabled. Note that if a tuple cannot be found due to the table being - truncated, only a update_missing conflict will - arise. Additionally, if the tuple was deleted by the same origin, an - update_missing conflict will arise. - - - - - update_missing - - - The row to be updated was not found. The update will simply be - skipped in this scenario. - - - - - delete_origin_differs - - - Deleting a row that was previously modified by another origin. Note that - this conflict can only be detected when - track_commit_timestamp - is enabled on the subscriber. Currently, the delete is always applied - regardless of the origin of the local row. - - - - - delete_missing - - - The row to be deleted was not found. The delete will simply be - skipped in this scenario. - - - - - multiple_unique_conflicts - - - Inserting or updating a row violates multiple - NOT DEFERRABLE unique constraints. Note that to log - the origin and commit timestamp details of conflicting keys, ensure - that track_commit_timestamp - is enabled on the subscriber. In this case, an error will be raised until - the conflict is resolved manually. - - - - - Note that there are other conflict scenarios, such as exclusion constraint - violations. Currently, we do not provide additional details for them in the - log. - + + Conflict logging + + Additional logging is triggered, and the conflict statistics are collected (displayed in the + pg_stat_subscription_stats view) + in the following conflict cases: + + + insert_exists + + + Inserting a row that violates a NOT DEFERRABLE + unique constraint. Note that to log the origin and commit + timestamp details of the conflicting key, + track_commit_timestamp + should be enabled on the subscriber. In this case, an error will be + raised until the conflict is resolved manually. + + + + + update_origin_differs + + + Updating a row that was previously modified by another origin. + Note that this conflict can only be detected when + track_commit_timestamp + is enabled on the subscriber. Currently, the update is always applied + regardless of the origin of the local row. + + + + + update_exists + + + The updated value of a row violates a NOT DEFERRABLE + unique constraint. Note that to log the origin and commit + timestamp details of the conflicting key, + track_commit_timestamp + should be enabled on the subscriber. In this case, an error will be + raised until the conflict is resolved manually. Note that when updating a + partitioned table, if the updated row value satisfies another partition + constraint resulting in the row being inserted into a new partition, the + insert_exists conflict may arise if the new row + violates a NOT DEFERRABLE unique constraint. + + + + + update_deleted + + + The tuple to be updated was concurrently deleted by another origin. The + update will simply be skipped in this scenario. Note that this conflict + can only be detected when + track_commit_timestamp + and retain_dead_tuples + are enabled. Note that if a tuple cannot be found due to the table being + truncated, only a update_missing conflict will + arise. Additionally, if the tuple was deleted by the same origin, an + update_missing conflict will arise. + + + + + update_missing + + + The row to be updated was not found. The update will simply be + skipped in this scenario. + + + + + delete_origin_differs + + + Deleting a row that was previously modified by another origin. Note that + this conflict can only be detected when + track_commit_timestamp + is enabled on the subscriber. Currently, the delete is always applied + regardless of the origin of the local row. + + + + + delete_missing + + + The row to be deleted was not found. The delete will simply be + skipped in this scenario. + + + + + multiple_unique_conflicts + + + Inserting or updating a row violates multiple + NOT DEFERRABLE unique constraints. Note that to log + the origin and commit timestamp details of conflicting keys, ensure + that track_commit_timestamp + is enabled on the subscriber. In this case, an error will be raised until + the conflict is resolved manually. + + + + + Note that there are other conflict scenarios, such as exclusion constraint + violations. Currently, we do not provide additional details for them in the + log. + + - - The log format for logical replication conflicts is as follows: + + Table-based logging + + If the conflict_log_destination + parameter is set to table or all, + a dedicated conflict log table will be automatically created. This table is + created in the pg_conflict namespace. The name of the + conflict log table is + pg_conflict_log_<subid>. The predefined + schema of this table is detailed in + . + + + + Conflict Log Table Schema + + + + Column + Type + Description + + + + + relid + oid + The OID of the local table where the conflict occurred. + + + schemaname + text + The schema name of the conflicting table. + + + relname + text + The name of the conflicting table. + + + conflict_type + text + The type of conflict that occurred (e.g., insert_exists). + + + remote_xid + xid + The remote transaction ID that caused the conflict. + + + remote_commit_lsn + pg_lsn + The final LSN of the remote transaction. + + + remote_commit_ts + timestamptz + The remote commit timestamp of the remote transaction. + + + remote_origin + text + The origin of the remote transaction. + + + replica_identity_full + boolean + Indicates whether replica_identity represents a full tuple (true) or key values of a replica identity index (false). + + + replica_identity + json + The JSON representation of the replica identity key values or full tuple. + + + remote_tuple + json + The JSON representation of the incoming remote row that caused + the conflict. + + + local_conflicts + json[] + + An array of JSON objects representing the state of existing local + row(s) that caused the conflict. Each object includes the local + transaction ID (xid), commit timestamp + (commit_ts), origin (origin), + conflicting key data (key), and the full local row + image (tuple). + + + + +
+ + + The conflicting row data, including the incoming remote row (remote_tuple) + and the associated local conflict details (local_conflicts), is stored in + JSON formats for flexible querying and analysis. + +
+ + + File-based logging + + If the conflict_log_destination + parameter is set to log or all, conflicts + are logged to the server log using the following format: LOG: conflict detected on relation "schemaname.tablename": conflict=conflict_type DETAIL: detailed_explanation[: detail_values [, ... ]]. @@ -2150,182 +2260,185 @@ DETAIL: detailed_explanation[: replica identity {(column_name , ...)=(column_value , ...) | full (column_name , ...)=(column_value , ...)} - The log provides the following information: - - - LOG - + The log provides the following information: + + + LOG + + + + + schemaname.tablename + identifies the local relation involved in the conflict. + + + + + conflict_type is the type of conflict that occurred + (e.g., insert_exists, update_exists). + + + + + + + + DETAIL + - schemaname.tablename - identifies the local relation involved in the conflict. + detailed_explanation includes + the origin, transaction ID, and commit timestamp of the transaction that + modified the local row, if available. + + + + + The key section includes the key values of the local + row that violated a unique constraint for + insert_exists, update_exists or + multiple_unique_conflicts conflicts. + + + + + The local row section includes the local row if its + origin differs from the remote row for + update_origin_differs or delete_origin_differs + conflicts, or if the key value conflicts with the remote row for + insert_exists, update_exists or + multiple_unique_conflicts conflicts. + + + + + The remote row section includes the new row from + the remote insert or update operation that caused the conflict. Note that + for an update operation, the column value of the new row will be null + if the value is unchanged and toasted. + + + + + The replica identity section includes the replica + identity key values that were used to search for the existing local + row to be updated or deleted. This may include the full row value + if the local relation is marked with + REPLICA IDENTITY FULL. - conflict_type is the type of conflict that occurred - (e.g., insert_exists, update_exists). + column_name is the column name. + For local row, remote row, and + replica identity full cases, column names are + logged only if the user lacks the privilege to access all columns of + the table. If column names are present, they appear in the same order + as the corresponding column values. + + + + + column_value is the column value. + The large column values are truncated to 64 bytes. + + + + + Note that in case of multiple_unique_conflicts conflict, + multiple detailed_explanation + and detail_values lines + will be generated, each detailing the conflict information associated + with distinct unique constraints. - - - - DETAIL - - - - - detailed_explanation includes - the origin, transaction ID, and commit timestamp of the transaction that - modified the local row, if available. - - - - - The key section includes the key values of the local - row that violated a unique constraint for - insert_exists, update_exists or - multiple_unique_conflicts conflicts. - - - - - The local row section includes the local row if its - origin differs from the remote row for - update_origin_differs or delete_origin_differs - conflicts, or if the key value conflicts with the remote row for - insert_exists, update_exists or - multiple_unique_conflicts conflicts. - - - - - The remote row section includes the new row from - the remote insert or update operation that caused the conflict. Note that - for an update operation, the column value of the new row will be null - if the value is unchanged and toasted. - - - - - The replica identity section includes the replica - identity key values that were used to search for the existing local - row to be updated or deleted. This may include the full row value - if the local relation is marked with - REPLICA IDENTITY FULL. - - - - - column_name is the column name. - For local row, remote row, and - replica identity full cases, column names are - logged only if the user lacks the privilege to access all columns of - the table. If column names are present, they appear in the same order - as the corresponding column values. - - - - - column_value is the column value. - The large column values are truncated to 64 bytes. - - - - - Note that in case of multiple_unique_conflicts conflict, - multiple detailed_explanation - and detail_values lines - will be generated, each detailing the conflict information associated - with distinct unique - constraints. - - - - - - - + + +
+ - - Logical replication operations are performed with the privileges of the role - which owns the subscription. Permissions failures on target tables will - cause replication conflicts, as will enabled - row-level security on target tables - that the subscription owner is subject to, without regard to whether any - policy would ordinarily reject the INSERT, - UPDATE, DELETE or - TRUNCATE which is being replicated. This restriction on - row-level security may be lifted in a future version of - PostgreSQL. - + + Notes + + Logical replication operations are performed with the privileges of the role + which owns the subscription. Permissions failures on target tables will + cause replication conflicts, as will enabled + row-level security on target tables + that the subscription owner is subject to, without regard to whether any + policy would ordinarily reject the INSERT, + UPDATE, DELETE or + TRUNCATE which is being replicated. This restriction on + row-level security may be lifted in a future version of + PostgreSQL. + - - A conflict that produces an error will stop the replication; it must be - resolved manually by the user. Details about the conflict can be found in - the subscriber's server log. - + + A conflict that produces an error will stop the replication; it must be + resolved manually by the user. Details about the conflict can be found in + the subscriber's server log. + - - The resolution can be done either by changing data or permissions on the subscriber so - that it does not conflict with the incoming change or by skipping the - transaction that conflicts with the existing data. When a conflict produces - an error, the replication won't proceed, and the logical replication worker will - emit the following kind of message to the subscriber's server log: + + The resolution can be done either by changing data or permissions on the subscriber so + that it does not conflict with the incoming change or by skipping the + transaction that conflicts with the existing data. When a conflict produces + an error, the replication won't proceed, and the logical replication worker will + emit the following kind of message to the subscriber's server log: ERROR: conflict detected on relation "public.test": conflict=insert_exists DETAIL: Could not apply remote change: remote row (1, 'remote'). Key already exists in unique index "test_pkey", modified locally in transaction 800 at 2026-01-16 18:15:25.652759+09: key (c)=(1), local row (1, 'local'). CONTEXT: processing remote data for replication origin "pg_16395" during "INSERT" for replication target relation "public.test" in transaction 725 finished at 0/014C0378 - The LSN of the transaction that contains the change violating the constraint and - the replication origin name can be found from the server log (LSN 0/014C0378 and - replication origin pg_16395 in the above case). The - transaction that produced the conflict can be skipped by using - ALTER SUBSCRIPTION ... SKIP - with the finish LSN - (i.e., LSN 0/014C0378). The finish LSN could be an LSN at which the transaction - is committed or prepared on the publisher. Alternatively, the transaction can - also be skipped by calling the - pg_replication_origin_advance() function. - Before using this function, the subscription needs to be disabled temporarily - either by - ALTER SUBSCRIPTION ... DISABLE or, the - subscription can be used with the - disable_on_error - option. Then, you can use pg_replication_origin_advance() - function with the node_name (i.e., pg_16395) - and the next LSN of the finish LSN (i.e., 0/014C0379). The current position of - origins can be seen in the - pg_replication_origin_status system view. - Please note that skipping the whole transaction includes skipping changes that - might not violate any constraint. This can easily make the subscriber - inconsistent. - The additional details regarding conflicting rows, such as their origin and - commit timestamp can be seen in the DETAIL line of the - log. But note that this information is only available when - track_commit_timestamp - is enabled on the subscriber. Users can use this information to decide - whether to retain the local change or adopt the remote alteration. For - instance, the DETAIL line in the above log indicates that - the existing row was modified locally. Users can manually perform a - remote-change-win. - - - - When the - streaming - mode is parallel, the finish LSN of failed transactions - may not be logged. In that case, it may be necessary to change the streaming - mode to on or off and cause the same - conflicts again so the finish LSN of the failed transaction will be written - to the server log. For the usage of finish LSN, please refer to ALTER SUBSCRIPTION ... - SKIP. - + The LSN of the transaction that contains the change violating the constraint and + the replication origin name can be found from the server log (LSN 0/014C0378 and + replication origin pg_16395 in the above case). The + transaction that produced the conflict can be skipped by using + ALTER SUBSCRIPTION ... SKIP + with the finish LSN + (i.e., LSN 0/014C0378). The finish LSN could be an LSN at which the transaction + is committed or prepared on the publisher. Alternatively, the transaction can + also be skipped by calling the + pg_replication_origin_advance() function. + Before using this function, the subscription needs to be disabled temporarily + either by + ALTER SUBSCRIPTION ... DISABLE or, the + subscription can be used with the + disable_on_error + option. Then, you can use pg_replication_origin_advance() + function with the node_name (i.e., pg_16395) + and the next LSN of the finish LSN (i.e., 0/014C0379). The current position of + origins can be seen in the + pg_replication_origin_status system view. + Please note that skipping the whole transaction includes skipping changes that + might not violate any constraint. This can easily make the subscriber + inconsistent. + The additional details regarding conflicting rows, such as their origin and + commit timestamp can be seen in the DETAIL line of the + log. But note that this information is only available when + track_commit_timestamp + is enabled on the subscriber. Users can use this information to decide + whether to retain the local change or adopt the remote alteration. For + instance, the DETAIL line in the above log indicates that + the existing row was modified locally. Users can manually perform a + remote-change-win. + + + + When the + streaming + mode is parallel, the finish LSN of failed transactions + may not be logged. In that case, it may be necessary to change the streaming + mode to on or off and cause the same + conflicts again so the finish LSN of the failed transaction will be written + to the server log. For the usage of finish LSN, please refer to ALTER SUBSCRIPTION ... + SKIP. + + @@ -2430,6 +2543,14 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER key or replica identity defined for it. + + + + Conflict log tables (see conflict_log_destination parameter) + are never published, even when using FOR ALL TABLES in a + publication. + + diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index fad1f90956a..13b413d142f 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -358,7 +358,8 @@ ALTER SUBSCRIPTION name RENAME TO < When the conflict_log_destination parameter is set to table or all, the system automatically creates the conflict log table. Conversely, if the destination is changed to - log, the conflict log table is automatically dropped. + log, logging to the table stops and the conflict log + table is automatically dropped. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 81fbf3487a4..0be1aa4698d 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -272,8 +272,10 @@ CREATE SUBSCRIPTION subscription_namelog - Conflict details are recorded in the server log. - This is the default behavior. + Conflict details are recorded in the server log. This is + the default behavior. See + + for details. @@ -284,14 +286,17 @@ CREATE SUBSCRIPTION subscription_namepg_conflict_log_<subid> in the pg_conflict schema. This allows for easy - querying and analysis of conflicts. + querying and analysis of conflicts. See + + for details. The conflict log table is strictly tied to the lifecycle of the subscription or the conflict_log_destination setting. If the subscription is dropped, or if the destination is changed to - log, the table is dropped. + log, the table and all its recorded conflict data are + permanently deleted. @@ -300,8 +305,8 @@ CREATE SUBSCRIPTION subscription_nameall - This is equivalent to configuring both destinations log - and table together. + Records conflict details to both destinations log + and table. diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 012d55e9d3d..55a0ee4831a 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -986,7 +986,66 @@ ParallelApplyWorkerMain(Datum main_arg) set_apply_error_context_origin(originname); - LogicalParallelApplyLoop(mqh); + PG_TRY(); + { + LogicalParallelApplyLoop(mqh); + } + PG_CATCH(); + { + MemoryContext oldcontext; + ErrorData *edata; + + /* + * Reset the origin state to prevent the advancement of origin + * progress if we fail to apply. Otherwise, this will result in + * transaction loss as that transaction won't be sent again by the + * server. + */ + replorigin_xact_clear(true); + + /* + * Copy the error and recover to an idle state so we can insert the + * deferred conflict log tuple (if any) before re-throwing. Copy the + * error into a longer-lived context first, as it may have been raised + * under ErrorContext. Also reset the error context stack: the + * callbacks in effect when the error was thrown belong to unwound + * stack frames, and the deferred insert installs its own. + */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + edata = CopyErrorData(); + MemoryContextSwitchTo(oldcontext); + + FlushErrorState(); + error_context_stack = NULL; + + /* + * Tell the leader we failed and are about to report the error and log + * the conflict. This must be set before AbortOutOfAnyTransaction() + * below releases the transaction lock that the leader waits on in + * pa_wait_for_xact_finish(); otherwise the leader would see a + * non-finished state, assume the connection was lost, and tear this + * worker down while it is still writing the conflict log tuple. + */ + pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_ERROR); + + AbortOutOfAnyTransaction(); + + /* + * Insert the deferred conflict log tuple before re-throwing. + * Re-throwing is what reports the error to the leader (via the error + * queue set up above), so the insertion must happen first: otherwise + * the leader could start tearing down this worker while it is still + * writing the conflict log tuple. If the insertion itself fails, + * that error (annotated with the conflict context, see + * InsertConflictLogTuple) propagates to the leader instead of the + * original. + */ + ProcessPendingConflictLogTuple(); + + /* Re-throw the original error, which reports it to the leader. */ + ReThrowError(edata); + } + PG_END_TRY(); /* * The parallel apply worker must not get here because the parallel apply @@ -1314,9 +1373,33 @@ pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo) * released. */ if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED) + { + /* + * If the worker signalled that it errored (PARALLEL_TRANS_ERROR), it + * is logging the conflict and will report the actual error via the + * error queue before exiting. Wait for that rather than reporting a + * generic lost connection: CHECK_FOR_INTERRUPTS() drives + * ProcessParallelApplyMessages(), which raises the real error on the + * worker's ErrorResponse (or "lost connection" if the worker died + * without reporting). Waiting here also keeps the worker alive long + * enough to finish writing the conflict log tuple. + */ + while (pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_ERROR) + { + CHECK_FOR_INTERRUPTS(); + + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, + WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE); + + ResetLatch(MyLatch); + } + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("lost connection to the logical replication parallel apply worker"))); + } } /* diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 5ea8c455cd4..5551d87d1bc 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -16,17 +16,25 @@ #include "access/commit_ts.h" #include "access/genam.h" +#include "access/heapam.h" #include "access/tableam.h" +#include "access/xact.h" #include "catalog/heap.h" #include "catalog/pg_am.h" #include "catalog/pg_namespace.h" #include "catalog/toasting.h" #include "executor/executor.h" +#include "funcapi.h" #include "pgstat.h" #include "replication/conflict.h" #include "replication/worker_internal.h" #include "storage/lmgr.h" +#include "utils/array.h" +#include "utils/builtins.h" +#include "utils/injection_point.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/pg_lsn.h" /* * String representations for the supported conflict logging destinations. @@ -40,7 +48,6 @@ const char *const ConflictLogDestNames[] = { StaticAssertDecl(lengthof(ConflictLogDestNames) == CONFLICT_LOG_DEST_ALL + 1, "ConflictLogDestNames length mismatch"); - /* Structure to hold metadata for one column of the conflict log table */ typedef struct ConflictLogColumnDef { @@ -81,6 +88,18 @@ static const ConflictLogColumnDef ConflictLogSchema[] = { #define NUM_CONFLICT_ATTRS lengthof(ConflictLogSchema) +/* Schema for the elements within the 'local_conflicts' JSON array */ +static const ConflictLogColumnDef LocalConflictSchema[] = +{ + {.attname = "xid", .atttypid = XIDOID}, + {.attname = "commit_ts", .atttypid = TIMESTAMPTZOID}, + {.attname = "origin", .atttypid = TEXTOID}, + {.attname = "key", .atttypid = JSONOID}, + {.attname = "tuple", .atttypid = JSONOID} +}; + +#define NUM_LOCAL_CONFLICT_ATTRS lengthof(LocalConflictSchema) + static const char *const ConflictTypeNames[] = { [CT_INSERT_EXISTS] = "insert_exists", [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs", @@ -92,6 +111,25 @@ static const char *const ConflictTypeNames[] = { [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts" }; +/* + * Worker-private state for a conflict that has been prepared for the conflict + * log table but not yet inserted. It carries the prepared tuple, and a + * description of the conflict used for error context, from + * prepare_conflict_log_tuple() across the apply error boundary to + * ProcessPendingConflictLogTuple()/InsertConflictLogTuple(). Both pointers + * reference memory allocated in ApplyContext. + * + * This is purely process-local state, so it lives here rather than in the + * shared LogicalRepWorker struct. + */ +typedef struct PendingConflictLogData +{ + HeapTuple tuple; /* prepared, not-yet-inserted conflict tuple */ + char *errcontext_str; /* conflict description for error context */ +} PendingConflictLogData; + +static PendingConflictLogData pending_conflict_log = {0}; + static int errcode_apply_conflict(ConflictType type); static void errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, @@ -108,8 +146,27 @@ static void get_tuple_desc(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, char **remote_desc, TupleTableSlot *searchslot, char **search_desc, Oid indexoid); +static void build_index_datums_from_slot(EState *estate, Relation localrel, + TupleTableSlot *slot, + Relation indexDesc, Datum *values, + bool *isnull); static char *build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Oid indexoid); +static Datum tuple_table_slot_to_json_datum(TupleTableSlot *slot); +static Datum tuple_table_slot_to_indextup_json(EState *estate, + Relation localrel, + Oid replica_index, + TupleTableSlot *slot); +static TupleDesc build_conflict_tupledesc(void); +static Datum build_local_conflicts_json_array(EState *estate, Relation rel, + ConflictType conflict_type, + List *conflicttuples); +static void prepare_conflict_log_tuple(EState *estate, Relation rel, + Relation conflictlogrel, + ConflictType conflict_type, + TupleTableSlot *searchslot, + List *conflicttuples, + TupleTableSlot *remoteslot); /* * Builds the TupleDesc for the conflict log table. @@ -279,29 +336,219 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, TupleTableSlot *remoteslot, List *conflicttuples) { Relation localrel = relinfo->ri_RelationDesc; - StringInfoData err_detail; + ConflictLogDest dest; + Relation conflictlogrel; + bool log_dest_table; + bool log_dest_logfile; - initStringInfo(&err_detail); + pgstat_report_subscription_conflict(MySubscription->oid, type); - /* Form errdetail message by combining conflicting tuples information. */ - foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) - errdetail_apply_conflict(estate, relinfo, type, searchslot, - conflicttuple->slot, remoteslot, - conflicttuple->indexoid, - conflicttuple->xmin, - conflicttuple->origin, - conflicttuple->ts, - &err_detail); + /* + * Get the conflict log destination. Also, if a table is one of the + * destinations, return the CLT relation already opened and ready for + * insertion -- or NULL if it has been dropped concurrently. + */ + conflictlogrel = GetConflictLogDestAndTable(&dest); - pgstat_report_subscription_conflict(MySubscription->oid, type); + log_dest_table = CONFLICTS_LOGGED_TO_TABLE(dest); + log_dest_logfile = CONFLICTS_LOGGED_TO_LOG(dest); + + /* + * If a conflict log table was requested but it has been dropped + * concurrently (e.g. a concurrent ALTER SUBSCRIPTION changed + * conflict_log_destination), GetConflictLogDestAndTable() returned NULL. + * Fall back to logging the full conflict details to the server log so + * that the conflict is not lost. + */ + if (log_dest_table && conflictlogrel == NULL) + { + log_dest_table = false; + log_dest_logfile = true; + } + + /* + * Prepare the conflict log tuple first when the destination includes the + * table. This must happen before the ereport() below, because for an + * ERROR-level conflict that ereport() raises the error and defers the + * actual insertion to ProcessPendingConflictLogTuple(), which relies on + * the tuple having been prepared. + */ + if (log_dest_table) + { + Assert(conflictlogrel != NULL); + prepare_conflict_log_tuple(estate, + relinfo->ri_RelationDesc, + conflictlogrel, + type, + searchslot, + conflicttuples, + remoteslot); + } + + /* + * Report the conflict to the server log before inserting it into the + * conflict log table. Emitting it first guarantees the conflict is + * recorded even if the table insert below fails; it is also what raises + * the error for ERROR-level conflicts. When the server log is one of the + * destinations we emit the full details, otherwise (table-only) we emit a + * shorter message since the details are captured in the table. + */ + if (log_dest_logfile) + { + StringInfoData err_detail; + + initStringInfo(&err_detail); + + /* Form errdetail message by combining conflicting tuples information. */ + foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) + errdetail_apply_conflict(estate, relinfo, type, searchslot, + conflicttuple->slot, remoteslot, + conflicttuple->indexoid, + conflicttuple->xmin, + conflicttuple->origin, + conflicttuple->ts, + &err_detail); + + /* Standard reporting with full internal details. */ + ereport(elevel, + errcode_apply_conflict(type), + errmsg("conflict detected on relation \"%s\": conflict=%s", + RelationGetQualifiedRelationName(localrel), + ConflictTypeNames[type]), + errdetail_internal("%s", err_detail.data)); + } + else if (log_dest_table) + { + /* + * Not logging conflict details to the server log; report the conflict + * but omit raw tuple data since it is captured in the conflict log + * table. + */ + ereport(elevel, + errcode_apply_conflict(type), + errmsg("conflict detected on relation \"%s\": conflict=%s", + RelationGetQualifiedRelationName(localrel), + ConflictTypeNames[type]), + errdetail("Conflict details are logged to the conflict log table: %s", + RelationGetRelationName(conflictlogrel))); + } + + /* + * Insert into the conflict log table if requested. For conflicts below + * ERROR the apply transaction continues, so insert immediately; for + * ERROR-level conflicts the ereport() above already raised the error and + * the insertion is deferred to a new transaction + * (ProcessPendingConflictLogTuple) so that it is not rolled back. + */ + if (log_dest_table) + { + if (elevel < ERROR) + { + PG_TRY(); + { + InsertConflictLogTuple(conflictlogrel); + } + PG_CATCH(); + { + /* + * The insert failed, so the apply transaction will abort and + * the error will propagate to the worker's error handler. The + * conflict was already reported to the server log above, so + * it is not lost. Discard the prepared tuple so that the + * deferred insertion path (ProcessPendingConflictLogTuple) + * does not retry this same failing insert. + */ + if (pending_conflict_log.tuple != NULL) + { + heap_freetuple(pending_conflict_log.tuple); + pending_conflict_log.tuple = NULL; + } + if (pending_conflict_log.errcontext_str != NULL) + { + pfree(pending_conflict_log.errcontext_str); + pending_conflict_log.errcontext_str = NULL; + } + PG_RE_THROW(); + } + PG_END_TRY(); + } + + table_close(conflictlogrel, RowExclusiveLock); + } +} + +/* + * ProcessPendingConflictLogTuple + * Insert any deferred conflict log tuple in a separate transaction. + * + * For conflicts raised at ERROR level, the conflict log tuple cannot be + * inserted immediately because the surrounding transaction will abort. + * To ensure that conflict information is not lost, such tuples are prepared + * during error processing (see prepare_conflict_log_tuple()) but their + * insertion is deferred. + * + * This function is responsible for completing that deferred insertion after + * the failing transaction has been aborted and the system has returned to an + * idle state. It executes the insertion in a new, independent transaction, + * ensuring that the conflict log entry is durable and not rolled back + * together with the failed apply transaction. + */ +void +ProcessPendingConflictLogTuple(void) +{ + Relation conflictlogrel; + ConflictLogDest dest; + + /* Nothing to do */ + if (pending_conflict_log.tuple == NULL) + return; + + /* + * Insert the deferred conflict log tuple in its own transaction. A + * failure here (e.g. an out-of-disk-space error) is treated like any + * other apply error and raises an ERROR; such failures are expected to be + * rare and persistent. Callers must therefore have already reported (and + * cleared) any in-progress apply error before calling this, so that this + * error does not mask the original one. + */ + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* + * Test hook: pause here so a TAP test can take a conflicting lock on the + * conflict log table before this transaction tries to open it. See + * src/test/subscription/t/039_pa_conflict_log_lock_wait.pl. + */ + INJECTION_POINT("clt-pending-flush-before-open", NULL); + + /* Open the conflict log table and insert the tuple. */ + conflictlogrel = GetConflictLogDestAndTable(&dest); - ereport(elevel, - errcode_apply_conflict(type), - errmsg("conflict detected on relation \"%s.%s\": conflict=%s", - get_namespace_name(RelationGetNamespace(localrel)), - RelationGetRelationName(localrel), - ConflictTypeNames[type]), - errdetail_internal("%s", err_detail.data)); + if (conflictlogrel != NULL) + { + InsertConflictLogTuple(conflictlogrel); + table_close(conflictlogrel, RowExclusiveLock); + } + else + { + /* + * The conflict log table was dropped concurrently (e.g. by an ALTER + * SUBSCRIPTION that changed conflict_log_destination) after the + * conflict was already reported to the server log by + * ReportApplyConflict(). Nothing more to do; just discard the + * prepared tuple and its context string. + */ + heap_freetuple(pending_conflict_log.tuple); + pending_conflict_log.tuple = NULL; + if (pending_conflict_log.errcontext_str) + { + pfree(pending_conflict_log.errcontext_str); + pending_conflict_log.errcontext_str = NULL; + } + } + + PopActiveSnapshot(); + CommitTransactionCommand(); } /* @@ -335,6 +582,98 @@ InitConflictIndexes(ResultRelInfo *relInfo) relInfo->ri_onConflictArbiterIndexes = uniqueIndexes; } +/* + * GetConflictLogDestAndTable + * + * Fetches conflict logging metadata from the cached MySubscription pointer. + * Sets the destination enum in *log_dest and, if a table is one of the + * destinations, opens and returns the relation handle for the conflict log + * table. + * + * The table is opened with try_table_open(), so NULL is returned if the + * conflict log table has been dropped concurrently (e.g. by an ALTER + * SUBSCRIPTION that changed conflict_log_destination). Callers must treat a + * NULL result for a table destination as "table unavailable" and fall back to + * server-log reporting rather than failing. + */ +Relation +GetConflictLogDestAndTable(ConflictLogDest *log_dest) +{ + Oid conflictlogrelid; + + /* + * Convert the text log destination to the internal enum. MySubscription + * already contains the data from pg_subscription. + */ + *log_dest = GetConflictLogDest(MySubscription->conflictlogdest); + + /* Quick exit if a conflict log table was not requested. */ + if (!CONFLICTS_LOGGED_TO_TABLE(*log_dest)) + return NULL; + + conflictlogrelid = MySubscription->conflictlogrelid; + + Assert(OidIsValid(conflictlogrelid)); + + /* + * Use try_table_open(): the table may have been dropped concurrently by + * an ALTER SUBSCRIPTION that changed conflict_log_destination. Returning + * NULL lets the caller fall back to the server log instead of failing. + */ + return try_table_open(conflictlogrelid, RowExclusiveLock); +} + +/* + * Error context callback for failures while inserting into the conflict log + * table. Adds a line identifying the conflict that was being logged. + */ +static void +conflict_log_insert_errcontext(void *arg) +{ + char *ctx = (char *) arg; + + if (ctx) + errcontext("%s", ctx); +} + +/* + * InsertConflictLogTuple + * + * Insert conflict log tuple into the conflict log table. + */ +void +InsertConflictLogTuple(Relation conflictlogrel) +{ + ErrorContextCallback errcallback; + + /* A valid tuple must be prepared and stored in pending_conflict_log. */ + Assert(pending_conflict_log.tuple != NULL); + + /* + * Set up an error context so that a failure to insert (e.g. the conflict + * log table was dropped, or an out-of-space error) carries information + * identifying the conflict we were trying to log. + */ + errcallback.callback = conflict_log_insert_errcontext; + errcallback.arg = pending_conflict_log.errcontext_str; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + heap_insert(conflictlogrel, pending_conflict_log.tuple, + GetCurrentCommandId(true), 0, NULL); + + error_context_stack = errcallback.previous; + + /* Free the conflict log tuple and its context string. */ + heap_freetuple(pending_conflict_log.tuple); + pending_conflict_log.tuple = NULL; + if (pending_conflict_log.errcontext_str) + { + pfree(pending_conflict_log.errcontext_str); + pending_conflict_log.errcontext_str = NULL; + } +} + /* * Add SQLSTATE error code to the current conflict report. */ @@ -768,6 +1107,40 @@ get_tuple_desc(EState *estate, ResultRelInfo *relinfo, ConflictType type, } } +/* + * Helper function to extract the "raw" index key Datums and their null flags + * from a TupleTableSlot, given an already open index descriptor. + * This is the reusable core logic. + */ +static void +build_index_datums_from_slot(EState *estate, Relation localrel, + TupleTableSlot *slot, + Relation indexDesc, Datum *values, + bool *isnull) +{ + TupleTableSlot *tableslot = slot; + + /* + * If the slot is a virtual slot, copy it into a heap tuple slot as + * FormIndexDatum only works with heap tuple slots. + */ + if (TTS_IS_VIRTUAL(slot)) + { + /* Slot is created within the EState's tuple table */ + tableslot = table_slot_create(localrel, &estate->es_tupleTable); + tableslot = ExecCopySlot(tableslot, slot); + } + + /* + * Initialize ecxt_scantuple for potential use in FormIndexDatum + */ + GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot; + + /* Form the index datums */ + FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, + isnull); +} + /* * Helper functions to construct a string describing the contents of an index * entry. See BuildIndexValueDescription for details. @@ -783,41 +1156,359 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Relation indexDesc; Datum values[INDEX_MAX_KEYS]; bool isnull[INDEX_MAX_KEYS]; - TupleTableSlot *tableslot = slot; - if (!tableslot) + if (!slot) return NULL; Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); indexDesc = index_open(indexoid, NoLock); - /* - * If the slot is a virtual slot, copy it into a heap tuple slot as - * FormIndexDatum only works with heap tuple slots. - */ - if (TTS_IS_VIRTUAL(slot)) + build_index_datums_from_slot(estate, localrel, slot, indexDesc, values, + isnull); + + index_value = BuildIndexValueDescription(indexDesc, values, isnull); + + index_close(indexDesc, NoLock); + + return index_value; +} + +/* + * tuple_table_slot_to_json_datum + * + * Helper function to convert a TupleTableSlot to JSON. + */ +static Datum +tuple_table_slot_to_json_datum(TupleTableSlot *slot) +{ + HeapTuple tuple; + Datum datum; + Datum json; + + Assert(slot != NULL); + + tuple = ExecCopySlotHeapTuple(slot); + datum = heap_copy_tuple_as_datum(tuple, slot->tts_tupleDescriptor); + + json = DirectFunctionCall1(row_to_json, datum); + heap_freetuple(tuple); + + return json; +} + +/* + * tuple_table_slot_to_indextup_json + * + * Fetch replica identity key from the tuple table slot and convert into a + * JSON datum. + */ +static Datum +tuple_table_slot_to_indextup_json(EState *estate, Relation localrel, + Oid indexid, TupleTableSlot *slot) +{ + Relation indexDesc; + Datum values[INDEX_MAX_KEYS]; + bool isnull[INDEX_MAX_KEYS]; + HeapTuple tuple; + TupleDesc tupdesc; + Datum datum; + + Assert(slot != NULL); + + Assert(CheckRelationOidLockedByMe(indexid, RowExclusiveLock, true)); + + indexDesc = index_open(indexid, NoLock); + + build_index_datums_from_slot(estate, localrel, slot, indexDesc, values, + isnull); + tupdesc = CreateTupleDescCopy(RelationGetDescr(indexDesc)); + + /* Bless the tupdesc so it can be looked up by row_to_json. */ + BlessTupleDesc(tupdesc); + + /* Form the replica identity tuple. */ + tuple = heap_form_tuple(tupdesc, values, isnull); + datum = heap_copy_tuple_as_datum(tuple, tupdesc); + + heap_freetuple(tuple); + FreeTupleDesc(tupdesc); + index_close(indexDesc, NoLock); + + /* Convert to a JSON datum. */ + return DirectFunctionCall1(row_to_json, datum); +} + +/* + * build_conflict_tupledesc + * + * Build and bless a tuple descriptor for the conflict log table based on the + * predefined LocalConflictSchema. + */ +static TupleDesc +build_conflict_tupledesc(void) +{ + static TupleDesc cached_tupdesc = NULL; + + if (cached_tupdesc == NULL) { - tableslot = table_slot_create(localrel, &estate->es_tupleTable); - tableslot = ExecCopySlot(tableslot, slot); + MemoryContext oldcxt; + + oldcxt = MemoryContextSwitchTo(CacheMemoryContext); + + cached_tupdesc = CreateTemplateTupleDesc(NUM_LOCAL_CONFLICT_ATTRS); + + for (int i = 0; i < NUM_LOCAL_CONFLICT_ATTRS; i++) + TupleDescInitEntry(cached_tupdesc, + (AttrNumber) (i + 1), + LocalConflictSchema[i].attname, + LocalConflictSchema[i].atttypid, + -1, 0); + + TupleDescFinalize(cached_tupdesc); + + /* + * Bless once so it can be used as a RECORD type (e.g. for row_to_json + * or other record-based operations). + */ + BlessTupleDesc(cached_tupdesc); + + MemoryContextSwitchTo(oldcxt); } - /* - * Initialize ecxt_scantuple for potential use in FormIndexDatum when - * index expressions are present. - */ - GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot; + return cached_tupdesc; +} - /* - * The values/nulls arrays passed to BuildIndexValueDescription should be - * the results of FormIndexDatum, which are the "raw" input to the index - * AM. - */ - FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull); +/* + * Builds the local conflicts JSON array column from the list of + * ConflictTupleInfo objects. + * + * Example output structure: + * [ { "xid": "1001", "commit_ts": "...", "origin": "...", "tuple": {...} }, ... ] + */ +static Datum +build_local_conflicts_json_array(EState *estate, Relation rel, + ConflictType conflict_type, + List *conflicttuples) +{ + ListCell *lc; + List *json_datums = NIL; + Datum *json_datum_array; + Datum json_array_datum; + int num_conflicts; + int i; + int16 typlen; + bool typbyval; + char typalign; + TupleDesc tupdesc; - index_value = BuildIndexValueDescription(indexDesc, values, isnull); + /* Build local conflicts tuple descriptor. */ + tupdesc = build_conflict_tupledesc(); - index_close(indexDesc, NoLock); + /* Process local conflict tuple list and prepare an array of JSON. */ + foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) + { + Datum values[NUM_LOCAL_CONFLICT_ATTRS] = {0}; + bool nulls[NUM_LOCAL_CONFLICT_ATTRS] = {0}; + char *origin_name = NULL; + HeapTuple tuple; + Datum json_datum; + int attno; + + attno = 0; + values[attno++] = TransactionIdGetDatum(conflicttuple->xmin); + + if (conflicttuple->ts) + values[attno++] = TimestampTzGetDatum(conflicttuple->ts); + else + nulls[attno++] = true; - return index_value; + if (conflicttuple->origin != InvalidReplOriginId) + replorigin_by_oid(conflicttuple->origin, true, &origin_name); + + /* Store empty string if origin name for the tuple is NULL. */ + if (origin_name != NULL) + values[attno++] = CStringGetTextDatum(origin_name); + else + nulls[attno++] = true; + + /* + * Add the conflicting key values in the case of a unique constraint + * violation. + */ + if (conflict_type == CT_INSERT_EXISTS || + conflict_type == CT_UPDATE_EXISTS || + conflict_type == CT_MULTIPLE_UNIQUE_CONFLICTS) + { + Oid indexoid = conflicttuple->indexoid; + + Assert(OidIsValid(indexoid) && conflicttuple->slot && + CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, + true)); + values[attno++] = + tuple_table_slot_to_indextup_json(estate, rel, + indexoid, + conflicttuple->slot); + } + else + nulls[attno++] = true; + + /* Convert conflicting tuple to JSON datum. */ + if (conflicttuple->slot) + values[attno] = tuple_table_slot_to_json_datum(conflicttuple->slot); + else + nulls[attno] = true; + + Assert(attno + 1 == NUM_LOCAL_CONFLICT_ATTRS); + + tuple = heap_form_tuple(tupdesc, values, nulls); + + json_datum = heap_copy_tuple_as_datum(tuple, tupdesc); + + /* + * Build the higher level JSON datum in format described in function + * header. + */ + json_datum = DirectFunctionCall1(row_to_json, json_datum); + + /* Done with the temporary tuple. */ + heap_freetuple(tuple); + + /* Add to the array element. */ + json_datums = lappend(json_datums, (void *) json_datum); + } + + num_conflicts = list_length(json_datums); + + json_datum_array = palloc_array(Datum, num_conflicts); + + i = 0; + foreach(lc, json_datums) + { + json_datum_array[i] = (Datum) lfirst(lc); + i++; + } + + /* Construct the JSON array Datum. */ + get_typlenbyvalalign(JSONOID, &typlen, &typbyval, &typalign); + json_array_datum = PointerGetDatum(construct_array(json_datum_array, + num_conflicts, + JSONOID, + typlen, + typbyval, + typalign)); + pfree(json_datum_array); + + return json_array_datum; +} + +/* + * prepare_conflict_log_tuple + * + * This routine prepares a tuple detailing a conflict encountered during + * logical replication. The prepared tuple will be stored in + * pending_conflict_log.tuple which should be inserted into the + * conflict log table by calling InsertConflictLogTuple. + */ +static void +prepare_conflict_log_tuple(EState *estate, Relation rel, + Relation conflictlogrel, + ConflictType conflict_type, + TupleTableSlot *searchslot, + List *conflicttuples, + TupleTableSlot *remoteslot) +{ + Datum values[NUM_CONFLICT_ATTRS] = {0}; + bool nulls[NUM_CONFLICT_ATTRS] = {0}; + int attno; + char *remote_origin = NULL; + MemoryContext oldctx; + + Assert(pending_conflict_log.tuple == NULL); + + /* Populate the values and nulls arrays. */ + attno = 0; + values[attno++] = ObjectIdGetDatum(RelationGetRelid(rel)); + + values[attno++] = + CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel))); + + values[attno++] = CStringGetTextDatum(RelationGetRelationName(rel)); + + values[attno++] = CStringGetTextDatum(ConflictTypeNames[conflict_type]); + + if (TransactionIdIsValid(remote_xid)) + values[attno++] = TransactionIdGetDatum(remote_xid); + else + nulls[attno++] = true; + + values[attno++] = LSNGetDatum(remote_final_lsn); + + if (remote_commit_ts > 0) + values[attno++] = TimestampTzGetDatum(remote_commit_ts); + else + nulls[attno++] = true; + + if (replorigin_xact_state.origin != InvalidReplOriginId) + replorigin_by_oid(replorigin_xact_state.origin, true, &remote_origin); + + if (remote_origin != NULL) + values[attno++] = CStringGetTextDatum(remote_origin); + else + nulls[attno++] = true; + + if (!TupIsNull(searchslot)) + { + Oid replica_index = GetRelationIdentityOrPK(rel); + + /* + * If the table has a valid replica identity index, build the index + * JSON datum from key value. Otherwise, construct it from the + * complete tuple in REPLICA IDENTITY FULL cases. + */ + if (OidIsValid(replica_index)) + { + values[attno++] = BoolGetDatum(false); + values[attno++] = tuple_table_slot_to_indextup_json(estate, rel, + replica_index, + searchslot); + } + else + { + values[attno++] = BoolGetDatum(true); + values[attno++] = tuple_table_slot_to_json_datum(searchslot); + } + } + else + { + nulls[attno++] = true; + nulls[attno++] = true; + } + + if (!TupIsNull(remoteslot)) + values[attno++] = tuple_table_slot_to_json_datum(remoteslot); + else + nulls[attno++] = true; + + values[attno] = build_local_conflicts_json_array(estate, rel, + conflict_type, + conflicttuples); + + Assert(attno + 1 == NUM_CONFLICT_ATTRS); + + oldctx = MemoryContextSwitchTo(ApplyContext); + pending_conflict_log.tuple = + heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls); + + /* + * Stash a context string describing this conflict, so that if inserting + * the tuple into the conflict log table fails, the resulting error + * carries enough context to identify the conflict (see + * InsertConflictLogTuple). + */ + pending_conflict_log.errcontext_str = + psprintf("while logging conflict \"%s\" detected on relation \"%s\"", + ConflictTypeNames[conflict_type], + RelationGetRelationName(rel)); + MemoryContextSwitchTo(oldctx); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7799266c614..042c7203619 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -487,7 +487,9 @@ static bool MySubscriptionValid = false; static List *on_commit_wakeup_workers_subids = NIL; bool in_remote_transaction = false; -static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +TransactionId remote_xid = InvalidTransactionId; +TimestampTz remote_commit_ts = 0; /* fields valid only when processing streamed transaction */ static bool in_streamed_transaction = false; @@ -1236,6 +1238,8 @@ apply_handle_begin(StringInfo s) set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn); remote_final_lsn = begin_data.final_lsn; + remote_commit_ts = begin_data.committime; + remote_xid = begin_data.xid; maybe_start_skipping_changes(begin_data.final_lsn); @@ -1296,6 +1300,8 @@ apply_handle_begin_prepare(StringInfo s) set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn); remote_final_lsn = begin_data.prepare_lsn; + remote_xid = begin_data.xid; + remote_commit_ts = 0; maybe_start_skipping_changes(begin_data.prepare_lsn); @@ -1767,6 +1773,10 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid transaction ID in streamed replication transaction"))); + remote_xid = stream_xid; + remote_final_lsn = InvalidXLogRecPtr; + remote_commit_ts = 0; + set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr); /* Try to allocate a worker for the streaming transaction. */ @@ -2428,6 +2438,9 @@ apply_handle_stream_commit(StringInfo s) { case TRANS_LEADER_APPLY: + /* Set remote_commit_ts for conflict logging. */ + remote_commit_ts = commit_data.committime; + /* * The transaction has been serialized to file, so replay all the * spooled operations. @@ -5647,6 +5660,9 @@ start_apply(XLogRecPtr origin_startpos) } PG_CATCH(); { + MemoryContext oldcontext; + ErrorData *edata; + /* * Reset the origin state to prevent the advancement of origin * progress if we fail to apply. Otherwise, this will result in @@ -5660,14 +5676,34 @@ start_apply(XLogRecPtr origin_startpos) else { /* - * Report the worker failed while applying changes. Abort the - * current transaction so that the stats message is sent in an - * idle state. + * Save the error and recover to an idle state so we can insert + * the deferred conflict log tuple (if any) before re-throwing. + * Copy the error into a long-lived context first, as it may have + * been raised under ErrorContext. Also reset the error context + * stack: the callbacks in effect when the error was thrown belong + * to unwound stack frames, and the deferred insert installs its + * own. */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + edata = CopyErrorData(); + MemoryContextSwitchTo(oldcontext); + + FlushErrorState(); + error_context_stack = NULL; AbortOutOfAnyTransaction(); pgstat_report_subscription_error(MySubscription->oid); - PG_RE_THROW(); + /* + * Insert the deferred conflict log tuple in its own transaction. + * If this fails, that error (annotated with the conflict context, + * see InsertConflictLogTuple) propagates instead of the original; + * such failures are expected to be rare and persistent (e.g. out + * of disk space). + */ + ProcessPendingConflictLogTuple(); + + /* Re-throw the original error. */ + ReThrowError(edata); } } PG_END_TRY(); @@ -6034,6 +6070,14 @@ DisableSubscriptionAndExit(void) RESUME_INTERRUPTS(); + /* + * The error context callbacks in effect when the error was thrown belong + * to now-unwound stack frames; reset the stack before running further + * code (including the deferred conflict log insertion, which installs its + * own). + */ + error_context_stack = NULL; + /* * Report the worker failed during sequence synchronization, table * synchronization, or apply. @@ -6062,6 +6106,19 @@ DisableSubscriptionAndExit(void) errmsg("subscription \"%s\" has been disabled because of an error", MySubscription->name)); + /* + * Insert the deferred conflict log tuple (if any) now that the + * subscription has been disabled and committed. Doing it after the + * disable means a failure to log the conflict (treated as a hard error, + * see ProcessPendingConflictLogTuple) cannot prevent the subscription + * from being disabled and so cannot leave the worker restarting and + * failing forever. Do it before the dead-tuple retention check below: + * that check only warns today, but it takes an elevel and could raise an + * error, which must not prevent the conflict from being recorded. The + * original error was already reported above. + */ + ProcessPendingConflictLogTuple(); + /* * Skip the track_commit_timestamp check when disabling the worker due to * an error, as verifying commit timestamps is unnecessary in this diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index 326570ed639..4db3a36d4df 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -10,6 +10,7 @@ #define CONFLICT_H #include "access/xlogdefs.h" +#include "access/genam.h" #include "datatype/timestamp.h" #include "nodes/pg_list.h" @@ -114,5 +115,8 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples); +extern void ProcessPendingConflictLogTuple(void); extern void InitConflictIndexes(ResultRelInfo *relInfo); +extern Relation GetConflictLogDestAndTable(ConflictLogDest *log_dest); +extern void InsertConflictLogTuple(Relation conflictlogrel); #endif diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 745b7d9e969..394f4c6265e 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -121,6 +121,9 @@ typedef enum ParallelTransState PARALLEL_TRANS_UNKNOWN, PARALLEL_TRANS_STARTED, PARALLEL_TRANS_FINISHED, + PARALLEL_TRANS_ERROR, /* worker failed; it will report the error + * (and log the conflict, if any) before + * exiting */ } ParallelTransState; /* @@ -255,6 +258,10 @@ extern PGDLLIMPORT bool InitializingApplyWorker; extern PGDLLIMPORT List *table_states_not_ready; +extern XLogRecPtr remote_final_lsn; +extern TimestampTz remote_commit_ts; +extern TransactionId remote_xid; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index fa07ebf8ff7..55279edfda6 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -713,16 +713,19 @@ RelationCloseSmgr(Relation relation) * * We don't log information for unlogged tables (since they don't WAL log * anyway), for foreign tables (since they don't WAL log, either), - * and for system tables (their content is hard to make sense of, and - * it would complicate decoding slightly for little gain). Note that we *do* - * log information for user defined catalog tables since they presumably are - * interesting to the user... + * for system tables (their content is hard to make sense of, and + * it would complicate decoding slightly for little gain), and for conflict + * log tables in the pg_conflict namespace (which are system-managed tables + * used internally on subscribers). Note that we *do* log information for + * user defined catalog tables since they presumably are interesting to the + * user... */ #define RelationIsLogicallyLogged(relation) \ (XLogLogicalInfoActive() && \ RelationNeedsWAL(relation) && \ (relation)->rd_rel->relkind != RELKIND_FOREIGN_TABLE && \ - !IsCatalogRelation(relation)) + !IsCatalogRelation(relation) && \ + !IsConflictLogTableNamespace(RelationGetNamespace(relation))) /* routines in utils/cache/relcache.c */ extern void RelationIncrementReferenceCount(Relation rel); diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index e71e95c6297..225f90a37b3 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -48,6 +48,7 @@ tests += { 't/036_sequences.pl', 't/037_except.pl', 't/038_walsnd_shutdown_timeout.pl', + 't/039_pa_conflict_log_lock_wait.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl index 6bc6b7874c2..5f4d00bdd33 100644 --- a/src/test/subscription/t/030_origin.pl +++ b/src/test/subscription/t/030_origin.pl @@ -166,7 +166,7 @@ is($result, qq(32), 'The node_A data replicated to node_B'); $node_C->safe_psql('postgres', "UPDATE $tab SET a = 33 WHERE a = 32;"); $node_B->wait_for_log( - qr/conflict detected on relation "public.$tab_unquoted": conflict=update_origin_differs.*\n.*DETAIL:.* Updating the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(32\), remote row \(33\), replica identity \(a\)=\(32\)./ + qr/conflict detected on relation "public.$tab": conflict=update_origin_differs.*\n.*DETAIL:.* Updating the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(32\), remote row \(33\), replica identity \(a\)=\(32\)./ ); $node_B->safe_psql('postgres', "DELETE FROM $tab;"); @@ -182,7 +182,7 @@ is($result, qq(33), 'The node_A data replicated to node_B'); $node_C->safe_psql('postgres', "DELETE FROM $tab WHERE a = 33;"); $node_B->wait_for_log( - qr/conflict detected on relation "public.$tab_unquoted": conflict=delete_origin_differs.*\n.*DETAIL:.* Deleting the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(33\), replica identity \(a\)=\(33\).*/ + qr/conflict detected on relation "public.$tab": conflict=delete_origin_differs.*\n.*DETAIL:.* Deleting the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(33\), replica identity \(a\)=\(33\).*/ ); # The remaining tests no longer test conflict detection. diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index 3910a49c0aa..6a7d094712d 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -84,10 +84,35 @@ $node_subscriber->wait_for_log( .*Key already exists in unique index \"conf_tab_c_key\", modified in transaction .*: key \(c\)=\(4\), local row \(4, 4, 4\)./, $log_offset); +# Verify the contents of the Conflict Log Table (CLT) +# This section ensures that the CLT contains the expected +# type and specific key data. +my $subid = $node_subscriber->safe_psql('postgres', + "SELECT oid FROM pg_subscription WHERE subname = 'sub_tab';"); +my $clt = "pg_conflict.pg_conflict_log_$subid"; + +# Wait for the conflict to be logged in the CLT +my $log_check = $node_subscriber->poll_query_until( + 'postgres', + "SELECT count(*) > 0 FROM $clt;" +); + +my $conflict_check = $node_subscriber->safe_psql('postgres', + "SELECT count(*) >= 1 FROM $clt WHERE conflict_type = 'multiple_unique_conflicts';"); +is($conflict_check, 't', 'Verified multiple_unique_conflicts logged into conflict log table'); + +my $json_query = "SELECT local_conflicts FROM $clt;"; +my $raw_json = $node_subscriber->safe_psql('postgres', $json_query); + +# Verify that '2' is present inside the JSON structure using a regex +# This matches the key/value pattern for "a": 2 +like($raw_json, qr/\\"a\\":2/, 'Verified that key 2 exists in the local_conflicts'); + pass('multiple_unique_conflicts detected during insert'); # Truncate table to get rid of the error $node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;"); +$node_subscriber->safe_psql('postgres', "DELETE FROM $clt"); ################################################## # Test multiple_unique_conflicts due to UPDATE @@ -114,6 +139,26 @@ $node_subscriber->wait_for_log( .*Key already exists in unique index \"conf_tab_c_key\", modified in transaction .*: key \(c\)=\(8\), local row \(8, 8, 8\)./, $log_offset); +# Verify the contents of the Conflict Log Table (CLT) +# This section ensures that the CLT contains the expected +# type and specific key data. + +# Wait for the conflict to be logged in the CLT +$log_check = $node_subscriber->poll_query_until( + 'postgres', + "SELECT count(*) > 0 FROM $clt;" +); + +$conflict_check = $node_subscriber->safe_psql('postgres', + "SELECT count(*) >= 1 FROM $clt WHERE conflict_type = 'multiple_unique_conflicts';"); +is($conflict_check, 't', 'Verified multiple_unique_conflicts logged into conflict log table'); + +$raw_json = $node_subscriber->safe_psql('postgres', $json_query); + +# Verify that '6' is present inside the JSON structure using a regex +# This matches the key/value pattern for "a": 6 +like($raw_json, qr/\\"a\\":6/, 'Verified that key 6 exists in the local_conflicts'); + pass('multiple_unique_conflicts detected during update'); # Truncate table to get rid of the error @@ -674,10 +719,6 @@ ok( $node_A->poll_query_until( # A conflict log table is system-managed and cannot be altered directly, so # moving it to another tablespace must be rejected. ############################################################################### -my $subid = $node_subscriber->safe_psql('postgres', - "SELECT oid FROM pg_subscription WHERE subname = 'sub_tab';"); -my $clt = "pg_conflict.pg_conflict_log_$subid"; - (undef, undef, $stderr) = $node_subscriber->psql('postgres', "ALTER TABLE $clt SET TABLESPACE pg_default"); like( diff --git a/src/test/subscription/t/039_pa_conflict_log_lock_wait.pl b/src/test/subscription/t/039_pa_conflict_log_lock_wait.pl new file mode 100644 index 00000000000..f80047dcb50 --- /dev/null +++ b/src/test/subscription/t/039_pa_conflict_log_lock_wait.pl @@ -0,0 +1,203 @@ +# Copyright (c) 2026, PostgreSQL Global Development Group + +# Test that a parallel apply (PA) worker correctly inserts a deferred +# conflict-log tuple even when, by the time it reaches +# ProcessPendingConflictLogTuple(), the conflict log table is held under +# ACCESS EXCLUSIVE by another session. +# +# The window we want to exercise is narrow: PA must already be past +# ReportApplyConflict() (so the error has fired and PA is in PG_CATCH), +# and the locker must take the CLT lock *before* PA reaches the second +# CLT open inside ProcessPendingConflictLogTuple(). An injection point +# pauses PA at exactly that point so the locker can grab the lock first; +# without it, the locker either takes the lock too early (PA blocks in +# the inline CLT open inside ReportApplyConflict, before the error has +# fired) or too late (PA inserts before the lock is taken). + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +if ($ENV{enable_injection_points} ne 'yes') +{ + plan skip_all => 'Injection points not supported by this build'; +} + +# --------------------------------------------------------------------- +# Set up publisher and subscriber. Force every transaction to stream so +# the conflict is handled by a parallel apply worker rather than the +# leader. +# --------------------------------------------------------------------- +my $node_pub = PostgreSQL::Test::Cluster->new('publisher'); +$node_pub->init(allows_streaming => 'logical'); +$node_pub->append_conf('postgresql.conf', q{ +debug_logical_replication_streaming = immediate +logical_decoding_work_mem = 64kB +}); +$node_pub->start; + +my $node_sub = PostgreSQL::Test::Cluster->new('subscriber'); +$node_sub->init; +$node_sub->append_conf('postgresql.conf', q{ +shared_preload_libraries = 'injection_points' +max_logical_replication_workers = 4 +max_parallel_apply_workers_per_subscription = 2 +}); +$node_sub->start; + +# Replicated table; the pre-existing row on the subscriber is what makes +# the publisher's INSERT (id=1) trigger an INSERT_EXISTS conflict. +$node_pub->safe_psql('postgres', q{ + CREATE TABLE t (id int PRIMARY KEY, val text); + ALTER TABLE t REPLICA IDENTITY FULL; + CREATE PUBLICATION p FOR TABLE t; +}); + +$node_sub->safe_psql('postgres', q{ + CREATE TABLE t (id int PRIMARY KEY, val text); + INSERT INTO t VALUES (1, 'pre-existing'); + CREATE EXTENSION injection_points; +}); + +my $pub_connstr = $node_pub->connstr . ' dbname=postgres'; +$node_sub->safe_psql('postgres', qq{ + CREATE SUBSCRIPTION s + CONNECTION '$pub_connstr' + PUBLICATION p + WITH (streaming = parallel, + conflict_log_destination = 'all', + disable_on_error = true); +}); + +$node_sub->wait_for_subscription_sync($node_pub, 's'); + +# --------------------------------------------------------------------- +# Send a non-conflicting INSERT and then wait until pg_subscription_rel +# reaches 'r' (ready) on every relation. pa_can_start() requires +# AllTablesyncsReady(), which returns true only when every +# pg_subscription_rel row is 'r'. The 's' (syncdone) -> 'r' transition +# fires inside ProcessSyncingTablesForApply, which only flips the state +# when the apply worker's last_received LSN has advanced past the +# tablesync end LSN -- so we need a triggering commit on the publisher +# to drive last_received forward. Without this step, the conflict txn +# below would arrive while the table is still 's', pa_can_start() would +# return false, the leader would spool to file and apply serially, and +# no parallel apply worker would ever spawn. +# --------------------------------------------------------------------- +$node_pub->safe_psql('postgres', "INSERT INTO t VALUES (1000, 'warmup');"); +$node_sub->poll_query_until('postgres', + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');" +) or die "subscription tables did not reach READY state"; + +# --------------------------------------------------------------------- +# Look up the per-subscription CLT name. +# --------------------------------------------------------------------- +my $sub_oid = $node_sub->safe_psql('postgres', + "SELECT oid FROM pg_subscription WHERE subname = 's'"); +my $clt = "pg_conflict.pg_conflict_log_$sub_oid"; +note "conflict log table for subscription s: $clt"; + +# --------------------------------------------------------------------- +# Arm the injection point. This pauses the PA worker inside +# ProcessPendingConflictLogTuple() — i.e. *after* the error has fired +# and the PG_CATCH has run, *before* the second open of the CLT. This +# is the exact window the deferred-insert path needs to be tested in. +# --------------------------------------------------------------------- +$node_sub->safe_psql('postgres', + "SELECT injection_points_attach('clt-pending-flush-before-open', 'wait');"); + +# --------------------------------------------------------------------- +# Drive the conflict. PA receives the streamed txn, hits INSERT_EXISTS +# inside ReportApplyConflict (which opens/closes the CLT cleanly while +# preparing the deferred tuple), then ereport(ERROR) fires, PG_CATCH +# runs, and PA enters ProcessPendingConflictLogTuple — where it pauses +# at the injection point. +# --------------------------------------------------------------------- +my $log_offset = -s $node_sub->logfile; + +$node_pub->safe_psql('postgres', q{ + BEGIN; + INSERT INTO t SELECT g, repeat('x', 1000) FROM generate_series(2, 200) g; + INSERT INTO t VALUES (1, 'conflict'); + COMMIT; +}); + +# Wait until PA is parked at the injection point. +$node_sub->wait_for_event('logical replication parallel worker', + 'clt-pending-flush-before-open'); + +# --------------------------------------------------------------------- +# Now take ACCESS EXCLUSIVE on the CLT. TRUNCATE is permitted on CLTs; +# At this point the CLT is empty, so the TRUNCATE is effectively a no-op +# that just acquires the lock. +# Because PA is paused at the injection point, this lock is guaranteed +# to be acquired *before* PA tries to open the CLT. +# --------------------------------------------------------------------- +my $locker = $node_sub->background_psql('postgres'); +$locker->query_safe(qq{ + BEGIN; + TRUNCATE $clt; +}); + +# --------------------------------------------------------------------- +# Wake the PA from the injection point. It will now try to open the +# CLT inside ProcessPendingConflictLogTuple and block on the lock the +# locker session holds. +# --------------------------------------------------------------------- +$node_sub->safe_psql('postgres', + "SELECT injection_points_wakeup('clt-pending-flush-before-open'); + SELECT injection_points_detach('clt-pending-flush-before-open');"); + +# Confirm the PA worker is actually parked waiting on the CLT lock — +# this verifies we are exercising the deferred-insert lock-wait path, +# not racing past it. +my $clt_oid = $node_sub->safe_psql('postgres', + "SELECT '$clt'::regclass::oid"); +ok( $node_sub->poll_query_until( + 'postgres', qq{ + SELECT EXISTS ( + SELECT 1 + FROM pg_locks l + JOIN pg_stat_activity a ON l.pid = a.pid + WHERE NOT l.granted + AND l.relation = $clt_oid + AND a.backend_type = 'logical replication parallel worker' + ); + }, 't'), + 'PA worker is blocked on the CLT lock inside ProcessPendingConflictLogTuple'); + +# --------------------------------------------------------------------- +# Release the lock. PA wakes, inserts the deferred row, commits its +# CLT txn, re-throws the original error to the leader, and the leader +# disables the subscription (disable_on_error = true). +# --------------------------------------------------------------------- +$locker->query_safe('COMMIT;'); +ok($locker->quit, 'locker session closed cleanly'); + +ok( $node_sub->poll_query_until( + 'postgres', + "SELECT subenabled = false FROM pg_subscription WHERE subname = 's'", + 't'), + 'subscription disabled after the conflict'); + +# --------------------------------------------------------------------- +# Verify the deferred conflict log tuple survived the lock wait. +# --------------------------------------------------------------------- +my $rows = $node_sub->safe_psql('postgres', + "SELECT count(*) FROM $clt WHERE conflict_type = 'insert_exists'"); +is($rows, '1', + 'deferred CLT insert by PA worker succeeded after lock release'); + +# --------------------------------------------------------------------- +# Also verify the conflict was reported in the server log +# (conflict_log_destination = 'all' logs to both the table and the log). +# --------------------------------------------------------------------- +my $log_contents = slurp_file($node_sub->logfile, $log_offset); +like( + $log_contents, + qr/ERROR:\s+conflict detected on relation "public\.t": conflict=insert_exists/, + 'conflict reported in server log'); + +done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 117e7379f10..fa55410860d 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2249,6 +2249,7 @@ PatternInfo PatternInfoArray Pattern_Prefix_Status Pattern_Type +PendingConflictLogData PendingFsyncEntry PendingListenAction PendingListenEntry -- 2.49.0