From e4f9500f41cff6d607abfbf601f7511a7652103b Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Tue, 23 Aug 2016 16:56:09 -0700 Subject: [PATCH 2/2] Support 2PC for postgres_fdw. --- contrib/postgres_fdw/connection.c | 466 ++++++++++++++++++++++++------------ contrib/postgres_fdw/option.c | 5 +- contrib/postgres_fdw/postgres_fdw.c | 22 +- contrib/postgres_fdw/postgres_fdw.h | 11 +- 4 files changed, 348 insertions(+), 156 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 8ca1c1c..28708ba 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -14,7 +14,9 @@ #include "postgres_fdw.h" +#include "access/fdw_xact.h" #include "access/xact.h" +#include "commands/defrem.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "storage/latch.h" @@ -63,16 +65,19 @@ static unsigned int prep_stmt_number = 0; static bool xact_got_connection = false; /* prototypes of private functions */ -static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); +static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user, + bool connection_error_ok); static void check_conn_params(const char **keywords, const char **values); static void configure_remote_session(PGconn *conn); static void do_sql_command(PGconn *conn, const char *sql); -static void begin_remote_xact(ConnCacheEntry *entry); +static void begin_remote_xact(ConnCacheEntry *entry, Oid serverid, Oid userid); static void pgfdw_xact_callback(XactEvent event, void *arg); static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg); +static bool server_uses_two_phase_commit(ForeignServer *server); +static void pgfdw_cleanup_after_transaction(ConnCacheEntry *entry); /* @@ -85,6 +90,9 @@ static void pgfdw_subxact_callback(SubXactEvent event, * statements. Since those don't go away automatically at transaction end * (not even on error), we need this flag to cue manual cleanup. * + * connection_error_ok if true, indicates that caller can handle connection + * error by itself. If false, raise error. + * * XXX Note that caching connections theoretically requires a mechanism to * detect change of FDW objects to invalidate already established connections. * We could manage that by watching for invalidation events on the relevant @@ -93,7 +101,8 @@ static void pgfdw_subxact_callback(SubXactEvent event, * mid-transaction anyway. */ PGconn * -GetConnection(UserMapping *user, bool will_prep_stmt) +GetConnection(UserMapping *user, bool will_prep_stmt, + bool start_transaction, bool connection_error_ok) { bool found; ConnCacheEntry *entry; @@ -121,9 +130,6 @@ GetConnection(UserMapping *user, bool will_prep_stmt) RegisterSubXactCallback(pgfdw_subxact_callback, NULL); } - /* Set flag that we did GetConnection during the current transaction */ - xact_got_connection = true; - /* Create hash key for the entry. Assume no pad bytes in key struct */ key = user->umid; @@ -158,7 +164,20 @@ GetConnection(UserMapping *user, bool will_prep_stmt) entry->xact_depth = 0; /* just to be sure */ entry->have_prep_stmt = false; entry->have_error = false; - entry->conn = connect_pg_server(server, user); + entry->conn = connect_pg_server(server, user, connection_error_ok); + + /* + * If the attempt to connect to the foreign server failed, we should not + * come here, unless the caller has indicated so. + */ + Assert(entry->conn || connection_error_ok); + + if (!entry->conn && connection_error_ok) + { + elog(DEBUG3, "attempt to connection to server \"%s\" by postgres_fdw failed", + server->servername); + return NULL; + } elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)", entry->conn, server->servername, user->umid, user->userid); @@ -167,7 +186,12 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* * Start a new transaction or subtransaction if needed. */ - begin_remote_xact(entry); + if (start_transaction) + { + begin_remote_xact(entry, user->serverid, user->userid); + /* Set flag that we did GetConnection during the current transaction */ + xact_got_connection = true; + } /* Remember if caller will prepare statements */ entry->have_prep_stmt |= will_prep_stmt; @@ -177,9 +201,12 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* * Connect to remote server using specified server and user mapping properties. + * If the attempt to connect fails, and the caller can handle connection failure + * (connection_error_ok = true) return NULL, throw error otherwise. */ static PGconn * -connect_pg_server(ForeignServer *server, UserMapping *user) +connect_pg_server(ForeignServer *server, UserMapping *user, + bool connection_error_ok) { PGconn *volatile conn = NULL; @@ -234,11 +261,14 @@ connect_pg_server(ForeignServer *server, UserMapping *user) msglen = strlen(connmessage); if (msglen > 0 && connmessage[msglen - 1] == '\n') connmessage[msglen - 1] = '\0'; - ereport(ERROR, - (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), - errmsg("could not connect to server \"%s\"", - server->servername), - errdetail_internal("%s", connmessage))); + + if (connection_error_ok) + return NULL; + else + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("could not connect to server \"%s\"", server->servername), + errdetail_internal("%s", connmessage))); } /* @@ -369,15 +399,22 @@ do_sql_command(PGconn *conn, const char *sql) * control which remote queries share a snapshot. */ static void -begin_remote_xact(ConnCacheEntry *entry) +begin_remote_xact(ConnCacheEntry *entry, Oid serverid, Oid userid) { int curlevel = GetCurrentTransactionNestLevel(); + ForeignServer *server = GetForeignServer(serverid); /* Start main transaction if we haven't yet */ if (entry->xact_depth <= 0) { const char *sql; + /* + * Register the new foreign server and check whether the two phase + * compliance is possible. + */ + RegisterXactForeignServer(serverid, userid, server_uses_two_phase_commit(server)); + elog(DEBUG3, "starting remote transaction on connection %p", entry->conn); @@ -585,158 +622,270 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, } /* - * pgfdw_xact_callback --- cleanup at main-transaction end. + * postgresGetPrepareId + * The function crafts prepared transaction identifier. PostgreSQL documentation + * mentions two restrictions on the name + * 1. String literal, less than 200 bytes long. + * 2. Should not be same as any other concurrent prepared transaction id. + * + * To make the prepared transaction id, we should ideally use something like + * UUID, which gives unique ids with high probability, but that may be expensive + * here and UUID extension which provides the function to generate UUID is + * not part of the core. */ -static void -pgfdw_xact_callback(XactEvent event, void *arg) +extern char * +postgresGetPrepareId(Oid serverid, Oid userid, int *prep_info_len) { - HASH_SEQ_STATUS scan; - ConnCacheEntry *entry; +/* Maximum length of the prepared transaction id, borrowed from twophase.c */ +#define PREP_XACT_ID_MAX_LEN 200 +#define RANDOM_LARGE_MULTIPLIER 1000 + char *prep_info; + + /* Allocate the memory in the same context as the hash entry */ + prep_info = (char *)palloc(PREP_XACT_ID_MAX_LEN * sizeof(char)); + snprintf(prep_info, PREP_XACT_ID_MAX_LEN, "%s_%4d_%d_%d", + "px", abs(random() * RANDOM_LARGE_MULTIPLIER), + serverid, userid); + /* Account for the last NULL byte */ + *prep_info_len = strlen(prep_info); + return prep_info; +} - /* Quick exit if no connections were touched in this transaction. */ - if (!xact_got_connection) - return; +bool +postgresPrepareForeignTransaction(Oid serverid, Oid userid, + int prep_info_len, char *prep_info) +{ + StringInfo command; + PGresult *res; + ConnCacheEntry *entry = NULL; + ConnCacheKey key; + bool found; + UserMapping *user_mapping = GetUserMapping(userid, serverid); + + /* Create hash key for the entry. Assume no pad bytes in key struct */ + key = user_mapping->umid; + + Assert(ConnectionHash); + entry = hash_search(ConnectionHash, &key, HASH_FIND, &found); + + if (found && entry->conn) + { + bool result; + + PGconn *conn = entry->conn; + command = makeStringInfo(); + appendStringInfo(command, "PREPARE TRANSACTION '%.*s'", prep_info_len, + prep_info); + res = PQexec(conn, command->data); + result = (PQresultStatus(res) == PGRES_COMMAND_OK); + if (!result) + { + /* + * TODO: check whether we should raise an error or warning. + * The command failed, raise a warning, so that the reason for + * failure gets logged. Do not raise an error, the caller i.e. foreign + * transaction manager takes care of taking appropriate action. + */ + pgfdw_report_error(WARNING, res, conn, false, command->data); + } + + PQclear(res); + pgfdw_cleanup_after_transaction(entry); + return result; + } + else + return false; +} + +bool +postgresEndForeignTransaction(Oid serverid, Oid userid, Oid umid, bool is_commit) +{ + StringInfo command; + PGresult *res; + ConnCacheEntry *entry = NULL; + ConnCacheKey key; + bool found; + + /* Create hash key for the entry. Assume no pad bytes in key struct */ + key = umid; + + Assert(ConnectionHash); + entry = hash_search(ConnectionHash, &key, HASH_FIND, &found); + + if (found && entry->conn) + { + PGconn *conn = entry->conn; + bool result; + + command = makeStringInfo(); + appendStringInfo(command, "%s TRANSACTION", + is_commit ? "COMMIT" : "ROLLBACK"); + res = PQexec(conn, command->data); + result = (PQresultStatus(res) == PGRES_COMMAND_OK); + if (!result) + { + /* + * The local transaction has ended, so there is no point in raising + * error. Raise a warning so that the reason for the failure gets + * logged. + */ + pgfdw_report_error(WARNING, res, conn, false, command->data); + } + + PQclear(res); + pgfdw_cleanup_after_transaction(entry); + return result; + } + return false; +} + +bool +postgresResolvePreparedForeignTransaction(Oid serverid, Oid userid, Oid umid, + bool is_commit, + int prep_info_len, char *prep_info) +{ + PGconn *conn = NULL; /* - * Scan all connection cache entries to find open remote transactions, and - * close them. + * If there exists a connection in the connection cache that can be used, + * use it. If there is none, we need foreign server and user information + * which can be obtained only when in a transaction block. + * If we are resolving prepared foreign transactions immediately after + * preparing them, the connection hash would have a connection. If we are + * resolving them any other time, a resolver would have started a + * transaction. */ - hash_seq_init(&scan, ConnectionHash); - while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + if (ConnectionHash) { - PGresult *res; + /* Connection hash should have a connection we want */ + bool found; + ConnCacheKey key; + ConnCacheEntry *entry; - /* Ignore cache entry if no open connection right now */ - if (entry->conn == NULL) - continue; + /* Create hash key for the entry. Assume no pad bytes in key struct */ + key = umid; + + entry = (ConnCacheEntry *)hash_search(ConnectionHash, &key, HASH_FIND, &found); + if (found && entry->conn) + conn = entry->conn; + } + + if (!conn && IsTransactionState()) + conn = GetConnection(GetUserMapping(userid, serverid), false, false, true); - /* If it has an open remote transaction, try to close it */ - if (entry->xact_depth > 0) + /* Proceed with resolution if we got a connection, else return false */ + if (conn) + { + StringInfo command; + PGresult *res; + bool result; + + command = makeStringInfo(); + appendStringInfo(command, "%s PREPARED '%.*s'", + is_commit ? "COMMIT" : "ROLLBACK", + prep_info_len, prep_info); + res = PQexec(conn, command->data); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { - elog(DEBUG3, "closing remote transaction on connection %p", - entry->conn); + int sqlstate; + char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); + /* + * The command failed, raise a warning to log the reason of failure. + * We may not be in a transaction here, so raising error doesn't + * help. Even if we are in a transaction, it would be the resolver + * transaction, which will get aborted on raising error, thus + * delaying resolution of other prepared foreign transactions. + */ + pgfdw_report_error(WARNING, res, conn, false, command->data); - switch (event) + if (diag_sqlstate) { - case XACT_EVENT_PARALLEL_PRE_COMMIT: - case XACT_EVENT_PRE_COMMIT: - /* Commit all remote transactions during pre-commit */ - do_sql_command(entry->conn, "COMMIT TRANSACTION"); - - /* - * If there were any errors in subtransactions, and we - * made prepared statements, do a DEALLOCATE ALL to make - * sure we get rid of all prepared statements. This is - * annoying and not terribly bulletproof, but it's - * probably not worth trying harder. - * - * DEALLOCATE ALL only exists in 8.3 and later, so this - * constrains how old a server postgres_fdw can - * communicate with. We intentionally ignore errors in - * the DEALLOCATE, so that we can hobble along to some - * extent with older servers (leaking prepared statements - * as we go; but we don't really support update operations - * pre-8.3 anyway). - */ - if (entry->have_prep_stmt && entry->have_error) - { - res = PQexec(entry->conn, "DEALLOCATE ALL"); - PQclear(res); - } - entry->have_prep_stmt = false; - entry->have_error = false; - break; - case XACT_EVENT_PRE_PREPARE: - - /* - * We disallow remote transactions that modified anything, - * since it's not very reasonable to hold them open until - * the prepared transaction is committed. For the moment, - * throw error unconditionally; later we might allow - * read-only cases. Note that the error will cause us to - * come right back here with event == XACT_EVENT_ABORT, so - * we'll clean up the connection state at that point. - */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot prepare a transaction that modified remote tables"))); - break; - case XACT_EVENT_PARALLEL_COMMIT: - case XACT_EVENT_COMMIT: - case XACT_EVENT_PREPARE: - /* Pre-commit should have closed the open transaction */ - elog(ERROR, "missed cleaning up connection during pre-commit"); - break; - case XACT_EVENT_PARALLEL_ABORT: - case XACT_EVENT_ABORT: - /* Assume we might have lost track of prepared statements */ - entry->have_error = true; - - /* - * If a command has been submitted to the remote server by - * using an asynchronous execution function, the command - * might not have yet completed. Check to see if a - * command is still being processed by the remote server, - * and if so, request cancellation of the command. - */ - if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE) - { - PGcancel *cancel; - char errbuf[256]; - - if ((cancel = PQgetCancel(entry->conn))) - { - if (!PQcancel(cancel, errbuf, sizeof(errbuf))) - ereport(WARNING, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not send cancel request: %s", - errbuf))); - PQfreeCancel(cancel); - } - } - - /* If we're aborting, abort all remote transactions too */ - res = PQexec(entry->conn, "ABORT TRANSACTION"); - /* Note: can't throw ERROR, it would be infinite loop */ - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(WARNING, res, entry->conn, true, - "ABORT TRANSACTION"); - else - { - PQclear(res); - /* As above, make sure to clear any prepared stmts */ - if (entry->have_prep_stmt && entry->have_error) - { - res = PQexec(entry->conn, "DEALLOCATE ALL"); - PQclear(res); - } - entry->have_prep_stmt = false; - entry->have_error = false; - } - break; + sqlstate = MAKE_SQLSTATE(diag_sqlstate[0], + diag_sqlstate[1], + diag_sqlstate[2], + diag_sqlstate[3], + diag_sqlstate[4]); } + else + sqlstate = ERRCODE_CONNECTION_FAILURE; + + /* + * If we tried to COMMIT/ABORT a prepared transaction and the pepared + * transaction was missing on the foreign server, it was probably + * resolved by some other means. Anyway, it should be considered as resolved. + */ + result = (sqlstate == ERRCODE_UNDEFINED_OBJECT); } + else + result = true; - /* Reset state to show we're out of a transaction */ - entry->xact_depth = 0; + PQclear(res); + ReleaseConnection(conn); + return result; + } + else + return false; +} - /* - * If the connection isn't in a good idle state, discard it to - * recover. Next GetConnection will open a new connection. - */ - if (PQstatus(entry->conn) != CONNECTION_OK || - PQtransactionStatus(entry->conn) != PQTRANS_IDLE) - { - elog(DEBUG3, "discarding connection %p", entry->conn); - PQfinish(entry->conn); - entry->conn = NULL; - } +static void +pgfdw_cleanup_after_transaction(ConnCacheEntry *entry) +{ + /* + * If there were any errors in subtransactions, and we made prepared + * statements, do a DEALLOCATE ALL to make sure we get rid of all + * prepared statements. This is annoying and not terribly bulletproof, + * but it's probably not worth trying harder. + * + * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how + * old a server postgres_fdw can communicate with. We intentionally + * ignore errors in the DEALLOCATE, so that we can hobble along to some + * extent with older servers (leaking prepared statements as we go; + * but we don't really support update operations pre-8.3 anyway). + */ + if (entry->have_prep_stmt && entry->have_error) + { + PGresult *res = PQexec(entry->conn, "DEALLOCATE ALL"); + PQclear(res); } + entry->have_prep_stmt = false; + entry->have_error = false; + /* Reset state to show we're out of a transaction */ + entry->xact_depth = 0; + + /* + * If the connection isn't in a good idle state, discard it to + * recover. Next GetConnection will open a new connection. + */ + if (PQstatus(entry->conn) != CONNECTION_OK || + PQtransactionStatus(entry->conn) != PQTRANS_IDLE) + { + elog(DEBUG3, "discarding connection %p", entry->conn); + PQfinish(entry->conn); + entry->conn = NULL; + } + + /* + * TODO: these next two statements should be moved to end of transaction + * call back. + * Regardless of the event type, we can now mark ourselves as out of the + * transaction. + */ + xact_got_connection = false; + + /* Also reset cursor numbering for next transaction */ + cursor_number = 0; +} + +/* + * pgfdw_xact_callback --- cleanup at main-transaction end. + */ +static void +pgfdw_xact_callback(XactEvent event, void *arg) +{ /* * Regardless of the event type, we can now mark ourselves as out of the - * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE, - * this saves a useless scan of the hashtable during COMMIT or PREPARE.) + * transction. */ xact_got_connection = false; @@ -835,3 +984,26 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, entry->xact_depth--; } } + +/* + * server_uses_two_phase_commit + * Returns true if the foreign server is configured to support 2PC. + */ +static bool +server_uses_two_phase_commit(ForeignServer *server) +{ + ListCell *lc; + + /* Check the options for two phase compliance */ + foreach(lc, server->options) + { + DefElem *d = (DefElem *) lfirst(lc); + + if (strcmp(d->defname, "two_phase_commit") == 0) + { + return defGetBoolean(d); + } + } + /* By default a server is not 2PC compliant */ + return false; +} diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index 224aed9..6a20c47 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -107,7 +107,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS) * Validate option value, when we can do so without any context. */ if (strcmp(def->defname, "use_remote_estimate") == 0 || - strcmp(def->defname, "updatable") == 0) + strcmp(def->defname, "updatable") == 0 || + strcmp(def->defname, "two_phase_commit") == 0) { /* these accept only boolean values */ (void) defGetBoolean(def); @@ -176,6 +177,8 @@ InitPgFdwOptions(void) /* fetch_size is available on both server and table */ {"fetch_size", ForeignServerRelationId, false}, {"fetch_size", ForeignTableRelationId, false}, + /* two phase commit support */ + {"two_phase_commit", ForeignServerRelationId, false}, {NULL, InvalidOid, false} }; diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 931bcfd..f585273 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -14,6 +14,8 @@ #include "postgres_fdw.h" +#include "access/fdw_xact.h" +#include "access/xact.h" #include "access/htup_details.h" #include "access/sysattr.h" #include "commands/defrem.h" @@ -455,6 +457,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for join push-down */ routine->GetForeignJoinPaths = postgresGetForeignJoinPaths; + /* Support functions for foreign transactions */ + routine->GetPrepareId = postgresGetPrepareId; + routine->PrepareForeignTransaction = postgresPrepareForeignTransaction; + routine->ResolvePreparedForeignTransaction = postgresResolvePreparedForeignTransaction; + routine->EndForeignTransaction = postgresEndForeignTransaction; + PG_RETURN_POINTER(routine); } @@ -1298,7 +1306,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - fsstate->conn = GetConnection(user, false); + fsstate->conn = GetConnection(user, false, true, false); /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); @@ -1679,7 +1687,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, user = GetUserMapping(userid, table->serverid); /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); + fmstate->conn = GetConnection(user, true, true, false); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Deconstruct fdw_private data. */ @@ -2276,7 +2284,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - dmstate->conn = GetConnection(user, false); + dmstate->conn = GetConnection(user, false, true, false); /* Initialize state variable */ dmstate->num_tuples = -1; /* -1 means not set yet */ @@ -2538,7 +2546,7 @@ estimate_path_cost_size(PlannerInfo *root, NULL); /* Get the remote estimate */ - conn = GetConnection(fpinfo->user, false); + conn = GetConnection(fpinfo->user, false, true, false); get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -3398,7 +3406,7 @@ postgresAnalyzeForeignTable(Relation relation, */ table = GetForeignTable(RelationGetRelid(relation)); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, true, false); /* * Construct command to get page count for relation. @@ -3490,7 +3498,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, true, false); /* * Construct cursor that retrieves whole rows from remote. @@ -3713,7 +3721,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) */ server = GetForeignServer(serverOid); mapping = GetUserMapping(GetUserId(), server->serverid); - conn = GetConnection(mapping, false); + conn = GetConnection(mapping, false, true, false); /* Don't attempt to import collation if remote server hasn't got it */ if (PQserverVersion(conn) < 90100) diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 67126bc..ae2a40d 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -13,6 +13,7 @@ #ifndef POSTGRES_FDW_H #define POSTGRES_FDW_H +#include "access/fdw_xact.h" #include "foreign/foreign.h" #include "lib/stringinfo.h" #include "nodes/relation.h" @@ -99,7 +100,8 @@ extern int set_transmission_modes(void); extern void reset_transmission_modes(int nestlevel); /* in connection.c */ -extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); +extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt, + bool start_transaction, bool connection_error_ok); extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); @@ -160,6 +162,13 @@ extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel, List *tlist, List *remote_conds, List *pathkeys, List **retrieved_attrs, List **params_list); +extern char *postgresGetPrepareId(Oid serveroid, Oid userid, int *prep_info_len); +extern bool postgresResolvePreparedForeignTransaction(Oid serverid, Oid userid, + Oid umid, bool is_commit, + int prep_info_len, char *prep_info); +extern bool postgresEndForeignTransaction(Oid serverid, Oid userid, Oid umid, bool is_commit); +extern bool postgresPrepareForeignTransaction(Oid serverid, Oid userid, int prep_info_len, + char *prep_info); /* in shippable.c */ extern bool is_builtin(Oid objectId); -- 2.8.1