Re: Copy function for logical replication slots

From: Andres Freund <andres(at)anarazel(dot)de>
To: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>
Cc: Petr Jelinek <petr(dot)jelinek(at)2ndquadrant(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, Peter Eisentraut <peter(dot)eisentraut(at)2ndquadrant(dot)com>, PostgreSQL-development <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Copy function for logical replication slots
Date: 2019-02-16 03:33:59
Message-ID: 20190216033359.zfjsu2o2lt23gfjr@alap3.anarazel.de
Lists: pgsql-hackers

Hi,

On 2019-01-15 10:56:04 +0900, Masahiko Sawada wrote:

> + <primary>pg_copy_physical_replication_slot</primary>
> + </indexterm>
> + <literal><function>pg_copy_physical_replication_slot(<parameter>src_slot_name</parameter> <type>name</type>, <parameter>dst_slot_name</parameter> <optional>, <parameter>temporary</parameter> <type>bool</type></optional>)</function></literal>
> + </entry>
> + <entry>
> + (<parameter>slot_name</parameter> <type>name</type>, <parameter>lsn</parameter> <type>pg_lsn</type>)
> + </entry>
> + <entry>
> + Copies an existing physical replication slot named <parameter>src_slot_name</parameter>
> + to a physical replication slot named <parameter>dst_slot_name</parameter>.
> + The copied physical slot starts to reserve WAL from the same <acronym>LSN</acronym> as the
> + source slot.
> + <parameter>temporary</parameter> is optional. If <parameter>temporary</parameter>
> + is omitted, the same value as the source slot is used.
> + </entry>
> + </row>
> +
> + <row>
> + <entry>
> + <indexterm>
> + <primary>pg_copy_logical_replication_slot</primary>
> + </indexterm>
> + <literal><function>pg_copy_logical_replication_slot(<parameter>src_slot_name</parameter> <type>name</type>, <parameter>dst_slot_name</parameter> <optional>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type></optional></optional>)</function></literal>
> + </entry>
> + <entry>
> + (<parameter>slot_name</parameter> <type>name</type>, <parameter>lsn</parameter> <type>pg_lsn</type>)
> + </entry>
> + <entry>
> + Copies an existing logical replication slot named <parameter>src_slot_name</parameter>
> + to a logical replication slot named <parameter>dst_slot_name</parameter>
> + while changing the output plugin and persistence. The copied logical slot starts
> + from the same <acronym>LSN</acronym> as the source logical slot. Both <parameter>plugin</parameter> and
> + <parameter>temporary</parameter> are optional. If <parameter>plugin</parameter>
> + or <parameter>temporary</parameter> are omitted, the same values as
> + the source logical slot are used.
> + </entry>
> + </row>

Would it make sense to move the differing options to the end of the
argument list? Right now we have a few common params, then a differing
one, and then another common one.
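
I.e. one possible reading of that, reusing the signatures from the docs
above (ordering shown purely for illustration):

pg_copy_physical_replication_slot(src_slot_name name, dst_slot_name name [, temporary bool])
pg_copy_logical_replication_slot(src_slot_name name, dst_slot_name name [, temporary bool [, plugin name]])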

> @@ -271,7 +272,7 @@ CreateInitDecodingContext(char *plugin,
> StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
> SpinLockRelease(&slot->mutex);
>
> - ReplicationSlotReserveWal();
> + ReplicationSlotReserveWal(restart_lsn);

Why do we even need to call this? It ought to be guaranteed that there's
sufficient WAL, right? And somehow it seems harder to me to understand
that the reserve routine now takes an LSN.

> /*
> * Reserve WAL for the currently active slot.
> *
> - * Compute and set restart_lsn in a manner that's appropriate for the type of
> - * the slot and concurrency safe.
> + * If no LSN to reserve is requested, compute and set restart_lsn
> + * in a manner that's appropriate for the type of the slot and concurrency safe.
> + * If a specific LSN is requested, set restart_lsn to it and check that the
> + * corresponding WAL segment is still available.
> */
> void
> -ReplicationSlotReserveWal(void)
> +ReplicationSlotReserveWal(XLogRecPtr requested_lsn)
> {
> ReplicationSlot *slot = MyReplicationSlot;
>
> @@ -1005,47 +1007,57 @@ ReplicationSlotReserveWal(void)
> * The replication slot mechanism is used to prevent removal of required
> * WAL. As there is no interlock between this routine and checkpoints, WAL
> * segments could concurrently be removed when a now stale return value of
> - * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
> - * this happens we'll just retry.
> + * ReplicationSlotsComputeRequiredLSN() is used. If no LSN to reserve was
> + * requested, we'll just retry in the unlikely case that this happens.
> */
> while (true)
> {
> XLogSegNo segno;
> XLogRecPtr restart_lsn;
>
> - /*
> - * For logical slots log a standby snapshot and start logical decoding
> - * at exactly that position. That allows the slot to start up more
> - * quickly.
> - *
> - * That's not needed (or indeed helpful) for physical slots as they'll
> - * start replay at the last logged checkpoint anyway. Instead return
> - * the location of the last redo LSN. While that slightly increases
> - * the chance that we have to retry, it's where a base backup has to
> - * start replay at.
> - */
> - if (!RecoveryInProgress() && SlotIsLogical(slot))
> + if (!XLogRecPtrIsInvalid(requested_lsn))
> {
> - XLogRecPtr flushptr;
> -
> - /* start at current insert position */
> - restart_lsn = GetXLogInsertRecPtr();
> + /* Set the requested lsn */
> SpinLockAcquire(&slot->mutex);
> - slot->data.restart_lsn = restart_lsn;
> + slot->data.restart_lsn = requested_lsn;
> SpinLockRelease(&slot->mutex);
> -
> - /* make sure we have enough information to start */
> - flushptr = LogStandbySnapshot();
> -
> - /* and make sure it's fsynced to disk */
> - XLogFlush(flushptr);
> }
> else
> {
> - restart_lsn = GetRedoRecPtr();
> - SpinLockAcquire(&slot->mutex);
> - slot->data.restart_lsn = restart_lsn;
> - SpinLockRelease(&slot->mutex);
> + /*
> + * For logical slots log a standby snapshot and start logical decoding
> + * at exactly that position. That allows the slot to start up more
> + * quickly.
> + *
> + * That's not needed (or indeed helpful) for physical slots as they'll
> + * start replay at the last logged checkpoint anyway. Instead return
> + * the location of the last redo LSN. While that slightly increases
> + * the chance that we have to retry, it's where a base backup has to
> + * start replay at.
> + */
> + if (!RecoveryInProgress() && SlotIsLogical(slot))
> + {
> + XLogRecPtr flushptr;
> +
> + /* start at current insert position */
> + restart_lsn = GetXLogInsertRecPtr();
> + SpinLockAcquire(&slot->mutex);
> + slot->data.restart_lsn = restart_lsn;
> + SpinLockRelease(&slot->mutex);
> +
> + /* make sure we have enough information to start */
> + flushptr = LogStandbySnapshot();
> +
> + /* and make sure it's fsynced to disk */
> + XLogFlush(flushptr);
> + }
> + else
> + {
> + restart_lsn = GetRedoRecPtr();
> + SpinLockAcquire(&slot->mutex);
> + slot->data.restart_lsn = restart_lsn;
> + SpinLockRelease(&slot->mutex);
> + }
> }
>
> /* prevent WAL removal as fast as possible */
> @@ -1061,6 +1073,21 @@ ReplicationSlotReserveWal(void)
> XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
> if (XLogGetLastRemovedSegno() < segno)
> break;

This seems like it's harder to understand than before. The loop (and
most of the rest of the function) doesn't make sense for the copy case,
so I think it'd be better to move this into a separate function that
just verifies that all the required WAL is still there.
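
Very roughly something like the below (name made up, untested, assumes
the xlog.h / xlog_internal.h includes slot.c already has), i.e. without
any of the retry logic that only makes sense when this function computes
the LSN itself:

static void
ReplicationSlotCheckWalAvailable(XLogRecPtr restart_lsn)
{
	XLogSegNo	segno;

	Assert(!XLogRecPtrIsInvalid(restart_lsn));

	/* which segment does the requested restart_lsn live in? */
	XLByteToSeg(restart_lsn, segno, wal_segment_size);

	/* error out if that segment has already been removed */
	if (XLogGetLastRemovedSegno() >= segno)
	{
		char		filename[MAXFNAMELEN];

		XLogFileName(filename, ThisTimeLineID, segno, wal_segment_size);
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_FILE),
				 errmsg("requested WAL segment %s has already been removed",
						filename)));
	}
}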

> + /*
> + * The caller has requested a specific LSN whose WAL we failed to reserve.
> + * We can't retry here, as the requested WAL is no longer available.
> + */
> + if (!XLogRecPtrIsInvalid(requested_lsn))
> + {
> + char filename[MAXFNAMELEN];
> +
> + XLogFileName(filename, ThisTimeLineID, segno, wal_segment_size);
> + ereport(ERROR,
> + (errcode(ERRCODE_UNDEFINED_FILE),
> + errmsg("requested WAL segment %s has already been removed",
> + filename)));
> + }
> }
> }

This ought to be unreachable, right?

> +/*
> + * Copy physical replication slot (3 arguments)
> + *
> + * note: this wrapper is necessary to pass the sanity check in opr_sanity,
> + * which checks that all built-in functions that share the implementing C
> + * function take the same number of arguments
> + */
> +Datum
> +pg_copy_physical_replication_slot_no_temp(PG_FUNCTION_ARGS)
> +{
> + return pg_copy_physical_replication_slot(fcinfo);
> +}

You could avoid this by just defining the wrapper on the SQL level, but
I'm ok with this.

> + /*
> + * To prevent the WAL at the source slot's restart_lsn from being removed
> + * while we copy it to a new slot, we do the copy while holding the source
> + * slot. Since we are not allowed to create a new slot while holding another
> + * one, we temporarily save the acquired slot and restore it after creation.
> + * Set a callback function to ensure we release replication slots if we
> + * fail below.
> + */
> if (immediately_reserve)
> + saveslot = MyReplicationSlot;
> + else
> + ReplicationSlotRelease();

Yikes, this is mightily ugly.

Stupid question, but couldn't we optimize this to something like:

/*
* First copy current data of the slot. Then install those in the
* new slot. The src slot could have progressed while installing,
* but the installed values prevent global horizons from progressing
* further. Therefore a second copy is sufficiently up2date.
*/
SpinLockAcquire(&src->mutex);
copy_lsn = src->data.restart_lsn;
copy_xid = ...;
SpinLockRelease(&src->mutex);

/* install copied values */

SpinLockAcquire(&src->mutex);
/* copy data of slot again */
SpinLockRelease(&src->mutex);

/* install again */

?
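
Spelled out a bit more, just to make sure we mean the same thing
(untested sketch; field names as in ReplicationSlotPersistentData, and I
haven't checked whether it's data.xmin or effective_xmin that actually
needs copying here):

	XLogRecPtr	copy_restart_lsn;
	TransactionId copy_xmin;
	TransactionId copy_catalog_xmin;

	/* first pass: snapshot the source slot's current horizons */
	SpinLockAcquire(&src->mutex);
	copy_restart_lsn = src->data.restart_lsn;
	copy_xmin = src->data.xmin;
	copy_catalog_xmin = src->data.catalog_xmin;
	SpinLockRelease(&src->mutex);

	/* install them in the new slot ... */
	SpinLockAcquire(&MyReplicationSlot->mutex);
	MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
	MyReplicationSlot->data.xmin = copy_xmin;
	MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
	SpinLockRelease(&MyReplicationSlot->mutex);

	/* ... and make them globally effective, so nothing can be removed */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * second pass: the source slot may have advanced in the meantime, but
	 * only past values we now hold down ourselves, so copying and
	 * installing its data once more yields a safe, sufficiently recent
	 * starting point.
	 */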

Greetings,

Andres Freund
