From 7e51aa6493edfcb7b3aafa8bce66642509e0cdec Mon Sep 17 00:00:00 2001 From: jian he Date: Fri, 24 Jan 2025 22:49:47 +0800 Subject: [PATCH v12 2/2] pg_restore --exclude-database=PATTERN --- src/bin/pg_dump/common_dumpall_restore.c | 49 +++++++ src/bin/pg_dump/common_dumpall_restore.h | 3 +- src/bin/pg_dump/pg_restore.c | 164 +++++++++++------------ 3 files changed, 133 insertions(+), 83 deletions(-) diff --git a/src/bin/pg_dump/common_dumpall_restore.c b/src/bin/pg_dump/common_dumpall_restore.c index ace5077085..fd966c42a0 100644 --- a/src/bin/pg_dump/common_dumpall_restore.c +++ b/src/bin/pg_dump/common_dumpall_restore.c @@ -312,3 +312,52 @@ parseDumpFormat(const char *format) return archDumpFormat; } + +static size_t +quote_literal_internal(char *dst, const char *src, size_t len) +{ + const char *s; + char *savedst = dst; + + for (s = src; s < src + len; s++) + { + if (*s == '\\') + { + *dst++ = ESCAPE_STRING_SYNTAX; + break; + } + } + + *dst++ = '\''; + while (len-- > 0) + { + if (SQL_STR_DOUBLE(*src, true)) + *dst++ = *src; + *dst++ = *src++; + } + *dst++ = '\''; + + return dst - savedst; +} + +/* + * quote_literal_cstr - + * returns a properly quoted literal + * copied from src/backend/utils/adt/quote.c + */ +char * +quote_literal_cstr(const char *rawstr) +{ + char *result; + int len; + int newlen; + + len = strlen(rawstr); + /* We make a worst-case result area; wasting a little space is OK */ + result = pg_malloc(len * 2 + 3 + 1); + + newlen = quote_literal_internal(result, rawstr, len); + result[newlen] = '\0'; + + return result; +} diff --git a/src/bin/pg_dump/common_dumpall_restore.h b/src/bin/pg_dump/common_dumpall_restore.h index aef7abdf4f..d8893befca 100644 --- a/src/bin/pg_dump/common_dumpall_restore.h +++ b/src/bin/pg_dump/common_dumpall_restore.h @@ -16,10 +16,11 @@ #include "pg_backup.h" -extern PGconn *v(const char *dbname, const char *connection_string, const char *pghost, +extern PGconn *connectDatabase(const char *dbname, const char *connection_string, const char *pghost, const char *pgport, const char *pguser, trivalue prompt_password, bool fail_on_error, const char *progname, const char **connstr, int *server_version); extern PGresult *executeQuery(PGconn *conn, const char *query); extern ArchiveFormat parseDumpFormat(const char *format); +extern char *quote_literal_cstr(const char *s); #endif /* COMMON_DUMPALL_RESTORE_H */ diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c index a715448dce..cef8af652c 100644 --- a/src/bin/pg_dump/pg_restore.c +++ b/src/bin/pg_dump/pg_restore.c @@ -81,22 +81,27 @@ static bool restoreOneDatabase(const char *inputFileSpec, RestoreOptions *opts, int numWorkers, bool append_data); static int ReadOneStatement(StringInfo inBuf, FILE *pfile); static int restoreAllDatabases(PGconn *conn, const char *dumpdirpath, - SimpleStringList db_exclude_patterns, RestoreOptions *opts, int numWorkers); + RestoreOptions *opts, int numWorkers); static void execute_global_sql_commands(PGconn *conn, const char *dumpdirpath, - const char *outfile); -static int filter_dbnames_for_restore(PGconn *conn, - SimpleDatabaseOidList *dbname_oid_list, SimpleStringList db_exclude_patterns); + const char *outfile); + +static void expand_db_pattern_restore(PGconn *conn, + SimpleStringList *patterns, + SimpleStringList *names, + SimpleDatabaseOidList *dboid_list); +static int filter_dbnames_for_restore(SimpleDatabaseOidList *dbname_oid_list); static int get_dbname_oid_list_from_mfile(const char *dumpdirpath, SimpleDatabaseOidList *dbname_oid_list); static void simple_db_oid_list_append(SimpleDatabaseOidList *list, Oid db_oid, const char *dbname); -static bool is_full_pattern(PGconn *conn, const char *str, const char *ptrn); static void simple_string_full_list_delete(SimpleStringList *list); static void simple_db_oid_full_list_delete(SimpleDatabaseOidList *list); static void simple_db_oid_list_delete(SimpleDatabaseOidList *list, SimpleDatabaseOidListCell *cell, SimpleDatabaseOidListCell *prev); +static SimpleStringList db_exclude_patterns = {NULL, NULL}; +static SimpleStringList database_exclude_names = {NULL, NULL}; int main(int argc, char **argv) { @@ -118,7 +123,6 @@ main(int argc, char **argv) static int strict_names = 0; bool data_only = false; bool schema_only = false; - SimpleStringList db_exclude_patterns = {NULL, NULL}; bool globals_only = false; struct option cmdopts[] = { @@ -491,8 +495,7 @@ main(int argc, char **argv) else { /* Now restore all the databases from map.dat file. */ - exit_code = restoreAllDatabases(conn, inputFileSpec, db_exclude_patterns, - opts, numWorkers); + exit_code = restoreAllDatabases(conn, inputFileSpec, opts, numWorkers); } /* Free db pattern list. */ @@ -795,17 +798,75 @@ ReadOneStatement(StringInfo inBuf, FILE *pfile) return 'Q'; } + +/* + * Find a list of database names that match the given patterns, + * the results is stored in *names*. + */ +static void +expand_db_pattern_restore(PGconn *conn, + SimpleStringList *patterns, + SimpleStringList *names, + SimpleDatabaseOidList *dboid_list) +{ + PQExpBuffer query; + PGresult *res; + SimpleDatabaseOidListCell *dboid_cell = NULL; + + if (patterns->head == NULL) + return; + + query = createPQExpBuffer(); + + /* + * the construct pattern matching query: + * SELECT 1 WHERE XXX OPERATOR(pg_catalog.~) '^(PATTERN)$' COLLATE pg_catalog.default + * + * XXX represents the string literal database name derived from the + * dboid_list variable, which is initially extracted from the map.dat + * file located in the backup directory. + * that's why we need quote_literal_cstr. + */ + for (dboid_cell = dboid_list->head; dboid_cell; dboid_cell = dboid_cell->next) + { + for (SimpleStringListCell *cell = patterns->head; cell; cell = cell->next) + { + int dotcnt; + + appendPQExpBufferStr(query, "SELECT 1 "); + processSQLNamePattern(conn, query, cell->val, false, + false, NULL, quote_literal_cstr(dboid_cell->db_name), + NULL, NULL, NULL, &dotcnt); + + if (dotcnt > 0) + { + pg_log_error("improper qualified name (too many dotted names): %s", + cell->val); + PQfinish(conn); + exit_nicely(1); + } + + res = executeQuery(conn, query->data); + if ((PQresultStatus(res) == PGRES_TUPLES_OK) && PQntuples(res)) + simple_string_list_append(names, dboid_cell->db_name); + + PQclear(res); + resetPQExpBuffer(query); + } + } + destroyPQExpBuffer(query); +} + /* * filter_dbnames_for_restore * - * This will remove names from all dblist that are given with exclude-database - * option. + * This will remove names from all dblist that are same as + * database_exclude_names list element * * returns number of dbnames those will be restored. */ static int -filter_dbnames_for_restore(PGconn *conn, SimpleDatabaseOidList *dbname_oid_list, - SimpleStringList db_exclude_patterns) +filter_dbnames_for_restore(SimpleDatabaseOidList *dbname_oid_list) { SimpleDatabaseOidListCell *dboid_cell = dbname_oid_list->head; SimpleDatabaseOidListCell *dboidprecell = NULL; @@ -821,18 +882,13 @@ filter_dbnames_for_restore(PGconn *conn, SimpleDatabaseOidList *dbname_oid_list, bool skip_db_restore = false; SimpleDatabaseOidListCell *next = dboid_cell->next; - /* Now match this dbname with exclude-database list. */ - for (SimpleStringListCell *celldb = db_exclude_patterns.head; celldb; celldb = celldb->next) + for (SimpleStringListCell *celldb = database_exclude_names.head; celldb; celldb = celldb->next) { - if ((conn && is_full_pattern(conn, dboid_cell->db_name, celldb->val)) || - (!conn && pg_strcasecmp(dboid_cell->db_name, celldb->val) == 0)) + if (pg_strcasecmp(dboid_cell->db_name, celldb->val) == 0) { /* * As we need to skip this dbname so set flag to remove it from * list. - * - * Note: we can't remove this pattern from skip list as we - * might have multiple database name with this same pattern. */ skip_db_restore = true; break; @@ -937,8 +993,7 @@ get_dbname_oid_list_from_mfile(const char *dumpdirpath, SimpleDatabaseOidList *d */ static int restoreAllDatabases(PGconn *conn, const char *dumpdirpath, - SimpleStringList db_exclude_patterns, RestoreOptions *opts, - int numWorkers) + RestoreOptions *opts, int numWorkers) { SimpleDatabaseOidList dbname_oid_list = {NULL, NULL}; SimpleDatabaseOidListCell *dboid_cell; @@ -977,13 +1032,13 @@ restoreAllDatabases(PGconn *conn, const char *dumpdirpath, } /* - * TODO: To skip databases, we need to make a design. - * - * Skip any explicitly excluded database. If there is no database - * connection, then just consider pattern as simple name to compare. + * processing pg_retsore --exclude-database=PATTERN. */ - num_db_restore = filter_dbnames_for_restore(conn, &dbname_oid_list, - db_exclude_patterns); + expand_db_pattern_restore(conn, + &db_exclude_patterns, + &database_exclude_names, + &dbname_oid_list); + num_db_restore = filter_dbnames_for_restore(&dbname_oid_list); /* Close the db connection as we are done globals and patterns. */ if (conn) @@ -1218,58 +1273,3 @@ simple_db_oid_list_delete(SimpleDatabaseOidList *list, SimpleDatabaseOidListCell pfree(cell); } } - -/* - * is_full_pattern - * - * This uses substring function to make 1st string from pattern. - * Outstring of substring function is compared with 1st string to - * validate this pattern. - * - * Returns true if 1st string can be constructed from given pattern. - * - */ -static bool -is_full_pattern(PGconn *conn, const char *str, const char *ptrn) -{ - PQExpBuffer query; - PGresult *result; - - query = createPQExpBuffer(); - - printfPQExpBuffer(query, - "SELECT substring ( " - " '%s' , " - " '%s' ) ", str, ptrn); - - result = executeQuery(conn, query->data); - - if (PQresultStatus(result) == PGRES_TUPLES_OK) - { - if (PQntuples(result) == 1) - { - const char *outstr = NULL; - - /* - * If output string of substring function is matches with str, then - * we can construct str from pattern. - */ - if (!PQgetisnull(result, 0, 0)) - outstr = PQgetvalue(result, 0, 0); - - if (outstr && pg_strcasecmp(outstr, str) == 0) - { - PQclear(result); - destroyPQExpBuffer(query); - return true; - } - } - } - else - pg_log_error("could not execute query: \"%s\" \nCommand was: \"%s\"", PQerrorMessage(conn), query->data); - - PQclear(result); - destroyPQExpBuffer(query); - - return false; -} -- 2.34.1