RE: Perform streaming logical transactions by background workers and parallel apply

From: "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>
To: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-08-18 03:44:05
Message-ID: OSZPR01MB6310269420CA5588D6B81518FD6D9@OSZPR01MB6310.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Wed, Aug 17, 2022 2:28 PM Wang, Wei/王 威 <wangw(dot)fnst(at)fujitsu(dot)com> wrote:
>
> On Tues, August 16, 2022 15:33 PM I wrote:
> > Attach the new patches.
>
> I found that cfbot has a failure.
> After investigation, I think it is because the worker's exit state is not set
> correctly. So I made some slight modifications.
>
> Attach the new patches.
>

Thanks for updating the patch. Here are some comments.

0003 patch
==============
1. src/backend/replication/logical/applybgworker.c
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot replicate target relation \"%s.%s\" using "
+ "subscription parameter streaming=parallel",
+ rel->remoterel.nspname, rel->remoterel.relname),
+ errdetail("The unique column on subscriber is not the unique "
+ "column on publisher or there is at least one "
+ "non-immutable function."),
+ errhint("Please change to use subscription parameter "
+ "streaming=on.")));

Should we use "%s" instead of writing "streaming=parallel" and "streaming=on" directly in the message? That would keep the option text out of the translatable strings.
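
A minimal standalone sketch of the idea, assuming plain snprintf() in place of ereport()/errmsg()/errhint() (which only exist inside the backend): the format strings translators would see stay fixed, and the option spellings are passed in as arguments. The function names format_parallel_error() and format_parallel_hint() are hypothetical.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical stand-in for the errmsg() call: the option text goes through
 * "%s", so the format string itself contains no option value to translate.
 */
static int
format_parallel_error(char *buf, size_t len,
                      const char *nspname, const char *relname)
{
    return snprintf(buf, len,
                    "cannot replicate target relation \"%s.%s\" using "
                    "subscription parameter %s",
                    nspname, relname, "streaming=parallel");
}

/* Same idea for the hint: the suggested setting is an argument. */
static int
format_parallel_hint(char *buf, size_t len)
{
    return snprintf(buf, len,
                    "Please change to use subscription parameter %s.",
                    "streaming=on");
}
```

In the real code the literal arguments would simply move from the format string into the argument list of errmsg()/errhint().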

2. src/backend/replication/logical/applybgworker.c
+ * If any worker is handling the streaming transaction, this check needs to
+ * be performed not only using the apply background worker, but also in the
+ * main apply worker. This is because without these restrictions, main

this check needs to be performed not only using the apply background worker, but
also in the main apply worker.
->
this check needs to be performed not only by the apply background worker, but
also by the main apply worker

3. src/backend/replication/logical/relation.c
+ if (ukey)
+ {
+ i = -1;
+ while ((i = bms_next_member(ukey, i)) >= 0)
+ {
+ attnum = AttrNumberGetAttrOffset(i + FirstLowInvalidHeapAttributeNumber);
+
+ if (entry->attrmap->attnums[attnum] < 0 ||
+ !bms_is_member(entry->attrmap->attnums[attnum], entry->remoterel.attunique))
+ {
+ entry->parallel_apply = PARALLEL_APPLY_UNSAFE;
+ return;
+ }
+ }
+
+ bms_free(ukey);

It looks like we need to call bms_free() before returning as well, right?
Otherwise ukey is leaked on the PARALLEL_APPLY_UNSAFE path.
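
A minimal standalone sketch of the fix, assuming a hypothetical 64-member bitmap type in place of PostgreSQL's Bitmapset (bms_next_member() and bms_free() are only available inside the backend) and using plain column offsets instead of the AttrNumberGetAttrOffset()/FirstLowInvalidHeapAttributeNumber conversion. The names Bitset, bitset_*, frees, and check_unique_columns() are hypothetical; the point is only that every exit path frees the key set exactly once.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical 64-member bitmap standing in for a Bitmapset. */
typedef struct Bitset
{
    uint64_t    bits;
} Bitset;

/* Counts calls to bitset_free() so the fix can be checked. */
static int  frees = 0;

static Bitset *
bitset_make(void)
{
    Bitset     *s = calloc(1, sizeof(Bitset));

    assert(s != NULL);
    return s;
}

static void
bitset_add(Bitset *s, int x)
{
    s->bits |= UINT64_C(1) << x;
}

static bool
bitset_is_member(const Bitset *s, int x)
{
    return x >= 0 && x < 64 && ((s->bits >> x) & 1) != 0;
}

/* Next member after prev, or a negative value when none is left. */
static int
bitset_next_member(const Bitset *s, int prev)
{
    for (int x = prev + 1; x < 64; x++)
        if ((s->bits >> x) & 1)
            return x;
    return -2;
}

static void
bitset_free(Bitset *s)
{
    if (s != NULL)
    {
        frees++;
        free(s);
    }
}

/*
 * Returns true if every local unique-key column maps (via attrmap) to a
 * column that is unique on the remote side; attrmap[i] < 0 means local
 * column i has no remote counterpart.  ukey is freed on every path,
 * including the early "unsafe" exit.
 */
static bool
check_unique_columns(Bitset *ukey, const int *attrmap, int natts,
                     const Bitset *remote_unique)
{
    bool        safe = true;
    int         i = -1;

    while ((i = bitset_next_member(ukey, i)) >= 0)
    {
        int         remote = (i < natts) ? attrmap[i] : -1;

        if (remote < 0 || !bitset_is_member(remote_unique, remote))
        {
            safe = false;
            break;              /* fall through to the single free below */
        }
    }

    bitset_free(ukey);
    return safe;
}
```

Calling bms_free(ukey) immediately before the early return would work just as well; breaking out of the loop only keeps a single free site.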

4. src/backend/replication/logical/relation.c
+ /* We don't need info for dropped or generated attributes */
+ if (att->attisdropped || att->attgenerated)
+ continue;

Would it be better to change the comment to:
We don't check dropped or generated attributes

5. src/test/subscription/t/032_streaming_apply.pl
+$node_publisher->wait_for_catchup($appname);
+
+# Then we check the foreign key on partition table.
+$node_publisher->wait_for_catchup($appname);

Here, wait_for_catchup() is called twice in a row, so we can remove the second one.

6. src/backend/replication/logical/applybgworker.c
+ /* If any workers (or the postmaster) have died, we have failed. */
+ if (status == APPLY_BGWORKER_EXIT)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("background worker %u failed to apply transaction %u",
+ entry->wstate->shared->worker_id,
+ entry->wstate->shared->stream_xid)));

Should we change the error message to "apply background worker %u failed to
apply transaction %u"? That would be consistent with the error message in
apply_bgworker_wait_for().

0004 patch
==============
1.
I saw that the commit message says:
If the subscriber exits with an error, this flag will be set true, and
whenever the transaction is applied successfully, this flag is reset false.

"subretry" is set to false when a transaction is applied successfully, which
looks similar to what clear_subscription_skip_lsn() does. So maybe we should
remove the following change in apply_handle_stream_abort()? Or only call
set_subscription_retry() when rolling back the top-level transaction.

@@ -1671,6 +1688,9 @@ apply_handle_stream_abort(StringInfo s)
*/
serialize_stream_abort(xid, subxid);
}
+
+ /* Reset the retry flag. */
+ set_subscription_retry(false);
}

reset_apply_error_context_info();
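
For the second option, a minimal standalone sketch: in apply_handle_stream_abort() the case where xid equals subxid is the abort of the top-level transaction, so only that case would touch the flag. The names TransactionId (here a plain uint32_t), subretry (a plain bool), set_retry_flag(), and handle_stream_abort_sketch() are hypothetical stand-ins; the real set_subscription_retry() updates pg_subscription.subretry in the catalog.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;     /* hypothetical stand-in */

/* Hypothetical stand-in for pg_subscription.subretry. */
static bool subretry = true;

/* Hypothetical stand-in for set_subscription_retry(). */
static void
set_retry_flag(bool retry)
{
    subretry = retry;
}

/*
 * Sketch of the suggested rule: a stream abort whose subxid equals xid
 * rolls back the whole top-level transaction, so only that case resets
 * the flag; aborting a subtransaction leaves it untouched.
 */
static void
handle_stream_abort_sketch(TransactionId xid, TransactionId subxid)
{
    if (xid == subxid)
        set_retry_flag(false);
}
```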

2. src/backend/replication/logical/worker.c
+ /* Reset subretry */
+ values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(retry);
+ replaces[Anum_pg_subscription_subretry - 1] = true;

/* Reset subretry */
->
/* Set subretry */

3.
+# Insert dependent data on the publisher, now it works.
+$node_subscriber->safe_psql('postgres', "INSERT INTO test_tab2 VALUES(1)");

If the DELETE from the publisher has not been applied yet when the INSERT is
executed, the INSERT will fail.

0005 patch
==============
1.
+ <para>
+ Process ID of the main apply worker, if this process is a apply
+ background worker. NULL if this process is a main apply worker or a
+ synchronization worker.
+ </para></entry>

a apply background worker
->
an apply background worker

Regards,
Shi yu
