Re: Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Peter Smith <smithpb2250(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-05-30 11:38:21
Message-ID: CAA4eK1Jt08SYbRt_-rbSWNg=X9-m8+RdP5PosfnQgyF-z8bkxQ@mail.gmail.com

On Mon, May 30, 2022 at 2:22 PM wangw(dot)fnst(at)fujitsu(dot)com
<wangw(dot)fnst(at)fujitsu(dot)com> wrote:
>
> Attach the new patches(only changed 0001 and 0002)
>

A few comments/suggestions for 0001 and 0003:
=====================================
0001
--------
1.
+ else
+     snprintf(bgw.bgw_name, BGW_MAXLEN,
+              "logical replication apply worker for subscription %u", subid);

Can we slightly change the message to: "logical replication background
apply worker for subscription %u"?

2. Can we think of separating the new logic for applying the xact by
bgworker into a new file like applybgworker or applyparallel? We have
previously done the same in the case of vacuum (see vacuumparallel.c).

3.
+ /*
+  * XXX The publisher side doesn't always send relation update messages
+  * after the streaming transaction, so update the relation in main
+  * apply worker here.
+  */
+ if (action == LOGICAL_REP_MSG_RELATION)
+ {
+     LogicalRepRelation *rel = logicalrep_read_rel(s);
+     logicalrep_relmap_update(rel);
+ }

I think the publisher side skips the relation update message after a
streaming transaction only if it has already sent that message for a
non-streaming transaction, in which case we don't need to update the
local cache here. This is as per my understanding of
maybe_send_schema(); do let me know if I am missing something. If my
understanding is correct, then we don't need this change.

4.
+ * For the main apply worker, if in streaming mode (receiving a block of
+ * streamed transaction), we send the data to the apply background worker.
*
- * If in streaming mode (receiving a block of streamed transaction), we
- * simply redirect it to a file for the proper toplevel transaction.

This comment is slightly confusing. Can we change it to something
like: "In streaming case (receiving a block of streamed transaction),
for SUBSTREAM_ON mode, we simply redirect it to a file for the proper
toplevel transaction, and for SUBSTREAM_APPLY mode, we send the
changes to background apply worker."?
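
To illustrate the distinction the reworded comment is drawing, here is a
minimal standalone sketch of the routing decision. It is not the patch's
code: the enum values reuse the SUBSTREAM_ON/SUBSTREAM_APPLY names from
the comment above, but SUBSTREAM_OFF, ChangeRoute and route_change() are
hypothetical names made up for this example.

```c
#include <stdbool.h>

/* Streaming modes; defined locally for this sketch only. */
typedef enum
{
    SUBSTREAM_OFF,      /* assumption: streaming disabled */
    SUBSTREAM_ON,       /* spool streamed changes to a file */
    SUBSTREAM_APPLY     /* hand streamed changes to a background worker */
} SubStreamMode;

/* Hypothetical: where the main apply worker sends an incoming change. */
typedef enum
{
    ROUTE_APPLY_DIRECTLY,
    ROUTE_SPOOL_TO_FILE,
    ROUTE_SEND_TO_BGWORKER
} ChangeRoute;

/*
 * Decide what the main apply worker does with a change.  Outside a
 * streamed block the change is applied directly; inside one, the
 * subscription's streaming mode picks the destination.
 */
static ChangeRoute
route_change(SubStreamMode mode, bool in_streamed_transaction)
{
    if (!in_streamed_transaction)
        return ROUTE_APPLY_DIRECTLY;

    switch (mode)
    {
        case SUBSTREAM_ON:
            return ROUTE_SPOOL_TO_FILE;
        case SUBSTREAM_APPLY:
            return ROUTE_SEND_TO_BGWORKER;
        default:
            return ROUTE_APPLY_DIRECTLY;
    }
}
```

The point is only that both streaming cases share the same entry
condition and differ in the destination, which is what the comment
should say.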

5.
+apply_handle_stream_abort(StringInfo s)
{
...
...
+ /*
+ * If the two XIDs are the same, it's in fact abort of toplevel xact,
+ * so just free the subxactlist.
+ */
+ if (subxid == xid)
+ {
+ set_apply_error_context_xact(subxid, InvalidXLogRecPtr);

- fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
- false);
+ AbortCurrentTransaction();

- buffer = palloc(BLCKSZ);
+ EndTransactionBlock(false);
+ CommitTransactionCommand();
+
+ in_remote_transaction = false;
...
...
}

Here, can we update the replication origin as we are doing in
apply_handle_rollback_prepared? Currently, we don't do it because we
are just cleaning up temporary files for which we don't even have a
transaction. Also, we don't have the required infrastructure to
advance origins for aborts as we have for abort prepared. See commits
[1eb6d6527a][8a812e5106]. If we think it is a good idea then I think
we need to send abort_lsn and abort_time from the publisher and we
need to be careful to make it work with lower subscriber versions that
don't have the facility to process these additional values.
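
A rough standalone sketch of the compatibility concern, assuming the
extra fields are gated on the negotiated protocol version. Everything
here is hypothetical: the version threshold, the struct, the function
names and the byte layout are made up for illustration and are not the
real logical replication wire format (which, among other things, uses
network byte order).

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Assumption: first protocol version that carries abort_lsn/abort_time. */
#define ABORT_INFO_MIN_PROTO_VERSION 4
/* Type byte + xid + subxid + abort_lsn + abort_time. */
#define STREAM_ABORT_MAX_LEN (1 + 4 + 4 + 8 + 8)

typedef struct StreamAbortData
{
    uint32_t xid;
    uint32_t subxid;
    bool     has_abort_info;    /* set by the reader only */
    uint64_t abort_lsn;
    int64_t  abort_time;
} StreamAbortData;

/* Publisher side: append the new fields only if the peer understands them. */
static size_t
write_stream_abort(uint8_t *buf, uint32_t proto_version,
                   const StreamAbortData *msg)
{
    size_t off = 0;

    buf[off++] = 'A';           /* type byte, chosen for illustration */
    memcpy(buf + off, &msg->xid, 4);
    off += 4;
    memcpy(buf + off, &msg->subxid, 4);
    off += 4;

    if (proto_version >= ABORT_INFO_MIN_PROTO_VERSION)
    {
        memcpy(buf + off, &msg->abort_lsn, 8);
        off += 8;
        memcpy(buf + off, &msg->abort_time, 8);
        off += 8;
    }
    return off;
}

/* Subscriber side: read the new fields only if they were negotiated. */
static void
read_stream_abort(const uint8_t *buf, uint32_t proto_version,
                  StreamAbortData *msg)
{
    size_t off = 1;             /* skip the type byte */

    memcpy(&msg->xid, buf + off, 4);
    off += 4;
    memcpy(&msg->subxid, buf + off, 4);
    off += 4;

    msg->has_abort_info = (proto_version >= ABORT_INFO_MIN_PROTO_VERSION);
    msg->abort_lsn = 0;
    msg->abort_time = 0;
    if (msg->has_abort_info)
    {
        memcpy(&msg->abort_lsn, buf + off, 8);
        off += 8;
        memcpy(&msg->abort_time, buf + off, 8);
    }
}
```

With this shape an older subscriber keeps receiving the short message it
already knows how to parse, and only a subscriber that negotiated the
newer version could use the values to advance the origin.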

0003
--------
6.
+ /*
+  * If any unique index exist, check that they are same as remoterel.
+  */
+ if (!rel->sameunique)
+     ereport(ERROR,
+             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+              errmsg("cannot replicate relation with different unique index"),
+              errhint("Please change the streaming option to 'on' instead of 'apply'.")));

I think we can do better here. Instead of simply erroring out and
asking the user to change the streaming mode, we can remember this in
the system catalog, probably in pg_subscription, and then on restart we
can change the streaming mode to 'on', perform the transaction, and
change the streaming mode back to 'apply'. I am not sure whether we
want to do this in the first version, so if you agree with this,
developing it as a separate patch would be a good idea.
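
Just to make the idea concrete, a toy sketch of the fallback state,
assuming a single persisted flag. None of these names exist in the
patch or in pg_subscription; SubFallbackState and the three functions
are hypothetical, and a real version would store the flag in the
catalog rather than in a struct.

```c
#include <stdbool.h>

/* Streaming modes; defined locally for this sketch only. */
typedef enum
{
    SUBSTREAM_ON,       /* spool streamed changes to a file */
    SUBSTREAM_APPLY     /* apply via background worker */
} SubStreamMode;

/* Hypothetical stand-in for state that would live in the catalog. */
typedef struct SubFallbackState
{
    SubStreamMode configured_mode;  /* what the user asked for */
    bool          fallback_pending; /* remembered across a restart */
} SubFallbackState;

/* Called where the patch currently raises the ERROR. */
static void
remember_apply_not_possible(SubFallbackState *state)
{
    state->fallback_pending = true;
}

/* Mode the worker should actually use after (re)start. */
static SubStreamMode
effective_stream_mode(const SubFallbackState *state)
{
    if (state->fallback_pending)
        return SUBSTREAM_ON;
    return state->configured_mode;
}

/* Called once the problematic transaction has been applied. */
static void
fallback_transaction_done(SubFallbackState *state)
{
    state->fallback_pending = false;
}
```

The user's setting is never overwritten here; only the effective mode
changes for the one transaction that could not be applied in 'apply'
mode.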

Also, please update comments here as to why we don't handle such cases.

--
With Regards,
Amit Kapila.
