From a148d389d3ec3b87a45de4267aeea6e559a0c556 Mon Sep 17 00:00:00 2001
From: Amit Kapila
Date: Fri, 19 Jun 2026 11:09:08 +0530
Subject: [PATCH v3] Improve error handling for conflict log table insertions.

When a subscription logs conflicts to its conflict log table (CLT), the
apply worker prepares the conflict tuple and inserts it either inline
(for sub-ERROR conflicts, where apply continues) or, for ERROR-level
conflicts, defers the insertion to a fresh transaction after the apply
transaction aborts (ProcessPendingConflictLogTuple), so the log row is
not rolled back with the failed change.

The previous error handling around that insertion had a few problems:

* ProcessPendingConflictLogTuple() wrapped the insert in its own
  PG_TRY/PG_CATCH that, on failure, called FlushErrorState() and
  downgraded the failure to a WARNING. When called from start_apply()'s
  PG_CATCH (which then does PG_RE_THROW()), that FlushErrorState() reset
  the error stack and discarded the very error being re-thrown, so a
  failure of the deferred insert could corrupt error reporting. It also
  silently swallowed genuine insertion failures.

* If the inline insert (sub-ERROR path) failed, the conflict was not
  recorded anywhere: the apply transaction aborted before the conflict
  was written to the server log.

* In the parallel apply worker, a failed deferred insert likewise lost
  the original error.

Rework this so that a failure to write the CLT is treated as a normal
apply error, and so that the conflict is always identifiable:

* Drop the internal PG_TRY/PG_CATCH from
  ProcessPendingConflictLogTuple(). A failed insert now raises an ERROR
  like any other apply failure; such failures (e.g. the CLT was dropped,
  or an out-of-disk-space error) are expected to be rare and persistent.

* Annotate insertion failures with the conflict being logged.
  prepare_conflict_log_tuple() stashes a short description ("while
  logging conflict \"%s\" detected on relation \"%s\"") in a new
  LogicalRepWorker.conflict_log_errcontext field (allocated, like
  conflict_log_tuple, in ApplyContext), and InsertConflictLogTuple()
  installs an error context callback around the heap_insert(). Any
  error raised during the insert therefore carries enough context to
  identify the conflict, even on the deferred path where the original
  conflict error is not separately reported.

* Report the conflict to the server log before inserting it into the
  table. ReportApplyConflict() now prepares the tuple, emits the
  server-log message (full details when the destination includes the
  log, otherwise a short message pointing at the table), and only then
  inserts. For sub-ERROR conflicts this guarantees the conflict is
  recorded even if the inline table insert fails. That inline insert is
  wrapped so that, on failure, the prepared tuple is discarded before
  re-throwing, ensuring the deferred path does not retry the same
  failing insert.

* Rework the worker error handlers to do the deferred insertion
  cleanly. start_apply() and ParallelApplyWorkerMain() now copy the
  error into a long-lived context, call FlushErrorState(), reset
  error_context_stack (the callbacks active at throw time belong to
  unwound frames, and the insert installs its own), abort the
  transaction, run ProcessPendingConflictLogTuple(), and finally
  ReThrowError() the original error. If the insert succeeds, the
  original error is re-thrown; if it fails, the insert error (carrying
  the conflict context) propagates instead. For the parallel apply
  worker this means the leader always receives a real ErrorResponse
  rather than a "lost connection to the parallel apply worker" message.

* In DisableSubscriptionAndExit(), perform the deferred insertion after
  the subscription has been disabled and committed (and before the
  dead-tuple retention check), so a failure to log the conflict cannot
  prevent the subscription from being disabled or leave the worker
  restarting and failing forever.
---
 .../replication/logical/applyparallelworker.c |  38 ++--
 src/backend/replication/logical/conflict.c    | 214 +++++++++++-------
 src/backend/replication/logical/worker.c      |  51 ++++-
 src/include/replication/worker_internal.h     |   7 +
 4 files changed, 211 insertions(+), 99 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d24c52d43e6..48cb5558367 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -993,32 +993,44 @@ ParallelApplyWorkerMain(Datum main_arg)
 	PG_CATCH();
 	{
 		MemoryContext oldcontext;
-		ErrorData *edata;
+		ErrorData  *edata;
 
 		/*
-		 * Copy the ErrorData before doing any further work. The error may
-		 * have been raised while running under ErrorContext, so switch to
-		 * a safe context (TopMemoryContext) to avoid assertions and ensure
-		 * the error data survives subsequent cleanup.
+		 * Reset the origin state to prevent the advancement of origin
+		 * progress if we fail to apply. Otherwise, this will result in
+		 * transaction loss as that transaction won't be sent again by the
+		 * server.
+		 */
+		replorigin_xact_clear(true);
+
+		/*
+		 * Copy the error and recover to an idle state so we can insert the
+		 * deferred conflict log tuple (if any) before re-throwing. Copy the
+		 * error into a longer-lived context first, as it may have been raised
+		 * under ErrorContext. Also reset the error context stack: the
+		 * callbacks in effect when the error was thrown belong to unwound stack
+		 * frames, and the deferred insert installs its own.
 		 */
 		oldcontext = MemoryContextSwitchTo(TopMemoryContext);
 		edata = CopyErrorData();
 		MemoryContextSwitchTo(oldcontext);
 
 		FlushErrorState();
+		error_context_stack = NULL;
+		AbortOutOfAnyTransaction();
 
 		/*
-		 * Reset the origin state to prevent the advancement of origin
-		 * progress if we fail to apply. Otherwise, this will result in
-		 * transaction loss as that transaction won't be sent again by the
-		 * server.
+		 * Insert the deferred conflict log tuple before re-throwing.
+		 * Re-throwing is what reports the error to the leader (via the error
+		 * queue set up above), so the insertion must happen first: otherwise
+		 * the leader could start tearing down this worker while it is still
+		 * writing the conflict log tuple. If the insertion itself fails, that
+		 * error (annotated with the conflict context, see InsertConflictLogTuple)
+		 * propagates to the leader instead of the original.
 		 */
-		replorigin_xact_clear(true);
-
-		AbortOutOfAnyTransaction();
 		ProcessPendingConflictLogTuple();
 
-		/* Re-throw the original error. */
+		/* Re-throw the original error, which reports it to the leader. */
 		ReThrowError(edata);
 	}
 	PG_END_TRY();
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 6bf3d6d5a44..c2c15f055e6 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -335,17 +335,16 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 	log_dest_table = CONFLICTS_LOGGED_TO_TABLE(dest);
 	log_dest_logfile = CONFLICTS_LOGGED_TO_LOG(dest);
 
-	/* Insert to table if requested. */
+	/*
+	 * Prepare the conflict log tuple first when the destination includes the
+	 * table. This must happen before the ereport() below, because for an
+	 * ERROR-level conflict that ereport() raises the error, and the actual
+	 * insertion is then deferred to ProcessPendingConflictLogTuple(), which
+	 * relies on the tuple having been prepared.
+	 */
 	if (log_dest_table)
 	{
 		Assert(conflictlogrel != NULL);
-
-		/*
-		 * Prepare the conflict log tuple. If the error level is below ERROR,
-		 * insert it immediately. Otherwise, defer the insertion to a new
-		 * transaction after the current one aborts, ensuring the insertion of
-		 * the log tuple is not rolled back.
-		 */
 		prepare_conflict_log_tuple(estate,
 								   relinfo->ri_RelationDesc,
 								   conflictlogrel,
@@ -353,29 +352,16 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 								   searchslot,
 								   conflicttuples,
 								   remoteslot);
-		if (elevel < ERROR)
-			InsertConflictLogTuple(conflictlogrel);
-
-		if (!log_dest_logfile)
-		{
-			/*
-			 * Not logging conflict details to the server log; Report the error
-			 * msg but omit raw tuple data from server logs since it's already
-			 * captured in the conflict log table.
-			 */
-			ereport(elevel,
-					errcode_apply_conflict(type),
-					errmsg("conflict detected on relation \"%s\": conflict=%s",
-						   RelationGetQualifiedRelationName(localrel),
-						   ConflictTypeNames[type]),
-					errdetail("Conflict details are logged to the conflict log table: %s",
-							  RelationGetRelationName(conflictlogrel)));
-		}
-
-		table_close(conflictlogrel, RowExclusiveLock);
 	}
 
-	/* Log into the server log if requested. */
+	/*
+	 * Report the conflict to the server log before inserting it into the
+	 * conflict log table. Emitting it first guarantees the conflict is
+	 * recorded even if the table insert below fails; it is also what raises the
+	 * error for ERROR-level conflicts. When the server log is one of the
+	 * destinations we emit the full details, otherwise (table-only) we emit a
+	 * shorter message since the details are captured in the table.
+	 */
 	if (log_dest_logfile)
 	{
 		StringInfoData err_detail;
@@ -400,6 +386,64 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 					   ConflictTypeNames[type]),
 				errdetail_internal("%s", err_detail.data));
 	}
+	else if (log_dest_table)
+	{
+		/*
+		 * Not logging conflict details to the server log; report the conflict
+		 * but omit raw tuple data since it is captured in the conflict log
+		 * table.
+		 */
+		ereport(elevel,
+				errcode_apply_conflict(type),
+				errmsg("conflict detected on relation \"%s\": conflict=%s",
+					   RelationGetQualifiedRelationName(localrel),
+					   ConflictTypeNames[type]),
+				errdetail("Conflict details are logged to the conflict log table: %s",
+						  RelationGetRelationName(conflictlogrel)));
+	}
+
+	/*
+	 * Insert into the conflict log table if requested. For conflicts below
+	 * ERROR the apply transaction continues, so insert immediately; for
+	 * ERROR-level conflicts the ereport() above already raised the error and
+	 * the insertion is deferred to a new transaction
+	 * (ProcessPendingConflictLogTuple) so that it is not rolled back.
+	 */
+	if (log_dest_table)
+	{
+		if (elevel < ERROR)
+		{
+			PG_TRY();
+			{
+				InsertConflictLogTuple(conflictlogrel);
+			}
+			PG_CATCH();
+			{
+				/*
+				 * The insert failed, so the apply transaction will abort and
+				 * the error will propagate to the worker's error handler. The
+				 * conflict was already reported to the server log above, so it
+				 * is not lost. Discard the prepared tuple so that the deferred
+				 * insertion path (ProcessPendingConflictLogTuple) does not retry
+				 * this same failing insert.
+				 */
+				if (MyLogicalRepWorker->conflict_log_tuple != NULL)
+				{
+					heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
+					MyLogicalRepWorker->conflict_log_tuple = NULL;
+				}
+				if (MyLogicalRepWorker->conflict_log_errcontext != NULL)
+				{
+					pfree(MyLogicalRepWorker->conflict_log_errcontext);
+					MyLogicalRepWorker->conflict_log_errcontext = NULL;
+				}
+				PG_RE_THROW();
+			}
+			PG_END_TRY();
+		}
+
+		table_close(conflictlogrel, RowExclusiveLock);
+	}
 }
 
 /*
@@ -428,60 +472,28 @@ ProcessPendingConflictLogTuple(void)
 	if (MyLogicalRepWorker->conflict_log_tuple == NULL)
 		return;
 
-	PG_TRY();
-	{
-		StartTransactionCommand();
-		PushActiveSnapshot(GetTransactionSnapshot());
-
-		/* Open conflict log table and insert the tuple */
-		conflictlogrel = GetConflictLogDestAndTable(&dest);
-		Assert(conflictlogrel);
-
-		InsertConflictLogTuple(conflictlogrel);
-
-		table_close(conflictlogrel, RowExclusiveLock);
-
-		PopActiveSnapshot();
-		CommitTransactionCommand();
-	}
-	PG_CATCH();
-	{
-		ErrorData  *edata;
-		MemoryContext oldctx;
-
-		/* Save error info in our memory context */
-		oldctx = MemoryContextSwitchTo(TopMemoryContext);
-		edata = CopyErrorData();
-		MemoryContextSwitchTo(oldctx);
-
-		/* Clear the error state so we can continue */
-		FlushErrorState();
+	/*
+	 * Insert the deferred conflict log tuple in its own transaction. A
+	 * failure here (e.g. the conflict log table was dropped, or an
+	 * out-of-disk-space error) is treated like any other apply error and
+	 * raises an ERROR; such failures are expected to be rare and persistent.
+	 * Callers must therefore have already saved or reported (and cleared) any
+	 * in-progress apply error before calling this, so that this error does not
+	 * mask the original one.
+	 */
+	StartTransactionCommand();
+	PushActiveSnapshot(GetTransactionSnapshot());
 
-		/* Abort the transaction we started above */
-		AbortOutOfAnyTransaction();
+	/* Open conflict log table and insert the tuple */
+	conflictlogrel = GetConflictLogDestAndTable(&dest);
+	Assert(conflictlogrel);
 
-		/*
-		 * Report the error as a warning. We use WARNING because we don't want
-		 * this to be a fatal error for the worker, and we want to allow the
-		 * caller's original error to remain primary.
-		 */
-		ereport(WARNING,
-				(errmsg("could not log conflict to table for subscription \"%s\": %s",
-						MySubscription->name, edata->message)));
+	InsertConflictLogTuple(conflictlogrel);
 
-		FreeErrorData(edata);
+	table_close(conflictlogrel, RowExclusiveLock);
 
-		/*
-		 * Free the conflict log tuple and set it to NULL. This ensures we
-		 * don't try to insert the same problematic tuple again.
-		 */
-		if (MyLogicalRepWorker->conflict_log_tuple != NULL)
-		{
-			heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
-			MyLogicalRepWorker->conflict_log_tuple = NULL;
-		}
-	}
-	PG_END_TRY();
+	PopActiveSnapshot();
+	CommitTransactionCommand();
 }
 
 /*
@@ -544,6 +556,19 @@ GetConflictLogDestAndTable(ConflictLogDest *log_dest)
 	return table_open(conflictlogrelid, RowExclusiveLock);
 }
 
+/*
+ * Error context callback for failures while inserting into the conflict log
+ * table. Adds a line identifying the conflict that was being logged.
+ */
+static void
+conflict_log_insert_errcontext(void *arg)
+{
+	char	   *ctx = (char *) arg;
+
+	if (ctx)
+		errcontext("%s", ctx);
+}
+
 /*
  * InsertConflictLogTuple
  *
@@ -554,15 +579,34 @@ GetConflictLogDestAndTable(ConflictLogDest *log_dest)
 void
 InsertConflictLogTuple(Relation conflictlogrel)
 {
+	ErrorContextCallback errcallback;
+
 	/* A valid tuple must be prepared and stored in MyLogicalRepWorker. */
 	Assert(MyLogicalRepWorker->conflict_log_tuple != NULL);
 
+	/*
+	 * Set up an error context so that a failure to insert (e.g. the conflict
+	 * log table was dropped, or an out-of-space error) carries information
+	 * identifying the conflict we were trying to log.
+	 */
+	errcallback.callback = conflict_log_insert_errcontext;
+	errcallback.arg = MyLogicalRepWorker->conflict_log_errcontext;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
 	heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple,
 				GetCurrentCommandId(true), HEAP_INSERT_NO_LOGICAL, NULL);
 
-	/* Free conflict log tuple. */
+	error_context_stack = errcallback.previous;
+
+	/* Free the conflict log tuple and its context string. */
 	heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
 	MyLogicalRepWorker->conflict_log_tuple = NULL;
+	if (MyLogicalRepWorker->conflict_log_errcontext)
+	{
+		pfree(MyLogicalRepWorker->conflict_log_errcontext);
+		MyLogicalRepWorker->conflict_log_errcontext = NULL;
+	}
 }
 
 /*
@@ -1381,5 +1425,15 @@ prepare_conflict_log_tuple(EState *estate, Relation rel,
 	oldctx = MemoryContextSwitchTo(ApplyContext);
 	MyLogicalRepWorker->conflict_log_tuple =
 		heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls);
+
+	/*
+	 * Stash a context string describing this conflict, so that if inserting
+	 * the tuple into the conflict log table fails, the resulting error carries
+	 * enough context to identify the conflict (see InsertConflictLogTuple).
+	 */
+	MyLogicalRepWorker->conflict_log_errcontext =
+		psprintf("while logging conflict \"%s\" detected on relation \"%s\"",
+				 ConflictTypeNames[conflict_type],
+				 RelationGetRelationName(rel));
 	MemoryContextSwitchTo(oldctx);
 }
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 05ba6d8b1ad..69480cbc886 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -5658,6 +5658,9 @@ start_apply(XLogRecPtr origin_startpos)
 	}
 	PG_CATCH();
 	{
+		MemoryContext oldcontext;
+		ErrorData  *edata;
+
 		/*
 		 * Reset the origin state to prevent the advancement of origin
 		 * progress if we fail to apply. Otherwise, this will result in
@@ -5671,15 +5674,33 @@ start_apply(XLogRecPtr origin_startpos)
 		else
 		{
 			/*
-			 * Report the worker failed while applying changes. Abort the
-			 * current transaction so that the stats message is sent in an
-			 * idle state.
+			 * Save the error and recover to an idle state so we can insert the
+			 * deferred conflict log tuple (if any) before re-throwing. Copy
+			 * the error into a long-lived context first, as it may have been
+			 * raised under ErrorContext. Also reset the error context stack:
+			 * the callbacks in effect when the error was thrown belong to
+			 * unwound stack frames, and the deferred insert installs its own.
 			 */
+			oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+			edata = CopyErrorData();
+			MemoryContextSwitchTo(oldcontext);
+
+			FlushErrorState();
+			error_context_stack = NULL;
 			AbortOutOfAnyTransaction();
 			pgstat_report_subscription_error(MySubscription->oid);
+
+			/*
+			 * Insert the deferred conflict log tuple in its own transaction.
+			 * If this fails, that error (annotated with the conflict context,
+			 * see InsertConflictLogTuple) propagates instead of the original;
+			 * such failures are expected to be rare and persistent (e.g. out of
+			 * disk space).
+			 */
 			ProcessPendingConflictLogTuple();
 
-			PG_RE_THROW();
+			/* Re-throw the original error. */
+			ReThrowError(edata);
 		}
 	}
 	PG_END_TRY();
@@ -6046,14 +6067,19 @@ DisableSubscriptionAndExit(void)
 
 	RESUME_INTERRUPTS();
 
+	/*
+	 * The error context callbacks in effect when the error was thrown belong
+	 * to now-unwound stack frames; reset the stack before running further code
+	 * (including the deferred conflict log insertion, which installs its own).
+	 */
+	error_context_stack = NULL;
+
 	/*
 	 * Report the worker failed during sequence synchronization, table
 	 * synchronization, or apply.
 	 */
 	pgstat_report_subscription_error(MyLogicalRepWorker->subid);
 
-	ProcessPendingConflictLogTuple();
-
 	/* Disable the subscription */
 	StartTransactionCommand();
 
@@ -6076,6 +6102,19 @@ DisableSubscriptionAndExit(void)
 			errmsg("subscription \"%s\" has been disabled because of an error",
 				   MySubscription->name));
 
+	/*
+	 * Insert the deferred conflict log tuple (if any) now that the
+	 * subscription has been disabled and committed. Doing it after the
+	 * disable means a failure to log the conflict (treated as a hard error,
+	 * see ProcessPendingConflictLogTuple) cannot prevent the subscription from
+	 * being disabled and so cannot leave the worker restarting and failing
+	 * forever. Do it before the dead-tuple retention check below: that check
+	 * only warns today, but it takes an elevel and could raise an error, which
+	 * must not prevent the conflict from being recorded. The original error
+	 * was already reported above.
+	 */
+	ProcessPendingConflictLogTuple();
+
 	/*
 	 * Skip the track_commit_timestamp check when disabling the worker due to
 	 * an error, as verifying commit timestamps is unnecessary in this
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 6b6525dc2e2..c0059b1b810 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -103,6 +103,13 @@ typedef struct LogicalRepWorker
 	/* A conflict log tuple that is prepared but not yet inserted. */
 	HeapTuple	conflict_log_tuple;
 
+	/*
+	 * Error-context string describing the conflict above, used to annotate any
+	 * error raised while inserting conflict_log_tuple into the conflict log
+	 * table. Allocated, like conflict_log_tuple, in ApplyContext.
+	 */
+	char	   *conflict_log_errcontext;
+
 	/* Stats. */
 	XLogRecPtr	last_lsn;
 	TimestampTz last_send_time;
-- 
2.54.0
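The handler ordering that this patch introduces is: copy the error, flush it, reset the context-callback stack, run the deferred insert, then re-throw the saved error. The standalone C program below models that ordering. It is only a sketch under stated assumptions, not PostgreSQL code. It assumes a toy error system built on setjmp/longjmp, and every name in it (ContextCallback, throw_error, rethrow_error, insert_conflict_row, run_apply) is hypothetical. These names loosely mirror ErrorContextCallback, ereport(), ReThrowError() and PG_TRY/PG_CATCH. The program reproduces the two outcomes described in the commit message:

* If the deferred insert succeeds, the saved original error is re-thrown.
* If it fails, the insert error propagates instead, annotated by the insert's own context callback.

```c
/* Hypothetical toy model, NOT PostgreSQL code: the names only mimic
 * ereport(), ReThrowError(), ErrorContextCallback and PG_TRY/PG_CATCH. */
#include <assert.h>
#include <setjmp.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

typedef struct ContextCallback
{
	void		(*callback) (void *arg, char *buf, size_t len);
	void	   *arg;
	struct ContextCallback *previous;
} ContextCallback;

static ContextCallback *context_stack = NULL;	/* cf. error_context_stack */
static jmp_buf *exception_stack = NULL;	/* cf. PG_exception_stack */
static char error_message[256];	/* the single "current" error */
static bool insert_should_fail = false;
static int	rows_inserted = 0;

/* Context callback: append "CONTEXT: <arg>" to the error being raised. */
static void
append_context(void *arg, char *buf, size_t len)
{
	size_t		used = strlen(buf);
	snprintf(buf + used, len - used, "\nCONTEXT: %s", (const char *) arg);
}

/* Raise an error: run every installed context callback, then jump. */
static void
throw_error(const char *msg)
{
	snprintf(error_message, sizeof(error_message), "%s", msg);
	for (ContextCallback *cb = context_stack; cb != NULL; cb = cb->previous)
		cb->callback(cb->arg, error_message, sizeof(error_message));
	longjmp(*exception_stack, 1);
}

/* Re-raise a saved error verbatim, without re-running any callbacks. */
static void
rethrow_error(const char *saved)
{
	snprintf(error_message, sizeof(error_message), "%s", saved);
	longjmp(*exception_stack, 1);
}

/* Stand-in for the failing apply step, with its own context callback. */
static void
apply_change(void)
{
	ContextCallback cb = {append_context, (void *) "processing remote data", context_stack};
	context_stack = &cb;
	throw_error("conflict detected on relation \"t\": conflict=insert_exists");
}

/* Stand-in for InsertConflictLogTuple(): a callback wraps the insert. */
static void
insert_conflict_row(const char *errcontext_str)
{
	ContextCallback cb = {append_context, (void *) errcontext_str, context_stack};
	context_stack = &cb;
	if (insert_should_fail)
		throw_error("relation \"clt\" does not exist");
	rows_inserted++;
	context_stack = cb.previous;
}

/* Stand-in for a worker's PG_CATCH; stores the error finally reported. */
static void
run_apply(char *reported, size_t len)
{
	jmp_buf		outer;
	jmp_buf		inner;
	char		saved[sizeof(error_message)];

	exception_stack = &outer;
	if (setjmp(outer) != 0)
	{
		snprintf(reported, len, "%s", error_message);	/* top-level report */
		context_stack = NULL;
		exception_stack = NULL;
		return;
	}
	exception_stack = &inner;
	if (setjmp(inner) == 0)
		apply_change();			/* always raises in this model */
	else
	{
		exception_stack = &outer;	/* later errors reach the outer handler */
		snprintf(saved, sizeof(saved), "%s", error_message);	/* 1. copy */
		error_message[0] = '\0';	/* 2. flush */
		context_stack = NULL;	/* 3. drop apply_change()'s dead callback */
		insert_conflict_row("while logging conflict \"insert_exists\" "
							"detected on relation \"t\"");	/* 4. insert */
		rethrow_error(saved);	/* 5. reached only if the insert succeeded */
	}
}
```

With insert_should_fail set to false, run_apply() reports the original conflict error, still carrying its own "CONTEXT: processing remote data" line, and counts one inserted row. With it set to true, the reported text is the insert failure followed by "CONTEXT: while logging conflict ...", and no row is added. Skipping step 3 in this model would leave context_stack pointing into apply_change()'s unwound stack frame. That is the hazard the patch's `error_context_stack = NULL` assignments guard against.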