From 7a45713c4161e4efdf82148f56af3328283b20e9 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Fri, 27 Dec 2019 23:56:04 +0100 Subject: [PATCH 12/17] fixup: add proper schema tracking --- src/backend/replication/pgoutput/pgoutput.c | 45 ++++++++++++++++++++- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 8490ea4717..0148f4c01e 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -82,6 +82,8 @@ typedef struct RelationSyncEntry Oid relid; /* relation oid */ TransactionId xid; /* transaction that created the record */ bool schema_sent; /* did we send the schema? */ + List *streamed_txns; /* streamed toplevel transactions with + * this schema */ bool replicate_valid; PublicationActions pubactions; } RelationSyncEntry; @@ -96,6 +98,11 @@ static void rel_sync_cache_relation_cb(Datum arg, Oid relid); static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue); +static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, + TransactionId xid); +static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, + TransactionId xid); + /* * Specify output plugin callbacks */ @@ -366,6 +373,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, { bool schema_sent; TransactionId xid = InvalidTransactionId; + TransactionId topxid = InvalidTransactionId; /* * Remember XID of the (sub)transaction for the change. We don't care if @@ -378,6 +386,11 @@ maybe_send_schema(LogicalDecodingContext *ctx, if (in_streaming) xid = change->txn->xid; + if (change->txn->toptxn) + topxid = change->txn->toptxn->xid; + else + topxid = xid; + /* * Do we need to send the schema? We do track streamed transactions * separately, because those may not be applied later (and the regular @@ -391,7 +404,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, * occur when streaming already started, so we have to track new catalog * changes somehow. */ - schema_sent = txn->is_schema_sent; + schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid); } else schema_sent = relentry->schema_sent; @@ -432,7 +445,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, relentry->xid = change->txn->xid; if (in_streaming) - txn->is_schema_sent = true; + set_schema_sent_in_streamed_txn(relentry, topxid); else relentry->schema_sent = true; } @@ -759,6 +772,34 @@ init_rel_sync_cache(MemoryContext cachectx) (Datum) 0); } +/* + * We expect relatively small number of streamed transactions. + */ +static bool +get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid) +{ + ListCell *lc; + foreach (lc, entry->streamed_txns) + { + if (xid == lfirst_int(lc)) + return true; + } + + return false; +} + +static void +set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid) +{ + MemoryContext oldctx; + + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + + entry->streamed_txns = lappend_int(entry->streamed_txns, xid); + + MemoryContextSwitchTo(oldctx); +} + /* * Find or create entry in the relation schema cache. */ -- 2.21.0