From 4205943a12d50255e7150f3b70aa75b061070e5e Mon Sep 17 00:00:00 2001 From: Fujii Masao Date: Mon, 25 Jul 2022 22:45:06 +0900 Subject: [PATCH 2/3] Add common function to commit xact or subxact during pre-commit. --- contrib/postgres_fdw/connection.c | 122 ++++++++++++++++-------------- 1 file changed, 66 insertions(+), 56 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index cbee285480..ec290459be 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -111,6 +111,8 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, static bool pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out); static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel); +static bool pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql, + List **pending_entries, bool toplevel); static void pgfdw_finish_pre_commit_cleanup(List *pending_entries); static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel); @@ -894,8 +896,6 @@ pgfdw_xact_callback(XactEvent event, void *arg) hash_seq_init(&scan, ConnectionHash); while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) { - PGresult *res; - /* Ignore cache entry if no open connection right now */ if (entry->conn == NULL) continue; @@ -911,45 +911,10 @@ pgfdw_xact_callback(XactEvent event, void *arg) case XACT_EVENT_PARALLEL_PRE_COMMIT: case XACT_EVENT_PRE_COMMIT: - /* - * If abort cleanup previously failed for this connection, - * we can't issue any more commands against it. - */ - pgfdw_reject_incomplete_xact_state_change(entry); - /* Commit all remote transactions during pre-commit */ - entry->changing_xact_state = true; - if (entry->parallel_commit) - { - do_sql_command_begin(entry->conn, "COMMIT TRANSACTION"); - pending_entries = lappend(pending_entries, entry); + if (pgfdw_exec_pre_commit(entry, "COMMIT TRANSACTION", + &pending_entries, true)) continue; - } - do_sql_command(entry->conn, "COMMIT TRANSACTION"); - entry->changing_xact_state = false; - - /* - * If there were any errors in subtransactions, and we - * made prepared statements, do a DEALLOCATE ALL to make - * sure we get rid of all prepared statements. This is - * annoying and not terribly bulletproof, but it's - * probably not worth trying harder. - * - * DEALLOCATE ALL only exists in 8.3 and later, so this - * constrains how old a server postgres_fdw can - * communicate with. We intentionally ignore errors in - * the DEALLOCATE, so that we can hobble along to some - * extent with older servers (leaking prepared statements - * as we go; but we don't really support update operations - * pre-8.3 anyway). - */ - if (entry->have_prep_stmt && entry->have_error) - { - res = PQexec(entry->conn, "DEALLOCATE ALL"); - PQclear(res); - } - entry->have_prep_stmt = false; - entry->have_error = false; break; case XACT_EVENT_PRE_PREPARE: @@ -1014,6 +979,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, ConnCacheEntry *entry; int curlevel; List *pending_entries = NIL; + char sql[100]; /* Nothing to do at subxact start, nor after commit. */ if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB || @@ -1029,11 +995,11 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, * of the current level, and close them. */ curlevel = GetCurrentTransactionNestLevel(); + snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel); + hash_seq_init(&scan, ConnectionHash); while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) { - char sql[100]; - /* * We only care about connections with open remote subtransactions of * the current level. @@ -1047,23 +1013,9 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, if (event == SUBXACT_EVENT_PRE_COMMIT_SUB) { - /* - * If abort cleanup previously failed for this connection, we - * can't issue any more commands against it. - */ - pgfdw_reject_incomplete_xact_state_change(entry); - /* Commit all remote subtransactions during pre-commit */ - snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel); - entry->changing_xact_state = true; - if (entry->parallel_commit) - { - do_sql_command_begin(entry->conn, sql); - pending_entries = lappend(pending_entries, entry); + if (pgfdw_exec_pre_commit(entry, sql, &pending_entries, false)) continue; - } - do_sql_command(entry->conn, sql); - entry->changing_xact_state = false; } else { @@ -1512,6 +1464,64 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel) entry->changing_xact_state = false; } +/* + * Commit all remote transactions or subtransactions during pre-commit. + * + * If parallel_commit is enabled at this connection cache entry and + * the result of "sql" needs to be gotten later, return true and append + * this entry to "pending_entries". + * + * "toplevel" should be set to true if toplevel (main) transaction is + * committed, false otherwise. + */ +static bool +pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql, + List **pending_entries, bool toplevel) +{ + PGresult *res; + + /* + * If abort cleanup previously failed for this connection, we can't issue + * any more commands against it. + */ + pgfdw_reject_incomplete_xact_state_change(entry); + + entry->changing_xact_state = true; + if (entry->parallel_commit) + { + do_sql_command_begin(entry->conn, sql); + *pending_entries = lappend(*pending_entries, entry); + return true; + } + do_sql_command(entry->conn, sql); + entry->changing_xact_state = false; + + if (!toplevel) + return false; + + /* + * If there were any errors in subtransactions, and we made prepared + * statements, do a DEALLOCATE ALL to make sure we get rid of all prepared + * statements. This is annoying and not terribly bulletproof, but it's + * probably not worth trying harder. + * + * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how old + * a server postgres_fdw can communicate with. We intentionally ignore + * errors in the DEALLOCATE, so that we can hobble along to some extent + * with older servers (leaking prepared statements as we go; but we don't + * really support update operations pre-8.3 anyway). + */ + if (entry->have_prep_stmt && entry->have_error) + { + res = PQexec(entry->conn, "DEALLOCATE ALL"); + PQclear(res); + } + entry->have_prep_stmt = false; + entry->have_error = false; + + return false; +} + /* * Finish pre-commit cleanup of connections on each of which we've sent a * COMMIT command to the remote server. -- 2.37.1