From dbf44195ca0c51a172634a5d0fc16c34beb0381f Mon Sep 17 00:00:00 2001 From: Fujii Masao Date: Mon, 25 Jul 2022 23:27:14 +0900 Subject: [PATCH 3/3] Merge pgfdw_finish_pre_commit_cleanup and pgfdw_finish_pre_subcommit_cleanup into one. --- contrib/postgres_fdw/connection.c | 78 ++++++++++--------------------- 1 file changed, 25 insertions(+), 53 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index ec290459be..6e23046ad6 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -113,9 +113,8 @@ static bool pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime, static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel); static bool pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql, List **pending_entries, bool toplevel); -static void pgfdw_finish_pre_commit_cleanup(List *pending_entries); -static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, - int curlevel); +static void pgfdw_finish_pre_commit(List *pending_entries, const char *sql, + bool toplevel); static bool UserMappingPasswordRequired(UserMapping *user); static bool disconnect_cached_connections(Oid serverid); @@ -954,7 +953,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) { Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT || event == XACT_EVENT_PRE_COMMIT); - pgfdw_finish_pre_commit_cleanup(pending_entries); + pgfdw_finish_pre_commit(pending_entries, "COMMIT TRANSACTION", true); } /* @@ -1031,7 +1030,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, if (pending_entries) { Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB); - pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel); + pgfdw_finish_pre_commit(pending_entries, sql, false); } } @@ -1523,11 +1522,14 @@ pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql, } /* - * Finish pre-commit cleanup of connections on each of which we've sent a - * COMMIT command to the remote server. + * Wait for all remote transactions or subtransactions to be committed + * and finish pre-commit. + * + * "toplevel" should be set to true if toplevel (main) transaction is + * committed, false otherwise. */ static void -pgfdw_finish_pre_commit_cleanup(List *pending_entries) +pgfdw_finish_pre_commit(List *pending_entries, const char *sql, bool toplevel) { ConnCacheEntry *entry; List *pending_deallocs = NIL; @@ -1536,7 +1538,8 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries) Assert(pending_entries); /* - * Get the result of the COMMIT command for each of the pending entries + * Get the result of COMMIT or RELEASE command for each of the pending + * entries. */ foreach(lc, pending_entries) { @@ -1548,23 +1551,26 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries) * We might already have received the result on the socket, so pass * consume_input=true to try to consume it first */ - do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true); + do_sql_command_end(entry->conn, sql, true); entry->changing_xact_state = false; /* Do a DEALLOCATE ALL in parallel if needed */ - if (entry->have_prep_stmt && entry->have_error) + if (toplevel) { - /* Ignore errors (see notes in pgfdw_xact_callback) */ - if (PQsendQuery(entry->conn, "DEALLOCATE ALL")) + if (entry->have_prep_stmt && entry->have_error) { - pending_deallocs = lappend(pending_deallocs, entry); - continue; + /* Ignore errors (see notes in pgfdw_xact_callback) */ + if (PQsendQuery(entry->conn, "DEALLOCATE ALL")) + { + pending_deallocs = lappend(pending_deallocs, entry); + continue; + } } + entry->have_prep_stmt = false; + entry->have_error = false; } - entry->have_prep_stmt = false; - entry->have_error = false; - pgfdw_reset_xact_state(entry, true); + pgfdw_reset_xact_state(entry, toplevel); } /* No further work if no pending entries */ @@ -1588,41 +1594,7 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries) entry->have_prep_stmt = false; entry->have_error = false; - pgfdw_reset_xact_state(entry, true); - } -} - -/* - * Finish pre-subcommit cleanup of connections on each of which we've sent a - * RELEASE command to the remote server. - */ -static void -pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel) -{ - ConnCacheEntry *entry; - char sql[100]; - ListCell *lc; - - Assert(pending_entries); - - /* - * Get the result of the RELEASE command for each of the pending entries - */ - snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel); - foreach(lc, pending_entries) - { - entry = (ConnCacheEntry *) lfirst(lc); - - Assert(entry->changing_xact_state); - - /* - * We might already have received the result on the socket, so pass - * consume_input=true to try to consume it first - */ - do_sql_command_end(entry->conn, sql, true); - entry->changing_xact_state = false; - - pgfdw_reset_xact_state(entry, false); + pgfdw_reset_xact_state(entry, toplevel); } } -- 2.37.1