Re: Switching timeline over streaming replication

From: Heikki Linnakangas <hlinnakangas(at)vmware(dot)com>
To: Amit kapila <amit(dot)kapila(at)huawei(dot)com>
Cc: 'PostgreSQL-development' <pgsql-hackers(at)postgreSQL(dot)org>
Subject: Re: Switching timeline over streaming replication
Date: 2012-10-02 10:50:43
Message-ID: 506AC703.40708@vmware.com
Lists: pgsql-hackers

Thanks for the thorough review! I committed the xlog.c refactoring patch
now. Attached is a new version of the main patch, comments on specific
points below. I didn't adjust the docs per your comments yet, will do
that next.

On 01.10.2012 05:25, Amit kapila wrote:
> 1. In function readTimeLineHistory(),
> two mechanisms are used to fetch timeline from history file
> + sscanf(fline, "%u\t%X/%X", &tli, &switchpoint_hi,
> &switchpoint_lo);
> +
> + /* expect a numeric timeline ID as first field of line */
> + tli = (TimeLineID) strtoul(ptr, &endptr, 0);
> If we use new mechanism, it will not be able to detect error as it is
> doing in current case.

Fixed, by checking the return value of sscanf().
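A minimal sketch of that check, assuming the line format is the one in the quoted sscanf() call (`tli<TAB>hi/lo`, optionally followed by a reason) and that `#` lines are comments. The typedefs, the 64-bit packing of the switchpoint and the `parse_history_line()` name are illustrative stand-ins, not the patch's code:

```c
#include <assert.h>
#include <ctype.h>
#include <stdint.h>
#include <stdio.h>

typedef uint32_t TimeLineID;   /* stand-in for PostgreSQL's typedef */
typedef uint64_t XLogRecPtr;   /* stand-in; assumes a 64-bit WAL position */

typedef enum
{
    HIST_LINE_SKIP,     /* blank or comment line */
    HIST_LINE_OK,       /* timeline ID and switchpoint both parsed */
    HIST_LINE_BAD_TLI,  /* no numeric timeline ID */
    HIST_LINE_BAD_LSN   /* timeline ID present, switchpoint missing */
} HistLineResult;

/*
 * Parse one line of a timeline history file.  On HIST_LINE_OK, *tli and
 * *switchpoint are set; otherwise they are left untouched.
 */
static HistLineResult
parse_history_line(const char *line, TimeLineID *tli, XLogRecPtr *switchpoint)
{
    const char *ptr = line;
    unsigned int t, hi, lo;
    int nfields;

    /* skip leading whitespace, then ignore blank and comment lines */
    while (*ptr != '\0' && isspace((unsigned char) *ptr))
        ptr++;
    if (*ptr == '\0' || *ptr == '#')
        return HIST_LINE_SKIP;

    /* sscanf() returns how many conversions succeeded: check it */
    nfields = sscanf(ptr, "%u\t%X/%X", &t, &hi, &lo);
    if (nfields < 1)
        return HIST_LINE_BAD_TLI;
    if (nfields != 3)
        return HIST_LINE_BAD_LSN;

    *tli = (TimeLineID) t;
    *switchpoint = ((XLogRecPtr) hi << 32) | lo;
    return HIST_LINE_OK;
}
```

Because sscanf() reports how many fields it converted, a line such as `2` (ID but no switchpoint) or `abc` (no ID at all) is rejected instead of silently leaving the output variables unset.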

> 2. In function readTimeLineHistory(),
> + fd = AllocateFile(path, "r");
> + if (fd == NULL)
> + {
> + if (errno != ENOENT)
> + ereport(FATAL,
> + (errcode_for_file_access(),
> + errmsg("could not open file
> \"%s\": %m", path)));
> + /* Not there, so assume no parents */
> + return list_make1_int((int) targetTLI);
> + }
> still return list_make1_int((int) targetTLI); is used.

Fixed.

> 3. Function timelineOfPointInHistory(), should return the timeline of recptr
> passed to it.
> a. is it okay to decide based on xlog recordpointer that which timeline
> it belongs to, as different
> timelines can have same xlog recordpointer?

In a particular timeline, the history is linear, and a given point in
WAL unambiguously has one timeline ID. There might be some other
timelines that branch off at different points, but once you pick a
particular timeline, you can unambiguously trace it all the way to the
beginning of WAL, and tell what the timeline ID of each point in WAL was.
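To illustrate: once a timeline's history is fixed, each entry covers a contiguous, non-overlapping range of WAL, so mapping a position to its timeline is a plain range lookup. The sketch below assumes each entry spans `[begin, end)` with `end == 0` meaning "not ended". Whether the switchpoint itself belongs to the old or the new timeline is exactly the convention the function comment has to spell out; here it is assumed to belong to the new one. The struct layout and function name are hypothetical:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TimeLineID;
typedef uint64_t XLogRecPtr;

/* Hypothetical layout: an entry covers WAL positions [begin, end). */
typedef struct
{
    TimeLineID tli;
    XLogRecPtr begin;   /* inclusive; 0 = from the start of WAL */
    XLogRecPtr end;     /* exclusive; 0 = not ended (current timeline) */
} TimeLineHistoryEntry;

/*
 * Return the timeline that 'ptr' belongs to in the given history, or 0 if
 * no entry covers it.  Within one timeline's history the ranges do not
 * overlap, so at most one entry can match.
 */
static TimeLineID
timeline_of_point(XLogRecPtr ptr, const TimeLineHistoryEntry *history, size_t n)
{
    for (size_t i = 0; i < n; i++)
    {
        const TimeLineHistoryEntry *e = &history[i];

        if (ptr >= e->begin && (e->end == 0 || ptr < e->end))
            return e->tli;
    }
    return 0;   /* history has a gap: the caller should treat this as an error */
}
```

With the history of timeline 3 listed newest-first as {3, 0x3000, 0}, {2, 0x2000, 0x3000}, {1, 0, 0x2000}, position 0x2FFF maps to timeline 2, and 0x3000, the switchpoint, maps to timeline 3.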

> b. it seems from logic that it will return timeline previous to the
> timeline of recptr passed.
> For example if the timeline 3's switchpoint is equal to recptr passed
> then it will return timeline 2.

I expanded the comment in the function a bit, I hope it makes more sense
now.

> 4. In writeTimeLineHistory function variable endTLI is never used.

Removed.

> 5. In header of function writeTimeLineHistory(), can give explanation about
> XLogRecPtr switchpoint

Added.

> 6. @@ -6869,11 +5947,35 @@ StartupXLOG(void)
> */
> if (InArchiveRecovery)
> {
> + char reason[200];
> +
>
>
> + /*
> + * Write comment to history file to explain why and where
> timeline
> + * changed. Comment varies according to the recovery target
> used.
> + */
> + if (recoveryTarget == RECOVERY_TARGET_XID)
> + snprintf(reason, sizeof(reason),
> + "%s transaction %u",
> + recoveryStopAfter ? "after" :
> "before",
> + recoveryStopXid);
>
> In the comment above this line you mentioned why and where timeline changed.
>
> However in the reason field only specifies about where part.

I didn't change this in the patch. I guess it's not obvious, but you can
deduce the 'why' part from the message.

> 7. + * Returns the redo pointer of the "previous" checkpoint.
> +GetOldestRestartPoint(XLogRecPtr *oldrecptr, TimeLineID *oldtli)
> +{
> + if (InRedo)
> + {
> + LWLockAcquire(ControlFileLock, LW_SHARED);
> + *oldrecptr = ControlFile->checkPointCopy.redo;
> + *oldtli = ControlFile->checkPointCopy.ThisTimeLineID;
> + LWLockRelease(ControlFileLock);
> + }
>
> a. In this function, is it required to take ControlFileLock as earlier also
> there was no lock to protect this read
> when it get called from RestoreArchivedFile, and I think at this point no
> one else can modify these values.
> However for code consistency purpose like whenever or wherever read the
> controlfile values, read it with read lock.

Yeah, it's just for the sake of consistency.

> b. As per your comment it should have returned "previous" checkpoint,
> however the function returns values of
> latest checkpoint.

Changed the comment. I wonder if we should be more conservative, and
really keep the WAL back to the "previous" checkpoint, but I won't
change that as part of this patch.

> 8. In function writeTimeLineHistoryFile(), will it not be better to directly
> write rather than to later do pg_fsync().
> as it is just one time write.

Not sure I understood this right, but writeTimeLineHistoryFile() needs
to avoid putting a corrupt (e.g. incomplete) file in pg_xlog, just like
writeTimeLineHistory(). That's why the write+fsync+rename dance is needed.
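For reference, the write+fsync+rename pattern looks roughly like the following POSIX sketch. The `.tmp` suffix and the `durable_write_file()` helper are illustrative assumptions, not the patch's code, and error reporting is reduced to returning -1 with `errno` set:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/*
 * Install 'data' as 'path' so that a crash leaves either no file or the
 * complete file, never a partial one: write to a temporary file, fsync
 * it, then rename it over the final name.  Returns 0 on success, or -1
 * with errno set (the temporary file is removed on failure).
 */
static int
durable_write_file(const char *path, const char *data, size_t len)
{
    char    tmppath[4096];
    size_t  done = 0;
    int     fd;
    int     rc;

    if (snprintf(tmppath, sizeof(tmppath), "%s.tmp", path) >= (int) sizeof(tmppath))
    {
        errno = ENAMETOOLONG;
        return -1;
    }

    fd = open(tmppath, O_WRONLY | O_CREAT | O_TRUNC, 0600);
    if (fd < 0)
        return -1;

    /* write() may be partial, so loop until everything is written */
    while (done < len)
    {
        ssize_t n = write(fd, data + done, len - done);

        if (n < 0)
        {
            if (errno == EINTR)
                continue;
            goto fail;
        }
        done += (size_t) n;
    }

    /* the contents must be on disk before the file gets its final name */
    if (fsync(fd) != 0)
        goto fail;
    rc = close(fd);
    fd = -1;
    if (rc != 0)
        goto fail;

    /* rename() atomically replaces any existing file at 'path' */
    if (rename(tmppath, path) != 0)
        goto fail;
    return 0;

fail:
    {
        int save_errno = errno;

        if (fd >= 0)
            close(fd);
        unlink(tmppath);
        errno = save_errno;
    }
    return -1;
}
```

Without the fsync() before rename(), the filesystem could persist the new name before the data, so a crash might leave a truncated file under the final name. A fully durable version would also fsync the containing directory after the rename, so that the rename itself survives a crash.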

> 9. +XLogRecPtr
> +timeLineSwitchPoint(XLogRecPtr startpoint, TimeLineID tli)
> ..
> ..
> + * starting point. This is because the client can
> legimately
> spelling of legitimately needs to be corrected.

Fixed.

> 10.+XLogRecPtr
> +timeLineSwitchPoint(XLogRecPtr startpoint, TimeLineID tli)
> ..
> ..
> + if (tli < ThisTimeLineID)
> + {
> + if (!nexttle)
> + elog(ERROR, "could not find history entry for child
> of timeline %u", tli); /* shouldn't happen */
> + }
>
> I don't understand the meaning of the above check, as I think this
> situation can occur when this function gets called from StartReplication,
> because always tli sent by standby to new master will be less than
> ThisTimeLineID and it can be first in list.

Note that the list is in newest-first order, i.e. the last line in the
history file is first in the list. The first entry in the list is always
for ThisTimeLineID, which is why the (tli < ThisTimeLineID && !nexttle)
combination isn't possible.
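A small sketch of why the check can't fire, assuming the history is held as an array in the same newest-first order (index 0 is always the current timeline) and that each entry records where that timeline forked off its parent. The end of timeline `tli` is the fork point of the entry just before it in the array; only index 0 lacks such an entry, and index 0 is always ThisTimeLineID. Names and layout are hypothetical:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TimeLineID;
typedef uint64_t XLogRecPtr;

/* Hypothetical entry: a timeline and the WAL position where it forked off. */
typedef struct
{
    TimeLineID tli;
    XLogRecPtr begin;
} HistoryEntry;

/*
 * 'history' is newest-first, so history[0] is always the current timeline.
 * Return the position where 'tli' ended (the fork point of its child,
 * which is the entry just before it in the array), or 0 if 'tli' is the
 * current timeline.  *found is set to false if 'tli' is not in the history.
 */
static XLogRecPtr
switch_point_of(TimeLineID tli, const HistoryEntry *history, size_t n, bool *found)
{
    const HistoryEntry *child = NULL;   /* entry one step newer than history[i] */

    for (size_t i = 0; i < n; i++)
    {
        if (history[i].tli == tli)
        {
            *found = true;
            /* child is NULL only for i == 0, i.e. the current timeline */
            return child != NULL ? child->begin : 0;
        }
        child = &history[i];
    }
    *found = false;
    return 0;
}
```

For a history of {3, 0x3000}, {2, 0x2000}, {1, 0}, timeline 1 ended at 0x2000, timeline 2 at 0x3000, and timeline 3 has not ended, so the function returns 0 for it.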

> 11. In function readTimeLineHistory()
> ereport(DEBUG3,
> (errmsg_internal("history of timeline %u is %s",
> targetTLI, nodeToString(result))));
>
>
> Don't nodeToString(result) needs to be changed as it contain list of structure TimeLineHistoryEntry

Yep. Since this is just a DEBUG3, I'll just remove that, rather than add
the extra code needed to keep the output.

> 12. In function @@ -3768,6 +3773,8 @@ rescanLatestTimeLine(void)
> + * The current timeline must be found in the history file, and the
> + * next timeline must've forked off from it *after* the current
> + * recovery location.
> */
> - if (!list_member_int(newExpectedTLIs,
> - (int) recoveryTargetTLI))
> - ereport(LOG,
> - (errmsg("new timeline %u is not a child of database system timeline %u",
> - newtarget,
> - ThisTimeLineID)));
>
>
> is there any logic in the current patch which ensures that the above check is not required now?
>
>
> 13. In function @@ -3768,6 +3773,8 @@ rescanLatestTimeLine(void)
> + found = false;
> + foreach (cell, newExpectedTLIs)
> ..
> ..
> - list_free(expectedTLIs);
> + list_free_deep(expectedTLIs);
> What's the use of the found variable? Also, freeing expectedTLIs in the loop might cause a problem.

Oops, there's some code missing there. Apparently I botched that at some
point while splitting the patch into two. Fixed.

> 14. In function @@ -3768,6 +3773,8 @@ rescanLatestTimeLine(void)
> newExpectedTLIs = readTimeLineHistory(newtarget);
> Shouldn't this variable be declared as newExpectedTLEs as the list returned by readTimeLineHistory contains target list entry
>
>
> 15. StartupXLOG
> /* Now we can determine the list of expected TLIs */
> expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
>
>
> Should expectedTLIs be changed to expectedTLEs as the list returned by readTimeLineHistory contains target list entry

Makes sense, renamed these two.

> 16.@@ -5254,8 +5252,8 @@ StartupXLOG(void)
> writeTimeLineHistory(ThisTimeLineID, recoveryTargetTLI,
> - curFileTLI, endLogSegNo, reason);
> + curFileTLI, EndRecPtr, reason);
> if endLogSegNo is not used here, it needs to be removed from the function declaration as well.

I didn't understand this one. endLogSegNo is still used earlier in
StartupXLOG.

> 17.@@ -5254,8 +5252,8 @@ StartupXLOG(void)
> if (InArchiveRecovery)
> ..
> ..
> +
> + /* The history file can be archived immediately. */
> + TLHistoryFileName(histfname, ThisTimeLineID);
> + XLogArchiveNotify(histfname);
>
>
> Shouldn't this be done only when archive_mode is on? Right now InArchiveRecovery is true even when we do recovery for a standby.

Fixed.

> 18. +static bool
> +WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt)
> {
> ..
> + if (XLByteLT(RecPtr, receivedUpto))
> + havedata = true;
> + else
> + {
> + XLogRecPtr latestChunkStart;
> +
> + receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI);
> + if (XLByteLT(RecPtr, receivedUpto) && receiveTLI == curFileTLI)
> + {
> + havedata = true;
> + if (!XLByteLT(RecPtr, latestChunkStart))
> + {
> + XLogReceiptTime = GetCurrentTimestamp();
> + SetCurrentChunkStartTime(XLogReceiptTime);
> + }
> + }
> + else
> + havedata = false;
> + }
>
>
> In the above logic, it seems there is inconsistency in setting havedata = true;
> In the above code in else loop, let us say cond. receiveTLI == curFileTLI is false but XLByteLT(RecPtr, receivedUpto) is true,
> then in next round in for loop, the check if (XLByteLT(RecPtr, receivedUpto)) will get true and will set havedata = true;
> which seems to be contradictory.

Hmm, I think you're saying that we should check that receiveTLI ==
curFileTLI also in the first if-statement above. Did that.
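Sketched in isolation, the corrected test looks like this: both the fast path (the cached `receivedUpto`) and the slow path (a fresh read of the walreceiver's position) require the received WAL to be on `curFileTLI`. The `WalRcvProgress` struct and the `shared_progress` variable are hypothetical stand-ins for the walreceiver's shared-memory state, and the `latestChunkStart` handling is omitted:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TimeLineID;
typedef uint64_t XLogRecPtr;

/* Hypothetical snapshot of how far the walreceiver has written WAL. */
typedef struct
{
    XLogRecPtr receivedUpto;   /* WAL received up to (exclusive) this point */
    TimeLineID receiveTLI;     /* timeline that WAL belongs to */
} WalRcvProgress;

/* Stand-in for the walreceiver's shared-memory state. */
static WalRcvProgress shared_progress;

/*
 * Is the WAL at recPtr on curFileTLI available from streaming?
 * 'cached' is the caller's last known snapshot and is refreshed when the
 * fast path fails.  Both paths insist on the right timeline.
 */
static bool
have_streamed_data(XLogRecPtr recPtr, TimeLineID curFileTLI, WalRcvProgress *cached)
{
    /* fast path: the snapshot we already have is far enough along */
    if (recPtr < cached->receivedUpto && cached->receiveTLI == curFileTLI)
        return true;

    /* slow path: take a fresh snapshot and test it the same way */
    *cached = shared_progress;
    return recPtr < cached->receivedUpto && cached->receiveTLI == curFileTLI;
}
```

With `shared_progress` set to {0x5000, timeline 2}, asking for 0x1000 on timeline 1 returns false on every call, whereas the code quoted in point 18 would have returned true from the first if-statement on the second call.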

> 19.
>
>
> +static bool
> +WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt)
> {
> ..
> + if (PrimaryConnInfo)
> + {
> + XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
> + TimeLineID tli = timelineOfPointInHistory(ptr, expectedTLIs);
> +
> + if (tli < curFileTLI)
>
>
> I think in some cases if (tli < curFileTLI) might not make sense, as for case where curFileTLI = 0 for randAccess.

Well, if curFileTLI == 0, then it's surely < tli. I think that's the
correct behavior, but I'll add an explicit check for randAccess to make
it more explicit.

> 20. Function name WaitForWALToBecomeAvailable suggests that it waits for WAL, but it also returns true when trigger file is present,
> which can be a little misleading.

Added a comment above the function to clarify that.

> 21. @@ -2411,27 +2411,6 @@ reaper(SIGNAL_ARGS)
>
> a. won't it impact stop of online basebackup functionality because earlier on promotion
> I think this code will stop walsenders and basebackup stop will also give error in such cases.

Hmm, should a base backup be aborted when the standby is promoted? Does
the promotion render the backup corrupt?

> 22. @@ -63,10 +66,17 @@ void
> _PG_init(void)
> {
> /* Tell walreceiver how to reach us */
> - if (walrcv_connect != NULL || walrcv_receive != NULL ||
> - walrcv_send != NULL || walrcv_disconnect != NULL)
> + if (walrcv_connect != NULL || walrcv_identify_system ||
> + walrcv_readtimelinehistoryfile != NULL ||
>
>
> check for walrcv_identify_system != NULL is missing.

Fixed.

> 23. write the function header for newly added functions (libpqrcv_startstreaming, libpqrcv_identify_system, ..)

Fixed.

> 24. In header of function libpqrcv_receive(), *type needs to be removed.
> + * If data was received, returns the length of the data. *type and *buffer

Fixed.

> 25.
> +timeline_history:
> + K_TIMELINE_HISTORY ICONST
> + {
> + TimeLineHistoryCmd *cmd;
> +
> + cmd = makeNode(TimeLineHistoryCmd);
> + cmd->timeline = $2;
>
>
> can handle invalid timeline error same as for opt_timeline

Fixed.

> 26.@@ -170,6 +187,7 @@ WalReceiverMain(void)
> + case WALRCV_WAITING:
> + case WALRCV_STREAMING:
> /* Shouldn't happen */
> elog(PANIC, "walreceiver still running according to shared memory state");
> elog message should be changed according to new states.

I think it's ok as is. Both 'waiting' and 'streaming' can be thought of
as 'running'. WalRcvRunning() also returns true for both states.

> 27.@@ -259,8 +281,11 @@ WalReceiverMain(void)
>
> /* Load the libpq-specific functions */
> load_file("libpqwalreceiver", false);
> - if (walrcv_connect == NULL || walrcv_receive == NULL ||
> - walrcv_send == NULL || walrcv_disconnect == NULL)
> + if (walrcv_connect == NULL || walrcv_startstreaming == NULL ||
> + walrcv_endstreaming == NULL ||
> + walrcv_readtimelinehistoryfile == NULL ||
> + walrcv_receive == NULL || walrcv_send == NULL ||
> + walrcv_disconnect == NULL)
>
>
> check for walrcv_identify_system is missing.

Fixed.

> 28.
> +/*
> + * Check that we're connected to a valid server using the IDENTIFY_SERVER
> + * replication command, and fetch any timeline history files present in the
> + * master but missing from this server's pg_xlog directory.
> + */
> +static void
> +WalRcvHandShake(TimeLineID startpointTLI)
>
>
> In the function header the command name should be IDENTIFY_SYSTEM instead of IDENTIFY_SERVER

Fixed.

> 29. @@ -170,6 +187,7 @@ WalReceiverMain(void)
> + * timeline. In case we've already reached the end of the old timeline,
> + * the server will finish the streaming immediately, and we will
> + * disconnect. If recovery_target_timeline is 'latest', the startup
> + * process will then pg_xlog and find the new history file, bump recovery
>
>
> a. I think after reaching the end of the old timeline, rather than disconnect, it will start from the new timeline,
> which will be updated by the startup process.
> b. The above line seems to be incorrect, "will then (scan/search) pg_xlog"

Fixed the comment.

> 30. +static void
> +WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
>
>
> /*
> + * nudge startup process to notice that we've stopped streaming and are
> + * now waiting for instructions.
> + */
> + WakeupRecovery();
> for (;;)
> {
>
>
> In this for loop don't we need to check interrupts or postmaster alive or recovery in progress
> so that if any other process continues, it should not wait indefinitely.

Added a ProcessWalRcvInterrupts() check. There is a PostmasterIsAlive()
check there already.

> 31.+static void
> +WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
>
>
> /*
> + * nudge startup process to notice that we've stopped streaming and are
> + * now waiting for instructions.
> + */
> + WakeupRecovery();
> for (;;)
> {
>
>
> + SpinLockAcquire(&walrcv->mutex);
> + if (walrcv->walRcvState == WALRCV_STARTING)
> {
>
>
> I think it can reach WALRCV_STOPPING state also after WALRCV_WAITING from shutdown,
> so we should check for that state as well.

Added that, although we should receive the SIGTERM when the startup
process wants the walreceiver to die.

> 32.@@ -170,6 +187,7 @@ WalReceiverMain(void)
> {
> ..
> ..
>
>
> + elog(LOG, "walreceiver ended streaming and awaits new instructions");
> + WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
>
>
> a. After getting new startpoint and tli, it will go again for WalRcvHandShake(startpointTLI);
> so here chances are there, it will again fetch the history files from server which we have
> already fetched.

WalRcvHandShake() only fetches history files that don't exist in pg_xlog
already.
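A sketch of that "fetch only what's missing" rule, assuming the usual history file naming (the timeline ID as eight hex digits followed by `.history`) and that timeline 1 never has a history file. Instead of sending a TIMELINE_HISTORY command, the hypothetical `list_missing_history_files()` only reports which timelines would need to be fetched:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

typedef uint32_t TimeLineID;

/*
 * Collect into 'missing' (up to 'max' entries) the timelines in
 * [first, last] whose history file is absent from 'xlogdir'.  Only those
 * would need to be fetched from the server.  Returns the count.
 */
static size_t
list_missing_history_files(const char *xlogdir, TimeLineID first, TimeLineID last,
                           TimeLineID *missing, size_t max)
{
    size_t nmissing = 0;

    for (TimeLineID tli = first; tli <= last && nmissing < max; tli++)
    {
        char path[1024];

        if (tli == 1)
            continue;   /* timeline 1 never has a history file */

        /* history files are named by the timeline ID in 8 hex digits */
        snprintf(path, sizeof(path), "%s/%08X.history", xlogdir, (unsigned int) tli);
        if (access(path, F_OK) != 0)
            missing[nmissing++] = tli;
    }
    return nmissing;
}
```

If the directory already holds `00000002.history` and the range is timelines 1 to 4, only 3 and 4 are reported, so repeating the handshake after a timeline switch requests no file that is already present.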

> b. Also Identify_system command will run again and get the information such as system identifier,
> which is completely redundant at this point.
> It fetches tli from primary also which I think can't be changed from what earlier it has fetched.

The server's tli might've changed in cascading replication, where the
server is also a standby running with recovery_target_timeline='latest'.
It's fairly unlikely if we just ran the Identify_system command, but I'm
inclined to keep that. One extra roundtrip isn't that bad, and I think
it'd complicate the logic to try to avoid that.

> 33. .@@ -170,6 +187,7 @@ WalReceiverMain(void)
> + for (;;)
> + {
> + if (len > 0)
> + XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
> + else if (len == 0)
> + break;
> + else if (len < 0)
> + {
> + ereport(LOG,
> + (errmsg("replication terminated by primary server"),
> + errdetail("End of WAL reached on timeline %u", startpointTLI)));
> + endofwal = true;
> + break;
> + }
> + len = walrcv_receive(0, &buf);
> + }
> +
> + /* Let the master know that we received some data. */
> + XLogWalRcvSendReply();
> +
> + /*
> + * If we've written some records, flush them to disk and let the
> + * startup process and primary server know about them.
> + */
> + XLogWalRcvFlush(false);
>
>
> a. In the above code in for loop, when it breaks due to len < 0, there is no need to send reply to master.

Well, I think it's prudent to send one more reply at the end of streaming.

> b. also when it breaks due to len < 0, there can be 2 reasons, one is end of copy mode or primary server has
> disconnected. I think in second case handling should be same as what without this feature.
> Not sure if it's eventually turning out to be same.

No, libpqrcv_receive() will throw an error if the connection is broken for
some reason. It only returns -1 at end-of-copy.
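The contract described here can be summarized with a sketch that walks successive return values of the receive call: a positive length is a message to process, 0 means nothing more is available right now, and -1 means the server ended the COPY stream. A broken connection never appears as -1 because it is raised as an error instead. The `lens` array stands in for the real receive function and must end with 0 or -1; `DrainResult` and `drain()` are hypothetical names:

```c
#include <assert.h>
#include <stddef.h>

typedef enum
{
    DRAIN_NO_MORE_DATA,   /* receive returned 0: nothing more for now */
    DRAIN_END_OF_COPY     /* receive returned -1: server ended streaming */
} DrainResult;

/*
 * Walk successive return values of a receive call.  'lens' simulates the
 * calls and must end with 0 or -1.  Errors such as a lost connection are
 * assumed to be raised elsewhere and never appear here.  *nmsgs is
 * incremented once per message processed.
 */
static DrainResult
drain(const int *lens, size_t *nmsgs)
{
    for (size_t i = 0;; i++)
    {
        if (lens[i] > 0)
            (*nmsgs)++;                 /* a message of lens[i] bytes to process */
        else if (lens[i] == 0)
            return DRAIN_NO_MORE_DATA;  /* go back to waiting on the socket */
        else
            return DRAIN_END_OF_COPY;   /* end of WAL on this timeline */
    }
}
```

In the loop quoted in point 33, both exits are followed by XLogWalRcvSendReply() and XLogWalRcvFlush(), so one final reply is sent even at the end of streaming.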

> 34.
> +bool
> +WalRcvStreaming(void)
> +{
>
>
> In this function, right now if state is WALRCV_WAITING, then it will return false.
> I think waiting is better than starting for the matter of checking if walreceiver is in progress.
> or is state WALRCV_WAITING anytime expected when this function is called, if not then we log the
> error for invalid state.

It's normal to call WalRcvStreaming() when it's in waiting mode.
WalRcvStreaming() is called from WaitForWALToBecomeAvailable, always,
regardless of whether streaming replication is even enabled.
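A simplified sketch of the distinction between the two predicates, using the state names mentioned in points 26, 31 and 34 plus a `WALRCV_STOPPED` state for "not running" (an assumption here). Waiting counts as running but not as streaming, so a false answer from the streaming check while in `WALRCV_WAITING` is expected rather than an invalid state. The lowercase function names are hypothetical:

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified walreceiver states. */
typedef enum
{
    WALRCV_STOPPED,     /* not running */
    WALRCV_STARTING,    /* requested to start, not yet streaming */
    WALRCV_STREAMING,   /* connected and streaming WAL */
    WALRCV_WAITING,     /* finished a timeline, awaiting a new start point */
    WALRCV_STOPPING     /* asked to shut down */
} WalRcvState;

/* Is the walreceiver process alive in any form?  Waiting counts as running. */
static bool
walrcv_running(WalRcvState s)
{
    return s != WALRCV_STOPPED;
}

/* Is it streaming, or about to?  Waiting is a normal "no". */
static bool
walrcv_streaming(WalRcvState s)
{
    return s == WALRCV_STARTING || s == WALRCV_STREAMING;
}
```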

Thanks again for the detailed review!

- Heikki

Attachment: streaming-tli-switch-3.patch.gz (application/x-gzip, 29.6 KB)
