Parallel INSERT SELECT take 2

From: "tsunakawa(dot)takay(at)fujitsu(dot)com" <tsunakawa(dot)takay(at)fujitsu(dot)com>
To: PostgreSQL Developers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Parallel INSERT SELECT take 2
Date: 2021-04-12 01:21:57
Message-ID: TYAPR01MB29905A9AB82CC8BA50AB0F80FE709@TYAPR01MB2990.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

This is another attempt at [1].

BACKGROUND
========================================

We want to realize parallel INSERT SELECT in the following steps:
1) INSERT + parallel SELECT
2) Parallel INSERT + parallel SELECT

Below are example use cases. In these scenarios, we expect neither high concurrency nor an empty data source.
* Data loading (ETL or ELT) into an analytics database, typically a data warehouse.
* Batch processing in an OLTP database.

PROBLEMS
========================================

(1) The overhead of checking parallel-safety could be large
We have to check the target table and its child partitions for parallel safety. That is, we make sure those relations don't have parallel-unsafe domains, constraints, indexes, or triggers.

What we should check are the relations into which the statement actually inserts data. However, the planner does not know which relations will actually be inserted into, so it has to check all descendant partitions of the target table. When the target table has many partitions, this overhead could be unacceptable compared to the benefit gained from parallelism.
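
To get a feel for the cost, here is a minimal sketch of the per-partition walk the planner would need; the helper name check_rel_parallel_safety() is hypothetical:

/*
 * Open every descendant of the target table and inspect its ancillary
 * objects (defaults, CHECK constraints, indexes, triggers, ...).
 * One unsafe partition makes the whole statement parallel unsafe.
 */
static bool
target_rel_parallel_safe(Relation targetrel)
{
	List	   *relids = find_all_inheritors(RelationGetRelid(targetrel),
											 AccessShareLock, NULL);
	ListCell   *lc;

	foreach(lc, relids)
	{
		Relation	rel = table_open(lfirst_oid(lc), NoLock);
		bool		safe = check_rel_parallel_safety(rel);	/* hypothetical */

		table_close(rel, NoLock);
		if (!safe)
			return false;
	}
	return true;
}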

(2) There's no mechanism for parallel workers to assign an XID
Parallel workers need the XID of the current (sub)transaction when actually inserting a tuple (i.e., calling heap_insert()). When the leader has not acquired the XID yet, a worker may have to assign a new one and communicate it to the leader and the other workers so that all parallel processes use the same XID.

SOLUTION TO (1)
========================================

The candidate ideas are:

1) Caching the result of parallel-safety check
The planner stores the result of checking parallel safety for each relation in the relcache or in some purpose-built hash table in shared memory.

The problems are:

* Even if the target relation turns out to be parallel safe according to those data structures, we cannot assume it remains so until the SQL statement finishes. For instance, other sessions might add a parallel-unsafe index to one of its descendant relations. Another example: when the user changes the parallel safety of indexes or triggers by running ALTER FUNCTION on the underlying index AM function or trigger function, the relcache entry of the table or index is not invalidated, so the cached parallel safety becomes stale.
In that case, when the executor encounters a parallel-unsafe object, it can mark the cached state as parallel-unsafe and error out.

* Fast access can't be ensured. With the relcache, the first access in each session has to undergo the overhead of the parallel-safety check. With a hash table in shared memory, the number of relations stored there would be limited, so the first access after database startup, or after a hash table entry has been evicted, is similarly slow.

* With a new hash table, an LWLock for concurrent access must be added, which can have an adverse effect on performance.

2) Enabling users to declare that the table allows parallel data modification
Add a table property that represents the parallel safety of the table for DML statement execution. Users specify it as follows:

CREATE TABLE table_name (...) PARALLEL { UNSAFE | RESTRICTED | SAFE };
ALTER TABLE table_name PARALLEL { UNSAFE | RESTRICTED | SAFE };

This property is recorded in pg_class's relparallel column as 'u', 'r', or 's', just like pg_proc's proparallel. The default is UNSAFE.

The planner assumes that the table, all of its descendant partitions, and their ancillary objects have the specified parallel safety or a safer one. The user is responsible for its correctness. If a parallel process finds an object that is less safe than the declared parallel safety during statement execution, it throws an ERROR and aborts the statement.
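
For illustration, here is a rough sketch of the planner-side test under this proposal; reusing pg_proc's PROPARALLEL_* constants for the new column and the rd_rel->relparallel field name are assumptions:

/*
 * If the target table is not declared PARALLEL SAFE, plan the DML
 * serially; no per-partition walk is needed at plan time.
 */
Relation	rel = table_open(targetRelid, NoLock);	/* already locked */

if (rel->rd_rel->relparallel != PROPARALLEL_SAFE)	/* assumed field */
	glob->parallelModeOK = false;
table_close(rel, NoLock);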

The objects that relate to the parallel safety of a DML target table are as follows:

* Column default expression
* DOMAIN type CHECK expression
* CHECK constraints on columns
* Partition key
* Partition key support function
* Index expression
* Index predicate
* Index AM function
* Operator function
* Trigger function

When the parallel safety of some of these objects is changed, it would be costly to propagate that change to the parallel safety of the tables that depend on them, so we don't do it. Instead, we provide a utility function pg_get_parallel_safety('table_name') that returns records of (objid, classid, parallel_safety) representing the objects that determine the parallel safety of the specified table. The function outputs only objects that are not parallel safe; otherwise, it could consume excessive memory while accumulating the output. The user can use this function to identify problematic objects when a parallel DML statement fails or is not parallelized as expected.

How does the executor detect parallel unsafe objects? There are two ways:

1) At loading time
When the executor loads the definition of an object (table, constraint, index, trigger, etc.) during the first access to it after session start, or after its eviction by a sinval message, it checks the object's parallel safety.

This is a legitimate approach, but it may require a lot of code. Also, necessary code changes might be overlooked without careful inspection.

2) At function execution time
All the related objects ultimately come down to some function execution, so add a parallel-safety check there. If the current process is a parallel worker and the function is parallel unsafe, error out with ereport(ERROR). This approach eliminates the risk of overlooking a check, with the additional bonus of a tiny code change!

The place would be FunctionCallInvoke(). It's a macro in fmgr.h now. Perhaps we should make it a function in fmgr.c, so that fmgr.h does not have to include header files for parallelism-related definitions.

We have to evaluate the performance effect of converting FunctionCallInvoke() into a function and adding an if statement there, because it's a relatively low-level, frequently executed function.
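
As a minimal sketch, the function version could look like the following; the fn_parallel field (the called function's proparallel value cached in FmgrInfo by fmgr_info()) is an assumption:

Datum
FunctionCallInvoke(FunctionCallInfo fcinfo)
{
	/*
	 * In a parallel worker, reject parallel-unsafe functions instead of
	 * silently executing them.
	 */
	if (IsParallelWorker() &&
		fcinfo->flinfo->fn_parallel == PROPARALLEL_UNSAFE)	/* assumed field */
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
				 errmsg("parallel-unsafe function \"%s\" called in a parallel worker",
						get_func_name(fcinfo->flinfo->fn_oid))));

	return (*fcinfo->flinfo->fn_addr) (fcinfo);
}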

SOLUTION TO (2)
========================================

1) Make it possible for workers to assign an XID and share it among the parallel processes
The problems are:

* Tuple visibility
If the worker that acquires the XID writes some row, and another worker reads that row before it learns of the XID, the latter worker won't treat the row as written by its own transaction.

For instance, suppose the worker (w-1) that acquires the XID (501) deletes the tuple (CTID: 0, 2). If another worker (w-2) then reads that tuple (CTID: 0, 2), it would consider the tuple still visible to its snapshot; but if w-2 knew that 501 is its own transaction's XID, it would have treated the tuple as deleted (not visible). I think this can happen when multiple updates to the same row occur and new row versions are added to a new page.

* The implementation seems complex
When the DML statement is run inside a deeply nested subtransaction and the parent transactions have not allocated their XIDs yet, the worker needs to allocate the XIDs for its parents. That indeterminate number of XIDs must be stored in shared memory, and the stack of TransactionState structures must also be passed.

Also, TransactionIdIsCurrentTransactionId() uses the ParallelCurrentXids array, through which parallel workers receive sub-committed XIDs from the leader. This needs to be reconsidered.
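
For reference, here is a simplified sketch of that worker-side lookup in xact.c; the key point is that ParallelCurrentXids is fixed at worker startup, so an XID assigned later by another worker would never be found:

/* inside TransactionIdIsCurrentTransactionId(), in a parallel worker */
if (nParallelCurrentXids > 0)
{
	int		low = 0;
	int		high = nParallelCurrentXids - 1;

	/* binary search over the sorted XIDs received from the leader */
	while (low <= high)
	{
		int				middle = low + (high - low) / 2;
		TransactionId	probe = ParallelCurrentXids[middle];

		if (TransactionIdEquals(probe, xid))
			return true;
		else if (TransactionIdPrecedes(probe, xid))
			low = middle + 1;
		else
			high = middle - 1;
	}
	return false;
}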

2) The leader assigns an XID before entering parallel mode and passes it to workers
This is what was done in [1].
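
A minimal sketch of the order of operations in the leader (based on what [1] did; the surrounding planning and worker-launch code is elided):

/*
 * Acquire the XID in the leader before going parallel; workers then
 * see it through the serialized transaction state.
 */
(void) GetCurrentTransactionId();	/* assigns an XID if none is assigned yet */

EnterParallelMode();
/* ... launch parallel workers and run the INSERT ... SELECT ... */
ExitParallelMode();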

The problem is that the XID would not be used if the data source (the SELECT query) returns no rows, which wastes an XID.

However, the data source should rarely be empty when this feature is used. As the following excerpt from the Oracle manual suggests, parallel DML is typically used for data analytics and OLTP batch jobs, and there should be plenty of source data in those scenarios.

When to Use Parallel DML
https://docs.oracle.com/en/database/oracle/oracle-database/21/vldbg/types-parallelism.html#GUID-18B2AF09-C548-48DE-A794-86224111549F
--------------------------------------------------
Several scenarios where parallel DML is used include:

Refreshing Tables in a Data Warehouse System

Creating Intermediate Summary Tables

Using Scoring Tables

Updating Historical Tables

Running Batch Jobs
--------------------------------------------------

CONCLUSION
========================================

(1) The overhead of checking parallel-safety could be large
We're inclined to go with solution 2, because it doesn't have any big problems. However, we'd like to present some more analysis of solution 1 in this thread.

Regarding how to check parallel safety in the executor, I prefer the simpler way of adding a check at function execution time. If it turns out to have an intolerable performance impact, we can choose the other approach.

(2) There's no mechanism for parallel workers to assign an XID
We'd like to adopt solution 2 because it really doesn't pose a big issue in the assumed use cases, and the implementation is simple and natural.

Of course, any better idea would be much appreciated. (But a simple one, or at least not an unnecessarily complex one, is desired.)

[1]
Parallel INSERT (INTO ... SELECT ...)
https://www.postgresql.org/message-id/flat/CAJcOf-cXnB5cnMKqWEp2E2z7Mvcd04iLVmV=qpFJrR3AcrTS3g(at)mail(dot)gmail(dot)com

Regards
Takayuki Tsunakawa
