diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 76994f3820..2f411cf2f7 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -74,6 +74,7 @@ static unsigned int prep_stmt_number = 0; static bool xact_got_connection = false; /* prototypes of private functions */ +static void make_new_connection(ConnCacheEntry *entry, UserMapping *user); static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); static void disconnect_pg_server(ConnCacheEntry *entry); static void check_conn_params(const char **keywords, const char **values, UserMapping *user); @@ -108,9 +109,10 @@ PGconn * GetConnection(UserMapping *user, bool will_prep_stmt) { bool found; - volatile bool retry_conn = false; + bool retry = false; ConnCacheEntry *entry; ConnCacheKey key; + MemoryContext ccxt = CurrentMemoryContext; /* First time through, initialize connection cache hashtable */ if (ConnectionHash == NULL) @@ -160,23 +162,14 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* Reject further use of connections which failed abort cleanup. */ pgfdw_reject_incomplete_xact_state_change(entry); -retry: - /* * If the connection needs to be remade due to invalidation, disconnect as - * soon as we're out of all transactions. Also, if previous attempt to - * start new remote transaction failed on the cached connection, - * disconnect it to retry a new connection. + * soon as we're out of all transactions. */ - if ((entry->conn != NULL && entry->invalidated && - entry->xact_depth == 0) || retry_conn) + if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0) { - if (retry_conn) - elog(DEBUG3, "closing connection %p to reestablish a new one", - entry->conn); - else - elog(DEBUG3, "closing connection %p for option changes to take effect", - entry->conn); + elog(DEBUG3, "closing connection %p for option changes to take effect", + entry->conn); disconnect_pg_server(entry); } @@ -186,58 +179,78 @@ retry: * will remain in a valid empty state, ie conn == NULL.) */ if (entry->conn == NULL) - { - ForeignServer *server = GetForeignServer(user->serverid); - - /* Reset all transient state fields, to be sure all are clean */ - entry->xact_depth = 0; - entry->have_prep_stmt = false; - entry->have_error = false; - entry->changing_xact_state = false; - entry->invalidated = false; - entry->server_hashvalue = - GetSysCacheHashValue1(FOREIGNSERVEROID, - ObjectIdGetDatum(server->serverid)); - entry->mapping_hashvalue = - GetSysCacheHashValue1(USERMAPPINGOID, - ObjectIdGetDatum(user->umid)); - - /* Now try to make the connection */ - entry->conn = connect_pg_server(server, user); - - elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)", - entry->conn, server->servername, user->umid, user->userid); - } + make_new_connection(entry, user); /* * We check the health of the cached connection here when starting a new - * remote transaction. If a broken connection is detected in the first - * attempt, we try to reestablish a new connection. If broken connection - * is detected again here, we give up getting a connection. + * remote transaction. If a broken connection is detected, we try to + * reestablish a new connection later. */ PG_TRY(); { /* Start a new transaction or subtransaction if needed. */ begin_remote_xact(entry); - retry_conn = false; } PG_CATCH(); { - if (PQstatus(entry->conn) != CONNECTION_BAD || - entry->xact_depth > 0 || - retry_conn) + MemoryContext ecxt = MemoryContextSwitchTo(ccxt); + ErrorData *errdata = CopyErrorData(); + + /* + * If connection failure is reported when starting a new remote + * transaction (not subtransaction), new connection will be + * reestablished later. + * + * After a broken connection is detected in libpq, any error other + * than connection failure (e.g., out-of-memory) can be thrown + * somewhere between return from libpq and the expected ereport() call + * in pgfdw_report_error(). In this case, since PQstatus() indicates + * CONNECTION_BAD, checking only PQstatus() causes the false detection + * of connection failure. To avoid this, we also verify that the + * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also + * checking only the sqlstate can cause another false detection + * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE + * for any libpq-originated error condition. + */ + if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE || + PQstatus(entry->conn) != CONNECTION_BAD || + entry->xact_depth > 0) + { + MemoryContextSwitchTo(ecxt); PG_RE_THROW(); - retry_conn = true; + } + + /* Clean up the error state */ + FlushErrorState(); + FreeErrorData(errdata); + errdata = NULL; + + retry = true; } PG_END_TRY(); - if (retry_conn) + /* + * If a broken connection is detected, disconnect it, reestablish a new + * connection and retry a new remote transaction. If connection failure is + * reported again, we give up getting a connection. + */ + if (retry) { + Assert(entry->xact_depth == 0); + ereport(DEBUG3, (errmsg_internal("could not start remote transaction on connection %p", entry->conn)), errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn)))); - goto retry; + + elog(DEBUG3, "closing connection %p to reestablish a new one", + entry->conn); + disconnect_pg_server(entry); + + if (entry->conn == NULL) + make_new_connection(entry, user); + + begin_remote_xact(entry); } /* Remember if caller will prepare statements */ @@ -246,6 +259,37 @@ retry: return entry->conn; } +/* + * Reset all transient state fields in the cached connection entry and + * establish new connection to the remote server. + */ +static void +make_new_connection(ConnCacheEntry *entry, UserMapping *user) +{ + ForeignServer *server = GetForeignServer(user->serverid); + + Assert(entry->conn == NULL); + + /* Reset all transient state fields, to be sure all are clean */ + entry->xact_depth = 0; + entry->have_prep_stmt = false; + entry->have_error = false; + entry->changing_xact_state = false; + entry->invalidated = false; + entry->server_hashvalue = + GetSysCacheHashValue1(FOREIGNSERVEROID, + ObjectIdGetDatum(server->serverid)); + entry->mapping_hashvalue = + GetSysCacheHashValue1(USERMAPPINGOID, + ObjectIdGetDatum(user->umid)); + + /* Now try to make the connection */ + entry->conn = connect_pg_server(server, user); + + elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)", + entry->conn, server->servername, user->umid, user->userid); +} + /* * Connect to remote server using specified server and user mapping properties. */