From 91c20ecb45a8627a9252ed589fe6b85927be8b45 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Thu, 25 Mar 2021 11:59:13 +0100 Subject: [PATCH 2/6] fixup! Row filter for logical replication Allow replication from older PostgreSQL versions without prqual. --- src/backend/replication/logical/tablesync.c | 70 +++++++++++---------- 1 file changed, 37 insertions(+), 33 deletions(-) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 40d84dadb5..246510c82e 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -796,49 +796,53 @@ fetch_remote_table_info(char *nspname, char *relname, walrcv_clear_result(res); /* Get relation qual */ - resetStringInfo(&cmd); - appendStringInfo(&cmd, - "SELECT pg_get_expr(prqual, prrelid) " - " FROM pg_publication p " - " INNER JOIN pg_publication_rel pr " - " ON (p.oid = pr.prpubid) " - " WHERE pr.prrelid = %u " - " AND p.pubname IN (", lrel->remoteid); - - first = true; - foreach(lc, MySubscription->publications) + if (walrcv_server_version(wrconn) >= 140000) { - char *pubname = strVal(lfirst(lc)); + resetStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT pg_get_expr(prqual, prrelid) " + " FROM pg_publication p " + " INNER JOIN pg_publication_rel pr " + " ON (p.oid = pr.prpubid) " + " WHERE pr.prrelid = %u " + " AND p.pubname IN (", lrel->remoteid); + + first = true; + foreach(lc, MySubscription->publications) + { + char *pubname = strVal(lfirst(lc)); - if (first) - first = false; - else - appendStringInfoString(&cmd, ", "); + if (first) + first = false; + else + appendStringInfoString(&cmd, ", "); - appendStringInfoString(&cmd, quote_literal_cstr(pubname)); - } - appendStringInfoChar(&cmd, ')'); + appendStringInfoString(&cmd, quote_literal_cstr(pubname)); + } + appendStringInfoChar(&cmd, ')'); - res = walrcv_exec(wrconn, cmd.data, 1, qualRow); + res = walrcv_exec(wrconn, cmd.data, 1, qualRow); - if (res->status != WALRCV_OK_TUPLES) - ereport(ERROR, - (errmsg("could not fetch relation qualifications for table \"%s.%s\" from publisher: %s", - nspname, relname, res->err))); + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch relation qualifications for table \"%s.%s\" from publisher: %s", + nspname, relname, res->err))); - slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); - while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) - { - Datum rf = slot_getattr(slot, 1, &isnull); + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + Datum rf = slot_getattr(slot, 1, &isnull); - if (!isnull) - *qual = lappend(*qual, makeString(TextDatumGetCString(rf))); + if (!isnull) + *qual = lappend(*qual, makeString(TextDatumGetCString(rf))); - ExecClearTuple(slot); + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); } - ExecDropSingleTupleTableSlot(slot); - walrcv_clear_result(res); pfree(cmd.data); } -- 2.30.2