From adbd72f70ee3592965f2a52500820d1387dcbf85 Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Thu, 2 Sep 2021 16:25:25 +0900 Subject: [PATCH v4] Add READ_REPLICATION_SLOT command --- src/include/nodes/nodes.h | 1 + src/include/nodes/replnodes.h | 10 ++ src/backend/replication/repl_gram.y | 16 ++- src/backend/replication/repl_scanner.l | 1 + src/backend/replication/walsender.c | 112 ++++++++++++++++++++ src/test/recovery/t/001_stream_rep.pl | 47 +++++++- src/test/recovery/t/006_logical_decoding.pl | 15 ++- doc/src/sgml/protocol.sgml | 66 ++++++++++++ src/tools/pgindent/typedefs.list | 1 + 9 files changed, 266 insertions(+), 3 deletions(-) diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 6a4d82f0a8..5f78bdd573 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -495,6 +495,7 @@ typedef enum NodeTag * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h) */ T_IdentifySystemCmd, + T_ReadReplicationSlotCmd, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index faa3a251f2..46384ea074 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -33,6 +33,16 @@ typedef struct IdentifySystemCmd NodeTag type; } IdentifySystemCmd; +/* ---------------------- + * READ_REPLICATION_SLOT command + * ---------------------- + */ +typedef struct ReadReplicationSlotCmd +{ + NodeTag type; + char *slotname; +} ReadReplicationSlotCmd; + /* ---------------------- * BASE_BACKUP command diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index e1e8ec29cc..d34d617045 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void); /* Keyword tokens. */ %token K_BASE_BACKUP %token K_IDENTIFY_SYSTEM +%token K_READ_REPLICATION_SLOT %token K_SHOW %token K_START_REPLICATION %token K_CREATE_REPLICATION_SLOT @@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void); %type command %type base_backup start_replication start_logical_replication create_replication_slot drop_replication_slot identify_system - timeline_history show sql_cmd + read_replication_slot timeline_history show sql_cmd %type base_backup_opt_list %type base_backup_opt %type opt_timeline @@ -120,6 +121,7 @@ opt_semicolon: ';' command: identify_system + | read_replication_slot | base_backup | start_replication | start_logical_replication @@ -140,6 +142,18 @@ identify_system: } ; +/* + * READ_REPLICATION_SLOT %s + */ +read_replication_slot: + K_READ_REPLICATION_SLOT var_name + { + ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd); + n->slotname = $2; + $$ = (Node *) n; + } + ; + /* * SHOW setting */ diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index c038a636c3..1b599c255e 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -85,6 +85,7 @@ identifier {ident_start}{ident_cont}* BASE_BACKUP { return K_BASE_BACKUP; } FAST { return K_FAST; } IDENTIFY_SYSTEM { return K_IDENTIFY_SYSTEM; } +READ_REPLICATION_SLOT { return K_READ_REPLICATION_SLOT; } SHOW { return K_SHOW; } LABEL { return K_LABEL; } NOWAIT { return K_NOWAIT; } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3ca2a11389..afcc5f6612 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -232,6 +232,7 @@ static void XLogSendLogical(void); static void WalSndDone(WalSndSendDataCallback send_data); static XLogRecPtr GetStandbyFlushRecPtr(void); static void IdentifySystem(void); +static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd); static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd); static void DropReplicationSlot(DropReplicationSlotCmd *cmd); static void StartReplication(StartReplicationCmd *cmd); @@ -457,6 +458,110 @@ IdentifySystem(void) end_tup_output(tstate); } +/* Handle READ_REPLICATION_SLOT command */ +static void +ReadReplicationSlot(ReadReplicationSlotCmd *cmd) +{ +#define READ_REPLICATION_SLOT_COLS 5 + ReplicationSlot *slot; + DestReceiver *dest; + TupOutputState *tstate; + TupleDesc tupdesc; + Datum values[READ_REPLICATION_SLOT_COLS]; + bool nulls[READ_REPLICATION_SLOT_COLS]; + + tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "confirmed_flush_lsn", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "restart_tli", + INT4OID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "confirmed_flush_tli", + INT4OID, -1, 0); + + MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool)); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + slot = SearchNamedReplicationSlot(cmd->slotname, false); + if (slot == NULL || !slot->in_use) + { + LWLockRelease(ReplicationSlotControlLock); + } + else + { + List *timeline_history = NIL; + ReplicationSlot slot_contents; + int i = 0; + char xloc[MAXFNAMELEN]; + TimeLineID slots_position_timeline; + + /* Copy slot contents while holding spinlock */ + SpinLockAcquire(&slot->mutex); + slot_contents = *slot; + SpinLockRelease(&slot->mutex); + LWLockRelease(ReplicationSlotControlLock); + + if (OidIsValid(slot_contents.data.database)) + values[i] = CStringGetTextDatum("logical"); + else + values[i] = CStringGetTextDatum("physical"); + nulls[i] = false; + i++; + + if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn)) + { + snprintf(xloc, sizeof(xloc), "%X/%X", + LSN_FORMAT_ARGS(slot_contents.data.restart_lsn)); + values[i] = CStringGetTextDatum(xloc); + nulls[i] = false; + } + i++; + + if (!XLogRecPtrIsInvalid(slot_contents.data.confirmed_flush)) + { + snprintf(xloc, sizeof(xloc), "%X/%X", + LSN_FORMAT_ARGS(slot_contents.data.confirmed_flush)); + values[i] = CStringGetTextDatum(xloc); + nulls[i] = false; + } + i++; + + /* + * Now get the timeline this wal was produced on, to get to the + * current timeline + */ + if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn)) + { + timeline_history = readTimeLineHistory(ThisTimeLineID); + slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn, + timeline_history); + values[i] = Int32GetDatum(slots_position_timeline); + nulls[i] = false; + } + i++; + + if (!XLogRecPtrIsInvalid(slot_contents.data.confirmed_flush)) + { + if (!timeline_history) + timeline_history = readTimeLineHistory(ThisTimeLineID); + slots_position_timeline = tliOfPointInHistory(slot_contents.data.confirmed_flush, + timeline_history); + values[i] = Int32GetDatum(slots_position_timeline); + nulls[i] = false; + } + i++; + Assert(i == READ_REPLICATION_SLOT_COLS); + } + + dest = CreateDestReceiver(DestRemoteSimple); + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); + do_tup_output(tstate, values, nulls); + end_tup_output(tstate); +} + /* * Handle TIMELINE_HISTORY command. @@ -1618,6 +1723,13 @@ exec_replication_command(const char *cmd_string) EndReplicationCommand(cmdtag); break; + case T_ReadReplicationSlotCmd: + cmdtag = "READ_REPLICATION_SLOT"; + set_ps_display(cmdtag); + ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node); + EndReplicationCommand(cmdtag); + break; + case T_BaseBackupCmd: cmdtag = "BASE_BACKUP"; set_ps_display(cmdtag); diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl index ac581c1c07..cbd817cda3 100644 --- a/src/test/recovery/t/001_stream_rep.pl +++ b/src/test/recovery/t/001_stream_rep.pl @@ -6,7 +6,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 49; +use Test::More tests => 55; # Initialize primary node my $node_primary = PostgresNode->new('primary'); @@ -252,6 +252,51 @@ ok( $ret == 0, "SHOW with superuser-settable parameter, replication role and logical replication" ); +note "testing READ_REPLICATION_SLOT command"; + +my $slotname = 'test_read_replication_slot_physical'; + +($ret, $stdout, $stderr) = $node_primary->psql( + 'postgres', + 'READ_REPLICATION_SLOT non_existent_slot;', + extra_params => [ '-d', $connstr_rep ]); +ok( $ret == 0, + "READ_REPLICATION_SLOT does not produce an error with non existent slot"); +ok($stdout eq '||||', + "READ_REPLICATION_SLOT returns NULL values if slot does not exist"); + +($ret, $stdout, $stderr) = $node_primary->psql( + 'postgres', + "CREATE_REPLICATION_SLOT $slotname PHYSICAL RESERVE_WAL;", + extra_params => [ '-d', $connstr_rep ], + 0, + 'physical slot created on primary'); + +($ret, $stdout, $stderr) = $node_primary->psql( + 'postgres', + "READ_REPLICATION_SLOT $slotname;", + extra_params => [ '-d', $connstr_rep ]); +ok($ret == 0, + "READ_REPLICATION_SLOT does not produce an error with existing slot"); +ok($stdout =~ 'physical\|[^|]*\|\|1\|', + "READ_REPLICATION_SLOT returns tuple corresponding to the slot"); + +$node_primary->psql( + 'postgres', + "DROP_REPLICATION_SLOT $slotname;", + extra_params => [ '-d', $connstr_rep ], + 0, + 'physical slot dropped on primary'); + +($ret, $stdout, $stderr) = $node_primary->psql( + 'postgres', + "READ_REPLICATION_SLOT $slotname;", + extra_params => [ '-d', $connstr_rep ]); +ok($ret == 0, + "READ_REPLICATION_SLOT does not produce an error with dropped slot"); +ok($stdout eq '||||', + "READ_REPLICATION_SLOT returns NULL values if slot has been dropped"); + note "switching to physical replication slot"; # Switch to using a physical replication slot. We can do this without a new diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index cc116062c2..50e4fa1042 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -10,7 +10,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 14; +use Test::More tests => 16; use Config; # Initialize primary node @@ -39,6 +39,19 @@ ok( $stderr =~ m/replication slot "test_slot" was not created in this database/, "Logical decoding correctly fails to start"); +($result, $stdout, $stderr) = $node_primary->psql( + 'template1', + qq[READ_REPLICATION_SLOT test_slot;], + replication => 'database'); +ok($stdout =~ 'logical\|[^|]*\|[^|]*\|1\|1', + 'Logical replication slot can be read on any logical connection'); +($result, $stdout, $stderr) = $node_primary->psql( + 'postgres', + qq[READ_REPLICATION_SLOT test_slot;], + replication => '1'); +ok($stdout =~ 'logical\|[^|]*\|[^|]*\|1\|1', + 'Logical replication slot can be read on a physical connection'); + # Check case of walsender not using a database connection. Logical # decoding should not be allowed. ($result, $stdout, $stderr) = $node_primary->psql( diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index a232546b1d..8191f17137 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2052,6 +2052,72 @@ The commands accepted in replication mode are: + + READ_REPLICATION_SLOT slot_name + READ_REPLICATION_SLOT + + + + Read the information of a replication slot. Returns a tuple with + NULL values if the replication slot does not + exist. + + + In response to this command, the server will return a one-row result set, + containing the following fields: + + + slot_type (text) + + + The replication slot's type, either physical or + logical. + + + + + + restart_lsn (text) + + + The replication slot's restart_lsn. + + + + + + confirmed_flush_lsn (text) + + + The replication slot's confirmed_flush_lsn. + + + + + + restart_tli (int4) + + + The timeline ID for the restart_lsn position, + when following the current timeline history. + + + + + + confirmed_flush_tli (int4) + + + The timeline ID for the confirmed_flush_lsn + position, when following the current timeline history. + + + + + + + + START_REPLICATION [ SLOT slot_name ] [ PHYSICAL ] XXX/XXX [ TIMELINE tli ] START_REPLICATION diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index f31a1e4e1e..b256ee3be6 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2126,6 +2126,7 @@ ReadBufferMode ReadBytePtrType ReadExtraTocPtrType ReadFunc +ReadReplicationSlotCmd ReassignOwnedStmt RecheckForeignScan_function RecordCacheEntry -- 2.33.0