diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 705f60a3ae..1e67da983c 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -105,6 +105,8 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors); static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result); +static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, + bool toplevel); static bool UserMappingPasswordRequired(UserMapping *user); static bool disconnect_cached_connections(Oid serverid); @@ -872,8 +874,6 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* If it has an open remote transaction, try to close it */ if (entry->xact_depth > 0) { - bool abort_cleanup_failure = false; - elog(DEBUG3, "closing remote transaction on connection %p", entry->conn); @@ -940,67 +940,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) case XACT_EVENT_PARALLEL_ABORT: case XACT_EVENT_ABORT: - /* - * Don't try to clean up the connection if we're already - * in error recursion trouble. - */ - if (in_error_recursion_trouble()) - entry->changing_xact_state = true; - - /* - * If connection is already unsalvageable, don't touch it - * further. - */ - if (entry->changing_xact_state) - break; - - /* - * Mark this connection as in the process of changing - * transaction state. - */ - entry->changing_xact_state = true; - - /* Assume we might have lost track of prepared statements */ - entry->have_error = true; - - /* - * If a command has been submitted to the remote server by - * using an asynchronous execution function, the command - * might not have yet completed. Check to see if a - * command is still being processed by the remote server, - * and if so, request cancellation of the command. - */ - if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && - !pgfdw_cancel_query(entry->conn)) - { - /* Unable to cancel running query. */ - abort_cleanup_failure = true; - } - else if (!pgfdw_exec_cleanup_query(entry->conn, - "ABORT TRANSACTION", - false)) - { - /* Unable to abort remote transaction. */ - abort_cleanup_failure = true; - } - else if (entry->have_prep_stmt && entry->have_error && - !pgfdw_exec_cleanup_query(entry->conn, - "DEALLOCATE ALL", - true)) - { - /* Trouble clearing prepared statements. */ - abort_cleanup_failure = true; - } - else - { - entry->have_prep_stmt = false; - entry->have_error = false; - /* Also reset per-connection state */ - memset(&entry->state, 0, sizeof(entry->state)); - } - - /* Disarm changing_xact_state if it all worked. */ - entry->changing_xact_state = abort_cleanup_failure; + pgfdw_abort_cleanup(entry, "ABORT TRANSACTION", true); break; } } @@ -1091,46 +1031,13 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, do_sql_command(entry->conn, sql); entry->changing_xact_state = false; } - else if (in_error_recursion_trouble()) + else { - /* - * Don't try to clean up the connection if we're already in error - * recursion trouble. - */ - entry->changing_xact_state = true; - } - else if (!entry->changing_xact_state) - { - bool abort_cleanup_failure = false; - - /* Remember that abort cleanup is in progress. */ - entry->changing_xact_state = true; - - /* Assume we might have lost track of prepared statements */ - entry->have_error = true; - - /* - * If a command has been submitted to the remote server by using - * an asynchronous execution function, the command might not have - * yet completed. Check to see if a command is still being - * processed by the remote server, and if so, request cancellation - * of the command. - */ - if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && - !pgfdw_cancel_query(entry->conn)) - abort_cleanup_failure = true; - else - { - /* Rollback all remote subtransactions during abort */ - snprintf(sql, sizeof(sql), - "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", - curlevel, curlevel); - if (!pgfdw_exec_cleanup_query(entry->conn, sql, false)) - abort_cleanup_failure = true; - } - - /* Disarm changing_xact_state if it all worked. */ - entry->changing_xact_state = abort_cleanup_failure; + /* Rollback all remote subtransactions during abort */ + snprintf(sql, sizeof(sql), + "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", + curlevel, curlevel); + pgfdw_abort_cleanup(entry, sql, false); } /* OK, we're outta that level of subtransaction */ @@ -1409,6 +1316,72 @@ exit: ; return timed_out; } +/* + * Abort remote transaction. + * + * The statement specified in "sql" is sent to the remote server, + * in order to rollback the remote transaction. + * + * "toplevel" should be set to true if toplevel (main) transaction is + * rollbacked, false otherwise. + * + * Set entry->changing_xact_state to false on success, true on failure. + */ +static void +pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel) +{ + /* + * Don't try to clean up the connection if we're already in error + * recursion trouble. + */ + if (in_error_recursion_trouble()) + entry->changing_xact_state = true; + + /* + * If connection is already unsalvageable, don't touch it further. + */ + if (entry->changing_xact_state) + return; + + /* + * Mark this connection as in the process of changing transaction state. + */ + entry->changing_xact_state = true; + + /* Assume we might have lost track of prepared statements */ + entry->have_error = true; + + /* + * If a command has been submitted to the remote server by using an + * asynchronous execution function, the command might not have yet + * completed. Check to see if a command is still being processed by the + * remote server, and if so, request cancellation of the command. + */ + if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && + !pgfdw_cancel_query(entry->conn)) + return; /* Unable to cancel running query */ + + if (!pgfdw_exec_cleanup_query(entry->conn, sql, false)) + return; /* Unable to abort remote transaction */ + + if (toplevel) + { + if (entry->have_prep_stmt && entry->have_error && + !pgfdw_exec_cleanup_query(entry->conn, + "DEALLOCATE ALL", + true)) + return; /* Trouble clearing prepared statements */ + + entry->have_prep_stmt = false; + entry->have_error = false; + /* Also reset per-connection state */ + memset(&entry->state, 0, sizeof(entry->state)); + } + + /* Disarm changing_xact_state if it all worked */ + entry->changing_xact_state = false; +} + /* * List active foreign server connections. *