*** a/src/backend/access/transam/twophase.c --- b/src/backend/access/transam/twophase.c *************** *** 1298,1314 **** FinishPreparedTransaction(const char *gid, bool isCommit) * callbacks will release the locks the transaction held. */ if (isCommit) RecordTransactionCommitPrepared(xid, hdr->nsubxacts, children, hdr->ncommitrels, commitrels, hdr->ninvalmsgs, invalmsgs, hdr->initfileinval); else RecordTransactionAbortPrepared(xid, hdr->nsubxacts, children, hdr->nabortrels, abortrels); ! ! ProcArrayRemove(&gxact->proc, latestXid); /* * In case we fail while running the callbacks, mark the gxact invalid so --- 1298,1318 ---- * callbacks will release the locks the transaction held. */ if (isCommit) + { RecordTransactionCommitPrepared(xid, hdr->nsubxacts, children, hdr->ncommitrels, commitrels, hdr->ninvalmsgs, invalmsgs, hdr->initfileinval); + CommitRemove2PC(&gxact->proc, latestXid); + } else + { RecordTransactionAbortPrepared(xid, hdr->nsubxacts, children, hdr->nabortrels, abortrels); ! ProcArrayRemove(&gxact->proc, latestXid); ! } /* * In case we fail while running the callbacks, mark the gxact invalid so *** a/src/backend/access/transam/xact.c --- b/src/backend/access/transam/xact.c *************** *** 1878,1884 **** CommitTransaction(void) * must be done _before_ releasing locks we hold and _after_ * RecordTransactionCommit. */ ! ProcArrayEndTransaction(MyProc, latestXid); /* * This is all post-commit cleanup. Note that if an error is raised here, --- 1878,1884 ---- * must be done _before_ releasing locks we hold and _after_ * RecordTransactionCommit. */ ! CommitEndTransaction(MyProc, latestXid); /* * This is all post-commit cleanup. Note that if an error is raised here, *** a/src/backend/storage/lmgr/predicate.c --- b/src/backend/storage/lmgr/predicate.c *************** *** 120,125 **** --- 120,128 ---- * - The same lock protects a target, all locks on that target, and * the linked list of locks on the target.. * - When more than one is needed, acquire in ascending order. + & - There are times when this lock must be held concurrently with + * ProcArrayLock (acquired in called functions, not directly). + * In such cases this lock must be acquired first. * * SerializableXactHashLock * - Protects both PredXact and SerializableXidHash. *************** *** 158,163 **** --- 161,168 ---- * PredicateLockPageCombine(Relation relation, BlockNumber oldblkno, * BlockNumber newblkno); * TransferPredicateLocksToHeapRelation(Relation relation) + * CommitEndTransaction(PGPROC *proc, TransactionId latestXid) + * CommitRemove2PC(PGPROC *proc, TransactionId latestXid) * ReleasePredicateLocks(bool isCommit) * * conflict detection (may also trigger rollback) *************** *** 252,261 **** * The PREPARED flag remains set after commit, so SxactIsCommitted * implies SxactIsPrepared. */ ! #define SxactIsCommitted(sxact) (((sxact)->flags & SXACT_FLAG_COMMITTED) != 0) ! #define SxactIsPrepared(sxact) (((sxact)->flags & SXACT_FLAG_PREPARED) != 0) ! #define SxactIsRolledBack(sxact) (((sxact)->flags & SXACT_FLAG_ROLLED_BACK) != 0) #define SxactIsDoomed(sxact) (((sxact)->flags & SXACT_FLAG_DOOMED) != 0) #define SxactIsReadOnly(sxact) (((sxact)->flags & SXACT_FLAG_READ_ONLY) != 0) #define SxactHasSummaryConflictIn(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_IN) != 0) #define SxactHasSummaryConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_OUT) != 0) --- 257,267 ---- * The PREPARED flag remains set after commit, so SxactIsCommitted * implies SxactIsPrepared. */ ! #define SxactIsCommitted(sxact) ((sxact)->commitSeqNo != InvalidSerCommitSeqNo) ! #define SxactIsDoomed(sxact) (((sxact)->flags & SXACT_FLAG_DOOMED) != 0) + #define SxactIsPrepared(sxact) (((sxact)->flags & SXACT_FLAG_PREPARED) != 0) + #define SxactIsReleased(sxact) (((sxact)->flags & SXACT_FLAG_RELEASED) != 0) #define SxactIsReadOnly(sxact) (((sxact)->flags & SXACT_FLAG_READ_ONLY) != 0) #define SxactHasSummaryConflictIn(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_IN) != 0) #define SxactHasSummaryConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_OUT) != 0) *************** *** 1194,1200 **** InitPredicateLocks(void) PredXact->OldCommittedSxact->topXid = InvalidTransactionId; PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId; PredXact->OldCommittedSxact->xmin = InvalidTransactionId; ! PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED; PredXact->OldCommittedSxact->pid = 0; } /* This never changes, so let's keep a local copy. */ --- 1200,1207 ---- PredXact->OldCommittedSxact->topXid = InvalidTransactionId; PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId; PredXact->OldCommittedSxact->xmin = InvalidTransactionId; ! PredXact->OldCommittedSxact->flags = ! SXACT_FLAG_PREPARED | SXACT_FLAG_RELEASED; PredXact->OldCommittedSxact->pid = 0; } /* This never changes, so let's keep a local copy. */ *************** *** 1669,1676 **** RegisterSerializableTransactionInt(Snapshot snapshot) othersxact != NULL; othersxact = NextPredXact(othersxact)) { ! if (!SxactIsOnFinishedList(othersxact) && ! !SxactIsReadOnly(othersxact)) { SetPossibleUnsafeConflict(sxact, othersxact); } --- 1676,1684 ---- othersxact != NULL; othersxact = NextPredXact(othersxact)) { ! if (!SxactIsCommitted(othersxact) ! && !SxactIsDoomed(othersxact) ! && !SxactIsReadOnly(othersxact)) { SetPossibleUnsafeConflict(sxact, othersxact); } *************** *** 3052,3059 **** SetNewSxactGlobalXmin(void) for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact)) { ! if (!SxactIsRolledBack(sxact) ! && !SxactIsCommitted(sxact) && sxact != OldCommittedSxact) { Assert(sxact->xmin != InvalidTransactionId); --- 3060,3066 ---- for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact)) { ! if (!SxactIsReleased(sxact) && sxact != OldCommittedSxact) { Assert(sxact->xmin != InvalidTransactionId); *************** *** 3074,3079 **** SetNewSxactGlobalXmin(void) --- 3081,3167 ---- } /* + * CommitEndTransaction + * + * For correct SERIALIZABLE semantics, the commitSeqNo must appear to be set + * atomically with the work of the transaction becoming visible to other + * transactions. + * + * Since a transaction without an xid has not made any changes which will be + * visible to other transactions, it doesn't need such treatment. These + * transactions will be flagged as committed and assigned a commitSeqNo in + * ReleasePredicateLocks(). This is after the actual commit, but the timing + * for these transactions isn't critical. Since they are explicitly or + * implicitly READ ONLY, it is the start time of the transaction which is + * critical for correct detection of serialization failures; the commit + * information is only really needed for cleanup purposes. + */ + void + CommitEndTransaction(PGPROC *proc, TransactionId latestXid) + { + if (MySerializableXact == InvalidSerializableXact + || !TransactionIdIsValid(latestXid)) + { + ProcArrayEndTransaction(proc, latestXid); + return; + } + + LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + + ProcArrayEndTransaction(proc, latestXid); + + MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo); + + LWLockRelease(SerializableXactHashLock); + } + + /* + * CommitRemove2PC + * + * Like CommitEndTransaction, except for a prepared transaction which is being + * committed. + */ + void + CommitRemove2PC(PGPROC *proc, TransactionId latestXid) + { + SERIALIZABLEXIDTAG sxidtag; + SERIALIZABLEXID *sxid; + + if (!TransactionIdIsValid(latestXid)) + { + ProcArrayRemove(proc, latestXid); + return; + } + + LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + + sxidtag.xid = latestXid; + sxid = (SERIALIZABLEXID *) + hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL); + + /* + * xid will not be found if it wasn't a serializable transaction. In that + * case, make the transaction's work visible while not holding + * SerializableXactHashLock. + */ + if (sxid == NULL) + { + LWLockRelease(SerializableXactHashLock); + ProcArrayRemove(proc, latestXid); + return; + } + + ProcArrayRemove(proc, latestXid); + + MySerializableXact = sxid->myXact; + MyXactDidWrite = true; /* conservatively assume that we wrote + * something */ + MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo); + + LWLockRelease(SerializableXactHashLock); + } + + /* * ReleasePredicateLocks * * Releases predicate locks based on completion of the current transaction, *************** *** 3116,3126 **** ReleasePredicateLocks(bool isCommit) return; } - Assert(!isCommit || SxactIsPrepared(MySerializableXact)); - Assert(!isCommit || !SxactIsDoomed(MySerializableXact)); - Assert(!SxactIsCommitted(MySerializableXact)); - Assert(!SxactIsRolledBack(MySerializableXact)); - /* may not be serializable during COMMIT/ROLLBACK PREPARED */ if (MySerializableXact->pid != 0) Assert(IsolationIsSerializable()); --- 3204,3209 ---- *************** *** 3132,3137 **** ReleasePredicateLocks(bool isCommit) --- 3215,3224 ---- LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + Assert(!SxactIsReleased(MySerializableXact)); + Assert(!isCommit || SxactIsPrepared(MySerializableXact)); + Assert(!isCommit || !SxactIsDoomed(MySerializableXact)); + /* * We don't hold XidGenLock lock here, assuming that TransactionId is * atomic! *************** *** 3145,3158 **** ReleasePredicateLocks(bool isCommit) */ MySerializableXact->finishedBefore = ShmemVariableCache->nextXid; /* * If it's not a commit it's a rollback, and we can clear our locks * immediately. */ if (isCommit) { ! MySerializableXact->flags |= SXACT_FLAG_COMMITTED; ! MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo); /* Recognize implicit read-only transaction (commit without write). */ if (!MyXactDidWrite) MySerializableXact->flags |= SXACT_FLAG_READ_ONLY; --- 3232,3249 ---- */ MySerializableXact->finishedBefore = ShmemVariableCache->nextXid; + MySerializableXact->flags |= SXACT_FLAG_RELEASED; + /* * If it's not a commit it's a rollback, and we can clear our locks * immediately. */ if (isCommit) { ! if (MySerializableXact->commitSeqNo == InvalidSerCommitSeqNo) ! { ! MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo); ! } /* Recognize implicit read-only transaction (commit without write). */ if (!MyXactDidWrite) MySerializableXact->flags |= SXACT_FLAG_READ_ONLY; *************** *** 3165,3178 **** ReleasePredicateLocks(bool isCommit) * other transactions that conflict with it. Note that this flag might * already be set, if another backend marked this transaction for * abort. - * - * The ROLLED_BACK flag further indicates that ReleasePredicateLocks - * has been called, and so the SerializableXact is eligible for - * cleanup. This means it should not be considered when calculating - * SxactGlobalXmin. */ MySerializableXact->flags |= SXACT_FLAG_DOOMED; ! MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK; /* * If the transaction was previously prepared, but is now failing due * to a ROLLBACK PREPARED or (hopefully very rare) error after the --- 3256,3264 ---- * other transactions that conflict with it. Note that this flag might * already be set, if another backend marked this transaction for * abort. */ MySerializableXact->flags |= SXACT_FLAG_DOOMED; ! /* * If the transaction was previously prepared, but is now failing due * to a ROLLBACK PREPARED or (hopefully very rare) error after the *************** *** 3260,3274 **** ReleasePredicateLocks(bool isCommit) && !SxactIsReadOnly(MySerializableXact) && SxactIsCommitted(conflict->sxactIn)) { ! if ((MySerializableXact->flags & SXACT_FLAG_CONFLICT_OUT) == 0 || conflict->sxactIn->commitSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit) MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->commitSeqNo; MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT; } ! if (!isCommit ! || SxactIsCommitted(conflict->sxactIn) ! || (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo)) ReleaseRWConflict(conflict); conflict = nextConflict; --- 3346,3358 ---- && !SxactIsReadOnly(MySerializableXact) && SxactIsCommitted(conflict->sxactIn)) { ! if (!SxactHasConflictOut(MySerializableXact) || conflict->sxactIn->commitSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit) MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->commitSeqNo; MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT; } ! if (!isCommit || SxactIsCommitted(conflict->sxactIn)) ReleaseRWConflict(conflict); conflict = nextConflict; *************** *** 3290,3296 **** ReleasePredicateLocks(bool isCommit) offsetof(RWConflictData, inLink)); if (!isCommit ! || SxactIsCommitted(conflict->sxactOut) || SxactIsReadOnly(conflict->sxactOut)) ReleaseRWConflict(conflict); --- 3374,3380 ---- offsetof(RWConflictData, inLink)); if (!isCommit ! || SxactIsReleased(conflict->sxactOut) || SxactIsReadOnly(conflict->sxactOut)) ReleaseRWConflict(conflict); *************** *** 3556,3562 **** ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, nextConflict; Assert(sxact != NULL); ! Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact)); Assert(LWLockHeldByMe(SerializableFinishedListLock)); /* --- 3640,3646 ---- nextConflict; Assert(sxact != NULL); ! Assert(SxactIsReleased(sxact)); Assert(LWLockHeldByMe(SerializableFinishedListLock)); /* *************** *** 4702,4722 **** PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit) SERIALIZABLEXID *sxid; SERIALIZABLEXIDTAG sxidtag; ! sxidtag.xid = xid; ! LWLockAcquire(SerializableXactHashLock, LW_SHARED); ! sxid = (SERIALIZABLEXID *) ! hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL); ! LWLockRelease(SerializableXactHashLock); ! /* xid will not be found if it wasn't a serializable transaction */ ! if (sxid == NULL) ! return; ! /* Release its locks */ ! MySerializableXact = sxid->myXact; ! MyXactDidWrite = true; /* conservatively assume that we wrote * something */ ReleasePredicateLocks(isCommit); } --- 4786,4810 ---- SERIALIZABLEXID *sxid; SERIALIZABLEXIDTAG sxidtag; ! if (MySerializableXact == InvalidSerializableXact) ! { ! sxidtag.xid = xid; ! LWLockAcquire(SerializableXactHashLock, LW_SHARED); ! sxid = (SERIALIZABLEXID *) ! hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL); ! LWLockRelease(SerializableXactHashLock); ! /* xid will not be found if it wasn't a serializable transaction */ ! if (sxid == NULL) ! return; ! /* Release its locks */ ! MySerializableXact = sxid->myXact; ! MyXactDidWrite = true; /* conservatively assume that we wrote * something */ + } + ReleasePredicateLocks(isCommit); } *** a/src/include/storage/predicate.h --- b/src/include/storage/predicate.h *************** *** 14,19 **** --- 14,20 ---- #ifndef PREDICATE_H #define PREDICATE_H + #include "storage/proc.h" #include "utils/relcache.h" #include "utils/snapshot.h" *************** *** 50,55 **** extern void PredicateLockTuple(Relation relation, HeapTuple tuple, Snapshot snap --- 51,58 ---- extern void PredicateLockPageSplit(Relation relation, BlockNumber oldblkno, BlockNumber newblkno); extern void PredicateLockPageCombine(Relation relation, BlockNumber oldblkno, BlockNumber newblkno); extern void TransferPredicateLocksToHeapRelation(Relation relation); + extern void CommitEndTransaction(PGPROC *proc, TransactionId latestXid); + extern void CommitRemove2PC(PGPROC *proc, TransactionId latestXid); extern void ReleasePredicateLocks(bool isCommit); /* conflict detection (may also trigger rollback) */ *** a/src/include/storage/predicate_internals.h --- b/src/include/storage/predicate_internals.h *************** *** 90,111 **** typedef struct SERIALIZABLEXACT int pid; /* pid of associated process */ } SERIALIZABLEXACT; ! #define SXACT_FLAG_COMMITTED 0x00000001 /* already committed */ #define SXACT_FLAG_PREPARED 0x00000002 /* about to commit */ ! #define SXACT_FLAG_ROLLED_BACK 0x00000004 /* already rolled back */ ! #define SXACT_FLAG_DOOMED 0x00000008 /* will roll back */ /* * The following flag actually means that the flagged transaction has a * conflict out *to a transaction which committed ahead of it*. It's hard * to get that into a name of a reasonable length. */ ! #define SXACT_FLAG_CONFLICT_OUT 0x00000010 ! #define SXACT_FLAG_READ_ONLY 0x00000020 ! #define SXACT_FLAG_DEFERRABLE_WAITING 0x00000040 ! #define SXACT_FLAG_RO_SAFE 0x00000080 ! #define SXACT_FLAG_RO_UNSAFE 0x00000100 ! #define SXACT_FLAG_SUMMARY_CONFLICT_IN 0x00000200 ! #define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000400 /* * The following types are used to provide an ad hoc list for holding --- 90,110 ---- int pid; /* pid of associated process */ } SERIALIZABLEXACT; ! #define SXACT_FLAG_DOOMED 0x00000001 /* will roll back */ #define SXACT_FLAG_PREPARED 0x00000002 /* about to commit */ ! #define SXACT_FLAG_RELEASED 0x00000004 /* OK to clean up */ /* * The following flag actually means that the flagged transaction has a * conflict out *to a transaction which committed ahead of it*. It's hard * to get that into a name of a reasonable length. */ ! #define SXACT_FLAG_CONFLICT_OUT 0x00000008 ! #define SXACT_FLAG_READ_ONLY 0x00000010 ! #define SXACT_FLAG_DEFERRABLE_WAITING 0x00000020 ! #define SXACT_FLAG_RO_SAFE 0x00000040 ! #define SXACT_FLAG_RO_UNSAFE 0x00000080 ! #define SXACT_FLAG_SUMMARY_CONFLICT_IN 0x00000100 ! #define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000200 /* * The following types are used to provide an ad hoc list for holding *** a/src/test/regress/expected/prepared_xacts.out --- b/src/test/regress/expected/prepared_xacts.out *************** *** 134,140 **** SELECT * FROM pxtest1; -- This should fail, because the two transactions have a write-skew anomaly INSERT INTO pxtest1 VALUES ('fff'); ERROR: could not serialize access due to read/write dependencies among transactions ! DETAIL: Cancelled on identification as a pivot, during write. HINT: The transaction might succeed if retried. PREPARE TRANSACTION 'foo5'; SELECT gid FROM pg_prepared_xacts; --- 134,140 ---- -- This should fail, because the two transactions have a write-skew anomaly INSERT INTO pxtest1 VALUES ('fff'); ERROR: could not serialize access due to read/write dependencies among transactions ! DETAIL: Canceled on identification as a pivot, during write. HINT: The transaction might succeed if retried. PREPARE TRANSACTION 'foo5'; SELECT gid FROM pg_prepared_xacts;