Re: Column Filtering in Logical Replication

From: Tomas Vondra <tomas(dot)vondra(at)enterprisedb(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>, Justin Pryzby <pryzby(at)telsasoft(dot)com>, Rahila Syed <rahilasyed90(at)gmail(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>
Subject: Re: Column Filtering in Logical Replication
Date: 2022-03-11 01:56:41
Message-ID: 822a8e40-287c-59ff-0ea9-35eb759f4fe6@enterprisedb.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

On 3/10/22 20:10, Tomas Vondra wrote:
>
>
> On 3/10/22 19:17, Tomas Vondra wrote:
>> On 3/9/22 11:12, houzj(dot)fnst(at)fujitsu(dot)com wrote:
>>> Hi,
>>>
>>> Here are some tests and results about the table sync query of
>>> column filter patch and row filter.
>>>
>>> 1) multiple publications which publish schema of parent table and partition.
>>> ----pub
>>> create schema s1;
>>> create table s1.t (a int, b int, c int) partition by range (a);
>>> create table t_1 partition of s1.t for values from (1) to (10);
>>> create publication pub1 for all tables in schema s1;
>>> create publication pub2 for table t_1(b);
>>>
>>> ----sub
>>> - prepare tables
>>> CREATE SUBSCRIPTION sub CONNECTION 'port=10000 dbname=postgres' PUBLICATION pub1, pub2;
>>>
>>> When doing table sync for 't_1', the column list will be (b). I think it should
>>> be no filter because table t_1 is also published via ALL TABLES IN SCHMEA
>>> publication.
>>>
>>> For Row Filter, it will use no filter for this case.
>>>
>>>
>>> 2) one publication publishes both parent and child
>>> ----pub
>>> create table t (a int, b int, c int) partition by range (a);
>>> create table t_1 partition of t for values from (1) to (10)
>>> partition by range (a);
>>> create table t_2 partition of t_1 for values from (1) to (10);
>>>
>>> create publication pub2 for table t_1(a), t_2
>>> with (PUBLISH_VIA_PARTITION_ROOT);
>>>
>>> ----sub
>>> - prepare tables
>>> CREATE SUBSCRIPTION sub CONNECTION 'port=10000 dbname=postgres' PUBLICATION pub2;
>>>
>>> When doing table sync for table 't_1', it has no column list. I think the
>>> expected column list is (a).
>>>
>>> For Row Filter, it will use the row filter of the top most parent table(t_1) in
>>> this case.
>>>
>>>
>>> 3) one publication publishes both parent and child
>>> ----pub
>>> create table t (a int, b int, c int) partition by range (a);
>>> create table t_1 partition of t for values from (1) to (10)
>>> partition by range (a);
>>> create table t_2 partition of t_1 for values from (1) to (10);
>>>
>>> create publication pub2 for table t_1(a), t_2(b)
>>> with (PUBLISH_VIA_PARTITION_ROOT);
>>>
>>> ----sub
>>> - prepare tables
>>> CREATE SUBSCRIPTION sub CONNECTION 'port=10000 dbname=postgres' PUBLICATION pub2;
>>>
>>> When doing table sync for table 't_1', the column list would be (a, b). I think
>>> the expected column list is (a).
>>>
>>> For Row Filter, it will use the row filter of the top most parent table(t_1) in
>>> this case.
>>>
>>
>> Attached is an updated patch version, addressing all of those issues.
>>
>> 0001 is a bugfix, reworking how we calculate publish_as_relid. The old
>> approach was unstable with multiple publications, giving different
>> results depending on order of the publications. This should be
>> backpatched into PG13 where publish_via_partition_root was introduced, I
>> think.
>>
>> 0002 is the main patch, merging the changes proposed by Peter and fixing
>> the issues reported here. In most cases this means adopting the code
>> used for row filters, and perhaps simplifying it a bit.
>>
>>
>> But I also tried to implement a row-filter test for 0001, and I'm not
>> sure I understand the behavior I observe. Consider this:
>>
>> -- a chain of 3 partitions (on both publisher and subscriber)
>> CREATE TABLE test_part_rf (a int primary key, b int, c int)
>> PARTITION BY LIST (a);
>>
>> CREATE TABLE test_part_rf_1
>> PARTITION OF test_part_rf FOR VALUES IN (1,2,3,4,5)
>> PARTITION BY LIST (a);
>>
>> CREATE TABLE test_part_rf_2
>> PARTITION OF test_part_rf_1 FOR VALUES IN (1,2,3,4,5);
>>
>> -- initial data
>> INSERT INTO test_part_rf VALUES (1, 5, 100);
>> INSERT INTO test_part_rf VALUES (2, 15, 200);
>>
>> -- two publications, each adding a different partition
>> CREATE PUBLICATION test_pub_part_1 FOR TABLE test_part_rf_1
>> WHERE (b < 10) WITH (publish_via_partition_root);
>>
>> CREATE PUBLICATION test_pub_part_2 FOR TABLE test_part_rf_2
>> WHERE (b > 10) WITH (publish_via_partition_root);
>>
>> -- now create the subscription (also try opposite ordering)
>> CREATE SUBSCRIPTION test_part_sub CONNECTION '...'
>> PUBLICATION test_pub_part_1, test_pub_part_2;
>>
>> -- wait for sync
>>
>> -- inert some more data
>> INSERT INTO test_part_rf VALUES (3, 6, 300);
>> INSERT INTO test_part_rf VALUES (4, 16, 400);
>>
>> -- wait for catchup
>>
>> Now, based on the discussion here, my expectation is that we'll use the
>> row filter from the top-most ancestor in any publication, which in this
>> case is test_part_rf_1. Hence the filter should be (b < 10).
>>
>> So I'd expect these rows to be replicated:
>>
>> 1,5,100
>> 3,6,300
>>
>> But that's not what I get, unfortunately. I get different results,
>> depending on the order of publications:
>>
>> 1) test_pub_part_1, test_pub_part_2
>>
>> 1|5|100
>> 2|15|200
>> 3|6|300
>> 4|16|400
>>
>> 2) test_pub_part_2, test_pub_part_1
>>
>> 3|6|300
>> 4|16|400
>>
>> That seems pretty bizarre, because it either means we're not enforcing
>> any filter or some strange combination of filters (notice that for (2)
>> we skip/replicate rows matching either filter).
>>
>> I have to be missing something important, but this seems confusing.
>> There's a patch adding a simple test case to 028_row_filter.sql (named
>> .txt, so as not to confuse cfbot).
>>
>
> FWIW I think the reason is pretty simple - pgoutput_row_filter_init is
> broken. It assumes you can just do this
>
> rftuple = SearchSysCache2(PUBLICATIONRELMAP,
> ObjectIdGetDatum(entry->publish_as_relid),
> ObjectIdGetDatum(pub->oid));
>
> if (HeapTupleIsValid(rftuple))
> {
> /* Null indicates no filter. */
> rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
> Anum_pg_publication_rel_prqual,
> &pub_no_filter);
> }
> else
> {
> pub_no_filter = true;
> }
>
>
> and pub_no_filter=true means there's no filter at all. Which is
> nonsense, because we're using publish_as_relid here - the publication
> may not include this particular ancestor, in which case we need to just
> ignore this publication.
>
> So yeah, this needs to be reworked.
>

