From 3c775ee99820ea3e915e1a859fa399651e170ffc Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Mon, 24 Oct 2016 17:40:40 +0200 Subject: [PATCH 1/2] Follow timeline switches in logical decoding When decoding from a logical slot, it's necessary for xlog reading to be able to read xlog from historical (i.e. not current) timelines. Otherwise decoding fails after failover to a physical replica because the oldest still-needed archives are in the historical timeline. Supporting logical decoding timeline following is a pre-requisite for logical decoding on physical standby servers. It also makes it possible to promote a replica with logical slots to a master and replay from those slots, allowing logical decoding applications to follow physical failover. Logical slots cannot actually be created on a replica without use of the low-level C slot management APIs so this is mostly foundation work for subsequent changes to enable logical decoding on standbys. This commit includes a module in src/test/modules with functions to manipulate the slots (which is not otherwise possible in SQL code) in order to enable testing, and a new test in src/test/recovery to ensure that the behavior is as expected. Note that an earlier version of logical decoding timeline following was committed to 9.5 as 24c5f1a103ce, 3a3b309041b0, 82c83b337202, and f07d18b6e94d. It was then reverted by c1543a81a7a8 just after 9.5 feature freeze when issues were discovered too late to safely fix them in the 9.5 release cycle. The prior approach failed to consider that a record could be split across pages that are on different segments, where the new segment contains the start of a new timeline. In that case the old segment might be missing or renamed with a .partial suffix. This patch reworks the logic to be page-based and in the process simplify how the last timeline for a segment is looked up. Slot timeline following only works in a backend. Frontend support can be aded separately, where it could be useful for pg_xlogdump etc once support for timeline.c, List, etc is added for frontend code. --- src/backend/access/transam/xlogutils.c | 207 +++++++++++++++++++++++-- src/backend/replication/logical/logicalfuncs.c | 12 +- src/include/access/xlogreader.h | 11 ++ 3 files changed, 209 insertions(+), 21 deletions(-) diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 51a8e8d..014978f 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -19,6 +19,7 @@ #include +#include "access/timeline.h" #include "access/xlog.h" #include "access/xlog_internal.h" #include "access/xlogutils.h" @@ -660,6 +661,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) /* state maintained across calls */ static int sendFile = -1; static XLogSegNo sendSegNo = 0; + static TimeLineID sendTLI = 0; static uint32 sendOff = 0; p = buf; @@ -675,7 +677,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) startoff = recptr % XLogSegSize; /* Do we need to switch to a different xlog segment? */ - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo)) + if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) || + sendTLI != tli) { char path[MAXPGPATH]; @@ -702,6 +705,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) path))); } sendOff = 0; + sendTLI = tli; } /* Need to seek in the file? */ @@ -750,6 +754,142 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) } /* + * Determine which timeline to read an xlog page from and set the + * XLogReaderState's state->currTLI to that timeline ID. + * + * wantPage must be set to the start address of the page to read and + * wantLength to the amount of the page that will be read, up to + * XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ. + * + * We switch to an xlog segment from the new timeline eagerly when on a + * historical timeline, as soon as we reach the start of the xlog segment + * containing the timeline switch. The server copied the segment to the new + * timeline so all the data up to the switch point is the same, but there's no + * guarantee the old segment will still exist. It may have been deleted or + * renamed with a .partial suffix so we can't necessarily keep reading from + * the old TLI even though tliSwitchPoint says it's OK. + * + * We can't just check the timeline when we read a page on a different segment + * to the last page. We could've received a timeline switch from a cascading + * upstream, so the current segment ends and we have to switch to a new one. + * Even in the middle of reading a page we could have to dump the cached page + * and switch to a new TLI. + * + * Because of this, callers MAY NOT assume that currTLI is the timeline that + * will be in a page's xlp_tli; the page may begin on an older timeline or we + * might be reading from historical timeline data on a segment that's been + * copied to a new timeline. + * + * The caller must also make sure it doesn't read past the current redo pointer + * so it doesn't fail to notice that the current timeline became historical. + */ +static void +XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) +{ + const XLogRecPtr lastReadPage = state->readSegNo * XLogSegSize + state->readOff; + + elog(DEBUG4, "Determining timeline for read at %X/%X+%X", + (uint32)(wantPage>>32), (uint32)wantPage, wantLength); + + Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); + Assert(wantLength <= XLOG_BLCKSZ); + Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ); + + /* + * If the desired page is currently read in and valid, we have nothing to do. + * + * The caller should've ensured that it didn't previously advance readOff + * past the valid limit of this timeline, so it doesn't matter if the current + * TLI has since become historical. + */ + if (lastReadPage == wantPage && + state->readLen != 0 && + lastReadPage + state->readLen >= wantPage + Min(wantLength,XLOG_BLCKSZ-1)) + { + elog(DEBUG4, "Wanted data already valid"); //XXX + return; + } + + /* + * If we're reading from the current timeline, it hasn't become historical + * and the page we're reading is after the last page read, we can again + * just carry on. (Seeking backwards requires a check to make sure the older + * page isn't on a prior timeline). + */ + if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage) + { + Assert(state->currTLIValidUntil == InvalidXLogRecPtr); + elog(DEBUG4, "On current timeline"); + return; + } + + /* + * If we're just reading pages from a previously validated historical + * timeline and the timeline we're reading from is valid until the + * end of the current segment we can just keep reading. + */ + if (state->currTLIValidUntil != InvalidXLogRecPtr && + state->currTLI != ThisTimeLineID && + state->currTLI != 0 && + (wantPage + wantLength) / XLogSegSize < state->currTLIValidUntil / XLogSegSize) + { + elog(DEBUG4, "Still on historical timeline %u until %X/%X", + state->currTLI, + (uint32)(state->currTLIValidUntil >> 32), + (uint32)(state->currTLIValidUntil)); + return; + } + + /* + * If we reach this point we're either looking up a page for random access, + * the current timeline just became historical, or we're reading from a new + * segment containing a timeline switch. In all cases we need to determine + * the newest timeline on the segment. + * + * If it's the current timeline we can just keep reading from here unless + * we detect a timeline switch that makes the current timeline historical. + * If it's a historical timeline we can read all the segment on the newest + * timeline because it contains all the old timelines' data too. So only + * one switch check is required. + */ + { + /* + * We need to re-read the timeline history in case it's been changed + * by a promotion or replay from a cascaded replica. + */ + List *timelineHistory = readTimeLineHistory(ThisTimeLineID); + + XLogRecPtr endOfSegment = (((wantPage / XLogSegSize) + 1) * XLogSegSize) - 1; + + Assert(wantPage / XLogSegSize == endOfSegment / XLogSegSize); + + /* Find the timeline of the last LSN on the segment containing wantPage. */ + state->currTLI = tliOfPointInHistory(endOfSegment, timelineHistory); + state->currTLIValidUntil = tliSwitchPoint(state->currTLI, timelineHistory, NULL); + + Assert(state->currTLIValidUntil == InvalidXLogRecPtr || + wantPage + wantLength < state->currTLIValidUntil); + + list_free_deep(timelineHistory); + + elog(DEBUG3, "switched to timeline %u valid until %X/%X", + state->currTLI, + (uint32)(state->currTLIValidUntil >> 32), + (uint32)(state->currTLIValidUntil)); + } + + elog(DEBUG3, "page read ptr %X/%X (for record %X/%X) is on segment with TLI %u valid until %X/%X, server current TLI is %u", + (uint32) (wantPage >> 32), + (uint32) wantPage, + (uint32) (state->currRecPtr >> 32), + (uint32) state->currRecPtr, + state->currTLI, + (uint32) (state->currTLIValidUntil >> 32), + (uint32) (state->currTLIValidUntil), + ThisTimeLineID); +} + +/* * read_page callback for reading local xlog files * * Public because it would likely be very helpful for someone writing another @@ -770,28 +910,65 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int count; loc = targetPagePtr + reqLen; + + /* Make sure enough xlog is available... */ while (1) { /* - * TODO: we're going to have to do something more intelligent about - * timelines on standbys. Use readTimeLineHistory() and - * tliOfPointInHistory() to get the proper LSN? For now we'll catch - * that case earlier, but the code and TODO is left in here for when - * that changes. + * Check which timeline to get the record from. + * + * We have to do it each time through the loop because if we're in + * recovery as a cascading standby, the current timeline might've + * become historical. */ - if (!RecoveryInProgress()) + XLogReadDetermineTimeline(state, targetPagePtr, reqLen); + + if (state->currTLI == ThisTimeLineID) { - *pageTLI = ThisTimeLineID; - read_upto = GetFlushRecPtr(); + /* + * We're reading from the current timeline so we might have to + * wait for the desired record to be generated (or, for a standby, + * received & replayed) + */ + if (!RecoveryInProgress()) + { + *pageTLI = ThisTimeLineID; + read_upto = GetFlushRecPtr(); + } + else + read_upto = GetXLogReplayRecPtr(pageTLI); + + if (loc <= read_upto) + break; + + CHECK_FOR_INTERRUPTS(); + pg_usleep(1000L); } else - read_upto = GetXLogReplayRecPtr(pageTLI); - - if (loc <= read_upto) + { + /* + * We're on a historical timeline, so limit reading to the switch + * point where we moved to the next timeline. + * + * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know + * about the new timeline, so we must've received past the end of + * it. + */ + read_upto = state->currTLIValidUntil; + + /* + * Setting pageTLI to our wanted record's TLI is slightly wrong; + * the page might begin on an older timeline if it contains a + * timeline switch, since its xlog segment will have been copied + * from the prior timeline. This is pretty harmless though, as + * nothing cares so long as the timeline doesn't go backwards. We + * should read the page header instead; FIXME someday. + */ + *pageTLI = state->currTLI; + + /* No need to wait on a historical timeline */ break; - - CHECK_FOR_INTERRUPTS(); - pg_usleep(1000L); + } } if (targetPagePtr + XLOG_BLCKSZ <= read_upto) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 318726e..4315fb3 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -234,12 +234,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin rsinfo->setResult = p->tupstore; rsinfo->setDesc = p->tupdesc; - /* compute the current end-of-wal */ - if (!RecoveryInProgress()) - end_of_wal = GetFlushRecPtr(); - else - end_of_wal = GetXLogReplayRecPtr(NULL); - ReplicationSlotAcquire(NameStr(*name)); PG_TRY(); @@ -279,6 +273,12 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* invalidate non-timetravel entries */ InvalidateSystemCaches(); + if (!RecoveryInProgress()) + end_of_wal = GetFlushRecPtr(); + else + end_of_wal = GetXLogReplayRecPtr(NULL); + + /* Decode until we run out of records */ while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) || (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal)) { diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index deaa7f5..caff9a6 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -27,6 +27,10 @@ #include "access/xlogrecord.h" +#ifndef FRONTEND +#include "nodes/pg_list.h" +#endif + typedef struct XLogReaderState XLogReaderState; /* Function type definition for the read_page callback */ @@ -160,6 +164,13 @@ struct XLogReaderState /* beginning of the WAL record being read. */ XLogRecPtr currRecPtr; + /* timeline to read it from, 0 if a lookup is required */ + TimeLineID currTLI; + /* + * Safe point to read to in currTLI if current TLI is historical + * (tliSwitchPoint) or InvalidXLogRecPtr if on current timeline. + */ + XLogRecPtr currTLIValidUntil; /* Buffer for current ReadRecord result (expandable) */ char *readRecordBuf; -- 2.7.4