[PATCH 14/16] Add module to apply changes from an apply-cache using low-level functions

From: Andres Freund <andres(at)2ndquadrant(dot)com>
To: pgsql-hackers(at)postgresql(dot)org
Subject: [PATCH 14/16] Add module to apply changes from an apply-cache using low-level functions
Date: 2012-06-13 11:28:45
Message-ID: 1339586927-13156-14-git-send-email-andres@2ndquadrant.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

From: Andres Freund <andres(at)anarazel(dot)de>

We decided to use low-level functions to do the apply instead of producing SQL
statements containing the data (or using prepared statements) because both the
text conversion and the full executor introduce a significant overhead, which
is unnecessary if you're using the same version of pg on the same
architecture.

There are loads of use cases that require different methods of applying,
though - so the part doing the applying from an ApplyCache is just a bunch of
well-abstracted callbacks that get passed all the required knowledge to change
the data representation into other formats.

Missing:

- TOAST handling. For physical apply not much needs to be done because the
toast inserts will have been made beforehand. There needs to be an option in
ApplyCache that helps with reassembling TOAST datums, to make it easier to
write apply modules which convert to text.
---
src/backend/replication/logical/Makefile | 2 +-
src/backend/replication/logical/apply.c | 313 ++++++++++++++++++++++++++++++
src/include/replication/apply.h | 24 +++
3 files changed, 338 insertions(+), 1 deletion(-)
create mode 100644 src/backend/replication/logical/apply.c
create mode 100644 src/include/replication/apply.h

diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c2d6d82..d0e0b13 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.global

override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)

-OBJS = applycache.o decode.o logical.o
+OBJS = apply.o applycache.o decode.o logical.o

