Parallel tuplesort (for parallel B-Tree index creation)

From: Peter Geoghegan <pg(at)heroku(dot)com>
To: Pg Hackers <pgsql-hackers(at)postgresql(dot)org>
Cc: Corey Huinker <corey(dot)huinker(at)gmail(dot)com>
Subject: Parallel tuplesort (for parallel B-Tree index creation)
Date: 2016-08-01 22:18:22
Lists: pgsql-hackers

As some of you know, I've been working on parallel sort. I think I've
gone as long as I can without feedback on the design (and I see that
we're accepting stuff for September CF now), so I'd like to share what
I came up with. This project is something that I've worked on
inconsistently since late last year. It can be thought of as the
Postgres 10 follow-up to the 9.6 work on external sorting.

Attached WIP patch series:

* Adds a parallel sorting capability to tuplesort.c.

* Adds a new client of this capability: btbuild()/nbtsort.c can now
create B-Trees in parallel.

Most of the complexity here relates to the first item; the tuplesort
module has been extended to support sorting in parallel. This is
usable in principle by every existing tuplesort caller, without any
restriction imposed by the newly expanded tuplesort.h interface. So,
for example, randomAccess MinimalTuple support has been added,
although it goes unused for now.

I went with CREATE INDEX as the first client of parallel sort in part
because the cost model and so on can be relatively straightforward.
Even CLUSTER uses the optimizer to determine if a sort strategy is
appropriate, and that would need to be taught about parallelism if its
tuplesort is to be parallelized. I suppose that I'll probably try to
get CLUSTER (with a tuplesort) done in the Postgres 10 development
cycle too, but not just yet.

For now, I would prefer to focus discussion on tuplesort itself. If
you can only look at one part of this patch, please look at the
high-level description of the interface/caller contract that was added
to tuplesort.h.


Without further ado, I'll demonstrate how the patch series improves
performance in one case. This benchmark was run on an AWS server with
many disks. A d2.4xlarge instance was used, with 16 vCPUs, 122 GiB
RAM, 12 x 2 TB HDDs, running Amazon Linux. Apparently, this AWS
instance type can sustain 1,750 MB/second of I/O, which I was able to
verify during testing (when a parallel sequential scan ran, iotop
reported read throughput slightly above that for multi-second bursts).
Disks were configured in software RAID0. These instances have disks
that are optimized for sequential performance, which suits the patch
quite well. I don't usually trust AWS EC2 for performance testing, but
it seemed to work well here (results were pretty consistent).


CREATE TABLE parallel_sort_test AS
SELECT hashint8(i) randint,
md5(i::text) collate "C" padding1,
md5(i::text || '2') collate "C" padding2
FROM generate_series(0, 1e9::bigint) i;


This leaves us with a parallel_sort_test table that is 94 GB in size.

SET maintenance_work_mem = '8GB';

-- Serial case (external sort, should closely match master branch):
CREATE INDEX serial_idx ON parallel_sort_test (randint) WITH
(parallel_workers = 0);

Total time: 00:15:42.15

-- Patch with 8 tuplesort "sort-and-scan" workers (leader process
-- participates as a worker here):
CREATE INDEX patch_8_idx ON parallel_sort_test (randint) WITH
(parallel_workers = 7);

Total time: 00:06:03.86

As you can see, the parallel case is 2.58x faster (while using more
memory, though it's not the case that a higher maintenance_work_mem
setting speeds up the serial/baseline index build). 8 workers are a
bit faster than 4, but not by much (not shown). 16 are a bit slower,
but not by much (not shown).

trace_sort output for "serial_idx" case:
begin index sort: unique = f, workMem = 8388608, randomAccess = f
switching to external sort with 501 tapes: CPU 7.81s/25.54u sec
elapsed 33.95 sec
*** SNIP ***
performsort done (except 7-way final merge): CPU 53.52s/666.89u sec
elapsed 731.67 sec
external sort ended, 2443786 disk blocks used: CPU 74.40s/854.52u sec
elapsed 942.15 sec

trace_sort output for "patch_8_idx" case:
begin index sort: unique = f, workMem = 8388608, randomAccess = f
*** SNIP ***
sized memtuples 1.62x from worker's 130254158 (3052832 KB) to
210895910 (4942873 KB) for leader merge (0 KB batch memory conserved)
*** SNIP ***
tape -1/7 initially used 411907 KB of 430693 KB batch (0.956) and
26361986 out of 26361987 slots (1.000)
performsort done (except 8-way final merge): CPU 12.28s/101.76u sec
elapsed 129.01 sec
parallel external sort ended, 2443805 disk blocks used: CPU
30.08s/318.15u sec elapsed 363.86 sec

This is roughly the degree of improvement that I expected when I first
undertook this project late last year. As I go into in more detail
below, I believe that we haven't exhausted all avenues to make
parallel CREATE INDEX faster still, but I do think what's left on the
table is not enormous.

There is less benefit when sorting on a C locale text attribute,
because the overhead of merging dominates parallel sorts, and that's
even more pronounced with text. So, many text cases tend to work out
at only about 2x - 2.2x faster. We could work on this indirectly.

I've seen cases where a CREATE INDEX ended up more than 3x faster,
though. I benchmarked this case in the interest of simplicity (the
serial case is intended to be comparable, making the test fair).
Encouragingly, as you can see from the trace_sort output, the 8
parallel workers are 5.67x faster at getting to the final merge (a
merge that even the parallel case performs serially). Note that the
final merge for each CREATE INDEX is comparable (7 runs vs. 8 runs,
one from each of 8 workers). Not bad!
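
To make the arithmetic explicit, here is a small Python sketch (illustrative only, not part of the patch series) that recomputes the ratios from the elapsed times in the trace_sort output above. The variable names are made up for this example. It also subtracts the "performsort done" time from the total for each case, which suggests that the time spent after that point is similar in both builds.

```python
# Elapsed times in seconds, copied from the trace_sort output above.
serial_to_merge = 731.67     # serial: "performsort done (except 7-way final merge)"
serial_total = 942.15        # serial: "external sort ended"
parallel_to_merge = 129.01   # patch: "performsort done (except 8-way final merge)"
parallel_total = 363.86      # patch: "parallel external sort ended"

# Speedup of the whole CREATE INDEX, and of the phase before the final merge.
overall_speedup = serial_total / parallel_total
pre_merge_speedup = serial_to_merge / parallel_to_merge

# Time remaining after "performsort done" in each case; this is the part
# that a single process handles in both builds.
serial_merge = serial_total - serial_to_merge
parallel_merge = parallel_total - parallel_to_merge

print(f"overall speedup:           {overall_speedup:.3f}x")
print(f"speedup up to final merge: {pre_merge_speedup:.3f}x")
print(f"serial final phase:        {serial_merge:.2f} s")
print(f"parallel final phase:      {parallel_merge:.2f} s")
```

The final phase takes a few minutes in both cases, which is consistent with the leader's merge being the remaining bottleneck.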

Design: New, key concepts for tuplesort.c

The heap is scanned in parallel, and worker processes also merge in
parallel if required (it isn't required in the example above). The
implementation makes heavy use of existing external sort
infrastructure. In fact, it's almost the case that the implementation
is a generalization of external sorting that allows workers to perform
heap scanning and run sorting independently, with tapes then "unified"
in the leader process for merging. At that point, the state held by
the leader is more or less consistent with the leader being a serial
external sort process that has reached its merge phase in the
conventional manner (serially).
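
The overall shape described here can be sketched in a few lines of Python. This is a toy model under stated assumptions, not the tuplesort.c implementation: the function names are hypothetical, workers are simulated sequentially in one process, and the way input is divided among workers (every n-th tuple) is invented for the example.

```python
import heapq

def worker_sort(tuples):
    """Stand-in for one worker: sort its share of the input into one run."""
    return sorted(tuples)

def parallel_sort(heap, nworkers):
    """Toy model: each worker sorts independently, then the leader merges."""
    # Hypothetical division of the input; worker i takes every nworkers-th tuple.
    shares = [heap[i::nworkers] for i in range(nworkers)]
    # Each worker contributes exactly one sorted run (one "tape").
    runs = [worker_sort(share) for share in shares]
    # The leader treats the worker runs as a single set of tapes and
    # performs one k-way merge over them.
    return list(heapq.merge(*runs))

data = [(i * 7919) % 1000 for i in range(1000)]
result = parallel_sort(data, 8)
```

From the leader's point of view, the list of runs looks just like the runs a serial external sort would have produced on its own before merging.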

The steps callers must take are described fully in tuplesort.h. The
general idea is that a Tuplesortstate is aware that it might not be a
self-contained sort; it may instead be one part of a parallel sort
operation. You might say that the tuplesort caller must "build its own
sort" from participant worker process Tuplesortstates. The caller
creates a dynamic shared memory segment + TOC for each parallel sort
operation (could be more than one concurrent sort operation, of
course), passes that to tuplesort to initialize and manage, and
creates a "leader" Tuplesortstate in private memory, plus one or more
"worker" Tuplesortstates, each presumably managed by a different
parallel worker process.

tuplesort.c does most of the heavy lifting, including having processes
wait on each other to respect its ordering dependencies. Caller is
responsible for spawning workers to do the work, reporting details of
the workers to tuplesort through shared memory, and having workers
call tuplesort to actually perform sorting. Caller consumes final
output through leader Tuplesortstate in leader process.

I think that this division of labor works well for us.

Tape unification

Sort operations have a unique identifier, generated before any workers
are launched, using a scheme based on the leader's PID, and a unique
temp file number. This makes all on-disk state (temp files managed by
logtape.c) discoverable by the leader process. State in shared memory
is sized in proportion to the number of workers, so the only thing
about the data being sorted that gets passed around in shared memory
is a little logtape.c metadata for tapes, describing for example how
large each constituent BufFile is (a BufFile associated with one
particular worker's tapeset).
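
As a rough illustration of how little needs to pass through shared memory, the Python sketch below models one way a leader could locate each worker's temp file and place it in a single block address space. Everything specific here is an assumption made for the example: the file naming pattern, the WorkerTapeInfo fields, the 8192-byte block size, and the idea of assigning consecutive block ranges are not taken from the patch.

```python
from dataclasses import dataclass

BLCKSZ = 8192  # block size assumed for this sketch

@dataclass
class WorkerTapeInfo:
    """Hypothetical per-worker metadata published in shared memory."""
    worker: int
    file_size_bytes: int  # size of the BufFile behind this worker's tapeset

def temp_file_name(leader_pid, sort_number, worker):
    # Hypothetical naming scheme built from the leader's PID, a per-sort
    # temp file number, and the worker number.
    return f"pgsql_tmp{leader_pid}.{sort_number}.{worker}"

def unify(leader_pid, sort_number, infos):
    """Return (file name, first block, block count) for each worker."""
    unified = []
    next_block = 0
    for info in sorted(infos, key=lambda i: i.worker):
        # Round the file size up to a whole number of blocks.
        nblocks = -(-info.file_size_bytes // BLCKSZ)
        name = temp_file_name(leader_pid, sort_number, info.worker)
        unified.append((name, next_block, nblocks))
        next_block += nblocks
    return unified

layout = unify(1234, 0, [WorkerTapeInfo(1, 3 * BLCKSZ),
                         WorkerTapeInfo(0, 2 * BLCKSZ + 1)])
```

The point of the sketch is only that the leader needs a predictable name and a size per worker, not any of the sorted data itself.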

(See below also for notes on buffile.c's role in all of this, fd.c and
resource management, etc.)


Each worker process claims workMem as if it was an independent node.

The new implementation reuses much of what was originally designed for
external sorts. As such, parallel sorts are necessarily external
sorts, even when the workMem (i.e. maintenance_work_mem) budget could
in principle allow for parallel sorting to take place entirely in
memory. The implementation arguably *insists* on making such cases
external sorts, when they don't really need to be. This is much less
of a problem than you might think, since the 9.6 work on external
sorting does somewhat blur the distinction between internal and
external sorts (just consider how much time trace_sort indicates is
spent waiting on writes in workers; it's typically a small part of the
total time spent). Since parallel sort is really only compelling for
large sorts, it makes sense to make them external, or at least to
prioritize the cases that should be performed externally.

Anyway, workMem-not-exceeded cases require special handling to not
completely waste memory. Statistics about worker observations are used
at later stages, to at least avoid blatant waste, and to ensure that
memory is used optimally more generally.


The model that I've come up with is that every worker process is
guaranteed to output one materialized run onto one tape for the leader
to merge from within its "unified" tapeset. This is the case
regardless of how much workMem is available, or any other factor. The
leader always assumes that the worker runs/tapes are present and
discoverable based only on the number of known-launched worker
processes, and a little metadata on each that is passed through shared
memory.
Producing one output run/materialized tape from all input tuples in a
worker often happens without the worker running out of workMem, which
you saw above. A straight quicksort and dump of all tuples is
therefore possible, without any merging required in the worker.
Alternatively, it may prove necessary to do some amount of merging in
each worker to generate one materialized output run. This case is
handled in the same way as a randomAccess case that requires one
materialized output tape to support random access by the caller. This
worker merging does necessitate another pass over all temp files for
the worker, but that's a much lower cost than you might imagine, in
part because the newly expanded use of batch memory makes merging here
cache efficient.
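
The two worker paths described above (sort and dump when everything fits in workMem, otherwise sort several runs and merge them into one) can be sketched as follows. This is a simplified Python model with hypothetical names: the memory budget is expressed as a tuple count, and Python's built-in sort stands in for quicksort.

```python
import heapq

def worker_one_run(tuples, work_mem_tuples):
    """Return (run, merged): one sorted run, and whether a merge was needed."""
    tuples = list(tuples)
    if len(tuples) <= work_mem_tuples:
        # Everything fits in the budget: sort once and dump as a single run.
        return sorted(tuples), False
    # Otherwise, sort budget-sized chunks into intermediate runs ...
    runs = [sorted(tuples[i:i + work_mem_tuples])
            for i in range(0, len(tuples), work_mem_tuples)]
    # ... and make one extra pass to merge them into one materialized run.
    return list(heapq.merge(*runs)), True

small = worker_one_run([3, 1, 2], work_mem_tuples=10)
large = worker_one_run([5, 4, 3, 2, 1], work_mem_tuples=2)
```

Either way, the caller ends up with exactly one run per worker, which is what the leader relies on.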

Batch allocation is used for all merging involved here, not just the
leader's own final-on-the-fly merge, so merging is consistently cache
efficient. (Workers that must merge on their own are therefore similar
to traditional randomAccess callers, so these cases become important
enough to optimize with the batch memory patch, although that's still
independently useful.)

No merging in parallel

Currently, merging worker *output* runs may only occur in the leader
process. In other words, we always keep n worker processes busy with
scanning-and-sorting (and maybe some merging), but then all processes
but the leader process grind to a halt (note that the leader process
can participate as a scan-and-sort tuplesort worker, just as it will
everywhere else, which is why I specified "parallel_workers = 7" but
talked about 8 workers).

One leader process is kept busy with merging these n output runs on
the fly, so things will bottleneck on that, which you saw in the
example above. As already described, workers will sometimes merge in
parallel, but only their own runs -- never another worker's runs. I
did attempt to address the leader merge bottleneck by implementing
cross-worker run merging in workers. I got as far as implementing a
very rough version of this, but initial results were disappointing,
and so that was not pursued further than the experimentation stage.

Parallel merging is a possible future improvement that could be added
to what I've come up with, but I don't think that it will move the
needle in a really noticeable way.

Partitioning for parallelism (samplesort style "bucketing")

Perhaps a partition-based approach would be more effective than
parallel merging (e.g., redistribute slices of worker runs across
workers along predetermined partition boundaries, sort a range of
values within dedicated workers, then concatenate to get final result,
a bit like the in-memory samplesort algorithm). That approach would
not suit CREATE INDEX, because the approach's great strength is that
the workers can run in parallel for the entire duration, since there
is no merge bottleneck (this assumes good partition boundaries, which
is a bit of a risky assumption). Parallel CREATE INDEX wants something
where the workers can independently write the index, and independently
WAL log, and independently create a unified set of internal pages, all
of which is hard.
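
For readers unfamiliar with samplesort-style bucketing, here is a minimal in-memory Python sketch of the general technique. It is not something the patch implements: the function names, the sample size, and the fixed random seed are all choices made for this example, and the buckets are sorted one after another rather than by real workers.

```python
import bisect
import random

def choose_splitters(data, nworkers, sample_size=64, seed=0):
    """Pick nworkers - 1 partition boundaries from a random sample."""
    rng = random.Random(seed)
    sample = sorted(rng.sample(data, min(sample_size, len(data))))
    return [sample[(i * len(sample)) // nworkers] for i in range(1, nworkers)]

def partition_sort(data, nworkers):
    if not data:
        return []
    splitters = choose_splitters(data, nworkers)
    buckets = [[] for _ in range(nworkers)]
    for value in data:
        # Route each value to the bucket whose key range contains it.
        buckets[bisect.bisect_right(splitters, value)].append(value)
    # Buckets cover disjoint, ordered key ranges, so each can be sorted
    # independently and the results concatenated with no merge step.
    result = []
    for bucket in buckets:
        result.extend(sorted(bucket))
    return result

values = [(i * 7919) % 1009 for i in range(5000)]
ordered = partition_sort(values, 4)
```

If the sample is unrepresentative, some buckets end up much larger than others, which is the risk with partition boundaries mentioned above.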

This patch series will tend to proportionally speed up CREATE INDEX
statements at a level that is comparable to other major database
systems. That's enough progress for one release. I think that
partitioning to sort is more useful for query execution than for
utility statements like CREATE INDEX.

Partitioning and merge joins

Robert has often speculated about what it would take to make merge
joins work well in parallel. I think that "range
distribution"/bucketing will prove an important component of that.
It's just too useful to aggregate tuples in shared memory initially,
and have workers sort them without any serial merge bottleneck;
arguments about misestimations, data skew, and so on should not deter
us from this, long term. This approach has minimal IPC overhead,
especially with regard to LWLock contention.

This kind of redistribution probably belongs in a Gather-like node,
though, which has access to the context necessary to determine a
range, and even dynamically alter the range in the event of a
misestimation. Under this scheme, tuplesort.c just needs to be
instructed that these worker-private Tuplesortstates are
range-partitioned (i.e., the sorts are virtually independent, as far
as it's concerned). That's a bit messy, but it is still probably the
way to go for merge joins and other sort-reliant executor nodes.

buffile.c, and "unification"

There has been significant new infrastructure added to make logtape.c
aware of workers. buffile.c has in turn been taught about unification
as a first class part of the abstraction, with low-level management of
certain details occurring within fd.c. So, "tape unification" is
added: a process can open other backends' logical tapes to generate a
unified logical tapeset for the leader to merge. This is probably the
single biggest source of complexity for the patch, since I must
consider:
* Creating a general, reusable abstraction for other possible BufFile
users (logtape.c only has to serve tuplesort.c, though).

* Logical tape free space management.

* Resource management, file lifetime, etc. fd.c resource management
can now close a file at xact end for temp files, while not deleting it
in the leader backend (only the "owning" worker backend deletes the
temp file it owns).

* Crash safety (e.g., when to truncate existing temp files, and when not to).

CREATE INDEX user interface

There are two ways of determining how many parallel workers a CREATE
INDEX requests:

* A cost model, which is closely based on create_plain_partial_paths()
at the moment. This needs more work, particularly to model things like
maintenance_work_mem. Even still, it isn't terrible.

* A parallel_workers storage parameter, which completely bypasses the
cost model. This is the "DBA knows best" approach, and is what I've
consistently used during testing.

Corey Huinker has privately assisted me with performance testing the
patch, using his own datasets. Testing has exclusively used the
storage parameter.

I've added a new GUC, max_parallel_workers_maintenance, which is
essentially the utility statement equivalent of
max_parallel_workers_per_gather. This is clearly necessary, since
we're using up to maintenance_work_mem per worker, which is of course
typically much higher than work_mem. I didn't feel the need to create
a new maintenance-wise variant GUC for things like
min_parallel_relation_size, though. Only this one new GUC is added
(plus the new storage parameter, parallel_workers, not to be confused
with the existing table storage parameter of the same name).
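
A back-of-the-envelope Python sketch shows why a separate limit matters. It assumes that every participant claims its full maintenance_work_mem budget and that the leader also sorts as a worker; it ignores whatever memory the leader uses for its final merge. The function name is hypothetical.

```python
def worst_case_sort_memory_gb(parallel_workers, maintenance_work_mem_gb,
                              leader_participates=True):
    """Rough upper bound on memory claimed by sorting participants."""
    participants = parallel_workers + (1 if leader_participates else 0)
    return participants * maintenance_work_mem_gb

# The benchmark above: parallel_workers = 7 with maintenance_work_mem = '8GB'.
benchmark_gb = worst_case_sort_memory_gb(7, 8)
serial_gb = worst_case_sort_memory_gb(0, 8)
```

Under these assumptions the benchmark build could claim up to 64 GB across its eight participants, compared with 8 GB for the serial build.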

I am much more concerned about the tuplesort.h interface than the
CREATE INDEX user interface as such. The user interface is merely a
facade on top of tuplesort.c and nbtsort.c (and not one that I'm
particularly attached to).

Peter Geoghegan

Attachment Content-Type Size
0001-Cap-the-number-of-tapes-used-by-external-sorts.patch.gz application/x-gzip 1.8 KB
0005-Add-force_btree_randomaccess-GUC-for-testing.patch.gz application/x-gzip 1.7 KB
0004-Add-parallel-B-tree-index-build-sorting.patch.gz application/x-gzip 54.3 KB
0003-Rearrange-header-file-include-directives.patch.gz application/x-gzip 1.4 KB
0002-Use-tuplesort-batch-memory-for-randomAccess-sorts.patch.gz application/x-gzip 8.6 KB

