Re: Horizontal scalability/sharding

From: Thomas Munro <thomas(dot)munro(at)enterprisedb(dot)com>
To: Josh Berkus <josh(at)agliodbs(dot)com>
Cc: Robert Haas <robertmhaas(at)gmail(dot)com>, Petr Jelinek <petr(at)2ndquadrant(dot)com>, Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, PostgreSQL-development <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Horizontal scalability/sharding
Date: 2015-09-08 06:25:00
Message-ID: CAEepm=0HZzL9Qhg+k-Tk60F-HWtHG6Fhx-3b6gcpZsQAsFjpVQ@mail.gmail.com
Lists: pgsql-hackers

On Thu, Sep 3, 2015 at 7:03 AM, Josh Berkus <josh(at)agliodbs(dot)com> wrote:

> On 09/02/2015 11:41 AM, Robert Haas wrote:
> > 4. Therefore, I think that we should instead use logical replication,
> > which might be either synchronous or asynchronous. When you modify
> > one copy of the data, that change will then be replicated to all other
> > nodes. If you are OK with eventual consistency, this replication can
> > be asynchronous, and nodes that are off-line will catch up when they
> > are on-line. If you are not OK with that, then you must replicate
> > synchronously to every node before transaction commit; or at least you
> > must replicate synchronously to every node that is currently on-line.
> > This presents some challenges: logical decoding currently can't
> > replicate transactions that are still in process - replication starts
> > when the transaction commits. Also, we don't have any way for
> > synchronous replication to wait for multiple nodes.
>
> Well, there is a WIP patch for that, which IMHO would be much improved
> by having a concrete use-case like this one. What nobody is working on
> -- and we've vetoed in the past -- is a way of automatically failing and
> removing from replication any node which repeatedly fails to sync, which
> would be a requirement for this model.
>
> You'd also need a way to let the connection nodes know when a replica
> has fallen behind so that they can be taken out of
> load-balancing/sharding for read queries. For the synchronous model,
> that would be "fallen behind at all"; for asynchronous it would be
> "fallen more than ### behind".
>

I have been thinking about that problem in the context of
synchronous_commit = apply, and while trying to review the multiple
synchronous standbys patch. How are you supposed to actually make use of
remote-apply semantics without a way to find a replica that is consistent?
And what does consistent mean? I'm going to say a replica is consistent if
it sees at least the effects of every preceding COMMIT that returned
successfully. That's trivial in a no-timeout, single-standby topology (if
COMMIT returned, the sole sync replica has applied your transaction and
replied), but beyond that it obviously requires some more infrastructure
and concepts. Here is my suggestion:

Imagine if we could configure setups like this:

1. I have 4 servers called london1, london2, paris1, paris2 (see nearby
multiple sync server thread/patch).
2. I have synchronous_commit = apply (see nearby thread/patch)
3. Currently london1 is master, with the other 3 in the synchronous
replication set, and I want a minimum replication set of size 2 so I can
lose 1 of those and continue
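
To pin down the parameters I'll refer to below, here is that setup written
out as a little sketch (Python purely for convenience; ClusterConfig and
its field names are invented for illustration and correspond to nothing
that exists today or in the WIP patches):

# Purely illustrative: pin down the parameters from points 1-3 so that the
# later sketches have something concrete to refer to.
from dataclasses import dataclass

@dataclass
class ClusterConfig:
    master: str
    standbys: list          # candidate members of the sync rep set (point 1)
    min_sync_replicas: int  # point 3: never ack COMMIT with fewer than this
    commit_level: str       # point 2: wait for apply, not just WAL flush

config = ClusterConfig(
    master="london1",
    standbys=["london2", "paris1", "paris2"],
    min_sync_replicas=2,
    commit_level="apply",
)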

So far so good. Now for the double-vapourware part:

4. The replicas know whether they are currently part of the master's
synchronous replication set or not because it tells them
5. While waiting for replicas, the master waits only up to X milliseconds;
if no reply is received from a given server within that time, it drops that
server from the sync rep set, like a RAID controller dropping an
unresponsive element from a RAID array, but it still returns control to the
user as long as 2 nodes (our configured minimum) did reply
6. If the sync rep set has shrunk to our minimum size of 2 because of nodes
being dropped, then no further node can be dropped, so commit hangs (work
is blocked until enough servers reconnect and catch up again)
7. If a replica sees that it hasn't received WAL records or pings from the
master with timestamps within the last Y milliseconds, or receives a
message explicitly telling it that it has been dropped from the sync rep
set, it will start rejecting my queries on the basis that it is potentially
out of date
8. If the master sees suitable apply reply messages stream in from a node
that was dropped but has now caught up (possibly having reconnected), it
will explicitly tell it that it's back in the sync rep set and start
waiting for it again
9. X is sufficiently larger than Y so that, combined with the 'you've been
dropped from/rejoined the sync rep set' messages and well-synchronised
system clocks, it should not be possible for a replica to expose snapshots
that don't include all effects of transactions whose COMMIT command
returned on the master. (Without the concept of a dynamic replication set
that replicas are dropped from and rejoin, the master can't drop a node and
carry on the way a RAID controller would, unless it's happy to wait for any
old N nodes to reply. Waiting for any N nodes to reply may be OK for
log-flush-only sync rep, but it doesn't give you visibility guarantees with
apply-level sync rep: the nodes themselves don't know whether they are up
to date with the master, because they don't know whether they were among
the N the master waited for on some transaction they haven't even heard
about yet, so they may show users old data. Also, if the master doesn't
tell the replicas whether it considers them in or out of the replication
set, they don't know exactly when the master decides to consider them in
again after they rejoin.)
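
To make 5, 6 and 8 a little more concrete, here is a rough sketch of the
master-side waiting logic, continuing the Python sketch above. Everything
in it is invented for illustration (X_MS, collect_replies(),
notify_dropped() and so on stand in for machinery that would really live in
syncrep.c and the walsenders); it is meant to show the shape of the
algorithm, not to be an implementation:

# Rough sketch of the master-side logic in points 5, 6 and 8, reusing the
# config object above.  X_MS, collect_replies(), notify_dropped() and
# notify_rejoined() are invented stand-ins.
import time

X_MS = 500                       # point 5: how long a commit waits for replies

sync_set = set(config.standbys)  # point 4: the master decides membership

def wait_for_apply(commit_lsn, collect_replies):
    """Block until every member of the current sync rep set has reported
    applying commit_lsn.  Members silent for longer than X_MS are dropped
    (and told so), but never so many that the set falls below the minimum;
    if that would happen, the commit just keeps waiting (point 6)."""
    deadline = time.monotonic() + X_MS / 1000.0
    replied = set()
    while True:
        # collect_replies() stands in for reading apply feedback from the
        # walsenders: which standbys have applied commit_lsn so far?
        replied |= collect_replies(commit_lsn)
        if sync_set <= replied:
            return               # everyone still in the set has applied
        if time.monotonic() > deadline:
            laggards = sync_set - replied
            # Point 6: only drop laggards if the survivors still meet the
            # configured minimum.
            if len(sync_set - laggards) >= config.min_sync_replicas:
                for node in laggards:
                    sync_set.discard(node)  # point 5: drop it, RAID-style
                    notify_dropped(node)    # points 4 and 7: tell it explicitly
            # Otherwise the commit hangs here until dropped or disconnected
            # nodes catch up and rejoin (point 8).
        time.sleep(0.01)

def on_apply_feedback(node, applied_lsn, master_lsn):
    # Point 8: a dropped node that has caught back up is put back into the
    # sync rep set, told so, and waited for from then on.
    if node not in sync_set and applied_lsn >= master_lsn:
        sync_set.add(node)
        notify_rejoined(node)

def notify_dropped(node):
    print(f"{node}: you are out of the sync rep set")

def notify_rejoined(node):
    print(f"{node}: you are back in the sync rep set")

The property this is trying to show: a COMMIT never returns while some node
still considers itself a current member of the sync rep set but hasn't
applied the transaction; every such node is either waited for or explicitly
dropped first.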

Now I can connect to any server and state my requirement to see all
transactions committed on the master (meaning: the COMMIT command returned
success to the client), and either get an error telling me that the server
can't guarantee that at the moment (because it has been told it's not in
the sync rep set, or hasn't heard from the master recently enough), or
happily proceed to query the database. I can send my writes to the master
node, do all my reads on any node I like, and be sure they include whatever
I just committed (or someone else just committed and told me about). I can
also use FDW to query the replicas from the master and know that they can
see everything already committed (but of course not uncommitted changes;
I'm guessing you get that on the GTM-based systems).
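
Sketched the same way, the replica-side check behind that error might look
something like this (again, Y_MS, in_sync_set, last_master_timestamp and
PossiblyStaleError are invented stand-ins; the state would presumably be
maintained by the startup/apply machinery from the drop/rejoin messages and
from WAL record / ping timestamps):

# Replica-side sketch of point 7: before answering a query for a client
# that asked for "sees everything committed on the master" semantics,
# check that, as far as we know, we are still in the sync rep set and have
# heard from the master recently.
import time

Y_MS = 200

in_sync_set = True                        # flipped by drop/rejoin messages
last_master_timestamp = time.monotonic()  # bumped on every WAL record or ping

class PossiblyStaleError(Exception):
    """Refuse to answer rather than possibly hide a committed transaction."""

def check_read_consistency():
    if not in_sync_set:
        raise PossiblyStaleError("dropped from the sync rep set")
    if (time.monotonic() - last_master_timestamp) * 1000 > Y_MS:
        raise PossiblyStaleError("nothing heard from the master within Y ms")
    # Otherwise, because X is sufficiently larger than Y (point 9), every
    # COMMIT that has returned on the master should already be applied
    # here, so it is safe to run the read-only query.

A client that gets the error can simply retry on another replica, or fall
back to the master.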

The main problem I can see so far with this scheme is that you can see
things on the replicas *before* the COMMIT returns. Is that a problem? I
suspect all solutions to that problem involve centralised snapshot control
(through a GTM or through the master).

The timeout- and ping-based drop/join idea is inspired by a non-Postgres
system presented at CHAR(14) last year, which is due to be released as open
source one day (and whose author egged me on to try implementing that
synchronous_commit = apply patch), and, I guess, loosely by RAID. Maybe
there is a better or established name for this approach in the database
literature, I'm not sure, and maybe it has unacceptable holes. It's a lot
less radical than the GTM/MPP systems, since it just adds a few bells and
whistles to the existing single-master replication model, and obviously
there are plenty more problems to solve to make really useful clustering
technology: master re-election, query routing/node location, load balancing
and so forth. But the master and the replicas have the information they
need to do that.

--
Thomas Munro
http://www.enterprisedb.com
