From e482640d28a9460d24f722ccfaabf6171e24c9f8 Mon Sep 17 00:00:00 2001 From: bdrouvotAWS Date: Tue, 20 Dec 2022 08:36:32 +0000 Subject: [PATCH v33 3/6] Allow logical decoding on standby. Allow a logical slot to be created on standby. Restrict its usage or its creation if wal_level on primary is less than logical. During slot creation, it's restart_lsn is set to the last replayed LSN. Effectively, a logical slot creation on standby waits for an xl_running_xact record to arrive from primary. Conflicting slots would be handled in next commits. Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello --- src/backend/access/transam/xlog.c | 11 ++++ src/backend/replication/logical/decode.c | 22 ++++++- src/backend/replication/logical/logical.c | 37 +++++++----- src/backend/replication/slot.c | 73 +++++++++++++++-------- src/backend/replication/walsender.c | 27 +++++---- src/include/access/xlog.h | 1 + 6 files changed, 118 insertions(+), 53 deletions(-) 4.5% src/backend/access/transam/ 36.6% src/backend/replication/logical/ 57.9% src/backend/replication/ diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index fca6ee4584..f9cc842a6a 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -4462,6 +4462,17 @@ LocalProcessControlFile(bool reset) ReadControlFile(); } +/* + * Get the wal_level from the control file. For a standby, this value should be + * considered as its active wal_level, because it may be different from what + * was originally configured on standby. + */ +WalLevel +GetActiveWalLevelOnStandby(void) +{ + return ControlFile->wal_level; +} + /* * Initialization of shared memory for XLOG */ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 2cc0ac9eb0..c210721ab0 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -152,11 +152,31 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * can restart from there. */ break; + case XLOG_PARAMETER_CHANGE: + { + xl_parameter_change *xlrec = + (xl_parameter_change *) XLogRecGetData(buf->record); + + /* + * If wal_level on primary is reduced to less than logical, then we + * want to prevent existing logical slots from being used. + * Existing logical slots on standby get invalidated when this WAL + * record is replayed; and further, slot creation fails when the + * wal level is not sufficient; but all these operations are not + * synchronized, so a logical slot may creep in while the wal_level + * is being reduced. Hence this extra check. + */ + if (xlrec->wal_level < WAL_LEVEL_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on standby requires " + "wal_level to be at least logical on master"))); + break; + } case XLOG_NOOP: case XLOG_NEXTOID: case XLOG_SWITCH: case XLOG_BACKUP_END: - case XLOG_PARAMETER_CHANGE: case XLOG_RESTORE_POINT: case XLOG_FPW_CHANGE: case XLOG_FPI_FOR_HINT: diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 625a7f4273..a9567f2d8c 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -119,23 +119,22 @@ CheckLogicalDecodingRequirements(void) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical decoding requires a database connection"))); - /* ---- - * TODO: We got to change that someday soon... - * - * There's basically three things missing to allow this: - * 1) We need to be able to correctly and quickly identify the timeline a - * LSN belongs to - * 2) We need to force hot_standby_feedback to be enabled at all times so - * the primary cannot remove rows we need. - * 3) support dropping replication slots referring to a database, in - * dbase_redo. There can't be any active ones due to HS recovery - * conflicts, so that should be relatively easy. - * ---- - */ if (RecoveryInProgress()) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("logical decoding cannot be used while in recovery"))); + { + /* + * This check may have race conditions, but whenever + * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we + * verify that there are no existing logical replication slots. And to + * avoid races around creating a new slot, + * CheckLogicalDecodingRequirements() is called once before creating + * the slot, and once when logical decoding is initially starting up. + */ + if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on standby requires " + "wal_level to be at least logical on master"))); + } } /* @@ -331,6 +330,12 @@ CreateInitDecodingContext(const char *plugin, LogicalDecodingContext *ctx; MemoryContext old_context; + /* + * On standby, this check is also required while creating the slot. Check + * the comments in this function. + */ + CheckLogicalDecodingRequirements(); + /* shorter lines... */ slot = MyReplicationSlot; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 6a4e2cd19b..f554dac6fd 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -51,6 +51,7 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" +#include "access/xlogrecovery.h" /* * Replication slot on-disk data structure. @@ -1175,37 +1176,46 @@ ReplicationSlotReserveWal(void) /* * For logical slots log a standby snapshot and start logical decoding * at exactly that position. That allows the slot to start up more - * quickly. + * quickly. But on a standby we cannot do WAL writes, so just use the + * replay pointer; effectively, an attempt to create a logical slot on + * standby will cause it to wait for an xl_running_xact record to be + * logged independently on the primary, so that a snapshot can be built + * using the record. * - * That's not needed (or indeed helpful) for physical slots as they'll - * start replay at the last logged checkpoint anyway. Instead return - * the location of the last redo LSN. While that slightly increases - * the chance that we have to retry, it's where a base backup has to - * start replay at. + * None of this is needed (or indeed helpful) for physical slots as + * they'll start replay at the last logged checkpoint anyway. Instead + * return the location of the last redo LSN. While that slightly + * increases the chance that we have to retry, it's where a base backup + * has to start replay at. */ - if (!RecoveryInProgress() && SlotIsLogical(slot)) + if (SlotIsPhysical(slot)) + restart_lsn = GetRedoRecPtr(); + else if (RecoveryInProgress()) { - XLogRecPtr flushptr; - - /* start at current insert position */ - restart_lsn = GetXLogInsertRecPtr(); - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); - - /* make sure we have enough information to start */ - flushptr = LogStandbySnapshot(); - - /* and make sure it's fsynced to disk */ - XLogFlush(flushptr); + restart_lsn = GetXLogReplayRecPtr(NULL); + /* + * Replay pointer may point one past the end of the record. If that + * is a XLOG page boundary, it will not be a valid LSN for the + * start of a record, so bump it up past the page header. + */ + if (!XRecOffIsValid(restart_lsn)) + { + if (restart_lsn % XLOG_BLCKSZ != 0) + elog(ERROR, "invalid replay pointer"); + + /* For the first page of a segment file, it's a long header */ + if (XLogSegmentOffset(restart_lsn, wal_segment_size) == 0) + restart_lsn += SizeOfXLogLongPHD; + else + restart_lsn += SizeOfXLogShortPHD; + } } else - { - restart_lsn = GetRedoRecPtr(); - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); - } + restart_lsn = GetXLogInsertRecPtr(); + + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); /* prevent WAL removal as fast as possible */ ReplicationSlotsComputeRequiredLSN(); @@ -1221,6 +1231,17 @@ ReplicationSlotReserveWal(void) if (XLogGetLastRemovedSegno() < segno) break; } + + if (!RecoveryInProgress() && SlotIsLogical(slot)) + { + XLogRecPtr flushptr; + + /* make sure we have enough information to start */ + flushptr = LogStandbySnapshot(); + + /* and make sure it's fsynced to disk */ + XLogFlush(flushptr); + } } /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 64fbd52e34..9662e316c9 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -906,14 +906,18 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req int count; WALReadError errinfo; XLogSegNo segno; - TimeLineID currTLI = GetWALInsertionTimeLine(); + TimeLineID currTLI; /* - * Since logical decoding is only permitted on a primary server, we know - * that the current timeline ID can't be changing any more. If we did this - * on a standby, we'd have to worry about the values we compute here - * becoming invalid due to a promotion or timeline change. + * Since logical decoding is also permitted on a standby server, we need + * to check if the server is in recovery to decide how to get the current + * timeline ID (so that it also cover the promotion or timeline change cases). */ + if (!RecoveryInProgress()) + currTLI = GetWALInsertionTimeLine(); + else + GetXLogReplayRecPtr(&currTLI); + XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI); sendTimeLineIsHistoric = (state->currTLI != currTLI); sendTimeLine = state->currTLI; @@ -3074,10 +3078,12 @@ XLogSendLogical(void) * If first time through in this session, initialize flushPtr. Otherwise, * we only need to update flushPtr if EndRecPtr is past it. */ - if (flushPtr == InvalidXLogRecPtr) - flushPtr = GetFlushRecPtr(NULL); - else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) - flushPtr = GetFlushRecPtr(NULL); + if (flushPtr == InvalidXLogRecPtr || + logical_decoding_ctx->reader->EndRecPtr >= flushPtr) + { + flushPtr = (am_cascading_walsender ? + GetStandbyFlushRecPtr(NULL) : GetFlushRecPtr(NULL)); + } /* If EndRecPtr is still past our flushPtr, it means we caught up. */ if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) @@ -3168,7 +3174,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli) receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI); replayPtr = GetXLogReplayRecPtr(&replayTLI); - *tli = replayTLI; + if (tli) + *tli = replayTLI; result = replayPtr; if (receiveTLI == replayTLI && receivePtr > replayPtr) diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 1fbd48fbda..027e155e8e 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -230,6 +230,7 @@ extern void XLOGShmemInit(void); extern void BootStrapXLOG(void); extern void InitializeWalConsistencyChecking(void); extern void LocalProcessControlFile(bool reset); +extern WalLevel GetActiveWalLevelOnStandby(void); extern void StartupXLOG(void); extern void ShutdownXLOG(int code, Datum arg); extern void CreateCheckPoint(int flags); -- 2.34.1