Re: Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-08-25 11:32:33
Message-ID: CAA4eK1JERguGiYoid9CMEOamZoD4jF_R8+mkNVS=mV-X=CXe=A@mail.gmail.com
Lists: pgsql-hackers

On Wed, Aug 24, 2022 at 7:17 PM houzj(dot)fnst(at)fujitsu(dot)com
<houzj(dot)fnst(at)fujitsu(dot)com> wrote:
>
> On Friday, August 19, 2022 4:49 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
> >
>
> > 8. It is not clear to me how APPLY_BGWORKER_EXIT status is used. Is it required
> > for the cases where bgworker exits due to some error and then apply worker
> > uses it to detect that and exits? How other bgworkers would notice this, is it
> > done via apply_bgworker_check_status()?
>
> It was used to detect the unexpected exit of bgworker and I have changed the design
> of this which is now similar to what we have in parallel query.
>

Thanks, this looks better.

> Attach the new version patch set (v24), which addresses the above comments.
> Besides, I added some logic that tries to stop the bgworker at transaction end
> if there are enough workers in the pool.
>

I think this deserves an explanation in worker.c under the title:
"Separate background workers" in the patch.

Review comments for v24-0001
=========================
1.
+ * cost of searhing the hash table

/searhing/searching

2.
+/*
+ * Apply background worker states.
+ */
+typedef enum ApplyBgworkerState
+{
+ APPLY_BGWORKER_BUSY, /* assigned to a transaction */
+ APPLY_BGWORKER_FINISHED /* transaction is completed */
+} ApplyBgworkerState;

Now that there are just two states, can we consider representing them
with a flag ('available'/'in_use'), or do you see a downside to that
compared to the current approach?
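For illustration, a minimal sketch of the flag-based alternative (the
struct and function names here are hypothetical, not from the patch):

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical sketch: track worker availability with a single flag
 * instead of a two-value state enum. */
typedef struct ApplyBgworkerSharedSketch
{
	bool		in_use;			/* true while assigned to a transaction */
} ApplyBgworkerSharedSketch;

static void
assign_to_transaction(ApplyBgworkerSharedSketch *shared)
{
	assert(!shared->in_use);	/* must be available */
	shared->in_use = true;
}

static void
finish_transaction(ApplyBgworkerSharedSketch *shared)
{
	shared->in_use = false;		/* worker is available again */
}
```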

3.
-replorigin_session_setup(RepOriginId node)
+replorigin_session_setup(RepOriginId node, int apply_leader_pid)

I have mentioned previously that we don't need anything specific to
the apply worker/leader in this API, so why this change? The other
idea that occurred to me is whether we can use
replorigin_session_reset() before sending the commit message to the
bgworker and then do the session setup in the bgworker only to handle
the commit/abort/prepare message. We would also need to set it up
again for the leader apply worker after the leader worker completes
the wait for the bgworker to finish the commit handling.
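To illustrate the intended handoff invariant, here is a toy model
(sketch_session_setup/sketch_session_reset merely stand in for the
real replorigin_session_setup()/replorigin_session_reset(); all names
are hypothetical):

```c
#include <assert.h>

/* Who currently holds the origin session; 0 = nobody.  Models the
 * invariant that only one process may have it set up at a time. */
static int	session_owner = 0;

static void
sketch_session_setup(int pid)
{
	assert(session_owner == 0);	/* must be free before setup */
	session_owner = pid;
}

static void
sketch_session_reset(int pid)
{
	assert(session_owner == pid);	/* only the owner may reset */
	session_owner = 0;
}

/* Proposed commit handoff: leader releases the session, the bgworker
 * takes it to apply the commit, then the leader re-acquires it after
 * waiting for the bgworker to finish. */
static void
sketch_commit_handoff(int leader_pid, int bgworker_pid)
{
	sketch_session_reset(leader_pid);	/* before sending the commit */
	sketch_session_setup(bgworker_pid);	/* bgworker applies the commit */
	sketch_session_reset(bgworker_pid);	/* bgworker is done */
	sketch_session_setup(leader_pid);	/* leader resumes */
}
```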

4. Unlike parallel query, here we seem to be creating a separate DSM
for each worker, probably because we don't know upfront how many
workers will actually be required. If so, can we add some comments
about this in worker.c where you have explained the parallel bgworker
stuff?

5.
/*
- * Handle streamed transactions.
+ * Handle streamed transactions for both the main apply worker and the apply
+ * background workers.

Shall we use leader apply worker in the above comment? Also, check
other places in the patch for similar changes.

6.
+ else
+ {

- /* open the spool file for this transaction */
- stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
+ /* notify handle methods we're processing a remote transaction */
+ in_streamed_transaction = true;

There is a spurious line after else {. Also, the comment could be
slightly improved: "/* notify handle methods we're processing a remote
in-progress transaction */"

7. The checks in various apply_handle_stream_* functions have improved
as compared to the previous version but I think we can still improve
those. One idea could be to use a separate function to decide the
action we want to take and then based on it, the caller can take
appropriate action. Using a similar idea, we can improve the checks in
handle_streamed_transaction() as well.
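Something along these lines, say (the enum and function names here are
purely illustrative, not a concrete proposal for the patch):

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical classification of how a streamed change should be
 * handled, so each apply_handle_stream_* caller can branch on the
 * result instead of repeating the checks. */
typedef enum TransApplyActionSketch
{
	TRANS_LEADER_SEND_TO_BGW,	/* leader: forward to an apply bgworker */
	TRANS_LEADER_SERIALIZE,		/* leader: write to the spool file */
	TRANS_BGWORKER_APPLY		/* we are the bgworker; apply it */
} TransApplyActionSketch;

static TransApplyActionSketch
get_transaction_apply_action(bool am_bgworker, bool bgw_assigned)
{
	if (am_bgworker)
		return TRANS_BGWORKER_APPLY;
	if (bgw_assigned)
		return TRANS_LEADER_SEND_TO_BGW;
	return TRANS_LEADER_SERIALIZE;
}
```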

8.
+ else if ((winfo = apply_bgworker_find(xid)))
+ {
+ /* Send STREAM ABORT message to the apply background worker. */
+ apply_bgworker_send_data(winfo, s->len, s->data);
+
+ /*
+ * After sending the data to the apply background worker, wait for
+ * that worker to finish. This is necessary to maintain commit
+ * order which avoids failures due to transaction dependencies and
+ * deadlocks.
+ */
+ if (subxid == xid)
+ {
+ apply_bgworker_wait_for(winfo, APPLY_BGWORKER_FINISHED);
+ apply_bgworker_free(winfo);
+ }
+ }
+ else
+ /*
+ * We are in main apply worker and the transaction has been
+ * serialized to file.
+ */
+ serialize_stream_abort(xid, subxid);

In the last else block, you can use {} to make it consistent with
other if, else checks.

9.
+void
+ApplyBgworkerMain(Datum main_arg)
+{
+ volatile ApplyBgworkerShared *shared;
+
+ dsm_handle handle;

Is there a need to keep this empty line between the above two declarations?

10.
+ /*
+ * Attach to the message queue.
+ */
+ mq = shm_toc_lookup(toc, APPLY_BGWORKER_KEY_ERROR_QUEUE, false);

Here, we should say error queue in the comments.

11.
+ /*
+ * Attach to the message queue.
+ */
+ mq = shm_toc_lookup(toc, APPLY_BGWORKER_KEY_ERROR_QUEUE, false);
+ shm_mq_set_sender(mq, MyProc);
+ error_mqh = shm_mq_attach(mq, seg, NULL);
+ pq_redirect_to_shm_mq(seg, error_mqh);
+
+ /*
+ * Now, we have initialized DSM. Attach to slot.
+ */
+ logicalrep_worker_attach(worker_slot);
+ MyParallelShared->logicalrep_worker_generation =
MyLogicalRepWorker->generation;
+ MyParallelShared->logicalrep_worker_slot_no = worker_slot;
+
+ pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid,
+ InvalidBackendId);

Is there a reason to set parallel_leader immediately after
pq_redirect_to_shm_mq() as we are doing in parallel.c?

12.
if (pq_mq_parallel_leader_pid != 0)
+ {
SendProcSignal(pq_mq_parallel_leader_pid,
PROCSIG_PARALLEL_MESSAGE,
pq_mq_parallel_leader_backend_id);

+ /*
+ * XXX maybe we can reuse the PROCSIG_PARALLEL_MESSAGE instead of
+ * introducing a new signal reason.
+ */
+ SendProcSignal(pq_mq_parallel_leader_pid,
+ PROCSIG_APPLY_BGWORKER_MESSAGE,
+ pq_mq_parallel_leader_backend_id);
+ }

I think we don't need to send both signals. Here, we can check if this
is a parallel worker (IsParallelWorker), then send
PROCSIG_PARALLEL_MESSAGE, otherwise, send
PROCSIG_APPLY_BGWORKER_MESSAGE message. In the else part, we can have
an assert to ensure it is an apply bgworker.
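A rough, self-contained sketch of what I mean (the stubbed names are
hypothetical; real code would use IsParallelWorker and the actual
ProcSignalReason values):

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-ins for the real ProcSignalReason values. */
typedef enum ProcSignalReasonSketch
{
	SKETCH_PROCSIG_PARALLEL_MESSAGE,
	SKETCH_PROCSIG_APPLY_BGWORKER_MESSAGE
} ProcSignalReasonSketch;

/* Send exactly one signal: PROCSIG_PARALLEL_MESSAGE for a parallel
 * worker, otherwise assert we are an apply bgworker and use the new
 * signal reason. */
static ProcSignalReasonSketch
choose_message_signal(bool is_parallel_worker, bool is_apply_bgworker)
{
	if (is_parallel_worker)
		return SKETCH_PROCSIG_PARALLEL_MESSAGE;

	/* otherwise we must be an apply bgworker */
	assert(is_apply_bgworker);
	return SKETCH_PROCSIG_APPLY_BGWORKER_MESSAGE;
}
```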

--
With Regards,
Amit Kapila.
