From ec00dc6ab8bafefc00e9b1c78ac9348b643b8a87 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Mon, 3 Jan 2022 14:43:36 +0100 Subject: [PATCH v3] Synchronize logical replication slots from primary to standby Discussion: https://www.postgresql.org/message-id/flat/514f6f2f-6833-4539-39f1-96cd1e011f23%40enterprisedb.com --- doc/src/sgml/config.sgml | 34 ++ src/backend/commands/subscriptioncmds.c | 4 +- src/backend/postmaster/bgworker.c | 3 + .../libpqwalreceiver/libpqwalreceiver.c | 94 ++++ src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/launcher.c | 202 ++++++--- .../replication/logical/reorderbuffer.c | 85 ++++ src/backend/replication/logical/slotsync.c | 412 ++++++++++++++++++ src/backend/replication/logical/tablesync.c | 13 +- src/backend/replication/repl_gram.y | 32 +- src/backend/replication/repl_scanner.l | 1 + src/backend/replication/slotfuncs.c | 2 +- src/backend/replication/walsender.c | 196 ++++++++- src/backend/utils/activity/wait_event.c | 3 + src/backend/utils/misc/guc.c | 23 + src/backend/utils/misc/postgresql.conf.sample | 2 + src/include/commands/subscriptioncmds.h | 3 + src/include/nodes/nodes.h | 1 + src/include/nodes/replnodes.h | 9 + src/include/replication/logicalworker.h | 9 + src/include/replication/slot.h | 4 +- src/include/replication/walreceiver.h | 16 + src/include/replication/worker_internal.h | 8 +- src/include/utils/wait_event.h | 1 + src/test/recovery/t/030_slot_sync.pl | 58 +++ 25 files changed, 1146 insertions(+), 70 deletions(-) create mode 100644 src/backend/replication/logical/slotsync.c create mode 100644 src/test/recovery/t/030_slot_sync.pl diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index afbb6c35e3..2b2a21a251 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4406,6 +4406,23 @@ Primary Server + + standby_slot_names (string) + + standby_slot_names configuration parameter + + + + + List of physical replication slots that logical replication waits for. + If a logical replication connection is meant to switch to a physical + standby after the standby is promoted, the physical replication slot + for the standby should be listed here. This ensures that logical + replication is not ahead of the physical standby. + + + + @@ -4794,6 +4811,23 @@ Standby Servers + + synchronize_slot_names (string) + + synchronize_slot_names configuration parameter + + + + + Specifies a list of logical replication slots that a physical standby + should synchronize from the primary server. This is necessary to be + able to retarget those logical replication connections to this standby + if it gets promoted. Specify * to synchronize all + logical replication slots. The default is empty. + + + + diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 2b658080fe..7cdea20207 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -737,7 +737,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) RemoveSubscriptionRel(sub->oid, relid); - logicalrep_worker_stop(sub->oid, relid); + logicalrep_worker_stop(MyDatabaseId, sub->oid, relid); /* * For READY state, we would have already dropped the @@ -1239,7 +1239,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) { LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); - logicalrep_worker_stop(w->subid, w->relid); + logicalrep_worker_stop(w->dbid, w->subid, w->relid); } list_free(subworkers); diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index c05f500639..818b8a35e9 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -128,6 +128,9 @@ static const struct }, { "ApplyWorkerMain", ApplyWorkerMain + }, + { + "ReplSlotSyncMain", ReplSlotSyncMain } }; diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index c08e599eef..c71fb0c1ec 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -33,6 +33,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/tuplestore.h" +#include "utils/varlena.h" PG_MODULE_MAGIC; @@ -58,6 +59,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); static char *libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli); +static List *libpqrcv_list_slots(WalReceiverConn *conn, const char *slot_names); static int libpqrcv_server_version(WalReceiverConn *conn); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, @@ -89,6 +91,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { libpqrcv_get_conninfo, libpqrcv_get_senderinfo, libpqrcv_identify_system, + libpqrcv_list_slots, libpqrcv_server_version, libpqrcv_readtimelinehistoryfile, libpqrcv_startstreaming, @@ -397,6 +400,97 @@ libpqrcv_server_version(WalReceiverConn *conn) return PQserverVersion(conn->streamConn); } +/* + * Get list of slots from primary. + */ +static List * +libpqrcv_list_slots(WalReceiverConn *conn, const char *slot_names) +{ + PGresult *res; + List *slotlist = NIL; + int ntuples; + StringInfoData s; + WalRecvReplicationSlotData *slot_data; + + initStringInfo(&s); + appendStringInfoString(&s, "LIST_SLOTS"); + + if (strcmp(slot_names, "") != 0 && strcmp(slot_names, "*") != 0) + { + char *rawname; + List *namelist; + ListCell *lc; + + appendStringInfoChar(&s, ' '); + rawname = pstrdup(slot_names); + SplitIdentifierString(rawname, ',', &namelist); + foreach (lc, namelist) + { + if (lc != list_head(namelist)) + appendStringInfoChar(&s, ','); + appendStringInfo(&s, "%s", + quote_identifier(lfirst(lc))); + } + } + + res = libpqrcv_PQexec(conn->streamConn, s.data); + pfree(s.data); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not receive list of slots the primary server: %s", + pchomp(PQerrorMessage(conn->streamConn))))); + } + if (PQnfields(res) < 10) + { + int nfields = PQnfields(res); + + PQclear(res); + ereport(ERROR, + (errmsg("invalid response from primary server"), + errdetail("Could not get list of slots: got %d fields, expected %d or more fields.", + nfields, 10))); + } + + ntuples = PQntuples(res); + for (int i = 0; i < ntuples; i++) + { + char *slot_type; + + slot_data = palloc0(sizeof(WalRecvReplicationSlotData)); + namestrcpy(&slot_data->name, PQgetvalue(res, i, 0)); + if (!PQgetisnull(res, i, 1)) + namestrcpy(&slot_data->plugin, PQgetvalue(res, i, 1)); + slot_type = PQgetvalue(res, i, 2); + if (!PQgetisnull(res, i, 3)) + slot_data->database = atooid(PQgetvalue(res, i, 3)); + if (strcmp(slot_type, "physical") == 0) + { + if (OidIsValid(slot_data->database)) + elog(ERROR, "unexpected physical replication slot with database set"); + } + if (pg_strtoint32(PQgetvalue(res, i, 5)) == 1) + slot_data->persistency = RS_TEMPORARY; + else + slot_data->persistency = RS_PERSISTENT; + if (!PQgetisnull(res, i, 6)) + slot_data->xmin = atooid(PQgetvalue(res, i, 6)); + if (!PQgetisnull(res, i, 7)) + slot_data->catalog_xmin = atooid(PQgetvalue(res, i, 7)); + if (!PQgetisnull(res, i, 8)) + slot_data->restart_lsn = strtou64(PQgetvalue(res, i, 8), NULL, 10); + if (!PQgetisnull(res, i, 9)) + slot_data->confirmed_flush = strtou64(PQgetvalue(res, i, 9), NULL, 10); + + slotlist = lappend(slotlist, slot_data); + } + + PQclear(res); + + return slotlist; +} + /* * Start streaming WAL data from given streaming options. * diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index c4e2fdeb71..bc3f23b5a2 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -24,6 +24,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + slotsync.o \ snapbuild.o \ tablesync.o \ worker.o diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 3fb4caa803..207ef9bc8b 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -22,6 +22,7 @@ #include "access/htup_details.h" #include "access/tableam.h" #include "access/xact.h" +#include "catalog/pg_authid.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "funcapi.h" @@ -212,7 +213,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, * subscription id and relid. */ LogicalRepWorker * -logicalrep_worker_find(Oid subid, Oid relid, bool only_running) +logicalrep_worker_find(Oid dbid, Oid subid, Oid relid, bool only_running) { int i; LogicalRepWorker *res = NULL; @@ -224,8 +225,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (w->in_use && w->subid == subid && w->relid == relid && - (!only_running || w->proc)) + if (w->in_use && w->dbid == dbid && w->subid == subid && + w->relid == relid && (!only_running || w->proc)) { res = w; break; @@ -275,9 +276,13 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, int nsyncworkers; TimestampTz now; - ereport(DEBUG1, - (errmsg_internal("starting logical replication worker for subscription \"%s\"", - subname))); + if (OidIsValid(subid)) + ereport(DEBUG1, + (errmsg_internal("starting logical replication worker for subscription \"%s\"", + subname))); + else + ereport(DEBUG1, + (errmsg_internal("starting replication slot synchronization worker"))); /* Report this after the initial starting message for consistency. */ if (max_replication_slots == 0) @@ -314,7 +319,9 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, * reason we do this is because if some worker failed to start up and its * parent has crashed while waiting, the in_use state was never cleared. */ - if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription) + if (worker == NULL || + (OidIsValid(relid) && + nsyncworkers >= max_sync_workers_per_subscription)) { bool did_cleanup = false; @@ -348,7 +355,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, * silently as we might get here because of an otherwise harmless race * condition. */ - if (nsyncworkers >= max_sync_workers_per_subscription) + if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription) { LWLockRelease(LogicalRepWorkerLock); return; @@ -395,15 +402,22 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + bgw.bgw_start_time = BgWorkerStart_ConsistentState; snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); + if (OidIsValid(subid)) + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); + else + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncMain"); if (OidIsValid(relid)) snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication worker for subscription %u sync %u", subid, relid); - else + else if (OidIsValid(subid)) snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication worker for subscription %u", subid); + else + snprintf(bgw.bgw_name, BGW_MAXLEN, + "replication slot synchronization worker"); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker"); bgw.bgw_restart_time = BGW_NEVER_RESTART; @@ -434,14 +448,14 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, * it detaches from the slot. */ void -logicalrep_worker_stop(Oid subid, Oid relid) +logicalrep_worker_stop(Oid dbid, Oid subid, Oid relid) { LogicalRepWorker *worker; uint16 generation; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid, relid, false); + worker = logicalrep_worker_find(dbid, subid, relid, false); /* No worker, nothing to do. */ if (!worker) @@ -531,13 +545,13 @@ logicalrep_worker_stop(Oid subid, Oid relid) * Wake up (using latch) any logical replication worker for specified sub/rel. */ void -logicalrep_worker_wakeup(Oid subid, Oid relid) +logicalrep_worker_wakeup(Oid dbid, Oid subid, Oid relid) { LogicalRepWorker *worker; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid, relid, true); + worker = logicalrep_worker_find(dbid, subid, relid, true); if (worker) logicalrep_worker_wakeup_ptr(worker); @@ -714,7 +728,7 @@ ApplyLauncherRegister(void) memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + bgw.bgw_start_time = BgWorkerStart_ConsistentState; snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, @@ -795,6 +809,116 @@ ApplyLauncherWakeup(void) kill(LogicalRepCtx->launcher_pid, SIGUSR1); } +static void +ApplyLauncherStartSlotSync(TimestampTz *last_start_time, long *wait_time) +{ + WalReceiverConn *wrconn; + TimestampTz now; + char *err; + List *slots; + ListCell *lc; + MemoryContext tmpctx; + MemoryContext oldctx; + + if (strcmp(synchronize_slot_names, "") == 0) + return; + + wrconn = walrcv_connect(PrimaryConnInfo, false, + "Logical Replication Launcher", &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the primary server: %s", err))); + + /* Use temporary context for the slot list and worker info. */ + tmpctx = AllocSetContextCreate(TopMemoryContext, + "Logical Replication Launcher slot sync ctx", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(tmpctx); + + slots = walrcv_list_slots(wrconn, synchronize_slot_names); + + now = GetCurrentTimestamp(); + + foreach(lc, slots) + { + WalRecvReplicationSlotData *slot_data = lfirst(lc); + LogicalRepWorker *w; + + if (!OidIsValid(slot_data->database)) + continue; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + w = logicalrep_worker_find(slot_data->database, InvalidOid, + InvalidOid, false); + LWLockRelease(LogicalRepWorkerLock); + + if (w == NULL) + { + *last_start_time = now; + *wait_time = wal_retrieve_retry_interval; + + logicalrep_worker_launch(slot_data->database, InvalidOid, NULL, + BOOTSTRAP_SUPERUSERID, InvalidOid); + } + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(tmpctx); + + walrcv_disconnect(wrconn); +} + +static void +ApplyLauncherStartSubs(TimestampTz *last_start_time, long *wait_time) +{ + TimestampTz now; + List *sublist; + ListCell *lc; + MemoryContext subctx; + MemoryContext oldctx; + + now = GetCurrentTimestamp(); + + /* Use temporary context for the database list and worker info. */ + subctx = AllocSetContextCreate(TopMemoryContext, + "Logical Replication Launcher sublist", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(subctx); + + /* search for subscriptions to start or stop. */ + sublist = get_subscription_list(); + + /* Start the missing workers for enabled subscriptions. */ + foreach(lc, sublist) + { + Subscription *sub = (Subscription *) lfirst(lc); + LogicalRepWorker *w; + + if (!sub->enabled) + continue; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + w = logicalrep_worker_find(sub->dbid, sub->oid, InvalidOid, false); + LWLockRelease(LogicalRepWorkerLock); + + if (w == NULL) + { + *last_start_time = now; + *wait_time = wal_retrieve_retry_interval; + + logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, + sub->owner, InvalidOid); + } + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(subctx); +} + /* * Main loop for the apply launcher process. */ @@ -822,14 +946,12 @@ ApplyLauncherMain(Datum main_arg) */ BackgroundWorkerInitializeConnection(NULL, NULL, 0); + load_file("libpqwalreceiver", false); + /* Enter main loop */ for (;;) { int rc; - List *sublist; - ListCell *lc; - MemoryContext subctx; - MemoryContext oldctx; TimestampTz now; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; @@ -841,42 +963,10 @@ ApplyLauncherMain(Datum main_arg) if (TimestampDifferenceExceeds(last_start_time, now, wal_retrieve_retry_interval)) { - /* Use temporary context for the database list and worker info. */ - subctx = AllocSetContextCreate(TopMemoryContext, - "Logical Replication Launcher sublist", - ALLOCSET_DEFAULT_SIZES); - oldctx = MemoryContextSwitchTo(subctx); - - /* search for subscriptions to start or stop. */ - sublist = get_subscription_list(); - - /* Start the missing workers for enabled subscriptions. */ - foreach(lc, sublist) - { - Subscription *sub = (Subscription *) lfirst(lc); - LogicalRepWorker *w; - - if (!sub->enabled) - continue; - - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); - LWLockRelease(LogicalRepWorkerLock); - - if (w == NULL) - { - last_start_time = now; - wait_time = wal_retrieve_retry_interval; - - logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, - sub->owner, InvalidOid); - } - } - - /* Switch back to original memory context. */ - MemoryContextSwitchTo(oldctx); - /* Clean the temporary memory. */ - MemoryContextDelete(subctx); + if (!RecoveryInProgress()) + ApplyLauncherStartSubs(&last_start_time, &wait_time); + else + ApplyLauncherStartSlotSync(&last_start_time, &wait_time); } else { diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 7aa5647a2c..f0b3b9ad87 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -95,11 +95,13 @@ #include "miscadmin.h" #include "pgstat.h" #include "replication/logical.h" +#include "replication/logicalworker.h" #include "replication/reorderbuffer.h" #include "replication/slot.h" #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ #include "storage/bufmgr.h" #include "storage/fd.h" +#include "storage/ipc.h" #include "storage/sinval.h" #include "utils/builtins.h" #include "utils/combocid.h" @@ -107,6 +109,7 @@ #include "utils/memutils.h" #include "utils/rel.h" #include "utils/relfilenodemap.h" +#include "utils/varlena.h" /* entry for a hash table we use to map from xid to our transaction state */ @@ -2006,6 +2009,85 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, } } +static void +wait_for_standby_confirmation(XLogRecPtr commit_lsn) +{ + char *rawname; + List *namelist; + ListCell *lc; + XLogRecPtr flush_pos = InvalidXLogRecPtr; + + if (strcmp(standby_slot_names, "") == 0) + return; + + rawname = pstrdup(standby_slot_names); + SplitIdentifierString(rawname, ',', &namelist); + + while (true) + { + int wait_slots_remaining; + XLogRecPtr oldest_flush_pos = InvalidXLogRecPtr; + int rc; + + wait_slots_remaining = list_length(namelist); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + bool inlist; + + if (!s->in_use) + continue; + + inlist = false; + foreach (lc, namelist) + { + char *name = lfirst(lc); + if (strcmp(name, NameStr(s->data.name)) == 0) + { + inlist = true; + break; + } + } + if (!inlist) + continue; + + SpinLockAcquire(&s->mutex); + + if (s->data.database == InvalidOid) + /* Physical slots advance restart_lsn on flush and ignore confirmed_flush_lsn */ + flush_pos = s->data.restart_lsn; + else + /* For logical slots we must wait for commit and flush */ + flush_pos = s->data.confirmed_flush; + + SpinLockRelease(&s->mutex); + + /* We want to find out the min(flush pos) over all named slots */ + if (oldest_flush_pos == InvalidXLogRecPtr + || oldest_flush_pos > flush_pos) + oldest_flush_pos = flush_pos; + + if (flush_pos >= commit_lsn && wait_slots_remaining > 0) + wait_slots_remaining --; + } + LWLockRelease(ReplicationSlotControlLock); + + if (wait_slots_remaining == 0) + return; + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 1000L, PG_WAIT_EXTENSION); + + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + CHECK_FOR_INTERRUPTS(); + } +} + /* * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN. * @@ -2434,6 +2516,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * Call either PREPARE (for two-phase transactions) or COMMIT (for * regular ones). */ + + wait_for_standby_confirmation(commit_lsn); + if (rbtxn_prepared(txn)) rb->prepare(rb, txn, commit_lsn); else diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c new file mode 100644 index 0000000000..654ac154ea --- /dev/null +++ b/src/backend/replication/logical/slotsync.c @@ -0,0 +1,412 @@ +/*------------------------------------------------------------------------- + * slotsync.c + * PostgreSQL worker for synchronizing slots to a standby from primary + * + * Copyright (c) 2016-2018, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/slotsync.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "commands/dbcommands.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "replication/logicalworker.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" +#include "storage/ipc.h" +#include "storage/procarray.h" +#include "utils/builtins.h" +#include "utils/pg_lsn.h" +#include "utils/varlena.h" + +char *synchronize_slot_names; +char *standby_slot_names; + +/* + * Wait for remote slot to pass localy reserved position. + */ +static void +wait_for_primary_slot_catchup(WalReceiverConn *wrconn, char *slot_name, + XLogRecPtr min_lsn) +{ + WalRcvExecResult *res; + TupleTableSlot *slot; + Oid slotRow[1] = {LSNOID}; + StringInfoData cmd; + bool isnull; + XLogRecPtr restart_lsn; + + for (;;) + { + int rc; + + CHECK_FOR_INTERRUPTS(); + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT restart_lsn" + " FROM pg_catalog.pg_replication_slots" + " WHERE slot_name = %s", + quote_literal_cstr(slot_name)); + res = walrcv_exec(wrconn, cmd.data, 1, slotRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch slot info for slot \"%s\" from primary: %s", + slot_name, res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + ereport(ERROR, + (errmsg("slot \"%s\" disapeared from provider", + slot_name))); + + restart_lsn = DatumGetLSN(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + walrcv_clear_result(res); + + if (restart_lsn >= min_lsn) + break; + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + wal_retrieve_retry_interval, + WAIT_EVENT_REPL_SLOT_SYNC_MAIN); + + ResetLatch(MyLatch); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + } +} + +/* + * Synchronize single slot to given position. + * + * This optionally creates new slot if there is no existing one. + */ +static void +synchronize_one_slot(WalReceiverConn *wrconn, char *slot_name, char *database, + char *plugin_name, XLogRecPtr target_lsn) +{ + bool found = false; + XLogRecPtr endlsn; + + /* Search for the named slot and mark it active if we find it. */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (!s->in_use) + continue; + + if (strcmp(NameStr(s->data.name), slot_name) == 0) + { + found = true; + break; + } + } + LWLockRelease(ReplicationSlotControlLock); + + StartTransactionCommand(); + + /* Already existing slot, acquire */ + if (found) + { + ReplicationSlotAcquire(slot_name, true); + + if (target_lsn < MyReplicationSlot->data.confirmed_flush) + { + elog(DEBUG1, + "not synchronizing slot %s; synchronization would move it backward", + slot_name); + + ReplicationSlotRelease(); + CommitTransactionCommand(); + return; + } + } + /* Otherwise create the slot first. */ + else + { + TransactionId xmin_horizon = InvalidTransactionId; + ReplicationSlot *slot; + + ReplicationSlotCreate(slot_name, true, RS_EPHEMERAL, false); + slot = MyReplicationSlot; + + SpinLockAcquire(&slot->mutex); + slot->data.database = get_database_oid(database, false); + namestrcpy(&slot->data.plugin, plugin_name); + SpinLockRelease(&slot->mutex); + + ReplicationSlotReserveWal(); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + xmin_horizon = GetOldestSafeDecodingTransactionId(true); + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + ReplicationSlotsComputeRequiredXmin(true); + LWLockRelease(ProcArrayLock); + + if (target_lsn < MyReplicationSlot->data.restart_lsn) + { + ereport(LOG, + errmsg("waiting for remote slot \"%s\" LSN (%X/%X) to pass local slot LSN (%X/%X)", + slot_name, + LSN_FORMAT_ARGS(target_lsn), LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn))); + + wait_for_primary_slot_catchup(wrconn, slot_name, + MyReplicationSlot->data.restart_lsn); + } + + ReplicationSlotPersist(); + } + + endlsn = pg_logical_replication_slot_advance(target_lsn); + + elog(DEBUG3, "synchronized slot %s to lsn (%X/%X)", + slot_name, LSN_FORMAT_ARGS(endlsn)); + + ReplicationSlotRelease(); + CommitTransactionCommand(); +} + +static void +synchronize_slots(void) +{ + WalRcvExecResult *res; + WalReceiverConn *wrconn = NULL; + TupleTableSlot *slot; + Oid slotRow[3] = {TEXTOID, TEXTOID, LSNOID}; + StringInfoData s; + char *database; + char *err; + MemoryContext oldctx = CurrentMemoryContext; + + if (!WalRcv) + return; + + /* syscache access needs a transaction env. */ + StartTransactionCommand(); + /* make dbname live outside TX context */ + MemoryContextSwitchTo(oldctx); + + database = get_database_name(MyDatabaseId); + initStringInfo(&s); + appendStringInfo(&s, "%s dbname=%s", PrimaryConnInfo, database); + wrconn = walrcv_connect(s.data, true, "slot_sync", &err); + + if (wrconn == NULL) + ereport(ERROR, + (errmsg("could not connect to the primary server: %s", err))); + + resetStringInfo(&s); + appendStringInfo(&s, + "SELECT slot_name, plugin, confirmed_flush_lsn" + " FROM pg_catalog.pg_replication_slots" + " WHERE database = %s", + quote_literal_cstr(database)); + if (strcmp(synchronize_slot_names, "") != 0 && strcmp(synchronize_slot_names, "*") != 0) + { + char *rawname; + List *namelist; + ListCell *lc; + + rawname = pstrdup(synchronize_slot_names); + SplitIdentifierString(rawname, ',', &namelist); + + appendStringInfoString(&s, " AND slot_name IN ("); + foreach (lc, namelist) + { + if (lc != list_head(namelist)) + appendStringInfoChar(&s, ','); + appendStringInfo(&s, "%s", + quote_literal_cstr(lfirst(lc))); + } + appendStringInfoChar(&s, ')'); + } + + res = walrcv_exec(wrconn, s.data, 3, slotRow); + pfree(s.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch slot info from primary: %s", + res->err))); + + CommitTransactionCommand(); + /* CommitTransactionCommand switches to TopMemoryContext */ + MemoryContextSwitchTo(oldctx); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *slot_name; + char *plugin_name; + XLogRecPtr confirmed_flush_lsn; + bool isnull; + + slot_name = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + plugin_name = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + confirmed_flush_lsn = DatumGetLSN(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + + synchronize_one_slot(wrconn, slot_name, database, plugin_name, + confirmed_flush_lsn); + + ExecClearTuple(slot); + } + + walrcv_clear_result(res); + pfree(database); + + walrcv_disconnect(wrconn); +} + +/* + * The main loop of our worker process. + */ +void +ReplSlotSyncMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + + /* Attach to slot */ + logicalrep_worker_attach(worker_slot); + + /* Establish signal handlers. */ + BackgroundWorkerUnblockSignals(); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + /* Connect to our database. */ + BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid, + MyLogicalRepWorker->userid, + 0); + + StartTransactionCommand(); + ereport(LOG, + (errmsg("replication slot synchronization worker for database \"%s\" has started", + get_database_name(MyLogicalRepWorker->dbid)))); + CommitTransactionCommand(); + + /* Main wait loop. */ + for (;;) + { + int rc; + + CHECK_FOR_INTERRUPTS(); + + if (!RecoveryInProgress()) + return; + + if (strcmp(synchronize_slot_names, "") == 0) + return; + + synchronize_slots(); + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + wal_retrieve_retry_interval, + WAIT_EVENT_REPL_SLOT_SYNC_MAIN); + + ResetLatch(MyLatch); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + } +} + +/* + * Routines for handling the GUC variable(s) + */ + +bool +check_synchronize_slot_names(char **newval, void **extra, GucSource source) +{ + /* Special handling for "*" which means all. */ + if (strcmp(*newval, "*") == 0) + { + return true; + } + else + { + char *rawname; + List *namelist; + ListCell *lc; + + /* Need a modifiable copy of string */ + rawname = pstrdup(*newval); + + /* Parse string into list of identifiers */ + if (!SplitIdentifierString(rawname, ',', &namelist)) + { + /* syntax error in name list */ + GUC_check_errdetail("List syntax is invalid."); + pfree(rawname); + list_free(namelist); + return false; + } + + foreach(lc, namelist) + { + char *curname = (char *) lfirst(lc); + + ReplicationSlotValidateName(curname, ERROR); + } + + pfree(rawname); + list_free(namelist); + } + + return true; +} + + +bool +check_standby_slot_names(char **newval, void **extra, GucSource source) +{ + char *rawname; + List *namelist; + ListCell *lc; + + /* Need a modifiable copy of string */ + rawname = pstrdup(*newval); + + /* Parse string into list of identifiers */ + if (!SplitIdentifierString(rawname, ',', &namelist)) + { + /* syntax error in name list */ + GUC_check_errdetail("List syntax is invalid."); + pfree(rawname); + list_free(namelist); + return false; + } + + foreach(lc, namelist) + { + char *curname = (char *) lfirst(lc); + + ReplicationSlotValidateName(curname, ERROR); + } + + pfree(rawname); + list_free(namelist); + + return true; +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index f07983a43c..0e0593f716 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -100,6 +100,7 @@ #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" +#include "commands/subscriptioncmds.h" #include "miscadmin.h" #include "parser/parse_relation.h" #include "pgstat.h" @@ -151,7 +152,8 @@ finish_sync_worker(void) CommitTransactionCommand(); /* Find the main apply worker and signal it. */ - logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + logicalrep_worker_wakeup(MyLogicalRepWorker->dbid, + MyLogicalRepWorker->subid, InvalidOid); /* Stop gracefully */ proc_exit(0); @@ -191,7 +193,8 @@ wait_for_relation_state_change(Oid relid, char expected_state) /* Check if the sync worker is still running and bail if not. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid, + worker = logicalrep_worker_find(MyLogicalRepWorker->dbid, + MyLogicalRepWorker->subid, relid, false); LWLockRelease(LogicalRepWorkerLock); if (!worker) @@ -238,7 +241,8 @@ wait_for_worker_state_change(char expected_state) * waiting. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(MyLogicalRepWorker->subid, + worker = logicalrep_worker_find(MyLogicalRepWorker->dbid, + MyLogicalRepWorker->subid, InvalidOid, false); if (worker && worker->proc) logicalrep_worker_wakeup_ptr(worker); @@ -484,7 +488,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, + syncworker = logicalrep_worker_find(MyLogicalRepWorker->dbid, + MyLogicalRepWorker->subid, rstate->relid, false); if (syncworker) diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index dcb1108579..902841efe6 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -91,11 +91,12 @@ static SQLCmd *make_sqlcmd(void); %token K_USE_SNAPSHOT %token K_MANIFEST %token K_MANIFEST_CHECKSUMS +%token K_LIST_SLOTS %type command %type base_backup start_replication start_logical_replication create_replication_slot drop_replication_slot identify_system - read_replication_slot timeline_history show sql_cmd + read_replication_slot timeline_history show sql_cmd list_slots %type base_backup_legacy_opt_list generic_option_list %type base_backup_legacy_opt generic_option %type opt_timeline @@ -106,6 +107,7 @@ static SQLCmd *make_sqlcmd(void); %type opt_temporary %type create_slot_options create_slot_legacy_opt_list %type create_slot_legacy_opt +%type slot_name_list slot_name_list_opt %% @@ -129,6 +131,7 @@ command: | read_replication_slot | timeline_history | show + | list_slots | sql_cmd ; @@ -142,6 +145,33 @@ identify_system: } ; +slot_name_list: + IDENT + { + $$ = list_make1($1); + } + | slot_name_list ',' IDENT + { + $$ = lappend($1, $3); + } + +slot_name_list_opt: + slot_name_list { $$ = $1; } + | /* EMPTY */ { $$ = NIL; } + ; + +/* + * LIST_SLOTS + */ +list_slots: + K_LIST_SLOTS slot_name_list_opt + { + ListSlotsCmd *cmd = makeNode(ListSlotsCmd); + cmd->slot_names = $2; + $$ = (Node *) cmd; + } + ; + /* * READ_REPLICATION_SLOT %s */ diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index 1b599c255e..9ee638355d 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -85,6 +85,7 @@ identifier {ident_start}{ident_cont}* BASE_BACKUP { return K_BASE_BACKUP; } FAST { return K_FAST; } IDENTIFY_SYSTEM { return K_IDENTIFY_SYSTEM; } +LIST_SLOTS { return K_LIST_SLOTS; } READ_REPLICATION_SLOT { return K_READ_REPLICATION_SLOT; } SHOW { return K_SHOW; } LABEL { return K_LABEL; } diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index d11daeb1fc..e93fea55ad 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -484,7 +484,7 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto) * WAL and removal of old catalog tuples. As decoding is done in fast_forward * mode, no changes are generated anyway. */ -static XLogRecPtr +XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto) { LogicalDecodingContext *ctx; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 84915ed95b..2fce290ed6 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -456,6 +456,194 @@ IdentifySystem(void) end_tup_output(tstate); } +static int +pg_qsort_namecmp(const void *a, const void *b) +{ + return strncmp(NameStr(*(Name) a), NameStr(*(Name) b), NAMEDATALEN); +} + +/* + * Handle the LIST_SLOTS command. + */ +static void +ListSlots(ListSlotsCmd *cmd) +{ + DestReceiver *dest; + TupOutputState *tstate; + TupleDesc tupdesc; + NameData *slot_names; + int numslot_names; + + numslot_names = list_length(cmd->slot_names); + if (numslot_names) + { + ListCell *lc; + int i = 0; + + slot_names = palloc(numslot_names * sizeof(NameData)); + foreach(lc, cmd->slot_names) + { + char *slot_name = lfirst(lc); + + ReplicationSlotValidateName(slot_name, ERROR); + namestrcpy(&slot_names[i++], slot_name); + } + + qsort(slot_names, numslot_names, sizeof(NameData), pg_qsort_namecmp); + } + + dest = CreateDestReceiver(DestRemoteSimple); + + /* need a tuple descriptor representing four columns */ + tupdesc = CreateTemplateTupleDesc(10); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "plugin", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "slot_type", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "datoid", + INT8OID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "database", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 6, "temporary", + INT4OID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 7, "xmin", + INT8OID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 8, "catalog_xmin", + INT8OID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 9, "restart_lsn", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 10, "confirmed_flush", + TEXTOID, -1, 0); + + /* prepare for projection of tuples */ + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int slotno = 0; slotno < max_replication_slots; slotno++) + { + ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno]; + char restart_lsn_str[MAXFNAMELEN]; + char confirmed_flush_lsn_str[MAXFNAMELEN]; + Datum values[10]; + bool nulls[10]; + + ReplicationSlotPersistency persistency; + TransactionId xmin; + TransactionId catalog_xmin; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_flush_lsn; + Oid datoid; + NameData slot_name; + NameData plugin; + int i; + int64 tmpbigint; + + if (!slot->in_use) + continue; + + SpinLockAcquire(&slot->mutex); + + xmin = slot->data.xmin; + catalog_xmin = slot->data.catalog_xmin; + datoid = slot->data.database; + restart_lsn = slot->data.restart_lsn; + confirmed_flush_lsn = slot->data.confirmed_flush; + namestrcpy(&slot_name, NameStr(slot->data.name)); + namestrcpy(&plugin, NameStr(slot->data.plugin)); + persistency = slot->data.persistency; + + SpinLockRelease(&slot->mutex); + + if (numslot_names && + !bsearch((void *) &slot_name, (void *) slot_names, + numslot_names, sizeof(NameData), pg_qsort_namecmp)) + continue; + + memset(nulls, 0, sizeof(nulls)); + + i = 0; + values[i++] = CStringGetTextDatum(NameStr(slot_name)); + + if (datoid == InvalidOid) + nulls[i++] = true; + else + values[i++] = CStringGetTextDatum(NameStr(plugin)); + + if (datoid == InvalidOid) + values[i++] = CStringGetTextDatum("physical"); + else + values[i++] = CStringGetTextDatum("logical"); + + if (datoid == InvalidOid) + nulls[i++] = true; + else + { + tmpbigint = datoid; + values[i++] = Int64GetDatum(tmpbigint); + } + + if (datoid == InvalidOid) + nulls[i++] = true; + else + { + MemoryContext cur = CurrentMemoryContext; + + /* syscache access needs a transaction env. */ + StartTransactionCommand(); + /* make dbname live outside TX context */ + MemoryContextSwitchTo(cur); + values[i++] = CStringGetTextDatum(get_database_name(datoid)); + CommitTransactionCommand(); + /* CommitTransactionCommand switches to TopMemoryContext */ + MemoryContextSwitchTo(cur); + } + + values[i++] = Int32GetDatum(persistency == RS_TEMPORARY ? 1 : 0); + + if (xmin != InvalidTransactionId) + { + tmpbigint = xmin; + values[i++] = Int64GetDatum(tmpbigint); + } + else + nulls[i++] = true; + + if (catalog_xmin != InvalidTransactionId) + { + tmpbigint = catalog_xmin; + values[i++] = Int64GetDatum(tmpbigint); + } + else + nulls[i++] = true; + + if (restart_lsn != InvalidXLogRecPtr) + { + snprintf(restart_lsn_str, sizeof(restart_lsn_str), "%X/%X", + LSN_FORMAT_ARGS(restart_lsn)); + values[i++] = CStringGetTextDatum(restart_lsn_str); + } + else + nulls[i++] = true; + + if (confirmed_flush_lsn != InvalidXLogRecPtr) + { + snprintf(confirmed_flush_lsn_str, sizeof(confirmed_flush_lsn_str), + "%X/%X", LSN_FORMAT_ARGS(confirmed_flush_lsn)); + values[i++] = CStringGetTextDatum(confirmed_flush_lsn_str); + } + else + nulls[i++] = true; + + /* send it to dest */ + do_tup_output(tstate, values, nulls); + } + LWLockRelease(ReplicationSlotControlLock); + + end_tup_output(tstate); +} + /* Handle READ_REPLICATION_SLOT command */ static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd) @@ -554,7 +742,6 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd) end_tup_output(tstate); } - /* * Handle TIMELINE_HISTORY command. */ @@ -1749,6 +1936,13 @@ exec_replication_command(const char *cmd_string) EndReplicationCommand(cmdtag); break; + case T_ListSlotsCmd: + cmdtag = "LIST_SLOTS"; + set_ps_display(cmdtag); + ListSlots((ListSlotsCmd *) cmd_node); + EndReplicationCommand(cmdtag); + break; + case T_StartReplicationCmd: { StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c index 4d53f040e8..6922353c94 100644 --- a/src/backend/utils/activity/wait_event.c +++ b/src/backend/utils/activity/wait_event.c @@ -230,6 +230,9 @@ pgstat_get_wait_activity(WaitEventActivity w) case WAIT_EVENT_LOGICAL_LAUNCHER_MAIN: event_name = "LogicalLauncherMain"; break; + case WAIT_EVENT_REPL_SLOT_SYNC_MAIN: + event_name = "ReplSlotSyncMain"; + break; case WAIT_EVENT_PGSTAT_MAIN: event_name = "PgStatMain"; break; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index f9504d3aec..0234f5ab87 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -75,6 +75,7 @@ #include "postmaster/syslogger.h" #include "postmaster/walwriter.h" #include "replication/logicallauncher.h" +#include "replication/logicalworker.h" #include "replication/reorderbuffer.h" #include "replication/slot.h" #include "replication/syncrep.h" @@ -4636,6 +4637,28 @@ static struct config_string ConfigureNamesString[] = check_backtrace_functions, assign_backtrace_functions, NULL }, + { + {"synchronize_slot_names", PGC_SIGHUP, REPLICATION_STANDBY, + gettext_noop("Sets the names of replication slots which to synchronize from primary to standby."), + gettext_noop("Value of \"*\" means all."), + GUC_LIST_INPUT | GUC_LIST_QUOTE + }, + &synchronize_slot_names, + "", + check_synchronize_slot_names, NULL, NULL + }, + + { + {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY, + gettext_noop("List of physical slots that must confirm changes before changes are sent to logical replication consumers."), + NULL, + GUC_LIST_INPUT | GUC_LIST_QUOTE + }, + &standby_slot_names, + "", + check_standby_slot_names, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index a1acd46b61..e8b5f76125 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -315,6 +315,7 @@ # and comma-separated list of application_name # from standby(s); '*' = all #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed +#standby_slot_names = '' # physical standby slot names that logical replication waits for # - Standby Servers - @@ -343,6 +344,7 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#synchronize_slot_names = '' # logical replication slots to sync to standby # - Subscribers - diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index aec7e478ab..1cc19e0c99 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -17,6 +17,7 @@ #include "catalog/objectaddress.h" #include "parser/parse_node.h" +#include "replication/walreceiver.h" extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel); @@ -26,4 +27,6 @@ extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel); extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId); extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); +extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); + #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 7c657c1241..920a510c4c 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -501,6 +501,7 @@ typedef enum NodeTag T_StartReplicationCmd, T_TimeLineHistoryCmd, T_SQLCmd, + T_ListSlotsCmd, /* * TAGS FOR RANDOM OTHER STUFF diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index a746fafc12..3f81409e50 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -33,6 +33,15 @@ typedef struct IdentifySystemCmd NodeTag type; } IdentifySystemCmd; +/* ---------------------- + * LIST_SLOTS command + * ---------------------- + */ +typedef struct ListSlotsCmd +{ + NodeTag type; + List *slot_names; +} ListSlotsCmd; /* ---------------------- * BASE_BACKUP command diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 2ad61a001a..f5b5ef07e8 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -12,8 +12,17 @@ #ifndef LOGICALWORKER_H #define LOGICALWORKER_H +#include "utils/guc.h" + +extern char *synchronize_slot_names; +extern char *standby_slot_names; + extern void ApplyWorkerMain(Datum main_arg); +extern void ReplSlotSyncMain(Datum main_arg); extern bool IsLogicalWorker(void); +extern bool check_synchronize_slot_names(char **newval, void **extra, GucSource source); +extern bool check_standby_slot_names(char **newval, void **extra, GucSource source); + #endif /* LOGICALWORKER_H */ diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 53d773ccff..a29e517707 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -216,7 +216,6 @@ extern void ReplicationSlotsDropDBSlots(Oid dboid); extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock); extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot); -extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(void); @@ -224,4 +223,7 @@ extern void CheckPointReplicationSlots(void); extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); +extern XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto); + + #endif /* SLOT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 0b607ed777..dbeb447e7a 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -18,6 +18,7 @@ #include "pgtime.h" #include "port/atomics.h" #include "replication/logicalproto.h" +#include "replication/slot.h" #include "replication/walsender.h" #include "storage/condition_variable.h" #include "storage/latch.h" @@ -187,6 +188,13 @@ typedef struct } proto; } WalRcvStreamOptions; +/* + * Slot information receiver from remote. + * + * Currently this is same as ReplicationSlotPersistentData + */ +#define WalRecvReplicationSlotData ReplicationSlotPersistentData + struct WalReceiverConn; typedef struct WalReceiverConn WalReceiverConn; @@ -274,6 +282,11 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn, typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, TimeLineID *primary_tli); +/* + * TODO + */ +typedef List *(*walrcv_list_slots_fn) (WalReceiverConn *conn, const char *slots); + /* * walrcv_server_version_fn * @@ -387,6 +400,7 @@ typedef struct WalReceiverFunctionsType walrcv_get_conninfo_fn walrcv_get_conninfo; walrcv_get_senderinfo_fn walrcv_get_senderinfo; walrcv_identify_system_fn walrcv_identify_system; + walrcv_list_slots_fn walrcv_list_slots; walrcv_server_version_fn walrcv_server_version; walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile; walrcv_startstreaming_fn walrcv_startstreaming; @@ -411,6 +425,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port) #define walrcv_identify_system(conn, primary_tli) \ WalReceiverFunctions->walrcv_identify_system(conn, primary_tli) +#define walrcv_list_slots(conn, slots) \ + WalReceiverFunctions->walrcv_list_slots(conn, slots) #define walrcv_server_version(conn) \ WalReceiverFunctions->walrcv_server_version(conn) #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 9d29849d80..5de7f80e79 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -58,7 +58,7 @@ typedef struct LogicalRepWorker * exits. Under this, separate buffiles would be created for each * transaction which will be deleted after the transaction is finished. */ - FileSet *stream_fileset; + struct FileSet *stream_fileset; /* Stats. */ XLogRecPtr last_lsn; @@ -81,13 +81,13 @@ extern LogicalRepWorker *MyLogicalRepWorker; extern bool in_remote_transaction; extern void logicalrep_worker_attach(int slot); -extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, +extern LogicalRepWorker *logicalrep_worker_find(Oid dbid, Oid subid, Oid relid, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running); extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid); -extern void logicalrep_worker_stop(Oid subid, Oid relid); -extern void logicalrep_worker_wakeup(Oid subid, Oid relid); +extern void logicalrep_worker_stop(Oid dbid, Oid subid, Oid relid); +extern void logicalrep_worker_wakeup(Oid dbid, Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); extern int logicalrep_sync_worker_count(Oid subid); diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h index 8785a8e12c..3274eade53 100644 --- a/src/include/utils/wait_event.h +++ b/src/include/utils/wait_event.h @@ -42,6 +42,7 @@ typedef enum WAIT_EVENT_CHECKPOINTER_MAIN, WAIT_EVENT_LOGICAL_APPLY_MAIN, WAIT_EVENT_LOGICAL_LAUNCHER_MAIN, + WAIT_EVENT_REPL_SLOT_SYNC_MAIN, WAIT_EVENT_PGSTAT_MAIN, WAIT_EVENT_RECOVERY_WAL_STREAM, WAIT_EVENT_SYSLOGGER_MAIN, diff --git a/src/test/recovery/t/030_slot_sync.pl b/src/test/recovery/t/030_slot_sync.pl new file mode 100644 index 0000000000..c87e7dc016 --- /dev/null +++ b/src/test/recovery/t/030_slot_sync.pl @@ -0,0 +1,58 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More tests => 2; + +my $node_primary = PostgreSQL::Test::Cluster->new('primary'); +$node_primary->init(allows_streaming => 'logical'); +$node_primary->append_conf('postgresql.conf', "standby_slot_names = 'pslot1'"); +$node_primary->start; +$node_primary->psql('postgres', q{SELECT pg_create_physical_replication_slot('pslot1');}); + +$node_primary->backup('backup'); + +my $node_phys_standby = PostgreSQL::Test::Cluster->new('phys_standby'); +$node_phys_standby->init_from_backup($node_primary, 'backup', has_streaming => 1); +$node_phys_standby->append_conf('postgresql.conf', "synchronize_slot_names = '*'"); +$node_phys_standby->append_conf('postgresql.conf', "primary_slot_name = 'pslot1'"); +$node_phys_standby->start; + +$node_primary->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)"); +$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (1), (2), (3)"); + +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +$node_subscriber->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)"); + +$node_primary->safe_psql('postgres', "CREATE PUBLICATION pub1 FOR ALL TABLES"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '" . ($node_primary->connstr . ' dbname=postgres') . "' PUBLICATION pub1"); + +# Wait for initial sync of all subscriptions +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = $node_primary->safe_psql('postgres', + "SELECT slot_name, plugin, database FROM pg_replication_slots WHERE slot_type = 'logical'"); + +is($result, qq(sub1|pgoutput|postgres), 'logical slot on primary'); + +# FIXME: standby needs restart to pick up new slots +$node_phys_standby->restart; +sleep 3; + +$result = $node_phys_standby->safe_psql('postgres', + "SELECT slot_name, plugin, database FROM pg_replication_slots"); + +is($result, qq(sub1|pgoutput|postgres), 'logical slot on standby'); + +$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (4), (5), (6)"); +$node_primary->wait_for_catchup('sub1'); -- 2.34.1