diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 4144a43afd..cfa955a679 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -618,9 +618,9 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) return; /* - * We can also skip decoding when in 'fast_forward' mode. This check must - * be last because we don't want to set that processing_required flag - * unnecessarily. + * We also skip decoding in 'fast_forward' mode. This check must be last + * because we don't want to set the processing_required flag unless + * we have a decodable message. */ if (ctx->fast_forward) { @@ -1307,8 +1307,8 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, return true; /* - * We can also skip decoding when in 'fast_forward' mode. In passing set - * the 'processing_required' flag to indicate, were it not for this mode, + * We also skip decoding in 'fast_forward' mode. In passing set the + * 'processing_required' flag to indicate, were it not for this mode, * processing *would* have been required. */ if (ctx->fast_forward) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 32869a75ab..e02cd0fa44 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1953,9 +1953,9 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) } /* - * Read to end of WAL starting from the decoding slot's restart_lsn. Return - * true if any meaningful/decodable WAL records are encountered, otherwise - * false. + * Read up to the end of WAL starting from the decoding slot's restart_lsn. + * Return true if any meaningful/decodable WAL records are encountered, + * otherwise false. * * Although this function is currently used only during pg_upgrade, there are * no reasons to restrict it, so IsBinaryUpgrade is not checked here. 
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c index 2a831bc397..a3a8ade405 100644 --- a/src/backend/utils/adt/pg_upgrade_support.c +++ b/src/backend/utils/adt/pg_upgrade_support.c @@ -274,8 +274,8 @@ binary_upgrade_set_missing_value(PG_FUNCTION_ARGS) * Returns true if there are no decodable WAL records after the * confirmed_flush_lsn. Otherwise false. * - * This is a special purpose function to ensure the given slot can be upgraded - * without data loss. + * This is a special purpose function to ensure that the given slot can be + * upgraded without data loss. */ Datum binary_upgrade_slot_has_pending_wal(PG_FUNCTION_ARGS) @@ -294,16 +294,10 @@ binary_upgrade_slot_has_pending_wal(PG_FUNCTION_ARGS) slot_name = PG_GETARG_NAME(0); - /* - * Acquire the given slot. There should be no error because the caller has - * already checked the slot exists. - */ + /* Acquire the given slot. */ ReplicationSlotAcquire(NameStr(*slot_name), true); - /* - * It's caller's responsibility to check the health of the slot. Upcoming - * functions assume the restart_lsn points to a valid record. - */ + /* Slots must be valid as otherwise we won't be able to scan the WAL. */ Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE); end_of_wal = GetFlushRecPtr(NULL); diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 123f47a81f..8f3f5585a4 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1541,10 +1541,8 @@ check_new_cluster_logical_replication_slots(void) /* * check_old_cluster_for_valid_slots() * - * Verify that all the logical slots are usable and have consumed all the WAL - * before shutdown. The check has already been done in - * get_old_cluster_logical_slot_infos(), so this function reads the result and - * reports to the user. + * Verify that all the logical slots are valid and have consumed all the WAL + * before shutdown. 
*/ static void check_old_cluster_for_valid_slots(bool live_check) @@ -1607,7 +1605,7 @@ check_old_cluster_for_valid_slots(bool live_check) fclose(script); pg_log(PG_REPORT, "fatal"); - pg_fatal("Your installation contains logical replication slots that cannot be upgraded.\n" + pg_fatal("Your installation contains invalid logical replication slots.\n" "These slots can't be copied, so this cluster cannot be upgraded.\n" "Consider removing invalid slots and/or consuming the pending WAL if any,\n" "and then restart the upgrade.\n" diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index c56769fe54..5494e69227 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -651,8 +651,8 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) /* * Fetch the logical replication slot information. The check whether the * slot is considered caught up is done by an upgrade function. This - * regards the slot is caught up if any changes are not found while - * decoding. See binary_upgrade_slot_has_pending_wal(). + * regards the slot as caught up if we don't find any decodable changes. + * See binary_upgrade_slot_has_pending_wal(). * * Note that we can't ensure whether the slot is caught up during * live_check as the new WAL records could be generated. diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c index 7acdf31d02..3960af4036 100644 --- a/src/bin/pg_upgrade/pg_upgrade.c +++ b/src/bin/pg_upgrade/pg_upgrade.c @@ -190,7 +190,12 @@ main(int argc, char **argv) check_ok(); /* - * If the old cluster has logical slots, migrate them to a new cluster. + * Migrate the logical slots to the new cluster. Note that we need to do + * this after resetting WAL because otherwise the required WAL would be + * removed and slots would become unusable. 
There is a possibility that + * background processes might generate some WAL before we could create the + * slots in the new cluster, but we can ignore that WAL as it won't be + * required downstream. */ if (count_old_cluster_logical_slots()) { @@ -890,7 +895,6 @@ create_logical_replication_slots(void) LogicalSlotInfoArr *slot_arr = &old_db->slot_arr; PGconn *conn; PQExpBuffer query; - char log_file_name[MAXPGPATH]; /* Skip this database if there are no slots */ if (slot_arr->nslots == 0) @@ -899,9 +903,6 @@ create_logical_replication_slots(void) conn = connectToServer(&new_cluster, old_db->db_name); query = createPQExpBuffer(); - snprintf(log_file_name, sizeof(log_file_name), - DB_DUMP_LOG_FILE_MASK, old_db->db_oid); - pg_log(PG_STATUS, "%s", old_db->db_name); for (int slotnum = 0; slotnum < slot_arr->nslots; slotnum++) diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 355247a58b..f8258d7c28 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -110,11 +110,7 @@ typedef struct LogicalDecodingContext /* Are we processing the end LSN of a transaction? */ bool end_xact; - /* - * Did the logical decoding context require processing WALs? - * - * This flag is used only when in 'fast_forward' mode. - */ + /* Do we need to process any change in 'fast_forward' mode? */ bool processing_required; } LogicalDecodingContext;