Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

From: Andres Freund <andres(at)anarazel(dot)de>
To: Dave Cramer <davecramer(at)gmail(dot)com>
Cc: 9erthalion6(at)gmail(dot)com, Craig Ringer <craig(at)2ndquadrant(dot)com>, Vladimir Gordiychuk <folyga(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk
Date: 2019-02-16 03:01:50
Message-ID: 20190216030150.yn6bqmcfqc2wenbz@alap3.anarazel.de
Lists: pgsql-hackers

Hi,

On 2018-12-03 06:38:43 -0500, Dave Cramer wrote:
> From 4d023cfc1fed0b5852b4da1aad6a32549b03ce26 Mon Sep 17 00:00:00 2001
> From: Dave Cramer <davecramer(at)gmail(dot)com>
> Date: Fri, 30 Nov 2018 18:23:49 -0500
> Subject: [PATCH 1/5] Respect client initiated CopyDone in walsender
>
> ---
> src/backend/replication/walsender.c | 36 ++++++++++++++++++++++++++++++------
> 1 file changed, 30 insertions(+), 6 deletions(-)
>
> diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
> index 46edb52..93f2648 100644
> --- a/src/backend/replication/walsender.c
> +++ b/src/backend/replication/walsender.c
> @@ -770,6 +770,14 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
> sendTimeLineValidUpto = state->currTLIValidUntil;
> sendTimeLineNextTLI = state->nextTLI;
>
> + /*
> + * If the client sent CopyDone while we were waiting,
> + * bail out so we can wind up the decoding session.
> + */
> + if (streamingDoneSending)
> + return -1;
> +
> + /* more than one block available */
> /* make sure we have enough WAL available */
> flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
>
> @@ -1341,8 +1349,12 @@ WalSndWaitForWal(XLogRecPtr loc)
> * It's important to do this check after the recomputation of
> * RecentFlushPtr, so we can send all remaining data before shutting
> * down.
> - */
> - if (got_STOPPING)
> + *
> + * We'll also exit here if the client sent CopyDone because it wants
> + * to return to command mode.
> + */
> +
> + if (got_STOPPING || streamingDoneReceiving)
> break;
>
> /*
> @@ -2095,7 +2107,14 @@ WalSndCheckTimeOut(void)
> }
> }
>
> -/* Main loop of walsender process that streams the WAL over Copy messages. */
> +/*
> + * Main loop of walsender process that streams the WAL over Copy messages.
> + *
> + * The send_data callback must enqueue complete CopyData messages to libpq
> + * using pq_putmessage_noblock or similar, since the walsender loop may send
> + * CopyDone then exit and return to command mode in response to a client
> + * CopyDone between calls to send_data.
> + */

Wait, how is it OK to send CopyDone before all the pending data has been
sent out?

> diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
> index 23466ba..66b6e90 100644
> --- a/src/backend/replication/logical/reorderbuffer.c
> +++ b/src/backend/replication/logical/reorderbuffer.c
> @@ -1497,7 +1497,9 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
> rb->begin(rb, txn);
>
> iterstate = ReorderBufferIterTXNInit(rb, txn);
> - while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
> + while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL &&
> + (rb->continue_decoding_cb == NULL ||
> + rb->continue_decoding_cb()))
> {
> Relation relation = NULL;
> Oid reloid;

> @@ -1774,8 +1776,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
> ReorderBufferIterTXNFinish(rb, iterstate);
> iterstate = NULL;
>
> - /* call commit callback */
> - rb->commit(rb, txn, commit_lsn);
> + if (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb())
> + {
> + /* call commit callback */
> + rb->commit(rb, txn, commit_lsn);
> + }

I'm doubtful it's ok to simply stop in the middle of a transaction.

> @@ -1194,17 +1212,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
>
> CHECK_FOR_INTERRUPTS();
>
> - /* Try to flush pending output to the client */
> - if (pq_flush_if_writable() != 0)
> - WalSndShutdown();
> -
> - /* Try taking fast path unless we get too close to walsender timeout. */
> - if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
> - wal_sender_timeout / 2) &&
> - !pq_is_send_pending())
> - {
> - return;
> - }

As somebody else commented on the thread, I'm also doubtful that removing
this flush and fast path is OK. Unless I'm missing something, this'll
introduce significant additional blocking.

> /* If we have pending write here, go to slow path */
> for (;;)
> @@ -1358,7 +1365,14 @@ WalSndWaitForWal(XLogRecPtr loc)
> break;
>
> /*
> - * We only send regular messages to the client for full decoded
> + * If we have received CopyDone from the client, sent CopyDone
> + * ourselves, it's time to exit streaming.
> + */
> + if (!IsStreamingActive()) {
> + break;
> + }

Wrong formatting: per the project's brace style, the opening brace goes on
its own line.

I wonder if the saner approach here isn't to support query cancellations
or something in that vein, and then handle the error.

Greetings,

Andres Freund
