From 3656702c911785df27422ecd177e506723b0e33f Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Mon, 18 Nov 2019 16:32:34 +0530 Subject: [PATCH 06/17] Gracefully handle concurrent aborts of uncommitted transactions that are being decoded alongside. When a transaction aborts, its changes are considered unnecessary for other transactions. That means the changes may be either cleaned up by vacuum or removed from HOT chains (thus made inaccessible through indexes), and there may be other such consequences. When decoding committed transactions this is not an issue, and we never decode transactions that abort before the decoding starts. But for in-progress transactions - for example when decoding prepared transactions on PREPARE (and not COMMIT PREPARED as before), this may cause failures when the output plugin consults catalogs (both system and user-defined). We handle such failures by returning ERRCODE_TRANSACTION_ROLLBACK sqlerrcode from system table scan APIs to the backend decoding a specific uncommitted transaction. The decoding logic on the receipt of such an sqlerrcode aborts the ongoing decoding and returns gracefully. --- doc/src/sgml/logicaldecoding.sgml | 5 +- src/backend/access/heap/heapam.c | 51 +++++++++++++++++++ src/backend/access/index/genam.c | 34 +++++++++++++ .../replication/logical/reorderbuffer.c | 9 ++-- src/backend/utils/time/snapmgr.c | 25 ++++++++- src/include/utils/snapmgr.h | 4 +- 6 files changed, 120 insertions(+), 8 deletions(-) diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index fc4ad65eae..da6a6f3233 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -432,7 +432,10 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); - Any actions leading to transaction ID assignment are prohibited. 
That, among others, + Note that access to user catalog tables or regular system catalog tables + in the output plugins has to be done via the systable_* scan APIs only. + Access via the heap_* scan APIs will error out. + Additionally, any actions leading to transaction ID assignment are prohibited. That, among others, includes writing to tables, performing DDL changes, and calling txid_current(). diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 0128bb34ef..2a60a7380a 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -1303,6 +1303,17 @@ heap_getnext(TableScanDesc sscan, ScanDirection direction) (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg_internal("only heap AM is supported"))); + /* + * We don't expect direct calls to heap_getnext with valid + * CheckXidAlive for regular tables. Track that below. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && + !(IsCatalogRelation(scan->rs_base.rs_rd) || + RelationIsUsedAsCatalogTable(scan->rs_base.rs_rd)))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("improper heap_getnext call"))); + /* Note: no locking manipulations needed */ HEAPDEBUG_1; /* heap_getnext( info ) */ @@ -1421,6 +1432,16 @@ heap_fetch(Relation relation, OffsetNumber offnum; bool valid; + /* + * We don't expect direct calls to heap_fetch with valid + * CheckXidAlive for regular tables. Track that below. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && + !(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("improper heap_fetch call"))); + /* * Fetch and pin the appropriate page of the relation. */ @@ -1535,6 +1556,16 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer, bool valid; bool skip; + /* + * We don't expect direct calls to heap_hot_search_buffer with + * valid CheckXidAlive for regular tables. Track that below. 
+ */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && + !(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("improper heap_hot_search_buffer call"))); + /* If this is not the first call, previous call returned a (live!) tuple */ if (all_dead) *all_dead = first_call; @@ -1682,6 +1713,16 @@ heap_get_latest_tid(TableScanDesc sscan, */ Assert(ItemPointerIsValid(tid)); + /* + * We don't expect direct calls to heap_get_latest_tid with valid + * CheckXidAlive for regular tables. Track that below. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && + !(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("improper heap_get_latest_tid call"))); + /* * Loop to chase down t_ctid links. At top of loop, ctid is the tuple we * need to examine, and *tid is the TID we will return if ctid turns out @@ -5481,6 +5522,16 @@ heap_finish_speculative(Relation relation, ItemPointer tid) ItemId lp = NULL; HeapTupleHeader htup; + /* + * We don't expect direct calls to heap_finish_speculative with + * valid CheckXidAlive for regular tables. Track that below. 
+ */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && + !(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("improper heap_hot_search call"))); + buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); page = (Page) BufferGetPage(buffer); diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c index 2599b5d342..201acfbbf2 100644 --- a/src/backend/access/index/genam.c +++ b/src/backend/access/index/genam.c @@ -28,6 +28,7 @@ #include "lib/stringinfo.h" #include "miscadmin.h" #include "storage/bufmgr.h" +#include "storage/procarray.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -477,6 +478,17 @@ systable_getnext(SysScanDesc sysscan) } } + /* + * If CheckXidAlive is valid, then we check if it aborted. If it did, we + * error out + */ + if (TransactionIdIsValid(CheckXidAlive) && + !TransactionIdIsInProgress(CheckXidAlive) && + !TransactionIdDidCommit(CheckXidAlive)) + ereport(ERROR, + (errcode(ERRCODE_TRANSACTION_ROLLBACK), + errmsg("transaction aborted during system catalog scan"))); + return htup; } @@ -513,6 +525,17 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup) sysscan->slot, freshsnap); + /* + * If CheckXidAlive is valid, then we check if it aborted. If it did, we + * error out + */ + if (TransactionIdIsValid(CheckXidAlive) && + !TransactionIdIsInProgress(CheckXidAlive) && + !TransactionIdDidCommit(CheckXidAlive)) + ereport(ERROR, + (errcode(ERRCODE_TRANSACTION_ROLLBACK), + errmsg("transaction aborted during system catalog scan"))); + return result; } @@ -639,6 +662,17 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction) if (htup && sysscan->iscan->xs_recheck) elog(ERROR, "system catalog scans with lossy index conditions are not implemented"); + /* + * If CheckXidAlive is valid, then we check if it aborted. 
If it did, we + * error out + */ + if (TransactionIdIsValid(CheckXidAlive) && + !TransactionIdIsInProgress(CheckXidAlive) && + !TransactionIdDidCommit(CheckXidAlive)) + ereport(ERROR, + (errcode(ERRCODE_TRANSACTION_ROLLBACK), + errmsg("transaction aborted during system catalog scan"))); + return htup; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index f74c1996d0..76d2701233 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -683,7 +683,7 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); /* setup snapshot to allow catalog access */ - SetupHistoricSnapshot(snapshot_now, NULL); + SetupHistoricSnapshot(snapshot_now, NULL, xid); PG_TRY(); { rb->message(rb, txn, lsn, false, prefix, message_size, message); @@ -1533,7 +1533,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferBuildTupleCidHash(rb, txn); /* setup the initial snapshot */ - SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); + SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid); /* * Decoding needs access to syscaches et al., which in turn use @@ -1784,7 +1784,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, /* and continue with the new one */ - SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); + SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid); break; case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: @@ -1804,7 +1804,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, snapshot_now->curcid = command_id; TeardownHistoricSnapshot(false); - SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); + SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid); } break; @@ -1876,6 +1876,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, PG_CATCH(); { /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks 
*/ + if (iterstate) ReorderBufferIterTXNFinish(rb, iterstate); diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 47b0517596..9fa1e43347 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -153,6 +153,13 @@ static Snapshot SecondarySnapshot = NULL; static Snapshot CatalogSnapshot = NULL; static Snapshot HistoricSnapshot = NULL; +/* + * An xid value pointing to a possibly ongoing or a prepared transaction. + * Currently used in logical decoding. It's possible that such transactions + * can get aborted while the decoding is ongoing. + */ +TransactionId CheckXidAlive = InvalidTransactionId; + /* * These are updated by GetSnapshotData. We initialize them this way * for the convenience of TransactionIdIsInProgress: even in bootstrap @@ -2029,10 +2036,14 @@ MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin) * Setup a snapshot that replaces normal catalog snapshots that allows catalog * access to behave just like it did at a certain point in the past. * + * If a valid xid is passed in, we check if it is uncommitted and track it in + * CheckXidAlive. This is to re-check XID status while accessing catalog. + * * Needed for logical decoding. */ void -SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids) +SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids, + TransactionId snapshot_xid) { Assert(historic_snapshot != NULL); @@ -2041,8 +2052,17 @@ SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids) /* setup (cmin, cmax) lookup hash */ tuplecid_data = tuplecids; -} + /* + * setup CheckXidAlive if it's not committed yet. We don't check + * if the xid aborted. That will happen during catalog access. + */ + if (TransactionIdIsValid(snapshot_xid) && + !TransactionIdDidCommit(snapshot_xid)) + CheckXidAlive = snapshot_xid; + else + CheckXidAlive = InvalidTransactionId; +} /* * Make catalog snapshots behave normally again. 
@@ -2052,6 +2072,7 @@ TeardownHistoricSnapshot(bool is_error) { HistoricSnapshot = NULL; tuplecid_data = NULL; + CheckXidAlive = InvalidTransactionId; } bool diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index 67b07df48c..9a8f9ceba3 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -145,8 +145,10 @@ extern bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot); /* Support for catalog timetravel for logical decoding */ struct HTAB; +extern TransactionId CheckXidAlive; extern struct HTAB *HistoricSnapshotGetTupleCids(void); -extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids); +extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids, + TransactionId snapshot_xid); extern void TeardownHistoricSnapshot(bool is_error); extern bool HistoricSnapshotActive(void); -- 2.21.0