From c9d0c5da2641d0746657d465893bb65eb9d73b7e Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Fri, 22 Nov 2019 12:43:38 +0530 Subject: [PATCH 17/17] Extend handling of concurrent aborts for streaming transaction --- .../replication/logical/reorderbuffer.c | 36 +++++++++++++++++-- src/include/replication/reorderbuffer.h | 5 +++ 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 5883d14a02..0987032208 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2349,9 +2349,9 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) /* * When the (sub)transaction was streamed, notify the remote node - * about the abort. + * about the abort only if we have sent any data for this transaction. */ - if (rbtxn_is_streamed(txn)) + if (rbtxn_is_streamed(txn) && txn->any_data_sent) rb->stream_abort(rb, txn, lsn); /* cosmetic... */ @@ -3267,6 +3267,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) volatile CommandId command_id; bool using_subtxn; Size streamed = 0; + MemoryContext ccxt = CurrentMemoryContext; ReorderBufferStreamIterTXNState *volatile iterstate = NULL; /* @@ -3396,6 +3397,13 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* we're going to stream this change */ streamed++; + /* + * Set the CheckXidAlive to the current (sub)xid for which this + * change belongs to so that we can detect the abort while we are + * decoding. + */ + CheckXidAlive = change->txn->xid; + switch (change->action) { case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -3457,6 +3465,10 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferToastReplace(rb, txn, relation, change); rb->stream_change(rb, txn, relation, change); + /* Remember that we have sent some data for this txn.*/ + if (!change->txn->any_data_sent) + change->txn->any_data_sent = true; + /* * Only clear reassembled toast chunks if we're sure * they're not required anymore. The creator of the @@ -3695,6 +3707,9 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } PG_CATCH(); { + MemoryContext ecxt = MemoryContextSwitchTo(ccxt); + ErrorData *errdata = CopyErrorData(); + /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */ if (iterstate) ReorderBufferStreamIterTXNFinish(rb, iterstate); @@ -3713,7 +3728,22 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); - PG_RE_THROW(); + /* re-throw only if it's not an abort */ + if (errdata->sqlerrcode != ERRCODE_TRANSACTION_ROLLBACK) + { + MemoryContextSwitchTo(ecxt); + PG_RE_THROW(); + } + else + { + /* remember the command ID and snapshot for the streaming run */ + txn->command_id = command_id; + txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, + txn, command_id); + rb->stream_stop(rb, txn); + + FlushErrorState(); + } } PG_END_TRY(); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index e132c3c5ea..6186465a85 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -236,6 +236,11 @@ typedef struct ReorderBufferTXN */ XLogRecPtr final_lsn; + /* + * Have we sent any changes for this transaction in output plugin? + */ + bool any_data_sent; + /* * Toplevel transaction for this subxact (NULL for top-level). */ -- 2.21.0