RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-08-29 11:31:42
Message-ID: OS0PR01MB5716DB4C67394D1676429D3594769@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Thursday, August 25, 2022 7:33 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Wed, Aug 24, 2022 at 7:17 PM houzj(dot)fnst(at)fujitsu(dot)com
> <houzj(dot)fnst(at)fujitsu(dot)com> wrote:
> >
> > On Friday, August 19, 2022 4:49 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
> > >
> >
> > > 8. It is not clear to me how APPLY_BGWORKER_EXIT status is used. Is it
> > > required for the cases where the bgworker exits due to some error and the
> > > apply worker then uses it to detect that and exit? How would other
> > > bgworkers notice this? Is it done via apply_bgworker_check_status()?
> >
> > It was used to detect the unexpected exit of the bgworker. I have changed
> > the design so that it is now similar to what we have in parallel query.
> >
>
> Thanks, this looks better.
>
> > Attached is the new version patch set (v24), which addresses the above
> > comments. Besides, I added some logic that tries to stop the bgworker at
> > transaction end if there are enough workers in the pool.
> >
>
> I think this deserves an explanation in worker.c under the title:
> "Separate background workers" in the patch.
>
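The pool policy mentioned above (stop a worker at transaction end if the pool already holds enough workers) can be sketched as follows. This is a hypothetical standalone model, not the patch's code: `WorkerPool`, `pool_release_worker()` and the `MAX_IDLE_WORKERS` threshold are illustrative names and values, and "stopping" a worker is modelled by clearing a flag.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical sizes; the real limits would come from configuration. */
#define POOL_SIZE 8
#define MAX_IDLE_WORKERS 2

typedef struct PoolWorker
{
	bool		running;		/* worker process is alive */
	bool		in_use;			/* currently assigned to a transaction */
} PoolWorker;

typedef struct WorkerPool
{
	PoolWorker	workers[POOL_SIZE];
} WorkerPool;

/* Count workers that are alive but not assigned to any transaction. */
static int
pool_idle_count(const WorkerPool *pool)
{
	int			n = 0;

	for (int i = 0; i < POOL_SIZE; i++)
		if (pool->workers[i].running && !pool->workers[i].in_use)
			n++;
	return n;
}

/*
 * Called at transaction end for the worker that applied it.  Returns true
 * if the worker was stopped, false if it was kept in the pool for reuse.
 */
static bool
pool_release_worker(WorkerPool *pool, int idx)
{
	PoolWorker *w = &pool->workers[idx];

	assert(w->running && w->in_use);
	w->in_use = false;

	/* Idle workers other than this one already meet the threshold. */
	if (pool_idle_count(pool) - 1 >= MAX_IDLE_WORKERS)
	{
		w->running = false;		/* enough spares: stop this worker */
		return true;
	}
	return false;				/* keep it for the next streamed xact */
}
```

Keeping a small number of idle workers avoids relaunch cost for the next streamed transaction while bounding how many processes sit idle.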
> Review comments for v24-0001

Thanks for the comments.

> =========================
> 1.
> + * cost of searhing the hash table
>
> /searhing/searching

Fixed.

> 2.
> +/*
> + * Apply background worker states.
> + */
> +typedef enum ApplyBgworkerState
> +{
> + APPLY_BGWORKER_BUSY, /* assigned to a transaction */
> + APPLY_BGWORKER_FINISHED /* transaction is completed */
> +} ApplyBgworkerState;
>
> Now that there are just two states, can we represent them via a flag
> ('available'/'in_use'), or do you see a downside to that compared to
> the current approach?

Changed to in_use.
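With only BUSY and FINISHED left, a single boolean carries the same information. A minimal sketch, assuming the leader keeps a fixed array of per-worker entries; `ApplyWorkerInfo`, `worker_assign()` and `worker_free()` are hypothetical names, and `TransactionId` is a local stand-in for PostgreSQL's typedef.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int TransactionId;	/* stand-in for PostgreSQL's typedef */
#define InvalidTransactionId ((TransactionId) 0)
#define NWORKERS 4

typedef struct ApplyWorkerInfo
{
	bool		in_use;			/* replaces APPLY_BGWORKER_BUSY/FINISHED */
	TransactionId xid;			/* remote xid being applied, if in_use */
} ApplyWorkerInfo;

static ApplyWorkerInfo pool[NWORKERS];	/* zero-initialized: all free */

/* Assign a free worker to xid; returns NULL if every worker is busy. */
static ApplyWorkerInfo *
worker_assign(TransactionId xid)
{
	for (int i = 0; i < NWORKERS; i++)
	{
		if (!pool[i].in_use)
		{
			pool[i].in_use = true;
			pool[i].xid = xid;
			return &pool[i];
		}
	}
	return NULL;
}

/* Mark the worker available again once its transaction has completed. */
static void
worker_free(ApplyWorkerInfo *w)
{
	assert(w->in_use);
	w->in_use = false;
	w->xid = InvalidTransactionId;
}
```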

> 3.
> -replorigin_session_setup(RepOriginId node)
> +replorigin_session_setup(RepOriginId node, int apply_leader_pid)
>
> I have mentioned previously that we don't need anything specific to the
> apply worker/leader in this API, so why this change? Another idea that
> occurred to me is that we could call replorigin_session_reset() before
> sending the commit message to the bgworker and then do the session setup
> in the bgworker only to handle the commit/abort/prepare message. We would
> also need to set it up again for the leader apply worker after the leader
> completes its wait for the bgworker to finish the commit handling.

I have reverted the changes related to replorigin_session_setup and used
the suggested approach. I also ran some simple performance tests for this
approach and didn't see any obvious overhead, since replorigin_session_setup
is invoked only per streaming transaction.
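The origin handoff described above can be modelled as a slot that at most one process may hold at a time. This is a hypothetical standalone sketch: `origin_setup()` and `origin_reset()` merely stand in for replorigin_session_setup() and replorigin_session_reset(), pids are plain integers, and a failed setup returns false rather than raising an error.

```c
#include <assert.h>
#include <stdbool.h>

/* One replication origin; acquired_by is 0 when no process holds it. */
typedef struct OriginSlot
{
	int			acquired_by;
} OriginSlot;

/* Acquire the origin for pid; fails if another process holds it. */
static bool
origin_setup(OriginSlot *slot, int pid)
{
	if (slot->acquired_by != 0 && slot->acquired_by != pid)
		return false;			/* already active in another process */
	slot->acquired_by = pid;
	return true;
}

/* Release the origin; only the holder may do so. */
static void
origin_reset(OriginSlot *slot, int pid)
{
	assert(slot->acquired_by == pid);
	slot->acquired_by = 0;
}

/*
 * Handoff around a streamed COMMIT: the leader releases the origin before
 * sending the message, the worker holds it only while applying the commit,
 * and the leader re-acquires it after waiting for the worker to finish.
 */
static bool
handle_stream_commit(OriginSlot *slot, int leader_pid, int worker_pid)
{
	origin_reset(slot, leader_pid);		/* before sending COMMIT */
	if (!origin_setup(slot, worker_pid))	/* worker applies COMMIT */
		return false;
	origin_reset(slot, worker_pid);		/* worker is done */
	return origin_setup(slot, leader_pid);	/* after the leader's wait */
}
```

The point of the sequence is that the API itself needs no knowledge of leaders or workers: ownership simply moves between processes through reset and setup.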

> 4. Unlike parallel query, here we seem to be creating a separate DSM
> segment for each worker, and probably the difference is due to the fact
> that we don't know upfront how many workers will actually be required. If
> so, can we write some comments about this in worker.c, where you have
> explained the parallel bgworker stuff?

Added.
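To illustrate why a per-worker segment fits here: parallel query knows its worker count before creating one segment, whereas the apply leader launches workers on demand. A hypothetical sketch in which `malloc()`/`free()` stand in for creating and detaching a DSM segment, and `SEGMENT_SIZE`, `launch_worker()` and `stop_worker()` are illustrative names.

```c
#include <assert.h>
#include <stdlib.h>

#define SEGMENT_SIZE 4096		/* assumed per-worker segment size */
#define MAX_WORKERS 16

typedef struct WorkerSegment
{
	void	   *base;			/* this worker's shared area, or NULL */
} WorkerSegment;

typedef struct Leader
{
	WorkerSegment segs[MAX_WORKERS];
	int			nworkers;		/* grows only as workers are launched */
} Leader;

/* Launch a worker on demand, giving it its own segment; -1 on failure. */
static int
launch_worker(Leader *leader)
{
	void	   *base;

	if (leader->nworkers >= MAX_WORKERS)
		return -1;
	base = malloc(SEGMENT_SIZE);
	if (base == NULL)
		return -1;
	leader->segs[leader->nworkers].base = base;
	return leader->nworkers++;
}

/* Stopping a worker releases only that worker's segment. */
static void
stop_worker(Leader *leader, int idx)
{
	assert(idx >= 0 && idx < leader->nworkers);
	free(leader->segs[idx].base);
	leader->segs[idx].base = NULL;
}
```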

> 5.
> /*
> - * Handle streamed transactions.
> + * Handle streamed transactions for both the main apply worker and the apply
> + * background workers.
>
> Shall we use leader apply worker in the above comment? Also, check
> other places in the patch for similar changes.

Changed.

> 6.
> + else
> + {
>
> - /* open the spool file for this transaction */
> - stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
> + /* notify handle methods we're processing a remote transaction */
> + in_streamed_transaction = true;
>
> There is a spurious blank line after "else {". Also, the comment could be
> slightly improved: "/* notify handle methods we're processing a remote
> in-progress transaction */"

Changed.

> 7. The checks in various apply_handle_stream_* functions have improved
> compared to the previous version, but I think we can still improve
> them. One idea could be to use a separate function to decide the
> action we want to take, and then, based on it, the caller can take the
> appropriate action. Using a similar idea, we can improve the checks in
> handle_streamed_transaction() as well.

Improved as suggested.
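The suggestion in item 7 (one function decides the action, callers switch on it) might look like the sketch below. The enum values, `get_transaction_apply_action()` and `handle_streamed_change()` are hypothetical names chosen for illustration, and the inputs are reduced to booleans plus a worker pointer; the patch's actual signatures may differ.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum TransApplyAction
{
	TRANS_LEADER_APPLY,			/* leader applies the change directly */
	TRANS_LEADER_SERIALIZE,		/* leader spools the change to a file */
	TRANS_LEADER_SEND_TO_PARALLEL,	/* leader forwards it to a worker */
	TRANS_PARALLEL_APPLY		/* we are the worker: apply it */
} TransApplyAction;

typedef struct WorkerInfo
{
	int			id;				/* placeholder for per-worker state */
} WorkerInfo;

/* Centralize the checks that were repeated in each stream handler. */
static TransApplyAction
get_transaction_apply_action(bool am_parallel_worker,
							 bool in_streamed_transaction,
							 WorkerInfo *winfo)
{
	/* A worker never holds a WorkerInfo for another worker. */
	assert(!(am_parallel_worker && winfo != NULL));

	if (am_parallel_worker)
		return TRANS_PARALLEL_APPLY;
	if (!in_streamed_transaction)
		return TRANS_LEADER_APPLY;
	if (winfo != NULL)
		return TRANS_LEADER_SEND_TO_PARALLEL;
	return TRANS_LEADER_SERIALIZE;
}

/* Returns true if the change was consumed here (spooled or forwarded). */
static bool
handle_streamed_change(bool am_parallel_worker,
					   bool in_streamed_transaction, WorkerInfo *winfo)
{
	switch (get_transaction_apply_action(am_parallel_worker,
										 in_streamed_transaction, winfo))
	{
		case TRANS_LEADER_SERIALIZE:	/* write to the spool file */
		case TRANS_LEADER_SEND_TO_PARALLEL:	/* send to winfo's queue */
			return true;
		case TRANS_LEADER_APPLY:
		case TRANS_PARALLEL_APPLY:
			return false;		/* caller applies the change itself */
	}
	return false;				/* unreachable; keeps compilers quiet */
}
```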

> 8.
> + else if ((winfo = apply_bgworker_find(xid)))
> + {
> + /* Send STREAM ABORT message to the apply background worker. */
> + apply_bgworker_send_data(winfo, s->len, s->data);
> +
> + /*
> + * After sending the data to the apply background worker, wait for
> + * that worker to finish. This is necessary to maintain commit
> + * order which avoids failures due to transaction dependencies and
> + * deadlocks.
> + */
> + if (subxid == xid)
> + {
> + apply_bgworker_wait_for(winfo, APPLY_BGWORKER_FINISHED);
> + apply_bgworker_free(winfo);
> + }
> + }
> + else
> + /*
> + * We are in main apply worker and the transaction has been
> + * serialized to file.
> + */
> + serialize_stream_abort(xid, subxid);
>
> In the last else block, you can use {} to make it consistent with the
> other if/else branches.
>
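The STREAM ABORT dispatch quoted in item 8, with braces on every branch, can be modelled as below. This hypothetical sketch records what the leader would do instead of doing it: `has_worker` stands in for apply_bgworker_find(xid) returning non-NULL, and `AbortOutcome` is an illustrative type. Only a top-level abort (subxid == xid) ends the transaction, so only then does the leader wait for and free the worker.

```c
#include <assert.h>
#include <stdbool.h>

typedef unsigned int TransactionId;	/* stand-in for PostgreSQL's typedef */

typedef struct AbortOutcome
{
	bool		sent_to_worker;		/* message forwarded to the worker */
	bool		waited_and_freed;	/* leader waited, then freed the worker */
	bool		serialized;			/* abort handled via the spool file */
} AbortOutcome;

static AbortOutcome
dispatch_stream_abort(bool has_worker, TransactionId xid, TransactionId subxid)
{
	AbortOutcome out = {false, false, false};

	assert(xid != 0 && subxid != 0);	/* both must be valid xids */

	if (has_worker)
	{
		out.sent_to_worker = true;

		/* Top-level abort: wait to preserve commit order, then free. */
		if (subxid == xid)
			out.waited_and_freed = true;
	}
	else
	{
		/* Leader apply worker: the transaction was serialized to file. */
		out.serialized = true;
	}
	return out;
}
```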
> 9.
> +void
> +ApplyBgworkerMain(Datum main_arg)
> +{
> + volatile ApplyBgworkerShared *shared;
> +
> + dsm_handle handle;
>
> Is there a need to keep this empty line between the above two declarations?

Removed.

> 10.
> + /*
> + * Attach to the message queue.
> + */
> + mq = shm_toc_lookup(toc, APPLY_BGWORKER_KEY_ERROR_QUEUE, false);
>
> Here, the comment should say "error queue".

Fixed.

> 11.
> + /*
> + * Attach to the message queue.
> + */
> + mq = shm_toc_lookup(toc, APPLY_BGWORKER_KEY_ERROR_QUEUE, false);
> + shm_mq_set_sender(mq, MyProc);
> + error_mqh = shm_mq_attach(mq, seg, NULL);
> + pq_redirect_to_shm_mq(seg, error_mqh);
> +
> + /*
> + * Now, we have initialized DSM. Attach to slot.
> + */
> + logicalrep_worker_attach(worker_slot);
> + MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
> + MyParallelShared->logicalrep_worker_slot_no = worker_slot;
> +
> + pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid,
> + InvalidBackendId);
>
> Is there a reason not to set parallel_leader immediately after
> pq_redirect_to_shm_mq(), as we do in parallel.c?

Moved the code.

> 12.
> if (pq_mq_parallel_leader_pid != 0)
> + {
> SendProcSignal(pq_mq_parallel_leader_pid,
> PROCSIG_PARALLEL_MESSAGE,
> pq_mq_parallel_leader_backend_id);
>
> + /*
> + * XXX maybe we can reuse the PROCSIG_PARALLEL_MESSAGE instead of
> + * introducing a new signal reason.
> + */
> + SendProcSignal(pq_mq_parallel_leader_pid,
> + PROCSIG_APPLY_BGWORKER_MESSAGE,
> + pq_mq_parallel_leader_backend_id);
> + }
>
> I think we don't need to send both signals. Here, we can check if this
> is a parallel worker (IsParallelWorker), then send
> PROCSIG_PARALLEL_MESSAGE, otherwise, send
> PROCSIG_APPLY_BGWORKER_MESSAGE message. In the else part, we can have
> an assert to ensure it is an apply bgworker.

Changed.
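The single-signal approach from item 12 can be sketched as a small selector. This is a hypothetical standalone model: the two booleans stand in for IsParallelWorker() and an "am I an apply bgworker" test, and the local enum only mirrors the two ProcSignalReason values named in the review.

```c
#include <assert.h>
#include <stdbool.h>

/* Local stand-in for the two relevant ProcSignalReason values. */
typedef enum LeaderSignal
{
	PROCSIG_PARALLEL_MESSAGE,
	PROCSIG_APPLY_BGWORKER_MESSAGE
} LeaderSignal;

/* Pick exactly one signal reason to notify the leader of a new message. */
static LeaderSignal
choose_leader_signal(bool is_parallel_worker, bool is_apply_bgworker)
{
	if (is_parallel_worker)
		return PROCSIG_PARALLEL_MESSAGE;

	/* Only these two kinds of process redirect messages to a leader. */
	assert(is_apply_bgworker);
	return PROCSIG_APPLY_BGWORKER_MESSAGE;
}
```

Sending one signal rather than both avoids waking the leader's interrupt handler for a message type it does not expect.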

Attached is the new version patch set (v25), which addresses the above
comments and the comments from Amit[1] and Kuroda-san[2].

As discussed, I also renamed "apply background worker" and all related
names to "apply parallel worker".

[1] https://www.postgresql.org/message-id/CAA4eK1%2B_oHZHoDooAR7QcYD2CeTUWNSwkqVcLWC2iQijAJC4Cg%40mail.gmail.com
[2] https://www.postgresql.org/message-id/TYAPR01MB58666A97D40AB8919D106AD5F5709%40TYAPR01MB5866.jpnprd01.prod.outlook.com

Best regards,
Hou zj

Attachment Content-Type Size
v25-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch application/octet-stream 7.7 KB
v25-0001-Perform-streaming-logical-transactions-by-parall.patch application/octet-stream 128.3 KB
v25-0002-Test-streaming-parallel-option-in-tap-test.patch application/octet-stream 75.2 KB
v25-0003-Add-some-checks-before-using-apply-parallel-work.patch application/octet-stream 52.0 KB
v25-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch application/octet-stream 31.4 KB
