Re: logical decoding and replication of sequences, take 2

From: Ashutosh Bapat <ashutosh(dot)bapat(dot)oss(at)gmail(dot)com>
To: Tomas Vondra <tomas(dot)vondra(at)enterprisedb(dot)com>
Cc: PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>
Subject: Re: logical decoding and replication of sequences, take 2
Date: 2023-07-04 13:13:16
Message-ID: CAExHW5s6kMKYHo1eQppvzZQyitvZcnAyKNU6xvbKwbPuTOXJkQ@mail.gmail.com
Lists: pgsql-hackers

On Mon, Jun 26, 2023 at 8:35 PM Tomas Vondra
<tomas(dot)vondra(at)enterprisedb(dot)com> wrote:
> On 6/26/23 15:18, Ashutosh Bapat wrote:

> > I will look at 0004 next.
> >
>
> OK

0004 is quite large. I think it might be easier to review if we split it
into two or even three patches, e.g. 1. publication and subscription catalog
handling, 2. built-in replication protocol changes. But anyway, I have given
it one read. I have reviewed in detail the parts which deal with replication
proper. I have *not* thoroughly reviewed the parts which deal with the
catalogs, pg_dump, describe and tab completion, nor the tests. If those parts
need a thorough review, please let me know.

But before jumping into the comments, here is a weird scenario I tried. On the
publisher I created a table t1(a int, b int) and a sequence s and added both
to a publication. On the subscriber I swapped their names, i.e. created a
table s(a int, b int) and a sequence t1, and subscribed to the publication.
The subscription was created, and during replication it threw the errors
'logical replication target relation "public.t1" is missing replicated
columns: "a", "b"' and 'logical replication target relation "public.s" is
missing replicated columns: "last_value", "log_cnt", "is_called"'. I think
it's good that it at least threw an error. But it would be good if it detected
that the relation kinds themselves are different and mentioned that in the
error. Something like 'logical replication target "public.s" is not a sequence
like source "public.s"'.
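
To make the suggestion concrete, here is a minimal standalone sketch of such a
check. It is not code from the patch: check_relkind_match() and relkind_name()
are hypothetical helpers, and the message wording is only a suggestion. The
only thing taken from PostgreSQL is the pg_class.relkind codes 'r' (ordinary
table) and 'S' (sequence).

```c
#include <stdio.h>
#include <stddef.h>

/* Relkind codes as stored in pg_class.relkind. */
#define RELKIND_RELATION 'r'
#define RELKIND_SEQUENCE 'S'

/* Hypothetical helper: readable name for the relkinds of interest here. */
static const char *
relkind_name(char relkind)
{
	switch (relkind)
	{
		case RELKIND_RELATION:
			return "table";
		case RELKIND_SEQUENCE:
			return "sequence";
		default:
			return "relation";
	}
}

/*
 * Hypothetical check: compare the kind of the remote (publisher) relation
 * with the kind of the local (subscriber) relation before looking at any
 * columns. Returns 0 if they match; otherwise writes a message naming both
 * kinds into buf and returns -1.
 */
static int
check_relkind_match(const char *remote_name, char remote_kind,
					const char *local_name, char local_kind,
					char *buf, size_t buflen)
{
	if (remote_kind == local_kind)
	{
		if (buflen > 0)
			buf[0] = '\0';
		return 0;
	}

	snprintf(buf, buflen,
			 "logical replication target \"%s\" is a %s, but source \"%s\" is a %s",
			 local_name, relkind_name(local_kind),
			 remote_name, relkind_name(remote_kind));
	return -1;
}
```

In the swapped-names scenario above, the publisher's sequence "public.s" would
be compared against the subscriber's table "public.s" and fail here, before the
missing-columns check is ever reached.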

Comments on the patch itself.

I didn't find any mention of 'sequence' in the documentation of the publish
option in CREATE or ALTER PUBLICATION. Is something missing in the
documentation? But do we really need to record "sequence" as an operation?
Just adding the sequences to the publication should be fine, right? There's
only one operation on sequences: updating the sequence row.

+CREATE VIEW pg_publication_sequences AS
+ SELECT
+ P.pubname AS pubname,
+ N.nspname AS schemaname,
+ C.relname AS sequencename

If we report oid or regclass for sequences it might be easier to join the view
further. We don't have reg* for publication so we report both oid and
name of publication.

+/*
+ * Update the sequence state by modifying the existing sequence data row.
+ *
+ * This keeps the same relfilenode, so the behavior is non-transactional.
+ */
+static void
+SetSequence_non_transactional(Oid seqrelid, int64 last_value, int64 log_cnt, bool is_called)

This function has some code similar to nextval but with the sequence
of operations (viz. changes to buffer, WAL insert and cache update) changed.
Given the comments in nextval_internal() the difference in sequence of
operations should not make a difference in the end result. But I think it will
be good to deduplicate the code to avoid confusion and also for ease of
maintenance.

+
+/*
+ * Update the sequence state by creating a new relfilenode.
+ *
+ * This creates a new relfilenode, to allow transactional behavior.
+ */
+static void
+SetSequence_transactional(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called)

Need some deduplication here as well. But there are fewer similarities with
AlterSequence, ResetSequence or DefineSequence.

@@ -730,9 +731,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
{
/*
- * Get the table list from publisher and build local table status
- * info.
+ * Get the table and sequence list from publisher and build
+ * local relation sync status info.
*/
- tables = fetch_table_list(wrconn, publications);
- foreach(lc, tables)
+ relations = fetch_table_list(wrconn, publications);

Is it allowed to connect a newer subscriber to an older publisher? If yes,
the query to fetch sequences will throw an error since it won't find the
catalog.
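
One way to avoid that would be to gate the sequence query on the publisher's
version. A minimal standalone sketch of the idea follows; the helper name and
the version constant are hypothetical placeholders, not something from the
patch.

```c
#include <stdbool.h>

/*
 * ASSUMPTION: placeholder for the first server version number whose
 * publisher has the sequence publication catalog. The real value depends
 * on the release this patch lands in.
 */
#define HYPOTHETICAL_MIN_SEQUENCE_VERSION 170000

/*
 * Hypothetical helper: decide whether to send the sequence-list query,
 * given the publisher's version number (e.g. 150003 for 15.3).
 */
static bool
publisher_has_sequence_catalog(int server_version_num)
{
	return server_version_num >= HYPOTHETICAL_MIN_SEQUENCE_VERSION;
}
```

The caller would presumably get the version from walrcv_server_version(wrconn)
and treat an older publisher as having an empty sequence list instead of
sending the query.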

@@ -882,8 +886,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
- /* Get the table list from publisher. */
+ /* Get the list of relations from publisher. */
pubrel_names = fetch_table_list(wrconn, sub->publications);
+ pubrel_names = list_concat(pubrel_names,
+ fetch_sequence_list(wrconn, sub->publications));

Similarly here.

+void
+logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
+
... snip ...
+ pq_sendint8(out, flags);
+ pq_sendint64(out, lsn);
... snip ...
+LogicalRepRelId
+logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
+{
... snip ...
+ /* XXX skipping flags and lsn */
+ pq_getmsgint(in, 1);
+ pq_getmsgint64(in);

We are ignoring these two fields on the WAL receiver side. I don't see such
fields being part of INSERT, UPDATE or DELETE messages. Should we just drop
them, or do they have some future use? Two LSNs are written by
OutputPrepareWrite() as a prologue to the logical message. If this LSN
is one of them, it could be dropped anyway.
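
For anyone following along: as long as the writer sends these fields, the
reader has to consume them even if it throws the values away, otherwise the
fields that follow are read from the wrong offset. A simplified standalone
sketch of that constraint is below. Buf and the put_/get_ helpers are
hypothetical stand-ins for StringInfo and the pq_send*/pq_getmsg* routines,
and the real sequence message carries more fields than shown here.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for StringInfo; no bounds checking, sketch only. */
typedef struct Buf
{
	uint8_t		data[64];
	size_t		len;		/* bytes written so far */
	size_t		cursor;		/* next byte to read */
} Buf;

static void
put_int8(Buf *b, uint8_t v)
{
	b->data[b->len++] = v;
}

/* Write a 64-bit value in network (big-endian) byte order. */
static void
put_int64(Buf *b, uint64_t v)
{
	for (int shift = 56; shift >= 0; shift -= 8)
		b->data[b->len++] = (uint8_t) (v >> shift);
}

static uint8_t
get_int8(Buf *b)
{
	return b->data[b->cursor++];
}

/* Read a 64-bit value written by put_int64(). */
static uint64_t
get_int64(Buf *b)
{
	uint64_t	v = 0;

	for (int i = 0; i < 8; i++)
		v = (v << 8) | b->data[b->cursor++];
	return v;
}

/* Writer: flags and lsn first, then the payload (only last_value here). */
static void
write_sequence_msg(Buf *out, uint8_t flags, uint64_t lsn, int64_t last_value)
{
	put_int8(out, flags);
	put_int64(out, lsn);
	put_int64(out, (uint64_t) last_value);
}

/* Reader: must skip flags and lsn to reach last_value at the right offset. */
static int64_t
read_sequence_msg(Buf *in)
{
	(void) get_int8(in);	/* flags, ignored */
	(void) get_int64(in);	/* lsn, ignored */
	return (int64_t) get_int64(in);
}
```

Dropping the two fields therefore means changing the writer and the reader
together.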

+static void
+fetch_sequence_data(char *nspname, char *relname,
... snip ...
+ appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n"
+ " FROM %s", quote_qualified_identifier(nspname, relname));

We are using an undocumented interface here. SELECT ... FROM <sequence> is not
documented. This code will break if we change the way a sequence is stored.
That is quite unlikely but not impossible. Ideally we should use one of the
methods documented at [1]. But none of them provides what is needed per your
comment in copy_sequence(), i.e. the state of the sequence as of the last WAL
record on that sequence. So I don't have any better ideas than what's done in
the patch. Maybe we can use "nextval() + 32" as an approximation.

Some minor comments and nitpicks:

@@ -1958,12 +1958,14 @@ get_object_address_publication_schema(List *object, bool missing_ok)

Need an update to the function prologue with the description of the third
element. Also the error message at the end of the function needs to mention the
object type.

- appendStringInfo(&buffer, _("publication of schema %s in publication %s"),
- nspname, pubname);
+ appendStringInfo(&buffer, _("publication of schema %s in publication %s type %s"),
+ nspname, pubname, objtype);

s/type/for object type/ ?

@@ -5826,18 +5842,24 @@ getObjectIdentityParts(const ObjectAddress *object,

break;
- appendStringInfo(&buffer, "%s in publication %s",
- nspname, pubname);
+ appendStringInfo(&buffer, "%s in publication %s type %s",
+ nspname, pubname, objtype);

s/type/object type/? ... in some other places as well?

+/*
+ * Check the character is a valid object type for schema publication.
+ *
+ * This recognizes either 't' for tables or 's' for sequences. Places that
+ * need to handle 'u' for unsupported relkinds need to do that explicitlyl

s/explicitlyl/explicitly/

+Datum
+pg_get_publication_sequences(PG_FUNCTION_ARGS)
+{
... snip ...
+ /*
+ * Publications support partitioned tables, although all changes are
+ * replicated using leaf partition identity and schema, so we only
+ * need those.
+ */

Not relevant here.

+ if (publication->allsequences)
+ sequences = GetAllSequencesPublicationRelations();
+ else
+ {
+ List *relids,
+ *schemarelids;
+
+ relids = GetPublicationRelations(publication->oid,
+ PUB_OBJTYPE_SEQUENCE,
+ publication->pubviaroot ?
+ PUBLICATION_PART_ROOT :
+ PUBLICATION_PART_LEAF);
+ schemarelids = GetAllSchemaPublicationRelations(publication->oid,
+ PUB_OBJTYPE_SEQUENCE,
+ publication->pubviaroot ?
+ PUBLICATION_PART_ROOT :
+ PUBLICATION_PART_LEAF);

I think we should just pass PUBLICATION_PART_ALL since that parameter is
irrelevant to sequences anyway. Otherwise this code would be confusing.

I think we should rename PublicationTable structure to PublicationRelation
since it can now contain information about a table or a sequence, both of which
are relations.

+/*
+ * Add or remove table to/from publication.

s/table/sequence/. Generally this applies to all the code that was written
for tables and then copied and modified for sequences.

@@ -18826,6 +18867,30 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
errmsg("invalid schema name"),
parser_errposition(pubobj->location));
}
+ else if (pubobj->pubobjtype == PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA ||
+ pubobj->pubobjtype == PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA)
+ {
+ /* WHERE clause is not allowed on a schema object */
+ if (pubobj->pubtable && pubobj->pubtable->whereClause)
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("WHERE clause not allowed for schema"),
+ parser_errposition(pubobj->location));

The grammar doesn't allow specifying a whereClause with the ALL TABLES IN
SCHEMA specification, but we have code to throw an error if that happens. We
also have similar code for ALL SEQUENCES IN SCHEMA. Should we add a similar
check for the SEQUENCE specification as well?

+static void
+fetch_sequence_data(char *nspname, char *relname,
... snip ...
+ /* tablesync sets the sequences in non-transactional way */
+ SetSequence(RelationGetRelid(rel), false, last_value, log_cnt, is_called);

Why? For a regular table, if the sync fails, the table retains the state it
had before the sync. Similarly, one would expect the sequence to retain the
state it had before the sync, no?

@@ -1467,10 +1557,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)

Now that it syncs sequences as well, should we rename this as
LogicalRepSyncRelationStart?

+static void
+apply_handle_sequence(StringInfo s)
... snip ...
+ /*
+ * Commit the per-stream transaction (we only do this when not in
+ * remote transaction, i.e. for non-transactional sequence updates.)
+ */
+ if (!in_remote_transaction)
+ CommitTransactionCommand();

I understand the purpose of the if block. It commits the transaction that was
started when applying a non-transactional sequence change. But I didn't
understand the term "per-stream transaction".

@@ -5683,8 +5686,15 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)

Thanks for the additional comments. Those are useful.

@@ -1716,28 +1716,19 @@ describeOneTableDetails(const char *schemaname,

I think these changes make it easy to print the publication description per the
code changes later. But maybe we should commit the refactoring patch
separately.

-DECLARE_UNIQUE_INDEX(pg_publication_namespace_pnnspid_pnpubid_index, 6239, PublicationNamespacePnnspidPnpubidIndexId, on pg_publication_namespace using btree(pnnspid oid_ops, pnpubid oid_ops));
+DECLARE_UNIQUE_INDEX(pg_publication_namespace_pnnspid_pnpubid_pntype_index, 8903, PublicationNamespacePnnspidPnpubidPntypeIndexId, on pg_publication_namespace using btree(pnnspid oid_ops, pnpubid oid_ops, pntype char_ops));

Why do we need a new OID? The old index should not be there in a cluster
created using this version and hence this OID will not be used.

[1] https://www.postgresql.org/docs/current/functions-sequence.html

Next I will review 0005.

--
Best Wishes,
Ashutosh Bapat
