Stopping logical replication protocol

From: Vladimir Gordiychuk <folyga(at)gmail(dot)com>
To: pgsql-hackers(at)postgresql(dot)org, alvherre(at)2ndquadrant(dot)com
Subject: Stopping logical replication protocol
Date: 2016-05-06 15:23:44
Message-ID: CAFgjRd3hdYOa33m69TbeOfNNer2BZbwa8FFjt2V5VFzTBvUU3w@mail.gmail.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

Hi all,

During implementing logical replication protocol for pgjdbc
https://github.com/pgjdbc/pgjdbc/pull/550 I faced with strange behavior of
the *walcender.c*:

1. When WAL consumer catchup master and change his state to streaming,
not available normally complete replication by send CopyDone message until
will not generate/create new WAL record. It occurs because logical decoding
located in *WalSndWaitForWal* until will not available next WAL record,
and it method receive message from consumer even reply on CopyDone with
CopyDone but ignore exit from loop and we can wait many times waiting
CommandStatus & ReadyForQuery packages on consumer.
2. Logical decoding ignore message from consumer during decoding and
writing transaction in socket(*WalSndWriteData*). It affect long
transactions with many changes. Because for example if we start decoding
transaction that insert 1 million records and after consume 1% of it date
we
decide stop replication, it will be not available until whole million
record will not send to consumer. And I see the following problems because
of this:

- It will generate many not necessary network traffic.
- Not available normally stop logical replication, because consumer will
fail with timeout and it broke scenario when consumer put data to external
system asynchronously and by success callback send feedback to master
about flushed LSN, but by fail callback stop replication, and restart it
from the last successfully sent LSN to external system, because slot
will be busy and it will increase latency for streaming system.
- Consumer can send keepalive message with required reply flag to master
that right now decoding and sending huge transaction, and after some time
disconnect master because because reply on keepalive will not get. I
faced with it problem during research bottledwater-pg
<https://github.com/confluentinc/bottledwater-pg> extension that fail
each time during receive transaction in that was modify 1 million
record(restart_lsn was before huge transaction so extension fail again and
again) disconnect master because not keep keepalive package too long.

I prepare small patch that fix problems describe below. Now *WalSndWriteData
*first check message from consumer and during decode transaction check that
replication still active. KeppAlive message now not send if was get
CopyDone package(keep alive now not necessary we preparing to
complete). *WalSndWaitForWal
*after get CopyDone exit from loop. With apply this patch I get next
measurements
Before
-----
logical start and stopping: *15446ms*
logical stopping: *13820ms*

physical start and stopping: 462ms
physical stopping: 348

After
-----
logical start and stopping: 2424ms
logical stopping: *26ms*

physical start and stopping: 458ms
physical stopping: 329ms

Where for measurements was use code from pgjdbc

For physical replicaion:

LogSequenceNumber startLSN = getCurrentLSN();
>
> Statement st = sqlConnection.createStatement();
> st.execute("insert into test_logic_table\n"
> + " select id, md5(random()::text) as name from
> generate_series(1, 1000000) as id;");
> st.close();
>
> long start = System.nanoTime();
>
> PGReplicationStream stream =
> pgConnection
> .replicationStream()
> .physical()
> .withStartPosition(startLSN)
> .start();
>
> //read single message
> stream.read();
> long startStopping = System.nanoTime();
>
> stream.close();
>
> long now = System.nanoTime();
>
> long startAndStopTime = now - start;
> long stopTime = now - startStopping;
>
> System.out.println(TimeUnit.NANOSECONDS.toMillis(startAndStopTime));
> System.out.println(TimeUnit.NANOSECONDS.toMillis(stopTime));
>

For logical replication:

LogSequenceNumber startLSN = getCurrentLSN();
>
> Statement st = sqlConnection.createStatement();
> st.execute("insert into test_logic_table\n"
> + " select id, md5(random()::text) as name from
> generate_series(1, 1000000) as id;");
> st.close();
>
> long start = System.nanoTime();
>
> PGReplicationStream stream =
> pgConnection
> .replicationStream()
> .logical()
> .withSlotName(SLOT_NAME)
> .withStartPosition(startLSN)
> .withSlotOption("include-xids", false)
> .withSlotOption("skip-empty-xacts", true)
> .start();
>
> //read single message
> stream.read();
> long startStopping = System.nanoTime();
>
> stream.close();
>
> long now = System.nanoTime();
>
> long startAndStopTime = now - start;
> long stopTime = now - startStopping;
>
> System.out.println(TimeUnit.NANOSECONDS.toMillis(startAndStopTime));
> System.out.println(TimeUnit.NANOSECONDS.toMillis(stopTime));
>

https://github.com/Gordiychuk/postgres/tree/stopping_logical_replication

Attachment Content-Type Size
0001-Stop-logical-decoding-by-get-CopyDone.patch text/x-patch 14.8 KB

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Mithun Cy 2016-05-06 15:51:11 Re: Perf Benchmarking and regression.
Previous Message Tom Lane 2016-05-06 15:13:40 Re: Poorly-thought-out handling of double variables in pgbench