RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: "kuroda(dot)hayato(at)fujitsu(dot)com" <kuroda(dot)hayato(at)fujitsu(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-09-27 12:31:15
Message-ID: OS0PR01MB57162BC6B0FB848F216410DE94559@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Tuesday, September 27, 2022 2:32 PM Kuroda, Hayato/黒田 隼人 <kuroda(dot)hayato(at)fujitsu(dot)com> wrote:
>
> Dear Wang,
>
> Followings are comments for your patchset.

Thanks for the comments.

> ====
> 0001
>
>
> 01. launcher.c - logicalrep_worker_stop_internal()
>
> ```
> +
> + Assert(LWLockHeldByMe(LogicalRepWorkerLock));
> +
> ```

Changed.

> I think it should be Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock,
> LW_SHARED)), because the lock is released and then acquired again as
> LW_SHARED.
> If a newer function acquires the lock as LW_EXCLUSIVE and calls
> logicalrep_worker_stop_internal(), its lock may become weaker after the
> call.
>
> 02. launcher.c - apply_handle_stream_start()
>
> ```
> +    /*
> +     * Notify handle methods we're processing a remote in-progress
> +     * transaction.
> +     */
> +    in_streamed_transaction = true;
>
> -    MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
> -    FileSetInit(MyLogicalRepWorker->stream_fileset);
> +    /*
> +     * Start a transaction on stream start, this transaction will be
> +     * committed on the stream stop unless it is a tablesync worker in
> +     * which case it will be committed after processing all the
> +     * messages. We need the transaction for handling the buffile,
> +     * used for serializing the streaming data and subxact info.
> +     */
> +    begin_replication_step();
> ```
>
> Previously, in_streamed_transaction was set after begin_replication_step(),
> but the ordering has been changed. Maybe we don't have to change it if there
> is no particular reason.
>
> 03. launcher.c - apply_handle_stream_stop()
>
> ```
> + /* Commit the per-stream transaction */
> + CommitTransactionCommand();
> +
> + /* Reset per-stream context */
> + MemoryContextReset(LogicalStreamingContext);
> +
> + pgstat_report_activity(STATE_IDLE, NULL);
> +
> + in_streamed_transaction = false;
> ```
>
> Previously, in_streamed_transaction was set after MemoryContextReset(),
> but the ordering has been changed.
> Maybe we don't have to change it if there is no particular reason.

I adjusted the position of these assignments due to some other improvements in this version.

>
> 04. applyparallelworker.c - LogicalParallelApplyLoop()
>
> ```
> + shmq_res = shm_mq_receive(mqh, &len, &data, false);
> ...
> + if (ConfigReloadPending)
> + {
> + ConfigReloadPending = false;
> + ProcessConfigFile(PGC_SIGHUP);
> + }
> ```
>
>
> Here the parallel apply worker waits to receive messages, and
> ProcessConfigFile() is called only after dispatching them.
> This means the .conf file will not be re-read until the parallel apply
> worker receives new messages and applies them.
>
> It may be problematic when users change log_min_messages to debugXXX for
> debugging but streamed transactions rarely arrive.
> They would expect detailed output to appear in the log from the next
> streaming chunk, but it does not.
>
> This does not occur in the leader worker when it waits for messages from
> the publisher, because it uses libpqrcv_receive(), which works
> asynchronously.
>
> I'm not sure whether it should be documented that the evaluation of GUCs
> may be delayed. What do you think?

I changed the shm_mq_receive() call to non-blocking (asynchronous) mode, which is also
consistent with what we do for the Gather node when reading data from parallel query workers.

>
> ===
> 0004
>
> 05. logical-replication.sgml
>
> ```
> ...
> In that case, it may be necessary to change the streaming mode to on or off and
> cause the same conflicts again so the finish LSN of the failed transaction will be
> written to the server log.
> ...
> ```
>
> The above sentence is added by 0001, but it is not modified by 0004.
> Such transactions will be retried in streaming=on mode, so some related
> description should be added.

Added.

Best regards,
Hou zj
