diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index c50f9c4bf6..e9751aa2f6 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -195,7 +195,10 @@ typedef struct TwoPhaseStateData static TwoPhaseStateData *TwoPhaseState; /* - * Global transaction entry currently locked by us, if any. + * Global transaction entry currently locked by us, if any. Note that any + * access to the entry pointed to by this variable must be protected by + * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be + * (since it's just local memory). */ static GlobalTransaction MyLockedGxact = NULL; @@ -338,18 +341,13 @@ AtAbort_Twophase(void) * resources held by the transaction yet. In those cases, the in-memory * state can be wrong, but it's too late to back out. */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); if (!MyLockedGxact->valid) - { RemoveGXact(MyLockedGxact); - } else - { - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); - MyLockedGxact->locking_backend = InvalidBackendId; + LWLockRelease(TwoPhaseStateLock); - LWLockRelease(TwoPhaseStateLock); - } MyLockedGxact = NULL; } @@ -454,6 +452,8 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, PGXACT *pgxact; int i; + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); + Assert(gxact != NULL); proc = &ProcGlobal->allProcs[gxact->pgprocno]; pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; @@ -530,15 +530,19 @@ GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts, /* * MarkAsPrepared * Mark the GXACT as fully valid, and enter it into the global ProcArray. + * + * lock_held indicates whether caller already holds TwoPhaseStateLock. */ static void -MarkAsPrepared(GlobalTransaction gxact) +MarkAsPrepared(GlobalTransaction gxact, bool lock_held) { /* Lock here may be overkill, but I'm not convinced of that ... */ - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + if (!lock_held) + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); Assert(!gxact->valid); gxact->valid = true; - LWLockRelease(TwoPhaseStateLock); + if (!lock_held) + LWLockRelease(TwoPhaseStateLock); /* * Put it into the global ProcArray so TransactionIdIsInProgress considers @@ -632,7 +636,7 @@ RemoveGXact(GlobalTransaction gxact) { int i; - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { @@ -646,14 +650,10 @@ RemoveGXact(GlobalTransaction gxact) gxact->next = TwoPhaseState->freeGXacts; TwoPhaseState->freeGXacts = gxact; - LWLockRelease(TwoPhaseStateLock); - return; } } - LWLockRelease(TwoPhaseStateLock); - elog(ERROR, "failed to find %p in GlobalTransaction array", gxact); } @@ -1127,7 +1127,7 @@ EndPrepare(GlobalTransaction gxact) * the xact crashed. Instead we have a window where the same XID appears * twice in ProcArray, which is OK. */ - MarkAsPrepared(gxact); + MarkAsPrepared(gxact, false); /* * Now we can mark ourselves as out of the commit critical section: a @@ -1506,7 +1506,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit) if (gxact->ondisk) RemoveTwoPhaseFile(xid, true); + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); RemoveGXact(gxact); + LWLockRelease(TwoPhaseStateLock); MyLockedGxact = NULL; pfree(buf); @@ -1734,6 +1736,7 @@ restoreTwoPhaseData(void) struct dirent *clde; cldir = AllocateDir(TWOPHASE_DIR); + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) { if (strlen(clde->d_name) == 8 && @@ -1752,6 +1755,7 @@ restoreTwoPhaseData(void) PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr); } } + LWLockRelease(TwoPhaseStateLock); FreeDir(cldir); } @@ -1792,7 +1796,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) int allocsize = 0; int i; - LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { TransactionId xid; @@ -1867,7 +1871,7 @@ StandbyRecoverPreparedTransactions(void) { int i; - LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { TransactionId xid; @@ -1893,7 +1897,8 @@ StandbyRecoverPreparedTransactions(void) * Scan the shared memory entries of TwoPhaseState and reload the state for * each prepared transaction (reacquire locks, etc). * - * This is run during database startup. + * This is run at the end of recovery, but before we allow backends to write + * WAL. * * At the end of recovery the way we take snapshots will change. We now need * to mark all running transactions with their full SubTransSetParent() info @@ -1907,9 +1912,7 @@ RecoverPreparedTransactions(void) { int i; - /* - * Don't need a lock in the recovery phase. - */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { TransactionId xid; @@ -1955,7 +1958,6 @@ RecoverPreparedTransactions(void) * Recreate its GXACT and dummy PGPROC. But, check whether it was * added in redo and already has a shmem entry for it. */ - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); MarkAsPreparingGuts(gxact, xid, gid, hdr->prepared_at, hdr->owner, hdr->database); @@ -1963,13 +1965,13 @@ RecoverPreparedTransactions(void) /* recovered, so reset the flag for entries generated by redo */ gxact->inredo = false; + GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); + MarkAsPrepared(gxact, true); + LWLockRelease(TwoPhaseStateLock); - GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); - MarkAsPrepared(gxact); - /* - * Recover other state (notably locks) using resource managers + * Recover other state (notably locks) using resource managers. */ ProcessRecords(bufptr, xid, twophase_recover_callbacks); @@ -1988,7 +1990,11 @@ RecoverPreparedTransactions(void) PostPrepare_Twophase(); pfree(buf); + + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); } + + LWLockRelease(TwoPhaseStateLock); } /* @@ -2014,6 +2020,8 @@ ProcessTwoPhaseBuffer(TransactionId xid, TwoPhaseFileHeader *hdr; int i; + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); + if (!fromdisk) Assert(prepare_start_lsn != InvalidXLogRecPtr); @@ -2030,8 +2038,8 @@ ProcessTwoPhaseBuffer(TransactionId xid, else { ereport(WARNING, - (errmsg("removing stale two-phase state from" - " shared memory for \"%u\"", xid))); + (errmsg("removing stale two-phase state from shared memory for \"%u\"", + xid))); PrepareRedoRemove(xid, true); } return NULL; @@ -2308,6 +2316,7 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) const char *gid; GlobalTransaction gxact; + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); Assert(RecoveryInProgress()); bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); @@ -2324,7 +2333,6 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) * that it got added in the redo phase */ - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); /* Get a free gxact from the freelist */ if (TwoPhaseState->freeGXacts == NULL) ereport(ERROR, @@ -2350,17 +2358,17 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; - LWLockRelease(TwoPhaseStateLock); - - elog(DEBUG2, "Adding 2PC data to shared memory %u", gxact->xid); + elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid); } /* * PrepareRedoRemove * - * Remove the corresponding gxact entry from TwoPhaseState. Also - * remove the 2PC file if a prepared transaction was saved via - * an earlier checkpoint. + * Remove the corresponding gxact entry from TwoPhaseState. Also remove + * the 2PC file if a prepared transaction was saved via an earlier checkpoint. + * + * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState + * is updated. */ void PrepareRedoRemove(TransactionId xid, bool giveWarning) @@ -2369,9 +2377,9 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) int i; bool found = false; + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); Assert(RecoveryInProgress()); - LWLockAcquire(TwoPhaseStateLock, LW_SHARED); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { gxact = TwoPhaseState->prepXacts[i]; @@ -2383,7 +2391,6 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) break; } } - LWLockRelease(TwoPhaseStateLock); /* * Just leave if there is nothing, this is expected during WAL replay. @@ -2394,7 +2401,7 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) /* * And now we can clean up any files we may have left. */ - elog(DEBUG2, "Removing 2PC data from shared memory %u", xid); + elog(DEBUG2, "removing 2PC data for transaction %u", xid); if (gxact->ondisk) RemoveTwoPhaseFile(xid, giveWarning); RemoveGXact(gxact); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 7e8c598f2a..e09696c37f 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5351,6 +5351,8 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, int i; TimestampTz commit_time; + Assert(TransactionIdIsValid(xid)); + max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts); /* @@ -5518,6 +5520,8 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid) int i; TransactionId max_xid; + Assert(TransactionIdIsValid(xid)); + /* * Make sure nextXid is beyond any XID mentioned in the record. * @@ -5598,51 +5602,49 @@ xact_redo(XLogReaderState *record) /* Backup blocks are not used in xact records */ Assert(!XLogRecHasAnyBlockRefs(record)); - if (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED) + if (info == XLOG_XACT_COMMIT) { xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); xl_xact_parsed_commit parsed; - ParseCommitRecord(XLogRecGetInfo(record), xlrec, - &parsed); - - if (info == XLOG_XACT_COMMIT) - { - Assert(!TransactionIdIsValid(parsed.twophase_xid)); - xact_redo_commit(&parsed, XLogRecGetXid(record), - record->EndRecPtr, XLogRecGetOrigin(record)); - } - else - { - Assert(TransactionIdIsValid(parsed.twophase_xid)); - xact_redo_commit(&parsed, parsed.twophase_xid, - record->EndRecPtr, XLogRecGetOrigin(record)); - - /* Delete TwoPhaseState gxact entry and/or 2PC file. */ - PrepareRedoRemove(parsed.twophase_xid, false); - } + ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed); + xact_redo_commit(&parsed, XLogRecGetXid(record), + record->EndRecPtr, XLogRecGetOrigin(record)); } - else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED) + else if (info == XLOG_XACT_COMMIT_PREPARED) + { + xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); + xl_xact_parsed_commit parsed; + + ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed); + xact_redo_commit(&parsed, parsed.twophase_xid, + record->EndRecPtr, XLogRecGetOrigin(record)); + + /* Delete TwoPhaseState gxact entry and/or 2PC file. */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + PrepareRedoRemove(parsed.twophase_xid, false); + LWLockRelease(TwoPhaseStateLock); + } + else if (info == XLOG_XACT_ABORT) { xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); xl_xact_parsed_abort parsed; - ParseAbortRecord(XLogRecGetInfo(record), xlrec, - &parsed); + ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed); + xact_redo_abort(&parsed, XLogRecGetXid(record)); + } + else if (info == XLOG_XACT_ABORT_PREPARED) + { + xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); + xl_xact_parsed_abort parsed; - if (info == XLOG_XACT_ABORT) - { - Assert(!TransactionIdIsValid(parsed.twophase_xid)); - xact_redo_abort(&parsed, XLogRecGetXid(record)); - } - else - { - Assert(TransactionIdIsValid(parsed.twophase_xid)); - xact_redo_abort(&parsed, parsed.twophase_xid); + ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed); + xact_redo_abort(&parsed, parsed.twophase_xid); - /* Delete TwoPhaseState gxact entry and/or 2PC file. */ - PrepareRedoRemove(parsed.twophase_xid, false); - } + /* Delete TwoPhaseState gxact entry and/or 2PC file. */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + PrepareRedoRemove(parsed.twophase_xid, false); + LWLockRelease(TwoPhaseStateLock); } else if (info == XLOG_XACT_PREPARE) { @@ -5650,9 +5652,11 @@ xact_redo(XLogReaderState *record) * Store xid and start/end pointers of the WAL record in TwoPhaseState * gxact entry. */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); PrepareRedoAdd(XLogRecGetData(record), record->ReadRecPtr, record->EndRecPtr); + LWLockRelease(TwoPhaseStateLock); } else if (info == XLOG_XACT_ASSIGNMENT) {