include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/apply.c b/src/backend/replication/logical/apply.c
new file mode 100644
index 0000000..646bd54
--- /dev/null
+++ b/src/backend/replication/logical/apply.c
@@ -0,0 +1,313 @@
+/*-------------------------------------------------------------------------
+ *
+ * apply.c
+ *
+ * Apply changes from an ApplyCache using low-level heap/index functions
+ * (support code for logical/multimaster replication)
+ *
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/apply.c
+ *
+ */
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/heapam.h"
+#include "access/genam.h"
+
+#include "catalog/pg_control.h"
+#include "catalog/index.h"
+
+#include "executor/executor.h"
+
+#include "replication/applycache.h"
+#include "replication/apply.h"
+
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+#include "utils/lsyscache.h"
+
+
+
+/*
+ * Forward declaration; the definition is at the bottom of this file.
+ * apply_change() calls it after a heap insert, and after an update whose
+ * new tuple is not heap-only.
+ */
+static void
+UserTableUpdateIndexes(Relation heapRel, HeapTuple heapTuple);
+
+
+/*
+ * apply_begin_txn
+ *
+ * Begin-of-transaction callback: starts the local transaction in which the
+ * changes of one ApplyCache transaction get applied.
+ *
+ * cache->private_data has to point to an ApplyApplyCacheState.  The resource
+ * owner that is current at call time is stashed there; apply_commit_txn()
+ * puts it back.  'txn' is not looked at.
+ */
+void
+apply_begin_txn(ApplyCache *cache, ApplyCacheTXN *txn)
+{
+    ApplyApplyCacheState *apply_state;
+    Snapshot    snap;
+
+    apply_state = (ApplyApplyCacheState *) cache->private_data;
+    apply_state->original_resource_owner = CurrentResourceOwner;
+
+    /* refuse to run inside an already open transaction (see message) */
+    PreventTransactionChain(true, "Apply Process cannot be started inside a txn");
+
+    StartTransactionCommand();
+
+    /* keep a transaction snapshot active while the changes are applied */
+    snap = GetTransactionSnapshot();
+    PushActiveSnapshot(snap);
+}
+
+/*
+ * apply_commit_txn
+ *
+ * Commit callback: pops the active snapshot and commits the local
+ * transaction.
+ *
+ * current_replication_origin_lsn is set to txn->lsn for the duration of the
+ * commit and zeroed again afterwards.  cache->private_data has to point to
+ * the ApplyApplyCacheState whose original_resource_owner gets reinstated.
+ */
+void
+apply_commit_txn(ApplyCache *cache, ApplyCacheTXN *txn)
+{
+    ApplyApplyCacheState *apply_state;
+
+    apply_state = (ApplyApplyCacheState *) cache->private_data;
+
+    /* expose the LSN of the transaction being applied while committing */
+    current_replication_origin_lsn = txn->lsn;
+
+    PopActiveSnapshot();
+    CommitTransactionCommand();
+
+    /* the commit is done, zero the origin LSN again */
+    current_replication_origin_lsn.xlogid = 0;
+    current_replication_origin_lsn.xrecoff = 0;
+
+    /*
+     * For some reason we lose our resource owner after
+     * (Start|Commit)TransactionCommand, so reinstate the saved one.
+     * XXX: is that correct?
+     */
+    CurrentResourceOwner = apply_state->original_resource_owner;
+}
+
+
+/*
+ * apply_open_pkey_index
+ *
+ * Look up the primary key of 'rel', open its index with AccessShareLock and
+ * fill 'skey' (room for INDEX_MAX_KEYS entries) with one btree-equality scan
+ * key per key column.  Returns the opened index relation and sets *nkeys to
+ * the number of scan keys built.
+ *
+ * The key values are read from 'key_tuple'.  If 'index_layout' is true the
+ * tuple is decoded with the index's tuple descriptor (column i + 1 holds the
+ * i'th key column); otherwise it is decoded with the table's descriptor and
+ * the table's attribute numbers.
+ *
+ * Raises an ERROR if no primary key index is found for the table.
+ */
+static Relation
+apply_open_pkey_index(Relation rel, HeapTuple key_tuple, bool index_layout,
+                      ScanKeyData *skey, int16 *nkeys)
+{
+    Oid         indexoid = InvalidOid;
+    int16       pknratts = 0;
+    int16       pkattnum[INDEX_MAX_KEYS];
+    Oid         pktypoid[INDEX_MAX_KEYS];
+    Oid         pkopclass[INDEX_MAX_KEYS];
+    Relation    index_rel;
+    TupleDesc   key_desc;
+    int         i;
+
+    MemSet(pkattnum, 0, sizeof(pkattnum));
+    MemSet(pktypoid, 0, sizeof(pktypoid));
+    MemSet(pkopclass, 0, sizeof(pkopclass));
+
+    relationFindPrimaryKey(rel, &indexoid, &pknratts, pkattnum, pktypoid, pkopclass);
+
+    if (!OidIsValid(indexoid))
+        ereport(ERROR,
+                (errcode(ERRCODE_UNDEFINED_OBJECT),
+                 errmsg("there is no primary key for table \"%s\"",
+                        RelationGetRelationName(rel))));
+
+    index_rel = index_open(indexoid, AccessShareLock);
+    key_desc = index_layout ? RelationGetDescr(index_rel) : RelationGetDescr(rel);
+
+    for (i = 0; i < pknratts; i++)
+    {
+        Oid         opfamily;
+        Oid         eq_operator;
+        RegProcedure regop;
+        int         value_attno;
+        Datum       value;
+        bool        isnull;
+
+        opfamily = get_opclass_family(pkopclass[i]);
+        eq_operator = get_opfamily_member(opfamily, pktypoid[i], pktypoid[i],
+                                          BTEqualStrategyNumber);
+        regop = get_opcode(eq_operator);
+
+        value_attno = index_layout ? i + 1 : pkattnum[i];
+        value = fastgetattr(key_tuple, value_attno, key_desc, &isnull);
+        Assert(!isnull);
+
+        /*
+         * Scan keys handed to an index scan are numbered by index column,
+         * not by the table's attribute number.  Passing pkattnum[i] here
+         * only works while the primary key happens to be on the leading
+         * table columns in the same order.
+         */
+        ScanKeyInit(&skey[i],
+                    (AttrNumber) (i + 1),
+                    BTEqualStrategyNumber,
+                    regop,
+                    value);
+    }
+
+    *nkeys = pknratts;
+    return index_rel;
+}
+
+/*
+ * apply_find_pkey_tuple
+ *
+ * Run an index scan over 'index_rel' with the given keys, using the
+ * transaction snapshot, and store the TID of the single matching tuple in
+ * *tid.
+ *
+ * Raises an ERROR if more than one tuple matches, or if none does; 'action'
+ * ("update", "delete") only feeds the latter message.
+ */
+static void
+apply_find_pkey_tuple(Relation rel, Relation index_rel, ScanKeyData *skey,
+                      int16 nkeys, const char *action, ItemPointer tid)
+{
+    IndexScanDesc scan;
+    HeapTuple   tuple;
+    bool        found = false;
+
+    scan = index_beginscan(rel, index_rel, GetTransactionSnapshot(),
+                           nkeys, 0);
+    index_rescan(scan, skey, nkeys, NULL, 0);
+
+    /* keep scanning after the first hit to detect duplicate matches */
+    while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
+    {
+        if (found)
+            elog(ERROR, "WTF, more than one tuple found via pk???");
+        found = true;
+
+        /* the returned tuple belongs to the scan, so copy out its TID */
+        *tid = tuple->t_self;
+    }
+
+    if (!found)
+        elog(ERROR, "could not find tuple to %s", action);
+
+    index_endscan(scan);
+}
+
+/*
+ * apply_change
+ *
+ * Per-change callback: applies one INSERT, UPDATE or DELETE to the local
+ * table using heap and index access functions directly.
+ *
+ * The target table is opened by the OID of change->table with
+ * RowExclusiveLock; the lock is kept when the relation is closed.
+ *
+ * INSERT stores change->newtuple and adds its index entries.
+ * UPDATE locates the existing row through the primary key index, using the
+ * key values found in change->newtuple, and replaces it with that tuple.
+ * NOTE(review): because the new tuple supplies the key, an update that
+ * changes primary key columns will not find its target row.
+ * DELETE locates the row through the primary key index using
+ * change->oldtuple, which is decoded with the index's tuple descriptor.
+ *
+ * UPDATE and DELETE raise an ERROR if the table has no primary key, or if
+ * not exactly one row matches.  'cache', 'txn' and 'subtxn' are not used.
+ * A CommandCounterIncrement() is done after every change.
+ */
+void
+apply_change(ApplyCache *cache, ApplyCacheTXN *txn, ApplyCacheTXN *subtxn, ApplyCacheChange *change)
+{
+    Relation    tuple_rel;
+
+    tuple_rel = heap_open(HeapTupleGetOid(change->table), RowExclusiveLock);
+
+    switch (change->action)
+    {
+        case APPLY_CACHE_CHANGE_INSERT:
+            {
+#ifdef VERBOSE_DEBUG
+                elog(LOG, "INSERT");
+#endif
+                simple_heap_insert(tuple_rel, &change->newtuple->tuple);
+
+                UserTableUpdateIndexes(tuple_rel, &change->newtuple->tuple);
+                break;
+            }
+        case APPLY_CACHE_CHANGE_UPDATE:
+            {
+                ScanKeyData skey[INDEX_MAX_KEYS];
+                int16       nkeys;
+                Relation    index_rel;
+                ItemPointerData old_tid;
+
+#ifdef VERBOSE_DEBUG
+                elog(LOG, "UPDATE");
+#endif
+                index_rel = apply_open_pkey_index(tuple_rel,
+                                                  &change->newtuple->tuple,
+                                                  false, skey, &nkeys);
+
+                apply_find_pkey_tuple(tuple_rel, index_rel, skey, nkeys,
+                                      "update", &old_tid);
+
+                simple_heap_update(tuple_rel, &old_tid, &change->newtuple->tuple);
+
+                /* a heap-only (HOT) new version needs no new index entries */
+                if (!HeapTupleIsHeapOnly(&change->newtuple->tuple))
+                    UserTableUpdateIndexes(tuple_rel, &change->newtuple->tuple);
+
+                index_close(index_rel, NoLock);
+
+                break;
+            }
+        case APPLY_CACHE_CHANGE_DELETE:
+            {
+                ScanKeyData skey[INDEX_MAX_KEYS];
+                int16       nkeys;
+                Relation    index_rel;
+                ItemPointerData old_tid;
+
+#ifdef VERBOSE_DEBUG
+                elog(LOG, "DELETE comming");
+#endif
+                index_rel = apply_open_pkey_index(tuple_rel,
+                                                  &change->oldtuple->tuple,
+                                                  true, skey, &nkeys);
+
+                apply_find_pkey_tuple(tuple_rel, index_rel, skey, nkeys,
+                                      "delete", &old_tid);
+
+                simple_heap_delete(tuple_rel, &old_tid);
+
+                index_close(index_rel, NoLock);
+
+                break;
+            }
+    }
+    /* FIXME: locking */
+
+    heap_close(tuple_rel, NoLock);
+    CommandCounterIncrement();
+}
+
+/*
+ * Alias for a pointer to the executor's ResultRelInfo, giving callers a
+ * separate type name so they are decoupled from that fact.  The wording was
+ * taken over from the catalog indexing code (CatalogOpenIndexes and
+ * friends).
+ *
+ * NOTE(review): nothing in this file refers to UserTableIndexState;
+ * UserTableUpdateIndexes() below uses ResultRelInfo directly.  Candidate
+ * for removal.
+ */
+typedef struct ResultRelInfo *UserTableIndexState;
+
+/*
+ * UserTableUpdateIndexes
+ *
+ * Create the index entries for 'heapTuple' in all indexes of 'heapRel'.
+ * The entries point at heapTuple->t_self, so the tuple has to be stored in
+ * the heap before this is called.
+ *
+ * A throwaway executor state with a single result relation is built for the
+ * purpose and torn down again before returning.
+ */
+static void
+UserTableUpdateIndexes(Relation heapRel, HeapTuple heapTuple)
+{
+    /* this is largely copied together from copy.c's CopyFrom */
+    EState     *executor_state = CreateExecutorState();
+    ResultRelInfo *rel_info = makeNode(ResultRelInfo);
+    List       *recheck = NIL;
+
+    rel_info->ri_RangeTableIndex = 1;   /* dummy */
+    rel_info->ri_RelationDesc = heapRel;
+    rel_info->ri_TrigInstrument = NULL;
+
+    ExecOpenIndices(rel_info);
+
+    /* register the relation as the one and only result relation */
+    executor_state->es_result_relations = rel_info;
+    executor_state->es_num_result_relations = 1;
+    executor_state->es_result_relation_info = rel_info;
+
+    if (rel_info->ri_NumIndices > 0)
+    {
+        TupleTableSlot *tuple_slot;
+
+        /* index insertion takes its input from a slot, so wrap the tuple */
+        tuple_slot = ExecInitExtraTupleSlot(executor_state);
+        ExecSetSlotDescriptor(tuple_slot, RelationGetDescr(heapRel));
+        ExecStoreTuple(heapTuple, tuple_slot, InvalidBuffer, false);
+
+        recheck = ExecInsertIndexTuples(tuple_slot, &heapTuple->t_self,
+                                        executor_state);
+    }
+
+    /* tear everything down again */
+    ExecResetTupleTable(executor_state->es_tupleTable, false);
+    ExecCloseIndices(rel_info);
+    FreeExecutorState(executor_state);
+
+    /* FIXME: recheck the indexes */
+    list_free(recheck);
+}
diff --git a/src/include/replication/apply.h b/src/include/replication/apply.h
new file mode 100644
index 0000000..3b818c0
--- /dev/null
+++ b/src/include/replication/apply.h
@@ -0,0 +1,24 @@
+/*
+ * apply.h
+ *
+ * PostgreSQL logical replay
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * src/include/replication/apply.h
+ */
+#ifndef APPLY_H
+#define APPLY_H
+
+/*
+ * The prototypes below use ApplyCache, ApplyCacheTXN and ApplyCacheChange,
+ * so pull in their declarations here instead of relying on every includer
+ * to include applycache.h first.
+ */
+#include "replication/applycache.h"
+#include "utils/resowner.h"
+
+/*
+ * Private state of the apply callbacks.  They read it from
+ * ApplyCache->private_data, so that field has to point at one of these.
+ */
+typedef struct ApplyApplyCacheState
+{
+    /*
+     * CurrentResourceOwner as it was when apply_begin_txn() was called;
+     * apply_commit_txn() reinstates it after committing.
+     */
+    ResourceOwner original_resource_owner;
+} ApplyApplyCacheState;
+
+/* callbacks applying the contents of an ApplyCache, see apply.c */
+extern void apply_begin_txn(ApplyCache *cache, ApplyCacheTXN *txn);
+extern void apply_commit_txn(ApplyCache *cache, ApplyCacheTXN *txn);
+extern void apply_change(ApplyCache *cache, ApplyCacheTXN *txn, ApplyCacheTXN *subtxn, ApplyCacheChange *change);
+
+#endif   /* APPLY_H */
--
1.7.10.rc3.3.g19a6c.dirty

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Andres Freund 2012-06-13 11:28:46 [PATCH 15/16] Activate/Implement the "apply process" which applies received updates from another node
Previous Message Andres Freund 2012-06-13 11:28:44 [PATCH 13/16] Introduction of pair of logical walreceiver/sender