diff --git a/contrib/test_decoding/logical.conf b/contrib/test_decoding/logical.conf index 367f706651..02595d99d5 100644 --- a/contrib/test_decoding/logical.conf +++ b/contrib/test_decoding/logical.conf @@ -1,2 +1,3 @@ wal_level = logical max_replication_slots = 4 +logical_decoding_work_mem = 64MB diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index a74fd705b4..62b661dc4d 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -989,11 +989,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, nr_txns++; } - /* - * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no - * need to allocate/build a heap then. - */ - /* allocate iteration state */ state = (ReorderBufferIterTXNState *) MemoryContextAllocZero(rb->context, @@ -1009,10 +1004,11 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, state->entries[off].segno = 0; } - /* allocate heap */ - state->heap = binaryheap_allocate(state->nr_txns, - ReorderBufferIterCompare, - state); + /* allocate heap, if we have more than one transaction. */ + if (nr_txns > 1) + state->heap = binaryheap_allocate(state->nr_txns, + ReorderBufferIterCompare, + state); /* Now that the state fields are initialized, it is safe to return it. */ *iter_state = state; @@ -1044,7 +1040,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, state->entries[off].change = cur_change; state->entries[off].txn = txn; - binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + /* add to heap, only if we have more than one transaction. */ + if (nr_txns > 1) + binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); } /* add subtransactions if they contain changes */ @@ -1073,12 +1071,15 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, state->entries[off].change = cur_change; state->entries[off].txn = cur_txn; - binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + /* add to heap, only if we have more than one transaction. */ + if (nr_txns > 1) + binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); } } /* assemble a valid binary heap */ - binaryheap_build(state->heap); + if (nr_txns > 1) + binaryheap_build(state->heap); } /* @@ -1094,11 +1095,24 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) ReorderBufferIterTXNEntry *entry; int32 off; - /* nothing there anymore */ - if (state->heap->bh_size == 0) - return NULL; + /* + * If there is only one transaction then it will be at the offset 0. + * Otherwise get the offset from the binary heap. + */ + if (state->nr_txns == 1) + { + off = 0; + if (state->entries[off].change == NULL) + return NULL; + } + else + { + /* nothing there anymore */ + if (state->heap->bh_size == 0) + return NULL; + off = DatumGetInt32(binaryheap_first(state->heap)); + } - off = DatumGetInt32(binaryheap_first(state->heap)); entry = &state->entries[off]; /* free memory we might have "leaked" in the previous *Next call */ @@ -1128,7 +1142,9 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) state->entries[off].lsn = next_change->lsn; state->entries[off].change = next_change; - binaryheap_replace_first(state->heap, Int32GetDatum(off)); + if (state->nr_txns > 1) + binaryheap_replace_first(state->heap, Int32GetDatum(off)); + return change; } @@ -1165,7 +1181,10 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) } /* ok, no changes there anymore, remove */ - binaryheap_remove_first(state->heap); + if (state->nr_txns > 1) + binaryheap_remove_first(state->heap); + else + entry->change = NULL; return change; } @@ -1196,7 +1215,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb, Assert(dlist_is_empty(&state->old_change)); } - binaryheap_free(state->heap); + if (state->nr_txns > 1) + binaryheap_free(state->heap); pfree(state); }