From aa115d03880968c2e5bab68415e06e17a337a45b Mon Sep 17 00:00:00 2001 From: Fujii Masao Date: Mon, 25 Jul 2022 17:25:24 +0900 Subject: [PATCH 1/3] Refactor pgfdw_get_result() and pgfdw_get_cleanup_result(). --- contrib/postgres_fdw/connection.c | 125 +++++++++++------------------- 1 file changed, 47 insertions(+), 78 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 939d114f02..cbee285480 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -108,8 +108,8 @@ static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel); static bool pgfdw_cancel_query(PGconn *conn); static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors); -static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, - PGresult **result, bool *timed_out); +static bool pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime, + PGresult **result, bool *timed_out); static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel); static void pgfdw_finish_pre_commit_cleanup(List *pending_entries); static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, @@ -799,53 +799,12 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state) PGresult * pgfdw_get_result(PGconn *conn, const char *query) { - PGresult *volatile last_res = NULL; - - /* In what follows, do not leak any PGresults on an error. */ - PG_TRY(); - { - for (;;) - { - PGresult *res; - - while (PQisBusy(conn)) - { - int wc; - - /* Sleep until there's something to do */ - wc = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_SOCKET_READABLE | - WL_EXIT_ON_PM_DEATH, - PQsocket(conn), - -1L, PG_WAIT_EXTENSION); - ResetLatch(MyLatch); - - CHECK_FOR_INTERRUPTS(); - - /* Data available in socket? */ - if (wc & WL_SOCKET_READABLE) - { - if (!PQconsumeInput(conn)) - pgfdw_report_error(ERROR, NULL, conn, false, query); - } - } - - res = PQgetResult(conn); - if (res == NULL) - break; /* query is complete */ + PGresult *result = NULL; - PQclear(last_res); - last_res = res; - } - } - PG_CATCH(); - { - PQclear(last_res); - PG_RE_THROW(); - } - PG_END_TRY(); + if (pgfdw_get_result_timed(conn, 0, &result, NULL)) + pgfdw_report_error(ERROR, NULL, conn, false, query); - return last_res; + return result; } /* @@ -1295,7 +1254,7 @@ pgfdw_cancel_query(PGconn *conn) } /* Get and discard the result of the query. */ - if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out)) + if (pgfdw_get_result_timed(conn, endtime, &result, &timed_out)) { if (timed_out) ereport(WARNING, @@ -1351,7 +1310,7 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors) } /* Get the result of the query. */ - if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out)) + if (pgfdw_get_result_timed(conn, endtime, &result, &timed_out)) { if (timed_out) ereport(WARNING, @@ -1375,24 +1334,33 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors) } /* - * Get, during abort cleanup, the result of a query that is in progress. This - * might be a query that is being interrupted by transaction abort, or it might - * be a query that was initiated as part of transaction abort to get the remote - * side back to the appropriate state. + * Get the result of a query. + * + * This function offers quick responsiveness by checking for any interruptions. + * + * If timed_out is NULL, the timeout does not occur. Otherwise, the timeout is + * enabled and endtime is used as the time at which this function should + * give up and assume the remote side is dead. + * + * Return true if the timeout expired or connection trouble occurred. Otherwise + * return false and set *result to the last result of a query. Set timed_out to + * true only when the timeout expired. + * + * This function emulates PQexec()'s behavior of returning the last result + * when there are many. + * + * Caller is responsible for the error handling on the result. * - * endtime is the time at which we should give up and assume the remote - * side is dead. Returns true if the timeout expired or connection trouble - * occurred, false otherwise. Sets *result except in case of a timeout. - * Sets timed_out to true only when the timeout expired. */ static bool -pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result, - bool *timed_out) +pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime, PGresult **result, + bool *timed_out) { volatile bool failed = false; PGresult *volatile last_res = NULL; - *timed_out = false; + if (timed_out != NULL) + *timed_out = false; /* In what follows, do not leak any PGresults on an error. */ PG_TRY(); @@ -1404,23 +1372,27 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result, while (PQisBusy(conn)) { int wc; - TimestampTz now = GetCurrentTimestamp(); - long cur_timeout; + long cur_timeout = -1; + int wakeEvents = WL_LATCH_SET | WL_SOCKET_READABLE | + WL_EXIT_ON_PM_DEATH; /* If timeout has expired, give up, else get sleep time. */ - cur_timeout = TimestampDifferenceMilliseconds(now, endtime); - if (cur_timeout <= 0) + if (timed_out != NULL) { - *timed_out = true; - failed = true; - goto exit; + TimestampTz now = GetCurrentTimestamp(); + + cur_timeout = TimestampDifferenceMilliseconds(now, endtime); + if (cur_timeout <= 0) + { + *timed_out = true; + failed = true; + goto exit; + } + wakeEvents |= WL_TIMEOUT; } /* Sleep until there's something to do */ - wc = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_SOCKET_READABLE | - WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - PQsocket(conn), + wc = WaitLatchOrSocket(MyLatch, wakeEvents, PQsocket(conn), cur_timeout, PG_WAIT_EXTENSION); ResetLatch(MyLatch); @@ -1458,6 +1430,7 @@ exit: ; PQclear(last_res); else *result = last_res; + return failed; } @@ -1599,13 +1572,9 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries) entry = (ConnCacheEntry *) lfirst(lc); /* Ignore errors (see notes in pgfdw_xact_callback) */ - while ((res = PQgetResult(entry->conn)) != NULL) - { - PQclear(res); - /* Stop if the connection is lost (else we'll loop infinitely) */ - if (PQstatus(entry->conn) == CONNECTION_BAD) - break; - } + pgfdw_get_result_timed(entry->conn, 0, &res, NULL); + PQclear(res); + entry->have_prep_stmt = false; entry->have_error = false; -- 2.37.1