Replication Node Identifiers and crashsafe Apply Progress

From: Andres Freund <andres(at)2ndquadrant(dot)com>
To: pgsql-hackers(at)postgresql(dot)org
Cc: Steve Singer <steve(at)ssinger(dot)info>
Subject: Replication Node Identifiers and crashsafe Apply Progress
Date: 2013-11-14 17:26:32
Lists: pgsql-hackers


As you know, the reason we are working changeset extraction is that we
want to build logical unidirection and bidirectional replication
ontop. To use changeset extraction effectively, I think one set of
related features ontop is very useful:

When extracting changes using the changeset extraction patchset (latest
version at [1]) the START_LOGICAL_REPLICATION command is used to stream
changes from a source system. When started it will continue to send
changes as long as the connection is up or it is aborted. For obvious
performance reasons it will *not* wait for an ACK for each transaction
commit it streams out.
Instead it relies on the receiver, exactly as in physical replication,
sending feedback messages containing the LSN up to which data has safely
been received.
That means frequently something like:
walsender: => COMMIT 0/10000000
walsender: => COMMIT 0/10000200
walsender: => COMMIT 0/10000400
walsender: => COMMIT 0/10000600
receiver: <= ACKNOWLEDGE 0/10000270
walsender: => COMMIT 0/10000800
is possible and important for performance. I.e. the server has streamed
out more changes than it got confirmation for.

So, when the the replication connection goes down, e.g. because the
receiving side has crashed, we need to tell the server from where to
start. Every position between the last ACKed and the end of WAL is
The receiver then can ask the source to start replication from the last
replayed commit using START_LOGICAL_REPLICATION 'slot_name'
'0/10000600' which would then re-stream all the changes in the
transaction that committe at 0/10000600 and all that follow.

But for that the receiving side needs to know up to where changes have
been applied. One relatively easy solution for that is that the
receiving side does something like:
UPDATE replication_progress SET lsn = '0/10000600' WHERE source_id = ...;
before the end of every replayed transaction. But that obviously will
quickly cause bloat.

Our solution to that is that a replaying process can tell the backend
that it is currently doing so and setup three variables for every
1) an identifier for the the source database
2) the LSN at which the replayed transaction has committed remotely
3) the time at which the replayed transaction has committed remotely

When the transaction then commits the commit record will set the
XACT_CONTAINS_ORIGIN flag to ->xinfo and will add that data to the end
of the commit record. During crash recovery the startup process will
remember the newest LSN for each remote database in shared memory.

This way, after a crash, restart, disconnect the replay process can look
into shared memory and check how far it has already replayed and restart
seamlessly. With minimal effort.

We previously discussed the topic and some were very adverse to using
any sort of numeric node identifiers across systems and suggested that
those should only be used internally. So what the attached patch does is
to add a new shared system catalog called 'pg_replication_identifier'
(suggestions for a better name welcome) which translates a number of
identifying traits into a numeric identifier.
The set of identifiers currently are:
* the sysid of the remote system, combined with the remote TLI
* the oid of the local database
* the oid of the remote database
* an optional name
but that's just what we needed in our multimaster prototype, and not
what I necessarily think is correct.

The added API (which surely need some work, I am not particularly happy
with the naming of functions for one) basically consists of two parts:
1) functions to query/create replication identifiers:
* GetReplicationIdentifier(identifying traits) - search for a numeric replication identifier
* CreateReplicationIdentifier(identifying traits) - creates a numeric replication identifier
* GetReplicationInfoByIdentifier(numeric identifier) - returns identifying traits

2) functions to query/manipulate replication progress:
* AdvanceReplicationIdentifier(node, local_lsn, remote_lsn)
* XLogRecPtr RemoteCommitFromReplicationIdentifier(node)

Internally the code also maintains some on-disk data which is updated
during checkpoints to store the replication progress, otherwise it'd
vanish if we shutdown gracefully ;).

The attached code also integrates with the "commit timestamp" module
that Alvaro submitted ([2]). Everytime a remote transaction is committed
we store a) the remote commit's timestamp, b) the origin node id in it.
That allows to relatively easily build multimaster systems with conflict
resolution ontop, since whenever there's a conflict the originating
node, and originating commit timestamp for a row can be queried

Having information about the origin of a change/transaction allows to
implement complex replication topologies since the information is
available to changeset extration output plugins.
It allows to do write plugins that:
* decode all changes, independent from the system they were originally
executed on by the user
* decode changes generated locally, but none from remote systems
* pick and choose between those, say only decode those the receiving
system isn't replicating from itself

Questions are:
* Which identifying traits do we want to use to identify nodes?
* How do we want to manipulate replication identifiers? Currently they
can only be manipulated by using C functions, which is fine for some users,
but probably not for others?
* Do we want to allow setting (remote_lsn, remote_timestamp,
remote_node_id) via SQL? Currently the remote_node_id can be set as a
GUC, but the other's can't. They probably should be a function that
can be called instead of GUCs?
* Suggestions for better names!
* Would slony et al need something ontop to use this?

* cleanup/naming
* Set returning function to see the replication progress
* remove old checkpoint files

Note that this only applies a) ontop the changeset extraction code b)
the commit timestamp code. The 'replication-identifiers' git branch
([3]) contains all integrated together.

Comments welcome!


Andres Freund

Andres Freund
PostgreSQL Development, 24x7 Support, Training & Services

