[PATCH 08/16] Introduce the ApplyCache module which can reassemble transactions from a stream of interspersed changes

From: Andres Freund <andres(at)2ndquadrant(dot)com>
To: pgsql-hackers(at)postgresql(dot)org
Subject: [PATCH 08/16] Introduce the ApplyCache module which can reassemble transactions from a stream of interspersed changes
Date: 2012-06-13 11:28:39
Message-ID: 1339586927-13156-8-git-send-email-andres@2ndquadrant.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

From: Andres Freund <andres(at)anarazel(dot)de>

The individual changes need to be identified by an xid. The xid can belong to a
subtransaction or to a toplevel transaction; at commit, these can be reintegrated
by doing a k-way merge sort between the individual transactions.

Callbacks for apply_begin, apply_change and apply_commit are provided to
retrieve complete transactions.

Missing:
- spill-to-disk
- correct subtransaction merge; the current behaviour is simplistic and wrong
- DDL handling (?)
- resource usage controls
---
src/backend/replication/Makefile | 2 +
src/backend/replication/logical/Makefile | 19 ++
src/backend/replication/logical/applycache.c | 380 ++++++++++++++++++++++++++
src/include/replication/applycache.h | 185 +++++++++++++
4 files changed, 586 insertions(+)
create mode 100644 src/backend/replication/logical/Makefile
create mode 100644 src/backend/replication/logical/applycache.c
create mode 100644 src/include/replication/applycache.h

diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index 9d9ec87..ae7f6b1 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -17,6 +17,8 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
repl_gram.o syncrep.o

+# logical/ contains the ApplyCache, which reassembles transactions from a
+# stream of interspersed changes
+SUBDIRS = logical
+
include $(top_srcdir)/src/backend/common.mk

