diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 5415604..fb69646 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -45,8 +45,8 @@ * fsynced * * If COMMIT happens after checkpoint then backend reads state data from * files - * * In case of crash replay will move data from xlog to files, if that - * hasn't happened before. XXX TODO - move to shmem in replay also + * * A simplified version of the same scenario happens during recovery and + * replication. See the comments on the KnownPreparedXact structure. * *------------------------------------------------------------------------- */ @@ -181,6 +181,35 @@ static GlobalTransaction MyLockedGxact = NULL; static bool twophaseExitRegistered = false; +/* + * During replay and replication KnownPreparedList holds info about active prepared + * transactions that weren't moved to files yet. We will need that info by the end of + * recovery (including promote) to restore memory state of those transactions. + * + * A naive approach would be to move each PREPARE record to disk, fsync it and not have + * that list at all, but that provokes a lot of unnecessary fsyncs on small files + * causing replica to be slower than master. + * + * Replay of twophase records happens by the following rules: + * * On PREPARE redo KnownPreparedAdd() is called to add that transaction to + * KnownPreparedList and no more actions are taken. + * * On checkpoint redo we iterate through KnownPreparedList and move all prepare + * records that are behind redo_horizon to files, deleting them from the list. + * * On COMMIT/ABORT we delete file or entry in KnownPreparedList. + * * At the end of recovery we move all known prepared transactions to disk + * to allow RecoverPreparedTransactions/StandbyRecoverPreparedTransactions + * to do their work. 
+ */ +typedef struct KnownPreparedXact +{ + TransactionId xid; /* xid from the PREPARE record's TwoPhaseFileHeader */ + XLogRecPtr prepare_start_lsn; /* start of the PREPARE WAL record */ + XLogRecPtr prepare_end_lsn; /* end of the PREPARE WAL record */ + dlist_node list_node; /* link in KnownPreparedList */ +} KnownPreparedXact; + +static dlist_head KnownPreparedList = DLIST_STATIC_INIT(KnownPreparedList); + static void RecordTransactionCommitPrepared(TransactionId xid, int nchildren, TransactionId *children, @@ -1241,9 +1270,9 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings) * Reads 2PC data from xlog. During checkpoint this data will be moved to * twophase files and ReadTwoPhaseFile should be used instead. * - * Note clearly that this function accesses WAL during normal operation, similarly - * to the way WALSender or Logical Decoding would do. It does not run during - * crash recovery or standby processing. + * Note clearly that this function accesses WAL during normal operation, similarly + * to the way WALSender or Logical Decoding would do, and also during recovery, + * when KnownPreparedRecreateFiles() moves prepare records from WAL to files. */ static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) @@ -1252,8 +1281,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) XLogReaderState *xlogreader; char *errormsg; - Assert(!RecoveryInProgress()); - xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL); if (!xlogreader) ereport(ERROR, @@ -1691,6 +1718,15 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) int nxids = 0; int allocsize = 0; + /* + * Move prepared transactions from KnownPreparedList to files, if any. + * It is possible to skip that step and teach subsequent code about + * KnownPreparedList, but the whole PrescanPreparedTransactions() happens + * once at the end of recovery or promote, so it probably isn't worth the + * complications. + */ + KnownPreparedRecreateFiles(InvalidXLogRecPtr); + cldir = AllocateDir(TWOPHASE_DIR); while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) { @@ -2162,3 +2198,113 @@ RecordTransactionAbortPrepared(TransactionId xid, */ SyncRepWaitForLSN(recptr, false); } + +/* + * KnownPreparedAdd. 
+ * + * Store correspondence of start/end lsn and xid in a palloc'd KnownPreparedList + * entry (freed when the entry is removed). This is called during redo of a prepare + * record to track prepared transactions that aren't yet moved to 2PC files. + */ +void +KnownPreparedAdd(XLogReaderState *record) +{ + KnownPreparedXact *xact; + TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) XLogRecGetData(record); + + Assert(RecoveryInProgress()); + + xact = (KnownPreparedXact *) palloc(sizeof(KnownPreparedXact)); + xact->xid = hdr->xid; + xact->prepare_start_lsn = record->ReadRecPtr; + xact->prepare_end_lsn = record->EndRecPtr; + + dlist_push_tail(&KnownPreparedList, &xact->list_node); +} + +/* + * KnownPreparedRemoveByXid + * + * Forget about prepared transaction, freeing its list entry if it has one. Called during commit/abort redo. + */ +void +KnownPreparedRemoveByXid(TransactionId xid) +{ + dlist_mutable_iter miter; + + Assert(RecoveryInProgress()); + + dlist_foreach_modify(miter, &KnownPreparedList) + { + KnownPreparedXact *xact = dlist_container(KnownPreparedXact, + list_node, miter.cur); + + if (xact->xid == xid) + { + dlist_delete(miter.cur); + pfree(xact); + /* + * Since we found the entry in KnownPreparedList we know that the file + * isn't on disk yet, so we are done here. + */ + return; + } + } + + /* + * Here we know that file should be moved to disk. But aborting recovery because + * of absence of unnecessary file doesn't seem to be a good idea, so call remove + * with giveWarning=false. + */ + RemoveTwoPhaseFile(xid, false); +} + +/* + * KnownPreparedRecreateFiles + * + * Moves prepare records from WAL to files and frees their list entries. Called + * during checkpoint replay or PrescanPreparedTransactions. + * + * redo_horizon = InvalidXLogRecPtr indicates that all transactions from + * KnownPreparedList should be moved to disk. 
+ */ +void +KnownPreparedRecreateFiles(XLogRecPtr redo_horizon) +{ + dlist_mutable_iter miter; + int serialized_xacts = 0; + + Assert(RecoveryInProgress()); + + TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START(); + + dlist_foreach_modify(miter, &KnownPreparedList) + { + KnownPreparedXact *xact = dlist_container(KnownPreparedXact, + list_node, miter.cur); + + if (xact->prepare_end_lsn <= redo_horizon || redo_horizon == InvalidXLogRecPtr) + { + char *buf; + int len; + + XlogReadTwoPhaseData(xact->prepare_start_lsn, &buf, &len); + RecreateTwoPhaseFile(xact->xid, buf, len); + pfree(buf); + dlist_delete(miter.cur); + pfree(xact); + serialized_xacts++; + } + } + + TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE(); + + if (log_checkpoints && serialized_xacts > 0) + ereport(LOG, + (errmsg_plural("%u two-phase state file was written " + "for long-running prepared transactions", + "%u two-phase state files were written " + "for long-running prepared transactions", + serialized_xacts, + serialized_xacts))); +} diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index d643216..b3e0238 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5604,7 +5604,9 @@ xact_redo(XLogReaderState *record) Assert(TransactionIdIsValid(parsed.twophase_xid)); xact_redo_commit(&parsed, parsed.twophase_xid, record->EndRecPtr, XLogRecGetOrigin(record)); - RemoveTwoPhaseFile(parsed.twophase_xid, false); + + /* Delete KnownPrepared entry or 2PC file. 
*/ + KnownPreparedRemoveByXid(parsed.twophase_xid); + } } else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED) @@ -5624,14 +5626,20 @@ xact_redo(XLogReaderState *record) { Assert(TransactionIdIsValid(parsed.twophase_xid)); xact_redo_abort(&parsed, parsed.twophase_xid); - RemoveTwoPhaseFile(parsed.twophase_xid, false); + + /* Delete KnownPrepared entry or 2PC file. */ + KnownPreparedRemoveByXid(parsed.twophase_xid); } } else if (info == XLOG_XACT_PREPARE) { - /* the record contents are exactly the 2PC file */ - RecreateTwoPhaseFile(XLogRecGetXid(record), - XLogRecGetData(record), XLogRecGetDataLen(record)); + /* + * If this transaction is not committed by the end of recovery then we + * will need its 2PC file (the record contents are exactly the 2PC file) to be + * able to commit it later. + * For now store the xid and pointers to that record in KnownPreparedList. + */ + KnownPreparedAdd(record); } else if (info == XLOG_XACT_ASSIGNMENT) { diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 084401d..9ac1fd7 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -9488,6 +9488,7 @@ xlog_redo(XLogReaderState *record) (errmsg("unexpected timeline ID %u (should be %u) in checkpoint record", checkPoint.ThisTimeLineID, ThisTimeLineID))); + KnownPreparedRecreateFiles(checkPoint.redo); RecoveryRestartPoint(&checkPoint); } else if (info == XLOG_END_OF_RECOVERY) diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index b7ce0c6..23be08f 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -15,6 +15,7 @@ #define TWOPHASE_H #include "access/xlogdefs.h" +#include "access/xlogreader.h" #include "datatype/timestamp.h" #include "storage/lock.h" @@ -56,4 +57,8 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon); extern void FinishPreparedTransaction(const char *gid, bool isCommit); +extern void KnownPreparedAdd(XLogReaderState *record); +extern void KnownPreparedRemoveByXid(TransactionId xid); +extern void KnownPreparedRecreateFiles(XLogRecPtr redo_horizon); + #endif /* TWOPHASE_H */ diff --git a/src/test/recovery/t/009_twophase.pl b/src/test/recovery/t/009_twophase.pl new file mode 100644 index 0000000..27bccb3 --- /dev/null +++ b/src/test/recovery/t/009_twophase.pl @@ -0,0 +1,315 @@ +# Tests dedicated to two-phase 
commit in recovery +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 13; + +# Setup master node +my $node_master = get_new_node("master"); +$node_master->init(allows_streaming => 1); +$node_master->append_conf('postgresql.conf', qq( + max_prepared_transactions = 10 + log_checkpoints = true +)); +$node_master->start; +$node_master->backup('master_backup'); +$node_master->psql('postgres', "create table t(id int)"); + +# Setup slave node +my $node_slave = get_new_node('slave'); +$node_slave->init_from_backup($node_master, 'master_backup', has_streaming => 1); +$node_slave->start; + +# Switch to synchronous replication +$node_master->append_conf('postgresql.conf', qq( + synchronous_standby_names = '*' +)); +$node_master->psql('postgres', "select pg_reload_conf()"); + +my $psql_out = ''; +my $psql_rc = ''; + +############################################################################### +# Check that we can commit and abort tx after soft restart. +# Here checkpoint happens before shutdown and no WAL replay will occur at next +# startup. In this case postgres re-creates shared-memory state from twophase +# files. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x'; + begin; + insert into t values (142); + savepoint s1; + insert into t values (143); + prepare transaction 'y';"); +$node_master->stop; +$node_master->start; + +$psql_rc = $node_master->psql('postgres', "commit prepared 'x'"); +is($psql_rc, '0', 'Commit prepared transaction after restart.'); + +$psql_rc = $node_master->psql('postgres', "rollback prepared 'y'"); +is($psql_rc, '0', 'Rollback prepared transaction after restart.'); + +############################################################################### +# Check that we can commit and abort after hard restart. 
+# At next startup, WAL replay will re-create shared memory state for prepared +# transactions using dedicated WAL records. +############################################################################### + +$node_master->psql('postgres', " + checkpoint; + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x'; + begin; + insert into t values (142); + savepoint s1; + insert into t values (143); + prepare transaction 'y';"); +$node_master->teardown_node; +$node_master->start; + +$psql_rc = $node_master->psql('postgres', "commit prepared 'x'"); +is($psql_rc, '0', 'Commit prepared tx after teardown.'); + +$psql_rc = $node_master->psql('postgres', "rollback prepared 'y'"); +is($psql_rc, '0', 'Rollback prepared transaction after teardown.'); + +############################################################################### +# Check that WAL replay can handle several transactions with the same GID. +############################################################################### + +$node_master->psql('postgres', " + checkpoint; + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x'; + commit prepared 'x'; + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x';"); +$node_master->teardown_node; +$node_master->start; + +$psql_rc = $node_master->psql('postgres', "commit prepared 'x'"); +is($psql_rc, '0', 'Replay several transactions with same GID.'); + +############################################################################### +# Check that WAL replay cleans up its shared memory state and releases locks +# while replaying transaction commits. 
+############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x'; + commit prepared 'x';"); +$node_master->teardown_node; +$node_master->start; +$psql_rc = $node_master->psql('postgres', "begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + -- This prepare can fail due to conflicting GID or locks conflicts if + -- replay did not fully cleanup its state on previous commit. + prepare transaction 'x';"); +is($psql_rc, '0', "Cleanup of shared memory state for 2PC commit"); + +$node_master->psql('postgres', "commit prepared 'x'"); + +############################################################################### +# Check that WAL replay will clean up its shared memory state on a running slave. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x'; + commit prepared 'x';"); +$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts;", + stdout => \$psql_out); +is($psql_out, '0', + "Cleanup of shared memory state on running standby without checkpoint."); + +############################################################################### +# Same as in previous case, but let's force checkpoint on slave between +# prepare and commit to use on-disk twophase files. 
+############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x';"); +$node_slave->psql('postgres', "checkpoint;"); +$node_master->psql('postgres', "commit prepared 'x';"); +$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts;", + stdout => \$psql_out); +is($psql_out, '0', + "Cleanup of shared memory state on running standby after checkpoint."); + +############################################################################### +# Check that prepared transactions can be committed on promoted slave. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x';"); +$node_master->teardown_node; +$node_slave->promote; +$node_slave->poll_query_until('postgres', "SELECT pg_is_in_recovery() <> true"); + +$psql_rc = $node_slave->psql('postgres', "commit prepared 'x';"); +is($psql_rc, '0', "Restore of prepared transaction on promoted slave."); + +# change roles: the promoted node is now the master, the old master restarts as its slave +($node_master, $node_slave) = ($node_slave, $node_master); +$node_slave->enable_streaming($node_master); +$node_slave->append_conf('recovery.conf', qq( +recovery_target_timeline='latest' +)); +$node_slave->start; + +############################################################################### +# Check that prepared transactions are replayed after soft restart of standby +# while master is down. Since standby knows that master is down it uses +# a different code path on start to be sure that the status of transactions is +# consistent. 
+############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + savepoint s1; + insert into t values (43); + prepare transaction 'x';"); +$node_master->stop; +$node_slave->restart; +$node_slave->promote; +$node_slave->poll_query_until('postgres', "SELECT pg_is_in_recovery() <> true"); + +$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts", + stdout => \$psql_out); +is($psql_out, '1', + "Restore prepared transactions from files with master down."); + +# restore state: swap roles back and commit the leftover prepared transaction +($node_master, $node_slave) = ($node_slave, $node_master); +$node_slave->enable_streaming($node_master); +$node_slave->append_conf('recovery.conf', qq( +recovery_target_timeline='latest' +)); +$node_slave->start; +$node_master->psql('postgres', "commit prepared 'x'"); + +############################################################################### +# Check that prepared transactions are correctly replayed after slave hard +# restart while master is down. 
+############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (242); + savepoint s1; + insert into t values (243); + prepare transaction 'x'; + "); +$node_master->stop; +$node_slave->teardown_node; +$node_slave->start; +$node_slave->promote; +$node_slave->poll_query_until('postgres', + "SELECT pg_is_in_recovery() <> true"); + +$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts", + stdout => \$psql_out); +is($psql_out, '1', + "Restore prepared transactions from records with master down."); + +# restore state: swap roles back and commit the leftover prepared transaction +($node_master, $node_slave) = ($node_slave, $node_master); +$node_slave->enable_streaming($node_master); +$node_slave->append_conf('recovery.conf', qq( +recovery_target_timeline='latest' +)); +$node_slave->start; +$node_master->psql('postgres', "commit prepared 'x'"); + + +############################################################################### +# Check for a lock conflict between a prepared tx with DDL inside and replay of +# the XLOG_STANDBY_LOCK WAL record. +############################################################################### + +$node_master->psql('postgres', " + begin; + create table t2(id int); + savepoint s1; + insert into t2 values (42); + prepare transaction 'x'; + -- checkpoint will issue XLOG_STANDBY_LOCK that can conflict with lock + -- held by 'create table' statement + checkpoint; + commit prepared 'x';"); + +$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts", + stdout => \$psql_out); +is($psql_out, '0', "Replay prepared transaction with DDL."); + + +############################################################################### +# Check that replay will correctly set SUBTRANS and properly advance nextXid +# so it won't conflict with savepoint xids. 
+############################################################################### + +$node_master->psql('postgres', " + begin; + delete from t; + insert into t values (43); + savepoint s1; + insert into t values (43); + savepoint s2; + insert into t values (43); + savepoint s3; + insert into t values (43); + savepoint s4; + insert into t values (43); + savepoint s5; + insert into t values (43); + prepare transaction 'x'; + checkpoint;"); + +$node_master->stop; +$node_master->start; +$node_master->psql('postgres', " + -- here we can get xid of previous savepoint if nextXid + -- wasn't properly advanced + begin; + insert into t values (142); + abort; + commit prepared 'x';"); + +$node_master->psql('postgres', "select count(*) from t", + stdout => \$psql_out); +is($psql_out, '6', "Check nextXid handling for prepared subtransactions");