From d8d0b4e92b118290c895be3f9ff21ff2ad79f0fa Mon Sep 17 00:00:00 2001 From: Mircea Cadariu Date: Wed, 11 Mar 2026 16:55:50 +0000 Subject: [PATCH 2/2] Extend pgbench parallel data loading to range-partitioned tables Each worker thread creates and loads complete partitions as standalone tables, then partitions are attached to the parent after all data is loaded. This avoids AccessExclusiveLock contention on the parent table during parallel loading and allows COPY FREEZE on each partition. The number of threads is capped to the number of partitions. Hash partitioning with parallel loading is not supported and raises an error. --- doc/src/sgml/ref/pgbench.sgml | 8 +- src/bin/pgbench/pgbench.c | 186 ++++++++++++++++--- src/bin/pgbench/t/001_pgbench_with_server.pl | 17 ++ src/bin/pgbench/t/002_pgbench_no_server.pl | 5 + 4 files changed, 189 insertions(+), 27 deletions(-) diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml index 41772442d1..8dcee8f423 100644 --- a/doc/src/sgml/ref/pgbench.sgml +++ b/doc/src/sgml/ref/pgbench.sgml @@ -506,8 +506,12 @@ pgbench options d In initialization mode (), this option controls the number of threads used to load data into pgbench_accounts in parallel. - Parallel data loading is currently supported for - non-partitioned tables only. + When using + range partitioning, each thread loads one or more + complete partitions independently. + The number of threads is limited to the number of partitions. + Parallel data loading is not currently supported with + hash partitioning. 
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 1ecd44c4ab..938fec1cc5 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -849,6 +849,8 @@ static void clear_socket_set(socket_set *sa); static void add_socket_to_set(socket_set *sa, int fd, int idx); static int wait_on_socket_set(socket_set *sa, int64 usecs); static bool socket_has_input(socket_set *sa, int fd, int idx); +static void createPartitions(PGconn *con, int part_start, int part_end); +static void attachPartitions(PGconn *con); /* callback used to build rows for COPY during data loading */ typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr); @@ -867,6 +869,9 @@ typedef struct WorkerTask int64 end_row; initRowMethod append_row; int worker_id; + int part_start; + int part_end; + int64 part_size; } WorkerTask; static char @@ -1701,6 +1706,52 @@ assignWorkerRows(WorkerTask *wt, int num_workers, int64 total_rows) (wt->worker_id + 1) * rows_per_worker; } +/* + * Covers only multiple partitions per worker (workers <= partitions) for now. + * Each worker loads complete partitions independently and can use COPY FREEZE. + */ +static void +assignWorkerPartitions(WorkerTask *wt, int num_workers, int64 total_rows, + int num_parts) +{ + int parts_per_worker = num_parts / num_workers; + int extra_parts = num_parts % num_workers; + + wt->part_start = wt->worker_id * parts_per_worker + 1 + + (wt->worker_id < extra_parts ? wt->worker_id : extra_parts); + wt->part_end = wt->part_start + parts_per_worker - 1 + + (wt->worker_id < extra_parts ? 1 : 0); + + wt->start_row = (wt->part_start - 1) * wt->part_size; + wt->end_row = (wt->part_end == num_parts) ? 
+ total_rows : + wt->part_end * wt->part_size; +} + +/* Load data into partitioned table */ +static void +loadPartitionedTable(PGconn *conn, WorkerTask *wt) +{ + int p; + + for (p = wt->part_start; p <= wt->part_end; p++) + { + CopyTarget target; + int64 part_start_row = (p - 1) * wt->part_size; + int64 part_end_row = (p == partitions) ? (naccounts * (int64) scale) : (p * wt->part_size); + char partition_table[NAMEDATALEN]; + + snprintf(partition_table, sizeof(partition_table), "pgbench_accounts_%d", p); + + target.table_name = partition_table; + target.start_row = part_start_row; + target.end_row = part_end_row; + target.use_freeze = true; + + performCopy(conn, wt, &target); + } +} + /* * Load data into non-partitioned table. * @@ -1731,14 +1782,20 @@ initWorkerThread(void *arg) conn = wt->con; /* - * Start a new transaction for this worker, except for worker 0. - * Worker 0 continues the transaction from the main thread that already - * did the truncate (to enable COPY FREEZE). + * Start a new transaction for this worker, except for worker 0 on + * non-partitioned tables. Worker 0 continues the transaction from the + * main thread that already did the truncate (to enable COPY FREEZE). 
*/ - if (wt->worker_id > 0) + if (wt->part_start > 0 || wt->worker_id > 0) executeStatement(conn, "begin"); - loadRegularTable(conn, wt); + if (wt->part_start > 0) + { + createPartitions(conn, wt->part_start, wt->part_end); + loadPartitionedTable(conn, wt); + } + else + loadRegularTable(conn, wt); executeStatement(conn, "commit"); @@ -1753,6 +1810,7 @@ initPopulateTableParallel(PGconn *connection, int num_workers, THREAD_T *worker_threads; WorkerTask *worker_data; PGconn **connections; + bool is_partitioned; int i; /* Allocate worker data and threads */ @@ -1769,9 +1827,17 @@ initPopulateTableParallel(PGconn *connection, int num_workers, pg_fatal("could not create connection for worker %d", i); } + /* Works only for pgbench_accounts and the range partitioning option */ + is_partitioned = strcmp(table, "pgbench_accounts") == 0 && partition_method == PART_RANGE; + executeStatement(connections[0], "begin"); truncateTable(connections[0], table); + if (is_partitioned) + { + executeStatement(connections[0], "commit"); + } + fprintf(stderr, "loading %s with %d threads...\n", table, num_workers); /* Create and start worker threads */ @@ -1782,7 +1848,14 @@ initPopulateTableParallel(PGconn *connection, int num_workers, worker_data[i].append_row = append_row; worker_data[i].worker_id = i; - assignWorkerRows(&worker_data[i], num_workers, total_rows); + if (!is_partitioned) + assignWorkerRows(&worker_data[i], num_workers, total_rows); + else + { + worker_data[i].part_size = (naccounts * (int64) scale + partitions - 1) / partitions; + assignWorkerPartitions(&worker_data[i], num_workers, total_rows, + partitions); + } errno = THREAD_CREATE(&worker_threads[i], initWorkerThread, &worker_data[i]); if (errno != 0) @@ -1796,6 +1869,12 @@ initPopulateTableParallel(PGconn *connection, int num_workers, for (i = 0; i < num_workers; i++) THREAD_JOIN(worker_threads[i]); + /* + * For partitioned tables, attach all partitions now that data is loaded. 
+ */ + if (is_partitioned) + attachPartitions(connection); + /* * Clean up worker connections (skip index 0, which is the main * connection) @@ -5046,14 +5125,50 @@ initDropTables(PGconn *con) * with a known size, so we choose to partition it. */ static void -createPartitions(PGconn *con) +createPartitions(PGconn *con, int part_start, int part_end) { PQExpBufferData query; /* we must have to create some partitions */ Assert(partitions > 0); - fprintf(stderr, "creating %d partitions...\n", partitions); + initPQExpBuffer(&query); + + for (int p = part_start; p <= part_end; p++) + { + /* + * Create standalone tables (not attached to parent yet). This avoids + * AccessExclusiveLock on the parent table, allowing parallel + * creation. Tables will be attached after data loading. + */ + printfPQExpBuffer(&query, + "create%s table pgbench_accounts_%d\n" + " (aid int not null,\n" + " bid int,\n" + " abalance int,\n" + " filler character(84))\n" + " with (fillfactor=%d)", + unlogged_tables ? " unlogged" : "", p, + fillfactor); + + executeStatement(con, query.data); + } + + termPQExpBuffer(&query); +} + +/* + * Attach standalone partition tables to the parent table. + * Called after all data has been loaded in parallel. + */ +static void +attachPartitions(PGconn *con) +{ + PQExpBufferData query; + + Assert(partitions > 0); + + fprintf(stderr, "attaching %d partitions...\n", partitions); initPQExpBuffer(&query); @@ -5064,10 +5179,9 @@ createPartitions(PGconn *con) int64 part_size = (naccounts * (int64) scale + partitions - 1) / partitions; printfPQExpBuffer(&query, - "create%s table pgbench_accounts_%d\n" - " partition of pgbench_accounts\n" - " for values from (", - unlogged_tables ? 
" unlogged" : "", p); + "alter table pgbench_accounts\n" + " attach partition pgbench_accounts_%d\n" + " for values from (", p); /* * For RANGE, we use open-ended partitions at the beginning and @@ -5090,21 +5204,16 @@ createPartitions(PGconn *con) appendPQExpBufferChar(&query, ')'); } else if (partition_method == PART_HASH) + { printfPQExpBuffer(&query, - "create%s table pgbench_accounts_%d\n" - " partition of pgbench_accounts\n" + "alter table pgbench_accounts\n" + " attach partition pgbench_accounts_%d\n" " for values with (modulus %d, remainder %d)", - unlogged_tables ? " unlogged" : "", p, - partitions, p - 1); + p, partitions, p - 1); + } else /* cannot get there */ Assert(0); - /* - * Per ddlinfo in initCreateTables, fillfactor is needed on table - * pgbench_accounts. - */ - appendPQExpBuffer(&query, " with (fillfactor=%d)", fillfactor); - executeStatement(con, query.data); } @@ -5202,8 +5311,11 @@ initCreateTables(PGconn *con) termPQExpBuffer(&query); - if (partition_method != PART_NONE) - createPartitions(con); + if (partition_method != PART_NONE && (nthreads == 1 || partition_method == PART_HASH)) + { + fprintf(stderr, "creating %d partitions...\n", partitions); + createPartitions(con, 1, partitions); + } } /* @@ -5397,10 +5509,21 @@ static void initPopulateTable(PGconn *con, const char *table, int64 total_rows, initRowMethod init_row, initRowMethod append_row, bool use_parallel) { - if (use_parallel && nthreads > 1 && partition_method == PART_NONE) + bool is_accounts = (strcmp(table, "pgbench_accounts") == 0); + + if (use_parallel && nthreads > 1) initPopulateTableParallel(con, nthreads, table, total_rows * scale, append_row); else + { + /* + * For single-threaded mode with partitioned tables, attach partitions + * before loading data so COPY to the parent table can route rows. 
+ */ + if (is_accounts && partitions > 0 && partition_method != PART_NONE) + attachPartitions(con); + initPopulateTableSerial(con, table, total_rows, init_row); + } } /* @@ -5463,6 +5586,9 @@ initGenerateDataServerSide(PGconn *con) /* truncate away any old data */ initTruncateTables(con); + if (partitions > 0 && partition_method != PART_NONE) + attachPartitions(con); + initPQExpBuffer(&sql); printfPQExpBuffer(&sql, @@ -7486,6 +7612,16 @@ main(int argc, char **argv) if (partitions > 0 && partition_method == PART_NONE) partition_method = PART_RANGE; + if (partition_method == PART_HASH && nthreads > 1) + pg_fatal("parallel data loading (-j) is not supported with hash partitioning"); + + /* + * For partitioned tables, limit the number of threads to the number of + * partitions, since each worker handles at least one partition. + */ + if (partition_method == PART_RANGE && nthreads > partitions) + nthreads = partitions; + if (initialize_steps == NULL) initialize_steps = pg_strdup(DEFAULT_INIT_STEPS); diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl index b0a4b973b4..c64ca622d7 100644 --- a/src/bin/pgbench/t/001_pgbench_with_server.pl +++ b/src/bin/pgbench/t/001_pgbench_with_server.pl @@ -235,6 +235,23 @@ if ($nthreads > 1) 'pgbench parallel initialization without partitions'); check_data_state($node, 'parallel-no-partitions'); + + # Parallel init with range partitions + $node->pgbench( + '-i -j 2 --scale=1 --partitions=4 --partition-method=range', + 0, + [qr{^$}], + [ + qr{creating tables}, + qr{loading pgbench_accounts with 2 threads}, + qr{attaching 4 partitions}, + qr{vacuuming}, + qr{creating primary keys}, + qr{done in \d+\.\d\d s } + ], + 'pgbench parallel initialization with range partitions'); + + check_data_state($node, 'parallel-range-partitions'); } # run custom scripts diff --git a/src/bin/pgbench/t/002_pgbench_no_server.pl b/src/bin/pgbench/t/002_pgbench_no_server.pl index e694e9ef0f..d67f26e422 100644 
--- a/src/bin/pgbench/t/002_pgbench_no_server.pl +++ b/src/bin/pgbench/t/002_pgbench_no_server.pl @@ -187,6 +187,11 @@ my @options = ( '-i --partition-method=hash', [qr{partition-method requires greater than zero --partitions}] ], + [ + 'parallel data loading with hash partitioning', + '-i -j 2 --partitions=4 --partition-method=hash', + [qr{parallel data loading \(-j\) is not supported with hash partitioning}] + ], [ 'bad maximum number of tries', '--max-tries -10', -- 2.39.5 (Apple Git-154)