From 4dfb923fb4eea66cffb58de3175ae44fa56350b3 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Mon, 18 Nov 2019 11:42:31 +0530 Subject: [PATCH 07/17] Implement streaming mode in ReorderBuffer Instead of serializing the transaction to disk after reaching the maximum number of changes in memory (4096 changes), we consume the changes we have in memory and invoke new stream API methods. This happens in ReorderBufferStreamTXN() using about the same logic as in ReorderBufferCommit() logic. We can do this incremental processing thanks to having assignments (associating subxact with toplevel xacts) in WAL right away, and thanks to logging the invalidation messages. This adds a second iterator for the streaming case, without the spill-to-disk functionality and only processing changes currently in memory. Theoretically, we could get rid of the k-way merge, and append the changes to the toplevel xact directly (and remember the position in the list in case the subxact gets aborted later). It also adds ReorderBufferTXN pointer to two places: * ReorderBufferChange, so that we know which xact it belongs to * ReorderBufferTXN, pointing to toplevel xact (from subxact) The output plugin can use this to decide which changes to discard in case of stream_abort_cb (e.g. when a subxact gets discarded). --- src/backend/access/heap/heapam_visibility.c | 38 +- .../replication/logical/reorderbuffer.c | 1075 ++++++++++++++++- src/include/replication/reorderbuffer.h | 32 + 3 files changed, 1112 insertions(+), 33 deletions(-) diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c index 3e3646716f..cf10dd041d 100644 --- a/src/backend/access/heap/heapam_visibility.c +++ b/src/backend/access/heap/heapam_visibility.c @@ -1571,8 +1571,23 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, htup, buffer, &cmin, &cmax); + /* + * If we haven't resolved the combocid to cmin/cmax, that means + * we have not decoded the combocid yet. That means the cmin is + * definitely in the future, and we're not supposed to see the + * tuple yet. + * + * XXX This only applies to decoding of in-progress transactions. + * In regular logical decoding we only execute this code at commit + * time, at which point we should have seen all relevant combocids. + * So we should error out in this case. + * + * XXX For the streaming case, we can track the largest combocid + * assigned, and error out based on this (when unable to resolve + * combocid below that observed maximum value). + */ if (!resolved) - elog(ERROR, "could not resolve cmin/cmax of catalog tuple"); + return false; Assert(cmin != InvalidCommandId); @@ -1642,10 +1657,23 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, htup, buffer, &cmin, &cmax); - if (!resolved) - elog(ERROR, "could not resolve combocid to cmax"); - - Assert(cmax != InvalidCommandId); + /* + * If we haven't resolved the combocid to cmin/cmax, that means + * we have not decoded the combocid yet. That means the cmax is + * definitely in the future, and we're still supposed to see the + * tuple. + * + * XXX This only applies to decoding of in-progress transactions. + * In regular logical decoding we only execute this code at commit + * time, at which point we should have seen all relevant combocids. + * So we should error out in this case. + * + * XXX For the streaming case, we can track the largest combocid + * assigned, and error out based on this (when unable to resolve + * combocid below that observed maximum value). 
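To make the new behaviour easier to follow, here is a stand-alone condensation of the two visibility decisions in the heapam_visibility.c hunks (illustrative only, not part of the patch; CommandId and InvalidCommandId are simplified stand-ins for the PostgreSQL definitions). An unresolvable combocid on the cmin side means the inserting command is still in the future, so the tuple is not visible; on the cmax side it means the deleting command is still in the future, so the tuple remains visible.

#include <assert.h>
#include <stdbool.h>

typedef unsigned int CommandId;          /* stand-in for PostgreSQL's CommandId */
#define InvalidCommandId ((CommandId) ~0)

/* cmin branch: is the tuple's insertion visible to a snapshot at curcid? */
static bool
insert_visible(bool resolved, CommandId cmin, CommandId curcid)
{
	if (!resolved)
		return false;			/* combocid not decoded yet => insert is "in the future" */
	return cmin < curcid;		/* visible only if inserted before the scan started */
}

/* cmax branch: is the tuple still visible, i.e. not yet deleted for curcid? */
static bool
delete_not_yet_visible(bool resolved, CommandId cmax, CommandId curcid)
{
	if (!resolved || cmax == InvalidCommandId)
		return true;			/* deletion not decoded yet => tuple still visible */
	return cmax >= curcid;		/* deleted only after the scan started */
}

int
main(void)
{
	/* streaming an in-progress xact: combocid of a later command not decoded yet */
	assert(!insert_visible(false, InvalidCommandId, 3));
	assert(delete_not_yet_visible(false, InvalidCommandId, 3));

	/* regular decoding at commit time: everything resolved */
	assert(insert_visible(true, 2, 3));
	assert(!delete_not_yet_visible(true, 1, 3));
	return 0;
}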
+ */ + if (!resolved || cmax == InvalidCommandId) + return true; if (cmax >= snapshot->curcid) return true; /* deleted after scan started */ diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 76d2701233..f02c47238a 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -149,6 +149,28 @@ typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]; } ReorderBufferIterTXNState; +/* + * k-way in-order change iteration support structures + * + * This is a simplified version for streaming, which does not require + * serialization to files and only reads changes that are currently in + * memory. + */ +typedef struct ReorderBufferStreamIterTXNEntry +{ + XLogRecPtr lsn; + ReorderBufferChange *change; + ReorderBufferTXN *txn; +} ReorderBufferStreamIterTXNEntry; + +typedef struct ReorderBufferStreamIterTXNState +{ + binaryheap *heap; + Size nr_txns; + dlist_head old_change; + ReorderBufferStreamIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]; +} ReorderBufferStreamIterTXNState; + /* toast datastructures */ typedef struct ReorderBufferToastEnt { @@ -213,6 +235,20 @@ static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state); static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn); + +/* iterator for streaming (only get data from memory) */ +static ReorderBufferStreamIterTXNState * ReorderBufferStreamIterTXNInit( + ReorderBuffer *rb, + ReorderBufferTXN *txn); + +static ReorderBufferChange *ReorderBufferStreamIterTXNNext( + ReorderBuffer *rb, + ReorderBufferStreamIterTXNState * state); + +static void ReorderBufferStreamIterTXNFinish( + ReorderBuffer *rb, + ReorderBufferStreamIterTXNState * state); + /* * --------------------------------------- * Disk serialization support functions @@ -227,6 +263,7 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno); @@ -235,6 +272,15 @@ static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap); static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid); +/* + * --------------------------------------- + * Streaming support functions + * --------------------------------------- + */ +static bool ReorderBufferCanStream(ReorderBuffer *rb); +static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn); + /* --------------------------------------- * toast reassembly support * --------------------------------------- @@ -362,6 +408,9 @@ ReorderBufferGetTXN(ReorderBuffer *rb) dlist_init(&txn->tuplecids); dlist_init(&txn->subtxns); + /* InvalidCommandId is not zero, so set it explicitly */ + txn->command_id = InvalidCommandId; + return txn; } @@ -759,6 +808,33 @@ AssertTXNLsnOrder(ReorderBuffer *rb) #endif } +static void +AssertChangeLsnOrder(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ +#ifdef USE_ASSERT_CHECKING + dlist_iter 
iter; + XLogRecPtr prev_lsn = txn->first_lsn; + + dlist_foreach(iter, &txn->changes) + { + ReorderBufferChange *cur_change; + + cur_change = dlist_container(ReorderBufferChange, node, iter.cur); + + Assert(txn->first_lsn != InvalidXLogRecPtr); + Assert(cur_change->lsn != InvalidXLogRecPtr); + Assert(txn->first_lsn <= cur_change->lsn); + + if (txn->end_lsn != InvalidXLogRecPtr) + Assert(cur_change->lsn <= txn->end_lsn); + + Assert(prev_lsn <= cur_change->lsn); + + prev_lsn = cur_change->lsn; + } +#endif +} + /* * ReorderBufferGetOldestTXN * Return oldest transaction in reorderbuffer @@ -855,6 +931,9 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, subtxn->toplevel_xid = xid; Assert(subtxn->nsubtxns == 0); + /* set the reference to toplevel transaction */ + subtxn->toptxn = txn; + /* add to subtransaction list */ dlist_push_tail(&txn->subtxns, &subtxn->node); txn->nsubtxns++; @@ -978,7 +1057,7 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, */ /* - * Binary heap comparison function. + * Binary heap comparison function (regular non-streaming iterator). */ static int ReorderBufferIterCompare(Datum a, Datum b, void *arg) @@ -1006,6 +1085,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) dlist_iter cur_txn_i; int32 off; + /* Check ordering of changes in the toplevel transaction. */ + AssertChangeLsnOrder(rb, txn); + /* * Calculate the size of our heap: one element for every transaction that * contains changes. (Besides the transactions already in the reorder @@ -1020,6 +1102,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); + /* Check ordering of changes in this subtransaction. */ + AssertChangeLsnOrder(rb, cur_txn); + if (cur_txn->nentries > 0) nr_txns++; } @@ -1234,6 +1319,210 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb, pfree(state); } +/* + * Binary heap comparison function (streaming iterator). + */ +static int +ReorderBufferStreamIterCompare(Datum a, Datum b, void *arg) +{ + ReorderBufferStreamIterTXNState *state = (ReorderBufferStreamIterTXNState *) arg; + XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn; + XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn; + + if (pos_a < pos_b) + return 1; + else if (pos_a == pos_b) + return 0; + return -1; +} + +/* + * Allocate & initialize an iterator which iterates in lsn order over a + * transaction and all its subtransactions. This version is meant for + * streaming of incomplete transactions. + */ +static ReorderBufferStreamIterTXNState * +ReorderBufferStreamIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + Size nr_txns = 0; + ReorderBufferStreamIterTXNState *state; + dlist_iter cur_txn_i; + int32 off; + + /* Check ordering of changes in the toplevel transaction. */ + AssertChangeLsnOrder(rb, txn); + + /* + * Calculate the size of our heap: one element for every transaction that + * contains changes. (Besides the transactions already in the reorder + * buffer, we count the one we were directly passed.) + */ + if (txn->nentries > 0) + nr_txns++; + + dlist_foreach(cur_txn_i, &txn->subtxns) + { + ReorderBufferTXN *cur_txn; + + cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); + + /* Check ordering of changes in this subtransaction. */ + AssertChangeLsnOrder(rb, cur_txn); + + if (cur_txn->nentries > 0) + nr_txns++; + } + + /* + * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no + * need to allocate/build a heap then. 
+ */ + + /* allocate iteration state */ + state = (ReorderBufferStreamIterTXNState *) + MemoryContextAllocZero(rb->context, + sizeof(ReorderBufferStreamIterTXNState) + + sizeof(ReorderBufferStreamIterTXNEntry) * nr_txns); + + state->nr_txns = nr_txns; + dlist_init(&state->old_change); + + /* allocate heap */ + state->heap = binaryheap_allocate(state->nr_txns, + ReorderBufferStreamIterCompare, + state); + + /* + * Now insert items into the binary heap, in an unordered fashion. (We + * will run a heap assembly step at the end; this is more efficient.) + */ + + off = 0; + + /* add toplevel transaction if it contains changes */ + if (txn->nentries > 0) + { + ReorderBufferChange *cur_change; + + cur_change = dlist_head_element(ReorderBufferChange, node, + &txn->changes); + + state->entries[off].lsn = cur_change->lsn; + state->entries[off].change = cur_change; + state->entries[off].txn = txn; + + binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + } + + /* add subtransactions if they contain changes */ + dlist_foreach(cur_txn_i, &txn->subtxns) + { + ReorderBufferTXN *cur_txn; + + cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); + + if (cur_txn->nentries > 0) + { + ReorderBufferChange *cur_change; + + cur_change = dlist_head_element(ReorderBufferChange, node, + &cur_txn->changes); + + state->entries[off].lsn = cur_change->lsn; + state->entries[off].change = cur_change; + state->entries[off].txn = cur_txn; + + binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + } + } + + Assert(off == nr_txns); + + /* assemble a valid binary heap */ + binaryheap_build(state->heap); + + return state; +} + +/* + * Return the next change when iterating over a transaction and its + * subtransactions. + * + * Returns NULL when no further changes exist. 
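The iterator below pops the entry with the smallest LSN, emits that transaction's head change, and pushes the transaction's next change back into the heap. The ordering it produces can be illustrated with a stand-alone sketch (not part of the patch; simplified types, and a linear scan standing in for binaryheap, which only changes the cost of finding the minimum, not the output order).

#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;	/* stand-in for PostgreSQL's XLogRecPtr */

/* one "transaction": an LSN-ordered array of changes (here just LSNs) */
typedef struct TxnChanges
{
	const XLogRecPtr *lsns;
	int			nchanges;
	int			next;			/* position of the next change to emit */
} TxnChanges;

/*
 * Return the next change in global LSN order, or 0 when all lists are
 * exhausted (InvalidXLogRecPtr is 0, so this matches the PG convention).
 */
static XLogRecPtr
next_change(TxnChanges *txns, int ntxns)
{
	int			best = -1;

	for (int i = 0; i < ntxns; i++)
	{
		if (txns[i].next >= txns[i].nchanges)
			continue;			/* this transaction has no changes left */
		if (best < 0 ||
			txns[i].lsns[txns[i].next] < txns[best].lsns[txns[best].next])
			best = i;
	}

	if (best < 0)
		return 0;				/* iteration finished */
	return txns[best].lsns[txns[best].next++];
}

int
main(void)
{
	const XLogRecPtr top[] = {10, 40, 70};	/* toplevel changes */
	const XLogRecPtr sub[] = {20, 30, 60};	/* subtransaction changes */
	TxnChanges	txns[] = {{top, 3, 0}, {sub, 3, 0}};
	XLogRecPtr	lsn;

	while ((lsn = next_change(txns, 2)) != 0)
		printf("%llu ", (unsigned long long) lsn);	/* prints 10 20 30 40 60 70 */
	printf("\n");
	return 0;
}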
+ */ +static ReorderBufferChange * +ReorderBufferStreamIterTXNNext(ReorderBuffer *rb, ReorderBufferStreamIterTXNState * state) +{ + ReorderBufferChange *change; + ReorderBufferStreamIterTXNEntry *entry; + int32 off; + + /* nothing there anymore */ + if (state->heap->bh_size == 0) + return NULL; + + off = DatumGetInt32(binaryheap_first(state->heap)); + entry = &state->entries[off]; + + /* free memory we might have "leaked" in the previous *Next call */ + if (!dlist_is_empty(&state->old_change)) + { + change = dlist_container(ReorderBufferChange, node, + dlist_pop_head_node(&state->old_change)); + ReorderBufferReturnChange(rb, change); + Assert(dlist_is_empty(&state->old_change)); + } + + change = entry->change; + + /* + * update heap with information about which transaction has the next + * relevant change in LSN order + */ + + /* there are in-memory changes */ + if (dlist_has_next(&entry->txn->changes, &entry->change->node)) + { + dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node); + ReorderBufferChange *next_change = + dlist_container(ReorderBufferChange, node, next); + + /* txn stays the same */ + state->entries[off].lsn = next_change->lsn; + state->entries[off].change = next_change; + + binaryheap_replace_first(state->heap, Int32GetDatum(off)); + return change; + } + + /* ok, no changes there anymore, remove */ + binaryheap_remove_first(state->heap); + + return change; +} + +/* + * Deallocate the iterator + */ +static void +ReorderBufferStreamIterTXNFinish(ReorderBuffer *rb, + ReorderBufferStreamIterTXNState * state) +{ + /* free memory we might have "leaked" in the last *Next call */ + if (!dlist_is_empty(&state->old_change)) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, + dlist_pop_head_node(&state->old_change)); + ReorderBufferReturnChange(rb, change); + Assert(dlist_is_empty(&state->old_change)); + } + + binaryheap_free(state->heap); + pfree(state); +} + /* * Cleanup the contents of a transaction, usually after the transaction * committed or aborted. @@ -1327,33 +1616,104 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* - * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by - * HeapTupleSatisfiesHistoricMVCC. + * Discard changes from a transaction (and subtransactions), after streaming + * them. Keep the remaining info - transactions, tuplecids and snapshots. */ static void -ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) +ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { - dlist_iter iter; - HASHCTL hash_ctl; + dlist_mutable_iter iter; - if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids)) - return; + /* cleanup subtransactions & their changes */ + dlist_foreach_modify(iter, &txn->subtxns) + { + ReorderBufferTXN *subtxn; - memset(&hash_ctl, 0, sizeof(hash_ctl)); + subtxn = dlist_container(ReorderBufferTXN, node, iter.cur); - hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey); - hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt); - hash_ctl.hcxt = rb->context; + /* + * Subtransactions are always associated to the toplevel TXN, even if + * they originally were happening inside another subtxn, so we won't + * ever recurse more than one level deep here. 
+ */ + Assert(rbtxn_is_known_subxact(subtxn)); + Assert(subtxn->nsubtxns == 0); - /* - * create the hash with the exact number of to-be-stored tuplecids from - * the start - */ - txn->tuplecid_hash = - hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + ReorderBufferTruncateTXN(rb, subtxn); + } - dlist_foreach(iter, &txn->tuplecids) + /* cleanup changes in the toplevel txn */ + dlist_foreach_modify(iter, &txn->changes) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + /* remove the change from it's containing list */ + dlist_delete(&change->node); + + ReorderBufferReturnChange(rb, change); + } + + /* + * Mark the transaction as streamed. + * + * The toplevel transaction, identified by (toptxn==NULL), is marked + * as streamed always, even if it does not contain any changes (that + * is, when all the changes are in subtransactions). + * + * For subtransactions, we only mark them as streamed when there are + * any changes in them. + * + * We do it this way because of aborts - we don't want to send aborts + * for XIDs the downstream is not aware of. And of course, it always + * knows about the toplevel xact (we send the XID in all messages), + * but we never stream XIDs of empty subxacts. + */ + if ((!txn->toptxn) || (txn->nentries_mem != 0)) + txn->txn_flags |= RBTXN_IS_STREAMED; + + /* also reset the number of entries in the transaction */ + txn->nentries_mem = 0; + txn->nentries = 0; +} + +/* + * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by + * tqual.c's HeapTupleSatisfiesHistoricMVCC. + * + * We do build the hash table even if there are no CIDs. That's + * because when streaming in-progress transactions we may run into + * tuples with the CID before actually decoding them. Think e.g. about + * INSERT followed by TRUNCATE, where the TRUNCATE may not be decoded + * yet when applying the INSERT. So we build a hash table so that + * ResolveCminCmaxDuringDecoding does not segfault in this case. + * + * XXX We might limit this behavior to streaming mode, and just bail + * out when decoding transaction at commit time (at which point it's + * guaranteed to see all CIDs). + */ +static void +ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + dlist_iter iter; + HASHCTL hash_ctl; + + memset(&hash_ctl, 0, sizeof(hash_ctl)); + + hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey); + hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt); + hash_ctl.hcxt = rb->context; + + /* + * create the hash with the exact number of to-be-stored tuplecids from + * the start + */ + txn->tuplecid_hash = + hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + dlist_foreach(iter, &txn->tuplecids) { ReorderBufferTupleCidKey key; ReorderBufferTupleCidEnt *ent; @@ -1403,6 +1763,16 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) } } +static void +ReorderBufferDestroyTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + if (txn->tuplecid_hash != NULL) + { + hash_destroy(txn->tuplecid_hash); + txn->tuplecid_hash = NULL; + } +} + /* * Copy a provided snapshot so we can modify it privately. 
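The marking rule in ReorderBufferTruncateTXN above boils down to a few lines; the following stand-alone sketch (illustrative only, with simplified stand-in types) shows why an empty subtransaction never gets the streamed flag while the toplevel transaction always does, so no abort is ever sent for an XID the downstream has never seen.

#include <assert.h>
#include <stddef.h>

#define RBTXN_IS_STREAMED 0x0008	/* same bit as in reorderbuffer.h */

typedef struct Txn
{
	int			txn_flags;
	struct Txn *toptxn;			/* NULL for a toplevel transaction */
	size_t		nentries_mem;	/* changes currently held in memory */
} Txn;

/*
 * Mark a (sub)transaction as streamed after its in-memory changes have been
 * sent.  The toplevel transaction is always marked (its XID is carried by
 * every streamed message); a subtransaction only if something was actually
 * streamed for it.
 */
static void
mark_streamed(Txn *txn)
{
	if (txn->toptxn == NULL || txn->nentries_mem != 0)
		txn->txn_flags |= RBTXN_IS_STREAMED;

	txn->nentries_mem = 0;
}

int
main(void)
{
	Txn			top = {0, NULL, 0};
	Txn			empty_sub = {0, &top, 0};
	Txn			busy_sub = {0, &top, 42};

	mark_streamed(&top);
	mark_streamed(&empty_sub);
	mark_streamed(&busy_sub);

	assert(top.txn_flags & RBTXN_IS_STREAMED);			/* always marked */
	assert(!(empty_sub.txn_flags & RBTXN_IS_STREAMED));	/* nothing was streamed */
	assert(busy_sub.txn_flags & RBTXN_IS_STREAMED);
	return 0;
}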
This is needed so * that catalog modifying transactions can look into intermediate catalog @@ -1476,6 +1846,19 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) SnapBuildSnapDecRefcount(snap); } +static void +ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + /* we should only call this for previously streamed transactions */ + Assert(rbtxn_is_streamed(txn)); + + ReorderBufferStreamTXN(rb, txn); + + rb->stream_commit(rb, txn, txn->final_lsn); + + ReorderBufferCleanupTXN(rb, txn); +} + /* * Perform the replay of a transaction and its non-aborted subtransactions. * @@ -1514,6 +1897,22 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, txn->origin_id = origin_id; txn->origin_lsn = origin_lsn; + /* + * If the transaction was (partially) streamed, we need to commit it in a + * 'streamed' way. That is, we first stream the remaining part of the + * transaction, and then invoke stream_commit message. + * + * XXX Called after everything (origin ID and LSN, ...) is stored in the + * transaction, so we don't pass that directly. + * + * XXX Somewhat hackish redirection, perhaps needs to be refactored? + */ + if (rbtxn_is_streamed(txn)) + { + ReorderBufferStreamCommit(rb, txn); + return; + } + /* * If this transaction has no snapshot, it didn't make any changes to the * database, so there's nothing to decode. Note that @@ -1549,6 +1948,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, PG_TRY(); { + XLogRecPtr prev_lsn = InvalidXLogRecPtr; ReorderBufferChange *change; ReorderBufferChange *specinsert = NULL; @@ -1565,6 +1965,16 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, Relation relation = NULL; Oid reloid; + /* + * Enforce correct ordering of changes, merged from multiple + * subtransactions. The changes may have the same LSN due to + * MULTI_INSERT xlog records. + */ + if (prev_lsn != InvalidXLogRecPtr) + Assert(prev_lsn <= change->lsn); + + prev_lsn = change->lsn; + switch (change->action) { case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -1929,6 +2339,13 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) if (txn == NULL) return; + /* + * When the (sub)transaction was streamed, notify the remote node + * about the abort. + */ + if (rbtxn_is_streamed(txn)) + rb->stream_abort(rb, txn, lsn); + /* cosmetic... */ txn->final_lsn = lsn; @@ -2013,6 +2430,13 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) if (txn == NULL) return; + /* + * When the (sub)transaction was streamed, notify the remote node + * about the abort. + */ + if (rbtxn_is_streamed(txn)) + rb->stream_abort(rb, txn, lsn); + /* cosmetic... */ txn->final_lsn = lsn; @@ -2148,8 +2572,17 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, } /* - * Update the memory accounting info. We track memory used by the whole - * reorder buffer and the transaction containing the change. + * Update memory counters to account for the new or removed change. + * + * We update two counters - in the reorder buffer, and in the transaction + * containing the change. The reorder buffer counter allows us to quickly + * decide if we reached the memory limit, the transaction counter allows + * us to quickly pick the largest transaction for eviction. + * + * When streaming is enabled, we need to update the toplevel transaction + * counters instead - we don't really care about subtransactions as we + * can't stream them individually anyway, and we only pick toplevel + * transactions for eviction. 
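The accounting rule described in this comment can be condensed into a stand-alone sketch (not part of the patch; simplified stand-in types): a change belonging to a subtransaction is charged to its toplevel transaction whenever streaming is possible, so the toplevel counters are the ones the eviction logic needs.

#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct Txn
{
	struct Txn *toptxn;			/* NULL for a toplevel transaction */
	size_t		size;			/* bytes of changes accounted to this txn */
} Txn;

typedef struct Buffer
{
	size_t		size;			/* total bytes across all transactions */
	bool		can_stream;		/* does the output plugin support streaming? */
} Buffer;

/* charge or release 'sz' bytes for a change owned by 'txn' */
static void
memory_update(Buffer *rb, Txn *txn, size_t sz, bool addition)
{
	/* with streaming, subxact changes count against the toplevel txn */
	if (txn->toptxn != NULL && rb->can_stream)
		txn = txn->toptxn;

	if (addition)
	{
		txn->size += sz;
		rb->size += sz;
	}
	else
	{
		assert(rb->size >= sz && txn->size >= sz);
		txn->size -= sz;
		rb->size -= sz;
	}
}

int
main(void)
{
	Buffer		rb = {0, true};
	Txn			top = {NULL, 0};
	Txn			sub = {&top, 0};

	memory_update(&rb, &sub, 100, true);	/* subxact change arrives */
	assert(top.size == 100 && sub.size == 0 && rb.size == 100);

	memory_update(&rb, &sub, 100, false);	/* change consumed/freed */
	assert(top.size == 0 && rb.size == 0);
	return 0;
}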
So only toplevel transactions matter. */ static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, @@ -2157,6 +2590,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, bool addition) { Size sz; + ReorderBufferTXN *txn; Assert(change->txn); @@ -2168,19 +2602,28 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID) return; + txn = change->txn; + + /* if subxact, and streaming supported, use the toplevel instead */ + if (txn->toptxn && ReorderBufferCanStream(rb)) + txn = txn->toptxn; + sz = ReorderBufferChangeSize(change); if (addition) { - change->txn->size += sz; + txn->size += sz; rb->size += sz; } else { - Assert((rb->size >= sz) && (change->txn->size >= sz)); - change->txn->size -= sz; + Assert((rb->size >= sz) && (txn->size >= sz)); + txn->size -= sz; rb->size -= sz; } + + Assert(txn->size <= rb->size); + Assert((txn->size >= 0) && (rb->size >= 0)); } /* @@ -2209,6 +2652,7 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, change->lsn = lsn; change->txn = txn; change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID; + change->txn = txn; dlist_push_tail(&txn->tuplecids, &change->node); txn->ntuplecids++; @@ -2284,6 +2728,9 @@ ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn) for (i = 0; i < txn->ninvalidations; i++) LocalExecuteInvalidationMessage(&txn->invalidations[i]); + + /* Invalidate current schema as well */ + txn->is_schema_sent = false; } /* @@ -2298,6 +2745,23 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + + /* + * We read catalog changes from WAL, which are not yet sent, so + * invalidate current schema in order output plugin can resend + * schema again. + */ + txn->is_schema_sent = false; + + /* + * TOCHECK: Mark toplevel transaction as having catalog changes too + * if one of its children has. + */ + if (txn->toptxn != NULL) + { + txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + txn->toptxn->is_schema_sent = false; + } } /* @@ -2401,6 +2865,38 @@ ReorderBufferLargestTXN(ReorderBuffer *rb) return largest; } +/* + * Find the largest toplevel transaction to evict (by streaming). + * + * This can be seen as an optimized version of ReorderBufferLargestTXN, which + * should give us the same transaction (because we don't update memory account + * for subtransaction with streaming, so it's always 0). But we can simply + * iterate over the limited number of toplevel transactions. + */ +static ReorderBufferTXN * +ReorderBufferLargestTopTXN(ReorderBuffer *rb) +{ + dlist_iter iter; + ReorderBufferTXN *largest = NULL; + + dlist_foreach(iter, &rb->toplevel_by_lsn) + { + ReorderBufferTXN *txn; + + txn = dlist_container(ReorderBufferTXN, node, iter.cur); + + /* if the current transaction is larger, remember it */ + if ((!largest) || (txn->size > largest->size)) + largest = txn; + } + + Assert(largest); + Assert(largest->size > 0); + Assert(largest->size <= rb->size); + + return largest; +} + /* * Check whether the logical_decoding_work_mem limit was reached, and if yes * pick the transaction to evict and spill the changes to disk. @@ -2421,15 +2917,46 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) /* * Pick the largest transaction (or subtransaction) and evict it from - * memory by serializing it to disk. + * memory by streaming, if supported. Otherwise spill to disk. 
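Put together with ReorderBufferLargestTopTXN above, the eviction policy looks roughly like the stand-alone sketch below (illustrative only; stream_txn, serialize_txn and the explicit largest argument are simplified stand-ins for ReorderBufferStreamTXN, ReorderBufferSerializeTXN and ReorderBufferLargest(Top)TXN).

#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

typedef struct Txn { size_t size; } Txn;

typedef struct Buffer
{
	size_t		size;			/* total memory used by all transactions */
	size_t		limit;			/* logical_decoding_work_mem, in bytes */
	bool		can_stream;		/* output plugin implements the stream API? */
} Buffer;

/* stand-ins for ReorderBufferStreamTXN / ReorderBufferSerializeTXN */
static void stream_txn(Txn *txn)    { printf("streaming %zu bytes\n", txn->size); txn->size = 0; }
static void serialize_txn(Txn *txn) { printf("spilling %zu bytes\n", txn->size); txn->size = 0; }

/*
 * Once the memory limit is exceeded, evict the largest transaction: by
 * streaming its already-decoded part if the plugin supports it, by spilling
 * it to disk otherwise.
 */
static void
check_memory_limit(Buffer *rb, Txn *largest)
{
	if (rb->size < rb->limit)
		return;

	/* in the real code the counters drop as individual changes are freed */
	rb->size -= largest->size;

	if (rb->can_stream)
		stream_txn(largest);
	else
		serialize_txn(largest);

	assert(largest->size == 0);
}

int
main(void)
{
	Buffer		rb = {.size = 70 * 1024 * 1024, .limit = 64 * 1024 * 1024, .can_stream = true};
	Txn			big = {.size = 50 * 1024 * 1024};

	check_memory_limit(&rb, &big);	/* prints "streaming ..." and shrinks rb */
	return 0;
}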
*/ - txn = ReorderBufferLargestTXN(rb); + if (ReorderBufferCanStream(rb)) + { + /* + * Pick the largest toplevel transaction and evict it from memory by + * streaming the already decoded part. + */ + txn = ReorderBufferLargestTopTXN(rb); + + /* we know there has to be one, because the size is not zero */ + Assert(txn && !txn->toptxn); + Assert(txn->size > 0); + Assert(rb->size >= txn->size); + + ReorderBufferStreamTXN(rb, txn); + } + else + { + /* + * Pick the largest transaction (or subtransaction) and evict it from + * memory by serializing it to disk. + */ + txn = ReorderBufferLargestTXN(rb); + + /* we know there has to be one, because the size is not zero */ + Assert(txn); + Assert(txn->size > 0); + Assert(rb->size >= txn->size); - ReorderBufferSerializeTXN(rb, txn); + ReorderBufferSerializeTXN(rb, txn); + } /* * After eviction, the transaction should have no entries in memory, and * should use 0 bytes for changes. + * + * XXX Checking the size is fine for both cases - spill to disk and + * streaming. But for streaming we should really check nentries_mem for + * all subtransactions too. */ Assert(txn->size == 0); Assert(txn->nentries_mem == 0); @@ -2722,6 +3249,498 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Assert(ondisk->change.action == change->action); } +static bool +ReorderBufferCanStream(ReorderBuffer *rb) +{ + LogicalDecodingContext *ctx = rb->private_data; + + return ctx->streaming; +} + +/* + * Send data of a large transaction (and its subtransactions) to the + * output plugin, but using the stream API. + * + * XXX Do we need to check if the transaction has some changes to stream + * (maybe it got streamed right before the commit, which attempts to + * stream it again before the commit)? + */ +static void +ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + volatile Snapshot snapshot_now; + volatile CommandId command_id; + bool using_subtxn; + Size streamed = 0; + ReorderBufferStreamIterTXNState *volatile iterstate = NULL; + + /* + * If this is a subxact, we need to stream the top-level transaction + * instead. + */ + if (txn->toptxn) + { + ReorderBufferStreamTXN(rb, txn->toptxn); + return; + } + + /* + * XXX Not sure if we can make any assumptions about base snapshot here, + * similarly to what ReorderBufferCommit() does. That relies on + * base_snapshot getting transferred from subxact in + * ReorderBufferCommitChild(), but that was not yet called as the + * transaction is in-progress. + * + * So just walk the subxacts and use the same logic here. But we only need + * to do that once, when the transaction is streamed for the first time. + * After that we need to reuse the snapshot from the previous run. 
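The subxact walk that follows pulls the earliest base snapshot up into the toplevel transaction, since ReorderBufferCommitChild() has not run yet for an in-progress transaction. Condensed into a stand-alone sketch (illustrative only; an array replaces the dlist and Snapshot is an opaque stand-in):

#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;	/* stand-in for PostgreSQL's XLogRecPtr */
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

typedef struct Txn
{
	void	   *base_snapshot;	/* opaque stand-in for Snapshot */
	XLogRecPtr	base_snapshot_lsn;
	struct Txn *subtxns;		/* simplified: array instead of dlist */
	int			nsubtxns;
} Txn;

/*
 * Make sure the toplevel transaction holds the earliest base snapshot of the
 * whole transaction tree, stealing it from a subtransaction if necessary.
 */
static void
pull_up_base_snapshot(Txn *txn)
{
	for (int i = 0; i < txn->nsubtxns; i++)
	{
		Txn		   *sub = &txn->subtxns[i];

		if (sub->base_snapshot != NULL &&
			(txn->base_snapshot == NULL ||
			 txn->base_snapshot_lsn > sub->base_snapshot_lsn))
		{
			txn->base_snapshot = sub->base_snapshot;
			txn->base_snapshot_lsn = sub->base_snapshot_lsn;
			sub->base_snapshot = NULL;
			sub->base_snapshot_lsn = InvalidXLogRecPtr;
		}
	}
}

int
main(void)
{
	int			snap_a, snap_b;	/* dummy objects standing in for snapshots */
	Txn			subs[2] = {{&snap_a, 200, NULL, 0}, {&snap_b, 100, NULL, 0}};
	Txn			top = {NULL, InvalidXLogRecPtr, subs, 2};

	pull_up_base_snapshot(&top);
	assert(top.base_snapshot == &snap_b && top.base_snapshot_lsn == 100);
	assert(subs[1].base_snapshot == NULL);
	return 0;
}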
+ */ + if (txn->snapshot_now == NULL) + { + dlist_iter subxact_i; + + /* make sure this transaction is streamed for the first time */ + Assert(!rbtxn_is_streamed(txn)); + + /* at the beginning we should have invalid command ID */ + Assert(txn->command_id == InvalidCommandId); + + dlist_foreach(subxact_i, &txn->subtxns) + { + ReorderBufferTXN *subtxn; + + subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur); + + if (subtxn->base_snapshot != NULL && + (txn->base_snapshot == NULL || + txn->base_snapshot_lsn > subtxn->base_snapshot_lsn)) + { + txn->base_snapshot = subtxn->base_snapshot; + txn->base_snapshot_lsn = subtxn->base_snapshot_lsn; + subtxn->base_snapshot = NULL; + subtxn->base_snapshot_lsn = InvalidXLogRecPtr; + } + } + + command_id = FirstCommandId; + snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot, + txn, command_id); + } + else + { + /* the transaction must have been already streamed */ + Assert(rbtxn_is_streamed(txn)); + + /* + * Nah, we already have snapshot from the previous streaming run. We + * assume new subxacts can't move the LSN backwards, and so can't beat + * the LSN condition in the previous branch (so no need to walk + * through subxacts again). In fact, we must not do that as we may be + * using snapshot half-way through the subxact. + */ + command_id = txn->command_id; + + /* + * TOCHECK: We have to rebuild historic snapshot to be sure it includes all + * information about subtransactions, which could arrive after streaming start. + */ + if (!txn->is_schema_sent) + snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot, + txn, command_id); + } + + /* + * build data to be able to lookup the CommandIds of catalog tuples + */ + ReorderBufferBuildTupleCidHash(rb, txn); + + /* setup the initial snapshot */ + SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, txn->xid); + + /* + * Decoding needs access to syscaches et al., which in turn use + * heavyweight locks and such. Thus we need to have enough state around to + * keep track of those. The easiest way is to simply use a transaction + * internally. That also allows us to easily enforce that nothing writes + * to the database by checking for xid assignments. + * + * When we're called via the SQL SRF there's already a transaction + * started, so start an explicit subtransaction there. + */ + using_subtxn = IsTransactionOrTransactionBlock(); + + PG_TRY(); + { + XLogRecPtr prev_lsn = InvalidXLogRecPtr; + ReorderBufferChange *change; + ReorderBufferChange *specinsert = NULL; + + if (using_subtxn) + BeginInternalSubTransaction("stream"); + else + StartTransactionCommand(); + + /* start streaming this chunk of transaction */ + rb->stream_start(rb, txn); + + iterstate = ReorderBufferStreamIterTXNInit(rb, txn); + while ((change = ReorderBufferStreamIterTXNNext(rb, iterstate)) != NULL) + { + Relation relation = NULL; + Oid reloid; + + /* + * Enforce correct ordering of changes, merged from multiple + * subtransactions. The changes may have the same LSN due to + * MULTI_INSERT xlog records. + */ + if (prev_lsn != InvalidXLogRecPtr) + Assert(prev_lsn <= change->lsn); + + prev_lsn = change->lsn; + + /* we're going to stream this change */ + streamed++; + + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + + /* + * Confirmation for speculative insertion arrived. Simply + * use as a normal record. It'll be cleaned up at the end + * of INSERT processing. 
+ */ + Assert(specinsert->data.tp.oldtuple == NULL); + change = specinsert; + change->action = REORDER_BUFFER_CHANGE_INSERT; + + /* intentionally fall through */ + case REORDER_BUFFER_CHANGE_INSERT: + case REORDER_BUFFER_CHANGE_UPDATE: + case REORDER_BUFFER_CHANGE_DELETE: + Assert(snapshot_now); + + reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode, + change->data.tp.relnode.relNode); + + /* + * Catalog tuple without data, emitted while catalog was + * in the process of being rewritten. + */ + if (reloid == InvalidOid && + change->data.tp.newtuple == NULL && + change->data.tp.oldtuple == NULL) + goto change_done; + else if (reloid == InvalidOid) + elog(ERROR, "could not map filenode \"%s\" to relation OID", + relpathperm(change->data.tp.relnode, + MAIN_FORKNUM)); + + relation = RelationIdGetRelation(reloid); + + if (relation == NULL) + elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")", + reloid, + relpathperm(change->data.tp.relnode, + MAIN_FORKNUM)); + + if (!RelationIsLogicallyLogged(relation)) + goto change_done; + + /* + * For now ignore sequence changes entirely. Most of the + * time they don't log changes using records we + * understand, so it doesn't make sense to handle the few + * cases we do. + */ + if (relation->rd_rel->relkind == RELKIND_SEQUENCE) + goto change_done; + + /* user-triggered change */ + if (!IsToastRelation(relation)) + { + ReorderBufferToastReplace(rb, txn, relation, change); + rb->stream_change(rb, txn, relation, change); + + /* + * Only clear reassembled toast chunks if we're sure + * they're not required anymore. The creator of the + * tuple tells us. + */ + if (change->data.tp.clear_toast_afterwards) + ReorderBufferToastReset(rb, txn); + } + /* we're not interested in toast deletions */ + else if (change->action == REORDER_BUFFER_CHANGE_INSERT) + { + /* + * Need to reassemble the full toasted Datum in + * memory, to ensure the chunks don't get reused till + * we're done remove it from the list of this + * transaction's changes. Otherwise it will get + * freed/reused while restoring spooled data from + * disk. + */ + dlist_delete(&change->node); + ReorderBufferToastAppendChunk(rb, txn, relation, + change); + } + + change_done: + + /* + * Either speculative insertion was confirmed, or it was + * unsuccessful and the record isn't needed anymore. + */ + if (specinsert != NULL) + { + ReorderBufferReturnChange(rb, specinsert); + specinsert = NULL; + } + + if (relation != NULL) + { + RelationClose(relation); + relation = NULL; + } + break; + + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: + + /* + * Speculative insertions are dealt with by delaying the + * processing of the insert until the confirmation record + * arrives. For that we simply unlink the record from the + * chain, so it does not get freed/reused while restoring + * spooled data from disk. + * + * This is safe in the face of concurrent catalog changes + * because the relevant relation can't be changed between + * speculative insertion and confirmation due to + * CheckTableNotInUse() and locking. 
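The speculative-insert handling in this loop is a small state machine: an INTERNAL_SPEC_INSERT is parked instead of emitted, a following INTERNAL_SPEC_CONFIRM turns the parked record into an ordinary insert, and a new speculation (or the end of the run) discards a parked record that was never confirmed. A stand-alone sketch of that state machine (illustrative only, simplified types):

#include <assert.h>
#include <stddef.h>
#include <stdio.h>

typedef enum
{
	CHANGE_INSERT,
	CHANGE_SPEC_INSERT,			/* speculative insertion, not confirmed yet */
	CHANGE_SPEC_CONFIRM			/* confirmation for the parked speculation */
} ChangeKind;

typedef struct Change
{
	ChangeKind	kind;
	int			id;				/* just for demonstration output */
} Change;

static void
apply_insert(const Change *c)
{
	printf("apply insert %d\n", c->id);
}

/* process one change; '*pending' holds the parked speculative insertion */
static void
process_change(Change *c, Change **pending)
{
	switch (c->kind)
	{
		case CHANGE_SPEC_INSERT:
			*pending = c;		/* park it; an earlier unconfirmed one is dropped here */
			break;

		case CHANGE_SPEC_CONFIRM:
			assert(*pending != NULL);
			apply_insert(*pending);	/* confirmed: emit it as a normal insert */
			*pending = NULL;
			break;

		case CHANGE_INSERT:
			apply_insert(c);
			break;
	}
}

int
main(void)
{
	Change		changes[] = {{CHANGE_SPEC_INSERT, 1}, {CHANGE_SPEC_CONFIRM, 1}, {CHANGE_INSERT, 2}};
	Change	   *pending = NULL;

	for (size_t i = 0; i < sizeof(changes) / sizeof(changes[0]); i++)
		process_change(&changes[i], &pending);

	assert(pending == NULL);	/* a leftover unconfirmed speculation would be discarded */
	return 0;
}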
+ */ + + /* clear out a pending (and thus failed) speculation */ + if (specinsert != NULL) + { + ReorderBufferReturnChange(rb, specinsert); + specinsert = NULL; + } + + /* and memorize the pending insertion */ + dlist_delete(&change->node); + specinsert = change; + break; + + case REORDER_BUFFER_CHANGE_TRUNCATE: + { + int i; + int nrelids = change->data.truncate.nrelids; + int nrelations = 0; + Relation *relations; + + relations = palloc0(nrelids * sizeof(Relation)); + for (i = 0; i < nrelids; i++) + { + Oid relid = change->data.truncate.relids[i]; + Relation relation; + + relation = RelationIdGetRelation(relid); + + if (relation == NULL) + elog(ERROR, "could not open relation with OID %u", relid); + + if (!RelationIsLogicallyLogged(relation)) + continue; + + relations[nrelations++] = relation; + } + + rb->stream_truncate(rb, txn, nrelations, relations, change); + + for (i = 0; i < nrelations; i++) + RelationClose(relations[i]); + + break; + } + + case REORDER_BUFFER_CHANGE_MESSAGE: + + rb->stream_message(rb, txn, change->lsn, true, + change->data.msg.prefix, + change->data.msg.message_size, + change->data.msg.message); + break; + + case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: + /* get rid of the old */ + TeardownHistoricSnapshot(false); + + if (snapshot_now->copied) + { + ReorderBufferFreeSnap(rb, snapshot_now); + snapshot_now = + ReorderBufferCopySnap(rb, change->data.snapshot, + txn, command_id); + } + + /* + * Restored from disk, need to be careful not to double + * free. We could introduce refcounting for that, but for + * now this seems infrequent enough not to care. + */ + else if (change->data.snapshot->copied) + { + snapshot_now = + ReorderBufferCopySnap(rb, change->data.snapshot, + txn, command_id); + } + else + { + snapshot_now = change->data.snapshot; + } + + /* + * TOCHECK: Snapshot changed, then invalidate current schema to reflect + * possible catalog changes. + */ + txn->is_schema_sent = false; + + /* and continue with the new one */ + SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, + txn->xid); + break; + + case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: + Assert(change->data.command_id != InvalidCommandId); + + if (command_id < change->data.command_id) + { + command_id = change->data.command_id; + + if (!snapshot_now->copied) + { + /* we don't use the global one anymore */ + snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, + txn, command_id); + } + + snapshot_now->curcid = command_id; + + TeardownHistoricSnapshot(false); + SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, + txn->xid); + } + + break; + + case REORDER_BUFFER_CHANGE_INVALIDATION: + + /* + * Execute the invalidation message locally. + * + * XXX Do we need to care about relcacheInitFileInval and + * the other fields added to ReorderBufferChange, or just + * about the message itself? + */ + LocalExecuteInvalidationMessage(&change->data.inval.msg); + txn->is_schema_sent = false; + break; + + case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + elog(ERROR, "tuplecid value in changequeue"); + break; + } + } + + /* + * There's a speculative insertion remaining, just clean in up, it + * can't have been successful, otherwise we'd gotten a confirmation + * record. 
+ */ + if (specinsert) + { + ReorderBufferReturnChange(rb, specinsert); + specinsert = NULL; + } + + /* clean up the iterator */ + ReorderBufferStreamIterTXNFinish(rb, iterstate); + iterstate = NULL; + + /* call stream_stop callback */ + rb->stream_stop(rb, txn); + + /* this is just a sanity check against bad output plugin behaviour */ + if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) + elog(ERROR, "output plugin used XID %u", + GetCurrentTransactionId()); + + /* remember the command ID and snapshot for the streaming run */ + txn->command_id = command_id; + txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, + txn, command_id); + + /* cleanup */ + TeardownHistoricSnapshot(false); + + /* + * Destroy the (relfilenode, ctid) hashtable, so that we don't leak + * any memory. We could also keep the hash table and update it with + * new ctid values, but this seems simpler and good enough for now. + */ + ReorderBufferDestroyTupleCidHash(rb, txn); + + /* + * Aborting the current (sub-)transaction as a whole has the right + * semantics. We want all locks acquired in here to be released, not + * reassigned to the parent and we do not want any database access + * have persistent effects. + */ + AbortCurrentTransaction(); + + /* make sure there's no cache pollution */ + ReorderBufferExecuteInvalidations(rb, txn); + + if (using_subtxn) + RollbackAndReleaseCurrentSubTransaction(); + } + PG_CATCH(); + { + /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */ + if (iterstate) + ReorderBufferStreamIterTXNFinish(rb, iterstate); + + TeardownHistoricSnapshot(true); + + /* + * Force cache invalidation to happen outside of a valid transaction + * to prevent catalog access as we just caught an error. + */ + AbortCurrentTransaction(); + + /* make sure there's no cache pollution */ + ReorderBufferExecuteInvalidations(rb, txn); + + if (using_subtxn) + RollbackAndReleaseCurrentSubTransaction(); + + PG_RE_THROW(); + } + PG_END_TRY(); + + /* + * Discard the changes that we just streamed, and mark the transactions + * as streamed (if they contained changes). + */ + ReorderBufferTruncateTXN(rb, txn); + + Assert(dlist_is_empty(&txn->changes)); + Assert(txn->nentries == 0); + Assert(txn->nentries_mem == 0); +} + /* * Size of a change in memory. */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 19c7bac8c0..7d08e2fd39 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -173,6 +173,7 @@ typedef struct ReorderBufferChange #define RBTXN_HAS_CATALOG_CHANGES 0x0001 #define RBTXN_IS_SUBXACT 0x0002 #define RBTXN_IS_SERIALIZED 0x0004 +#define RBTXN_IS_STREAMED 0x0008 /* does the txn have catalog changes */ #define rbtxn_has_catalog_changes(txn) (txn->txn_flags & RBTXN_HAS_CATALOG_CHANGES) @@ -187,6 +188,20 @@ typedef struct ReorderBufferChange */ #define rbtxn_is_serialized(txn) (txn->txn_flags & RBTXN_IS_SERIALIZED) +/* + * Has this transaction been streamed to downstream? Similarly to spilling + * to disk, it's not trivial to deduce this from nentries and nentries_mem, + * for various reasons. For example, all changes may be in subtransactions + * in which case we'd have nentries==0 for the toplevel one, and it'd say + * nothing about the streaming. So we maintain this flag, but only for the + * toplevel transaction. 
+ *
+ * Note: We never stream and serialize a transaction at the same time (we
+ * only do spill to disk when streaming is not supported by the plugin),
+ * so only one of those two flags may be set at any given time.
+ */
+#define rbtxn_is_streamed(txn) (txn->txn_flags & RBTXN_IS_STREAMED)
+
 typedef struct ReorderBufferTXN
 {
 	int			txn_flags;
@@ -221,6 +236,16 @@ typedef struct ReorderBufferTXN
 	 */
 	XLogRecPtr	final_lsn;
 
+	/*
+	 * Do we need to send the schema for this transaction to the output plugin?
+	 */
+	bool		is_schema_sent;
+
+	/*
+	 * Toplevel transaction for this subxact (NULL for top-level).
+	 */
+	struct ReorderBufferTXN *toptxn;
+
 	/*
 	 * LSN pointing to the end of the commit record + 1.
 	 */
@@ -251,6 +276,13 @@ typedef struct ReorderBufferTXN
 	XLogRecPtr	base_snapshot_lsn;
 	dlist_node	base_snapshot_node; /* link in txns_by_base_snapshot_lsn */
 
+	/*
+	 * Snapshot/CID from the previous streaming run. Only valid for already
+	 * streamed transactions (NULL/InvalidCommandId otherwise).
+	 */
+	Snapshot	snapshot_now;
+	CommandId	command_id;
+
 	/*
 	 * How many ReorderBufferChange's do we have in this txn.
 	 *
-- 
2.21.0
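As the commit message notes, a consumer of the stream API can use the ReorderBufferTXN back-pointers to discard only the changes of an aborted subtransaction. The sketch below is purely illustrative and none of its names come from the patch: it buffers streamed changes tagged with their XID and drops just that XID's portion when a stream abort arrives.

#include <assert.h>

typedef unsigned int TransactionId;

#define MAX_BUFFERED 1024

/* a buffered, not-yet-committed streamed change (payload omitted) */
typedef struct BufferedChange
{
	TransactionId xid;			/* (sub)transaction that produced the change */
	int			payload;
} BufferedChange;

static BufferedChange buffered[MAX_BUFFERED];
static int	nbuffered = 0;

static void
on_stream_change(TransactionId xid, int payload)
{
	assert(nbuffered < MAX_BUFFERED);
	buffered[nbuffered].xid = xid;
	buffered[nbuffered].payload = payload;
	nbuffered++;
}

/* a subtransaction aborted: drop only its buffered changes */
static void
on_stream_abort(TransactionId xid)
{
	int			keep = 0;

	for (int i = 0; i < nbuffered; i++)
	{
		if (buffered[i].xid != xid)
			buffered[keep++] = buffered[i];
	}
	nbuffered = keep;
}

int
main(void)
{
	on_stream_change(100, 1);	/* toplevel xact 100 */
	on_stream_change(101, 2);	/* subxact 101 */
	on_stream_change(100, 3);

	on_stream_abort(101);		/* only the subxact's change disappears */

	assert(nbuffered == 2);
	for (int i = 0; i < nbuffered; i++)
		assert(buffered[i].xid == 100);
	return 0;
}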