Re: Perform streaming logical transactions by background workers and parallel apply

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-10-24 06:11:38
Message-ID: CAHut+PsJWHRoRzXtMrJ1RaxmkS2LkiMR_4S2pSionxXmYsyOww@mail.gmail.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

Here are my review comments for v40-0001.

======

src/backend/replication/logical/worker.c

1. should_apply_changes_for_rel

+ else if (am_parallel_apply_worker())
+ {
+ if (rel->state != SUBREL_STATE_READY)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication parallel apply worker for subscription
\"%s\" will stop",
+ MySubscription->name),
+ errdetail("Cannot handle streamed replication transaction using parallel "
+ "apply workers until all tables are synchronized.")));

1a.
"transaction" -> "transactions"

1b.
"are synchronized" -> "have been synchronized."

e.g. "Cannot handle streamed replication transactions using parallel
apply workers until all tables have been synchronized."

~~~

2. maybe_reread_subscription

+ if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription
\"%s\" will "
+ "stop because the subscription was removed",
+ MySubscription->name)));
+ else
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" will "
+ "stop because the subscription was removed",
+ MySubscription->name)));

Maybe there is an easier way to code this instead of if/else and
cut/paste message text:

SUGGESTION

ereport(LOG,
(errmsg("logical replication %s for subscription \"%s\" will stop
because the subscription was removed",
am_parallel_apply_worker() ? "parallel apply worker" : "apply worker",
MySubscription->name)));
~~~

3.

+ if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription
\"%s\" will "
+ "stop because the subscription was disabled",
+ MySubscription->name)));
+ else
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" will "
+ "stop because the subscription was disabled",
+ MySubscription->name)));

These can be combined like comment #2 above

SUGGESTION

ereport(LOG,
(errmsg("logical replication %s for subscription \"%s\" will stop
because the subscription was disabled",
am_parallel_apply_worker() ? "parallel apply worker" : "apply worker",
MySubscription->name)));

~~~

4.

+ if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription
\"%s\" will stop because of a parameter change",
+ MySubscription->name)));
+ else
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\"
will restart because of a parameter change",
+ MySubscription->name)));

These can be combined like comment #2 above

SUGGESTION

ereport(LOG,
(errmsg("logical replication %s for subscription \"%s\" will restart
because of a parameter change",
am_parallel_apply_worker() ? "parallel apply worker" : "apply worker",
MySubscription->name)));

~~~~

4. InitializeApplyWorker

+ if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription
%u will not "
+ "start because the subscription was removed during startup",
+ MyLogicalRepWorker->subid)));
+ else
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription %u will not "
+ "start because the subscription was removed during startup",
+ MyLogicalRepWorker->subid)));

These can be combined like comment #2 above

SUGGESTION

ereport(LOG,
(errmsg("logical replication %s for subscription %u will not start
because the subscription was removed during startup",
am_parallel_apply_worker() ? "parallel apply worker" : "apply worker",
MyLogicalRepWorker->subid)));

~~~

5.

+ else if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription
\"%s\" has started",
+ MySubscription->name)));
else
ereport(LOG,
(errmsg("logical replication apply worker for subscription \"%s\"
has started",
MySubscription->name)));

The last if/else can be combined same as comment #2 above

SUGGESTION

else
ereport(LOG,
(errmsg("logical replication %s for subscription \"%s\" has started",
am_parallel_apply_worker() ? "parallel apply worker" : "apply worker",
MySubscription->name)));

~~~

6. IsLogicalParallelApplyWorker

+bool
+IsLogicalParallelApplyWorker(void)
+{
+ return IsLogicalWorker() && am_parallel_apply_worker();
+}

Patch v40 added the IsLogicalWorker() to the condition, but why is
that extra check necessary?

======

7. src/include/replication/worker_internal.h

+typedef struct ParallelApplyWorkerInfo
+{
+ shm_mq_handle *mq_handle;
+
+ /*
+ * The queue used to transfer messages from the parallel apply worker to
+ * the leader apply worker.
+ */
+ shm_mq_handle *error_mq_handle;

In patch v40 the comment about the NULL error_mq_handle is removed,
but since the code still explicitly set/checks NULL in different
places isn't it still better to have some comment here to describe
what NULL means?

------
Kind Regards,
Peter Smith.
Fujitsu Australia

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Japin Li 2022-10-24 06:59:54 Re: Question about savepoint level?
Previous Message Masahiko Sawada 2022-10-24 05:53:36 Re: [PoC] Improve dead tuple storage for lazy vacuum