I spent a bit of time looking at this, and I think a minor change in
get_rel_sync_entry() fixes this - it's enough to ensure rel_publications
only includes publications that actually include publish_as_relid.

But this does not address tablesync.c :-( That still copies everything,
because it decides to sync both rels (test_pub_part_1, test_pub_part_2),
with it's row filter. On older releases this would fail, because we'd
start two workers:

1) COPY public.test_part_rf_2 TO STDOUT

2) COPY (SELECT a, b, c FROM public.test_part_rf_1) TO STDOUT

And that ends up inserting date from test_part_rf_2 twice. But now we
end up doing this instead:

1) COPY (SELECT a, b, c FROM public.test_part_rf_1 WHERE (b < 10)) TO STDOUT

2) COPY (SELECT a, b, c FROM ONLY public.test_part_rf_2 WHERE (b > 10))
TO STDOUT

Which no longer conflicts, because those subsets are mutually exclusive
(due to how the filter is defined), so the sync succeeds.

But I find this really weird - I think it's reasonable to expect the
sync to produce the same result as if the data was inserted and
replicated, and this just violates that.

Shouldn't tablesync calculate a list of relations in a way that prevents
such duplicate / overlapping syncs? In any case, this sync issue looks
entirely unrelated to the column filtering patch.

regards

--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

Attachment Content-Type Size
0001-fixup-publish_as_relid-20220311.patch text/x-patch 2.8 KB
0002-fixup-row-filter-publications-20220311.patch text/x-patch 1.8 KB
0003-Allow-specifying-column-filters-for-logical-20220311.patch text/x-patch 163.5 KB

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Masahiko Sawada 2022-03-11 02:25:04 Re: Skipping logical replication transactions on subscriber side
Previous Message Andres Freund 2022-03-11 01:27:12 Re: pgsql: dshash: Add sequential scan support.