diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index c50f9c4bf6..bd729923f1 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -195,7 +195,10 @@ typedef struct TwoPhaseStateData static TwoPhaseStateData *TwoPhaseState; /* - * Global transaction entry currently locked by us, if any. + * Global transaction entry currently locked by us, if any. Note that any + * access to the entry pointed to by this variable must be protected by + * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be + * (since it's just local memory). */ static GlobalTransaction MyLockedGxact = NULL; @@ -338,18 +341,13 @@ AtAbort_Twophase(void) * resources held by the transaction yet. In those cases, the in-memory * state can be wrong, but it's too late to back out. */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); if (!MyLockedGxact->valid) - { RemoveGXact(MyLockedGxact); - } else - { - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); - MyLockedGxact->locking_backend = InvalidBackendId; + LWLockRelease(TwoPhaseStateLock); - LWLockRelease(TwoPhaseStateLock); - } MyLockedGxact = NULL; } @@ -454,6 +452,8 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, PGXACT *pgxact; int i; + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); + Assert(gxact != NULL); proc = &ProcGlobal->allProcs[gxact->pgprocno]; pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; @@ -632,7 +632,7 @@ RemoveGXact(GlobalTransaction gxact) { int i; - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { @@ -646,14 +646,10 @@ RemoveGXact(GlobalTransaction gxact) gxact->next = TwoPhaseState->freeGXacts; TwoPhaseState->freeGXacts = gxact; - LWLockRelease(TwoPhaseStateLock); - return; } } - LWLockRelease(TwoPhaseStateLock); - elog(ERROR, "failed to find %p in GlobalTransaction array", gxact); } @@ -1506,7 +1502,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit) if (gxact->ondisk) RemoveTwoPhaseFile(xid, true); + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); RemoveGXact(gxact); + LWLockRelease(TwoPhaseStateLock); MyLockedGxact = NULL; pfree(buf); @@ -1734,6 +1732,7 @@ restoreTwoPhaseData(void) struct dirent *clde; cldir = AllocateDir(TWOPHASE_DIR); + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) { if (strlen(clde->d_name) == 8 && @@ -1752,6 +1751,7 @@ restoreTwoPhaseData(void) PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr); } } + LWLockRelease(TwoPhaseStateLock); FreeDir(cldir); } @@ -1781,6 +1781,9 @@ restoreTwoPhaseData(void) * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all * top-level xids is stored in *xids_p. The number of entries in the array * is returned in *nxids_p. + * + * An exclusive lock on TwoPhaseStateLock is taken for the duration of the + * scan of TwoPhaseState data as its may be updated. */ TransactionId PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) @@ -1792,7 +1795,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) int allocsize = 0; int i; - LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { TransactionId xid; @@ -1867,7 +1870,7 @@ StandbyRecoverPreparedTransactions(void) { int i; - LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { TransactionId xid; @@ -1893,7 +1896,8 @@ StandbyRecoverPreparedTransactions(void) * Scan the shared memory entries of TwoPhaseState and reload the state for * each prepared transaction (reacquire locks, etc). * - * This is run during database startup. + * This is run at the end of recovery, but before we allow backends to write + * WAL. * * At the end of recovery the way we take snapshots will change. We now need * to mark all running transactions with their full SubTransSetParent() info @@ -1908,7 +1912,13 @@ RecoverPreparedTransactions(void) int i; /* - * Don't need a lock in the recovery phase. + * It's okay to access TwoPhaseState without a lock here: recovery is + * finished (so if we were a standby, there's no master that can prepare + * transactions anymore), and we haven't yet set WAL as open for writes, + * so local existing backends, if any, cannot do so either. We could use a + * coding pattern similar to restoreTwoPhaseData, i.e., run the whole loop + * with the lock held; but this loop is far more complex, so instead only + * grab the lock while calling the low-level functions that require it. */ for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { @@ -1931,9 +1941,11 @@ RecoverPreparedTransactions(void) * SubTransSetParent has been set before, if the prepared transaction * generated xid assignment records. */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); buf = ProcessTwoPhaseBuffer(xid, gxact->prepare_start_lsn, gxact->ondisk, true, false); + LWLockRelease(TwoPhaseStateLock); if (buf == NULL) continue; @@ -1962,14 +1974,13 @@ RecoverPreparedTransactions(void) /* recovered, so reset the flag for entries generated by redo */ gxact->inredo = false; - LWLockRelease(TwoPhaseStateLock); GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); MarkAsPrepared(gxact); /* - * Recover other state (notably locks) using resource managers + * Recover other state (notably locks) using resource managers. */ ProcessRecords(bufptr, xid, twophase_recover_callbacks); @@ -2014,6 +2025,8 @@ ProcessTwoPhaseBuffer(TransactionId xid, TwoPhaseFileHeader *hdr; int i; + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); + if (!fromdisk) Assert(prepare_start_lsn != InvalidXLogRecPtr); @@ -2030,8 +2043,8 @@ ProcessTwoPhaseBuffer(TransactionId xid, else { ereport(WARNING, - (errmsg("removing stale two-phase state from" - " shared memory for \"%u\"", xid))); + (errmsg("removing stale two-phase state from shared memory for \"%u\"", + xid))); PrepareRedoRemove(xid, true); } return NULL; @@ -2308,6 +2321,7 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) const char *gid; GlobalTransaction gxact; + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); Assert(RecoveryInProgress()); bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); @@ -2324,7 +2338,6 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) * that it got added in the redo phase */ - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); /* Get a free gxact from the freelist */ if (TwoPhaseState->freeGXacts == NULL) ereport(ERROR, @@ -2350,17 +2363,17 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; - LWLockRelease(TwoPhaseStateLock); - - elog(DEBUG2, "Adding 2PC data to shared memory %u", gxact->xid); + elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid); } /* * PrepareRedoRemove * - * Remove the corresponding gxact entry from TwoPhaseState. Also - * remove the 2PC file if a prepared transaction was saved via - * an earlier checkpoint. + * Remove the corresponding gxact entry from TwoPhaseState. Also remove + * the 2PC file if a prepared transaction was saved via an earlier checkpoint. + * + * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState + * is updated. */ void PrepareRedoRemove(TransactionId xid, bool giveWarning) @@ -2369,9 +2382,9 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) int i; bool found = false; + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); Assert(RecoveryInProgress()); - LWLockAcquire(TwoPhaseStateLock, LW_SHARED); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { gxact = TwoPhaseState->prepXacts[i]; @@ -2383,7 +2396,6 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) break; } } - LWLockRelease(TwoPhaseStateLock); /* * Just leave if there is nothing, this is expected during WAL replay. @@ -2394,7 +2406,7 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) /* * And now we can clean up any files we may have left. */ - elog(DEBUG2, "Removing 2PC data from shared memory %u", xid); + elog(DEBUG2, "removing 2PC data for transaction %u", xid); if (gxact->ondisk) RemoveTwoPhaseFile(xid, giveWarning); RemoveGXact(gxact); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 7e8c598f2a..e09696c37f 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5351,6 +5351,8 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, int i; TimestampTz commit_time; + Assert(TransactionIdIsValid(xid)); + max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts); /* @@ -5518,6 +5520,8 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid) int i; TransactionId max_xid; + Assert(TransactionIdIsValid(xid)); + /* * Make sure nextXid is beyond any XID mentioned in the record. * @@ -5598,51 +5602,49 @@ xact_redo(XLogReaderState *record) /* Backup blocks are not used in xact records */ Assert(!XLogRecHasAnyBlockRefs(record)); - if (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED) + if (info == XLOG_XACT_COMMIT) { xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); xl_xact_parsed_commit parsed; - ParseCommitRecord(XLogRecGetInfo(record), xlrec, - &parsed); - - if (info == XLOG_XACT_COMMIT) - { - Assert(!TransactionIdIsValid(parsed.twophase_xid)); - xact_redo_commit(&parsed, XLogRecGetXid(record), - record->EndRecPtr, XLogRecGetOrigin(record)); - } - else - { - Assert(TransactionIdIsValid(parsed.twophase_xid)); - xact_redo_commit(&parsed, parsed.twophase_xid, - record->EndRecPtr, XLogRecGetOrigin(record)); - - /* Delete TwoPhaseState gxact entry and/or 2PC file. */ - PrepareRedoRemove(parsed.twophase_xid, false); - } + ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed); + xact_redo_commit(&parsed, XLogRecGetXid(record), + record->EndRecPtr, XLogRecGetOrigin(record)); } - else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED) + else if (info == XLOG_XACT_COMMIT_PREPARED) + { + xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); + xl_xact_parsed_commit parsed; + + ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed); + xact_redo_commit(&parsed, parsed.twophase_xid, + record->EndRecPtr, XLogRecGetOrigin(record)); + + /* Delete TwoPhaseState gxact entry and/or 2PC file. */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + PrepareRedoRemove(parsed.twophase_xid, false); + LWLockRelease(TwoPhaseStateLock); + } + else if (info == XLOG_XACT_ABORT) { xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); xl_xact_parsed_abort parsed; - ParseAbortRecord(XLogRecGetInfo(record), xlrec, - &parsed); + ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed); + xact_redo_abort(&parsed, XLogRecGetXid(record)); + } + else if (info == XLOG_XACT_ABORT_PREPARED) + { + xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); + xl_xact_parsed_abort parsed; - if (info == XLOG_XACT_ABORT) - { - Assert(!TransactionIdIsValid(parsed.twophase_xid)); - xact_redo_abort(&parsed, XLogRecGetXid(record)); - } - else - { - Assert(TransactionIdIsValid(parsed.twophase_xid)); - xact_redo_abort(&parsed, parsed.twophase_xid); + ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed); + xact_redo_abort(&parsed, parsed.twophase_xid); - /* Delete TwoPhaseState gxact entry and/or 2PC file. */ - PrepareRedoRemove(parsed.twophase_xid, false); - } + /* Delete TwoPhaseState gxact entry and/or 2PC file. */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + PrepareRedoRemove(parsed.twophase_xid, false); + LWLockRelease(TwoPhaseStateLock); } else if (info == XLOG_XACT_PREPARE) { @@ -5650,9 +5652,11 @@ xact_redo(XLogReaderState *record) * Store xid and start/end pointers of the WAL record in TwoPhaseState * gxact entry. */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); PrepareRedoAdd(XLogRecGetData(record), record->ReadRecPtr, record->EndRecPtr); + LWLockRelease(TwoPhaseStateLock); } else if (info == XLOG_XACT_ASSIGNMENT) {