diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 16fbd38373..3ba84a16ea 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -148,38 +148,47 @@ create_logical_replication_slot(char *name, char *plugin, temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, false, failover, false); - /* - * Ensure the logical decoding is enabled before initializing the logical - * decoding context. - */ - EnsureLogicalDecodingEnabled(); - Assert(IsLogicalDecodingEnabled()); + PG_TRY(); + { + /* + * Ensure the logical decoding is enabled before initializing the logical + * decoding context. + */ + EnsureLogicalDecodingEnabled(); + Assert(IsLogicalDecodingEnabled()); - /* - * Create logical decoding context to find start point or, if we don't - * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity. - * - * Note: when !find_startpoint this is still important, because it's at - * this point that the output plugin is validated. - */ - ctx = CreateInitDecodingContext(plugin, NIL, - false, /* just catalogs is OK */ - false, /* not repack */ - restart_lsn, - XL_ROUTINE(.page_read = read_local_xlog_page, - .segment_open = wal_segment_open, - .segment_close = wal_segment_close), - NULL, NULL, NULL); + /* + * Create logical decoding context to find start point or, if we don't + * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity. + * + * Note: when !find_startpoint this is still important, because it's at + * this point that the output plugin is validated. + */ + ctx = CreateInitDecodingContext(plugin, NIL, + false, /* just catalogs is OK */ + false, /* not repack */ + restart_lsn, + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), + NULL, NULL, NULL); - /* - * If caller needs us to determine the decoding start point, do so now. - * This might take a while. - */ - if (find_startpoint) - DecodingContextFindStartpoint(ctx); + /* + * If caller needs us to determine the decoding start point, do so now. + * This might take a while. + */ + if (find_startpoint) + DecodingContextFindStartpoint(ctx); - /* don't need the decoding context anymore */ - FreeDecodingContext(ctx); + /* don't need the decoding context anymore */ + FreeDecodingContext(ctx); + } + PG_CATCH(); + { + ReplicationSlotRelease(); + PG_RE_THROW(); + } + PG_END_TRY(); } /* @@ -569,46 +578,55 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) /* Acquire the slot so we "own" it */ ReplicationSlotAcquire(NameStr(*slotname), true, true); - /* A slot whose restart_lsn has never been reserved cannot be advanced */ - if (!XLogRecPtrIsValid(MyReplicationSlot->data.restart_lsn)) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("replication slot \"%s\" cannot be advanced", - NameStr(*slotname)), - errdetail("This slot has never previously reserved WAL, or it has been invalidated."))); + PG_TRY(); + { + /* A slot whose restart_lsn has never been reserved cannot be advanced */ + if (!XLogRecPtrIsValid(MyReplicationSlot->data.restart_lsn)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot \"%s\" cannot be advanced", + NameStr(*slotname)), + errdetail("This slot has never previously reserved WAL, or it has been invalidated."))); - /* - * Check if the slot is not moving backwards. Physical slots rely simply - * on restart_lsn as a minimum point, while logical slots have confirmed - * consumption up to confirmed_flush, meaning that in both cases data - * older than that is not available anymore. - */ - if (OidIsValid(MyReplicationSlot->data.database)) - minlsn = MyReplicationSlot->data.confirmed_flush; - else - minlsn = MyReplicationSlot->data.restart_lsn; + /* + * Check if the slot is not moving backwards. Physical slots rely simply + * on restart_lsn as a minimum point, while logical slots have confirmed + * consumption up to confirmed_flush, meaning that in both cases data + * older than that is not available anymore. + */ + if (OidIsValid(MyReplicationSlot->data.database)) + minlsn = MyReplicationSlot->data.confirmed_flush; + else + minlsn = MyReplicationSlot->data.restart_lsn; - if (moveto < minlsn) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot advance replication slot to %X/%08X, minimum is %X/%08X", - LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn)))); + if (moveto < minlsn) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot advance replication slot to %X/%08X, minimum is %X/%08X", + LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn)))); - /* Do the actual slot update, depending on the slot type */ - if (OidIsValid(MyReplicationSlot->data.database)) - endlsn = pg_logical_replication_slot_advance(moveto); - else - endlsn = pg_physical_replication_slot_advance(moveto); + /* Do the actual slot update, depending on the slot type */ + if (OidIsValid(MyReplicationSlot->data.database)) + endlsn = pg_logical_replication_slot_advance(moveto); + else + endlsn = pg_physical_replication_slot_advance(moveto); - values[0] = NameGetDatum(&MyReplicationSlot->data.name); - nulls[0] = false; + values[0] = NameGetDatum(&MyReplicationSlot->data.name); + nulls[0] = false; - /* - * Recompute the minimum LSN and xmin across all slots to adjust with the - * advancing potentially done. - */ - ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); + /* + * Recompute the minimum LSN and xmin across all slots to adjust with the + * advancing potentially done. + */ + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); + } + PG_CATCH(); + { + ReplicationSlotRelease(); + PG_RE_THROW(); + } + PG_END_TRY(); ReplicationSlotRelease(); @@ -763,7 +781,13 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) /* * Update the destination slot to current values of the source slot; * recheck that the source slot is still the one we saw previously. + * + * Use PG_TRY to ensure we release (and, for ephemeral slots, drop) + * the destination slot if any validation error occurs. Without this, + * an error caught by a PL/pgSQL EXCEPTION handler would leave + * MyReplicationSlot set, crashing on the next slot operation. */ + PG_TRY(); { TransactionId copy_effective_xmin; TransactionId copy_effective_catalog_xmin; @@ -797,9 +821,6 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) * or the restart_lsn either is invalid or has gone backward. (The * restart_lsn could go backwards if the source slot is dropped and * copied from an older slot during installation.) - * - * Since erroring out will release and drop the destination slot we - * don't need to release it here. */ if (copy_restart_lsn < src_restart_lsn || src_islogical != copy_islogical || @@ -857,6 +878,12 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) } #endif } + PG_CATCH(); + { + ReplicationSlotRelease(); + PG_RE_THROW(); + } + PG_END_TRY(); /* target slot fully created, mark as persistent if needed */ if (logical_slot && !temporary)