Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

From: Dave Cramer <davecramer(at)gmail(dot)com>
To: Andres Freund <andres(at)anarazel(dot)de>
Cc: Dmitry Dolgov <9erthalion6(at)gmail(dot)com>, Craig Ringer <craig(at)2ndquadrant(dot)com>, Vladimir Gordiychuk <folyga(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk
Date: 2019-02-16 20:38:03
Message-ID: CADK3HH+WcV+YbpQf7ZAC8BsySw90vwS=8cGx-GdBF_=GOeOFcg@mail.gmail.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

Andres,

Thanks for looking at this. FYI, I did not originally write this, rather
the original author has not replied to requests.
JDBC could use this, I assume others could as well.

That said I'm certainly open to suggestions on how to do this.

Craig, do you have any other ideas?

Dave Cramer

On Fri, 15 Feb 2019 at 22:01, Andres Freund <andres(at)anarazel(dot)de> wrote:

> Hi,
>
> On 2018-12-03 06:38:43 -0500, Dave Cramer wrote:
> > From 4d023cfc1fed0b5852b4da1aad6a32549b03ce26 Mon Sep 17 00:00:00 2001
> > From: Dave Cramer <davecramer(at)gmail(dot)com>
> > Date: Fri, 30 Nov 2018 18:23:49 -0500
> > Subject: [PATCH 1/5] Respect client initiated CopyDone in walsender
> >
> > ---
> > src/backend/replication/walsender.c | 36
> ++++++++++++++++++++++++++++++------
> > 1 file changed, 30 insertions(+), 6 deletions(-)
> >
> > diff --git a/src/backend/replication/walsender.c
> b/src/backend/replication/walsender.c
> > index 46edb52..93f2648 100644
> > --- a/src/backend/replication/walsender.c
> > +++ b/src/backend/replication/walsender.c
> > @@ -770,6 +770,14 @@ logical_read_xlog_page(XLogReaderState *state,
> XLogRecPtr targetPagePtr, int req
> > sendTimeLineValidUpto = state->currTLIValidUntil;
> > sendTimeLineNextTLI = state->nextTLI;
> >
> > + /*
> > + * If the client sent CopyDone while we were waiting,
> > + * bail out so we can wind up the decoding session.
> > + */
> > + if (streamingDoneSending)
> > + return -1;
> > +
> > + /* more than one block available */
> > /* make sure we have enough WAL available */
> > flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
> >
> > @@ -1341,8 +1349,12 @@ WalSndWaitForWal(XLogRecPtr loc)
> > * It's important to do this check after the recomputation
> of
> > * RecentFlushPtr, so we can send all remaining data
> before shutting
> > * down.
> > - */
> > - if (got_STOPPING)
> > + *
> > + * We'll also exit here if the client sent CopyDone
> because it wants
> > + * to return to command mode.
> > + */
> > +
> > + if (got_STOPPING || streamingDoneReceiving)
> > break;
> >
> > /*
> > @@ -2095,7 +2107,14 @@ WalSndCheckTimeOut(void)
> > }
> > }
> >
> > -/* Main loop of walsender process that streams the WAL over Copy
> messages. */
> > +/*
> > + * Main loop of walsender process that streams the WAL over Copy
> messages.
> > + *
> > + * The send_data callback must enqueue complete CopyData messages to
> libpq
> > + * using pq_putmessage_noblock or similar, since the walsender loop may
> send
> > + * CopyDone then exit and return to command mode in response to a client
> > + * CopyDone between calls to send_data.
> > + */
>
> Wait, how is it ok to end CopyDone before all the pending data has been
> sent out?
>
>
>
> > diff --git a/src/backend/replication/logical/reorderbuffer.c
> b/src/backend/replication/logical/reorderbuffer.c
> > index 23466ba..66b6e90 100644
> > --- a/src/backend/replication/logical/reorderbuffer.c
> > +++ b/src/backend/replication/logical/reorderbuffer.c
> > @@ -1497,7 +1497,9 @@ ReorderBufferCommit(ReorderBuffer *rb,
> TransactionId xid,
> > rb->begin(rb, txn);
> >
> > iterstate = ReorderBufferIterTXNInit(rb, txn);
> > - while ((change = ReorderBufferIterTXNNext(rb, iterstate))
> != NULL)
> > + while ((change = ReorderBufferIterTXNNext(rb, iterstate))
> != NULL &&
> > + (rb->continue_decoding_cb == NULL ||
> > + rb->continue_decoding_cb()))
> > {
> > Relation relation = NULL;
> > Oid reloid;
>
> > @@ -1774,8 +1776,11 @@ ReorderBufferCommit(ReorderBuffer *rb,
> TransactionId xid,
> > ReorderBufferIterTXNFinish(rb, iterstate);
> > iterstate = NULL;
> >
> > - /* call commit callback */
> > - rb->commit(rb, txn, commit_lsn);
> > + if (rb->continue_decoding_cb == NULL ||
> rb->continue_decoding_cb())
> > + {
> > + /* call commit callback */
> > + rb->commit(rb, txn, commit_lsn);
> > + }
>
>
> I'm doubtful it's ok to simply stop in the middle of a transaction.
>
>
>
> > @@ -1194,17 +1212,6 @@ WalSndWriteData(LogicalDecodingContext *ctx,
> XLogRecPtr lsn, TransactionId xid,
> >
> > CHECK_FOR_INTERRUPTS();
> >
> > - /* Try to flush pending output to the client */
> > - if (pq_flush_if_writable() != 0)
> > - WalSndShutdown();
> > -
> > - /* Try taking fast path unless we get too close to walsender
> timeout. */
> > - if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
> > -
> wal_sender_timeout / 2) &&
> > - !pq_is_send_pending())
> > - {
> > - return;
> > - }
>
> As somebody else commented on the thread, I'm also doubtful this is
> ok. This'll introduce significant additional blocking unless I'm missing
> something?
>
>
>
> > /* If we have pending write here, go to slow path */
> > for (;;)
> > @@ -1358,7 +1365,14 @@ WalSndWaitForWal(XLogRecPtr loc)
> > break;
> >
> > /*
> > - * We only send regular messages to the client for full
> decoded
> > + * If we have received CopyDone from the client, sent
> CopyDone
> > + * ourselves, it's time to exit streaming.
> > + */
> > + if (!IsStreamingActive()) {
> > + break;
> > + }
>
> Wrong formatting.
>
>
> I wonder if the saner approach here isn't to support query cancellations
> or something of that vein, and then handle the error.
>
> Greetings,
>
> Andres Freund
>

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Tom Lane 2019-02-16 21:14:49 Re: Early WIP/PoC for inlining CTEs
Previous Message Andres Freund 2019-02-16 20:31:05 Re: 2019-03 CF Summary / Review - Tranche #2