From 6c438e39f625c9a88b4242fe605deaba43087744 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 8 Aug 2016 15:04:18 -0700 Subject: [PATCH 1/2] Support transaction with foreign servers. --- src/backend/access/rmgrdesc/Makefile | 9 +- src/backend/access/rmgrdesc/fdw_xactdesc.c | 61 + src/backend/access/rmgrdesc/xlogdesc.c | 5 +- src/backend/access/transam/Makefile | 2 +- src/backend/access/transam/fdw_xact.c | 2034 +++++++++++++++++++++++++ src/backend/access/transam/rmgr.c | 1 + src/backend/access/transam/twophase.c | 7 + src/backend/access/transam/xact.c | 14 + src/backend/access/transam/xlog.c | 27 +- src/backend/bootstrap/bootstrap.c | 1 + src/backend/commands/foreigncmds.c | 26 + src/backend/replication/logical/decode.c | 1 + src/backend/storage/ipc/ipci.c | 3 + src/backend/storage/lmgr/lwlocknames.txt | 1 + src/backend/utils/misc/guc.c | 14 + src/backend/utils/misc/postgresql.conf.sample | 6 + src/bin/initdb/initdb.c | 1 + src/bin/pg_controldata/pg_controldata.c | 2 + src/bin/pg_resetxlog/pg_resetxlog.c | 2 + src/bin/pg_xlogdump/rmgrdesc.c | 2 + src/include/access/fdw_xact.h | 75 + src/include/access/rmgrlist.h | 1 + src/include/access/xlog_internal.h | 1 + src/include/catalog/pg_control.h | 1 + src/include/catalog/pg_proc.h | 6 + src/include/foreign/fdwapi.h | 24 + src/include/storage/proc.h | 5 +- src/include/utils/builtins.h | 4 + src/test/regress/pg_regress.c | 11 +- 29 files changed, 2333 insertions(+), 14 deletions(-) create mode 100644 src/backend/access/rmgrdesc/fdw_xactdesc.c create mode 100644 src/backend/access/transam/fdw_xact.c create mode 100644 src/include/access/fdw_xact.h diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index 5514db1..6e23ec1 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -8,9 +8,10 @@ subdir = src/backend/access/rmgrdesc top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o genericdesc.o \ - gindesc.o gistdesc.o hashdesc.o heapdesc.o logicalmsgdesc.o \ - mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o seqdesc.o \ - smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o +OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o fdw_xactdesc.o \ + genericdesc.o gindesc.o gistdesc.o hashdesc.o heapdesc.o \ + logicalmsgdesc.o mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o \ + seqdesc.o smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o xactdesc.o \ + xlogdesc.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/rmgrdesc/fdw_xactdesc.c b/src/backend/access/rmgrdesc/fdw_xactdesc.c new file mode 100644 index 0000000..b01ccf8 --- /dev/null +++ b/src/backend/access/rmgrdesc/fdw_xactdesc.c @@ -0,0 +1,61 @@ +/*------------------------------------------------------------------------- + * + * fdw_xactdesc.c + * PostgreSQL distributed transaction manager. + * + * This module describes the WAL records for foreign transaction manager. + * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/transam/fdw_xactdesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/fdw_xact.h" +#include "access/xloginsert.h" +#include "lib/stringinfo.h" + +extern void +fdw_xact_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_FDW_XACT_INSERT) + { + FDWXactOnDiskData *fdw_insert_xlog = (FDWXactOnDiskData *)rec; + appendStringInfo(buf, "Foreign server oid: %u", fdw_insert_xlog->serverid); + appendStringInfo(buf, " user oid: %u", fdw_insert_xlog->userid); + appendStringInfo(buf, " database id: %u", fdw_insert_xlog->dboid); + /* TODO: This should be really interpreted by each FDW */ + /* TODO: we also need to assess whether we want to add this information */ + appendStringInfo(buf, " foreign transaction info: "); + appendStringInfo(buf, "%.*s", fdw_insert_xlog->fdw_xact_id_len, + fdw_insert_xlog->fdw_xact_id); + } + else + { + FdwRemoveXlogRec *fdw_remove_xlog = (FdwRemoveXlogRec *)rec; + appendStringInfo(buf, "Foreign server oid: %u", fdw_remove_xlog->serverid); + appendStringInfo(buf, " user oid: %u", fdw_remove_xlog->userid); + appendStringInfo(buf, " database id: %u", fdw_remove_xlog->dbid); + } + +} + +extern const char * +fdw_xact_identify(uint8 info) +{ + switch(info & ~XLR_INFO_MASK) + { + case XLOG_FDW_XACT_INSERT: + return "NEW FOREIGN TRANSACTION"; + case XLOG_FDW_XACT_REMOVE: + return "REMOVE FOREIGN TRANSACTION"; + } + /* Keep compiler happy */ + return NULL; +} diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index 62ed1dc..c2f36c7 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -112,14 +112,15 @@ xlog_desc(StringInfo buf, XLogReaderState *record) appendStringInfo(buf, "max_connections=%d max_worker_processes=%d " "max_prepared_xacts=%d max_locks_per_xact=%d " "wal_level=%s wal_log_hints=%s " - "track_commit_timestamp=%s", + "track_commit_timestamp=%s max_fdw_xacts=%d", xlrec.MaxConnections, xlrec.max_worker_processes, xlrec.max_prepared_xacts, xlrec.max_locks_per_xact, wal_level_str, xlrec.wal_log_hints ? "on" : "off", - xlrec.track_commit_timestamp ? "on" : "off"); + xlrec.track_commit_timestamp ? "on" : "off", + xlrec.max_fdw_xacts); } else if (info == XLOG_FPW_CHANGE) { diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 16fbe47..dd7ee32 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global OBJS = clog.o commit_ts.o generic_xlog.o multixact.o parallel.o rmgr.o slru.o \ subtrans.o timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \ xact.o xlog.o xlogarchive.o xlogfuncs.o \ - xloginsert.o xlogreader.o xlogutils.o + xloginsert.o xlogreader.o xlogutils.o fdw_xact.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/transam/fdw_xact.c b/src/backend/access/transam/fdw_xact.c new file mode 100644 index 0000000..df305e5 --- /dev/null +++ b/src/backend/access/transam/fdw_xact.c @@ -0,0 +1,2034 @@ +/*------------------------------------------------------------------------- + * + * fdw_xact.c + * PostgreSQL distributed transaction manager. + * + * This module manages the transactions involving foreign servers. + * + * Copyright (c) 2016, PostgreSQL Global Development Group + * + * src/backend/access/transam/fdw_xact.c + * + *------------------------------------------------------------------------- + */ +#include +#include +#include + +#include "postgres.h" + +#include "miscadmin.h" +#include "funcapi.h" + +#include "access/fdw_xact.h" +#include "access/htup_details.h" +#include "access/xact.h" +#include "access/xlog.h" +#include "access/xloginsert.h" +#include "catalog/pg_type.h" +#include "foreign/foreign.h" +#include "foreign/fdwapi.h" +#include "libpq/pqsignal.h" +#include "storage/fd.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/lock.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/shmem.h" +#include "utils/builtins.h" +#include "utils/memutils.h" +#include "utils/guc.h" +#include "utils/snapmgr.h" + +/* + * This comment summarises how the transaction manager handles transactions + * involving one or more foreign server/s. + * + * When an foreign data wrapper starts transaction on a foreign server, it is + * required to register the foreign server and user who initiated the + * transaction using function RegisterXactForeignServer(). A foreign server + * connection is identified by oid of foreign server and user. + * + * The commit is executed in two phases: + * First phase (executed during pre-commit processing) + * ----------- + * Transactions are prepared on all the foreign servers, which can participate + * in two-phase commit protocol. Transaction on other foreign servers are + * committed in the same phase. + * + * Second phase (executed during post-commit/abort processing) + * ------------ + * If first phase succeeds, foreign servers are requested to commit respective + * prepared transactions. If the first phase does not succeed because of any + * failure, the foreign servers are asked to rollback respective prepared + * transactions or abort the transactions if they are not prepared. + * + * Any network failure, server crash after preparing foreign transaction leaves + * that prepared transaction unresolved. During the first phase, before actually + * preparing the transactions, enough information is persisted to the disk and + * logs in order to resolve such transactions. + */ + +/* Shared memory entry for a prepared or being prepared foreign transaction */ +typedef struct FDWXactData *FDWXact; + +/* Structure to bundle the foreign connection participating in transaction */ +typedef struct +{ + Oid serverid; + Oid userid; + Oid umid; + char *servername; + FDWXact fdw_xact; /* foreign prepared transaction entry + in case prepared */ + bool two_phase_commit; /* Should use two phase commit + * protocol while committing + * transaction on this + * server, whenever + * necessary. + */ + GetPrepareId_function prepare_id_provider; + EndForeignTransaction_function end_foreing_xact; + PrepareForeignTransaction_function prepare_foreign_xact; + ResolvePreparedForeignTransaction_function resolve_prepared_foreign_xact; +} FDWConnection; + +/* List of foreign connections participating in the transaction */ +List *MyFDWConnections = NIL; + +/* + * By default we assume that all the foreign connections participating in this + * transaction can use two phase commit protocol. + */ +bool TwoPhaseReady = true; + +/* Record the server, userid participating in the transaction. */ +void +RegisterXactForeignServer(Oid serverid, Oid userid, bool two_phase_commit) +{ + FDWConnection *fdw_conn; + ListCell *lcell; + ForeignServer *foreign_server; + ForeignDataWrapper *fdw; + UserMapping *user_mapping; + FdwRoutine *fdw_routine; + MemoryContext old_context; + + TwoPhaseReady = TwoPhaseReady && two_phase_commit; + + /* Check if the entry already exists, if so, raise an error */ + foreach(lcell, MyFDWConnections) + { + fdw_conn = lfirst(lcell); + + if (fdw_conn->serverid == serverid && + fdw_conn->userid == userid) + ereport(ERROR, + (errmsg("attempt to start transction again on server %u user %u", + serverid, userid))); + } + + /* This list and its contents needs to be saved in the transaction context memory */ + old_context = MemoryContextSwitchTo(TopTransactionContext); + /* Add this foreign connection to the list for transaction management */ + fdw_conn = (FDWConnection *) palloc(sizeof(FDWConnection)); + + /* Make sure that the FDW has at least a transaction handler */ + foreign_server = GetForeignServer(serverid); + fdw = GetForeignDataWrapper(foreign_server->fdwid); + fdw_routine = GetFdwRoutine(fdw->fdwhandler); + user_mapping = GetUserMapping(userid, serverid); + + if (!fdw_routine->EndForeignTransaction) + elog(ERROR, "no function to end a foreign transaction provided for FDW %s", + fdw->fdwname); + + if (two_phase_commit) + { + if (!fdw_routine->GetPrepareId) + elog(ERROR, "no prepared transaction identifier provider function for FDW %s", + fdw->fdwname); + + if (!fdw_routine->PrepareForeignTransaction) + elog(ERROR, "no function provided for preparing foreign transaction for FDW %s", + fdw->fdwname); + + if (!fdw_routine->ResolvePreparedForeignTransaction) + elog(ERROR, "no function provided for resolving prepared foreign transaction for FDW %s", + fdw->fdwname); + } + + fdw_conn->serverid = serverid; + fdw_conn->userid = userid; + fdw_conn->umid = user_mapping->umid; + + /* + * We may need following information at the end of a transaction, when the + * system caches are not available. So save it before hand. + */ + fdw_conn->servername = foreign_server->servername; + fdw_conn->prepare_id_provider = fdw_routine->GetPrepareId; + fdw_conn->prepare_foreign_xact = fdw_routine->PrepareForeignTransaction; + fdw_conn->resolve_prepared_foreign_xact = fdw_routine->ResolvePreparedForeignTransaction; + fdw_conn->end_foreing_xact = fdw_routine->EndForeignTransaction; + fdw_conn->fdw_xact = NULL; + fdw_conn->two_phase_commit = two_phase_commit; + MyFDWConnections = lappend(MyFDWConnections, fdw_conn); + /* Revert back the context */ + MemoryContextSwitchTo(old_context); + + return; +} + +/* Prepared transaction identifier can be maximum 256 bytes long */ +#define MAX_FDW_XACT_ID_LEN 256 + +/* Enum to track the status of prepared foreign transaction */ +typedef enum +{ + FDW_XACT_PREPARING, /* foreign transaction is (being) prepared */ + FDW_XACT_COMMITTING_PREPARED, /* foreign prepared transaction is to be committed */ + FDW_XACT_ABORTING_PREPARED, /* foreign prepared transaction is to be aborted */ + FDW_XACT_RESOLVED /* Status used only by pg_fdw_resolve(). + It doesn't appear in the in-memory entry. */ +} FDWXactStatus; + +typedef struct FDWXactData +{ + FDWXact fx_next; /* Next free FDWXact entry */ + Oid dboid; /* database oid where to find foreign server and + * user mapping + */ + TransactionId local_xid; /* XID of local transaction */ + Oid serverid; /* foreign server where transaction takes place */ + Oid userid; /* user who initiated the foreign transaction */ + Oid umid; + FDWXactStatus fdw_xact_status; /* The state of the foreign transaction. + This doubles as the action to be + taken on this entry.*/ + XLogRecPtr fdw_xact_lsn; /* LSN of the log record for inserting this entry */ + bool fdw_xact_valid; /* Has the entry been complete and written to file? */ + BackendId locking_backend; /* Backend working on this entry */ + int fdw_xact_id_len; /* Length of prepared transaction identifier */ + char fdw_xact_id[MAX_FDW_XACT_ID_LEN]; /* prepared transaction identifier */ +} FDWXactData; + +/* Directory where the foreign prepared transaction files will reside */ +#define FDW_XACTS_DIR "pg_fdw_xact" + +/* + * Name of foreign prepared transaction file is 8 bytes xid, 8 bytes foreign + * server oid and 8 bytes user oid separated by '_'. + */ +#define FDW_XACT_FILE_NAME_LEN (8 + 1 + 8 + 1 + 8) +#define FDWXactFilePath(path, xid, serverid, userid) \ + snprintf(path, MAXPGPATH, FDW_XACTS_DIR "/%08X_%08X_%08X", xid, \ + serverid, userid) + +/* Shared memory layout for maintaining foreign prepared transaction entries. */ +typedef struct +{ + /* Head of linked list of free FDWXactData structs */ + FDWXact freeFDWXacts; + + /* Number of valid FDW transaction entries */ + int num_fdw_xacts; + + /* Upto max_fdw_xacts entries in the array */ + FDWXact fdw_xacts[FLEXIBLE_ARRAY_MEMBER]; /* Variable length array */ +} FDWXactGlobalData; + +static void AtProcExit_FDWXact(int code, Datum arg); +static bool resolve_fdw_xact(FDWXact fdw_xact, + ResolvePreparedForeignTransaction_function prepared_foreign_xact_resolver); +static FDWXact insert_fdw_xact(Oid dboid, TransactionId xid, Oid serverid, Oid userid, + int fdw_xact_id_len, char *fdw_xact_id, + FDWXactStatus fdw_xact_status); +static void unlock_fdw_xact(FDWXact fdw_xact); +static void unlock_fdw_xact_entries(); +static void remove_fdw_xact(FDWXact fdw_xact); +static FDWXact register_fdw_xact(Oid dbid, TransactionId xid, Oid serverid, Oid userid, + Oid umid, int fdw_xact_info_len, char *fdw_xact_info); +static int GetFDWXactList(FDWXact *fdw_xacts); +static ResolvePreparedForeignTransaction_function get_prepared_foreign_xact_resolver(FDWXact fdw_xact); +static FDWXactOnDiskData *ReadFDWXactFile(TransactionId xid, Oid serverid, + Oid userid); +static void RemoveFDWXactFile(TransactionId xid, Oid serverid, Oid userid, + bool giveWarning); +static void prepare_foreign_transactions(void); +bool search_fdw_xact(TransactionId xid, Oid dbid, Oid serverid, Oid userid, + List **qualifying_xacts); + +/* + * Maximum number of foreign prepared transaction entries at any given time + * GUC variable, change requires restart. + */ +int max_fdw_xacts = 0; + +/* Keep track of registering process exit call back. */ +static bool fdwXactExitRegistered = false; + +/* Pointer to the shared memory holding the foreign transactions data */ +static FDWXactGlobalData *FDWXactGlobal; + +/* foreign transaction entries locked by this backend */ +List *MyLockedFDWXacts = NIL; + +/* + * FDWXactShmemSize + * Calculates the size of shared memory allocated for maintaining foreign + * prepared transaction entries. + */ +extern Size +FDWXactShmemSize(void) +{ + Size size; + + /* Need the fixed struct, foreign transaction information array */ + size = offsetof(FDWXactGlobalData, fdw_xacts); + size = add_size(size, mul_size(max_fdw_xacts, + sizeof(FDWXact))); + size = MAXALIGN(size); + size = add_size(size, mul_size(max_fdw_xacts, + sizeof(FDWXactData))); + + return size; +} + +/* + * FDWXactShmemInit + * Initialization of shared memory for maintaining foreign prepared transaction + * entries. The shared memory layout is defined in definition of + * FDWXactGlobalData structure. + */ +extern void +FDWXactShmemInit(void) +{ + bool found; + + FDWXactGlobal = ShmemInitStruct("Foreign transactions table", + FDWXactShmemSize(), + &found); + if (!IsUnderPostmaster) + { + FDWXact fdw_xacts; + int cnt; + + Assert(!found); + FDWXactGlobal->freeFDWXacts = NULL; + FDWXactGlobal->num_fdw_xacts = 0; + + /* Initialise the linked list of free FDW transactions */ + fdw_xacts = (FDWXact) + ((char *) FDWXactGlobal + + MAXALIGN(offsetof(FDWXactGlobalData, fdw_xacts) + + sizeof(FDWXact) * max_fdw_xacts)); + for (cnt = 0; cnt < max_fdw_xacts; cnt++) + { + fdw_xacts[cnt].fx_next = FDWXactGlobal->freeFDWXacts; + FDWXactGlobal->freeFDWXacts = &fdw_xacts[cnt]; + } + } + else + { + Assert(FDWXactGlobal); + Assert(found); + } +} + +/* + * PreCommit_FDWXacts + * The function is responsible for pre-commit processing on foreign connections. + * The foreign transactions are prepared on the foreign servers which can + * execute two-phase-commit protocol. Those will be aborted or committed after + * the current transaction has been aborted or committed resp. We try to commit + * transactions on rest of the foreign servers now. For these foreign servers + * it is possible that some transactions commit even if the local transaction + * aborts. + */ +void +PreCommit_FDWXacts(void) +{ + ListCell *cur; + ListCell *prev; + ListCell *next; + + /* If there are no foreign servers involved, we have no business here */ + if (list_length(MyFDWConnections) < 1) + return; + + /* + * Try committing transactions on the foreign servers, which can not execute + * two-phase-commit protocol. + */ + for (cur = list_head(MyFDWConnections), prev = NULL; cur; cur = next) + { + FDWConnection *fdw_conn = lfirst(cur); + next = lnext(cur); + + if (!fdw_conn->two_phase_commit) + { + /* + * The FDW has to make sure that the connection opened to the + * foreign server is out of transaction. Even if the handler + * function returns failure statue, there's hardly anything to do. + */ + if (!fdw_conn->end_foreing_xact(fdw_conn->serverid, fdw_conn->userid, + fdw_conn->umid, true)) + elog(WARNING, "could not commit transaction on server %s", + fdw_conn->servername); + + /* The connection is no more part of this transaction, forget it */ + MyFDWConnections = list_delete_cell(MyFDWConnections, cur, prev); + } + else + prev = cur; + } + + /* + * Prepare the transactions on the foreign servers, which can execute + * two-phase-commit protocol. + */ + prepare_foreign_transactions(); +} + +/* + * Prepare transactions on the foreign servers which can execute two phase + * commit protocol. Rest of the foreign servers are ignored. + */ +static void +prepare_foreign_transactions(void) +{ + ListCell *lcell; + + /* + * Loop over the foreign connections + */ + foreach(lcell, MyFDWConnections) + { + FDWConnection *fdw_conn = (FDWConnection *)lfirst(lcell); + char *fdw_xact_info; + int fdw_xact_info_len; + FDWXact fdw_xact; + + if (!fdw_conn->two_phase_commit) + continue; + + Assert(fdw_conn->prepare_id_provider); + fdw_xact_info = fdw_conn->prepare_id_provider(fdw_conn->serverid, + fdw_conn->userid, + &fdw_xact_info_len); + + /* + * Register the foreign transaction with the identifier used to prepare + * it on the foreign server. Registration persists this information to + * the disk and logs (that way relaying it on standby). Thus in case we + * loose connectivity to the foreign server or crash ourselves, we will + * remember that we have prepared transaction on the foreign server and + * try to resolve it when connectivity is restored or after crash + * recovery. + * + * If we crash after persisting the information but before preparing the + * transaction on the foreign server, we will try to resolve a + * never-prepared transaction, and get an error. This is fine as long as + * the FDW provides us unique prepared transaction identifiers. + * + * If we prepare the transaction on the foreign server before persisting + * the information to the disk and crash in-between these two steps, we + * will forget that we prepared the transaction on the foreign server + * and will not be able to resolve it after the crash. Hence persist + * first then prepare. + */ + fdw_xact = register_fdw_xact(MyDatabaseId, GetTopTransactionId(), + fdw_conn->serverid, fdw_conn->userid, + fdw_conn->umid, fdw_xact_info_len, + fdw_xact_info); + /* + * Between register_fdw_xact call above till this backend hears back + * from foreign server, the backend may abort the local transaction (say, + * because of a signal). During abort processing, it will send an ABORT + * message to the foreign server. If the foreign server has not prepared + * the transaction, the message will succeed. If the foreign server has + * prepared transaction, it will throw an error, which we will ignore and the + * prepared foreign transaction will be resolved by the foreign transaction + * resolver. + */ + if (!fdw_conn->prepare_foreign_xact(fdw_conn->serverid, fdw_conn->userid, + fdw_xact_info_len, fdw_xact_info)) + { + /* + * An error occured, and we didn't prepare the transaction. Delete the + * entry from foreign transaction table. Raise an error, so that the + * local server knows that one of the foreign server has failed to + * prepare the transaction. + * TODO: + * FDW is expected to print the error as a warning and then we + * raise actual error here. But instead, we should pull the + * error text from FDW and add it here in the message or as a + * context or a hint. + */ + remove_fdw_xact(fdw_xact); + + /* + * Delete the connection, since it doesn't require any further + * processing. This deletion will invalidate current cell + * pointer, but that is fine since we will not use that pointer + * because the subsequent ereport will get us out of this loop. + */ + MyFDWConnections = list_delete_ptr(MyFDWConnections, fdw_conn); + ereport(ERROR, + (errmsg("can not prepare transaction on foreign server %s", + fdw_conn->servername))); + } + + /* Prepare succeeded, remember it in the connection */ + fdw_conn->fdw_xact = fdw_xact; + } + return; +} +/* + * register_fdw_xact + * This function is used to create new foreign transaction entry before an FDW + * executes the first phase of two-phase commit. The function adds the entry to + * WAL and then persists it to the disk by creating a file under + * data/pg_fdw_xact directory. + */ +static FDWXact +register_fdw_xact(Oid dbid, TransactionId xid, Oid serverid, Oid userid, + Oid umid, int fdw_xact_id_len, char *fdw_xact_id) +{ + FDWXact fdw_xact; + FDWXactOnDiskData *fdw_xact_file_data; + int data_len; + char path[MAXPGPATH]; + int fd; + pg_crc32c fdw_xact_crc; + pg_crc32c bogus_crc; + + /* Enter the foreign transaction in the shared memory structure */ + fdw_xact = insert_fdw_xact(dbid, xid, serverid, userid, + fdw_xact_id_len, fdw_xact_id, + FDW_XACT_PREPARING); + /* + * Prepare to write the entry to a file. Also add xlog entry. The contents + * of the xlog record are same as what is written to the file. + */ + data_len = offsetof(FDWXactOnDiskData, fdw_xact_id); + data_len = data_len + fdw_xact->fdw_xact_id_len; + data_len = MAXALIGN(data_len); + fdw_xact_file_data = (FDWXactOnDiskData *) palloc0(data_len); + fdw_xact_file_data->dboid = fdw_xact->dboid; + fdw_xact_file_data->local_xid = fdw_xact->local_xid; + fdw_xact_file_data->serverid = fdw_xact->serverid; + fdw_xact_file_data->userid = fdw_xact->userid; + fdw_xact_file_data->umid = fdw_xact->umid; + fdw_xact_file_data->fdw_xact_id_len = fdw_xact->fdw_xact_id_len; + memcpy(fdw_xact_file_data->fdw_xact_id, fdw_xact->fdw_xact_id, + fdw_xact->fdw_xact_id_len); + + FDWXactFilePath(path, fdw_xact->local_xid, fdw_xact->serverid, + fdw_xact->userid); + + /* Create the file, but error out if it already exists. */ + fd = OpenTransientFile(path, O_EXCL | O_CREAT | PG_BINARY | O_WRONLY, + S_IRUSR | S_IWUSR); + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create foreign transaction state file \"%s\": %m", + path))); + + /* Write data to file, and calculate CRC as we pass over it */ + INIT_CRC32C(fdw_xact_crc); + COMP_CRC32C(fdw_xact_crc, fdw_xact_file_data, data_len); + if (write(fd, fdw_xact_file_data, data_len) != data_len) + { + CloseTransientFile(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write FDW transaction state file: %s", path))); + } + + FIN_CRC32C(fdw_xact_crc); + /* + * Write a deliberately bogus CRC to the state file; this is just paranoia + * to catch the case where four more bytes will run us out of disk space. + */ + bogus_crc = ~fdw_xact_crc; + + if ((write(fd, &bogus_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c)) + { + CloseTransientFile(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write two-phase state file: %m"))); + } + + /* Back up to prepare for rewriting the CRC */ + if (lseek(fd, -((off_t) sizeof(pg_crc32c)), SEEK_CUR) < 0) + { + CloseTransientFile(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek in two-phase state file: %m"))); + } + + /* + * The state file isn't valid yet, because we haven't written the correct + * CRC yet. Before we do that, insert entry in WAL and flush it to disk. + * + * Between the time we have written the WAL entry and the time we write + * out the correct state file CRC, we have an inconsistency: we have + * recorded the foreign transaction in WAL but not on the disk. We + * use a critical section to force a PANIC if we are unable to complete + * the write --- then, WAL replay should repair the inconsistency. The + * odds of a PANIC actually occurring should be very tiny given that we + * were able to write the bogus CRC above. + */ + START_CRIT_SECTION(); + + /* + * We have to set delayChkpt here, too; otherwise a checkpoint starting + * immediately after the WAL record is inserted could complete without + * fsync'ing our foreign transaction file. (This is essentially the same + * kind of race condition as the COMMIT-to-clog-write case that + * RecordTransactionCommit uses delayChkpt for; see notes there.) + */ + MyPgXact->delayChkpt = true; + + /* Add the entry in the xlog and save LSN for checkpointer */ + XLogBeginInsert(); + XLogRegisterData((char *)fdw_xact_file_data, data_len); + fdw_xact->fdw_xact_lsn = XLogInsert(RM_FDW_XACT_ID, XLOG_FDW_XACT_INSERT); + XLogFlush(fdw_xact->fdw_xact_lsn); + + /* If we crash now WAL replay will fix things */ + /* write correct CRC and close file */ + if ((write(fd, &fdw_xact_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c)) + { + CloseTransientFile(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write foreign transaction file: %m"))); + } + + if (CloseTransientFile(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close foreign transaction file: %m"))); + + /* File is written completely, checkpoint can proceed with syncing */ + fdw_xact->fdw_xact_valid = true; + + MyPgXact->delayChkpt = false; + END_CRIT_SECTION(); + + pfree(fdw_xact_file_data); + return fdw_xact; +} + +/* + * insert_fdw_xact + * Insert a new entry for a given foreign transaction identified by transaction + * id, foreign server and user mapping, in the shared memory. The inserted entry + * is returned locked. + * + * If the entry already exists, the function raises an error. + */ +static FDWXact +insert_fdw_xact(Oid dboid, TransactionId xid, Oid serverid, Oid userid, + int fdw_xact_id_len, char *fdw_xact_id, FDWXactStatus fdw_xact_status) +{ + FDWXact fdw_xact; + int cnt; + UserMapping *user_mapping; + + if (!fdwXactExitRegistered) + { + before_shmem_exit(AtProcExit_FDWXact, 0); + fdwXactExitRegistered = true; + } + + if (fdw_xact_id_len > MAX_FDW_XACT_ID_LEN) + elog(ERROR, "foreign transaction identifier longer (%d) than allowed (%d)", + fdw_xact_id_len, MAX_FDW_XACT_ID_LEN); + + user_mapping = GetUserMapping(userid, serverid); + + LWLockAcquire(FDWXactLock, LW_EXCLUSIVE); + fdw_xact = NULL; + for (cnt = 0; cnt < FDWXactGlobal->num_fdw_xacts; cnt++) + { + fdw_xact = FDWXactGlobal->fdw_xacts[cnt]; + + if (fdw_xact->local_xid == xid && + fdw_xact->serverid == serverid && + fdw_xact->userid == userid) + elog(ERROR, "duplicate entry for foreign transaction with transaction id %u, serverid %u, userid %u found", + xid, serverid, userid); + } + + /* + * Get the next free foreign transaction entry. Raise error if there are + * none left. + */ + if (!FDWXactGlobal->freeFDWXacts) + { + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("maximum number of foreign transactions reached"), + errhint("Increase max_prepared_foreign_transactions (currently %d).", + max_fdw_xacts))); + } + + fdw_xact = FDWXactGlobal->freeFDWXacts; + FDWXactGlobal->freeFDWXacts = fdw_xact->fx_next; + + /* Insert the entry to active array */ + Assert(FDWXactGlobal->num_fdw_xacts < max_fdw_xacts); + FDWXactGlobal->fdw_xacts[FDWXactGlobal->num_fdw_xacts++] = fdw_xact; + + /* Stamp the entry with backend id before releasing the LWLock */ + fdw_xact->locking_backend = MyBackendId; + fdw_xact->dboid = dboid; + fdw_xact->local_xid = xid; + fdw_xact->serverid = serverid; + fdw_xact->userid = userid; + fdw_xact->umid = user_mapping->umid; + fdw_xact->fdw_xact_status = fdw_xact_status; + fdw_xact->fdw_xact_lsn = 0; + fdw_xact->fdw_xact_valid = false; + fdw_xact->fdw_xact_id_len = fdw_xact_id_len; + memcpy(fdw_xact->fdw_xact_id, fdw_xact_id, fdw_xact_id_len); + + /* Remember that we have locked this entry. */ + MyLockedFDWXacts = lappend(MyLockedFDWXacts, fdw_xact); + LWLockRelease(FDWXactLock); + + return fdw_xact; +} + +/* + * remove_fdw_xact + * Removes the foreign prepared transaction entry from shared memory, disk and + * logs about the removal in WAL. + */ +static void +remove_fdw_xact(FDWXact fdw_xact) +{ + int cnt; + + LWLockAcquire(FDWXactLock, LW_EXCLUSIVE); + /* Search the slot where this entry resided */ + for (cnt = 0; cnt < FDWXactGlobal->num_fdw_xacts; cnt++) + { + if (FDWXactGlobal->fdw_xacts[cnt] == fdw_xact) + { + FdwRemoveXlogRec fdw_remove_xlog; + + /* Fill up the log record before releasing the entry */ + fdw_remove_xlog.serverid = fdw_xact->serverid; + fdw_remove_xlog.dbid = fdw_xact->dboid; + fdw_remove_xlog.xid = fdw_xact->local_xid; + fdw_remove_xlog.userid = fdw_xact->userid; + + /* Remove the entry from active array */ + FDWXactGlobal->num_fdw_xacts--; + FDWXactGlobal->fdw_xacts[cnt] = FDWXactGlobal->fdw_xacts[FDWXactGlobal->num_fdw_xacts]; + + /* Put it back into free list */ + fdw_xact->fx_next = FDWXactGlobal->freeFDWXacts; + FDWXactGlobal->freeFDWXacts = fdw_xact; + + /* Unlock the entry */ + fdw_xact->locking_backend = InvalidBackendId; + MyLockedFDWXacts = list_delete_ptr(MyLockedFDWXacts, fdw_xact); + + LWLockRelease(FDWXactLock); + + /* + * Log that we are removing the foreign transaction entry and remove + * the file from the disk as well. + */ + XLogBeginInsert(); + XLogRegisterData((char *)&fdw_remove_xlog, sizeof(fdw_remove_xlog)); + XLogInsert(RM_FDW_XACT_ID, XLOG_FDW_XACT_REMOVE); + + /* Remove the file from the disk as well. */ + RemoveFDWXactFile(fdw_remove_xlog.xid, fdw_remove_xlog.serverid, + fdw_remove_xlog.userid, true); + return; + } + } + LWLockRelease(FDWXactLock); + + /* We did not find the given entry in global array */ + elog(ERROR, "failed to find %p in FDWXactGlobal array", fdw_xact); +} + +/* + * unlock_fdw_xact + * Unlock the foreign transaction entry by wiping out the locking_backend and + * removing it from the backend's list of foreign transaction. + */ +static void +unlock_fdw_xact(FDWXact fdw_xact) +{ + /* Only the backend holding the lock is allowed to unlock */ + Assert(fdw_xact->locking_backend == MyBackendId); + /* + * First set the locking backend as invalid, and then remove it from the + * list of locked foreign transactions, under the LW lock. If we reverse the + * order and process exits in-between those two, we will be left an entry + * locked by this backend, which gets unlocked only at the server restart. + */ + + LWLockAcquire(FDWXactLock, LW_EXCLUSIVE); + fdw_xact->locking_backend = InvalidBackendId; + MyLockedFDWXacts = list_delete_ptr(MyLockedFDWXacts, fdw_xact); + LWLockRelease(FDWXactLock); +} + +/* + * unlock_fdw_xact_entries + * Unlock the foreign transaction entries locked by this backend. + */ +static void +unlock_fdw_xact_entries() +{ + while (MyLockedFDWXacts) + { + FDWXact fdw_xact = (FDWXact) linitial(MyLockedFDWXacts); + unlock_fdw_xact(fdw_xact); + } +} + +/* + * AtProcExit_FDWXact + * When the process exits, unlock the entries it held. + */ +static void +AtProcExit_FDWXact(int code, Datum arg) +{ + unlock_fdw_xact_entries(); +} + +/* + * AtEOXact_FDWXacts + * The function executes phase 2 of two-phase commit protocol. + * At the end of transaction perform following actions + * 1. Mark the entries locked by this backend as ABORTING or COMMITTING + * according the result of transaction. + * 2. Try to commit or abort the transactions on foreign servers. If that + * succeeds, remove them from foreign transaction entries, otherwise unlock + * them. + */ +extern void +AtEOXact_FDWXacts(bool is_commit) +{ + ListCell *lcell; + + foreach(lcell, MyFDWConnections) + { + FDWConnection *fdw_conn = lfirst(lcell); + + /* Commit/abort prepared foreign transactions */ + if (fdw_conn->fdw_xact) + { + FDWXact fdw_xact = fdw_conn->fdw_xact; + fdw_xact->fdw_xact_status = (is_commit ? + FDW_XACT_COMMITTING_PREPARED : + FDW_XACT_ABORTING_PREPARED); + /* Try aborting or commiting the transaction on the foreign server */ + if (!resolve_fdw_xact(fdw_xact, fdw_conn->resolve_prepared_foreign_xact)) + { + /* + * The transaction was not resolved on the foreign server, unlock + * it, so that someone else can take care of it. + */ + unlock_fdw_xact(fdw_xact); + } + } + else + { + /* + * On servers where two phase commit protocol could not be executed + * we have tried to commit the transactions during pre-commit phase. + * Any remaining transactions need to be aborted. + */ + Assert(!is_commit); + + /* + * The FDW has to make sure that the connection opened to the + * foreign server is out of transaction. Even if the handler + * function returns failure statue, there's hardly anything to do. + */ + if (!fdw_conn->end_foreing_xact(fdw_conn->serverid, fdw_conn->userid, + fdw_conn->umid, is_commit)) + elog(WARNING, "could not %s transaction on server %s", + is_commit ? "commit" : "abort", + fdw_conn->servername); + } + } + + /* + * Unlock any locked foreign transactions. Resolver might lock the entries, + * and may not be able to unlock them if aborted in-between. In any case, + * there is no reason for a foreign transaction entry to be locked after the + * transaction which locked it has ended. + */ + unlock_fdw_xact_entries(); + + /* + * Reset the list of registered connections. Since the memory for the list + * and its nodes comes from transaction memory context, it will be freed + * after this call. + */ + MyFDWConnections = NIL; + /* Set TwoPhaseReady to its default value */ + TwoPhaseReady = true; +} + +/* + * AtPrepare_FDWXacts + * The function is called while preparing a transaction. If there are foreign + * servers involved in the transaction, this function prepares transactions + * on those servers. + */ +extern void +AtPrepare_FDWXacts(void) +{ + /* If there are no foreign servers involved, we have no business here */ + if (list_length(MyFDWConnections) < 1) + return; + + /* + * All foreign servers participating in a transaction to be prepared should + * be two phase compliant. + */ + if (!TwoPhaseReady) + ereport(ERROR, + (errcode(ERRCODE_T_R_INTEGRITY_CONSTRAINT_VIOLATION), + errmsg("can not prepare the transaction because some foreign server/s involved in transaction can not prepare the transaction"))); + + /* Prepare transactions on participating foreign servers. */ + prepare_foreign_transactions(); + + /* + * Unlock the foreign transaction entries so COMMIT/ROLLBACK PREPARED from + * some other backend will be able to lock those if required. + */ + unlock_fdw_xact_entries(); + + /* + * Reset the list of registered connections. Since the memory for the list + * and its nodes comes from transaction memory context, it will be freed + * after this call. + */ + MyFDWConnections = NIL; + /* Set TwoPhaseReady to its default value */ + TwoPhaseReady = true; +} + +/* + * FDWXactTwoPhaseFinish + * This function is called as part of the COMMIT/ROLLBACK PREPARED command to + * commit/rollback the foreign transactions prepared as part of the local + * prepared transaction. The function looks for the foreign transaction entries + * with local_xid equal to xid of the prepared transaction and tries to resolve them. + */ +extern void +FDWXactTwoPhaseFinish(bool isCommit, TransactionId xid) +{ + List *entries_to_resolve; + + FDWXactStatus status = isCommit ? FDW_XACT_COMMITTING_PREPARED : + FDW_XACT_ABORTING_PREPARED; + /* Get all the entries belonging to the given transaction id locked. If + * foreign transaction resolver is running, it might lock entries to + * check whether they can be resolved. The search function will skip such + * entries. The resolver will resolve them at a later point of time. + */ + search_fdw_xact(xid, InvalidOid, InvalidOid, InvalidOid, &entries_to_resolve); + + /* Try resolving the foreign transactions */ + while (entries_to_resolve) + { + FDWXact fdw_xact = linitial(entries_to_resolve); + entries_to_resolve = list_delete_first(entries_to_resolve); + fdw_xact->fdw_xact_status = status; + + /* + * Resolve the foreign transaction. If resolution is not successful, + * unlock the entry so that someone else can pick it up. + */ + if (!resolve_fdw_xact(fdw_xact, + get_prepared_foreign_xact_resolver(fdw_xact))) + unlock_fdw_xact(fdw_xact); + } +} + +static ResolvePreparedForeignTransaction_function +get_prepared_foreign_xact_resolver(FDWXact fdw_xact) +{ + ForeignServer *foreign_server; + ForeignDataWrapper *fdw; + FdwRoutine *fdw_routine; + + foreign_server = GetForeignServer(fdw_xact->serverid); + fdw = GetForeignDataWrapper(foreign_server->fdwid); + fdw_routine = GetFdwRoutine(fdw->fdwhandler); + if (!fdw_routine->ResolvePreparedForeignTransaction) + elog(ERROR, "no foreign transaction resolver routine provided for FDW %s", + fdw->fdwname); + return fdw_routine->ResolvePreparedForeignTransaction; +} + +/* + * resolve_fdw_xact + * Resolve the foreign transaction using the foreign data wrapper's transaction + * handler routine. + * If the resolution is successful, remove the foreign transaction entry from + * the shared memory and also remove the corresponding on-disk file. + */ +static bool +resolve_fdw_xact(FDWXact fdw_xact, + ResolvePreparedForeignTransaction_function fdw_xact_handler) +{ + bool resolved; + bool is_commit; + + Assert(fdw_xact->fdw_xact_status == FDW_XACT_COMMITTING_PREPARED || + fdw_xact->fdw_xact_status == FDW_XACT_ABORTING_PREPARED); + + is_commit = (fdw_xact->fdw_xact_status == FDW_XACT_COMMITTING_PREPARED) ? + true : false; + + resolved = fdw_xact_handler(fdw_xact->serverid, fdw_xact->userid, + fdw_xact->umid, is_commit, + fdw_xact->fdw_xact_id_len, + fdw_xact->fdw_xact_id); + + /* If we succeeded in resolving the transaction, remove the entry */ + if (resolved) + remove_fdw_xact(fdw_xact); + + return resolved; +} + +/* + * fdw_xact_exists + * Returns true if there exists at least one prepared foreign transaction which + * matches criteria. This function is wrapper around search_fdw_xact. Check that + * function's prologue for details. + */ +bool +fdw_xact_exists(TransactionId xid, Oid dbid, Oid serverid, Oid userid) +{ + return search_fdw_xact(xid, dbid, serverid, userid, NULL); +} + +/* + * search_fdw_xact + * Return true if there exists at least one prepared foreign transaction + * entry with given criteria. The criteria is defined by arguments with + * valid values for respective datatypes. + * + * The table below explains the same + * xid | dbid | serverid | userid | search for entry with + * invalid | invalid | invalid | invalid | nothing + * invalid | invalid | invalid | valid | given userid + * invalid | invalid | valid | invalid | given serverid + * invalid | invalid | valid | valid | given serverid and userid + * invalid | valid | invalid | invalid | given dbid + * invalid | valid | invalid | valid | given dbid and userid + * invalid | valid | valid | invalid | given dbid and serverid + * invalid | valid | valid | valid | given dbid, servroid and userid + * valid | invalid | invalid | invalid | given xid + * valid | invalid | invalid | valid | given xid and userid + * valid | invalid | valid | invalid | given xid, serverid + * valid | invalid | valid | valid | given xid, serverid, userid + * valid | valid | invalid | invalid | given xid and dbid + * valid | valid | invalid | valid | given xid, dbid and userid + * valid | valid | valid | invalid | given xid, dbid, serverid + * valid | valid | valid | valid | given xid, dbid, serverid, userid + * + * When the criteria is void (all arguments invalid) the + * function returns true, since any entry would match the criteria. + * + * If qualifying_fdw_xacts is not NULL, the qualifying entries are locked and + * returned in a linked list. Any entry which is already locked is ignored. If + * all the qualifying entries are locked, nothing will be returned in the list + * but returned value will be true. + */ +bool +search_fdw_xact(TransactionId xid, Oid dbid, Oid serverid, Oid userid, + List **qualifying_xacts) +{ + int cnt; + LWLockMode lock_mode; + /* Return value if a qualifying entry exists */ + bool entry_exists = false; + + if (qualifying_xacts) + { + *qualifying_xacts = NIL; + /* The caller expects us to lock entries */ + lock_mode = LW_EXCLUSIVE; + } + else + lock_mode = LW_SHARED; + + LWLockAcquire(FDWXactLock, lock_mode); + for (cnt = 0; cnt < FDWXactGlobal->num_fdw_xacts; cnt++) + { + FDWXact fdw_xact = FDWXactGlobal->fdw_xacts[cnt]; + bool entry_matches = true; + + /* xid */ + if (xid != InvalidTransactionId && xid != fdw_xact->local_xid) + entry_matches = false; + + /* dbid */ + if (OidIsValid(dbid) && fdw_xact->dboid != dbid) + entry_matches = false; + + /* serverid */ + if (OidIsValid(serverid) && serverid != fdw_xact->serverid) + entry_matches = false; + + /* userid */ + if (OidIsValid(userid) && fdw_xact->userid != userid) + entry_matches = false; + + if (entry_matches) + { + entry_exists = true; + if (qualifying_xacts) + { + /* + * User has requested list of qualifying entries. If the + * matching entry is not locked, lock it and add to the list. If + * the entry is locked by some other backend, ignore it. + */ + if (fdw_xact->locking_backend == InvalidBackendId) + { + MemoryContext oldcontext; + fdw_xact->locking_backend = MyBackendId; + + /* The list and its members may be required at the end of the transaction */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + MyLockedFDWXacts = lappend(MyLockedFDWXacts, fdw_xact); + MemoryContextSwitchTo(oldcontext); + } + else if (fdw_xact->locking_backend != MyBackendId) + continue; + + *qualifying_xacts = lappend(*qualifying_xacts, fdw_xact); + } + else + { + /* + * User wants to check the existence, and we have found one + * matching entry. No need to check other entries. + */ + break; + } + } + } + + LWLockRelease(FDWXactLock); + + return entry_exists; +} + +/* + * get_dbids_with_unresolved_xact + * returns the oids of the databases containing unresolved foreign transactions. + * The function is used by pg_fdw_xact_resolver extension. Returns NIL if + * no such entry exists. + */ +List * +get_dbids_with_unresolved_xact(void) +{ + int cnt_xact; + List *dbid_list = NIL; + + LWLockAcquire(FDWXactLock, LW_SHARED); + for (cnt_xact = 0; cnt_xact < FDWXactGlobal->num_fdw_xacts; cnt_xact++) + { + FDWXact fdw_xact; + + fdw_xact = FDWXactGlobal->fdw_xacts[cnt_xact]; + + /* Skip locked entry as someone must be working on it */ + if (fdw_xact->locking_backend == InvalidBackendId) + dbid_list = list_append_unique_oid(dbid_list, fdw_xact->dboid); + } + LWLockRelease(FDWXactLock); + + return dbid_list; +} + +/* + * fdw_xact_redo + * Apply the redo log for a foreign transaction. + */ +extern void +fdw_xact_redo(XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + int rec_len = XLogRecGetDataLen(record); + TransactionId xid = XLogRecGetXid(record); + + if (info == XLOG_FDW_XACT_INSERT) + { + FDWXactOnDiskData *fdw_xact_data_file = (FDWXactOnDiskData *)rec; + char path[MAXPGPATH]; + int fd; + pg_crc32c fdw_xact_crc; + + /* Recompute CRC */ + INIT_CRC32C(fdw_xact_crc); + COMP_CRC32C(fdw_xact_crc, rec, rec_len); + FIN_CRC32C(fdw_xact_crc); + + FDWXactFilePath(path, xid, fdw_xact_data_file->serverid, + fdw_xact_data_file->userid); + /* + * The file may exist, if it was flushed to disk after creating it. The + * file might have been flushed while it was being crafted, so the + * contents can not be guaranteed to be accurate. Hence truncate and + * rewrite the file. + */ + fd = OpenTransientFile(path, O_CREAT | O_WRONLY | O_TRUNC | PG_BINARY, + S_IRUSR | S_IWUSR); + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create/open foreign transaction state file \"%s\": %m", + path))); + + /* The log record is exactly the contents of the file. */ + if (write(fd, rec, rec_len) != rec_len) + { + CloseTransientFile(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write FDW transaction state file: %s", path))); + } + + if (write(fd, &fdw_xact_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c)) + { + CloseTransientFile(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write two-phase state file: %m"))); + } + + /* + * We must fsync the file because the end-of-replay checkpoint will not do + * so, there being no foreign transaction entry in shared memory yet to + * tell it to. + */ + if (pg_fsync(fd) != 0) + { + CloseTransientFile(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not fsync foreign transaction state file: %m"))); + } + + CloseTransientFile(fd); + } + else if (info == XLOG_FDW_XACT_REMOVE) + { + FdwRemoveXlogRec *fdw_remove_xlog = (FdwRemoveXlogRec *)rec; + + /* Remove the file from the disk. */ + RemoveFDWXactFile(fdw_remove_xlog->xid, fdw_remove_xlog->serverid, fdw_remove_xlog->userid, + true); + } + else + elog(ERROR, "invalid log type %d in foreign transction log record", info); + + return; +} + +/* + * CheckPointFDWXact + * Function syncs the foreign transaction files created between the two + * checkpoints. + * The foreign transaction entries and hence the corresponding files are expected + * to be very short-lived. By executing this function at the end, we might have + * lesser files to fsync, thus reducing some I/O. This is similar to + * CheckPointTwoPhase(). + * In order to avoid disk I/O while holding a light weight lock, the function + * first collects the files which need to be synced under FDWXactLock and then + * syncs them after releasing the lock. This approach creates a race condition: + * after releasing the lock, and before syncing a file, the corresponding + * foreign transaction entry and hence the file might get removed. The function + * checks whether that's true and ignores the error if so. + */ +void +CheckPointFDWXact(XLogRecPtr redo_horizon) +{ + Oid *serverids; + TransactionId *xids; + Oid *userids; + Oid *dbids; + int nxacts; + int cnt; + /* Quick get-away, before taking lock */ + if (max_fdw_xacts <= 0) + return; + + LWLockAcquire(FDWXactLock, LW_SHARED); + + /* Another quick, before we allocate memory */ + if (FDWXactGlobal->num_fdw_xacts <= 0) + { + LWLockRelease(FDWXactLock); + return; + } + + /* + * Collect the file paths which need to be synced. We might sync a file + * again if it lives beyond the checkpoint boundaries. But this case is rare + * and may not involve much I/O. + */ + xids = (TransactionId *) palloc(FDWXactGlobal->num_fdw_xacts * sizeof(TransactionId)); + serverids = (Oid *) palloc(FDWXactGlobal->num_fdw_xacts * sizeof(Oid)); + userids = (Oid *) palloc(FDWXactGlobal->num_fdw_xacts * sizeof(Oid)); + dbids = (Oid *) palloc(FDWXactGlobal->num_fdw_xacts * sizeof(Oid)); + nxacts = 0; + + for (cnt = 0; cnt < FDWXactGlobal->num_fdw_xacts; cnt++) + { + FDWXact fdw_xact = FDWXactGlobal->fdw_xacts[cnt]; + if (fdw_xact->fdw_xact_valid && + fdw_xact->fdw_xact_lsn <= redo_horizon) + { + xids[nxacts] = fdw_xact->local_xid; + serverids[nxacts] = fdw_xact->serverid; + userids[nxacts] = fdw_xact->userid; + dbids[nxacts] = fdw_xact->dboid; + nxacts++; + } + } + + LWLockRelease(FDWXactLock); + + for (cnt = 0; cnt < nxacts; cnt++) + { + char path[MAXPGPATH]; + int fd; + + FDWXactFilePath(path, xids[cnt], serverids[cnt], userids[cnt]); + + fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0); + + if (fd < 0) + { + if (errno == ENOENT) + { + /* OK if we do not have the entry anymore */ + if (!fdw_xact_exists(xids[cnt], dbids[cnt], serverids[cnt], + userids[cnt])) + continue; + + /* Restore errno in case it was changed */ + errno = ENOENT; + } + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open foreign transaction state file \"%s\": %m", + path))); + } + + if (pg_fsync(fd) != 0) + { + CloseTransientFile(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not fsync foreign transaction state file \"%s\": %m", + path))); + } + + if (CloseTransientFile(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close foreign transaction state file \"%s\": %m", + path))); + } + + pfree(xids); + pfree(serverids); + pfree(userids); + pfree(dbids); +} + +/* Built in functions */ +/* + * pg_fdw_xact + * Produce a view with one row per prepared transaction on foreign server. + * + * This function is here so we don't have to export the + * FDWXactGlobalData struct definition. + * + */ + +/* + * Structure to hold and iterate over the foreign transactions to be displayed + * by the built-in functions. + */ +typedef struct +{ + FDWXact fdw_xacts; + int num_xacts; + int cur_xact; +} WorkingStatus; + +/* + * Returns an array of all foreign prepared transactions for the user-level + * function pg_fdw_xact. + * + * The returned array and all its elements are copies of internal data + * structures, to minimize the time we need to hold the FDWXactLock. + * + * WARNING -- we return even those transactions whose information is not + * completely filled yet. The caller should filter them out if he doesn't want them. + * + * The returned array is palloc'd. + */ +static int +GetFDWXactList(FDWXact *fdw_xacts) +{ + int num_xacts; + int cnt_xacts; + + LWLockAcquire(FDWXactLock, LW_SHARED); + + if (FDWXactGlobal->num_fdw_xacts == 0) + { + LWLockRelease(FDWXactLock); + *fdw_xacts = NULL; + return 0; + } + + num_xacts = FDWXactGlobal->num_fdw_xacts; + *fdw_xacts = (FDWXact) palloc(sizeof(FDWXactData) * num_xacts); + for (cnt_xacts = 0; cnt_xacts < num_xacts; cnt_xacts++) + memcpy((*fdw_xacts) + cnt_xacts, FDWXactGlobal->fdw_xacts[cnt_xacts], + sizeof(FDWXactData)); + + LWLockRelease(FDWXactLock); + + return num_xacts; +} + +Datum +pg_fdw_xact(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + WorkingStatus *status; + char *xact_status; + + if (SRF_IS_FIRSTCALL()) + { + TupleDesc tupdesc; + MemoryContext oldcontext; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* + * Switch to memory context appropriate for multiple function calls + */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + /* build tupdesc for result tuples */ + /* this had better match pg_fdw_xacts view in system_views.sql */ + tupdesc = CreateTemplateTupleDesc(6, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "dbid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "transaction", + XIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "serverid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "userid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "status", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "identifier", + TEXTOID, -1, 0); + + funcctx->tuple_desc = BlessTupleDesc(tupdesc); + + /* + * Collect status information that we will format and send + * out as a result set. + */ + status = (WorkingStatus *) palloc(sizeof(WorkingStatus)); + funcctx->user_fctx = (void *) status; + + status->num_xacts = GetFDWXactList(&status->fdw_xacts); + status->cur_xact = 0; + + MemoryContextSwitchTo(oldcontext); + } + + funcctx = SRF_PERCALL_SETUP(); + status = funcctx->user_fctx; + + while (status->cur_xact < status->num_xacts) + { + FDWXact fdw_xact = &status->fdw_xacts[status->cur_xact++]; + Datum values[6]; + bool nulls[6]; + HeapTuple tuple; + Datum result; + + if (!fdw_xact->fdw_xact_valid) + continue; + + /* + * Form tuple with appropriate data. + */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + values[0] = ObjectIdGetDatum(fdw_xact->dboid); + values[1] = TransactionIdGetDatum(fdw_xact->local_xid); + values[2] = ObjectIdGetDatum(fdw_xact->serverid); + values[3] = ObjectIdGetDatum(fdw_xact->userid); + switch (fdw_xact->fdw_xact_status) + { + case FDW_XACT_PREPARING: + xact_status = "prepared"; + break; + case FDW_XACT_COMMITTING_PREPARED: + xact_status = "committing"; + break; + case FDW_XACT_ABORTING_PREPARED: + xact_status = "aborting"; + break; + default: + xact_status = "unknown"; + break; + } + values[4] = CStringGetTextDatum(xact_status); + /* should this be really interpreted by FDW */ + values[5] = PointerGetDatum(cstring_to_text_with_len(fdw_xact->fdw_xact_id, + fdw_xact->fdw_xact_id_len)); + + tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + result = HeapTupleGetDatum(tuple); + SRF_RETURN_NEXT(funcctx, result); + } + + SRF_RETURN_DONE(funcctx); +} + +/* + * pg_fdw_resolve + * a user interface to initiate foreign transaction resolution. The function + * tries to resolve the prepared transactions on foreign servers in the database + * from where it is run. + * The function prints the status of all the foreign transactions it + * encountered, whether resolved or not. + */ +Datum +pg_fdw_resolve(PG_FUNCTION_ARGS) +{ + MemoryContext oldcontext; + FuncCallContext *funcctx; + WorkingStatus *status; + char *xact_status; + List *entries_to_resolve; + + if (SRF_IS_FIRSTCALL()) + { + TupleDesc tupdesc; + + /* We will be modifying the shared memory. Prepare to clean up on exit */ + if (!fdwXactExitRegistered) + { + before_shmem_exit(AtProcExit_FDWXact, 0); + fdwXactExitRegistered = true; + } + + /* Allocate space for and prepare the returning set */ + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + /* Switch to memory context appropriate for multiple function calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + /* build tupdesc for result tuples */ + tupdesc = CreateTemplateTupleDesc(6, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "dbid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "transaction", + XIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "serverid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "userid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "status", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "identifier", + TEXTOID, -1, 0); + + funcctx->tuple_desc = BlessTupleDesc(tupdesc); + + /* + * Collect status information that we will format and send + * out as a result set. + */ + status = (WorkingStatus *) palloc(sizeof(WorkingStatus)); + funcctx->user_fctx = (void *) status; + status->fdw_xacts = (FDWXact) palloc(sizeof(FDWXactData) * FDWXactGlobal->num_fdw_xacts); + status->num_xacts = 0; + status->cur_xact = 0; + + /* Done preparation for the result. */ + MemoryContextSwitchTo(oldcontext); + + /* + * Get entries whose foreign servers are part of the database where + * this function was called. We can get information about only such + * foreign servers. The function will lock the entries. The entries + * which are locked by other backends and whose foreign servers belong + * to this database are left out, since we can not work on those. + */ + search_fdw_xact(InvalidTransactionId, MyDatabaseId, InvalidOid, InvalidOid, + &entries_to_resolve); + + /* Work to resolve the resolvable entries */ + while (entries_to_resolve) + { + FDWXact fdw_xact = linitial(entries_to_resolve); + + /* Remove the entry as we will not use it again */ + entries_to_resolve = list_delete_first(entries_to_resolve); + + /* Copy the data for the sake of result. */ + memcpy(status->fdw_xacts + status->num_xacts++, + fdw_xact, sizeof(FDWXactData)); + + if (fdw_xact->fdw_xact_status == FDW_XACT_COMMITTING_PREPARED || + fdw_xact->fdw_xact_status == FDW_XACT_ABORTING_PREPARED) + { + /* + * We have already decided what to do with the foreign transaction + * nothing to be done. + */ + } + else if (TransactionIdDidCommit(fdw_xact->local_xid)) + fdw_xact->fdw_xact_status = FDW_XACT_COMMITTING_PREPARED; + else if (TransactionIdDidAbort(fdw_xact->local_xid)) + fdw_xact->fdw_xact_status = FDW_XACT_ABORTING_PREPARED; + else if (!TransactionIdIsInProgress(fdw_xact->local_xid)) + { + /* + * The transaction is in progress but not on any of the backends. So + * probably, it crashed before actual abort or commit. So assume it + * to be aborted. + */ + fdw_xact->fdw_xact_status = FDW_XACT_ABORTING_PREPARED; + } + else + { + /* + * Local transaction is in progress, should not resolve the foreign + * transaction. This can happen when the foreign transaction is + * prepared as part of a local prepared transaction. Just + * continue with the next one. + */ + unlock_fdw_xact(fdw_xact); + continue; + } + + /* + * Resolve the foreign transaction. If resolution was not successful, + * unlock the entry so that someone else can pick it up + */ + if (!resolve_fdw_xact(fdw_xact, get_prepared_foreign_xact_resolver(fdw_xact))) + unlock_fdw_xact(fdw_xact); + else + /* Update the status in the result set */ + status->fdw_xacts[status->num_xacts - 1].fdw_xact_status = FDW_XACT_RESOLVED; + } + } + + /* Print the result set */ + funcctx = SRF_PERCALL_SETUP(); + status = funcctx->user_fctx; + + while (status->cur_xact < status->num_xacts) + { + FDWXact fdw_xact = &status->fdw_xacts[status->cur_xact++]; + Datum values[6]; + bool nulls[6]; + HeapTuple tuple; + Datum result; + + if (!fdw_xact->fdw_xact_valid) + continue; + + /* + * Form tuple with appropriate data. + */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + values[0] = ObjectIdGetDatum(fdw_xact->dboid); + values[1] = TransactionIdGetDatum(fdw_xact->local_xid); + values[2] = ObjectIdGetDatum(fdw_xact->serverid); + values[3] = ObjectIdGetDatum(fdw_xact->userid); + switch (fdw_xact->fdw_xact_status) + { + case FDW_XACT_PREPARING: + xact_status = "preparing"; + break; + case FDW_XACT_COMMITTING_PREPARED: + xact_status = "committing"; + break; + case FDW_XACT_ABORTING_PREPARED: + xact_status = "aborting"; + break; + case FDW_XACT_RESOLVED: + xact_status = "resolved"; + break; + default: + xact_status = "unknown"; + break; + } + values[4] = CStringGetTextDatum(xact_status); + /* should this be really interpreted by FDW? */ + values[5] = PointerGetDatum(cstring_to_text_with_len(fdw_xact->fdw_xact_id, + fdw_xact->fdw_xact_id_len)); + + tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + result = HeapTupleGetDatum(tuple); + SRF_RETURN_NEXT(funcctx, result); + } + + SRF_RETURN_DONE(funcctx); +} + +/* + * Built-in function to remove prepared foreign transaction entry/s without + * resolving. The function gives a way to forget about such prepared + * transaction in case + * 1. The foreign server where it is prepared is no longer available + * 2. The user which prepared this transaction needs to be dropped + * 3. PITR is recoverying before a transaction id, which created the prepared + * foreign transaction + * 4. The database containing the entries needs to be dropped + * + * Or any such conditions in which resolution is no longer possible. + * + * The function accepts 4 arguments transaction id, dbid, serverid and userid, + * which define the criteria in the same way as search_fdw_xact(). The entries + * matching the criteria are removed. The function does not remove an entry + * which is locked by some other backend. + */ +Datum +pg_fdw_remove(PG_FUNCTION_ARGS) +{ +/* Some #defines only for this function to deal with the arguments */ +#define XID_ARGNUM 0 +#define DBID_ARGNUM 1 +#define SRVID_ARGNUM 2 +#define USRID_ARGNUM 3 + + TransactionId xid; + Oid dbid; + Oid serverid; + Oid userid; + List *entries_to_remove; + + xid = PG_ARGISNULL(XID_ARGNUM) ? InvalidTransactionId : + DatumGetTransactionId(PG_GETARG_DATUM(XID_ARGNUM)); + dbid = PG_ARGISNULL(DBID_ARGNUM) ? InvalidOid : + PG_GETARG_OID(DBID_ARGNUM); + serverid = PG_ARGISNULL(SRVID_ARGNUM) ? InvalidOid : + PG_GETARG_OID(SRVID_ARGNUM); + userid = PG_ARGISNULL(USRID_ARGNUM) ? InvalidOid : + PG_GETARG_OID(USRID_ARGNUM); + + search_fdw_xact(xid, dbid, serverid, userid, &entries_to_remove); + + while (entries_to_remove) + { + FDWXact fdw_xact = linitial(entries_to_remove); + entries_to_remove = list_delete_first(entries_to_remove); + + remove_fdw_xact(fdw_xact); + } + + PG_RETURN_VOID(); +} + +/* + * Code dealing with the on disk files used to store foreign transaction + * information. + */ + +/* + * ReadFDWXactFile + * Read the foreign transction state file and return the contents in a + * structure allocated in-memory. The structure can be later freed by the + * caller. + */ +static FDWXactOnDiskData * +ReadFDWXactFile(TransactionId xid, Oid serverid, Oid userid) +{ + char path[MAXPGPATH]; + int fd; + FDWXactOnDiskData *fdw_xact_file_data; + struct stat stat; + uint32 crc_offset; + pg_crc32c calc_crc; + pg_crc32c file_crc; + char *buf; + + FDWXactFilePath(path, xid, serverid, userid); + + fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0); + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open FDW transaction state file \"%s\": %m", + path))); + + /* + * Check file length. We can determine a lower bound pretty easily. We + * set an upper bound to avoid palloc() failure on a corrupt file, though + * we can't guarantee that we won't get an out of memory error anyway, + * even on a valid file. + */ + if (fstat(fd, &stat)) + { + CloseTransientFile(fd); + + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not stat FDW transaction state file \"%s\": %m", + path))); + return NULL; + } + + if (stat.st_size < offsetof(FDWXactOnDiskData, fdw_xact_id) || + stat.st_size > MaxAllocSize) + { + CloseTransientFile(fd); + ereport(WARNING, + (errcode_for_file_access(), + errmsg("Too large FDW transaction state file \"%s\": %m", + path))); + return NULL; + } + + buf = (char *) palloc(stat.st_size); + fdw_xact_file_data = (FDWXactOnDiskData *)buf; + crc_offset = stat.st_size - sizeof(pg_crc32c); + /* Slurp the file */ + if (read(fd, fdw_xact_file_data, stat.st_size) != stat.st_size) + { + CloseTransientFile(fd); + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not read FDW transaction state file \"%s\": %m", + path))); + pfree(fdw_xact_file_data); + return NULL; + } + + CloseTransientFile(fd); + /* + * Check the CRC. + */ + + INIT_CRC32C(calc_crc); + COMP_CRC32C(calc_crc, buf, crc_offset); + FIN_CRC32C(calc_crc); + + file_crc = *((pg_crc32c *) (buf + crc_offset)); + + if (!EQ_CRC32C(calc_crc, file_crc)) + { + pfree(buf); + return NULL; + } + + if (fdw_xact_file_data->serverid != serverid || + fdw_xact_file_data->userid != userid || + fdw_xact_file_data->local_xid != xid) + { + ereport(WARNING, + (errmsg("removing corrupt foreign transaction state file \"%s\"", + path))); + CloseTransientFile(fd); + pfree(buf); + return NULL; + } + + return fdw_xact_file_data; +} + +/* + * PrescanFDWXacts + * Read the foreign prepared transactions directory for oldest active + * transaction. The transactions corresponding to the xids in this directory + * are not necessarily active per say locally. But we still need those XIDs to + * be alive so that + * 1. we can determine whether they are committed or aborted + * 2. the file name contains xid which shouldn't get used again to avoid + * conflicting file names. + * + * The function accepts the oldest active xid determined by other functions + * (e.g. PrescanPreparedTransactions()). It then compares every xid it comes + * across while scanning foreign prepared transactions directory with the oldest + * active xid. It returns the oldest of those xids or oldest active xid + * whichever is older. + * + * If any foreign prepared transaction is part of a future transaction (PITR), + * the function removes the corresponding file as + * 1. We can not know the status of the local transaction which prepared this + * foreign transaction + * 2. The foreign server or the user may not be available as per new timeline + * + * Anyway, the local transaction which prepared the foreign prepared transaction + * does not exist as per the new timeline, so it's better to forget the foreign + * prepared transaction as well. + */ +TransactionId +PrescanFDWXacts(TransactionId oldestActiveXid) +{ + TransactionId nextXid = ShmemVariableCache->nextXid; + DIR *cldir; + struct dirent *clde; + + cldir = AllocateDir(FDW_XACTS_DIR); + while ((clde = ReadDir(cldir, FDW_XACTS_DIR)) != NULL) + { + if (strlen(clde->d_name) == FDW_XACT_FILE_NAME_LEN && + strspn(clde->d_name, "0123456789ABCDEF_") == FDW_XACT_FILE_NAME_LEN) + { + Oid serverid; + Oid userid; + TransactionId local_xid; + + sscanf(clde->d_name, "%08x_%08x_%08x", &local_xid, &serverid, + &userid); + + /* + * Remove a foreign prepared transaction file correspnding + * to an XID, which is too new. + */ + if (TransactionIdFollowsOrEquals(local_xid, nextXid)) + { + ereport(WARNING, + (errmsg("removing future foreign prepared transaction file \"%s\"", + clde->d_name))); + RemoveFDWXactFile(local_xid, serverid, userid, true); + continue; + } + + if (TransactionIdPrecedesOrEquals(local_xid, oldestActiveXid)) + oldestActiveXid = local_xid; + } + } + + FreeDir(cldir); + return oldestActiveXid; +} +/* + * ReadFDWXact + * Read the foreign prepared transaction information and set it up for further + * usage. + */ +void +ReadFDWXacts(void) +{ + DIR *cldir; + struct dirent *clde; + + cldir = AllocateDir(FDW_XACTS_DIR); + while ((clde = ReadDir(cldir, FDW_XACTS_DIR)) != NULL) + { + if (strlen(clde->d_name) == FDW_XACT_FILE_NAME_LEN && + strspn(clde->d_name, "0123456789ABCDEF_") == FDW_XACT_FILE_NAME_LEN) + { + Oid serverid; + Oid userid; + TransactionId local_xid; + FDWXactOnDiskData *fdw_xact_file_data; + FDWXact fdw_xact; + + sscanf(clde->d_name, "%08x_%08x_%08x", &local_xid, &serverid, + &userid); + + fdw_xact_file_data = ReadFDWXactFile(local_xid, serverid, userid); + + if (!fdw_xact_file_data) + { + ereport(WARNING, + (errmsg("Removing corrupt foreign transaction file \"%s\"", + clde->d_name))); + RemoveFDWXactFile(local_xid, serverid, userid, false); + continue; + } + + ereport(LOG, + (errmsg("recovering foreign transaction entry for xid %u, foreign server %u and user %u", + local_xid, serverid, userid))); + + /* + * Add this entry into the table of foreign transactions. The status + * of the transaction is set as preparing, since we do not know the + * exact status right now. Resolver will set it later based on the + * status of local transaction which prepared this foreign + * transaction. + */ + fdw_xact = insert_fdw_xact(fdw_xact_file_data->dboid, local_xid, + serverid, userid, + fdw_xact_file_data->fdw_xact_id_len, + fdw_xact_file_data->fdw_xact_id, + FDW_XACT_PREPARING); + /* Add some valid LSN */ + fdw_xact->fdw_xact_lsn = 0; + /* Mark the entry as ready */ + fdw_xact->fdw_xact_valid = true; + /* Unlock the entry as we don't need it any further */ + unlock_fdw_xact(fdw_xact); + pfree(fdw_xact_file_data); + } + } + + FreeDir(cldir); +} + +/* + * Remove the foreign transaction file for given entry. + * + * If giveWarning is false, do not complain about file-not-present; + * this is an expected case during WAL replay. + */ +void +RemoveFDWXactFile(TransactionId xid, Oid serverid, Oid userid, bool giveWarning) +{ + char path[MAXPGPATH]; + + FDWXactFilePath(path, xid, serverid, userid); + if (unlink(path)) + if (errno != ENOENT || giveWarning) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove foreign transaction state file \"%s\": %m", + path))); +} diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 31c5fd1..159f9d9 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -9,6 +9,7 @@ #include "access/clog.h" #include "access/commit_ts.h" +#include "access/fdw_xact.h" #include "access/gin.h" #include "access/gist_private.h" #include "access/generic_xlog.h" diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 9f55adc..2dd3df4 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -59,6 +59,7 @@ #include #include "access/commit_ts.h" +#include "access/fdw_xact.h" #include "access/htup_details.h" #include "access/subtrans.h" #include "access/transam.h" @@ -1452,6 +1453,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit) PredicateLockTwoPhaseFinish(xid, isCommit); + /* + * Commit/Rollback the foreign transactions prepared as part of this + * prepared transaction. + */ + FDWXactTwoPhaseFinish(isCommit, xid); + /* Count the prepared xact as committed or aborted */ AtEOXact_PgStat(isCommit); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 23f36ea..e140e71 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -21,6 +21,7 @@ #include #include "access/commit_ts.h" +#include "access/fdw_xact.h" #include "access/multixact.h" #include "access/parallel.h" #include "access/subtrans.h" @@ -186,6 +187,10 @@ typedef struct TransactionStateData bool didLogXid; /* has xid been included in WAL record? */ int parallelModeLevel; /* Enter/ExitParallelMode counter */ struct TransactionStateData *parent; /* back link to parent */ + int num_foreign_servers; /* number of foreign servers participating in the transaction, + Only valid for top level transaction */ + int can_prepare; /* can all the foreign server involved in + this transaction participate in 2PC */ } TransactionStateData; typedef TransactionStateData *TransactionState; @@ -1921,6 +1926,9 @@ StartTransaction(void) AtStart_Cache(); AfterTriggerBeginXact(); + /* Foreign transaction stuff */ + s->num_foreign_servers = 0; + /* * done with start processing, set current transaction state to "in * progress" @@ -1981,6 +1989,9 @@ CommitTransaction(void) break; } + /* Pre-commit step for foreign transcations */ + PreCommit_FDWXacts(); + CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT : XACT_EVENT_PRE_COMMIT); @@ -2138,6 +2149,7 @@ CommitTransaction(void) AtEOXact_HashTables(true); AtEOXact_PgStat(true); AtEOXact_Snapshot(true); + AtEOXact_FDWXacts(true); pgstat_report_xact_timestamp(0); CurrentResourceOwner = NULL; @@ -2322,6 +2334,7 @@ PrepareTransaction(void) AtPrepare_PgStat(); AtPrepare_MultiXact(); AtPrepare_RelationMap(); + AtPrepare_FDWXacts(); /* * Here is where we really truly prepare. @@ -2608,6 +2621,7 @@ AbortTransaction(void) AtEOXact_ComboCid(); AtEOXact_HashTables(false); AtEOXact_PgStat(false); + AtEOXact_FDWXacts(false); pgstat_report_xact_timestamp(0); } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index f13f9c1..2735eff 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -23,6 +23,7 @@ #include "access/clog.h" #include "access/commit_ts.h" +#include "access/fdw_xact.h" #include "access/multixact.h" #include "access/rewriteheap.h" #include "access/subtrans.h" @@ -4905,6 +4906,7 @@ BootStrapXLOG(void) ControlFile->wal_log_hints = wal_log_hints; ControlFile->track_commit_timestamp = track_commit_timestamp; ControlFile->data_checksum_version = bootstrap_data_checksum_version; + ControlFile->max_fdw_xacts = max_fdw_xacts; /* some additional ControlFile fields are set in WriteControlFile() */ @@ -5901,6 +5903,9 @@ CheckRequiredParameterValues(void) RecoveryRequiresIntParameter("max_locks_per_transaction", max_locks_per_xact, ControlFile->max_locks_per_xact); + RecoveryRequiresIntParameter("max_prepared_foreign_transaction", + max_fdw_xacts, + ControlFile->max_fdw_xacts); } } @@ -6582,7 +6587,10 @@ StartupXLOG(void) InitRecoveryTransactionEnvironment(); if (wasShutdown) + { oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids); + oldestActiveXID = PrescanFDWXacts(oldestActiveXID); + } else oldestActiveXID = checkPoint.oldestActiveXid; Assert(TransactionIdIsValid(oldestActiveXID)); @@ -7192,6 +7200,7 @@ StartupXLOG(void) /* Pre-scan prepared transactions to find out the range of XIDs present */ oldestActiveXID = PrescanPreparedTransactions(NULL, NULL); + oldestActiveXID = PrescanFDWXacts(oldestActiveXID); /* * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE @@ -7384,6 +7393,12 @@ StartupXLOG(void) RecoverPreparedTransactions(); /* + * WAL reply must have created the files for prepared foreign transactions. + * Reload the shared-memory foreign transaction state. + */ + ReadFDWXacts(); + + /* * Shutdown the recovery environment. This must occur after * RecoverPreparedTransactions(), see notes for lock_twophase_recover() */ @@ -8641,6 +8656,11 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) CheckPointReplicationOrigin(); /* We deliberately delay 2PC checkpointing as long as possible */ CheckPointTwoPhase(checkPointRedo); + /* + * We deliberately delay foreign transaction checkpointing as long as + * possible. + */ + CheckPointFDWXact(checkPointRedo); } /* @@ -9051,7 +9071,8 @@ XLogReportParameters(void) max_worker_processes != ControlFile->max_worker_processes || max_prepared_xacts != ControlFile->max_prepared_xacts || max_locks_per_xact != ControlFile->max_locks_per_xact || - track_commit_timestamp != ControlFile->track_commit_timestamp) + track_commit_timestamp != ControlFile->track_commit_timestamp || + max_fdw_xacts != ControlFile->max_fdw_xacts) { /* * The change in number of backend slots doesn't need to be WAL-logged @@ -9072,6 +9093,7 @@ XLogReportParameters(void) xlrec.wal_level = wal_level; xlrec.wal_log_hints = wal_log_hints; xlrec.track_commit_timestamp = track_commit_timestamp; + xlrec.max_fdw_xacts = max_fdw_xacts; XLogBeginInsert(); XLogRegisterData((char *) &xlrec, sizeof(xlrec)); @@ -9087,6 +9109,7 @@ XLogReportParameters(void) ControlFile->wal_level = wal_level; ControlFile->wal_log_hints = wal_log_hints; ControlFile->track_commit_timestamp = track_commit_timestamp; + ControlFile->max_fdw_xacts = max_fdw_xacts; UpdateControlFile(); } } @@ -9275,6 +9298,7 @@ xlog_redo(XLogReaderState *record) RunningTransactionsData running; oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids); + oldestActiveXID = PrescanFDWXacts(oldestActiveXID); /* * Construct a RunningTransactions snapshot representing a shut @@ -9467,6 +9491,7 @@ xlog_redo(XLogReaderState *record) ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact; ControlFile->wal_level = xlrec.wal_level; ControlFile->wal_log_hints = xlrec.wal_log_hints; + ControlFile->max_fdw_xacts = xlrec.max_fdw_xacts; /* * Update minRecoveryPoint to ensure that if recovery is aborted, we diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index e518e17..25126de 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -17,6 +17,7 @@ #include #include +#include "access/fdw_xact.h" #include "access/htup_details.h" #include "bootstrap/bootstrap.h" #include "catalog/index.h" diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c index eb531af..9a10696 100644 --- a/src/backend/commands/foreigncmds.c +++ b/src/backend/commands/foreigncmds.c @@ -13,6 +13,7 @@ */ #include "postgres.h" +#include "access/fdw_xact.h" #include "access/heapam.h" #include "access/htup_details.h" #include "access/reloptions.h" @@ -1087,6 +1088,20 @@ RemoveForeignServerById(Oid srvId) if (!HeapTupleIsValid(tp)) elog(ERROR, "cache lookup failed for foreign server %u", srvId); + /* + * Check if the foreign server has any foreign transaction prepared on it. + * If there is one, and it gets dropped, we will not have any chance to + * resolve that transaction. + */ + if (fdw_xact_exists(InvalidTransactionId, MyDatabaseId, srvId, InvalidOid)) + { + Form_pg_foreign_server srvForm; + srvForm = (Form_pg_foreign_server) GETSTRUCT(tp); + ereport(ERROR, + (errmsg("server \"%s\" has unresolved prepared transactions on it", + NameStr(srvForm->srvname)))); + } + simple_heap_delete(rel, &tp->t_self); ReleaseSysCache(tp); @@ -1385,6 +1400,17 @@ RemoveUserMapping(DropUserMappingStmt *stmt) user_mapping_ddl_aclcheck(useId, srv->serverid, srv->servername); /* + * If there is a foreign prepared transaction with this user mapping, + * dropping the user mapping might result in dangling prepared + * transaction. + */ + if (fdw_xact_exists(InvalidTransactionId, MyDatabaseId, srv->serverid, + useId)) + ereport(ERROR, + (errmsg("server \"%s\" has unresolved prepared transaction for user \"%s\"", + srv->servername, MappingUserName(useId)))); + + /* * Do the deletion */ object.classId = UserMappingRelationId; diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 46cd5ba..c0f000c 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -148,6 +148,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor case RM_SPGIST_ID: case RM_BRIN_ID: case RM_COMMIT_TS_ID: + case RM_FDW_XACT_ID: case RM_REPLORIGIN_ID: case RM_GENERIC_ID: /* just deal with xid, and done */ diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index c04b17f..74f10b7 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -16,6 +16,7 @@ #include "access/clog.h" #include "access/commit_ts.h" +#include "access/fdw_xact.h" #include "access/heapam.h" #include "access/multixact.h" #include "access/nbtree.h" @@ -141,6 +142,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); + size = add_size(size, FDWXactShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -253,6 +255,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) BTreeShmemInit(); SyncScanShmemInit(); AsyncShmemInit(); + FDWXactShmemInit(); #ifdef EXEC_BACKEND diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index f8996cd..6589cfe 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -47,3 +47,4 @@ CommitTsLock 39 ReplicationOriginLock 40 MultiXactTruncationLock 41 OldSnapshotTimeMapLock 42 +FDWXactLock 43 diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index c5178f7..ff21090 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -27,6 +27,7 @@ #endif #include "access/commit_ts.h" +#include "access/fdw_xact.h" #include "access/gin.h" #include "access/transam.h" #include "access/twophase.h" @@ -2055,6 +2056,19 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + /* + * See also CheckRequiredParameterValues() if this parameter changes + */ + { + {"max_prepared_foreign_transactions", PGC_POSTMASTER, RESOURCES_MEM, + gettext_noop("Sets the maximum number of simultaneously prepared transactions on foreign servers."), + NULL + }, + &max_fdw_xacts, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + #ifdef LOCK_DEBUG { {"trace_lock_oidmin", PGC_SUSET, DEVELOPER_OPTIONS, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 6d0666c..8a26264 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -119,6 +119,12 @@ # (change requires restart) # Caution: it is not advisable to set max_prepared_transactions nonzero unless # you actively intend to use prepared transactions. +#max_prepared_foreign_transactions = 0 # zero disables the feature + # (change requires restart) +# Note: Increasing max_prepared_foreign_transactions costs ~600(?) bytes of shared memory +# per foreign transaction slot. +# It is not advisable to set max_prepared_foreign_transactions nonzero unless you +# actively intend to use atomic foreign transactions feature. #work_mem = 4MB # min 64kB #maintenance_work_mem = 64MB # min 1MB #replacement_sort_tuples = 150000 # limits use of replacement selection sort diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index a978bbc..ea4682d 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -208,6 +208,7 @@ static const char *const subdirs[] = { "pg_snapshots", "pg_subtrans", "pg_twophase", + "pg_fdw_xact", "pg_multixact", "pg_multixact/members", "pg_multixact/offsets", diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c index 96619a2..90cceb5 100644 --- a/src/bin/pg_controldata/pg_controldata.c +++ b/src/bin/pg_controldata/pg_controldata.c @@ -296,5 +296,7 @@ main(int argc, char *argv[]) (ControlFile->float8ByVal ? _("by value") : _("by reference"))); printf(_("Data page checksum version: %u\n"), ControlFile->data_checksum_version); + printf(_("Current max_fdw_xacts setting: %d\n"), + ControlFile->max_fdw_xacts); return 0; } diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c index 525b82b..c8cf4ce 100644 --- a/src/bin/pg_resetxlog/pg_resetxlog.c +++ b/src/bin/pg_resetxlog/pg_resetxlog.c @@ -586,6 +586,7 @@ GuessControlValues(void) ControlFile.MaxConnections = 100; ControlFile.max_worker_processes = 8; ControlFile.max_prepared_xacts = 0; + ControlFile.max_fdw_xacts = 0; ControlFile.max_locks_per_xact = 64; ControlFile.maxAlign = MAXIMUM_ALIGNOF; @@ -802,6 +803,7 @@ RewriteControlFile(void) ControlFile.MaxConnections = 100; ControlFile.max_worker_processes = 8; ControlFile.max_prepared_xacts = 0; + ControlFile.max_fdw_xacts = 0; ControlFile.max_locks_per_xact = 64; /* Now we can force the recorded xlog seg size to the right thing. */ diff --git a/src/bin/pg_xlogdump/rmgrdesc.c b/src/bin/pg_xlogdump/rmgrdesc.c index 017b9c5..edde3d5 100644 --- a/src/bin/pg_xlogdump/rmgrdesc.c +++ b/src/bin/pg_xlogdump/rmgrdesc.c @@ -8,9 +8,11 @@ #define FRONTEND 1 #include "postgres.h" +#include "access/fdw_xact.h" #include "access/brin_xlog.h" #include "access/clog.h" #include "access/commit_ts.h" +#include "access/fdw_xact.h" #include "access/generic_xlog.h" #include "access/gin.h" #include "access/gist_private.h" diff --git a/src/include/access/fdw_xact.h b/src/include/access/fdw_xact.h new file mode 100644 index 0000000..87636de --- /dev/null +++ b/src/include/access/fdw_xact.h @@ -0,0 +1,75 @@ +/* + * fdw_xact.h + * + * PostgreSQL distributed transaction manager + * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/fdw_xact.h + */ +#ifndef FDW_XACT_H +#define FDW_XACT_H + +#include "storage/backendid.h" +#include "foreign/foreign.h" +#include "access/xlogreader.h" +#include "lib/stringinfo.h" +#include "nodes/pg_list.h" + +/* + * On disk file structure + */ +typedef struct +{ + Oid dboid; /* database oid where to find foreign server and + * user mapping + */ + TransactionId local_xid; + Oid serverid; /* foreign server where transaction takes place */ + Oid userid; /* user who initiated the foreign transaction */ + Oid umid; + uint32 fdw_xact_id_len; /* Length of the value stored in the next field */ + /* This should always be the last member */ + char fdw_xact_id[FLEXIBLE_ARRAY_MEMBER]; /* variable length array + * to store foreign transaction + * information. + */ +} FDWXactOnDiskData; + +typedef struct +{ + TransactionId xid; + Oid serverid; + Oid userid; + Oid dbid; +} FdwRemoveXlogRec; + +extern int max_fdw_xacts; + +/* Info types for logs related to FDW transactions */ +#define XLOG_FDW_XACT_INSERT 0x00 +#define XLOG_FDW_XACT_REMOVE 0x10 + +extern Size FDWXactShmemSize(void); +extern void FDWXactShmemInit(void); +extern void ReadFDWXacts(void); +extern TransactionId PrescanFDWXacts(TransactionId oldestActiveXid); +extern bool fdw_xact_has_usermapping(Oid serverid, Oid userid); +extern bool fdw_xact_has_server(Oid serverid); +extern void fdw_xact_redo(XLogReaderState *record); +extern void fdw_xact_desc(StringInfo buf, XLogReaderState *record); +extern const char *fdw_xact_identify(uint8 info); +extern void AtEOXact_FDWXacts(bool is_commit); +extern void AtPrepare_FDWXacts(void); +extern void FDWXactTwoPhaseFinish(bool isCommit, TransactionId xid); +extern bool fdw_xact_exists(TransactionId xid, Oid dboid, Oid serverid, + Oid userid); +extern void CheckPointFDWXact(XLogRecPtr redo_horizon); +extern void RegisterXactForeignServer(Oid serverid, Oid userid, bool can_prepare); +extern bool FdwTwoPhaseNeeded(void); +extern void PreCommit_FDWXacts(void); +/* For the sake of foreign transaction resolver */ +extern List *get_dbids_with_unresolved_xact(void); + +#endif /* FDW_XACT_H */ diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index a7a0ae2..86448ff 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -44,6 +44,7 @@ PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL) PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup) PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL) PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL) +PG_RMGR(RM_FDW_XACT_ID, "Foreign Transactions", fdw_xact_redo, fdw_xact_desc, fdw_xact_identify, NULL, NULL) PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL) PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL) PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL) diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 0a595cc..9a92ce7 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -213,6 +213,7 @@ typedef struct xl_parameter_change int MaxConnections; int max_worker_processes; int max_prepared_xacts; + int max_fdw_xacts; int max_locks_per_xact; int wal_level; bool wal_log_hints; diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index 0bc41ab..3413201 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -180,6 +180,7 @@ typedef struct ControlFileData int MaxConnections; int max_worker_processes; int max_prepared_xacts; + int max_fdw_xacts; int max_locks_per_xact; bool track_commit_timestamp; diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 6fed7a0..1f665a5 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5247,6 +5247,12 @@ DATA(insert OID = 3992 ( dense_rank PGNSP PGUID 12 1 0 2276 0 t f f f f f i s DESCR("rank of hypothetical row without gaps"); DATA(insert OID = 3993 ( dense_rank_final PGNSP PGUID 12 1 0 2276 0 f f f f f f i s 2 0 20 "2281 2276" "{2281,2276}" "{i,v}" _null_ _null_ _null_ hypothetical_dense_rank_final _null_ _null_ _null_ )); DESCR("aggregate final function"); +DATA(insert OID = 4109 ( pg_fdw_xact PGNSP PGUID 12 1 1000 0 0 f f f f t t v u 0 0 2249 "" "{26, 28,26,26,25,25}" "{o,o,o,o,o,o}" "{dbid, transaction,serverid,userid,status,identifier}" _null_ _null_ pg_fdw_xact _null_ _null_ _null_ )); +DESCR("view foreign transactions"); +DATA(insert OID = 4110 ( pg_fdw_resolve PGNSP PGUID 12 1 1000 0 0 f f f f t t v u 0 0 2249 "" "{26, 28,26,26,25,25}" "{o,o,o,o,o,o}" "{dbid, transaction,serverid,userid,status,identifier}" _null_ _null_ pg_fdw_resolve _null_ _null_ _null_ )); +DESCR("resolve foreign transactions"); +DATA(insert OID = 4111 ( pg_fdw_remove PGNSP PGUID 12 1 0 0 0 f f f f f f v u 4 0 2278 "28 26 26 26" _null_ _null_ "{transaction,dbid,serverid,userid}" _null_ _null_ pg_fdw_remove _null_ _null_ _null_ )); +DESCR("remove foreign transactions"); /* pg_upgrade support */ DATA(insert OID = 3582 ( binary_upgrade_set_next_pg_type_oid PGNSP PGUID 12 1 0 0 0 f f f f t f v r 1 0 2278 "26" _null_ _null_ _null_ _null_ _null_ binary_upgrade_set_next_pg_type_oid _null_ _null_ _null_ )); diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index e1b0d0d..3383651 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -12,6 +12,7 @@ #ifndef FDWAPI_H #define FDWAPI_H +#include "access/fdw_xact.h" #include "access/parallel.h" #include "nodes/execnodes.h" #include "nodes/relation.h" @@ -143,6 +144,23 @@ typedef bool (*AnalyzeForeignTable_function) (Relation relation, typedef List *(*ImportForeignSchema_function) (ImportForeignSchemaStmt *stmt, Oid serverOid); +typedef bool (*EndForeignTransaction_function) (Oid serverid, Oid userid, + Oid umid, bool is_commit); + +typedef bool (*PrepareForeignTransaction_function) (Oid serverid, Oid userid, + int prep_info_len, char *prep_info); + +typedef bool (*ResolvePreparedForeignTransaction_function) (Oid serverid, + Oid userid, + Oid umid, + bool is_commit, + int prep_info_len, + char *prep_info); + +typedef char *(*GetPrepareId_function) (Oid serverid, Oid userid, + int *prep_info_len); + + typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node, ParallelContext *pcxt); typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node, @@ -219,6 +237,12 @@ typedef struct FdwRoutine /* Support functions for IMPORT FOREIGN SCHEMA */ ImportForeignSchema_function ImportForeignSchema; + /* Supprot functions for foreign transactions */ + GetPrepareId_function GetPrepareId; + EndForeignTransaction_function EndForeignTransaction; + PrepareForeignTransaction_function PrepareForeignTransaction; + ResolvePreparedForeignTransaction_function ResolvePreparedForeignTransaction; + /* Support functions for parallelism under Gather node */ IsForeignScanParallelSafe_function IsForeignScanParallelSafe; EstimateDSMForeignScan_function EstimateDSMForeignScan; diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index f576f05..f49334b 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -251,11 +251,12 @@ extern PGPROC *PreparedXactProcs; * We set aside some extra PGPROC structures for auxiliary processes, * ie things that aren't full-fledged backends but need shmem access. * - * Background writer, checkpointer and WAL writer run during normal operation. + * Background writer, checkpointer, WAL writer and foreign transction resolver + * run during normal operation. * Startup process and WAL receiver also consume 2 slots, but WAL writer is * launched only after startup has exited, so we only need 4 slots. */ -#define NUM_AUXILIARY_PROCS 4 +#define NUM_AUXILIARY_PROCS 5 /* configurable options */ diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h index 40e25c8..d047bd4 100644 --- a/src/include/utils/builtins.h +++ b/src/include/utils/builtins.h @@ -1331,4 +1331,8 @@ extern Datum pg_prepared_statement(PG_FUNCTION_ARGS); /* utils/mmgr/portalmem.c */ extern Datum pg_cursor(PG_FUNCTION_ARGS); +/* access/transam/fdw_xact.c */ +extern Datum pg_fdw_xact(PG_FUNCTION_ARGS); +extern Datum pg_fdw_resolve(PG_FUNCTION_ARGS); +extern Datum pg_fdw_remove(PG_FUNCTION_ARGS); #endif /* BUILTINS_H */ diff --git a/src/test/regress/pg_regress.c b/src/test/regress/pg_regress.c index 574f5b8..7d96d8d 100644 --- a/src/test/regress/pg_regress.c +++ b/src/test/regress/pg_regress.c @@ -2233,9 +2233,11 @@ regression_main(int argc, char *argv[], init_function ifunc, test_function tfunc * Adjust the default postgresql.conf for regression testing. The user * can specify a file to be appended; in any case we expand logging * and set max_prepared_transactions to enable testing of prepared - * xacts. (Note: to reduce the probability of unexpected shmmax - * failures, don't set max_prepared_transactions any higher than - * actually needed by the prepared_xacts regression test.) + * xacts. We also set max_fdw_transctions to enable testing of atomic + * foreign transactions. (Note: to reduce the probability of unexpected + * shmmax failures, don't set max_prepared_transactions or + * max_prepared_foreign_transactions any higher than actually needed by the + * corresponding regression tests.). */ snprintf(buf, sizeof(buf), "%s/data/postgresql.conf", temp_instance); pg_conf = fopen(buf, "a"); @@ -2249,7 +2251,8 @@ regression_main(int argc, char *argv[], init_function ifunc, test_function tfunc fputs("log_checkpoints = on\n", pg_conf); fputs("log_lock_waits = on\n", pg_conf); fputs("log_temp_files = 128kB\n", pg_conf); - fputs("max_prepared_transactions = 2\n", pg_conf); + fputs("max_prepared_transactions = 3\n", pg_conf); + fputs("max_prepared_foreign_transactions = 2\n", pg_conf); for (sl = temp_configs; sl != NULL; sl = sl->next) { -- 2.8.1