# repl_scanner is compiled as part of repl_gram
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
new file mode 100644
index 0000000..2eadab8
--- /dev/null
+++ b/src/backend/replication/logical/Makefile
@@ -0,0 +1,19 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for src/backend/replication/logical
+#
+# IDENTIFICATION
+# src/backend/replication/logical/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/replication/logical
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+# NOTE(review): appears to be carried over from the parent replication
+# Makefile; applycache.c includes no header from this directory, so this
+# override may be unnecessary here.
+override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
+
+# ApplyCache: reassembly of transactions from interspersed changes
+OBJS = applycache.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/applycache.c b/src/backend/replication/logical/applycache.c
new file mode 100644
index 0000000..b73b0ba
--- /dev/null
+++ b/src/backend/replication/logical/applycache.c
@@ -0,0 +1,380 @@
+/*-------------------------------------------------------------------------
+ *
+ * applycache.c
+ *
+ * PostgreSQL logical replay "cache" management
+ *
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/applycache.c
+ *
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/xact.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_control.h"
+#include "replication/applycache.h"
+
+#include "utils/ilist.h"
+#include "utils/memutils.h"
+#include "utils/relcache.h"
+#include "utils/syscache.h"
+
+/*
+ * NOTE(review): presumably meant as an upper bound on in-memory entries;
+ * it is not referenced anywhere in this file yet.
+ */
+const Size max_memtries = ((Size) 1) << 16;
+
+/*
+ * Upper bounds on how many unused structs of each kind are kept on the
+ * freelists for reuse; anything returned beyond these limits is free()d.
+ */
+const size_t max_cached_changes = 1024;
+const size_t max_cached_tuplebufs = 1024;	/* each holds MaxHeapTupleSize of data: ~8MB */
+const size_t max_cached_transactions = 512;
+
+/*
+ * Entry of the xid -> transaction hash table (cache->by_txn).
+ *
+ * The hash table is created with keysize = sizeof(TransactionId), and
+ * dynahash takes the key from the start of each entry, so xid must remain
+ * the first member.
+ */
+typedef struct ApplyCacheTXNByIdEnt
+{
+ TransactionId xid;
+ ApplyCacheTXN* txn;
+} ApplyCacheTXNByIdEnt;
+
+/* Freelist management for ApplyCacheTXN structs. */
+static ApplyCacheTXN *ApplyCacheGetTXN(ApplyCache *cache);
+static void ApplyCacheReturnTXN(ApplyCache *cache, ApplyCacheTXN *txn);
+
+/* Look up, and optionally create, the ApplyCacheTXN tracking xid. */
+static ApplyCacheTXN *ApplyCacheTXNByXid(ApplyCache *cache, TransactionId xid,
+										 bool create);
+
+
+/*
+ * ApplyCacheAllocate
+ *		Create a new, empty ApplyCache.
+ *
+ * The ApplyCache struct itself is malloc()ed.  The xid hash table is
+ * allocated inside a dedicated "ApplyCache" memory context below
+ * TopMemoryContext.  Every field starts out zeroed, including the
+ * begin/apply_change/commit callbacks and private_data.  The caller must
+ * install the callbacks before any transaction is committed through
+ * ApplyCacheCommit().
+ *
+ * Raises ERROR if the struct cannot be allocated.
+ */
+ApplyCache *
+ApplyCacheAllocate(void)
+{
+	ApplyCache *cache = (ApplyCache *) malloc(sizeof(ApplyCache));
+	HASHCTL		hash_ctl;
+
+	if (!cache)
+		elog(ERROR, "Could not allocate the ApplyCache");
+
+	/*
+	 * malloc() does not zero memory.  Clear the whole struct so that fields
+	 * not explicitly initialized below (callbacks, private_data, the
+	 * last_txn lookup fields) never hold garbage.
+	 */
+	memset(cache, 0, sizeof(ApplyCache));
+
+	cache->context = AllocSetContextCreate(TopMemoryContext,
+										   "ApplyCache",
+										   ALLOCSET_DEFAULT_MINSIZE,
+										   ALLOCSET_DEFAULT_INITSIZE,
+										   ALLOCSET_DEFAULT_MAXSIZE);
+
+	memset(&hash_ctl, 0, sizeof(hash_ctl));
+	hash_ctl.keysize = sizeof(TransactionId);
+	hash_ctl.entrysize = sizeof(ApplyCacheTXNByIdEnt);
+	hash_ctl.hash = tag_hash;
+	hash_ctl.hcxt = cache->context;
+
+	cache->by_txn = hash_create("ApplyCacheByXid", 1000, &hash_ctl,
+								HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+	/* freelists of reusable structs start out empty */
+	cache->nr_cached_transactions = 0;
+	cache->nr_cached_changes = 0;
+	cache->nr_cached_tuplebufs = 0;
+
+	ilist_d_init(&cache->cached_transactions);
+	ilist_d_init(&cache->cached_changes);
+	ilist_s_init(&cache->cached_tuplebufs);
+
+	return cache;
+}
+
+/*
+ * ApplyCacheFree
+ *		Release an ApplyCache created by ApplyCacheAllocate().
+ *
+ * This frees every struct waiting on the transaction, change and tuplebuf
+ * freelists, destroys the xid hash table, and deletes the cache's memory
+ * context.  The ApplyCache struct itself is freed last.
+ */
+void
+ApplyCacheFree(ApplyCache *cache)
+{
+	/*
+	 * FIXME: check for in-progress transactions; entries still in by_txn,
+	 * and their queued changes, are not released here.
+	 */
+
+	while (cache->nr_cached_transactions > 0)
+	{
+		cache->nr_cached_transactions--;
+		free(ilist_container(ApplyCacheTXN, node,
+							 ilist_d_pop_front(&cache->cached_transactions)));
+	}
+
+	/*
+	 * Changes on the freelist have already given back their tuple buffers
+	 * (see ApplyCacheReturnChange), so the struct is all that is left.
+	 */
+	while (cache->nr_cached_changes > 0)
+	{
+		cache->nr_cached_changes--;
+		free(ilist_container(ApplyCacheChange, node,
+							 ilist_d_pop_front(&cache->cached_changes)));
+	}
+
+	while (cache->nr_cached_tuplebufs > 0)
+	{
+		cache->nr_cached_tuplebufs--;
+		free(ilist_container(ApplyCacheTupleBuf, node,
+							 ilist_s_pop_front(&cache->cached_tuplebufs)));
+	}
+
+	hash_destroy(cache->by_txn);
+
+	/*
+	 * dynahash puts the table in its own child context, so hash_destroy()
+	 * leaves the context we created in ApplyCacheAllocate() behind.
+	 */
+	MemoryContextDelete(cache->context);
+
+	free(cache);
+}
+
+/*
+ * ApplyCacheGetTXN
+ *		Hand out a zeroed ApplyCacheTXN with empty change and subtxn lists.
+ *
+ * A struct from the transaction freelist is reused if one is available;
+ * otherwise a new one is malloc()ed.  Raises ERROR on allocation failure.
+ */
+static ApplyCacheTXN *
+ApplyCacheGetTXN(ApplyCache *cache)
+{
+	ApplyCacheTXN *result;
+
+	if (cache->nr_cached_transactions == 0)
+	{
+		result = (ApplyCacheTXN *) malloc(sizeof(ApplyCacheTXN));
+		if (result == NULL)
+			elog(ERROR, "Could not allocate a ApplyCacheTXN struct");
+	}
+	else
+	{
+		result = ilist_container(ApplyCacheTXN, node,
+								 ilist_d_pop_front(&cache->cached_transactions));
+		cache->nr_cached_transactions--;
+	}
+
+	/* a recycled struct still holds its previous contents: reset it */
+	memset(result, 0, sizeof(ApplyCacheTXN));
+	ilist_d_init(&result->changes);
+	ilist_d_init(&result->subtxns);
+
+	return result;
+}
+
+/*
+ * ApplyCacheReturnTXN
+ *		Give back an ApplyCacheTXN that is no longer in use.
+ *
+ * Up to max_cached_transactions structs are kept on the freelist for reuse;
+ * any beyond that are released with free().
+ */
+static void
+ApplyCacheReturnTXN(ApplyCache *cache, ApplyCacheTXN *txn)
+{
+	if (cache->nr_cached_transactions >= max_cached_transactions)
+	{
+		free(txn);
+		return;
+	}
+
+	cache->nr_cached_transactions++;
+	ilist_d_push_front(&cache->cached_transactions, &txn->node);
+}
+
+/*
+ * ApplyCacheGetChange
+ *		Hand out a zeroed ApplyCacheChange.
+ *
+ * A struct from the change freelist is reused if available; otherwise a new
+ * one is malloc()ed.  Raises ERROR on allocation failure.
+ */
+ApplyCacheChange *
+ApplyCacheGetChange(ApplyCache *cache)
+{
+	ApplyCacheChange *result;
+
+	if (cache->nr_cached_changes == 0)
+	{
+		result = (ApplyCacheChange *) malloc(sizeof(ApplyCacheChange));
+		if (result == NULL)
+			elog(ERROR, "Could not allocate a ApplyCacheChange struct");
+	}
+	else
+	{
+		result = ilist_container(ApplyCacheChange, node,
+								 ilist_d_pop_front(&cache->cached_changes));
+		cache->nr_cached_changes--;
+	}
+
+	/* no stale tuple/table pointers may survive from a previous use */
+	memset(result, 0, sizeof(ApplyCacheChange));
+
+	return result;
+}
+
+/*
+ * ApplyCacheReturnChange
+ *		Give back a change that is no longer in use.
+ *
+ * The change's new/old tuple buffers go back to the tuplebuf freelist, and
+ * its table tuple is released with heap_freetuple().  The change struct is
+ * then kept for reuse, up to max_cached_changes, or free()d.
+ */
+void
+ApplyCacheReturnChange(ApplyCache *cache, ApplyCacheChange *change)
+{
+	/* first release whatever hangs off the change */
+	if (change->newtuple != NULL)
+		ApplyCacheReturnTupleBuf(cache, change->newtuple);
+	if (change->oldtuple != NULL)
+		ApplyCacheReturnTupleBuf(cache, change->oldtuple);
+	if (change->table != NULL)
+		heap_freetuple(change->table);
+
+	if (cache->nr_cached_changes >= max_cached_changes)
+	{
+		free(change);
+		return;
+	}
+
+	cache->nr_cached_changes++;
+	ilist_d_push_front(&cache->cached_changes, &change->node);
+}
+
+/*
+ * ApplyCacheGetTupleBuf
+ *		Hand out a tuple buffer, reused from the freelist when possible.
+ *
+ * Unlike the other Get functions, the buffer's contents are NOT cleared;
+ * the caller has to fill in everything it needs.  Raises ERROR on
+ * allocation failure.
+ */
+ApplyCacheTupleBuf *
+ApplyCacheGetTupleBuf(ApplyCache *cache)
+{
+	ApplyCacheTupleBuf *result;
+
+	if (cache->nr_cached_tuplebufs == 0)
+	{
+		result = (ApplyCacheTupleBuf *) malloc(sizeof(ApplyCacheTupleBuf));
+		if (result == NULL)
+			elog(ERROR, "Could not allocate a ApplyCacheTupleBuf struct");
+		return result;
+	}
+
+	cache->nr_cached_tuplebufs--;
+	result = ilist_container(ApplyCacheTupleBuf, node,
+							 ilist_s_pop_front(&cache->cached_tuplebufs));
+	return result;
+}
+
+/*
+ * ApplyCacheReturnTupleBuf
+ *		Give back a tuple buffer: keep it for reuse (up to
+ *		max_cached_tuplebufs) or free() it.
+ */
+void
+ApplyCacheReturnTupleBuf(ApplyCache *cache, ApplyCacheTupleBuf *tuple)
+{
+	if (cache->nr_cached_tuplebufs >= max_cached_tuplebufs)
+	{
+		free(tuple);
+		return;
+	}
+
+	cache->nr_cached_tuplebufs++;
+	ilist_s_push_front(&cache->cached_tuplebufs, &tuple->node);
+}
+
+
+/*
+ * ApplyCacheTXNByXid
+ *		Find the ApplyCacheTXN tracking xid.
+ *
+ * If no entry exists and create is false, return NULL.  If create is true,
+ * add a hash entry pointing to a fresh ApplyCacheTXN whose xid field is
+ * set to xid.
+ */
+static ApplyCacheTXN *
+ApplyCacheTXNByXid(ApplyCache *cache, TransactionId xid, bool create)
+{
+	ApplyCacheTXNByIdEnt *ent;
+	bool		found;
+
+	ent = (ApplyCacheTXNByIdEnt *)
+		hash_search(cache->by_txn,
+					(void *) &xid,
+					(create ? HASH_ENTER : HASH_FIND),
+					&found);
+
+#ifdef VERBOSE_DEBUG
+	if (found)
+		elog(LOG, "found cache entry for %u at %p", xid, ent);
+	else
+		elog(LOG, "didn't find cache entry for %u in %p at %p, creating %u",
+			 xid, cache, ent, create);
+#endif
+
+	if (found)
+		return ent->txn;
+
+	if (!create)
+		return NULL;
+
+	/*
+	 * A new hash entry was just entered.  Attach a fresh transaction and
+	 * record its xid; ApplyCacheGetTXN() zeroes the struct, which would
+	 * otherwise leave xid as 0.
+	 */
+	ent->txn = ApplyCacheGetTXN(cache);
+	ent->txn->xid = xid;
+
+	return ent->txn;
+}
+
+/*
+ * ApplyCacheAddChange
+ *		Queue change for transaction xid, creating the transaction entry if
+ *		this is the first change seen for it.
+ *
+ * The transaction's lsn is set to the lsn passed in.  The change is appended
+ * to the tail of the transaction's change list, so changes stay in the order
+ * they were added.  From here on the ApplyCache owns change; it goes back to
+ * the change freelist when the transaction is committed or aborted.
+ */
+void
+ApplyCacheAddChange(ApplyCache *cache, TransactionId xid, XLogRecPtr lsn,
+					ApplyCacheChange *change)
+{
+	ApplyCacheTXN *txn = ApplyCacheTXNByXid(cache, xid, true);
+
+	txn->lsn = lsn;
+	ilist_d_push_back(&txn->changes, &change->node);
+
+	/*
+	 * Keep the per-transaction counters in sync.  Nothing is spilled to disk
+	 * yet, so every change is an in-memory change.
+	 */
+	txn->nentries++;
+	txn->nentries_mem++;
+}
+
+
+/*
+ * ApplyCacheCommitChild
+ *		Attach subtransaction subxid to its parent transaction xid.
+ *
+ * The subtransaction's lsn is set to the lsn passed in, and it is appended
+ * to the parent's subtxns list.  The parent is created if it has no entry
+ * yet.  Its changes are then replayed or discarded together with the
+ * parent's by ApplyCacheCommit() / ApplyCacheAbort().  A subtransaction
+ * that never had a change added is ignored.
+ */
+void
+ApplyCacheCommitChild(ApplyCache *cache, TransactionId xid,
+					  TransactionId subxid, XLogRecPtr lsn)
+{
+	ApplyCacheTXN *txn;
+	ApplyCacheTXN *subtxn;
+	bool		found;
+
+	subtxn = ApplyCacheTXNByXid(cache, subxid, false);
+
+	/*
+	 * No need to do anything if that subtxn didn't contain any changes
+	 */
+	if (!subtxn)
+		return;
+
+	subtxn->lsn = lsn;
+
+	/*
+	 * From now on the subtransaction is reachable only through its parent's
+	 * subtxns list.  Drop the xid hash entry: the parent's commit/abort
+	 * returns the struct to the freelist, and any entry left behind would
+	 * then point at recycled or freed memory and never be removed.  Removing
+	 * it also means a repeated call for the same subxid is a no-op, instead
+	 * of linking the node into the list twice.
+	 */
+	hash_search(cache->by_txn,
+				(void *) &subxid,
+				HASH_REMOVE,
+				&found);
+	Assert(found);
+
+	txn = ApplyCacheTXNByXid(cache, xid, true);
+
+	ilist_d_push_back(&txn->subtxns, &subtxn->node);
+}
+
+/*
+ * ApplyCacheCommit
+ *		Replay transaction xid through the callbacks, then forget it.
+ *
+ * If nothing was recorded for xid, this does nothing.  Otherwise the
+ * transaction's lsn is set to the commit lsn and cache->begin is called.
+ * cache->apply_change is then called for every change of every attached
+ * subtransaction, and after that for every change of the toplevel
+ * transaction.  Finally cache->commit is called.  Each change, each
+ * subtransaction and the transaction itself go back to the freelists, and
+ * xid is removed from the hash table.
+ *
+ * All three callbacks are invoked unconditionally, so they must be set.
+ */
+void
+ApplyCacheCommit(ApplyCache* cache, TransactionId xid, XLogRecPtr lsn)
+{
+ ApplyCacheTXN* txn = ApplyCacheTXNByXid(cache, xid, false);
+ ilist_d_node* cur_change, *next_change;
+ ilist_d_node* cur_txn, *next_txn;
+ bool found;
+
+ /* nothing was ever recorded for this transaction */
+ if (!txn)
+ return;
+
+ txn->lsn = lsn;
+
+ cache->begin(cache, txn);
+
+ /*
+ * FIXME:
+ * do a k-way mergesort of all changes ordered by lsn
+ *
+ * For now we just iterate through all subtransactions and then through the
+ * main txn. But that's *WRONG*.
+ *
+ * The best way to do it is probably to model the current heads of all TXNs as
+ * a heap and always remove from the smallest lsn till that's not the case
+ * anymore.
+ */
+ /* the _modify variant is needed: entries are recycled while iterating */
+ ilist_d_foreach_modify (cur_txn, next_txn, &txn->subtxns)
+ {
+ ApplyCacheTXN* subtxn = ilist_container(ApplyCacheTXN, node, cur_txn);
+
+ ilist_d_foreach_modify (cur_change, next_change, &subtxn->changes)
+ {
+ ApplyCacheChange* change =
+ ilist_container(ApplyCacheChange, node, cur_change);
+ cache->apply_change(cache, txn, subtxn, change);
+
+ ApplyCacheReturnChange(cache, change);
+ }
+ ApplyCacheReturnTXN(cache, subtxn);
+ }
+
+ /* toplevel changes: subtxn argument is NULL */
+ ilist_d_foreach_modify (cur_change, next_change, &txn->changes)
+ {
+ ApplyCacheChange* change =
+ ilist_container(ApplyCacheChange, node, cur_change);
+ cache->apply_change(cache, txn, NULL, change);
+
+ ApplyCacheReturnChange(cache, change);
+ }
+
+ cache->commit(cache, txn);
+
+ /* now remove reference from cache */
+ hash_search(cache->by_txn,
+ (void *)&xid,
+ HASH_REMOVE,
+ &found);
+ Assert(found);
+
+ ApplyCacheReturnTXN(cache, txn);
+}
+
+/*
+ * ApplyCacheReturnTXNChanges
+ *		Hand every change queued in txn back to the change freelist.
+ */
+static void
+ApplyCacheReturnTXNChanges(ApplyCache *cache, ApplyCacheTXN *txn)
+{
+	ilist_d_node *iter;
+	ilist_d_node *next;
+
+	/* the _modify variant is needed because each node is relinked */
+	ilist_d_foreach_modify(iter, next, &txn->changes)
+	{
+		ApplyCacheReturnChange(cache,
+							   ilist_container(ApplyCacheChange, node, iter));
+	}
+}
+
+/*
+ * ApplyCacheAbort
+ *		Discard transaction xid and all of its attached subtransactions.
+ *
+ * No callback is invoked.  All queued changes, subtransaction structs and the
+ * transaction struct go back to the freelists, and xid is removed from the
+ * hash table.  If nothing was recorded for xid, this does nothing.
+ */
+void
+ApplyCacheAbort(ApplyCache *cache, TransactionId xid, XLogRecPtr lsn)
+{
+	ApplyCacheTXN *txn = ApplyCacheTXNByXid(cache, xid, false);
+	ilist_d_node *cur_sub;
+	ilist_d_node *next_sub;
+	bool		found;
+
+	/* no changes in this transaction */
+	if (txn == NULL)
+		return;
+
+	/* subtransactions first: their changes, then the struct itself */
+	ilist_d_foreach_modify(cur_sub, next_sub, &txn->subtxns)
+	{
+		ApplyCacheTXN *subtxn = ilist_container(ApplyCacheTXN, node, cur_sub);
+
+		ApplyCacheReturnTXNChanges(cache, subtxn);
+		ApplyCacheReturnTXN(cache, subtxn);
+	}
+
+	ApplyCacheReturnTXNChanges(cache, txn);
+
+	/* forget the xid, then recycle the toplevel struct */
+	hash_search(cache->by_txn,
+				(void *) &xid,
+				HASH_REMOVE,
+				&found);
+	Assert(found);
+
+	ApplyCacheReturnTXN(cache, txn);
+}
diff --git a/src/include/replication/applycache.h b/src/include/replication/applycache.h
new file mode 100644
index 0000000..4ceba63
--- /dev/null
+++ b/src/include/replication/applycache.h
@@ -0,0 +1,185 @@
+/*
+ * applycache.h
+ *
+ * PostgreSQL logical replay "cache" management
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/replication/applycache.h
+ */
+#ifndef APPLYCACHE_H
+#define APPLYCACHE_H
+
+#include "access/htup.h"
+#include "utils/hsearch.h"
+#include "utils/ilist.h"
+
+/* Opaque handle; the full definition follows below. */
+typedef struct ApplyCache ApplyCache;
+
+/* Kind of row-level modification an ApplyCacheChange describes. */
+enum ApplyCacheChangeType
+{
+ APPLY_CACHE_CHANGE_INSERT,
+ APPLY_CACHE_CHANGE_UPDATE,
+ APPLY_CACHE_CHANGE_DELETE
+};
+
+/*
+ * Buffer large enough to hold one heap tuple of up to MaxHeapTupleSize data
+ * bytes, laid out as header followed by data.
+ *
+ * NOTE(review): applycache.c never sets tuple.t_data; presumably whoever
+ * fills the buffer points it at header.  Confirm in the producer.
+ *
+ * Obtained with ApplyCacheGetTupleBuf() (contents not cleared) and given
+ * back with ApplyCacheReturnTupleBuf().
+ */
+typedef struct ApplyCacheTupleBuf
+{
+ /* position in preallocated list */
+ ilist_s_node node;
+
+ HeapTupleData tuple;
+ HeapTupleHeaderData header;
+ char data[MaxHeapTupleSize];
+} ApplyCacheTupleBuf;
+
+/*
+ * One decoded row change queued in an ApplyCacheTXN.
+ */
+typedef struct ApplyCacheChange
+{
+ /* NOTE(review): not set by applycache.c; presumably filled by the producer */
+ XLogRecPtr lsn;
+ enum ApplyCacheChangeType action;
+
+ /* returned to the tuplebuf freelist by ApplyCacheReturnChange() if set */
+ ApplyCacheTupleBuf* newtuple;
+
+ /* returned to the tuplebuf freelist by ApplyCacheReturnChange() if set */
+ ApplyCacheTupleBuf* oldtuple;
+
+ /* released with heap_freetuple() by ApplyCacheReturnChange() if set */
+ HeapTuple table;
+
+ /*
+ * While in use, this links the change into its transaction's change list;
+ * otherwise it is the change's position in the preallocated (free) list.
+ */
+ ilist_d_node node;
+} ApplyCacheChange;
+
+/*
+ * A (sub)transaction whose changes are being collected.
+ */
+typedef struct ApplyCacheTXN
+{
+ TransactionId xid;
+
+ /*
+ * lsn of the most recently added change; overwritten with the commit lsn
+ * by ApplyCacheCommit() / ApplyCacheCommitChild().
+ */
+ XLogRecPtr lsn;
+
+ /*
+ * How many ApplyCacheChange's do we have in this txn.
+ *
+ * Subtransactions are *not* included.
+ *
+ * NOTE(review): verify that ApplyCacheAddChange() keeps this and
+ * nentries_mem up to date.
+ */
+ Size nentries;
+
+ /*
+ * How many of the above entries are stored in memory rather than
+ * spilled to disk.
+ */
+ Size nentries_mem;
+
+ /*
+ * List of actual changes, in the order they were added
+ */
+ ilist_d_head changes;
+
+ /*
+ * non-hierarchical list of subtransactions that are *not* aborted
+ */
+ ilist_d_head subtxns;
+
+ /*
+ * our position in a list of subtransactions while the TXN is in
+ * use. Otherwise it's the position in the list of preallocated
+ * transactions.
+ */
+ ilist_d_node node;
+} ApplyCacheTXN;
+
+
+/*
+ * Callbacks invoked by ApplyCacheCommit(): begin once, then apply_change for
+ * every change, then commit once.  For changes of the toplevel transaction
+ * the subtxn argument is NULL.
+ */
+/* XXX: we're currently passing the originating subtxn. Not sure that's necessary */
+typedef void (*ApplyCacheApplyChangeCB)(ApplyCache* cache, ApplyCacheTXN* txn, ApplyCacheTXN* subtxn, ApplyCacheChange* change);
+typedef void (*ApplyCacheBeginCB)(ApplyCache* cache, ApplyCacheTXN* txn);
+typedef void (*ApplyCacheCommitCB)(ApplyCache* cache, ApplyCacheTXN* txn);
+
+/*
+ * State for reassembling transactions from a stream of interspersed changes.
+ *
+ * max number of concurrent top-level transactions or transactions where we
+ * don't know if they are top-level can be calculated by:
+ * (max_connections + max_prepared_xacts + ?) * PGPROC_MAX_CACHED_SUBXIDS
+ */
+struct ApplyCache
+{
+ /*
+ * One-entry lookup cache for the most recently used xid.
+ * NOTE(review): the lookup code in applycache.c does not consult these yet.
+ */
+ TransactionId last_txn;
+ ApplyCacheTXN *last_txn_cache;
+ /* xid -> ApplyCacheTXNByIdEnt for transactions with queued changes */
+ HTAB *by_txn;
+
+ /* invoked by ApplyCacheCommit(); must be set by the user of the cache */
+ ApplyCacheBeginCB begin;
+ ApplyCacheApplyChangeCB apply_change;
+ ApplyCacheCommitCB commit;
+
+ /* presumably for use by the callbacks; applycache.c does not touch it */
+ void* private_data;
+
+ /* memory context the by_txn hash table is created in */
+ MemoryContext context;
+
+ /*
+ * We don't want to repeatedly (de)allocate those structs, so cache them for reuse.
+ */
+ ilist_d_head cached_transactions;
+ size_t nr_cached_transactions;
+
+ ilist_d_head cached_changes;
+ size_t nr_cached_changes;
+
+ ilist_s_head cached_tuplebufs;
+ size_t nr_cached_tuplebufs;
+};
+
+
+/*
+ * Create a new, empty ApplyCache.  The begin/apply_change/commit callbacks
+ * must be set before transactions are committed through it.
+ */
+ApplyCache*
+ApplyCacheAllocate(void);
+
+/* Destroy an ApplyCache created by ApplyCacheAllocate(). */
+void
+ApplyCacheFree(ApplyCache*);
+
+/*
+ * Returns a (potentially preallocated) tuple buffer.  Its contents are not
+ * cleared.
+ */
+ApplyCacheTupleBuf*
+ApplyCacheGetTupleBuf(ApplyCache*);
+
+/* Return a tuple buffer that is no longer needed. */
+void
+ApplyCacheReturnTupleBuf(ApplyCache* cache, ApplyCacheTupleBuf* tuple);
+
+/*
+ * Returns a (potentially preallocated), zeroed change struct. Its lifetime is
+ * managed by the applycache module.
+ *
+ * If not added to a transaction with ApplyCacheAddChange it needs to be
+ * returned via ApplyCacheReturnChange
+ *
+ * FIXME: better name
+ */
+ApplyCacheChange*
+ApplyCacheGetChange(ApplyCache*);
+
+/*
+ * Return an unused ApplyCacheChange struct, together with the tuple buffers
+ * and table tuple it references.
+ */
+void
+ApplyCacheReturnChange(ApplyCache*, ApplyCacheChange*);
+
+
+/*
+ * Record the transaction as in-progress if not already done, and add the
+ * current change.
+ *
+ * The intent is to have a one-entry cache for looking up the current
+ * ApplyCacheTXN, so we don't need a full hash lookup when the same xid is
+ * used sequentially, which is rather frequent.
+ * NOTE(review): that cache is not implemented yet; every call currently does
+ * a hash lookup.
+ */
+void
+ApplyCacheAddChange(ApplyCache*, TransactionId, XLogRecPtr lsn, ApplyCacheChange*);
+
+/*
+ * Replay a transaction: call the begin, apply_change (once per change) and
+ * commit callbacks, then release the transaction and everything attached to
+ * it.  Does nothing if no change was ever added for the xid.
+ */
+void
+ApplyCacheCommit(ApplyCache*, TransactionId, XLogRecPtr lsn);
+
+/*
+ * Attach a subtransaction (third argument) to its parent (second argument)
+ * so that its changes are handled by the parent's commit or abort.
+ */
+void
+ApplyCacheCommitChild(ApplyCache*, TransactionId, TransactionId, XLogRecPtr lsn);
+
+/*
+ * Discard a transaction and its attached subtransactions without invoking
+ * any callback.
+ */
+void
+ApplyCacheAbort(ApplyCache*, TransactionId, XLogRecPtr lsn);
+
+#endif
--
1.7.10.rc3.3.g19a6c.dirty

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Andres Freund 2012-06-13 11:28:40 [PATCH 09/16] Decode wal (with wal_level=logical) into changes in an ApplyCache instance
Previous Message Andres Freund 2012-06-13 11:28:38 [PATCH 07/16] Log enough data into the wal to reconstruct logical changes from it if wal_level=logical