Re: Logical Replication WIP

From: Andres Freund <andres(at)anarazel(dot)de>
To: Petr Jelinek <petr(at)2ndquadrant(dot)com>
Cc: Peter Eisentraut <peter(dot)eisentraut(at)2ndquadrant(dot)com>, Craig Ringer <craig(at)2ndquadrant(dot)com>, Simon Riggs <simon(at)2ndquadrant(dot)com>, PostgreSQL-development <pgsql-hackers(at)postgresql(dot)org>, Erik Rijkers <er(at)xs4all(dot)nl>, Steve Singer <steve(at)ssinger(dot)info>
Subject: Re: Logical Replication WIP
Date: 2016-09-14 16:21:28
Message-ID: 20160914162128.pemvaxjvd47qjrkt@alap3.anarazel.de
Lists: pgsql-hackers

(continuing, uh, a bit happier)

On 2016-09-09 00:59:26 +0200, Petr Jelinek wrote:

> +/*
> + * Relcache invalidation callback for our relation map cache.
> + */
> +static void
> +logicalreprelmap_invalidate_cb(Datum arg, Oid reloid)
> +{
> + LogicalRepRelMapEntry *entry;
> +
> + /* Just to be sure. */
> + if (LogicalRepRelMap == NULL)
> + return;
> +
> + if (reloid != InvalidOid)
> + {
> + HASH_SEQ_STATUS status;
> +
> + hash_seq_init(&status, LogicalRepRelMap);
> +
> + /* TODO, use inverse lookup hastable? */

*hashtable

> + while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
> + {
> + if (entry->reloid == reloid)
> + entry->reloid = InvalidOid;

can't we break here?

> +/*
> + * Initialize the relation map cache.
> + */
> +static void
> +remoterelmap_init(void)
> +{
> + HASHCTL ctl;
> +
> + /* Make sure we've initialized CacheMemoryContext. */
> + if (CacheMemoryContext == NULL)
> + CreateCacheMemoryContext();
> +
> + /* Initialize the hash table. */
> + MemSet(&ctl, 0, sizeof(ctl));
> + ctl.keysize = sizeof(uint32);
> + ctl.entrysize = sizeof(LogicalRepRelMapEntry);
> + ctl.hcxt = CacheMemoryContext;

Wonder if this (and similar code earlier) should try to do everything in
a sub-context of CacheMemoryContext instead. That'd make some issues
easier to track down.

> +/*
> + * Open the local relation associated with the remote one.
> + */
> +static LogicalRepRelMapEntry *
> +logicalreprel_open(uint32 remoteid, LOCKMODE lockmode)
> +{
> + LogicalRepRelMapEntry *entry;
> + bool found;
> +
> + if (LogicalRepRelMap == NULL)
> + remoterelmap_init();
> +
> + /* Search for existing entry. */
> + entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
> + HASH_FIND, &found);
> +
> + if (!found)
> + elog(FATAL, "cache lookup failed for remote relation %u",
> + remoteid);
> +
> + /* Need to update the local cache? */
> + if (!OidIsValid(entry->reloid))
> + {
> + Oid nspid;
> + Oid relid;
> + int i;
> + TupleDesc desc;
> + LogicalRepRelation *remoterel;
> +
> + remoterel = &entry->remoterel;
> +
> + nspid = LookupExplicitNamespace(remoterel->nspname, false);
> + if (!OidIsValid(nspid))
> + ereport(FATAL,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("the logical replication target %s not found",
> + quote_qualified_identifier(remoterel->nspname,
> + remoterel->relname))));
> + relid = get_relname_relid(remoterel->relname, nspid);
> + if (!OidIsValid(relid))
> + ereport(FATAL,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("the logical replication target %s not found",
> + quote_qualified_identifier(remoterel->nspname,
> + remoterel->relname))));
> +
> + entry->rel = heap_open(relid, lockmode);

This seems rather racy. I think this really instead needs something akin
to RangeVarGetRelidExtended().

> +/*
> + * Executor state preparation for evaluation of constraint expressions,
> + * indexes and triggers.
> + *
> + * This is based on similar code in copy.c
> + */
> +static EState *
> +create_estate_for_relation(LogicalRepRelMapEntry *rel)
> +{
> + EState *estate;
> + ResultRelInfo *resultRelInfo;
> + RangeTblEntry *rte;
> +
> + estate = CreateExecutorState();
> +
> + rte = makeNode(RangeTblEntry);
> + rte->rtekind = RTE_RELATION;
> + rte->relid = RelationGetRelid(rel->rel);
> + rte->relkind = rel->rel->rd_rel->relkind;
> + estate->es_range_table = list_make1(rte);
> +
> + resultRelInfo = makeNode(ResultRelInfo);
> + InitResultRelInfo(resultRelInfo, rel->rel, 1, 0);
> +
> + estate->es_result_relations = resultRelInfo;
> + estate->es_num_result_relations = 1;
> + estate->es_result_relation_info = resultRelInfo;
> +
> + /* Triggers might need a slot */
> + if (resultRelInfo->ri_TrigDesc)
> + estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
> +
> + return estate;
> +}

Ugh, we do this for every single change? That's pretty darn heavy.

> +/*
> + * Check if the local attribute is present in relation definition used
> + * by upstream and hence updated by the replication.
> + */
> +static bool
> +physatt_in_attmap(LogicalRepRelMapEntry *rel, int attid)
> +{
> + AttrNumber i;
> +
> + /* Fast path for tables that are same on upstream and downstream. */
> + if (attid < rel->remoterel.natts && rel->attmap[attid] == attid)
> + return true;
> +
> + /* Try to find the attribute in the map. */
> + for (i = 0; i < rel->remoterel.natts; i++)
> + if (rel->attmap[i] == attid)
> + return true;
> +
> + return false;
> +}

Shouldn't we rather try to keep an attribute map that can always map
remote attribute numbers to local ones? That doesn't seem hard at
first blush. But I might be missing something here.
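
A minimal standalone sketch of such a map, to make the idea concrete. It
is not code from the patch: build_remote_to_local_map() and the plain
string arrays are hypothetical, and real code would presumably match
against the local TupleDesc rather than arrays of names.

```c
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical helper, not part of the patch: build a map indexed by
 * remote attribute number whose value is the matching local attribute
 * number, or -1 if the local table has no column of that name.
 * Returns NULL on allocation failure or if there are no remote columns.
 * The caller releases the result with free().
 */
static int *
build_remote_to_local_map(const char *const *remote_names, int remote_natts,
                          const char *const *local_names, int local_natts)
{
    int *map;
    int  i;
    int  j;

    if (remote_natts <= 0)
        return NULL;

    map = malloc(sizeof(int) * (size_t) remote_natts);
    if (map == NULL)
        return NULL;

    for (i = 0; i < remote_natts; i++)
    {
        map[i] = -1;            /* no local match found yet */
        for (j = 0; j < local_natts; j++)
        {
            if (strcmp(remote_names[i], local_names[j]) == 0)
            {
                map[i] = j;
                break;
            }
        }
    }

    return map;
}
```

With the map built once per cached relation, translating a remote
attribute number is a single array access, and -1 marks remote columns
that have no local counterpart.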

> +/*
> + * Executes default values for columns for which we can't map to remote
> + * relation columns.
> + *
> + * This allows us to support tables which have more columns on the downstream
> + * than on the upsttream.
> + */

Typo: upsttream.

> +static void
> +FillSlotDefaults(LogicalRepRelMapEntry *rel, EState *estate,
> + TupleTableSlot *slot)
> +{

Why is this using a different naming scheme?

> +/*
> + * Handle COMMIT message.
> + *
> + * TODO, support tracking of multiple origins
> + */
> +static void
> +handle_commit(StringInfo s)
> +{
> + XLogRecPtr commit_lsn;
> + XLogRecPtr end_lsn;
> + TimestampTz commit_time;
> +
> + logicalrep_read_commit(s, &commit_lsn, &end_lsn, &commit_time);

Perhaps this (and related routines) should rather be
LogicalRepCommitdata commit_data;
logicalrep_read_commit(s, &commit_data);
etc? That way the data can transparently be enhanced.
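
A rough standalone sketch of the struct-based variant. Everything here
other than the three fields already read by logicalrep_read_commit() is
an assumption: the SketchLsn/SketchTimestamp typedefs stand in for
XLogRecPtr and TimestampTz, and the 3 x 8 byte big-endian layout is only
a guess at the wire format, used so that the example compiles on its own.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins for XLogRecPtr and TimestampTz (assumptions for this sketch). */
typedef uint64_t SketchLsn;
typedef int64_t  SketchTimestamp;

/* All commit fields travel together; new ones can be added later. */
typedef struct LogicalRepCommitdata
{
    SketchLsn       commit_lsn;
    SketchLsn       end_lsn;
    SketchTimestamp commit_time;
} LogicalRepCommitdata;

/* Read a 64 bit big-endian integer and advance the cursor past it. */
static uint64_t
sketch_read_uint64(const unsigned char **cursor)
{
    uint64_t value = 0;
    int      i;

    for (i = 0; i < 8; i++)
        value = (value << 8) | (*cursor)[i];
    *cursor += 8;

    return value;
}

/*
 * Hypothetical reader: fills the struct from a buffer assumed to hold
 * commit_lsn, end_lsn and commit_time, 8 bytes each.
 */
static void
sketch_read_commit(const unsigned char *buf, size_t len,
                   LogicalRepCommitdata *commit_data)
{
    const unsigned char *cursor = buf;

    assert(len >= 24);

    commit_data->commit_lsn = sketch_read_uint64(&cursor);
    commit_data->end_lsn = sketch_read_uint64(&cursor);
    commit_data->commit_time = (SketchTimestamp) sketch_read_uint64(&cursor);
}
```

Callers then only pass the struct around, so adding a field later changes
the struct and the reader but not every call site.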

> + Assert(commit_lsn == replorigin_session_origin_lsn);
> + Assert(commit_time == replorigin_session_origin_timestamp);
> +
> + if (IsTransactionState())
> + {
> + FlushPosition *flushpos;
> +
> + CommitTransactionCommand();
> + MemoryContextSwitchTo(CacheMemoryContext);
> +
> + /* Track commit lsn */
> + flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
> + flushpos->local_end = XactLastCommitEnd;
> + flushpos->remote_end = end_lsn;
> +
> + dlist_push_tail(&lsn_mapping, &flushpos->node);
> + MemoryContextSwitchTo(ApplyContext);

Seems like it should be in a separate function.

> +/*
> + * Handle INSERT message.
> + */
> +static void
> +handle_insert(StringInfo s)
> +{
> + LogicalRepRelMapEntry *rel;
> + LogicalRepTupleData newtup;
> + LogicalRepRelId relid;
> + EState *estate;
> + TupleTableSlot *remoteslot;
> + MemoryContext oldctx;
> +
> + ensure_transaction();
> +
> + relid = logicalrep_read_insert(s, &newtup);
> + rel = logicalreprel_open(relid, RowExclusiveLock);
> +
> + /* Initialize the executor state. */
> + estate = create_estate_for_relation(rel);
> + remoteslot = ExecInitExtraTupleSlot(estate);
> + ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->rel));

This seems incredibly expensive for replicating a lot of rows.

> + /* Process and store remote tuple in the slot */
> + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
> + SlotStoreCStrings(remoteslot, newtup.values);
> + FillSlotDefaults(rel, estate, remoteslot);
> + MemoryContextSwitchTo(oldctx);
> +
> + PushActiveSnapshot(GetTransactionSnapshot());
> + ExecOpenIndices(estate->es_result_relation_info, false);
> +
> + ExecInsert(NULL, /* mtstate is only used for onconflict handling which we don't support atm */
> + remoteslot,
> + remoteslot,
> + NIL,
> + ONCONFLICT_NONE,
> + estate,
> + false);

I have *severe* doubts about just using the (newly) exposed functions
1:1 here.

> +/*
> + * Search the relation 'rel' for tuple using the replication index.
> + *
> + * If a matching tuple is found lock it with lockmode, fill the slot with its
> + * contents and return true, return false is returned otherwise.
> + */
> +static bool
> +tuple_find_by_replidx(Relation rel, LockTupleMode lockmode,
> + TupleTableSlot *searchslot, TupleTableSlot *slot)
> +{
> + HeapTuple scantuple;
> + ScanKeyData skey[INDEX_MAX_KEYS];
> + IndexScanDesc scan;
> + SnapshotData snap;
> + TransactionId xwait;
> + Oid idxoid;
> + Relation idxrel;
> + bool found;
> +
> + /* Open REPLICA IDENTITY index.*/
> + idxoid = RelationGetReplicaIndex(rel);
> + if (!OidIsValid(idxoid))
> + {
> + elog(ERROR, "could not find configured replica identity for table \"%s\"",
> + RelationGetRelationName(rel));
> + return false;
> + }
> + idxrel = index_open(idxoid, RowExclusiveLock);
> +
> + /* Start an index scan. */
> + InitDirtySnapshot(snap);
> + scan = index_beginscan(rel, idxrel, &snap,
> + RelationGetNumberOfAttributes(idxrel),
> + 0);
> +
> + /* Build scan key. */
> + build_replindex_scan_key(skey, rel, idxrel, searchslot);
> +
> +retry:
> + found = false;
> +
> + index_rescan(scan, skey, RelationGetNumberOfAttributes(idxrel), NULL, 0);
> +
> + /* Try to find the tuple */
> + if ((scantuple = index_getnext(scan, ForwardScanDirection)) != NULL)
> + {
> + found = true;
> + ExecStoreTuple(scantuple, slot, InvalidBuffer, false);
> + ExecMaterializeSlot(slot);
> +
> + xwait = TransactionIdIsValid(snap.xmin) ?
> + snap.xmin : snap.xmax;
> +
> + /*
> + * If the tuple is locked, wait for locking transaction to finish
> + * and retry.
> + */
> + if (TransactionIdIsValid(xwait))
> + {
> + XactLockTableWait(xwait, NULL, NULL, XLTW_None);
> + goto retry;
> + }
> + }

Hm. So we potentially find multiple tuples here, and lock all of
them, but then only use one for the update.

> +static List *
> +get_subscription_list(void)
> +{
> + List *res = NIL;
> + Relation rel;
> + HeapScanDesc scan;
> + HeapTuple tup;
> + MemoryContext resultcxt;
> +
> + /* This is the context that we will allocate our output data in */
> + resultcxt = CurrentMemoryContext;
> +
> + /*
> + * Start a transaction so we can access pg_database, and get a snapshot.
> + * We don't have a use for the snapshot itself, but we're interested in
> + * the secondary effect that it sets RecentGlobalXmin. (This is critical
> + * for anything that reads heap pages, because HOT may decide to prune
> + * them even if the process doesn't attempt to modify any tuples.)
> + */

> + StartTransactionCommand();
> + (void) GetTransactionSnapshot();
> +
> + rel = heap_open(SubscriptionRelationId, AccessShareLock);
> + scan = heap_beginscan_catalog(rel, 0, NULL);
> +
> + while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
> + {
> + Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
> + Subscription *sub;
> + MemoryContext oldcxt;
> +
> + /*
> + * Allocate our results in the caller's context, not the
> + * transaction's. We do this inside the loop, and restore the original
> + * context at the end, so that leaky things like heap_getnext() are
> + * not called in a potentially long-lived context.
> + */
> + oldcxt = MemoryContextSwitchTo(resultcxt);
> +
> + sub = (Subscription *) palloc(sizeof(Subscription));
> + sub->oid = HeapTupleGetOid(tup);
> + sub->dbid = subform->subdbid;
> + sub->enabled = subform->subenabled;
> +
> + /* We don't fill fields we are not intereste in. */
> + sub->name = NULL;
> + sub->conninfo = NULL;
> + sub->slotname = NULL;
> + sub->publications = NIL;
> +
> + res = lappend(res, sub);
> + MemoryContextSwitchTo(oldcxt);
> + }
> +
> + heap_endscan(scan);
> + heap_close(rel, AccessShareLock);
> +
> + CommitTransactionCommand();

Hm. This doesn't seem quite right from a locking pov. What if, in the
middle of this, a new subscription is created?

> +void
> +logicalrep_worker_stop(LogicalRepWorker *worker)
> +{
> + Assert(LWLockHeldByMe(LogicalRepWorkerLock));
> +
> + /* Check that the worker is up and what we expect. */
> + if (!worker->proc)
> + return;
> + if (!IsBackendPid(worker->proc->pid))
> + return;
> +
> + /* Terminate the worker. */
> + kill(worker->proc->pid, SIGTERM);
> +
> + LWLockRelease(LogicalRepLauncherLock);
> +
> + /* Wait for it to detach. */
> + for (;;)
> + {
> + int rc = WaitLatch(&MyProc->procLatch,
> + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
> + 1000L);
> +
> + /* emergency bailout if postmaster has died */
> + if (rc & WL_POSTMASTER_DEATH)
> + proc_exit(1);
> +
> + ResetLatch(&MyProc->procLatch);
> +
> + CHECK_FOR_INTERRUPTS();
> +
> + if (!worker->proc)
> + return;
> + }
> +}

Indentation here seems screwed up.

> +static void
> +xacthook_signal_launcher(XactEvent event, void *arg)
> +{
> + switch (event)
> + {
> + case XACT_EVENT_COMMIT:
> + if (xacthook_do_signal_launcher)
> + ApplyLauncherWakeup();
> + break;
> + default:
> + /* We're not interested in other tx events */
> + break;
> + }
> +}

> +void
> +ApplyLauncherWakeupOnCommit(void)
> +{
> + if (!xacthook_do_signal_launcher)
> + {
> + RegisterXactCallback(xacthook_signal_launcher, NULL);
> + xacthook_do_signal_launcher = true;
> + }
> +}

Hm. This seems like it really should be an AtCommit_* routine instead.
This also needs more docs.

Hadn't I previously read about always streaming data to disk first?

> @@ -0,0 +1,674 @@
> +/*-------------------------------------------------------------------------
> + * tablesync.c
> + * PostgreSQL logical replication
> + *
> + * Copyright (c) 2012-2016, PostgreSQL Global Development Group
> + *
> + * IDENTIFICATION
> + * src/backend/replication/logical/tablesync.c
> + *
> + * NOTES
> + * This file contains code for initial table data synchronization for
> + * logical replication.
> + *
> + * The initial data synchronization is done separately for each table,
> + * in separate apply worker that only fetches the initial snapshot data
> + * from the provider and then synchronizes the position in stream with
> + * the main apply worker.

Why? I guess that's because it allows adding tables incrementally, with
acceptable overhead.

> + * The stream position synchronization works in multiple steps.
> + * - sync finishes copy and sets table state as SYNCWAIT and waits
> + * for state to change in a loop
> + * - apply periodically checks unsynced tables for SYNCWAIT, when it
> + * appears it will compare its position in the stream with the
> + * SYNCWAIT position and decides to either set it to CATCHUP when
> + * the apply was infront (and wait for the sync to do the catchup),
> + * or set the state to SYNCDONE if the sync was infront or in case
> + * both sync and apply are at the same position it will set it to
> + * READY and stops tracking it

I'm not quite following here.

> + * - if the state was set to CATCHUP sync will read the stream and
> + * apply changes until it catches up to the specified stream
> + * position and then sets state to READY and signals apply that it
> + * can stop waiting and exits, if the state was set to something
> + * else than CATCHUP the sync process will simply end
> + * - if the state was set to SYNCDONE by apply, the apply will
> + * continue tracking the table until it reaches the SYNCDONE stream
> + * position at which point it sets state to READY and stops tracking
> + *
> + * Example flows look like this:
> + * - Apply is infront:
> + * sync:8 -> set SYNCWAIT
> + * apply:10 -> set CATCHUP
> + * sync:10 -> set ready
> + * exit
> + * apply:10
> + * stop tracking
> + * continue rep
> + * - Sync infront:
> + * sync:10
> + * set SYNCWAIT
> + * apply:8
> + * set SYNCDONE
> + * sync:10
> + * exit
> + * apply:10
> + * set READY
> + * stop tracking
> + * continue rep

This definitely needs to be expanded a bit. Where are we tracking how
far replication has progressed on individual tables? Are we creating new
slots for syncing? Is there any parallelism in syncing?
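
For reference, one reading of the apply-side transitions described in the
quoted comment, written as a standalone sketch. The enum and function
names are made up for illustration and the LSN type is a stand-in; the
sketch only covers which state apply picks, not where the position is
stored or how the two workers signal each other.

```c
#include <stdint.h>

/* Stand-in for XLogRecPtr (assumption for this sketch). */
typedef uint64_t SketchLsn;

typedef enum SketchSyncState
{
    SKETCH_SYNCWAIT,            /* sync finished the copy and is waiting */
    SKETCH_CATCHUP,             /* sync must replay up to apply's position */
    SKETCH_SYNCDONE,            /* sync is done, apply has to catch up */
    SKETCH_READY                /* table is replicated normally */
} SketchSyncState;

/*
 * Hypothetical helper: the state apply would set next, given the current
 * state, apply's own stream position and the position stored for the table.
 */
static SketchSyncState
apply_next_state(SketchSyncState cur, SketchLsn apply_lsn, SketchLsn table_lsn)
{
    switch (cur)
    {
        case SKETCH_SYNCWAIT:
            if (apply_lsn > table_lsn)
                return SKETCH_CATCHUP;  /* apply is in front */
            if (apply_lsn < table_lsn)
                return SKETCH_SYNCDONE; /* sync is in front */
            return SKETCH_READY;        /* same position */

        case SKETCH_SYNCDONE:
            /* Keep tracking until apply reaches the sync position. */
            return (apply_lsn >= table_lsn) ? SKETCH_READY : SKETCH_SYNCDONE;

        default:
            /* CATCHUP is resolved by the sync worker; READY is final. */
            return cur;
    }
}
```

The two example flows in the comment correspond to
apply_next_state(SKETCH_SYNCWAIT, 10, 8) and
apply_next_state(SKETCH_SYNCWAIT, 8, 10) followed by
apply_next_state(SKETCH_SYNCDONE, 10, 10).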

> +/*
> + * Exit routine for synchronization worker.
> + */
> +static void
> +finish_sync_worker(char *slotname)
> +{
> + LogicalRepWorker *worker;
> + RepOriginId originid;
> + MemoryContext oldctx = CurrentMemoryContext;
> +
> + /*
> + * Drop the replication slot on remote server.
> + * We want to continue even in the case that the slot on remote side
> + * is already gone. This means that we can leave slot on the remote
> + * side but that can happen for other reasons as well so we can't
> + * really protect against that.
> + */
> + PG_TRY();
> + {
> + wrcapi->drop_slot(wrchandle, slotname);
> + }
> + PG_CATCH();
> + {
> + MemoryContext ectx;
> + ErrorData *edata;
> +
> + ectx = MemoryContextSwitchTo(oldctx);
> + /* Save error info */
> + edata = CopyErrorData();
> + MemoryContextSwitchTo(ectx);
> + FlushErrorState();
> +
> + ereport(WARNING,
> + (errmsg("there was problem dropping the replication slot "
> + "\"%s\" on provider", slotname),
> + errdetail("The error was: %s", edata->message),
> + errhint("You may have to drop it manually")));
> + FreeErrorData(edata);

ISTM we really should rather return success/failure here, and not throw
an error inside the libpqwalreceiver stuff. I kind of wonder if we
actually can get rid of this indirection.

> + /* Find the main apply worker and signal it. */
> + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
> + worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid);
> + if (worker && worker->proc)
> + SetLatch(&worker->proc->procLatch);
> + LWLockRelease(LogicalRepWorkerLock);

I'd rather do the SetLatch outside of the critical section.
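
A toy standalone sketch of that shape: look the worker up under the lock,
remember its latch, release the lock, and only then set the latch. The
lock, latch and worker types are simplified stand-ins rather than the
PostgreSQL ones, and the sketch assumes the latch stays valid after the
lock is released.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Simplified stand-ins for Latch, LogicalRepWorker and the LWLock. */
typedef struct SketchLatch
{
    bool is_set;
} SketchLatch;

typedef struct SketchWorker
{
    SketchLatch *latch;         /* NULL while the worker is not attached */
} SketchWorker;

static bool sketch_lock_held = false;

static void
sketch_lock_acquire(void)
{
    sketch_lock_held = true;
}

static void
sketch_lock_release(void)
{
    sketch_lock_held = false;
}

static void
sketch_set_latch(SketchLatch *latch)
{
    /* The point of the exercise: never signal while holding the lock. */
    assert(!sketch_lock_held);
    latch->is_set = true;
}

/* Returns true if a worker was found and signalled. */
static bool
signal_worker(SketchWorker *worker)
{
    SketchLatch *latch = NULL;

    /* Only the lookup happens under the lock. */
    sketch_lock_acquire();
    if (worker != NULL && worker->latch != NULL)
        latch = worker->latch;
    sketch_lock_release();

    if (latch == NULL)
        return false;

    sketch_set_latch(latch);
    return true;
}
```

A wakeup that arrives after the worker has already moved on is just a
spurious latch set, which a latch wait loop has to tolerate anyway.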

> +static bool
> +wait_for_sync_status_change(TableState *tstate)
> +{
> + int rc;
> + char state = tstate->state;
> +
> + while (!got_SIGTERM)
> + {
> + StartTransactionCommand();
> + tstate->state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
> + tstate->relid,
> + &tstate->lsn,
> + true);
> + CommitTransactionCommand();
> +
> + /* Status record was removed. */
> + if (tstate->state == SUBREL_STATE_UNKNOWN)
> + return false;
> +
> + if (tstate->state != state)
> + return true;
> +
> + rc = WaitLatch(&MyProc->procLatch,
> + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
> + 10000L);
> +
> + /* emergency bailout if postmaster has died */
> + if (rc & WL_POSTMASTER_DEATH)
> + proc_exit(1);
> +
> + ResetLatch(&MyProc->procLatch);

broken indentation.

> +/*
> + * Read the state of the tables in the subscription and update our table
> + * state list.
> + */
> +static void
> +reread_sync_state(Oid relid)
> +{
> + dlist_mutable_iter iter;
> + Relation rel;
> + HeapTuple tup;
> + ScanKeyData skey[2];
> + HeapScanDesc scan;
> +
> + /* Clean the old list. */
> + dlist_foreach_modify(iter, &table_states)
> + {
> + TableState *tstate = dlist_container(TableState, node, iter.cur);
> +
> + dlist_delete(iter.cur);
> + pfree(tstate);
> + }
> +
> + /*
> + * Fetch all the subscription relation states that are not marked as
> + * ready and push them into our table state tracking list.
> + */
> + rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
> +
> + ScanKeyInit(&skey[0],
> + Anum_pg_subscription_rel_subid,
> + BTEqualStrategyNumber, F_OIDEQ,
> + ObjectIdGetDatum(MyLogicalRepWorker->subid));
> +
> + if (OidIsValid(relid))
> + {
> + ScanKeyInit(&skey[1],
> + Anum_pg_subscription_rel_subrelid,
> + BTEqualStrategyNumber, F_OIDEQ,
> + ObjectIdGetDatum(relid));
> + }
> + else
> + {
> + ScanKeyInit(&skey[1],
> + Anum_pg_subscription_rel_substate,
> + BTEqualStrategyNumber, F_CHARNE,
> + CharGetDatum(SUBREL_STATE_READY));
> + }
> +
> + scan = heap_beginscan_catalog(rel, 2, skey);

Hm. So this is a seqscan. Shouldn't we make this use an index (depending
on which branch is taken above)?

> + while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
> + {
> + Form_pg_subscription_rel subrel;
> + TableState *tstate;
> + MemoryContext oldctx;
> +
> + subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
> +
> + /* Allocate the tracking info in a permament memory context. */

s/permament/permanent/

> +/*
> + * Handle table synchronization cooperation from the synchroniation
> + * worker.
> + */
> +static void
> +process_syncing_tables_sync(char *slotname, XLogRecPtr end_lsn)
> +{
> + TableState *tstate;
> + TimeLineID tli;
> +
> + Assert(!IsTransactionState());
> +
> + /*
> + * Synchronization workers don't keep track of all synchronization
> + * tables, they only care about their table.
> + */
> + if (!table_states_valid)
> + {
> + StartTransactionCommand();
> + reread_sync_state(MyLogicalRepWorker->relid);
> + CommitTransactionCommand();
> + }
> +
> + /* Somebody removed table underneath this worker, nothing more to do. */
> + if (dlist_is_empty(&table_states))
> + {
> + wrcapi->endstreaming(wrchandle, &tli);
> + finish_sync_worker(slotname);
> + }
> +
> + /* Check if we are done with catchup now. */
> + tstate = dlist_container(TableState, node, dlist_head_node(&table_states));
> + if (tstate->state == SUBREL_STATE_CATCHUP)
> + {
> + Assert(tstate->lsn != InvalidXLogRecPtr);
> +
> + if (tstate->lsn == end_lsn)
> + {
> + tstate->state = SUBREL_STATE_READY;
> + tstate->lsn = InvalidXLogRecPtr;
> + /* Update state of the synchronization. */
> + StartTransactionCommand();
> + SetSubscriptionRelState(MyLogicalRepWorker->subid,
> + tstate->relid, tstate->state,
> + tstate->lsn);
> + CommitTransactionCommand();
> +
> + wrcapi->endstreaming(wrchandle, &tli);
> + finish_sync_worker(slotname);
> + }
> + return;
> + }
> +}

The return inside the if is a bit weird. Makes one think it might be a
loop or such.

> +/*
> + * Handle table synchronization cooperation from the apply worker.
> + */
> +static void
> +process_syncing_tables_apply(char *slotname, XLogRecPtr end_lsn)
> +{
> + dlist_mutable_iter iter;
> +
> + Assert(!IsTransactionState());
> +
> + if (!table_states_valid)
> + {
> + StartTransactionCommand();
> + reread_sync_state(InvalidOid);
> + CommitTransactionCommand();
> + }

So this pattern is repeated a bunch of times, maybe we can encapsulate
that somewhat? Maybe like ensure_sync_state_valid() or such?
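
A toy standalone sketch of such a helper. The transaction and reread
functions are stubs so that the example compiles on its own, and it
assumes that rereading the state is what marks the cached list valid
again, which the quoted hunks don't show.

```c
#include <stdbool.h>

static bool table_states_valid = false;
static int  sketch_reread_count = 0;   /* only here to observe the stub */

/* Stubs standing in for Start/CommitTransactionCommand(). */
static void
sketch_start_transaction(void)
{
}

static void
sketch_commit_transaction(void)
{
}

/* Stub standing in for reread_sync_state(); assumed to set the flag. */
static void
sketch_reread_sync_state(unsigned int relid)
{
    (void) relid;
    sketch_reread_count++;
    table_states_valid = true;
}

/*
 * Hypothetical helper: refresh the cached table states if they were
 * invalidated, wrapping the catalog access in its own transaction.
 */
static void
ensure_sync_state_valid(unsigned int relid)
{
    if (table_states_valid)
        return;

    sketch_start_transaction();
    sketch_reread_sync_state(relid);
    sketch_commit_transaction();
}
```

The sync worker would pass its own relation and the apply worker would
pass InvalidOid, so both callers shrink to a single call.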

> + dlist_foreach_modify(iter, &table_states)
> + {
> + TableState *tstate = dlist_container(TableState, node, iter.cur);
> + bool start_worker;
> + LogicalRepWorker *worker;
> +
> + /*
> + * When the synchronization process is at the cachup phase we need

s/cachup/catchup/

> + * to ensure that we are not behind it (it's going to wait at this
> + * point for the change of state). Once we are infront or at the same
> + * position as the synchronization proccess we can signal it to
> + * finish the catchup.
> + */
> + if (tstate->state == SUBREL_STATE_SYNCWAIT)
> + {
> + if (end_lsn > tstate->lsn)
> + {
> + /*
> + * Apply is infront, tell sync to catchup. and wait until
> + * it does.
> + */
> + tstate->state = SUBREL_STATE_CATCHUP;
> + tstate->lsn = end_lsn;
> + StartTransactionCommand();
> + SetSubscriptionRelState(MyLogicalRepWorker->subid,
> + tstate->relid, tstate->state,
> + tstate->lsn);
> + CommitTransactionCommand();
> +
> + /* Signal the worker as it may be waiting for us. */
> + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> + worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
> + tstate->relid);
> + if (worker && worker->proc)
> + SetLatch(&worker->proc->procLatch);
> + LWLockRelease(LogicalRepWorkerLock);

Different parts of this file use different lock levels to set the
latch. Why?

> + if (wait_for_sync_status_change(tstate))
> + Assert(tstate->state == SUBREL_STATE_READY);
> + }
> + else
> + {
> + /*
> + * Apply is either behind in which case sync worker is done
> + * but apply needs to keep tracking the table until it
> + * catches up to where sync finished.
> + * Or apply and sync are at the same position in which case
> + * table can be switched to standard replication mode
> + * immediately.
> + */
> + if (end_lsn < tstate->lsn)
> + tstate->state = SUBREL_STATE_SYNCDONE;
> + else
> + tstate->state = SUBREL_STATE_READY;
> +

What I'm failing to understand is how this can be done under
concurrency. You probably thought about this, but it should really be
explained somewhere.

> + StartTransactionCommand();
> + SetSubscriptionRelState(MyLogicalRepWorker->subid,
> + tstate->relid, tstate->state,
> + tstate->lsn);
> + CommitTransactionCommand();
> +
> + /* Signal the worker as it may be waiting for us. */
> + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> + worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
> + tstate->relid);
> + if (worker && worker->proc)
> + SetLatch(&worker->proc->procLatch);
> + LWLockRelease(LogicalRepWorkerLock);

Oh, and again, please set latches outside of the lock.

> + else if (tstate->state == SUBREL_STATE_SYNCDONE &&
> + end_lsn >= tstate->lsn)
> + {
> + /*
> + * Apply catched up to the position where table sync finished,
> + * mark the table as ready for normal replication.
> + */

Sentence needs to be rephrased a bit.

> + /*
> + * In case table is supposed to be synchronizing but the
> + * synchronization worker is not running, start it.
> + * Limit the number of launched workers here to one (for now).
> + */

Hm. That seems problematic for online upgrade type cases, we might never
catch up that way...

> +/*
> + * Start syncing the table in the sync worker.
> + */
> +char *
> +LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
> +{
> + StringInfoData s;
> + TableState tstate;
> + MemoryContext oldctx;
> + char *slotname;
> +
> + /* Check the state of the table synchronization. */
> + StartTransactionCommand();
> + tstate.relid = MyLogicalRepWorker->relid;
> + tstate.state = GetSubscriptionRelState(MySubscription->oid, tstate.relid,
> + &tstate.lsn, false);
> +
> + /*
> + * Build unique slot name.
> + * TODO: protect against too long slot name.
> + */
> + oldctx = MemoryContextSwitchTo(CacheMemoryContext);
> + initStringInfo(&s);
> + appendStringInfo(&s, "%s_sync_%s", MySubscription->slotname,
> + get_rel_name(tstate.relid));
> + slotname = s.data;

Is this memory freed somewhere?

> + /*
> + * We want to do the table data sync in single
> + * transaction so do not close the transaction opened
> + * above.
> + * There will be no BEGIN or COMMIT messages coming via
> + * logical replication while the copy table command is
> + * running so start the transaction here.
> + * Note the memory context for data handling will still
> + * be done using ensure_transaction called by the insert
> + * handler.
> + */
> + StartTransactionCommand();
> +
> + /*
> + * Don't allow parallel access other than SELECT while
> + * the initial contents are being copied.
> + */
> + rel = heap_open(tstate.relid, ExclusiveLock);

Why do we want to allow access at all?

> @@ -87,6 +92,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
> cb->commit_cb = pgoutput_commit_txn;
> cb->filter_by_origin_cb = pgoutput_origin_filter;
> cb->shutdown_cb = pgoutput_shutdown;
> + cb->tuple_cb = pgoutput_tuple;
> + cb->list_tables_cb = pgoutput_list_tables;
> }

What are these new and undocumented callbacks actually doing? And why
is this integrated into logical decoding?

> /*
> + * Handle LIST_TABLES command.
> + */
> +static void
> +SendTableList(ListTablesCmd *cmd)
> +{

Ugh.

I really dislike this kind of command. I think we should instead change
things around, allowing normal SQL to be issued via the replication
command. We'll have to error out when SQL is run on replication
connections that aren't connected to a database, but that seems fine.

Andres
