Re: Column Filtering in Logical Replication

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>
Cc: Tomas Vondra <tomas(dot)vondra(at)enterprisedb(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>, Justin Pryzby <pryzby(at)telsasoft(dot)com>, Rahila Syed <rahilasyed90(at)gmail(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Column Filtering in Logical Replication
Date: 2022-03-16 11:27:43
Message-ID: CAA4eK1KR+yUQquK0Bx9uO3eb5xB1e0rAD9xKf-ddm5nSf4WfNg@mail.gmail.com
Lists: pgsql-hackers

On Tue, Mar 15, 2022 at 7:38 AM shiy(dot)fnst(at)fujitsu(dot)com
<shiy(dot)fnst(at)fujitsu(dot)com> wrote:
>
> On Mon, Mar 14, 2022 5:08 AM Tomas Vondra <tomas(dot)vondra(at)enterprisedb(dot)com> wrote:
>
> 3. src/backend/commands/publicationcmds.c
> +/*
> + * Check if all columns referenced in the column list are part of the
> + * REPLICA IDENTITY index or not.
> + *
> + * Returns true if any invalid column is found.
> + */
>
> The comment for pub_collist_contains_invalid_column() seems wrong. Should it be
> "Check if all REPLICA IDENTITY columns are covered by the column list or not"?
>

On similar lines, I think the errdetail for the messages below needs to be changed.
ereport(ERROR,
+ (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg("cannot update table \"%s\"",
+ RelationGetRelationName(rel)),
+ errdetail("Column list used by the publication does not cover the replica identity.")));
else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("cannot delete from table \"%s\"",
RelationGetRelationName(rel)),
errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
+ else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg("cannot delete from table \"%s\"",
+ RelationGetRelationName(rel)),
+ errdetail("Column list used by the publication does not cover the replica identity.")));
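
To make the intended meaning of the check concrete, here is a minimal
standalone sketch. It is not the patch's code: the toy_* names and the
use of a 64-bit mask in place of a Bitmapset are assumptions made only
for illustration. The point is that the test is "every replica identity
column is in the column list", not the other way around.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helper: bit for attribute number attnum (1..63). */
uint64_t
toy_attr_bit(int attnum)
{
	assert(attnum >= 1 && attnum <= 63);
	return UINT64_C(1) << attnum;
}

/*
 * Toy model of the check: returns true when some replica identity
 * column is missing from the column list, i.e. the column list is
 * invalid for update/delete.
 */
bool
toy_collist_misses_ri_column(uint64_t collist, uint64_t replident)
{
	return (replident & ~collist) != 0;
}
```

With a replica identity on column 1, a column list of (1, 2) passes
this check, whereas a column list of (2) alone is reported as invalid.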

Some assorted comments:
========================
1. As mentioned previously as well[1], the change in ATExecDropColumn
is not required. Similarly, the change you seem to agree upon in
logicalrep_write_update[2] doesn't seem to be present.

2. I think the dependency handling in publication_set_table_columns()
has problems. While removing existing dependencies, it uses
PublicationRelationId as classId whereas while adding new dependencies
it uses PublicationRelRelationId as classId. This will create problems
while removing columns from the table. For example,
postgres=# create table t1(c1 int, c2 int, c3 int);
CREATE TABLE
postgres=# create publication pub1 for table t1(c1, c2);
CREATE PUBLICATION
postgres=# select * from pg_depend where classid = 6106 or refclassid = 6106 or classid = 6104;
 classid | objid | objsubid | refclassid | refobjid | refobjsubid | deptype
---------+-------+----------+------------+----------+-------------+---------
    6106 | 16409 |        0 |       1259 |    16405 |           1 | a
    6106 | 16409 |        0 |       1259 |    16405 |           2 | a
    6106 | 16409 |        0 |       6104 |    16408 |           0 | a
    6106 | 16409 |        0 |       1259 |    16405 |           0 | a
(4 rows)

Up to here everything is fine.

postgres=# Alter publication pub1 alter table t1 set columns(c2);
ALTER PUBLICATION
postgres=# select * from pg_depend where classid = 6106 or refclassid = 6106 or classid = 6104;
 classid | objid | objsubid | refclassid | refobjid | refobjsubid | deptype
---------+-------+----------+------------+----------+-------------+---------
    6106 | 16409 |        0 |       1259 |    16405 |           1 | a
    6106 | 16409 |        0 |       1259 |    16405 |           2 | a
    6106 | 16409 |        0 |       6104 |    16408 |           0 | a
    6106 | 16409 |        0 |       1259 |    16405 |           0 | a
    6106 | 16409 |        0 |       1259 |    16405 |           2 | a
(5 rows)

Now without removing dependencies for columns 1 and 2, it added a new
dependency for column 2.
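
The effect can be reproduced with a small standalone model. This is
only a sketch under stated assumptions: the Toy* types and toy_*
functions are hypothetical and are not the catalog API; the numbers
6104, 6106 and 16409 are simply the ones visible in the output above.
Deleting with the wrong classId matches no rows, so the later insert
leaves a duplicate behind.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_ROWS 16

/* Toy stand-in for a pg_depend row; only the fields that matter here. */
typedef struct ToyDepend
{
	unsigned	classid;
	unsigned	objid;
	int			refobjsubid;	/* column number; 0 means whole table */
} ToyDepend;

typedef struct ToyCatalog
{
	ToyDepend	rows[TOY_MAX_ROWS];
	size_t		nrows;
} ToyCatalog;

/* Record one dependency row. */
void
toy_add(ToyCatalog *cat, unsigned classid, unsigned objid, int refobjsubid)
{
	assert(cat->nrows < TOY_MAX_ROWS);
	cat->rows[cat->nrows].classid = classid;
	cat->rows[cat->nrows].objid = objid;
	cat->rows[cat->nrows].refobjsubid = refobjsubid;
	cat->nrows++;
}

/* Remove column-level rows of (classid, objid); return how many went away. */
size_t
toy_delete_column_deps(ToyCatalog *cat, unsigned classid, unsigned objid)
{
	size_t		kept = 0;
	size_t		removed = 0;

	for (size_t i = 0; i < cat->nrows; i++)
	{
		ToyDepend  *r = &cat->rows[i];

		if (r->classid == classid && r->objid == objid && r->refobjsubid > 0)
			removed++;
		else
			cat->rows[kept++] = *r;
	}
	cat->nrows = kept;
	return removed;
}

/* Count the column-level rows that remain. */
size_t
toy_count_column_deps(const ToyCatalog *cat)
{
	size_t		n = 0;

	for (size_t i = 0; i < cat->nrows; i++)
		if (cat->rows[i].refobjsubid > 0)
			n++;
	return n;
}
```

If the rows were added under 6106, calling toy_delete_column_deps() with
6104 removes nothing, whereas calling it with 6106 clears the old column
rows before the new one is added.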

3.
@@ -930,8 +1054,24 @@ copy_table(Relation rel)
...
+ for (int i = 0; i < lrel.natts; i++)
+ {
+ if (i > 0)
+ appendStringInfoString(&cmd, ", ");
+
+ appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+ }
...
...
for (int i = 0; i < lrel.natts; i++)
{
appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
if (i < lrel.natts - 1)
appendStringInfoString(&cmd, ", ");
}

In the same function, we use two different styles to achieve the same
thing. I think it is better to use the same style (probably existing)
at both places for the sake of consistency.
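
For reference, a standalone sketch showing that the two styles build
the same string. The join_* and append_str names are hypothetical;
append_str only stands in for appendStringInfoString() on a fixed-size
buffer, and identifier quoting is left out.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for appendStringInfoString() on a fixed buffer. */
void
append_str(char *dst, size_t cap, const char *src)
{
	assert(strlen(dst) + strlen(src) < cap);
	strcat(dst, src);
}

/* Existing style: separator after every element except the last. */
void
join_trailing(char *dst, size_t cap, const char **names, int n)
{
	dst[0] = '\0';
	for (int i = 0; i < n; i++)
	{
		append_str(dst, cap, names[i]);
		if (i < n - 1)
			append_str(dst, cap, ", ");
	}
}

/* Style added by the patch: separator before every element except the first. */
void
join_leading(char *dst, size_t cap, const char **names, int n)
{
	dst[0] = '\0';
	for (int i = 0; i < n; i++)
	{
		if (i > 0)
			append_str(dst, cap, ", ");
		append_str(dst, cap, names[i]);
	}
}
```

Both functions produce identical output for any input, so the choice
between them is purely a matter of consistency within copy_table().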

4.
+ <para>
+ The <literal>ALTER TABLE ... SET COLUMNS</literal> variant allows changing
+ the set of columns that are included in the publication. If a column list
+ is specified, it must include the replica identity columns.
+ </para>

I think the second part holds true only for update/delete publications.

5.
+ * XXX Should this detect duplicate columns?
+ */
+static void
+publication_translate_columns(Relation targetrel, List *columns,
+ int *natts, AttrNumber **attrs)
{
...
+ if (bms_is_member(attnum, set))
+ ereport(ERROR,
+ errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("duplicate column \"%s\" in publication column list",
+ colname));
...
}

It seems we already detect duplicate columns in this function. So the
XXX part of the comment doesn't seem to be required.
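
As a standalone illustration of the membership test the quoted code
relies on, here is a sketch that uses a 64-bit mask in place of a
Bitmapset. The function name and the 1..63 attribute range are
assumptions for the example only, not part of the patch.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Return the index of the first entry in attnums[] that repeats an
 * earlier attribute number, or -1 if all entries are distinct.
 */
int
toy_first_duplicate_column(const int *attnums, int n)
{
	uint64_t	seen = 0;

	for (int i = 0; i < n; i++)
	{
		uint64_t	bit;

		assert(attnums[i] >= 1 && attnums[i] <= 63);
		bit = UINT64_C(1) << attnums[i];

		if (seen & bit)
			return i;			/* already a member: duplicate */
		seen |= bit;
	}
	return -1;
}
```

Checking membership before adding each column is what makes a separate
duplicate-detection pass unnecessary.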

6.
+ * XXX The name is a bit misleading, because we don't really transform
+ * anything here - we merely check the column list is compatible with the
+ * definition of the publication (with publish_via_partition_root=false)
+ * we only allow column lists on the leaf relations. So maybe rename it?
+ */
+static void
+TransformPubColumnList(List *tables, const char *queryString,
+ bool pubviaroot)

The second parameter is not used in this function. As noted in the
comments, I also think it is better to rename this. How about
ValidatePubColumnList?

7.
+ /*
+ * FIXME check pubactions vs. replica identity, to ensure the replica
+ * identity is included in the column list. Only do this for update
+ * and delete publications. See check_publication_columns.
+ *
+ * XXX This is needed because publish_via_partition_root may change,
+ * in which case the row filters may be invalid (e.g. with pvpr=false
+ * there must be no filter on partitioned tables).
+ */
+

This entire comment doesn't seem to be required.

8.
+publication_set_table_columns()
{
...
+ /* XXX "pub" is leaked here ??? */
...
}

It is not clear what this means.

9.
+ * ALTER PUBLICATION name SET COLUMNS table_name (column[, ...])
+ *
+ * ALTER PUBLICATION name SET COLUMNS table_name ALL
+ *
* pub_obj is one of:
*
* TABLE table_name [, ...]
@@ -9869,6 +9878,32 @@ AlterPublicationStmt:
n->action = AP_SetObjects;
$$ = (Node *)n;
}
+ | ALTER PUBLICATION name ALTER TABLE relation_expr SET COLUMNS '(' columnList ')'

The comments in gram.y indicate different rules than the actual implementation.

10.
+ *
+ * FIXME Do we need something similar for column filters?
*/
enum RowFilterPubAction

I have thought about this point and it seems we don't need anything on
this front for this patch. We need to combine the filters for
update/delete in the row filter case because if a filter for inserts
uses some column that is not present in the RI, then it can give an
error during update filtering, as the column won't be present in the WAL.

Now, the same problem won't be there for the column list/filter patch
because all the RI columns are there in the column list (for
update/delete) and we don't need to apply a column filter for old
tuples in either update or delete.

We can remove this FIXME.

11.
+ } /* loop all subscribed publications */
+
+}

No need for an empty line here.

[1] - https://www.postgresql.org/message-id/CAA4eK1K5pkrPT9z5TByUPptExian5c18g6GnfNf9Cr97QdPbjw%40mail.gmail.com
[2] - https://www.postgresql.org/message-id/43c15aa8-aa15-ca0f-40e4-3be68d98df05%40enterprisedb.com

--
With Regards,
Amit Kapila.
