Stream consistent snapshot via a logical decoding plugin as a series of INSERTs

From: "Shulgin, Oleksandr" <oleksandr(dot)shulgin(at)zalando(dot)de>
To: PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Stream consistent snapshot via a logical decoding plugin as a series of INSERTs
Date: 2016-01-15 08:30:43
Message-ID: CACACo5RNZ0OB8KwhuwF4F_xumfgbdXbw1sxdqxmyJ_gCobn=iA@mail.gmail.com
Lists: pgsql-hackers

Hello,

I'd like to propose generic functions (probably in an extension, or in core
if not possible otherwise) to facilitate streaming existing data from the
database *in the same format* that one would get if these were changes
decoded by a logical decoding plugin.

The idea is to use the snapshot exported by the CREATE_REPLICATION_SLOT
command of the replication protocol to get a consistent view of the
database, and then start listening to new changes on the slot.
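A minimal sketch of the client-side ordering, assuming the replication-protocol syntax of PostgreSQL 9.5: CREATE_REPLICATION_SLOT ... LOGICAL returns a row with slot_name, consistent_point, snapshot_name and output_plugin. The exported snapshot stays valid only until the next command is run on that replication connection, so it has to be imported on a regular connection before replication is started; changes committed after consistent_point are then delivered through the slot. The helper names below are illustrative, and they only format the commands; a real client would send them through libpq.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Step 1 (replication connection): create the slot. The result row carries
 * slot_name, consistent_point, snapshot_name and output_plugin. */
int format_create_slot(char *buf, size_t len, const char *slot,
                       const char *plugin)
{
    assert(buf != NULL && len > 0);
    return snprintf(buf, len, "CREATE_REPLICATION_SLOT %s LOGICAL %s",
                    slot, plugin);
}

/* Step 2 (regular connection): import the exported snapshot, then run the
 * initial export inside this transaction. Snapshot names generated by
 * PostgreSQL consist of hex digits and dashes, so no quoting is done. */
int format_import_snapshot(char *buf, size_t len, const char *snapshot)
{
    assert(buf != NULL && len > 0);
    return snprintf(buf, len,
                    "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; "
                    "SET TRANSACTION SNAPSHOT '%s';", snapshot);
}

/* Step 3 (replication connection, only after step 2 has imported the
 * snapshot): stream changes from consistent_point, printed as hi/lo hex. */
int format_start_replication(char *buf, size_t len, const char *slot,
                             uint64_t consistent_point)
{
    assert(buf != NULL && len > 0);
    return snprintf(buf, len, "START_REPLICATION SLOT %s LOGICAL %X/%X",
                    slot,
                    (unsigned int) (consistent_point >> 32),
                    (unsigned int) (consistent_point & 0xFFFFFFFFu));
}
```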

One of the implementations of this approach is "Bottled Water":
https://github.com/confluentinc/bottledwater-pg

There, the initial export phase is implemented as a SQL-callable
set-returning function that uses SPI to run "SELECT * FROM mytable" behind
the scenes and passes the resulting tuples through the INSERT callback of
the logical decoding plugin, which lives in the same loadable module as
this SQL function.
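The shape of such an export function can be modeled without any PostgreSQL internals. In the sketch below, Row and OutputPlugin are hypothetical, heavily simplified stand-ins (the real output plugin callbacks receive a LogicalDecodingContext, a ReorderBufferTXN and so on), and the rows array plays the role of the SPI result: all rows of one relation are wrapped in a single fake transaction and each one is handed to the plugin's INSERT callback. The toy text plugin and its output format are made up for illustration.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical decoded tuple: column names plus text output of each value. */
typedef struct Row {
    int                natts;
    const char *const *names;
    const char *const *values;
} Row;

/* Hypothetical, simplified plugin callbacks; `out` is the plugin's sink. */
typedef struct OutputPlugin {
    void (*begin_cb)(void *out);
    void (*insert_cb)(void *out, const char *relname, const Row *row);
    void (*commit_cb)(void *out);
} OutputPlugin;

/* The export path: BEGIN, one INSERT per scanned row, COMMIT. */
void export_relation(const OutputPlugin *plugin, void *out,
                     const char *relname, const Row *rows, int nrows)
{
    assert(nrows >= 0);
    plugin->begin_cb(out);
    for (int i = 0; i < nrows; i++)
        plugin->insert_cb(out, relname, &rows[i]);
    plugin->commit_cb(out);
}

/* A toy text plugin that appends one line per event to a fixed buffer. */
typedef struct TextOut { char buf[1024]; } TextOut;

static void text_append(TextOut *o, const char *s)
{
    /* Append at most what still fits, keeping room for the terminator. */
    strncat(o->buf, s, sizeof o->buf - strlen(o->buf) - 1);
}

static void text_begin(void *out) { text_append(out, "BEGIN\n"); }

static void text_insert(void *out, const char *relname, const Row *row)
{
    text_append(out, "INSERT INTO ");
    text_append(out, relname);
    text_append(out, ":");
    for (int i = 0; i < row->natts; i++) {
        text_append(out, " ");
        text_append(out, row->names[i]);
        text_append(out, "=");
        text_append(out, row->values[i]);
    }
    text_append(out, "\n");
}

static void text_commit(void *out) { text_append(out, "COMMIT\n"); }

const OutputPlugin text_plugin = { text_begin, text_insert, text_commit };
```

Because the export loop only talks to the callbacks, the same loop works for any plugin, which is what makes a plugin-agnostic version plausible.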

The Bottled Water logical decoding plugin uses a binary protocol based on
the Avro data serialization library. As an experiment I added support for
a JSON output format to it, and for that I had to re-implement the
aforementioned SRF so that it converts the exported tuples to JSON instead.
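For the JSON variant, the per-tuple work boils down to turning column names and text values into an object. The sketch below is not Bottled Water's actual JSON format: it assumes every non-null value is emitted as a JSON string and SQL NULL as null, escapes quotes, backslashes and control characters as RFC 8259 requires, and truncates instead of overflowing the caller's buffer. Mapping PostgreSQL types to JSON numbers or booleans is left out, and all names are illustrative.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Small bounded string builder; silently truncates when full. */
typedef struct Buf { char *data; size_t len, cap; } Buf;

static void buf_putc(Buf *b, char c)
{
    if (b->len + 1 < b->cap)
        b->data[b->len++] = c;
    b->data[b->len] = '\0';
}

static void buf_puts(Buf *b, const char *s)
{
    for (; *s; s++)
        buf_putc(b, *s);
}

/* Append s as a JSON string literal with the mandatory escapes. */
static void buf_put_json_string(Buf *b, const char *s)
{
    buf_putc(b, '"');
    for (; *s; s++) {
        unsigned char c = (unsigned char) *s;
        if (c == '"' || c == '\\') {
            buf_putc(b, '\\');
            buf_putc(b, (char) c);
        } else if (c < 0x20) {
            char esc[7];
            snprintf(esc, sizeof esc, "\\u%04x", (unsigned int) c);
            buf_puts(b, esc);
        } else {
            buf_putc(b, (char) c);
        }
    }
    buf_putc(b, '"');
}

/* Render one row as a flat JSON object; a NULL value pointer is SQL NULL. */
void row_to_json(char *out, size_t cap, int natts,
                 const char *const *names, const char *const *values)
{
    Buf b = { out, 0, cap };

    assert(out != NULL && cap > 0);
    out[0] = '\0';
    buf_putc(&b, '{');
    for (int i = 0; i < natts; i++) {
        if (i > 0)
            buf_putc(&b, ',');
        buf_put_json_string(&b, names[i]);
        buf_putc(&b, ':');
        if (values[i] == NULL)
            buf_puts(&b, "null");
        else
            buf_put_json_string(&b, values[i]);
    }
    buf_putc(&b, '}');
}
```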

Now I'd like to compare the performance impact of JSON vs. Avro vs. the
binary format of pglogical_output, and the missing piece for that is
something that would stream the existing data in pglogical's format.
Instead of writing one more implementation of the export function, this
time for pglogical_output, I'd rather use a generic function that accepts
a relation name, a logical decoding plugin name and a set of startup
options for the plugin, and then pretends that we're decoding a stream of
INSERTs on a slot (no actual slot is needed for that, but setting the
transaction snapshot beforehand is up to the client).
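For the startup options, the SQL interface to logical decoding already has a convention: pg_logical_slot_get_changes() takes VARIADIC options text[] and reads it as alternating names and values, rejecting an array with an odd number of elements. A sketch of the same parsing in plain C, assuming the proposed function would follow that convention; DecodingOption, parse_options and find_option are hypothetical names, and treating a missing option name as an error is this sketch's own choice. The option names in the usage example (include-xids, skip-empty-xacts) are existing test_decoding options.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef struct DecodingOption {
    const char *name;
    const char *value;   /* may be NULL: option given without a value */
} DecodingOption;

/* Split a flat {name, value, name, value, ...} array into pairs.
 * Returns the number of pairs, or -1 if the element count is odd, a name
 * is missing, or the pairs do not fit into `out`. */
int parse_options(const char *const *elems, int nelems,
                  DecodingOption *out, int maxopts)
{
    assert(nelems >= 0);
    if (nelems % 2 != 0 || nelems / 2 > maxopts)
        return -1;
    for (int i = 0; i < nelems; i += 2) {
        if (elems[i] == NULL)
            return -1;
        out[i / 2].name = elems[i];
        out[i / 2].value = elems[i + 1];
    }
    return nelems / 2;
}

/* Linear lookup by name; returns NULL when the option was not given. */
const DecodingOption *find_option(const DecodingOption *opts, int nopts,
                                  const char *name)
{
    for (int i = 0; i < nopts; i++)
        if (strcmp(opts[i].name, name) == 0)
            return &opts[i];
    return NULL;
}
```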

In SQL and C pseudo-code:

CREATE FUNCTION /*pg_catalog.?*/ pg_logical_stream_relation(
    relnamespace text,
    relname text,
    plugin_name text,
    nochildren boolean DEFAULT FALSE,
    VARIADIC options text[] DEFAULT '{}'::text[]
) RETURNS SETOF text
AS '...', 'pg_logical_stream_relation' LANGUAGE C VOLATILE;

CREATE FUNCTION /*pg_catalog.?*/ pg_logical_stream_relation_binary(
    relnamespace text,
    relname text,
    plugin_name text,
    nochildren boolean DEFAULT FALSE,
    VARIADIC options text[] DEFAULT '{}'::text[]
) RETURNS SETOF bytea
AS '...', 'pg_logical_stream_relation_binary' LANGUAGE C VOLATILE;

-- usage:
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT 'XXXXXXXX-N';

SELECT *
FROM pg_logical_stream_relation('myschema', 'mytable', 'test_decoding',
                                nochildren := FALSE, ...);

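Datum
pg_logical_stream_relation(PG_FUNCTION_ARGS)
{
    FuncCallContext *funcctx;

    if (SRF_IS_FIRSTCALL())
    {
        funcctx = SRF_FIRSTCALL_INIT();
        /* create decoding context */ /* starts the plugin up */
        /* open the relation and start a seq scan */
    }
    funcctx = SRF_PERCALL_SETUP();

    /*
     * one output row per call:
     *   first call           => emit BEGIN
     *   each scanned tuple   => emit an INSERT
     *   after the last tuple => emit COMMIT
     * each emitted message is returned via SRF_RETURN_NEXT()
     */

    /* free decoding context */ /* shuts the plugin down */
    SRF_RETURN_DONE(funcctx);
}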
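A set-returning function in value-per-call mode hands back one row per invocation, so the BEGIN, the INSERTs and the COMMIT have to be produced across successive calls rather than in one go. The state machine below models that in plain C; StreamState, stream_init and stream_next are hypothetical names, an int array stands in for the seq scan, and a formatted line stands in for whatever the plugin writes into its output buffer. In a real SRF the state would live in funcctx->user_fctx, allocated in funcctx->multi_call_memory_ctx.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Phases of the fake transaction the function pretends to decode. */
typedef enum { PHASE_BEGIN, PHASE_ROWS, PHASE_COMMIT, PHASE_DONE } Phase;

/* Cross-call state (would be funcctx->user_fctx in a real SRF). */
typedef struct StreamState {
    Phase       phase;
    const int  *ids;       /* stand-in for the seq scan over the relation */
    int         nrows;
    int         next;      /* index of the next row to emit */
    char        msg[128];  /* stand-in for the plugin's output buffer */
} StreamState;

void stream_init(StreamState *st, const int *ids, int nrows)
{
    assert(nrows >= 0);
    memset(st, 0, sizeof *st);
    st->phase = PHASE_BEGIN;
    st->ids = ids;
    st->nrows = nrows;
}

/* One call yields one message (like SRF_RETURN_NEXT);
 * NULL means there is nothing left (like SRF_RETURN_DONE). */
const char *stream_next(StreamState *st)
{
    switch (st->phase) {
    case PHASE_BEGIN:
        st->phase = st->nrows > 0 ? PHASE_ROWS : PHASE_COMMIT;
        snprintf(st->msg, sizeof st->msg, "BEGIN");
        return st->msg;
    case PHASE_ROWS:
        snprintf(st->msg, sizeof st->msg, "INSERT id=%d", st->ids[st->next]);
        if (++st->next == st->nrows)
            st->phase = PHASE_COMMIT;
        return st->msg;
    case PHASE_COMMIT:
        st->phase = PHASE_DONE;
        snprintf(st->msg, sizeof st->msg, "COMMIT");
        return st->msg;
    case PHASE_DONE:
    default:
        return NULL;
    }
}
```

Keeping the phase in the cross-call state is what lets the decoding context be created once on the first call and torn down only after COMMIT has been returned.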

What do you say?

--
Alex
