From d3fa36f3bc7f8ef4c0c541742ac8ad6d9eee5f09 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Thu, 14 Sep 2023 06:01:40 +0000 Subject: [PATCH v38] Another one: Reads all WAL records after confirmed_flush_lsn --- doc/src/sgml/ref/pgupgrade.sgml | 5 +- src/backend/utils/adt/pg_upgrade_support.c | 130 ++++++++++++++++++ src/bin/pg_upgrade/check.c | 8 +- src/bin/pg_upgrade/controldata.c | 39 ------ src/bin/pg_upgrade/info.c | 6 +- src/bin/pg_upgrade/pg_upgrade.h | 1 - .../t/003_logical_replication_slots.pl | 20 +++ src/include/catalog/pg_proc.dat | 6 + 8 files changed, 164 insertions(+), 51 deletions(-) diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml index 4e2281bae4..2588d6d7b8 100644 --- a/doc/src/sgml/ref/pgupgrade.sgml +++ b/doc/src/sgml/ref/pgupgrade.sgml @@ -418,10 +418,7 @@ make prefix=/usr/local/pgsql.new install - pg_replication_slots.confirmed_flush_lsn - of all slots on the old cluster must be the same as the latest - checkpoint location. This ensures that all the data has been replicated - before the upgrade. + The old cluster has replicated all the changes to subscribers.
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c index 0186636d9f..340dc180be 100644 --- a/src/backend/utils/adt/pg_upgrade_support.c +++ b/src/backend/utils/adt/pg_upgrade_support.c @@ -11,14 +11,22 @@ #include "postgres.h" +#include "access/heapam_xlog.h" +#include "access/rmgr.h" +#include "access/xlog.h" +#include "access/xlog_internal.h" +#include "access/xlogutils.h" #include "catalog/binary_upgrade.h" #include "catalog/heap.h" #include "catalog/namespace.h" +#include "catalog/pg_control.h" #include "catalog/pg_type.h" #include "commands/extension.h" #include "miscadmin.h" +#include "storage/standbydefs.h" #include "utils/array.h" #include "utils/builtins.h" +#include "utils/pg_lsn.h" #define CHECK_IS_BINARY_UPGRADE \ @@ -29,6 +37,9 @@ do { \ errmsg("function can only be called when server is in binary upgrade mode"))); \ } while (0) +#define CHECK_WAL_RECORD(rmgrid, info, expected_rmgrid, expected_info) \ + ((rmgrid) == (expected_rmgrid) && (info) == (expected_info)) + Datum binary_upgrade_set_next_pg_tablespace_oid(PG_FUNCTION_ARGS) { @@ -261,3 +272,122 @@ binary_upgrade_set_missing_value(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + +/* + * Return true if we didn't find any unexpected WAL record, false otherwise. + * + * This function is used to verify that there are no WAL records (except some + * types) after confirmed_flush_lsn of logical slots, which means all the + * changes were replicated to the subscriber. There is a possibility that some + * WAL records are inserted after logical walsenders exit, so such types are + * accepted. + * + * The first record must be XLOG_CHECKPOINT_SHUTDOWN, which is inserted after + * the walsender exits. Moreover, the following types of records could be + * generated during pg_upgrade --check, so they are accepted after it too.
+ * + * - XLOG_CHECKPOINT_ONLINE + * - XLOG_RUNNING_XACTS + * - XLOG_FPI_FOR_HINT + * - XLOG_HEAP2_PRUNE + */ +Datum +binary_upgrade_validate_wal_record_types_after_lsn(PG_FUNCTION_ARGS) +{ + XLogRecPtr start_lsn = PG_GETARG_LSN(0); + XLogReaderState *xlogreader; + bool initial_record = true; + bool result = true; + ReadLocalXLogPageNoWaitPrivate *private_data; + + CHECK_IS_BINARY_UPGRADE; + + /* Quick exit if the given LSN is not behind the current flush position */ + if (start_lsn >= GetFlushRecPtr(NULL)) + PG_RETURN_BOOL(false); + + private_data = (ReadLocalXLogPageNoWaitPrivate *) + palloc0(sizeof(ReadLocalXLogPageNoWaitPrivate)); + + xlogreader = XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.page_read = &read_local_xlog_page_no_wait, + .segment_open = &wal_segment_open, + .segment_close = &wal_segment_close), + private_data); + + if (xlogreader == NULL) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed while allocating a WAL reading processor."))); + + XLogBeginRead(xlogreader, start_lsn); + + /* Loop until all WALs are read, or unexpected record is found */ + while (result) + { + RmgrIds rmid; + uint8 info; + char *errormsg; + XLogRecord *record; + + CHECK_FOR_INTERRUPTS(); + + record = XLogReadRecord(xlogreader, &errormsg); + + if (record == NULL) + { + ReadLocalXLogPageNoWaitPrivate *check_data; + + /* Stop reading once the end of WAL is reached */ + check_data = (ReadLocalXLogPageNoWaitPrivate *) + xlogreader->private_data; + + if (check_data->end_of_wal) + break; + + if (errormsg) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read WAL at %X/%X: %s", + LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg))); + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read WAL at %X/%X", + LSN_FORMAT_ARGS(xlogreader->EndRecPtr)))); + } + + /* Check the type of WAL */ + rmid = XLogRecGetRmid(xlogreader); + info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK; + + if (initial_record) + {
+ /* Initial record must be XLOG_CHECKPOINT_SHUTDOWN */ + if (!CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID, + XLOG_CHECKPOINT_SHUTDOWN)) + result = false; + + initial_record = false; + + continue; + } + + /* + * XXX: There is a possibility that following records may be + * generated during the upgrade. + */ + if (!CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID, XLOG_CHECKPOINT_SHUTDOWN) && + !CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID, XLOG_CHECKPOINT_ONLINE) && + !CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID, XLOG_FPI_FOR_HINT) && + !CHECK_WAL_RECORD(rmid, info, RM_STANDBY_ID, XLOG_RUNNING_XACTS) && + !CHECK_WAL_RECORD(rmid, info, RM_HEAP2_ID, XLOG_HEAP2_PRUNE)) + result = false; + } + + pfree(xlogreader->private_data); + XLogReaderFree(xlogreader); + + PG_RETURN_BOOL(result); +} diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index b1424fdf9c..df1ce67fc0 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1480,8 +1480,8 @@ check_new_cluster_logical_replication_slots(void) * Following points are checked: * * - All logical replication slots are usable. - * - All logical replication slots consumed all WALs, except a - * CHECKPOINT_SHUTDOWN record. + * - All logical replication slots consumed all WALs, except some acceptable + * types. */ static void check_old_cluster_for_valid_slots(bool live_check) @@ -1521,8 +1521,8 @@ check_old_cluster_for_valid_slots(bool live_check) } /* - * Do additional checks to ensure that confirmed_flush LSN of all - * the slots is the same as the latest checkpoint location. + * Do additional checks to ensure that all logical replication + * slots have reached the current WAL position. * * Note: This can be satisfied only when the old cluster has been * shut down, so we skip this for live checks.
diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c index f8f823e2be..4beb65ab22 100644 --- a/src/bin/pg_upgrade/controldata.c +++ b/src/bin/pg_upgrade/controldata.c @@ -169,45 +169,6 @@ get_control_data(ClusterInfo *cluster, bool live_check) } got_cluster_state = true; } - - else if ((p = strstr(bufin, "Latest checkpoint location:")) != NULL) - { - /* - * Read the latest checkpoint location if the cluster is PG17 - * or later. This is used for upgrading logical replication - * slots. Currently, we need it only for the old cluster but - * for simplicity chose not to have additional checks. - */ - if (GET_MAJOR_VERSION(cluster->major_version) >= 1700) - { - char *slash = NULL; - uint32 upper_lsn, - lower_lsn; - - p = strchr(p, ':'); - - if (p == NULL || strlen(p) <= 1) - pg_fatal("%d: controldata retrieval problem", __LINE__); - - p++; /* remove ':' char */ - - p = strpbrk(p, "01234567890ABCDEF"); - - if (p == NULL || strlen(p) <= 1) - pg_fatal("%d: controldata retrieval problem", __LINE__); - - /* - * The upper and lower part of LSN must be read separately - * because it is stored as in %X/%X format. - */ - upper_lsn = strtoul(p, &slash, 16); - lower_lsn = strtoul(++slash, NULL, 16); - - /* And combine them */ - cluster->controldata.chkpnt_latest = - ((uint64) upper_lsn << 32) | lower_lsn; - } - } } rc = pclose(output); diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index f7b0deca87..5d25d1604e 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -647,12 +647,12 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo) * removed. 
*/ res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, " - "(confirmed_flush_lsn = '%X/%X') as caught_up, conflicting as invalid " + "pg_catalog.binary_upgrade_validate_wal_record_types_after_lsn(confirmed_flush_lsn) as caught_up, " + "conflicting as invalid " "FROM pg_catalog.pg_replication_slots " "WHERE slot_type = 'logical' AND " "database = current_database() AND " - "temporary IS FALSE;", - LSN_FORMAT_ARGS(old_cluster.controldata.chkpnt_latest)); + "temporary IS FALSE;"); num_slots = PQntuples(res); diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index f5ce6c3b4d..8a7f56831e 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -246,7 +246,6 @@ typedef struct bool date_is_int; bool float8_pass_by_value; uint32 data_checksum_version; - XLogRecPtr chkpnt_latest; } ControlData; /* diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl index 01cb04ca12..b91fb2f88f 100644 --- a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl +++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl @@ -169,6 +169,26 @@ $subscriber->wait_for_subscription_sync($old_publisher, 'sub'); $subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE"); $old_publisher->stop; +# Dry run, successful check is expected. This is not a live check, so a +# shutdown checkpoint record has been inserted. We want to test that +# binary_upgrade_validate_wal_record_types_after_lsn() accepts that record +# so that the upcoming pg_upgrade succeeds. +command_ok( + [ + 'pg_upgrade', '--no-sync', + '-d', $old_publisher->data_dir, + '-D', $new_publisher->data_dir, + '-b', $bindir, + '-B', $bindir, + '-s', $new_publisher->host, + '-p', $old_publisher->port, + '-P', $new_publisher->port, + $mode, '--check' + ], + 'run of pg_upgrade of old cluster'); +ok( !-d $new_publisher->data_dir .
"/pg_upgrade_output.d", + "pg_upgrade_output.d/ removed after pg_upgrade success"); + # Actual run, successful upgrade is expected command_ok( [ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9805bc6118..f3d843222b 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11370,6 +11370,12 @@ proname => 'binary_upgrade_set_next_pg_tablespace_oid', provolatile => 'v', proparallel => 'u', prorettype => 'void', proargtypes => 'oid', prosrc => 'binary_upgrade_set_next_pg_tablespace_oid' }, +{ oid => '8046', descr => 'for use by pg_upgrade', + proname => 'binary_upgrade_validate_wal_record_types_after_lsn', + provolatile => 'v', + proparallel => 'u', + prorettype => 'bool', proargtypes => 'pg_lsn', + prosrc => 'binary_upgrade_validate_wal_record_types_after_lsn' }, # conversion functions { oid => '4302', -- 2.27.0