Re: [PATCH] Logical decoding timeline following take II

From: Craig Ringer <craig(at)2ndquadrant(dot)com>
To: Andres Freund <andres(at)anarazel(dot)de>
Cc: Petr Jelinek <petr(at)2ndquadrant(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: [PATCH] Logical decoding timeline following take II
Date: 2016-11-16 04:44:07
Message-ID: CAMsr+YGHLjU+Jd4WFvpBp5xejnfktoj5fBj=YzmFUmV7yLnm3A@mail.gmail.com
Lists: pgsql-hackers

On 12 November 2016 at 23:07, Andres Freund <andres(at)anarazel(dot)de> wrote:
> On 2016-10-24 17:49:13 +0200, Petr Jelinek wrote:
>> + * Determine which timeline to read an xlog page from and set the
>> + * XLogReaderState's state->currTLI to that timeline ID.
>
> "XLogReaderState's state->currTLI" - the state's a bit redundant.

Thanks for taking a look at the patch.

Agreed re above, fixed.

You know, having returned to this work after a long break doing other
things, it's clear that so much of the same work is being done in
XLogSendPhysical(...) and walsender.c's XLogRead(...) that maybe it is
worth facing the required refactoring so that we can use that in
logical decoding from both the walsender and the SQL interface.

The approach I originally took focused above all else on being
minimally intrusive, rather than clean and clear. Maybe it's better to
tidy this up instead.

Instead of coupling timeline following in logical decoding to the
XLogReader struct and effectively duplicating the physical
walsender's logic, move the walsender.c globals

static TimeLineID sendTimeLine = 0;
static TimeLineID sendTimeLineNextTLI = 0;
static bool sendTimeLineIsHistoric = false;
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;

into a new struct in timeline.h and move the logic into timeline.c. So
we stop relying on so many magic globals in the walsender. Don't keep
this state as a global, instead init it in StartReplication and pass
it as a param to WalSndLoop. For logical decoding from walsender,
store the walsender's copy in the XLogReaderState. For logical
decoding from SQL, set it up in pg_logical_slot_get_changes_guts(...)
and again store it in XLogReaderState.

In the process, finally unify the two XLogRead(...) functions in
xlogutils.c and walsender.c and merge walsender's
logical_read_xlog_page(...) with xlogutils'
read_local_xlog_page(...). Sure, we can't rely on a normal
backend's latch being set when WAL is written like we can for the
walsender, but do we have to duplicate everything else? Can always add
a miscadmin.h var like IsWALSender and test that to see if we can
expect to have our latch set when new WAL comes in and adjust our
latch wait timeout interval appropriately.
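
A minimal sketch of that last idea, under stated assumptions: the
function name wal_wait_timeout_ms, its parameters and the 100 ms poll
interval are all made up for illustration, and no such helper exists
in the tree.

```c
#include <assert.h>
#include <stdbool.h>

/* Assumed polling interval for backends whose latch is not set on new WAL. */
#define WAL_POLL_INTERVAL_MS 100L

/*
 * Hypothetical helper: pick how long to wait on the latch for more WAL.
 * A walsender is woken when WAL arrives, so it can sleep for the whole
 * interval; any other backend has to poll with a short timeout.
 */
static long
wal_wait_timeout_ms(bool is_walsender, long max_timeout_ms)
{
    assert(max_timeout_ms > 0);

    if (is_walsender)
        return max_timeout_ms;

    return (max_timeout_ms < WAL_POLL_INTERVAL_MS)
        ? max_timeout_ms
        : WAL_POLL_INTERVAL_MS;
}
```

The shared page-read callback would then differ between the two
callers only in the value it passes for is_walsender.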

The downside is of course that it touches physical replication, unlike
the current approach which avoids touching anything that could affect
physical replication at the cost of increasing the duplication between
physical and logical replication logic.

>> + * The caller must also make sure it doesn't read past the current redo pointer
>> + * so it doesn't fail to notice that the current timeline became historical.
>> + */
>
> Not sure what that means? The redo pointer usually means the "logical
> start" of the last checkpoint, but I don't see how that could be meant
> here?

What I was trying to say was the current replay position on a standby. Amended.

>> +static void
>> +XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
>> +{
>> + const XLogRecPtr lastReadPage = state->readSegNo * XLogSegSize + state->readOff;
>> +
>> + elog(DEBUG4, "Determining timeline for read at %X/%X+%X",
>> + (uint32)(wantPage>>32), (uint32)wantPage, wantLength);
>
> Doing this on every single read from a page seems quite verbose. It's
> also (like most or all the following debug elogs) violating the casing
> prescribed by the message style guidelines.

Agreed. It's unnecessary now. It's the sort of thing I'd want to keep
if Pg had fine-grained, module-level log control, but we don't.

>> + /*
>> + * If we're reading from the current timeline, it hasn't become historical
>> + * and the page we're reading is after the last page read, we can again
>> + * just carry on. (Seeking backwards requires a check to make sure the older
>> + * page isn't on a prior timeline).
>> + */
>
> How can ThisTimeLineID become historical?

It can't, right now. Though I admit I didn't realise that at the time.
Presently ThisTimeLineID is only set in the walsender by
GetStandbyFlushRecPtr as called by IdentifySystem, and in
StartReplication, but not afterwards for logical replication, so
logical rep won't notice timeline transitions when on a standby unless
a decoding session is restarted. We don't support decoding sessions in
recovery, so it can't happen yet.

It will, when we're on a cascading standby and the upstream is
promoted, once we support logical decoding on standby. As part of that
we'll need to maintain the timeline in the walsender in logical
decoding like we do in physical, and limit logical decoding to the
currently replayed position with something like walsender's
GetStandbyFlushRecPtr(). But usable from the SQL interface too, of
course.

I'm currently implementing logical decoding on standby on top of this
and the catalog_xmin feedback patch, and in the process will be adding
tests for logical decoding on a physical replica where its upstream is
promoted from cascading standby to master, so I'll nail down any
issues for sure in that process.

I think it makes sense to consider this patch as part of the same set
as catalog_xmin feedback and decoding on standby, really. Without
those it's hard to come up with good real world tests and it's not
very useful. As you've correctly noted, the bundled tests are
contrived in the extreme.

Despite that, I've attached a revised version of the current approach
incorporating fixes for the issues you mentioned. It's preceded by the
patch to add an --endpos option to pg_recvlogical so that we can
properly test the walsender interface too.

>> + if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage)
>> + {
>> + Assert(state->currTLIValidUntil == InvalidXLogRecPtr);
>> + elog(DEBUG4, "On current timeline");
>> + return;
>> + }
>
> Also, is it actually ok to rely on ThisTimeLineID here? That's IIRC not
> maintained outside of the startup process, until recovery ended (cf it
> being set in InitXLOGAccess() called via RecoveryInProgress()).

We don't yet allow decoding in recovery, so yes.

We can be reading xlog from a prior timeline to the server's current
timeline, since we're replaying xlog generated on the old master
before promotion. That's when this test fails and we carry on. Amended
for clarity.

Added this to func header comment, hopefully helps:

* It's necessary to care about timelines in xlogreader and logical decoding
* when we might be reading xlog generated prior to a promotion, either if
* we're currently a standby in recovery or if we're a promoted master reading
* xlogs generated by the old master before our promotion. Notably, logical
* decoding on a standby needs to be able to replay any remaining pending data
* from the old timeline when the standby or one of its upstreams is
* promoted.
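
To make the "which timeline is this xlog on" question concrete, here
is a simplified, self-contained sketch of a lookup over a timeline
history. It is not the patch's code: HistoryEntry and timeline_for_lsn
are hypothetical names, and the backend's own equivalent is the
tliOfPointInHistory() mentioned in the TODO quoted below.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins for PostgreSQL's typedefs, so the sketch compiles on its own. */
typedef uint32_t TimeLineID;
typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/*
 * Simplified, hypothetical history entry: timeline tli covers the LSN
 * range [begin, end). end == InvalidXLogRecPtr means the timeline is
 * still current and has no switch point yet.
 */
typedef struct HistoryEntry
{
    TimeLineID  tli;
    XLogRecPtr  begin;
    XLogRecPtr  end;
} HistoryEntry;

/* Return the timeline that contains ptr, or 0 if no entry covers it. */
static TimeLineID
timeline_for_lsn(const HistoryEntry *history, size_t n, XLogRecPtr ptr)
{
    assert(history != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
    {
        const HistoryEntry *e = &history[i];

        if (ptr >= e->begin &&
            (e->end == InvalidXLogRecPtr || ptr < e->end))
            return e->tli;
    }
    return 0;
}
```

With a history of timeline 1 up to some switch point and timeline 2
from there on, a read below the switch point resolves to timeline 1
even though the server itself is already on timeline 2. That is the
promoted-master case described in the comment.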

>> /*
>> - * TODO: we're going to have to do something more intelligent about
>> - * timelines on standbys. Use readTimeLineHistory() and
>> - * tliOfPointInHistory() to get the proper LSN? For now we'll catch
>> - * that case earlier, but the code and TODO is left in here for when
>> - * that changes.
>> + * Check which timeline to get the record from.
>> + *
>> + * We have to do it each time through the loop because if we're in
>> + * recovery as a cascading standby, the current timeline might've
>> + * become historical.
>> */
>
> I guess you mean cascading as "replicating logically from a physical
> standby"? I don't think it's good to use cascading here, because that's
> usually thought to mean doing physical SR from a standby...

I actually do mean cascading physical standby here.

If we're a physical standby and get promoted, IsInRecovery() becomes
false. Fine. But the timeline can also change while we remain in
recovery, if a configuration like

A => B => C -> L

exists, where => is a physical replication link and -> is a logical
decoding session.

If A dies and B is promoted, C's timeline will change.

Comment now explains this.

>> diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
>> index 318726e..4315fb3 100644
>> --- a/src/backend/replication/logical/logicalfuncs.c
>> +++ b/src/backend/replication/logical/logicalfuncs.c
>> @@ -234,12 +234,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
>> rsinfo->setResult = p->tupstore;
>> rsinfo->setDesc = p->tupdesc;
>>
>> - /* compute the current end-of-wal */
>> - if (!RecoveryInProgress())
>> - end_of_wal = GetFlushRecPtr();
>> - else
>> - end_of_wal = GetXLogReplayRecPtr(NULL);
>> -
>> ReplicationSlotAcquire(NameStr(*name));
>>
>> PG_TRY();
>> @@ -279,6 +273,12 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
>> /* invalidate non-timetravel entries */
>> InvalidateSystemCaches();
>>
>> + if (!RecoveryInProgress())
>> + end_of_wal = GetFlushRecPtr();
>> + else
>> + end_of_wal = GetXLogReplayRecPtr(NULL);
>> +
>> + /* Decode until we run out of records */
>> while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
>> (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
>> {
>
> That seems like a pretty random change?

You're right. It's a change from when the timeline determination logic
happened inside pg_logical_slot_get_changes_guts that I didn't realise
had become irrelevant now.

Reverted, and in the process:

- end_of_wal = GetXLogReplayRecPtr(NULL);
+ end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);

so we maintain the current timeline when called in recovery, where
it's not inherited from the startup process.

>> diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
>> index deaa7f5..caff9a6 100644
>> --- a/src/include/access/xlogreader.h
>> +++ b/src/include/access/xlogreader.h
>> @@ -27,6 +27,10 @@
>>
>> #include "access/xlogrecord.h"
>>
>> +#ifndef FRONTEND
>> +#include "nodes/pg_list.h"
>> +#endif
>
> Why is that needed?

It isn't anymore. Thanks for the catch. It was used when the timeline
history had to be looked up more frequently, so it was kept in the
xlogreader state. Since pg_xlogdump uses the xlogreader this meant it
had to be conditional on backend use. Now that the timeline history
only has to be looked up when first examining the segment in which the
timeline transition occurs there's no point caching it there anymore.

(I'd still like to teach pg_xlogdump to follow timelines but that's a "later").

>> diff --git a/src/test/modules/test_slot_timelines/README b/src/test/modules/test_slot_timelines/README

> I still don't want to have this code in postgres, it's just an example
> for doing something bad. Let's get proper feedback control in, and go
> from there.

100% agree, and I'm going to cut it from the patch since I'm
submitting updated feedback control now and have a PoC of decoding on
standby too.

--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

Attachment                                                        Content-Type   Size
0001-Add-an-optional-endpos-LSN-argument-to-pg_recvlogica.patch  text/x-patch   10.6 KB
0002-Add-a-pg_recvlogical-wrapper-to-PostgresNode.patch          text/x-patch   5.5 KB
0003-Follow-timeline-switches-in-logical-decoding.patch          text/x-patch   22.4 KB
