diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm index 28e9f0b..64a4633 100644 --- a/src/test/perl/PostgresNode.pm +++ b/src/test/perl/PostgresNode.pm @@ -93,7 +93,6 @@ use RecursiveCopy; use Socket; use Test::More; use TestLib (); -use pg_lsn qw(parse_lsn); use Scalar::Util qw(blessed); our @EXPORT = qw( @@ -1325,38 +1324,62 @@ sub run_log TestLib::run_log(@_); } -=pod $node->lsn +=pod $node->lsn(mode) -Return pg_current_xlog_insert_location() or, on a replica, -pg_last_xlog_replay_location(). +Look up xlog positions on the server: + +* insert position (master only, error on replica) +* write position (master only, error on replica) +* flush position +* receive position (always undef on master) +* replay position + +mode must be specified. =cut sub lsn { - my $self = shift; - return $self->safe_psql('postgres', 'select case when pg_is_in_recovery() then pg_last_xlog_replay_location() else pg_current_xlog_insert_location() end as lsn;'); + my ($self, $mode) = @_; + my %modes = ('insert' => 'pg_current_xlog_insert_location()', + 'flush' => 'pg_current_xlog_flush_location()', + 'write' => 'pg_current_xlog_location()', + 'receive' => 'pg_last_xlog_receive_location()', + 'replay' => 'pg_last_xlog_replay_location()'); + + $mode = '' if !defined($mode); + die "unknown mode for 'lsn': '$mode', valid modes are " . join(', ', keys %modes) + if !defined($modes{$mode}); + + my $result = $self->safe_psql('postgres', "SELECT $modes{$mode}"); + chomp($result); + if ($result eq '') + { + return undef; + } + else + { + return $result; + } } =pod $node->wait_for_catchup(standby_name, mode, target_lsn) Wait for the node with application_name standby_name (usually from node->name) -until its replication equals or passes the upstream's xlog insert point at the -time this function is called. By default the replay_location is waited for, -but 'mode' may be specified to wait for any of sent|write|flush|replay. 
+until its replication position in pg_stat_replication equals or passes the +upstream's xlog insert point at the time this function is called. By default +the replay_location is waited for, but 'mode' may be specified to wait for any +of sent|write|flush|replay. If there is no active replication connection from this peer, waits until poll_query_until timeout. Requires that the 'postgres' db exists and is accessible. -If pos is passed, use that xlog position instead of the server's current -xlog insert position. +target_lsn may be any arbitrary lsn, but is typically $master_node->lsn('insert'). This is not a test. It die()s on failure. -Returns the LSN caught up to. - =cut sub wait_for_catchup @@ -1364,24 +1387,25 @@ sub wait_for_catchup my ($self, $standby_name, $mode, $target_lsn) = @_; $mode = defined($mode) ? $mode : 'replay'; my %valid_modes = ( 'sent' => 1, 'write' => 1, 'flush' => 1, 'replay' => 1 ); - die "valid modes are " . join(', ', keys(%valid_modes)) unless exists($valid_modes{$mode}); - if ( blessed( $standby_name ) && $standby_name->isa("PostgresNode") ) { + die "unknown mode $mode for 'wait_for_catchup', valid modes are " . join(', ', keys(%valid_modes)) unless exists($valid_modes{$mode}); + # Allow passing of a PostgresNode instance as shorthand + if ( blessed( $standby_name ) && $standby_name->isa("PostgresNode") ) + { $standby_name = $standby_name->name; } - if (!defined($target_lsn)) { - $target_lsn = $self->lsn; - } - $self->poll_query_until('postgres', qq[SELECT '$target_lsn' <= ${mode}_location FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';]) - or die "timed out waiting for catchup"; - return $target_lsn; + die 'target_lsn must be specified' unless defined($target_lsn); + print "Waiting for replication conn " . $standby_name . "'s " . $mode . "_location to pass " . $target_lsn . " on " . $self->name . 
"\n"; + my $query = qq[SELECT '$target_lsn' <= ${mode}_location FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';]; + $self->poll_query_until('postgres', $query) + or die "timed out waiting for catchup, current position is " . ($self->safe_psql('postgres', $query) || '(unknown)'); + print "done\n"; } =pod $node->wait_for_slot_catchup(slot_name, mode, target_lsn) -Wait for the named replication slot to equal or pass the xlog position of the -server, or the supplied target_lsn if given. The position used is the -restart_lsn unless mode is given, in which case it may be 'restart' or -'confirmed_flush'. +Wait for the named replication slot to equal or pass the supplied target_lsn. +The position used is the restart_lsn unless mode is given, in which case it may +be 'restart' or 'confirmed_flush'. Requires that the 'postgres' db exists and is accessible. @@ -1389,9 +1413,9 @@ This is not a test. It die()s on failure. If the slot is not active, will time out after poll_query_until's timeout. -Note that for logical slots, restart_lsn is held down by the oldest in progress tx. +target_lsn may be any arbitrary lsn, but is typically $master_node->lsn('insert'). -Returns the LSN caught up to. +Note that for logical slots, restart_lsn is held down by the oldest in-progress tx. =cut @@ -1399,15 +1423,55 @@ sub wait_for_slot_catchup { my ($self, $slot_name, $mode, $target_lsn) = @_; $mode = defined($mode) ? 
$mode : 'restart'; - if (!($mode eq 'restart' || $mode eq 'confirmed_flush')) { + if (!($mode eq 'restart' || $mode eq 'confirmed_flush')) + { die "valid modes are restart, confirmed_flush"; } - if (!defined($target_lsn)) { - $target_lsn = $self->lsn; - } - $self->poll_query_until('postgres', qq[SELECT '$target_lsn' <= ${mode}_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name';]) - or die "timed out waiting for catchup"; - return $target_lsn; + die 'target lsn must be specified' unless defined($target_lsn); + print "Waiting for replication slot " . $slot_name . "'s " . $mode . "_lsn to pass " . $target_lsn . " on " . $self->name . "\n"; + my $query = qq[SELECT '$target_lsn' <= ${mode}_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name';]; + $self->poll_query_until('postgres', $query) + or die "timed out waiting for catchup, current position is " . ($self->safe_psql('postgres', $query) || '(unknown)'); + print "done\n"; +} + +=pod $node->query_hash($dbname, $query, @columns) + +Execute $query on $dbname, replacing any appearance of the string __COLUMNS__ +within the query with a comma-separated list of @columns. + +If __COLUMNS__ does not appear in the query, its result columns must EXACTLY +match the order and number (but not necessarily alias) of supplied @columns. + +The query must return zero or one rows. + +Return a hash-ref representation of the results of the query, with any empty +or null results as defined keys with an empty-string value. There is no way +to differentiate between null and empty-string result fields. + +If the query returns zero rows, return a hash with all columns empty. There +is no way to differentiate between zero rows returned and a row with only +null columns. 
+ +=cut + +sub query_hash +{ + my ($self, $dbname, $query, @columns) = @_; + die 'calls in array context for multi-row results not supported yet' if (wantarray); + # Replace __COLUMNS__ if found + substr($query, index($query, '__COLUMNS__'), length('__COLUMNS__')) = join(', ', @columns) + if index($query, '__COLUMNS__') >= 0; + my $result = $self->safe_psql($dbname, $query); + $result = '' unless defined($result); + # hash slice, see http://stackoverflow.com/a/16755894/398670 . + # + # Fills the hash with empty strings produced by x-operator element + # duplication if result is an empty row + # Limit -1 stops split() from dropping trailing empty (NULL) columns. + my %val; + @val{@columns} = $result ne '' ? split(qr/\|/, $result, -1) : ('',) x scalar(@columns); + return \%val; } =pod $node->slot(slot_name) @@ -1426,19 +1490,8 @@ either. sub slot { my ($self, $slot_name) = @_; - my @fields = ('plugin', 'slot_type', 'datoid', 'database', 'active', 'active_pid', 'xmin', 'catalog_xmin', 'restart_lsn'); - my $result = $self->safe_psql('postgres', 'SELECT ' . join(', ', @fields) . " FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'"); - $result = undef if $result eq ''; - # hash slice, see http://stackoverflow.com/a/16755894/398670 . - # - # Fills the hash with empty strings produced by x-operator element - # duplication if result is an empty row - # - my %val; - @val{@fields} = $result ne '' ? 
split(qr/\|/, $result) : ('',) x scalar(@fields); - $val{'restart_lsn_arr'} = parse_lsn($val{'restart_lsn'}); - $val{'confirmed_flush_lsn_arr'} = parse_lsn($val{'confirmed_flush_lsn'}); - return \%val; + my @columns = ('plugin', 'slot_type', 'datoid', 'database', 'active', 'active_pid', 'xmin', 'catalog_xmin', 'restart_lsn'); + return scalar($self->query_hash('postgres', "SELECT __COLUMNS__ FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'", @columns)); } =pod diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl index 5ce69bb..ba1da8c 100644 --- a/src/test/recovery/t/001_stream_rep.pl +++ b/src/test/recovery/t/001_stream_rep.pl @@ -40,8 +40,8 @@ $node_master->safe_psql('postgres', "CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a"); # Wait for standbys to catch up -$node_master->wait_for_catchup($node_standby_1); -$node_standby_1->wait_for_catchup($node_standby_2); +$node_master->wait_for_catchup($node_standby_1, 'replay', $node_master->lsn('insert')); +$node_standby_1->wait_for_catchup($node_standby_2, 'replay', $node_standby_1->lsn('replay')); my $result = $node_standby_1->safe_psql('postgres', "SELECT count(*) FROM tab_int"); diff --git a/src/test/recovery/t/004_timeline_switch.pl b/src/test/recovery/t/004_timeline_switch.pl index 5f3b2fe..7c6587a 100644 --- a/src/test/recovery/t/004_timeline_switch.pl +++ b/src/test/recovery/t/004_timeline_switch.pl @@ -32,14 +32,9 @@ $node_standby_2->start; # Create some content on master $node_master->safe_psql('postgres', "CREATE TABLE tab_int AS SELECT generate_series(1,1000) AS a"); -my $until_lsn = - $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();"); # Wait until standby has replayed enough data on standby 1 -my $caughtup_query = - "SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()"; -$node_standby_1->poll_query_until('postgres', $caughtup_query) - or die "Timed out while waiting for standby to catch up"; 
+$node_master->wait_for_catchup($node_standby_1, 'replay', $node_master->lsn('write')); # Stop and remove master, and promote standby 1, switching it to a new timeline $node_master->teardown_node; @@ -50,7 +45,7 @@ rmtree($node_standby_2->data_dir . '/recovery.conf'); my $connstr_1 = $node_standby_1->connstr; $node_standby_2->append_conf( 'recovery.conf', qq( -primary_conninfo='$connstr_1' +primary_conninfo='$connstr_1 application_name=@{[$node_standby_2->name]}' standby_mode=on recovery_target_timeline='latest' )); @@ -60,12 +55,7 @@ $node_standby_2->restart; # to ensure that the timeline switch has been done. $node_standby_1->safe_psql('postgres', "INSERT INTO tab_int VALUES (generate_series(1001,2000))"); -$until_lsn = $node_standby_1->safe_psql('postgres', - "SELECT pg_current_xlog_location();"); -$caughtup_query = - "SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()"; -$node_standby_2->poll_query_until('postgres', $caughtup_query) - or die "Timed out while waiting for standby to catch up"; +$node_standby_1->wait_for_catchup($node_standby_2, 'replay', $node_standby_1->lsn('write')); my $result = $node_standby_2->safe_psql('postgres', "SELECT count(*) FROM tab_int");