Re: Timeline following for logical slots

From: Andres Freund <andres(at)anarazel(dot)de>
To: Alvaro Herrera <alvherre(at)2ndquadrant(dot)com>
Cc: Petr Jelinek <petr(at)2ndquadrant(dot)com>, Craig Ringer <craig(at)2ndquadrant(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Timeline following for logical slots
Date: 2016-03-15 09:12:17
Message-ID: 20160315091217.zkf6dtxfr7jhg4pr@alap3.anarazel.de
Lists: pgsql-hackers

Hi,

On 2016-03-14 20:10:58 -0300, Alvaro Herrera wrote:
> diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
> index fcb0872..7b60b8f 100644
> --- a/src/backend/access/transam/xlogreader.c
> +++ b/src/backend/access/transam/xlogreader.c
> @@ -10,9 +10,11 @@
> *
> * NOTES
> * See xlogreader.h for more notes on this facility.
> + *
> + * This file is compiled as both front-end and backend code, so it
> + * may not use ereport, server-defined static variables, etc.
> *-------------------------------------------------------------------------
> */
> -

Huh? Why drop the blank line before the #include?

> #include "postgres.h"
>
> #include "access/transam.h"
> @@ -116,6 +118,11 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
> return NULL;
> }
>
> +#ifndef FRONTEND
> + /* Will be loaded on first read */
> + state->timelineHistory = NIL;
> +#endif
> +
> return state;
> }
>
> @@ -135,6 +142,10 @@ XLogReaderFree(XLogReaderState *state)
> pfree(state->errormsg_buf);
> if (state->readRecordBuf)
> pfree(state->readRecordBuf);
> +#ifndef FRONTEND
> + if (state->timelineHistory)
> + list_free_deep(state->timelineHistory);
> +#endif

Hm. So we don't support timeline following for frontend code, although
it'd be rather helpful for pg_xlogdump, and possibly pg_rewind.

> pfree(state->readBuf);
> pfree(state);
> }
> @@ -208,10 +219,11 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
>
> if (RecPtr == InvalidXLogRecPtr)
> {
> + /* No explicit start point; read the record after the one we just read */
> RecPtr = state->EndRecPtr;
>
> if (state->ReadRecPtr == InvalidXLogRecPtr)
> - randAccess = true;
> + randAccess = true; /* allow readPageTLI to go backwards */

randAccess is doing more than that, so I'm doubtful that comment is an
improvement.

> /*
> * RecPtr is pointing to end+1 of the previous WAL record. If we're
> @@ -223,6 +235,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
> else
> {
> /*
> + * Caller supplied a position to start at.
> + *
> * In this case, the passed-in record pointer should already be
> * pointing to a valid record starting position.
> */
> @@ -309,8 +323,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
> /* XXX: more validation should be done here */
> if (total_len < SizeOfXLogRecord)
> {
> - report_invalid_record(state, "invalid record length at %X/%X",
> - (uint32) (RecPtr >> 32), (uint32) RecPtr);
> + report_invalid_record(state,
> + "invalid record length at %X/%X: wanted %lu, got %u",
> + (uint32) (RecPtr >> 32), (uint32) RecPtr,
> + SizeOfXLogRecord, total_len);
> goto err;
> }
> gotheader = false;
> @@ -466,9 +482,7 @@ err:
> * Invalidate the xlog page we've cached. We might read from a different
> * source after failure.
> */
> - state->readSegNo = 0;
> - state->readOff = 0;
> - state->readLen = 0;
> + XLogReaderInvalCache(state);

I don't think that "cache" is the right way to describe this.
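The three assignments amount to forgetting which page is currently loaded
in readBuf, so that the next request has to go back to the page-read
callback; a name describing that might fit better. A minimal standalone
model, with hypothetical names (ReadPageState, reset_read_page,
page_is_loaded) and a simplified version of the reuse check, not
xlogreader's actual code:

```c
#include <stdint.h>

/* Hypothetical stand-ins for the three xlogreader fields the hunk resets. */
typedef struct ReadPageState
{
    uint64_t readSegNo;   /* segment of the page currently in readBuf */
    uint32_t readOff;     /* offset of that page within the segment */
    uint32_t readLen;     /* bytes of that page that are valid */
} ReadPageState;

/*
 * Forget which page is in the buffer, so the next read must go back to
 * the page-read callback. readLen == 0 is what makes any reuse impossible.
 */
static void
reset_read_page(ReadPageState *s)
{
    s->readSegNo = 0;
    s->readOff = 0;
    s->readLen = 0;
}

/* Simplified reuse check: is 'len' bytes of page (segno, off) already loaded? */
static int
page_is_loaded(const ReadPageState *s, uint64_t segno, uint32_t off,
               uint32_t len)
{
    return s->readLen > 0 && s->readSegNo == segno && s->readOff == off &&
        len <= s->readLen;
}
```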

> #include <unistd.h>
>
> -#include "miscadmin.h"
> -

Spurious change, IMO.

> /*
> - * TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
> - * we currently don't have the infrastructure (elog!) to share it.
> + * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
> + * in timeline 'tli'.
> + *
> + * Will open, and keep open, one WAL segment stored in the static file
> + * descriptor 'sendFile'. This means if XLogRead is used once, there will
> + * always be one descriptor left open until the process ends, but never
> + * more than one.
> + *
> + * XXX This is very similar to pg_xlogdump's XLogDumpXLogRead and to XLogRead
> + * in walsender.c but for small differences (such as lack of elog() in
> + * frontend). Probably these should be merged at some point.
> */
> static void
> XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
> @@ -648,8 +657,12 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
> XLogRecPtr recptr;
> Size nbytes;
>
> + /*
> + * Cached state across calls.
> + */

Could this be a one-line comment?

> static int sendFile = -1;
> static XLogSegNo sendSegNo = 0;
> + static TimeLineID sendTLI = 0;
> static uint32 sendOff = 0;
>
> p = buf;
> @@ -664,11 +677,12 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
>
> startoff = recptr % XLogSegSize;
>
> - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
> + /* Do we need to open a new xlog segment? */
> + if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
> + sendTLI != tli)
> {

s/open a new/open a different/? IMO "new" has connotations that we don't
really want here.
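To make the changed condition concrete: the open file is identified by the
pair (timeline, segment number), since the same segment number on another
timeline is a different file. A standalone sketch, assuming 16MB segments
and using hypothetical names (OpenSegment, need_different_segment) in place
of the patch's static variables:

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;   /* byte position in WAL */
typedef uint64_t XLogSegNo;    /* segment number */
typedef uint32_t TimeLineID;

#define WAL_SEG_SIZE ((uint64_t) 16 * 1024 * 1024)   /* assumed 16MB segments */

/* Stand-in for the static variables XLogRead() keeps across calls. */
typedef struct OpenSegment
{
    int        fd;      /* -1 if nothing is open */
    XLogSegNo  segno;   /* segment the fd refers to */
    TimeLineID tli;     /* timeline whose copy of the segment is open */
} OpenSegment;

/*
 * Does reading at 'recptr' on timeline 'tli' require opening a different
 * file? Same segment number on another timeline means another file.
 */
static bool
need_different_segment(const OpenSegment *cur, XLogRecPtr recptr,
                       TimeLineID tli)
{
    return cur->fd < 0 ||
        recptr / WAL_SEG_SIZE != cur->segno ||   /* i.e. !XLByteInSeg() */
        cur->tli != tli;
}
```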

> char path[MAXPGPATH];
>
> - /* Switch to another logfile segment */
> if (sendFile >= 0)
> close(sendFile);

E.g. you could just have moved the "Switch to another logfile segment"
comment up instead of deleting it.

> /* Need to seek in the file? */
> if (sendOff != startoff)
> {
> if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
> - {
> - char path[MAXPGPATH];
> -
> - XLogFilePath(path, tli, sendSegNo);
> -
> ereport(ERROR,
> (errcode_for_file_access(),
> errmsg("could not seek in log segment %s to offset %u: %m",
> - path, startoff)));
> - }
> + XLogFileNameP(tli, sendSegNo), startoff)));
> sendOff = startoff;
> }

Not a serious issue, more a general remark: I'm doubtful that calling
palloc() in error paths (which XLogFileNameP() does) is good practice. The
string will be allocated in the current memory context, without access to
the emergency error reserves.
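The removed code avoided that by formatting the path into a stack buffer.
A standalone sketch of the same idea for the file name, mirroring the
format XLogFileName() uses (timeline, segno / segments-per-xlogid,
segno % segments-per-xlogid, each as %08X), assuming 16MB segments;
wal_file_name() is a hypothetical helper, not an existing function:

```c
#include <stdint.h>
#include <stdio.h>

#define MAXFNAMELEN 64
#define WAL_SEG_SIZE ((uint64_t) 16 * 1024 * 1024)        /* assumed */
#define SEGS_PER_XLOGID (UINT64_C(0x100000000) / WAL_SEG_SIZE)

/*
 * Format a WAL segment file name into caller-provided storage. Nothing is
 * allocated, so this is safe to call on an error path.
 */
static void
wal_file_name(char *buf, size_t buflen, uint32_t tli, uint64_t segno)
{
    snprintf(buf, buflen, "%08X%08X%08X", (unsigned) tli,
             (unsigned) (segno / SEGS_PER_XLOGID),
             (unsigned) (segno % SEGS_PER_XLOGID));
}
```

The caller would declare char fname[MAXFNAMELEN] next to the lseek() call
and pass fname to ereport().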

I'm also getting the feeling that the patch is bordering on mixing
relatively random cleanups in with architectural changes, which makes it
a bit harder to review.

> +static void
> +XLogReadDetermineTimeline(XLogReaderState *state)
> +{
> + /* Read the history on first time through */
> + if (state->timelineHistory == NIL)
> + state->timelineHistory = readTimeLineHistory(ThisTimeLineID);
> +
> + /*
> + * Are we reading the record immediately following the one we read last
> + * time? If not, then don't use the cached timeline info.
> + */
> + if (state->currRecPtr != state->EndRecPtr)
> + {
> + state->currTLI = 0;
> + state->currTLIValidUntil = InvalidXLogRecPtr;
> + }

Hm. So we essentially grow a second copy of the last-end-position and
randAccess tracking that XLogReadRecord() already does.

> + if (state->currTLI == 0)
> + {
> + /*
> + * Something changed; work out what timeline this record is on. We
> + * might read it from the segment on this TLI or, if the segment is
> + * also contained by newer timelines, the copy from a newer TLI.
> + */
> + state->currTLI = tliOfPointInHistory(state->currRecPtr,
> + state->timelineHistory);
> +
> + /*
> + * Look for the most recent timeline that's on the same xlog segment
> + * as this record, since that's the only one we can assume is still
> + * readable.
> + */
> + while (state->currTLI != ThisTimeLineID &&
> + state->currTLIValidUntil == InvalidXLogRecPtr)
> + {
> + XLogRecPtr tliSwitch;
> + TimeLineID nextTLI;
> +
> + tliSwitch = tliSwitchPoint(state->currTLI, state->timelineHistory,
> + &nextTLI);
> +
> + /* round ValidUntil down to start of seg containing the switch */
> + state->currTLIValidUntil =
> + ((tliSwitch / XLogSegSize) * XLogSegSize);
> +
> + if (state->currRecPtr >= state->currTLIValidUntil)
> + {
> + /*
> + * The new currTLI ends on this WAL segment so check the next
> + * TLI to see if it's the last one on the segment.
> + *
> + * If that's the current TLI we'll stop searching.

I don't really understand how we stop searching here.

> + */
> + state->currTLI = nextTLI;
> + state->currTLIValidUntil = InvalidXLogRecPtr;
> + }
> + }
> +}

XLogReadDetermineTimeline() doesn't sit quite right with me; I do wonder
whether there isn't a simpler way to write it.
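For reference, the quoted loop stops either because currTLIValidUntil
stays set (the record lies before the segment containing the switch point)
or because currTLI has reached ThisTimeLineID. The same decision can be
written as a single forward scan over the history. A standalone sketch,
assuming 16MB segments and a simplified array-based history (TLIEntry,
TLIChoice and choose_timeline() are hypothetical, not the patch's
List-based API):

```c
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint32_t TimeLineID;

#define InvalidXLogRecPtr ((XLogRecPtr) 0)
#define WAL_SEG_SIZE ((XLogRecPtr) 16 * 1024 * 1024)   /* assumed */

/*
 * Simplified history: entries in order, each TLI valid on [begin, end);
 * the last entry is the current timeline, with end == InvalidXLogRecPtr.
 */
typedef struct TLIEntry
{
    TimeLineID tli;
    XLogRecPtr begin;
    XLogRecPtr end;
} TLIEntry;

typedef struct TLIChoice
{
    TimeLineID tli;          /* timeline to read the record's segment from */
    XLogRecPtr validUntil;   /* read limit; Invalid on the current TLI */
} TLIChoice;

/*
 * Start at the timeline containing 'ptr'. While 'ptr' lies in the segment
 * holding that timeline's switch point, move to the next timeline, whose
 * copy of the segment is newer. Stop at a timeline that switches in a
 * later segment, or at the current timeline.
 */
static TLIChoice
choose_timeline(const TLIEntry *hist, int n, XLogRecPtr ptr)
{
    int i = 0;

    /* like tliOfPointInHistory(): find the entry containing ptr */
    while (i < n - 1 && ptr >= hist[i].end)
        i++;

    for (; i < n - 1; i++)
    {
        /* round the switch point down to the start of its segment */
        XLogRecPtr segStart = (hist[i].end / WAL_SEG_SIZE) * WAL_SEG_SIZE;

        if (ptr < segStart)
            return (TLIChoice) {hist[i].tli, segStart};
    }
    return (TLIChoice) {hist[n - 1].tli, InvalidXLogRecPtr};
}
```

With timeline 1 on [0, 0x1800000) and timeline 2 open-ended from there, a
record at 0x500000 is read from timeline 1 up to 0x1000000, while one at
0x1200000 is read from timeline 2's copy of the segment.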

> +/*
> + * XLogPageReadCB callback for reading local xlog files
> *
> * Public because it would likely be very helpful for someone writing another
> * output method outside walsender, e.g. in a bgworker.
> *
> - * TODO: The walsender has it's own version of this, but it relies on the
> + * TODO: The walsender has its own version of this, but it relies on the
> * walsender's latch being set whenever WAL is flushed. No such infrastructure
> * exists for normal backends, so we have to do a check/sleep/repeat style of
> * loop for now.
> @@ -754,46 +897,88 @@ int
> read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
> int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
> {
> - XLogRecPtr flushptr,
> + XLogRecPtr read_upto,
> loc;
> int count;
>
> loc = targetPagePtr + reqLen;
> +
> + /* Make sure enough xlog is available... */
> while (1)
> {
> /*
> - * TODO: we're going to have to do something more intelligent about
> - * timelines on standbys. Use readTimeLineHistory() and
> - * tliOfPointInHistory() to get the proper LSN? For now we'll catch
> - * that case earlier, but the code and TODO is left in here for when
> - * that changes.
> + * Check which timeline to get the record from.
> + *
> + * We have to do it each time through the loop because if we're in
> + * recovery as a cascading standby, the current timeline might've
> + * become historical.
> */
> - if (!RecoveryInProgress())
> + XLogReadDetermineTimeline(state);
> +
> + if (state->currTLI == ThisTimeLineID)
> {
> - *pageTLI = ThisTimeLineID;
> - flushptr = GetFlushRecPtr();
> + /*
> + * We're reading from the current timeline so we might have to
> + * wait for the desired record to be generated (or, for a standby,
> + * received & replayed)
> + */
> + if (!RecoveryInProgress())
> + {
> + *pageTLI = ThisTimeLineID;
> + read_upto = GetFlushRecPtr();
> + }
> + else
> + read_upto = GetXLogReplayRecPtr(pageTLI);
> +
> + if (loc <= read_upto)
> + break;
> +
> + CHECK_FOR_INTERRUPTS();
> + pg_usleep(1000L);
> }
> else
> - flushptr = GetXLogReplayRecPtr(pageTLI);
> + {
> + /*
> + * We're on a historical timeline, so limit reading to the switch
> + * point where we moved to the next timeline.
> + */
> + read_upto = state->currTLIValidUntil;

Hm. Is it OK not to check GetFlushRecPtr()/GetXLogReplayRecPtr() here? If
so, why?

> - if (loc <= flushptr)
> + /*
> + * Setting pageTLI to our wanted record's TLI is slightly wrong;
> + * the page might begin on an older timeline if it contains a
> + * timeline switch, since its xlog segment will have been copied
> + * from the prior timeline. This is pretty harmless though, as
> + * nothing cares so long as the timeline doesn't go backwards. We
> + * should read the page header instead; FIXME someday.
> + */
> + *pageTLI = state->currTLI;
> +
> + /* No need to wait on a historical timeline */
> break;
> -
> - CHECK_FOR_INTERRUPTS();
> - pg_usleep(1000L);
> + }
> }
>
> - /* more than one block available */
> - if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
> + if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
> + {
> + /*
> + * more than one block available; read only that block, have caller
> + * come back if they need more.
> + */
> count = XLOG_BLCKSZ;
> - /* not enough data there */
> - else if (targetPagePtr + reqLen > flushptr)
> + }
> + else if (targetPagePtr + reqLen > read_upto)
> + {
> + /* not enough data there */
> return -1;
> - /* part of the page available */
> + }
> else
> - count = flushptr - targetPagePtr;
> + {
> + /* enough bytes available to satisfy the request */
> + count = read_upto - targetPagePtr;
> + }
>
> - XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
> + XLogRead(cur_page, *pageTLI, targetPagePtr, count);

When would we read less than a page? AFAIK that should never be required.
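For reference, the three-way decision as a standalone function. Before the
patch, count only determined the return value (the number of valid bytes),
while XLogRead() always read XLOG_BLCKSZ. valid_bytes_on_page() is a
hypothetical name, and XLOG_BLCKSZ is assumed to be the default 8192:

```c
#include <stdint.h>

typedef uint64_t XLogRecPtr;

#define XLOG_BLCKSZ 8192   /* assumed default block size */

/*
 * How many bytes of the page at 'targetPagePtr' are valid, given that WAL
 * is readable up to 'read_upto' and the caller needs 'reqLen' bytes.
 * Returns -1 if the request can't be satisfied yet.
 */
static int
valid_bytes_on_page(XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr read_upto)
{
    if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
        return XLOG_BLCKSZ;                    /* whole page is valid */
    if (targetPagePtr + reqLen > read_upto)
        return -1;                             /* not enough data yet */
    return (int) (read_upto - targetPagePtr);  /* only a prefix is valid */
}
```

E.g. a page at 8192 with WAL readable up to 9000 has 808 valid bytes,
which by itself says nothing about how many bytes have to be read.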

> + /*
> + * We start reading xlog from the restart lsn, even though in
> + * CreateDecodingContext we set the snapshot builder up using the
> + * slot's candidate_restart_lsn. This means we might read xlog we
> + * don't actually decode rows from, but the snapshot builder might
> + * need it to get to a consistent point. The point we start returning
> + * data to *users* at is the candidate restart lsn from the decoding
> + * context.
> + */

Uh? Where are we using candidate_restart_lsn that way? I seriously doubt
we are; candidate_restart_lsn is about a potential future restart_lsn,
which we can set once we get confirmation of receipt from the client.

> @@ -299,6 +312,18 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
> CHECK_FOR_INTERRUPTS();
> }
>
> + /* Make sure timeline lookups use the start of the next record */
> + startptr = ctx->reader->EndRecPtr;

Huh? startptr isn't used after this, so I'm not sure what this even
means.

> + /*
> + * The XLogReader will read a page past the valid end of WAL because
> + * it doesn't know about timelines. When we switch timelines and ask
> + * it for the first page on the new timeline it will think it has it
> + * cached, but it'll have the old partial page and say it can't find
> + * the next record. So flush the cache.
> + */
> + XLogReaderInvalCache(ctx->reader);
> +

Ditto.

> diff --git a/src/test/modules/decoding_failover/decoding_failover.c b/src/test/modules/decoding_failover/decoding_failover.c
> new file mode 100644
> index 0000000..669e6c4
> --- /dev/null
> +++ b/src/test/modules/decoding_failover/decoding_failover.c

> +
> +/*
> + * Create a new logical slot, with invalid LSN and xid, directly. This does not
> + * use the snapshot builder or logical decoding machinery. It's only intended
> + * for creating a slot on a replica that mirrors the state of a slot on an
> + * upstream master.
> + *
> + * You should immediately decoding_failover_advance_logical_slot(...) it
> + * after creation.
> + */

Uh. I doubt we want this, even if it's formally located in
src/test/modules. These comments make it sound as if it's not intended
only for testing, and I have serious doubts about the validity of the
concept as it stands.

This seems to need some more polishing.

Greetings,

Andres Freund
