Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Cc: Hou, Zhijie/侯 志杰 <houzj(dot)fnst(at)fujitsu(dot)com>, shiy(dot)fnst(at)fujitsu(dot)com
Subject: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-04-06 05:19:40
Message-ID: CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com

In this email, I would like to discuss allowing streamed logical
transactions (large in-progress transactions) to be applied by
background workers, and parallel apply in general. The goal of this
work is to improve the performance of the apply side of logical
replication.

Currently, for large transactions, the publisher sends the data in
multiple streams (changes divided into chunks depending upon
logical_decoding_work_mem), and then on the subscriber side, the apply
worker writes the changes into temporary files; once it receives the
commit, it reads from the file and applies the entire transaction. To
improve the performance of such transactions, we can instead allow
them to be applied via background workers. There could be multiple
ways to achieve this:

Approach-1: Assign a new bgworker (if available) as soon as the xact's
first stream arrives, and have the main apply worker send changes to
this new worker via shared memory. We keep this worker assigned until
the transaction's commit arrives and also wait for the worker to
finish at commit. This preserves commit ordering and avoids writing to
and reading from a file in most cases. We still need to spill if no
worker is available. We also need to wait for the background worker to
finish applying the current stream at stream_stop to avoid deadlocks,
because T-1's current stream of changes can update rows in a
conflicting order with T-2's next stream of changes.
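
To make the data path concrete, here is a minimal sketch of the main
apply worker's side of Approach-1, assuming the DSM segment and the
background worker registration have already been done. The function
names, the queue size, and the fallback policy are placeholders for
discussion; the shm_mq calls are the existing API:

/* Sketch only: main apply worker side of Approach-1. */
#include "postgres.h"
#include "postmaster/bgworker.h"
#include "storage/dsm.h"
#include "storage/proc.h"
#include "storage/shm_mq.h"

#define PA_QUEUE_SIZE	(16 * 1024)	/* hypothetical per-worker queue size */

/* Set up a shared memory queue for streaming changes to one bgworker. */
static shm_mq_handle *
pa_setup_queue(dsm_segment *seg, BackgroundWorkerHandle *handle)
{
	shm_mq	   *mq = shm_mq_create(dsm_segment_address(seg), PA_QUEUE_SIZE);

	shm_mq_set_sender(mq, MyProc);
	return shm_mq_attach(mq, seg, handle);
}

/* Forward one streamed change; the caller would spill to a file if
 * the worker can't keep up (SHM_MQ_WOULD_BLOCK). */
static bool
pa_send_change(shm_mq_handle *mqh, const char *data, Size len)
{
	shm_mq_result res = shm_mq_send(mqh, len, data, true /* nowait */);

	if (res == SHM_MQ_DETACHED)
		ereport(ERROR,
				(errmsg("parallel apply worker exited unexpectedly")));
	return res == SHM_MQ_SUCCESS;
}

The bgworker side would set itself as the queue's receiver and loop
over shm_mq_receive(), applying each change as it arrives.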

Approach-2: Assign another worker to spill the changes and only allow
them to be applied at commit time by the same or another worker. Now,
to preserve the commit order, we need to wait at commit so that the
respective assigned workers can finish. This won't avoid spilling to
disk and reading back at commit time, but it can help us receive and
process more data than we do currently. I am not sure it can win over
Approach-1, though, because we still need to write to and read from
the file, and we would probably need a shared memory queue to send the
data to the other background workers for processing.
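
For comparison, the Approach-2 shape could reuse the existing spill
path and change only what happens at commit. In this sketch,
stream_write_change() stands for the existing serialization routine in
src/backend/replication/logical/worker.c (so the code would live in
that file), and all the pool-worker functions and the
ParallelApplyWorker type are entirely hypothetical:

/* Sketch only: Approach-2, spill as today, apply at commit elsewhere. */
#include "postgres.h"
#include "lib/stringinfo.h"

typedef struct ParallelApplyWorker ParallelApplyWorker;	/* hypothetical */

/* While the streams arrive, keep spilling exactly as we do today. */
static void
pa2_handle_stream_change(char action, StringInfo s)
{
	stream_write_change(action, s);		/* existing spill path in worker.c */
}

/* At commit, hand the spilled transaction to a free pool worker and
 * wait for it to finish, so that the commit order is preserved. */
static void
pa2_handle_stream_commit(TransactionId xid)
{
	ParallelApplyWorker *w = pa2_get_free_worker();		/* hypothetical */

	pa2_assign_spilled_xact(w, xid);	/* hypothetical */
	pa2_wait_for_commit(w);				/* hypothetical */
}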

We need to change error handling to allow the above parallelization.
The current model for apply is that if any error occurs while
applying, we simply report the error in the server logs and the apply
worker exits. On restart, it again gets the transaction data that
previously failed and tries to apply it again. Now, in the new
approach (say Approach-1), we need to ensure that all the active
workers applying in-progress transactions also exit before the main
apply worker exits, to allow rollback of the currently applied
transactions and re-apply them as we get the data again. This is
required to avoid losing transactions: if some later transaction got
committed and updated the replication origin, the earlier transactions
won't be resent. This won't be much different from what we do now
when, say, two transactions t-1 and t-2 have multiple overlapping
streams: if an error happens before one of them completes via commit
or rollback, all the data needs to be resent by the server and
processed again by the apply worker.
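
As a sketch of that shutdown rule, the main apply worker could keep
the handle of every active streaming-transaction worker and force them
all down on its error path before exiting. The list and the function
are hypothetical bookkeeping; the bgworker calls are the existing ones:

/* Sketch only: called from the apply worker's error path, before it
 * exits, so that partially applied transactions get rolled back. */
#include "postgres.h"
#include "nodes/pg_list.h"
#include "postmaster/bgworker.h"

static List *pa_active_workers = NIL;	/* hypothetical bookkeeping */

static void
pa_shutdown_all_workers(void)
{
	ListCell   *lc;

	foreach(lc, pa_active_workers)
	{
		BackgroundWorkerHandle *handle = (BackgroundWorkerHandle *) lfirst(lc);

		TerminateBackgroundWorker(handle);
		(void) WaitForBackgroundWorkerShutdown(handle);
	}
}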

The next step in this area is to parallelize the apply of all possible
transactions. I think the main things we need to care about to allow
this are:
1. Transaction dependency: We can't simply allow dependent
transactions to be performed in parallel, as that can lead to
inconsistency. Say we insert a row in the first transaction and update
it in the second transaction; if both transactions are applied in
parallel, the insert may be applied later and the update will fail.
2. Deadlocks: These can happen because the transactions will now be
applied in parallel. Say transaction T-1 updates row-2 and row-3, and
transaction T-2 updates row-3 and row-2; if we allow them in parallel,
there is a chance of deadlock, whereas there is no such risk in serial
execution where the commit order is preserved.

We can solve both problems if we allow only independent xacts to be
parallelized. Transactions would be considered dependent if they
operate on the same set of rows from the same table. Apart from this,
there are cases where determining transaction dependency won't be
straightforward, so we can disallow those transactions from
participating in parallel apply. Those are the cases where functions
can be used in table definition expressions. We can think of
identifying safe functions, like all built-in functions and any
immutable functions (and probably stable functions). We need to check
safety for cases such as (a) trigger functions, (b) column default
value expressions (as those can call functions), (c) constraint
expressions, (d) foreign keys, and (e) operations on partitioned
tables (especially those performed via the
publish_via_partition_root option), as we need to check the
expressions on all partitions.
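
A sketch of that safety test, using func_volatile() from lsyscache.h;
the function name and the exact policy (built-in or non-volatile
counts as safe, with stable functions included tentatively) are
placeholders for discussion:

#include "postgres.h"
#include "access/transam.h"		/* FirstNormalObjectId */
#include "catalog/pg_proc.h"	/* PROVOLATILE_* */
#include "utils/lsyscache.h"	/* func_volatile() */

/* Sketch only: can this function be evaluated safely during parallel
 * apply (e.g. from a trigger, default, or constraint expression)? */
static bool
pa_function_is_safe(Oid funcid)
{
	/* Treat built-in functions as safe. */
	if (funcid < FirstNormalObjectId)
		return true;

	/* Immutable (and probably stable) user functions may also qualify. */
	return func_volatile(funcid) != PROVOLATILE_VOLATILE;
}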

Transactions that operate on the same set of tables and perform
truncates can lead to deadlock, so we need to consider such
transactions as dependent.

The basic idea is that for each running xact we can maintain the table
oid, row id (pkey or replica identity), and xid in a hash table in the
apply worker. For any new xact, we check that it doesn't conflict with
one of the previously running xacts and only then allow it to be
applied in parallel. We can collect all the changes of a transaction
in an in-memory buffer while checking its dependencies and then let
one of the available workers apply it at commit. If the rows for a
particular transaction exceed a certain threshold, then we need to
escalate to a table-level strategy, which means any other transaction
operating on the same table will be considered dependent. For very
large transactions that don't fit in the in-memory buffer, we either
need to spill them to disk or just decide not to parallelize them. We
need to remove rows from the hash table once the transaction is
applied completely.
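
Here is a sketch of that hash table using the regular dynahash API;
the struct names and the choice to key on a hash of the replica
identity columns are placeholders:

#include "postgres.h"
#include "utils/hsearch.h"

/* Sketch only: one entry per row touched by an in-flight transaction. */
typedef struct PARowKey
{
	Oid			relid;		/* table oid */
	uint32		rowhash;	/* hash of pkey / replica identity columns */
} PARowKey;

typedef struct PARowEntry
{
	PARowKey	key;
	TransactionId xid;		/* remote xact currently owning this row */
} PARowEntry;

static HTAB *pa_row_table = NULL;

static void
pa_init_row_table(void)
{
	HASHCTL		ctl;

	ctl.keysize = sizeof(PARowKey);
	ctl.entrysize = sizeof(PARowEntry);
	pa_row_table = hash_create("parallel apply rows", 1024, &ctl,
							   HASH_ELEM | HASH_BLOBS);
}

/* Returns true if some other in-flight xact already touched this row;
 * otherwise records the row as owned by the given xact. */
static bool
pa_row_conflicts(Oid relid, uint32 rowhash, TransactionId xid)
{
	PARowKey	key = {relid, rowhash};
	bool		found;
	PARowEntry *entry;

	entry = hash_search(pa_row_table, &key, HASH_ENTER, &found);
	if (!found)
		entry->xid = xid;
	return found && entry->xid != xid;
}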

The other thing we need to ensure while parallelizing independent
transactions is to preserve the commit order of transactions. This is
to ensure that in case of errors, we won't get replicas out of sync.
Say we allow the commit order to be changed; then it is possible that
some later transaction has updated the replication origin's LSN to a
value beyond the transaction for which the apply is in progress. Now,
if an error occurs for such an in-progress transaction, the server
won't resend its changes, as the replication origin's LSN would have
moved ahead.
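
One way to enforce this, sketched below, is a shared counter of the
last committed transaction's end LSN plus a condition variable. The
struct and both functions are hypothetical, and PG_WAIT_EXTENSION is
just a placeholder wait event; the condition-variable and atomics
calls are the existing primitives:

#include "postgres.h"
#include "access/xlogdefs.h"
#include "port/atomics.h"
#include "storage/condition_variable.h"
#include "utils/wait_event.h"

/* Hypothetical shared state for ordering commits among apply workers. */
typedef struct PAShared
{
	pg_atomic_uint64 last_committed_end_lsn;
	ConditionVariable commit_order_cv;
} PAShared;

/* Sketch only: each transaction knows the commit end LSN of the
 * transaction that must commit before it, and waits for its turn. */
static void
pa_wait_for_commit_turn(PAShared *shared, XLogRecPtr prev_end_lsn)
{
	ConditionVariablePrepareToSleep(&shared->commit_order_cv);
	while (pg_atomic_read_u64(&shared->last_committed_end_lsn) < prev_end_lsn)
		ConditionVariableSleep(&shared->commit_order_cv, PG_WAIT_EXTENSION);
	ConditionVariableCancelSleep();
}

/* After committing, let the next transaction proceed. */
static void
pa_advance_commit_turn(PAShared *shared, XLogRecPtr my_end_lsn)
{
	pg_atomic_write_u64(&shared->last_committed_end_lsn, my_end_lsn);
	ConditionVariableBroadcast(&shared->commit_order_cv);
}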

Even though we preserve the commit order, there will still be a
benefit to doing parallel apply, as we should be able to parallelize
most of the writes within the transactions.

Thoughts?

Thanks to Hou-San and Shi-San for helping me to investigate these ideas.

--
With Regards,
Amit Kapila.
