From 346f8e18f1dd834fbb1f2586b72c7cff46053fe2 Mon Sep 17 00:00:00 2001 From: bdrouvotAWS Date: Fri, 6 Aug 2021 09:42:43 +0000 Subject: [PATCH v23 3/5] Allow logical decoding on standby. Allow a logical slot to be created on standby. Restrict its usage or its creation if wal_level on primary is less than logical. During slot creation, it's restart_lsn is set to the last replayed LSN. Effectively, a logical slot creation on standby waits for an xl_running_xact record to arrive from primary. Conflicting slots would be handled in next commits. Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello --- src/backend/access/transam/xlog.c | 33 ++++++++++- src/backend/access/transam/xlogfuncs.c | 2 +- src/backend/access/transam/xlogutils.c | 2 +- src/backend/postmaster/checkpointer.c | 4 +- src/backend/replication/logical/decode.c | 22 +++++++- src/backend/replication/logical/logical.c | 37 ++++++------ .../replication/logical/logicalfuncs.c | 2 +- src/backend/replication/slot.c | 56 ++++++++++--------- src/backend/replication/slotfuncs.c | 2 +- src/backend/replication/walreceiver.c | 4 +- src/backend/replication/walreceiverfuncs.c | 2 +- src/backend/replication/walsender.c | 26 ++++++--- src/include/access/xlog.h | 3 +- 13 files changed, 132 insertions(+), 63 deletions(-) 16.7% src/backend/access/transam/ 32.7% src/backend/replication/logical/ 45.2% src/backend/replication/ 5.3% src/ diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 7b6d75cd4b..d3c04182a0 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -5082,6 +5082,17 @@ LocalProcessControlFile(bool reset) ReadControlFile(); } +/* + * Get the wal_level from the control file. For a standby, this value should be + * considered as its active wal_level, because it may be different from what + * was originally configured on standby. + */ +WalLevel +GetActiveWalLevelOnStandby(void) +{ + return ControlFile->wal_level; +} + /* * Initialization of shared memory for XLOG */ @@ -9608,7 +9619,7 @@ CreateRestartPoint(int flags) * whichever is later. */ receivePtr = GetWalRcvFlushRecPtr(NULL, NULL); - replayPtr = GetXLogReplayRecPtr(&replayTLI); + replayPtr = GetXLogReplayRecPtr(&replayTLI, false); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); if (InvalidateObsoleteReplicationSlots(_logSegNo)) @@ -11746,7 +11757,7 @@ register_persistent_abort_backup_handler(void) * Exported to allow WALReceiver to read the pointer directly. */ XLogRecPtr -GetXLogReplayRecPtr(TimeLineID *replayTLI) +GetXLogReplayRecPtr(TimeLineID *replayTLI, bool avoid_header) { XLogRecPtr recptr; TimeLineID tli; @@ -11758,6 +11769,24 @@ GetXLogReplayRecPtr(TimeLineID *replayTLI) if (replayTLI) *replayTLI = tli; + + /* + * Replay pointer may point one past the end of the record. If that + * is a XLOG page boundary, it will not be a valid LSN for the + * start of a record, so bump it up past the page header. + */ + if (avoid_header && !XRecOffIsValid(recptr)) + { + if (recptr % XLOG_BLCKSZ != 0) + elog(ERROR, "invalid replay pointer"); + + /* For the first page of a segment file, it's a long header */ + if (XLogSegmentOffset(recptr, wal_segment_size) == 0) + recptr += SizeOfXLogLongPHD; + else + recptr += SizeOfXLogShortPHD; + } + return recptr; } diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c index b98deb72ec..a173f8d6fc 100644 --- a/src/backend/access/transam/xlogfuncs.c +++ b/src/backend/access/transam/xlogfuncs.c @@ -417,7 +417,7 @@ pg_last_wal_replay_lsn(PG_FUNCTION_ARGS) { XLogRecPtr recptr; - recptr = GetXLogReplayRecPtr(NULL); + recptr = GetXLogReplayRecPtr(NULL, false); if (recptr == 0) PG_RETURN_NULL(); diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index b1702bc6be..9cc2a2144b 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -870,7 +870,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, if (!RecoveryInProgress()) read_upto = GetFlushRecPtr(); else - read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); + read_upto = GetXLogReplayRecPtr(&ThisTimeLineID, false); tli = ThisTimeLineID; /* diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c index be7366379d..ea1bf7d247 100644 --- a/src/backend/postmaster/checkpointer.c +++ b/src/backend/postmaster/checkpointer.c @@ -434,7 +434,7 @@ CheckpointerMain(void) */ ckpt_active = true; if (do_restartpoint) - ckpt_start_recptr = GetXLogReplayRecPtr(NULL); + ckpt_start_recptr = GetXLogReplayRecPtr(NULL, false); else ckpt_start_recptr = GetInsertRecPtr(); ckpt_start_time = now; @@ -794,7 +794,7 @@ IsCheckpointOnSchedule(double progress) * value that was in effect when the WAL was generated). */ if (RecoveryInProgress()) - recptr = GetXLogReplayRecPtr(NULL); + recptr = GetXLogReplayRecPtr(NULL, false); else recptr = GetInsertRecPtr(); elapsed_xlogs = (((double) (recptr - ckpt_start_recptr)) / diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 2874dc0612..b8be0c83ed 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -215,11 +215,31 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * can restart from there. */ break; + case XLOG_PARAMETER_CHANGE: + { + xl_parameter_change *xlrec = + (xl_parameter_change *) XLogRecGetData(buf->record); + + /* + * If wal_level on primary is reduced to less than logical, then we + * want to prevent existing logical slots from being used. + * Existing logical slots on standby get invalidated when this WAL + * record is replayed; and further, slot creation fails when the + * wal level is not sufficient; but all these operations are not + * synchronized, so a logical slot may creep in while the wal_level + * is being reduced. Hence this extra check. + */ + if (xlrec->wal_level < WAL_LEVEL_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on standby requires " + "wal_level >= logical on master"))); + break; + } case XLOG_NOOP: case XLOG_NEXTOID: case XLOG_SWITCH: case XLOG_BACKUP_END: - case XLOG_PARAMETER_CHANGE: case XLOG_RESTORE_POINT: case XLOG_FPW_CHANGE: case XLOG_FPI_FOR_HINT: diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 64b8280c13..c134feef89 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -119,23 +119,22 @@ CheckLogicalDecodingRequirements(void) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical decoding requires a database connection"))); - /* ---- - * TODO: We got to change that someday soon... - * - * There's basically three things missing to allow this: - * 1) We need to be able to correctly and quickly identify the timeline a - * LSN belongs to - * 2) We need to force hot_standby_feedback to be enabled at all times so - * the primary cannot remove rows we need. - * 3) support dropping replication slots referring to a database, in - * dbase_redo. There can't be any active ones due to HS recovery - * conflicts, so that should be relatively easy. - * ---- - */ if (RecoveryInProgress()) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("logical decoding cannot be used while in recovery"))); + { + /* + * This check may have race conditions, but whenever + * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we + * verify that there are no existing logical replication slots. And to + * avoid races around creating a new slot, + * CheckLogicalDecodingRequirements() is called once before creating + * the slot, and once when logical decoding is initially starting up. + */ + if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on standby requires " + "wal_level >= logical on master"))); + } } /* @@ -330,6 +329,12 @@ CreateInitDecodingContext(const char *plugin, LogicalDecodingContext *ctx; MemoryContext old_context; + /* + * On standby, this check is also required while creating the slot. Check + * the comments in this function. + */ + CheckLogicalDecodingRequirements(); + /* shorter lines... */ slot = MyReplicationSlot; diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 1d9400ea63..9069f3e50d 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -223,7 +223,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin if (!RecoveryInProgress()) end_of_wal = GetFlushRecPtr(); else - end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID); + end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID, false); ReplicationSlotAcquire(NameStr(*name), true); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 6e89bab255..251af17253 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1088,37 +1088,28 @@ ReplicationSlotReserveWal(void) /* * For logical slots log a standby snapshot and start logical decoding * at exactly that position. That allows the slot to start up more - * quickly. + * quickly. But on a standby we cannot do WAL writes, so just use the + * replay pointer; effectively, an attempt to create a logical slot on + * standby will cause it to wait for an xl_running_xact record to be + * logged independently on the primary, so that a snapshot can be built + * using the record. * - * That's not needed (or indeed helpful) for physical slots as they'll - * start replay at the last logged checkpoint anyway. Instead return - * the location of the last redo LSN. While that slightly increases - * the chance that we have to retry, it's where a base backup has to - * start replay at. + * None of this is needed (or indeed helpful) for physical slots as + * they'll start replay at the last logged checkpoint anyway. Instead + * return the location of the last redo LSN. While that slightly + * increases the chance that we have to retry, it's where a base backup + * has to start replay at. */ - if (!RecoveryInProgress() && SlotIsLogical(slot)) - { - XLogRecPtr flushptr; - - /* start at current insert position */ + if (SlotIsPhysical(slot)) + restart_lsn = GetRedoRecPtr(); + else if (RecoveryInProgress()) + restart_lsn = GetXLogReplayRecPtr(NULL, true); + else restart_lsn = GetXLogInsertRecPtr(); - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); - - /* make sure we have enough information to start */ - flushptr = LogStandbySnapshot(); - /* and make sure it's fsynced to disk */ - XLogFlush(flushptr); - } - else - { - restart_lsn = GetRedoRecPtr(); - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); - } + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); /* prevent WAL removal as fast as possible */ ReplicationSlotsComputeRequiredLSN(); @@ -1134,6 +1125,17 @@ ReplicationSlotReserveWal(void) if (XLogGetLastRemovedSegno() < segno) break; } + + if (!RecoveryInProgress() && SlotIsLogical(slot)) + { + XLogRecPtr flushptr; + + /* make sure we have enough information to start */ + flushptr = LogStandbySnapshot(); + + /* and make sure it's fsynced to disk */ + XLogFlush(flushptr); + } } /* diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 31e74d3832..48d24442e2 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -636,7 +636,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) if (!RecoveryInProgress()) moveto = Min(moveto, GetFlushRecPtr()); else - moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID)); + moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID, false)); /* Acquire the slot so we "own" it */ ReplicationSlotAcquire(NameStr(*slotname), true); diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 9a2bc37fd7..74d3fa0cf0 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -407,7 +407,7 @@ WalReceiverMain(void) first_stream = false; /* Initialize LogstreamResult and buffers for processing messages */ - LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL); + LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL, false); initStringInfo(&reply_message); initStringInfo(&incoming_message); @@ -1072,7 +1072,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) /* Construct a new message */ writePtr = LogstreamResult.Write; flushPtr = LogstreamResult.Flush; - applyPtr = GetXLogReplayRecPtr(NULL); + applyPtr = GetXLogReplayRecPtr(NULL, false); resetStringInfo(&reply_message); pq_sendbyte(&reply_message, 'r'); diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 6f0acbfdef..f264b71f73 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -372,7 +372,7 @@ GetReplicationApplyDelay(void) receivePtr = walrcv->flushedUpto; SpinLockRelease(&walrcv->mutex); - replayPtr = GetXLogReplayRecPtr(NULL); + replayPtr = GetXLogReplayRecPtr(NULL, false); if (receivePtr == replayPtr) return 0; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d0e247b104..da2533e1c9 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1174,6 +1174,16 @@ StartLogicalReplication(StartReplicationCmd *cmd) got_STOPPING = true; } + /* + * In case of logical decoding on standby it may be that ThisTimeLineID + * is not set yet. + * Indeed we are not going through InitXLOGAccess on a Standby and + * it may also be that IdentifySystem has not been called yet. + * So let's get it through GetXLogReplayRecPtr(). + */ + if (ThisTimeLineID == 0) + (void) GetXLogReplayRecPtr(&ThisTimeLineID, false); + /* * Create our decoding context, making it start at the previously ack'ed * position. @@ -1398,7 +1408,7 @@ WalSndWaitForWal(XLogRecPtr loc) if (!RecoveryInProgress()) RecentFlushPtr = GetFlushRecPtr(); else - RecentFlushPtr = GetXLogReplayRecPtr(NULL); + RecentFlushPtr = GetXLogReplayRecPtr(NULL, false); for (;;) { @@ -1432,7 +1442,7 @@ WalSndWaitForWal(XLogRecPtr loc) if (!RecoveryInProgress()) RecentFlushPtr = GetFlushRecPtr(); else - RecentFlushPtr = GetXLogReplayRecPtr(NULL); + RecentFlushPtr = GetXLogReplayRecPtr(NULL, false); /* * If postmaster asked us to stop, don't wait anymore. @@ -2898,10 +2908,12 @@ XLogSendLogical(void) * If first time through in this session, initialize flushPtr. Otherwise, * we only need to update flushPtr if EndRecPtr is past it. */ - if (flushPtr == InvalidXLogRecPtr) - flushPtr = GetFlushRecPtr(); - else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) - flushPtr = GetFlushRecPtr(); + if (flushPtr == InvalidXLogRecPtr || + logical_decoding_ctx->reader->EndRecPtr >= flushPtr) + { + flushPtr = (am_cascading_walsender ? + GetStandbyFlushRecPtr() : GetFlushRecPtr()); + } /* If EndRecPtr is still past our flushPtr, it means we caught up. */ if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) @@ -2990,7 +3002,7 @@ GetStandbyFlushRecPtr(void) */ receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI); - replayPtr = GetXLogReplayRecPtr(&replayTLI); + replayPtr = GetXLogReplayRecPtr(&replayTLI, false); ThisTimeLineID = replayTLI; diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 0a8ede700d..1d44637bef 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -282,7 +282,7 @@ extern bool HotStandbyActive(void); extern bool HotStandbyActiveInReplay(void); extern bool XLogInsertAllowed(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); -extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI); +extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI, bool avoid_header); extern XLogRecPtr GetXLogInsertRecPtr(void); extern XLogRecPtr GetXLogWriteRecPtr(void); extern RecoveryPauseState GetRecoveryPauseState(void); @@ -299,6 +299,7 @@ extern Size XLOGShmemSize(void); extern void XLOGShmemInit(void); extern void BootStrapXLOG(void); extern void LocalProcessControlFile(bool reset); +extern WalLevel GetActiveWalLevelOnStandby(void); extern void StartupXLOG(void); extern void ShutdownXLOG(int code, Datum arg); extern void InitXLOGAccess(void); -- 2.18.4