From 655359eedf37d8f2e522aeb1ec8c48adfc1759b1 Mon Sep 17 00:00:00 2001 From: Bertrand Drouvot Date: Thu, 13 Apr 2023 11:32:28 +0000 Subject: [PATCH v5] Synchronize logical replication slots from primary to standby --- doc/src/sgml/config.sgml | 34 ++ src/backend/commands/subscriptioncmds.c | 4 +- src/backend/postmaster/bgworker.c | 3 + .../libpqwalreceiver/libpqwalreceiver.c | 95 ++++ src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/launcher.c | 263 +++++++---- src/backend/replication/logical/meson.build | 1 + .../replication/logical/reorderbuffer.c | 86 ++++ src/backend/replication/logical/slotsync.c | 413 ++++++++++++++++++ src/backend/replication/logical/tablesync.c | 13 +- src/backend/replication/logical/worker.c | 3 +- src/backend/replication/repl_gram.y | 32 +- src/backend/replication/repl_scanner.l | 2 + src/backend/replication/slotfuncs.c | 2 +- src/backend/replication/walsender.c | 195 +++++++++ src/backend/utils/activity/wait_event.c | 3 + src/backend/utils/misc/guc_tables.c | 26 ++ src/backend/utils/misc/postgresql.conf.sample | 2 + src/include/commands/subscriptioncmds.h | 3 + src/include/nodes/replnodes.h | 9 + src/include/replication/logicallauncher.h | 2 + src/include/replication/logicalworker.h | 9 + src/include/replication/slot.h | 5 +- src/include/replication/walreceiver.h | 20 + src/include/replication/worker_internal.h | 8 +- src/include/utils/wait_event.h | 1 + src/test/recovery/meson.build | 1 + src/test/recovery/t/037_slot_sync.pl | 130 ++++++ 28 files changed, 1272 insertions(+), 94 deletions(-) 3.8% doc/src/sgml/ 7.1% src/backend/replication/libpqwalreceiver/ 54.7% src/backend/replication/logical/ 14.9% src/backend/replication/ 3.3% src/backend/ 4.0% src/include/replication/ 10.9% src/test/recovery/t/ diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 091a79d4f3..1360885208 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4466,6 +4466,23 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + standby_slot_names (string) + + standby_slot_names configuration parameter + + + + + List of physical replication slots that logical replication waits for. + If a logical replication connection is meant to switch to a physical + standby after the standby is promoted, the physical replication slot + for the standby should be listed here. This ensures that logical + replication is not ahead of the physical standby. + + + + @@ -4649,6 +4666,23 @@ ANY num_sync ( + synchronize_slot_names (string) + + synchronize_slot_names configuration parameter + + + + + Specifies a list of logical replication slots that a physical standby + should synchronize from the primary server. This is necessary to be + able to retarget those logical replication connections to this standby + if it gets promoted. Specify * to synchronize all + logical replication slots. The default is empty. 
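+         For example, a standby might set
+         synchronize_slot_names = 'walsender_slot_a, walsender_slot_b'
+         (names illustrative) to synchronize just those two slots.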
+ + + + diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3251d89ba8..8721706b79 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -991,7 +991,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, RemoveSubscriptionRel(sub->oid, relid); - logicalrep_worker_stop(sub->oid, relid); + logicalrep_worker_stop(MyDatabaseId, sub->oid, relid); /* * For READY state, we would have already dropped the @@ -1589,7 +1589,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) { LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); - logicalrep_worker_stop(w->subid, w->relid); + logicalrep_worker_stop(w->dbid, w->subid, w->relid); } list_free(subworkers); diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 0dd22b2351..a89d1f10a1 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -129,6 +129,9 @@ static const struct { "ApplyWorkerMain", ApplyWorkerMain }, + { + "ReplSlotSyncMain", ReplSlotSyncMain + }, { "ParallelApplyWorkerMain", ParallelApplyWorkerMain } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 052505e46f..4f7417c49a 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -34,6 +34,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/tuplestore.h" +#include "utils/varlena.h" PG_MODULE_MAGIC; @@ -58,6 +59,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); static char *libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli); +static List *libpqrcv_list_slots(WalReceiverConn *conn, const char *slot_names); static int libpqrcv_server_version(WalReceiverConn *conn); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, @@ -96,6 +98,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_receive = libpqrcv_receive, .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, + .walrcv_list_slots = libpqrcv_list_slots, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -409,6 +412,98 @@ libpqrcv_server_version(WalReceiverConn *conn) return PQserverVersion(conn->streamConn); } +/* + * Get list of slots from primary. 
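+ *
+ * The command sent uses the LIST_SLOTS replication grammar added by
+ * this patch; a filtered request looks like, for instance (slot names
+ * illustrative):
+ *
+ *     LIST_SLOTS slot_a,slot_b
+ *
+ * An empty string or "*" requests all slots.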
+ */
+static List *
+libpqrcv_list_slots(WalReceiverConn *conn, const char *slot_names)
+{
+    PGresult   *res;
+    List       *slotlist = NIL;
+    int         ntuples;
+    StringInfoData s;
+    WalRecvReplicationSlotData *slot_data;
+
+    initStringInfo(&s);
+    appendStringInfoString(&s, "LIST_SLOTS");
+
+    if (strcmp(slot_names, "") != 0 && strcmp(slot_names, "*") != 0)
+    {
+        char       *rawname;
+        List       *namelist;
+        ListCell   *lc;
+
+        appendStringInfoChar(&s, ' ');
+        rawname = pstrdup(slot_names);
+        SplitIdentifierString(rawname, ',', &namelist);
+        foreach(lc, namelist)
+        {
+            if (lc != list_head(namelist))
+                appendStringInfoChar(&s, ',');
+            appendStringInfo(&s, "%s",
+                             quote_identifier(lfirst(lc)));
+        }
+    }
+
+    res = libpqrcv_PQexec(conn->streamConn, s.data);
+    pfree(s.data);
+    if (PQresultStatus(res) != PGRES_TUPLES_OK)
+    {
+        PQclear(res);
+        ereport(ERROR,
+                (errmsg("could not receive list of slots from the primary server: %s",
+                        pchomp(PQerrorMessage(conn->streamConn)))));
+    }
+    if (PQnfields(res) < 10)
+    {
+        int         nfields = PQnfields(res);
+
+        PQclear(res);
+        ereport(ERROR,
+                (errmsg("invalid response from primary server"),
+                 errdetail("Could not get list of slots: got %d fields, expected %d or more fields.",
+                           nfields, 10)));
+    }
+
+    ntuples = PQntuples(res);
+    for (int i = 0; i < ntuples; i++)
+    {
+        char       *slot_type;
+
+        slot_data = palloc0(sizeof(WalRecvReplicationSlotData));
+        namestrcpy(&slot_data->persistent_data.name, PQgetvalue(res, i, 0));
+        if (!PQgetisnull(res, i, 1))
+            namestrcpy(&slot_data->persistent_data.plugin, PQgetvalue(res, i, 1));
+        slot_type = PQgetvalue(res, i, 2);
+        if (!PQgetisnull(res, i, 3))
+            slot_data->persistent_data.database = atooid(PQgetvalue(res, i, 3));
+        if (strcmp(slot_type, "physical") == 0)
+        {
+            if (OidIsValid(slot_data->persistent_data.database))
+                elog(ERROR, "unexpected physical replication slot with database set");
+        }
+        if (pg_strtoint32(PQgetvalue(res, i, 5)) == 1)
+            slot_data->persistent_data.persistency = RS_TEMPORARY;
+        else
+            slot_data->persistent_data.persistency = RS_PERSISTENT;
+        if (!PQgetisnull(res, i, 6))
+            slot_data->persistent_data.xmin = atooid(PQgetvalue(res, i, 6));
+        if (!PQgetisnull(res, i, 7))
+            slot_data->persistent_data.catalog_xmin = atooid(PQgetvalue(res, i, 7));
+        if (!PQgetisnull(res, i, 8))
+        {
+            uint32      hi;
+            uint32      lo;
+
+            /* LSNs are sent in %X/%X form, not as plain decimal integers */
+            if (sscanf(PQgetvalue(res, i, 8), "%X/%X", &hi, &lo) != 2)
+                elog(ERROR, "could not parse restart_lsn \"%s\"",
+                     PQgetvalue(res, i, 8));
+            slot_data->persistent_data.restart_lsn = ((uint64) hi << 32) | lo;
+        }
+        if (!PQgetisnull(res, i, 9))
+        {
+            uint32      hi;
+            uint32      lo;
+
+            if (sscanf(PQgetvalue(res, i, 9), "%X/%X", &hi, &lo) != 2)
+                elog(ERROR, "could not parse confirmed_flush \"%s\"",
+                     PQgetvalue(res, i, 9));
+            slot_data->persistent_data.confirmed_flush = ((uint64) hi << 32) | lo;
+        }
+
+        slot_data->last_sync_time = 0;
+        slotlist = lappend(slotlist, slot_data);
+    }
+
+    PQclear(res);
+
+    return slotlist;
+}
+
 /*
  * Start streaming WAL data from given streaming options.
* diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 2dc25e37bb..ba03eeff1c 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -25,6 +25,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + slotsync.o \ snapbuild.o \ tablesync.o \ worker.o diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 970d170e73..14af724639 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -22,6 +22,7 @@ #include "access/htup_details.h" #include "access/tableam.h" #include "access/xact.h" +#include "catalog/pg_authid.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "funcapi.h" @@ -246,7 +247,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, * We are only interested in the leader apply worker or table sync worker. */ LogicalRepWorker * -logicalrep_worker_find(Oid subid, Oid relid, bool only_running) +logicalrep_worker_find(Oid dbid, Oid subid, Oid relid, bool only_running) { int i; LogicalRepWorker *res = NULL; @@ -262,8 +263,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) if (isParallelApplyWorker(w)) continue; - if (w->in_use && w->subid == subid && w->relid == relid && - (!only_running || w->proc)) + if (w->in_use && w->dbid == dbid && w->subid == subid && + w->relid == relid && (!only_running || w->proc)) { res = w; break; @@ -320,9 +321,13 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, /* Sanity check - tablesync worker cannot be a subworker */ Assert(!(is_parallel_apply_worker && OidIsValid(relid))); - ereport(DEBUG1, - (errmsg_internal("starting logical replication worker for subscription \"%s\"", - subname))); + if (OidIsValid(subid)) + ereport(DEBUG1, + (errmsg_internal("starting logical replication worker for subscription \"%s\"", + subname))); + else + ereport(DEBUG1, + (errmsg_internal("starting replication slot synchronization worker"))); /* Report this after the initial starting message for consistency. */ if (max_replication_slots == 0) @@ -359,7 +364,9 @@ retry: * reason we do this is because if some worker failed to start up and its * parent has crashed while waiting, the in_use state was never cleared. 
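+     *
+     * Note that the sync-worker limit below now applies only when launching
+     * a tablesync worker (i.e. a valid relid); slot synchronization workers
+     * are not counted against max_sync_workers_per_subscription.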
*/ - if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription) + if (worker == NULL || + (OidIsValid(relid) && + nsyncworkers >= max_sync_workers_per_subscription)) { bool did_cleanup = false; @@ -455,15 +462,20 @@ retry: memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + bgw.bgw_start_time = BgWorkerStart_ConsistentState; snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); - if (is_parallel_apply_worker) + if (!OidIsValid(subid)) + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncMain"); + else if (is_parallel_apply_worker) snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain"); else snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); - if (OidIsValid(relid)) + if (!OidIsValid(subid)) + snprintf(bgw.bgw_name, BGW_MAXLEN, + "replication slot synchronization worker"); + else if (OidIsValid(relid)) snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication worker for subscription %u sync %u", subid, relid); else if (is_parallel_apply_worker) @@ -591,13 +603,13 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) * Stop the logical replication worker for subid/relid, if any. */ void -logicalrep_worker_stop(Oid subid, Oid relid) +logicalrep_worker_stop(Oid dbid, Oid subid, Oid relid) { LogicalRepWorker *worker; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid, relid, false); + worker = logicalrep_worker_find(dbid, subid, relid, false); if (worker) { @@ -640,13 +652,13 @@ logicalrep_pa_worker_stop(int slot_no, uint16 generation) * Wake up (using latch) any logical replication worker for specified sub/rel. */ void -logicalrep_worker_wakeup(Oid subid, Oid relid) +logicalrep_worker_wakeup(Oid dbid, Oid subid, Oid relid) { LogicalRepWorker *worker; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid, relid, true); + worker = logicalrep_worker_find(dbid, subid, relid, true); if (worker) logicalrep_worker_wakeup_ptr(worker); @@ -888,7 +900,7 @@ ApplyLauncherRegister(void) memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + bgw.bgw_start_time = BgWorkerStart_ConsistentState; snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, @@ -1071,6 +1083,157 @@ ApplyLauncherWakeup(void) kill(LogicalRepCtx->launcher_pid, SIGUSR1); } +static void +ApplyLauncherStartSlotSync(long *wait_time) +{ + WalReceiverConn *wrconn; + char *err; + List *slots; + ListCell *lc; + MemoryContext tmpctx; + MemoryContext oldctx; + + if (strcmp(synchronize_slot_names, "") == 0) + return; + + wrconn = walrcv_connect(PrimaryConnInfo, false, false, + "Logical Replication Launcher", &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the primary server: %s", err))); + + /* Use temporary context for the slot list and worker info. 
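+     * The context is deleted again at the bottom of this function, so
+     * per-cycle allocations cannot accumulate in TopMemoryContext.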
+     */
+    tmpctx = AllocSetContextCreate(TopMemoryContext,
+                                   "Logical Replication Launcher slot sync ctx",
+                                   ALLOCSET_DEFAULT_SIZES);
+    oldctx = MemoryContextSwitchTo(tmpctx);
+
+    slots = walrcv_list_slots(wrconn, synchronize_slot_names);
+
+    foreach(lc, slots)
+    {
+        WalRecvReplicationSlotData *slot_data = lfirst(lc);
+        LogicalRepWorker *w;
+        TimestampTz last_sync;
+        TimestampTz now;
+        long        elapsed;
+
+        if (!OidIsValid(slot_data->persistent_data.database))
+            continue;
+
+        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+        w = logicalrep_worker_find(slot_data->persistent_data.database, InvalidOid,
+                                   InvalidOid, false);
+        LWLockRelease(LogicalRepWorkerLock);
+
+        if (w != NULL)
+            continue;           /* worker is running already */
+
+        /*
+         * If the worker is eligible to start now, launch it.  Otherwise,
+         * adjust wait_time so that we'll wake up as soon as it can be
+         * started.
+         *
+         * Each slot synchronization worker can only be restarted once per
+         * wal_retrieve_retry_interval, so that errors do not cause us to
+         * repeatedly restart the worker as fast as possible.
+         */
+        last_sync = slot_data->last_sync_time;
+        now = GetCurrentTimestamp();
+        if (last_sync == 0 ||
+            (elapsed = TimestampDifferenceMilliseconds(last_sync, now)) >= wal_retrieve_retry_interval)
+        {
+            slot_data->last_sync_time = now;
+            logicalrep_worker_launch(slot_data->persistent_data.database,
+                                     InvalidOid, NULL,
+                                     BOOTSTRAP_SUPERUSERID, InvalidOid,
+                                     DSM_HANDLE_INVALID);
+        }
+        else
+        {
+            *wait_time = Min(*wait_time,
+                             wal_retrieve_retry_interval - elapsed);
+        }
+    }
+
+    /* Switch back to original memory context. */
+    MemoryContextSwitchTo(oldctx);
+    /* Clean the temporary memory. */
+    MemoryContextDelete(tmpctx);
+
+    walrcv_disconnect(wrconn);
+}
+
+static void
+ApplyLauncherStartSubs(long *wait_time)
+{
+    List       *sublist;
+    ListCell   *lc;
+    MemoryContext subctx;
+    MemoryContext oldctx;
+
+    /* Use temporary context to avoid leaking memory across cycles. */
+    subctx = AllocSetContextCreate(TopMemoryContext,
+                                   "Logical Replication Launcher sublist",
+                                   ALLOCSET_DEFAULT_SIZES);
+    oldctx = MemoryContextSwitchTo(subctx);
+
+    /* Start the missing workers for enabled subscriptions. */
+    sublist = get_subscription_list();
+    foreach(lc, sublist)
+    {
+        Subscription *sub = (Subscription *) lfirst(lc);
+        LogicalRepWorker *w;
+        TimestampTz last_start;
+        TimestampTz now;
+        long        elapsed;
+
+        if (!sub->enabled)
+            continue;
+
+        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+        w = logicalrep_worker_find(sub->dbid, sub->oid, InvalidOid, false);
+        LWLockRelease(LogicalRepWorkerLock);
+
+        if (w != NULL)
+            continue;           /* worker is running already */
+
+        /*
+         * If the worker is eligible to start now, launch it.  Otherwise,
+         * adjust wait_time so that we'll wake up as soon as it can be
+         * started.
+         *
+         * Each subscription's apply worker can only be restarted once per
+         * wal_retrieve_retry_interval, so that errors do not cause us to
+         * repeatedly restart the worker as fast as possible.  In cases
+         * where a restart is expected (e.g., subscription parameter
+         * changes), another process should remove the last-start entry
+         * for the subscription so that the worker can be restarted
+         * without waiting for wal_retrieve_retry_interval to elapse.
+ */ + last_start = ApplyLauncherGetWorkerStartTime(sub->oid); + now = GetCurrentTimestamp(); + if (last_start == 0 || + (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) + { + ApplyLauncherSetWorkerStartTime(sub->oid, now); + logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, + sub->owner, InvalidOid, + DSM_HANDLE_INVALID); + } + else + { + *wait_time = Min(*wait_time, + wal_retrieve_retry_interval - elapsed); + } + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(subctx); +} + /* * Main loop for the apply launcher process. */ @@ -1096,78 +1259,20 @@ ApplyLauncherMain(Datum main_arg) */ BackgroundWorkerInitializeConnection(NULL, NULL, 0); + load_file("libpqwalreceiver", false); + /* Enter main loop */ for (;;) { int rc; - List *sublist; - ListCell *lc; - MemoryContext subctx; - MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; CHECK_FOR_INTERRUPTS(); - /* Use temporary context to avoid leaking memory across cycles. */ - subctx = AllocSetContextCreate(TopMemoryContext, - "Logical Replication Launcher sublist", - ALLOCSET_DEFAULT_SIZES); - oldctx = MemoryContextSwitchTo(subctx); - - /* Start any missing workers for enabled subscriptions. */ - sublist = get_subscription_list(); - foreach(lc, sublist) - { - Subscription *sub = (Subscription *) lfirst(lc); - LogicalRepWorker *w; - TimestampTz last_start; - TimestampTz now; - long elapsed; - - if (!sub->enabled) - continue; - - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); - LWLockRelease(LogicalRepWorkerLock); - - if (w != NULL) - continue; /* worker is running already */ - - /* - * If the worker is eligible to start now, launch it. Otherwise, - * adjust wait_time so that we'll wake up as soon as it can be - * started. - * - * Each subscription's apply worker can only be restarted once per - * wal_retrieve_retry_interval, so that errors do not cause us to - * repeatedly restart the worker as fast as possible. In cases - * where a restart is expected (e.g., subscription parameter - * changes), another process should remove the last-start entry - * for the subscription so that the worker can be restarted - * without waiting for wal_retrieve_retry_interval to elapse. - */ - last_start = ApplyLauncherGetWorkerStartTime(sub->oid); - now = GetCurrentTimestamp(); - if (last_start == 0 || - (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) - { - ApplyLauncherSetWorkerStartTime(sub->oid, now); - logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, - sub->owner, InvalidOid, - DSM_HANDLE_INVALID); - } - else - { - wait_time = Min(wait_time, - wal_retrieve_retry_interval - elapsed); - } - } - - /* Switch back to original memory context. */ - MemoryContextSwitchTo(oldctx); - /* Clean the temporary memory. */ - MemoryContextDelete(subctx); + if (!RecoveryInProgress()) + ApplyLauncherStartSubs(&wait_time); + else + ApplyLauncherStartSlotSync(&wait_time); /* Wait for more work. 
 */
        rc = WaitLatch(MyLatch,

diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index d48cd4c590..9e52ec421f 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -11,6 +11,7 @@ backend_sources += files(
   'proto.c',
   'relation.c',
   'reorderbuffer.c',
+  'slotsync.c',
   'snapbuild.c',
   'tablesync.c',
   'worker.c',

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9f44974473..1519b0ec64 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -95,11 +95,14 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "replication/logical.h"
+#include "replication/logicalworker.h"
 #include "replication/reorderbuffer.h"
 #include "replication/slot.h"
 #include "replication/snapbuild.h"	/* just for SnapBuildSnapDecRefcount */
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
 #include "utils/combocid.h"
@@ -107,6 +110,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/relfilenumbermap.h"
+#include "utils/varlena.h"

 /* entry for a hash table we use to map from xid to our transaction state */
@@ -2053,6 +2057,85 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	}
 }

+/*
+ * Wait until all physical slots named in standby_slot_names have confirmed
+ * receipt of commit_lsn, so that this decoded stream can never get ahead
+ * of the listed standbys.
+ */
+static void
+wait_for_standby_confirmation(XLogRecPtr commit_lsn)
+{
+    char       *rawname;
+    List       *namelist;
+    ListCell   *lc;
+    XLogRecPtr  flush_pos = InvalidXLogRecPtr;
+
+    if (strcmp(standby_slot_names, "") == 0)
+        return;
+
+    rawname = pstrdup(standby_slot_names);
+    SplitIdentifierString(rawname, ',', &namelist);
+
+    while (true)
+    {
+        int         wait_slots_remaining;
+        XLogRecPtr  oldest_flush_pos = InvalidXLogRecPtr;
+        int         rc;
+
+        wait_slots_remaining = list_length(namelist);
+
+        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+        for (int i = 0; i < max_replication_slots; i++)
+        {
+            ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+            bool        inlist;
+
+            if (!s->in_use)
+                continue;
+
+            inlist = false;
+            foreach(lc, namelist)
+            {
+                char       *name = lfirst(lc);
+
+                if (strcmp(name, NameStr(s->data.name)) == 0)
+                {
+                    inlist = true;
+                    break;
+                }
+            }
+            if (!inlist)
+                continue;
+
+            SpinLockAcquire(&s->mutex);
+
+            if (s->data.database == InvalidOid)
+                /* Physical slots advance restart_lsn on flush and ignore confirmed_flush_lsn */
+                flush_pos = s->data.restart_lsn;
+            else
+                /* For logical slots we must wait for commit and flush */
+                flush_pos = s->data.confirmed_flush;
+
+            SpinLockRelease(&s->mutex);
+
+            /* We want to find out the min(flush pos) over all named slots */
+            if (oldest_flush_pos == InvalidXLogRecPtr ||
+                oldest_flush_pos > flush_pos)
+                oldest_flush_pos = flush_pos;
+
+            if (flush_pos >= commit_lsn && wait_slots_remaining > 0)
+                wait_slots_remaining--;
+        }
+        LWLockRelease(ReplicationSlotControlLock);
+
+        if (wait_slots_remaining == 0)
+            return;
+
+        rc = WaitLatch(MyLatch,
+                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                       1000L, PG_WAIT_EXTENSION);
+
+        if (rc & WL_POSTMASTER_DEATH)
+            proc_exit(1);
+
+        /* Reset the latch before rechecking, per the usual latch protocol. */
+        ResetLatch(MyLatch);
+
+        CHECK_FOR_INTERRUPTS();
+    }
+}
+
 /*
  * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
  *
@@ -2502,6 +2585,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	 * Call either PREPARE (for two-phase transactions) or COMMIT (for
 	 * regular ones).
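+	 *
+	 * But first wait until all physical slots listed in standby_slot_names
+	 * have confirmed commit_lsn, so that this logical stream cannot get
+	 * ahead of a standby that might later be promoted.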
 	 */
+
+	wait_for_standby_confirmation(commit_lsn);
+
 	if (rbtxn_prepared(txn))
 		rb->prepare(rb, txn, commit_lsn);
 	else

diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
new file mode 100644
index 0000000000..529ddb21ae
--- /dev/null
+++ b/src/backend/replication/logical/slotsync.c
@@ -0,0 +1,413 @@
+/*-------------------------------------------------------------------------
+ * slotsync.c
+ *     PostgreSQL worker for synchronizing slots to a standby from the primary
+ *
+ * Copyright (c) 2023, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *     src/backend/replication/logical/slotsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "commands/dbcommands.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
+#include "replication/walreceiver.h"
+#include "replication/worker_internal.h"
+#include "storage/ipc.h"
+#include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/varlena.h"
+
+char       *synchronize_slot_names;
+char       *standby_slot_names;
+
+/*
+ * Wait for the remote slot to pass the locally reserved position.
+ */
+static void
+wait_for_primary_slot_catchup(WalReceiverConn *wrconn, char *slot_name,
+                              XLogRecPtr min_lsn)
+{
+    WalRcvExecResult *res;
+    TupleTableSlot *slot;
+    Oid         slotRow[1] = {LSNOID};
+    StringInfoData cmd;
+    bool        isnull;
+    XLogRecPtr  restart_lsn;
+
+    for (;;)
+    {
+        int         rc;
+
+        CHECK_FOR_INTERRUPTS();
+
+        initStringInfo(&cmd);
+        appendStringInfo(&cmd,
+                         "SELECT restart_lsn"
+                         " FROM pg_catalog.pg_replication_slots"
+                         " WHERE slot_name = %s",
+                         quote_literal_cstr(slot_name));
+        res = walrcv_exec(wrconn, cmd.data, 1, slotRow);
+
+        if (res->status != WALRCV_OK_TUPLES)
+            ereport(ERROR,
+                    (errmsg("could not fetch slot info for slot \"%s\" from primary: %s",
+                            slot_name, res->err)));
+
+        slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+        if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+            ereport(ERROR,
+                    (errmsg("slot \"%s\" disappeared from the primary",
+                            slot_name)));
+
+        restart_lsn = DatumGetLSN(slot_getattr(slot, 1, &isnull));
+        Assert(!isnull);
+
+        ExecClearTuple(slot);
+        walrcv_clear_result(res);
+
+        if (restart_lsn >= min_lsn)
+            break;
+
+        rc = WaitLatch(MyLatch,
+                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                       wal_retrieve_retry_interval,
+                       WAIT_EVENT_REPL_SLOT_SYNC_MAIN);
+
+        ResetLatch(MyLatch);
+
+        /* emergency bailout if postmaster has died */
+        if (rc & WL_POSTMASTER_DEATH)
+            proc_exit(1);
+    }
+}
+
+/*
+ * Synchronize a single slot to the given position.
+ *
+ * This creates a new slot if there is no existing one.
+ */
+static void
+synchronize_one_slot(WalReceiverConn *wrconn, char *slot_name, char *database,
+                     char *plugin_name, XLogRecPtr target_lsn)
+{
+    bool        found = false;
+    XLogRecPtr  endlsn;
+
+    /* Check whether the named slot already exists on the standby.
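+     * If it does, we only need to advance it; otherwise it must be
+     * created first.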
*/ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (!s->in_use) + continue; + + if (strcmp(NameStr(s->data.name), slot_name) == 0) + { + found = true; + break; + } + } + LWLockRelease(ReplicationSlotControlLock); + + StartTransactionCommand(); + + /* Already existing slot, acquire */ + if (found) + { + ReplicationSlotAcquire(slot_name, true); + + if (target_lsn < MyReplicationSlot->data.confirmed_flush) + { + elog(DEBUG1, + "not synchronizing slot %s; synchronization would move it backward", + slot_name); + + ReplicationSlotRelease(); + CommitTransactionCommand(); + return; + } + } + /* Otherwise create the slot first. */ + else + { + TransactionId xmin_horizon = InvalidTransactionId; + ReplicationSlot *slot; + + ReplicationSlotCreate(slot_name, true, RS_EPHEMERAL, false); + slot = MyReplicationSlot; + + SpinLockAcquire(&slot->mutex); + slot->data.database = get_database_oid(database, false); + namestrcpy(&slot->data.plugin, plugin_name); + SpinLockRelease(&slot->mutex); + + ReplicationSlotReserveWal(); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + xmin_horizon = GetOldestSafeDecodingTransactionId(true); + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + ReplicationSlotsComputeRequiredXmin(true); + LWLockRelease(ProcArrayLock); + + if (target_lsn < MyReplicationSlot->data.restart_lsn) + { + ereport(LOG, + errmsg("waiting for remote slot \"%s\" LSN (%X/%X) to pass local slot LSN (%X/%X)", + slot_name, + LSN_FORMAT_ARGS(target_lsn), LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn))); + + wait_for_primary_slot_catchup(wrconn, slot_name, + MyReplicationSlot->data.restart_lsn); + } + + ReplicationSlotPersist(); + } + + endlsn = pg_logical_replication_slot_advance(target_lsn); + + elog(DEBUG3, "synchronized slot %s to lsn (%X/%X)", + slot_name, LSN_FORMAT_ARGS(endlsn)); + + ReplicationSlotRelease(); + CommitTransactionCommand(); +} + +static void +synchronize_slots(void) +{ + WalRcvExecResult *res; + WalReceiverConn *wrconn = NULL; + TupleTableSlot *slot; + Oid slotRow[3] = {TEXTOID, TEXTOID, LSNOID}; + StringInfoData s; + char *database; + char *err; + MemoryContext oldctx = CurrentMemoryContext; + + if (!WalRcv) + return; + + /* syscache access needs a transaction env. 
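+     * (The get_database_name() call below is a syscache lookup.)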
*/ + StartTransactionCommand(); + /* make dbname live outside TX context */ + MemoryContextSwitchTo(oldctx); + + database = get_database_name(MyDatabaseId); + initStringInfo(&s); + appendStringInfo(&s, "%s dbname=%s", PrimaryConnInfo, database); + wrconn = walrcv_connect(s.data, true, false, "slot_sync", &err); + + if (wrconn == NULL) + ereport(ERROR, + (errmsg("could not connect to the primary server: %s", err))); + + resetStringInfo(&s); + appendStringInfo(&s, + "SELECT slot_name, plugin, confirmed_flush_lsn" + " FROM pg_catalog.pg_replication_slots" + " WHERE database = %s", + quote_literal_cstr(database)); + if (strcmp(synchronize_slot_names, "") != 0 && strcmp(synchronize_slot_names, "*") != 0) + { + char *rawname; + List *namelist; + ListCell *lc; + + rawname = pstrdup(synchronize_slot_names); + SplitIdentifierString(rawname, ',', &namelist); + + appendStringInfoString(&s, " AND slot_name IN ("); + foreach (lc, namelist) + { + if (lc != list_head(namelist)) + appendStringInfoChar(&s, ','); + appendStringInfo(&s, "%s", + quote_literal_cstr(lfirst(lc))); + } + appendStringInfoChar(&s, ')'); + } + + res = walrcv_exec(wrconn, s.data, 3, slotRow); + pfree(s.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch slot info from primary: %s", + res->err))); + + CommitTransactionCommand(); + /* CommitTransactionCommand switches to TopMemoryContext */ + MemoryContextSwitchTo(oldctx); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *slot_name; + char *plugin_name; + XLogRecPtr confirmed_flush_lsn; + bool isnull; + + slot_name = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + plugin_name = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + confirmed_flush_lsn = DatumGetLSN(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + + synchronize_one_slot(wrconn, slot_name, database, plugin_name, + confirmed_flush_lsn); + + ExecClearTuple(slot); + } + + walrcv_clear_result(res); + pfree(database); + + walrcv_disconnect(wrconn); +} + +/* + * The main loop of our worker process. + */ +void +ReplSlotSyncMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + + /* Attach to slot */ + logicalrep_worker_attach(worker_slot); + + /* Establish signal handlers. */ + BackgroundWorkerUnblockSignals(); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + /* Connect to our database. */ + BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid, + MyLogicalRepWorker->userid, + 0); + + StartTransactionCommand(); + ereport(LOG, + (errmsg("replication slot synchronization worker for database \"%s\" has started", + get_database_name(MyLogicalRepWorker->dbid)))); + CommitTransactionCommand(); + + /* Main wait loop. */ + for (;;) + { + int rc; + + CHECK_FOR_INTERRUPTS(); + + if (!RecoveryInProgress()) + return; + + if (strcmp(synchronize_slot_names, "") == 0) + return; + + synchronize_slots(); + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + wal_retrieve_retry_interval, + WAIT_EVENT_REPL_SLOT_SYNC_MAIN); + + ResetLatch(MyLatch); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + } +} + +/* + * Routines for handling the GUC variable(s) + */ + +bool +check_synchronize_slot_names(char **newval, void **extra, GucSource source) +{ + /* Special handling for "*" which means all. 
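+     * A "*" inside a longer list is not special; it would simply fail
+     * the slot name validation below.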
*/ + if (strcmp(*newval, "*") == 0) + { + return true; + } + else + { + char *rawname; + List *namelist; + ListCell *lc; + + /* Need a modifiable copy of string */ + rawname = pstrdup(*newval); + + /* Parse string into list of identifiers */ + if (!SplitIdentifierString(rawname, ',', &namelist)) + { + /* syntax error in name list */ + GUC_check_errdetail("List syntax is invalid."); + pfree(rawname); + list_free(namelist); + return false; + } + + foreach(lc, namelist) + { + char *curname = (char *) lfirst(lc); + + ReplicationSlotValidateName(curname, ERROR); + } + + pfree(rawname); + list_free(namelist); + } + + return true; +} + + +bool +check_standby_slot_names(char **newval, void **extra, GucSource source) +{ + char *rawname; + List *namelist; + ListCell *lc; + + /* Need a modifiable copy of string */ + rawname = pstrdup(*newval); + + /* Parse string into list of identifiers */ + if (!SplitIdentifierString(rawname, ',', &namelist)) + { + /* syntax error in name list */ + GUC_check_errdetail("List syntax is invalid."); + pfree(rawname); + list_free(namelist); + return false; + } + + foreach(lc, namelist) + { + char *curname = (char *) lfirst(lc); + + ReplicationSlotValidateName(curname, ERROR); + } + + pfree(rawname); + list_free(namelist); + + return true; +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 6dce355633..2307d187e4 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -100,6 +100,7 @@ #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" +#include "commands/subscriptioncmds.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "parser/parse_relation.h" @@ -155,7 +156,8 @@ finish_sync_worker(void) CommitTransactionCommand(); /* Find the leader apply worker and signal it. */ - logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + logicalrep_worker_wakeup(MyLogicalRepWorker->dbid, + MyLogicalRepWorker->subid, InvalidOid); /* Stop gracefully */ proc_exit(0); @@ -195,7 +197,8 @@ wait_for_relation_state_change(Oid relid, char expected_state) /* Check if the sync worker is still running and bail if not. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid, + worker = logicalrep_worker_find(MyLogicalRepWorker->dbid, + MyLogicalRepWorker->subid, relid, false); LWLockRelease(LogicalRepWorkerLock); if (!worker) @@ -242,7 +245,8 @@ wait_for_worker_state_change(char expected_state) * waiting. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(MyLogicalRepWorker->subid, + worker = logicalrep_worker_find(MyLogicalRepWorker->dbid, + MyLogicalRepWorker->subid, InvalidOid, false); if (worker && worker->proc) logicalrep_worker_wakeup_ptr(worker); @@ -508,7 +512,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, + syncworker = logicalrep_worker_find(MyLogicalRepWorker->dbid, + MyLogicalRepWorker->subid, rstate->relid, false); if (syncworker) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 3d58910c14..b9354bd023 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1600,7 +1600,8 @@ apply_handle_stream_start(StringInfo s) * Signal the leader apply worker, as it may be waiting for * us. 
 			 */
-			logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+			logicalrep_worker_wakeup(MyLogicalRepWorker->dbid,
+									 MyLogicalRepWorker->subid, InvalidOid);
 		}

 		parallel_stream_nchanges = 0;

diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 0c874e33cf..12a4b74368 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -76,11 +76,12 @@ Node *replication_parse_result;
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
+%token K_LIST_SLOTS

 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
-				read_replication_slot timeline_history show
+				read_replication_slot timeline_history show list_slots
 %type <list>	generic_option_list
 %type <defelt>	generic_option
 %type <uintval>	opt_timeline
 %type <boolval>	opt_temporary
 %type <list>	create_slot_options create_slot_legacy_opt_list
 %type <defelt>	create_slot_legacy_opt
+%type <list>	slot_name_list slot_name_list_opt

 %%

@@ -114,6 +116,7 @@ command:
 			| read_replication_slot
 			| timeline_history
 			| show
+			| list_slots
 			;

 /*
@@ -126,6 +129,33 @@ identify_system:
 				}
 			;

+slot_name_list:
+			IDENT
+				{
+					$$ = list_make1($1);
+				}
+			| slot_name_list ',' IDENT
+				{
+					$$ = lappend($1, $3);
+				}
+			;
+
+slot_name_list_opt:
+			slot_name_list			{ $$ = $1; }
+			| /* EMPTY */			{ $$ = NIL; }
+			;
+
+/*
+ * LIST_SLOTS
+ */
+list_slots:
+			K_LIST_SLOTS slot_name_list_opt
+				{
+					ListSlotsCmd *cmd = makeNode(ListSlotsCmd);
+
+					cmd->slot_names = $2;
+					$$ = (Node *) cmd;
+				}
+			;
+
 /*
  * READ_REPLICATION_SLOT %s
  */

diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index cb467ca46f..9501df38eb 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -128,6 +128,7 @@ DROP_REPLICATION_SLOT		{ return K_DROP_REPLICATION_SLOT; }
 TIMELINE_HISTORY	{ return K_TIMELINE_HISTORY; }
 PHYSICAL			{ return K_PHYSICAL; }
 RESERVE_WAL			{ return K_RESERVE_WAL; }
+LIST_SLOTS			{ return K_LIST_SLOTS; }
 LOGICAL				{ return K_LOGICAL; }
 SLOT				{ return K_SLOT; }
 TEMPORARY			{ return K_TEMPORARY; }
@@ -304,6 +305,7 @@ replication_scanner_is_replication_command(void)
 		case K_READ_REPLICATION_SLOT:
 		case K_TIMELINE_HISTORY:
 		case K_SHOW:
+		case K_LIST_SLOTS:
 			/* Yes; push back the first token so we can parse later. */
 			repl_pushed_back_token = first_token;
 			return true;

diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6035cf4816..83ada6db6a 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -467,7 +467,7 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
  * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
  * mode, no changes are generated anyway.
  */
-static XLogRecPtr
+XLogRecPtr
 pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
 	LogicalDecodingContext *ctx;

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 45b8b3684f..0d01b8967a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -473,6 +473,194 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }

+static int
+pg_qsort_namecmp(const void *a, const void *b)
+{
+	return strncmp(NameStr(*(Name) a), NameStr(*(Name) b), NAMEDATALEN);
+}
+
+/*
+ * Handle the LIST_SLOTS command.
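+ *
+ * The response has one row per slot and ten columns: slot_name, plugin,
+ * slot_type, datoid, database, temporary, xmin, catalog_xmin,
+ * restart_lsn and confirmed_flush; values that do not apply are sent as
+ * NULLs.  For illustration, something like the following is expected to
+ * work on a replication connection (slot name hypothetical):
+ *
+ *     psql "dbname=postgres replication=database" -c "LIST_SLOTS my_slot"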
+ */
+static void
+ListSlots(ListSlotsCmd *cmd)
+{
+    DestReceiver *dest;
+    TupOutputState *tstate;
+    TupleDesc   tupdesc;
+    NameData   *slot_names;
+    int         numslot_names;
+
+    numslot_names = list_length(cmd->slot_names);
+    if (numslot_names)
+    {
+        ListCell   *lc;
+        int         i = 0;
+
+        slot_names = palloc(numslot_names * sizeof(NameData));
+        foreach(lc, cmd->slot_names)
+        {
+            char       *slot_name = lfirst(lc);
+
+            ReplicationSlotValidateName(slot_name, ERROR);
+            namestrcpy(&slot_names[i++], slot_name);
+        }
+
+        qsort(slot_names, numslot_names, sizeof(NameData), pg_qsort_namecmp);
+    }
+
+    dest = CreateDestReceiver(DestRemoteSimple);
+
+    /* need a tuple descriptor representing ten columns */
+    tupdesc = CreateTemplateTupleDesc(10);
+    TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
+                              TEXTOID, -1, 0);
+    TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "plugin",
+                              TEXTOID, -1, 0);
+    TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "slot_type",
+                              TEXTOID, -1, 0);
+    TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "datoid",
+                              INT8OID, -1, 0);
+    TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "database",
+                              TEXTOID, -1, 0);
+    TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 6, "temporary",
+                              INT4OID, -1, 0);
+    TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 7, "xmin",
+                              INT8OID, -1, 0);
+    TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 8, "catalog_xmin",
+                              INT8OID, -1, 0);
+    TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 9, "restart_lsn",
+                              TEXTOID, -1, 0);
+    TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 10, "confirmed_flush",
+                              TEXTOID, -1, 0);
+
+    /* prepare for projection of tuples */
+    tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+
+    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+    for (int slotno = 0; slotno < max_replication_slots; slotno++)
+    {
+        ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+        char        restart_lsn_str[MAXFNAMELEN];
+        char        confirmed_flush_lsn_str[MAXFNAMELEN];
+        Datum       values[10];
+        bool        nulls[10];
+
+        ReplicationSlotPersistency persistency;
+        TransactionId xmin;
+        TransactionId catalog_xmin;
+        XLogRecPtr  restart_lsn;
+        XLogRecPtr  confirmed_flush_lsn;
+        Oid         datoid;
+        NameData    slot_name;
+        NameData    plugin;
+        int         i;
+        int64       tmpbigint;
+
+        if (!slot->in_use)
+            continue;
+
+        SpinLockAcquire(&slot->mutex);
+
+        xmin = slot->data.xmin;
+        catalog_xmin = slot->data.catalog_xmin;
+        datoid = slot->data.database;
+        restart_lsn = slot->data.restart_lsn;
+        confirmed_flush_lsn = slot->data.confirmed_flush;
+        namestrcpy(&slot_name, NameStr(slot->data.name));
+        namestrcpy(&plugin, NameStr(slot->data.plugin));
+        persistency = slot->data.persistency;
+
+        SpinLockRelease(&slot->mutex);
+
+        if (numslot_names &&
+            !bsearch((void *) &slot_name, (void *) slot_names,
+                     numslot_names, sizeof(NameData), pg_qsort_namecmp))
+            continue;
+
+        memset(nulls, 0, sizeof(nulls));
+
+        i = 0;
+        values[i++] = CStringGetTextDatum(NameStr(slot_name));
+
+        if (datoid == InvalidOid)
+            nulls[i++] = true;
+        else
+            values[i++] = CStringGetTextDatum(NameStr(plugin));
+
+        if (datoid == InvalidOid)
+            values[i++] = CStringGetTextDatum("physical");
+        else
+            values[i++] = CStringGetTextDatum("logical");
+
+        if (datoid == InvalidOid)
+            nulls[i++] = true;
+        else
+        {
+            tmpbigint = datoid;
+            values[i++] = Int64GetDatum(tmpbigint);
+        }
+
+        if (datoid == InvalidOid)
+            nulls[i++] = true;
+        else
+        {
+            MemoryContext cur = CurrentMemoryContext;
+
+            /* syscache access needs a transaction env. */
+            StartTransactionCommand();
+            /* make dbname live outside TX context */
+            MemoryContextSwitchTo(cur);
+            values[i++] = CStringGetTextDatum(get_database_name(datoid));
+            CommitTransactionCommand();
+            /* CommitTransactionCommand switches to TopMemoryContext */
+            MemoryContextSwitchTo(cur);
+        }
+
+        values[i++] = Int32GetDatum(persistency == RS_TEMPORARY ? 1 : 0);
+
+        if (xmin != InvalidTransactionId)
+        {
+            tmpbigint = xmin;
+            values[i++] = Int64GetDatum(tmpbigint);
+        }
+        else
+            nulls[i++] = true;
+
+        if (catalog_xmin != InvalidTransactionId)
+        {
+            tmpbigint = catalog_xmin;
+            values[i++] = Int64GetDatum(tmpbigint);
+        }
+        else
+            nulls[i++] = true;
+
+        if (restart_lsn != InvalidXLogRecPtr)
+        {
+            snprintf(restart_lsn_str, sizeof(restart_lsn_str), "%X/%X",
+                     LSN_FORMAT_ARGS(restart_lsn));
+            values[i++] = CStringGetTextDatum(restart_lsn_str);
+        }
+        else
+            nulls[i++] = true;
+
+        if (confirmed_flush_lsn != InvalidXLogRecPtr)
+        {
+            snprintf(confirmed_flush_lsn_str, sizeof(confirmed_flush_lsn_str),
+                     "%X/%X", LSN_FORMAT_ARGS(confirmed_flush_lsn));
+            values[i++] = CStringGetTextDatum(confirmed_flush_lsn_str);
+        }
+        else
+            nulls[i++] = true;
+
+        /* send it to dest */
+        do_tup_output(tstate, values, nulls);
+    }
+    LWLockRelease(ReplicationSlotControlLock);
+
+    end_tup_output(tstate);
+}
+
 /* Handle READ_REPLICATION_SLOT command */
 static void
 ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
@@ -1820,6 +2008,13 @@ exec_replication_command(const char *cmd_string)
 			EndReplicationCommand(cmdtag);
 			break;

+		case T_ListSlotsCmd:
+			cmdtag = "LIST_SLOTS";
+			set_ps_display(cmdtag);
+			ListSlots((ListSlotsCmd *) cmd_node);
+			EndReplicationCommand(cmdtag);
+			break;
+
 		case T_StartReplicationCmd:
 			{
 				StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;

diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index 7940d64639..f2a9517091 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -230,6 +230,9 @@ pgstat_get_wait_activity(WaitEventActivity w)
 		case WAIT_EVENT_LOGICAL_LAUNCHER_MAIN:
 			event_name = "LogicalLauncherMain";
 			break;
+		case WAIT_EVENT_REPL_SLOT_SYNC_MAIN:
+			event_name = "ReplSlotSyncMain";
+			break;
 		case WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN:
 			event_name = "LogicalParallelApplyMain";
 			break;

diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index cab3ddbe11..0ee7ad1348 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -63,8 +63,12 @@
 #include "postmaster/syslogger.h"
 #include "postmaster/walwriter.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
+#include "replication/reorderbuffer.h"
 #include "replication/slot.h"
 #include "replication/syncrep.h"
+#include "replication/walreceiver.h"
+#include "replication/walsender.h"
 #include "storage/bufmgr.h"
 #include "storage/large_object.h"
 #include "storage/pg_shmem.h"
@@ -4587,6 +4591,28 @@ struct config_string ConfigureNamesString[] =
 		check_io_direct, assign_io_direct, NULL
 	},

+	{
+		{"synchronize_slot_names", PGC_SIGHUP, REPLICATION_STANDBY,
+			gettext_noop("Sets the names of replication slots to synchronize from the primary to the standby."),
+			gettext_noop("Value of \"*\" means all."),
+			GUC_LIST_INPUT | GUC_LIST_QUOTE
+		},
+		&synchronize_slot_names,
+		"",
+		check_synchronize_slot_names, NULL, NULL
+	},
+
+	{
+		{"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY,
+			gettext_noop("List of physical slots that must confirm changes before changes are sent to
logical replication consumers."), + NULL, + GUC_LIST_INPUT | GUC_LIST_QUOTE + }, + &standby_slot_names, + "", + check_standby_slot_names, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index dce5049bc2..2ff2188c02 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -330,6 +330,7 @@ # and comma-separated list of application_name # from standby(s); '*' = all #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed +#standby_slot_names = '' # physical standby slot names that logical replication waits for # - Standby Servers - @@ -357,6 +358,7 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#synchronize_slot_names = '' # logical replication slots to sync to standby # - Subscribers - diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index 214dc6c29e..0e77f9ee5c 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -17,6 +17,7 @@ #include "catalog/objectaddress.h" #include "parser/parse_node.h" +#include "replication/walreceiver.h" extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel); @@ -28,4 +29,6 @@ extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); extern char defGetStreamingMode(DefElem *def); +extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); + #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index 4321ba8f86..980e0b2ee2 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -33,6 +33,15 @@ typedef struct IdentifySystemCmd NodeTag type; } IdentifySystemCmd; +/* ---------------------- + * LIST_SLOTS command + * ---------------------- + */ +typedef struct ListSlotsCmd +{ + NodeTag type; + List *slot_names; +} ListSlotsCmd; /* ---------------------- * BASE_BACKUP command diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index a07c9cb311..80fdbf9657 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -31,4 +31,6 @@ extern bool IsLogicalLauncher(void); extern pid_t GetLeaderApplyWorkerPid(pid_t pid); +extern PGDLLIMPORT char *PrimaryConnInfo; + #endif /* LOGICALLAUNCHER_H */ diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 39588da79f..6408753557 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -14,10 +14,16 @@ #include +#include "utils/guc.h" + +extern char *synchronize_slot_names; +extern char *standby_slot_names; + extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); +extern void ReplSlotSyncMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); @@ -29,4 +35,7 @@ extern void LogicalRepWorkersWakeupAtCommit(Oid subid); extern void AtEOXact_LogicalRepWorkers(bool isCommit); +extern bool check_synchronize_slot_names(char **newval, void **extra, GucSource 
source);
+extern bool check_standby_slot_names(char **newval, void **extra, GucSource source);
+
 #endif							/* LOGICALWORKER_H */

diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8a89dc784..5dc2e0d30d 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,7 +15,6 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
-#include "replication/walreceiver.h"

 /*
  * Behaviour of replication slots, upon release or crash.
@@ -238,7 +237,6 @@ extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_l
 extern int	ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
 extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot);
-extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);

 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
@@ -246,4 +244,7 @@ extern void CheckPointReplicationSlots(void);
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);

+extern XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto);
+
 #endif							/* SLOT_H */

diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 281626fa6f..9e9d64faf2 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -20,6 +20,7 @@
 #include "pgtime.h"
 #include "port/atomics.h"
 #include "replication/logicalproto.h"
+#include "replication/slot.h"
 #include "replication/walsender.h"
 #include "storage/condition_variable.h"
 #include "storage/latch.h"
@@ -191,6 +192,17 @@ typedef struct
 	}			proto;
 } WalRcvStreamOptions;

+/*
+ * Slot information received from the remote server.
+ *
+ * Currently the same as ReplicationSlotPersistentData, plus last_sync_time.
+ */
+typedef struct WalRecvReplicationSlotData
+{
+	ReplicationSlotPersistentData persistent_data;
+	TimestampTz last_sync_time;
+} WalRecvReplicationSlotData;
+
 struct WalReceiverConn;
 typedef struct WalReceiverConn WalReceiverConn;

@@ -280,6 +292,11 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
 typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
 											TimeLineID *primary_tli);

+/*
+ * walrcv_list_slots_fn
+ *
+ * Run LIST_SLOTS on the remote and return the reported slots as a list
+ * of WalRecvReplicationSlotData.
+ */
+typedef List *(*walrcv_list_slots_fn) (WalReceiverConn *conn, const char *slots);
+
 /*
  * walrcv_server_version_fn
  *
@@ -393,6 +410,7 @@ typedef struct WalReceiverFunctionsType
 	walrcv_get_conninfo_fn walrcv_get_conninfo;
 	walrcv_get_senderinfo_fn walrcv_get_senderinfo;
 	walrcv_identify_system_fn walrcv_identify_system;
+	walrcv_list_slots_fn walrcv_list_slots;
 	walrcv_server_version_fn walrcv_server_version;
 	walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
 	walrcv_startstreaming_fn walrcv_startstreaming;
@@ -417,6 +435,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
 #define walrcv_identify_system(conn, primary_tli) \
 	WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
+#define walrcv_list_slots(conn, slots) \
+	WalReceiverFunctions->walrcv_list_slots(conn, slots)
 #define walrcv_server_version(conn) \
 	WalReceiverFunctions->walrcv_server_version(conn)
 #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \

diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index dce71d2c50..5b4fda2fd9 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -65,7 +65,7 @@ typedef struct LogicalRepWorker
 	 * would be created for each transaction which will be deleted after the
 	 * transaction is finished.
*/ - FileSet *stream_fileset; + struct FileSet *stream_fileset; /* * PID of leader apply worker if this slot is used for a parallel apply @@ -226,15 +226,15 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker; extern PGDLLIMPORT bool in_remote_transaction; extern void logicalrep_worker_attach(int slot); -extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, +extern LogicalRepWorker *logicalrep_worker_find(Oid dbid, Oid subid, Oid relid, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running); extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm); -extern void logicalrep_worker_stop(Oid subid, Oid relid); +extern void logicalrep_worker_stop(Oid dbid, Oid subid, Oid relid); extern void logicalrep_pa_worker_stop(int slot_no, uint16 generation); -extern void logicalrep_worker_wakeup(Oid subid, Oid relid); +extern void logicalrep_worker_wakeup(Oid dbid, Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); extern int logicalrep_sync_worker_count(Oid subid); diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h index 518d3b0a1f..cccd4d9d32 100644 --- a/src/include/utils/wait_event.h +++ b/src/include/utils/wait_event.h @@ -43,6 +43,7 @@ typedef enum WAIT_EVENT_LOGICAL_APPLY_MAIN, WAIT_EVENT_LOGICAL_LAUNCHER_MAIN, WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN, + WAIT_EVENT_REPL_SLOT_SYNC_MAIN, WAIT_EVENT_RECOVERY_WAL_STREAM, WAIT_EVENT_SYSLOGGER_MAIN, WAIT_EVENT_WAL_RECEIVER_MAIN, diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 2008958010..b6fcc8704e 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -42,6 +42,7 @@ tests += { 't/034_create_database.pl', 't/035_standby_logical_decoding.pl', 't/036_truncated_dropped.pl', + 't/037_slot_sync.pl', ], }, } diff --git a/src/test/recovery/t/037_slot_sync.pl b/src/test/recovery/t/037_slot_sync.pl new file mode 100644 index 0000000000..0520042d96 --- /dev/null +++ b/src/test/recovery/t/037_slot_sync.pl @@ -0,0 +1,130 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $node_primary = PostgreSQL::Test::Cluster->new('primary'); +my $node_phys_standby = PostgreSQL::Test::Cluster->new('phys_standby'); +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); + +# find $pat in logfile of $node after $off-th byte +sub find_in_log +{ + my ($node, $pat, $off) = @_; + + $off = 0 unless defined $off; + my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile); + return 0 if (length($log) <= $off); + + $log = substr($log, $off); + + return $log =~ m/$pat/; +} + +# Check invalidation in the logfile +sub check_for_invalidation +{ + my ($log_start, $test_name) = @_; + + # message should be issued + ok( find_in_log( + $node_phys_standby, + "invalidating obsolete replication slot \"sub1\"", $log_start), + "sub1 slot invalidation is logged $test_name"); +} + +# Check conflicting status in pg_replication_slots. 
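+# (All logical slots on the standby are expected to be marked as
+# conflicting at this point, hence the bool_and() aggregate below.)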
+sub check_slots_conflicting_status +{ + my $res = $node_phys_standby->safe_psql( + 'postgres', qq( + select bool_and(conflicting) from pg_replication_slots;)); + + is($res, 't', + "Logical slot is reported as conflicting"); +} + +$node_primary->init(allows_streaming => 'logical'); +$node_primary->append_conf('postgresql.conf', "standby_slot_names = 'pslot1'"); +$node_primary->start; +$node_primary->psql('postgres', q{SELECT pg_create_physical_replication_slot('pslot1');}); + +$node_primary->backup('backup'); + +$node_phys_standby->init_from_backup($node_primary, 'backup', has_streaming => 1); +$node_phys_standby->append_conf('postgresql.conf', q{ +synchronize_slot_names = '*' +primary_slot_name = 'pslot1' +hot_standby_feedback = off +}); + +$node_phys_standby->start; + +$node_primary->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)"); +$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (1), (2), (3)"); + +# Some tests need to wait for VACUUM to be replayed. But vacuum does not flush +# WAL. An insert into flush_wal outside transaction does guarantee a flush. +$node_primary->psql('postgres', q[CREATE TABLE flush_wal();]); + +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +$node_subscriber->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)"); + +$node_primary->safe_psql('postgres', "CREATE PUBLICATION pub1 FOR TABLE t1"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '" . ($node_primary->connstr . ' dbname=postgres') . "' PUBLICATION pub1"); + +# Wait for initial sync of all subscriptions +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = $node_primary->safe_psql('postgres', + "SELECT slot_name, plugin, database FROM pg_replication_slots WHERE slot_type = 'logical'"); + +is($result, qq(sub1|pgoutput|postgres), 'logical slot on primary'); + +# FIXME: standby needs restart to pick up new slots +$node_phys_standby->restart; +sleep 3; + +$result = $node_phys_standby->safe_psql('postgres', + "SELECT slot_name, plugin, database FROM pg_replication_slots"); + +is($result, qq(sub1|pgoutput|postgres), 'logical slot on standby'); + +$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (4), (5), (6)"); +$node_primary->wait_for_catchup('sub1'); + +$node_primary->wait_for_catchup($node_phys_standby->name); + +# Logical subscriber and physical replica are caught up at this point. + +# Drop the subscription so that catalog_xmin is unknown on the primary +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub1"); + +# This should trigger a conflict as hot_standby_feedback is off on the standby +$node_primary->safe_psql('postgres', qq[ + CREATE TABLE conflict_test(x integer, y text); + DROP TABLE conflict_test; + VACUUM full pg_class; + INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal +]); + +# Ensure physical replay catches up +$node_primary->wait_for_catchup($node_phys_standby); + +# Check invalidation in the logfile +check_for_invalidation(1, 'with vacuum FULL on pg_class'); + +# Check conflicting status in pg_replication_slots. +check_slots_conflicting_status(); + +done_testing(); -- 2.34.1