From 6099f30cf78b0ed8608670ff07b8a71b8cf0d47c Mon Sep 17 00:00:00 2001 From: Mircea Cadariu Date: Sun, 3 May 2026 16:42:20 +0100 Subject: [PATCH v4] pgbench: parallelize account loading for range-partitioned tables In init mode with range partitioning, -j > 1 loads pgbench_accounts in parallel. Each worker creates its assigned partitions as standalone tables, populates them with COPY FREEZE, and the main connection attaches them afterwards. --- doc/src/sgml/ref/pgbench.sgml | 9 + src/bin/pgbench/pgbench.c | 258 +++++++++++++++++-- src/bin/pgbench/t/001_pgbench_with_server.pl | 29 +++ 3 files changed, 269 insertions(+), 27 deletions(-) diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml index 2e401d1ceb..3594b731cc 100644 --- a/doc/src/sgml/ref/pgbench.sgml +++ b/doc/src/sgml/ref/pgbench.sgml @@ -382,6 +382,11 @@ pgbench options d the scaled number of accounts. Default is 0, meaning no partitioning. + + With <option>-j</option> greater than 1 and + <option>--partition-method=range</option>, partitions are + loaded in parallel. + @@ -502,6 +507,10 @@ pgbench options d Clients are distributed as evenly as possible among available threads. Default is 1. + + In initialization mode (<option>-i</option>), + sets the number of threads used to load partitions in parallel. 
+ diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 1dae918cc0..aa21b653ce 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -4817,6 +4817,34 @@ initDropTables(PGconn *con) "pgbench_tellers"); } +static void +appendAccountsRangeForValues(PQExpBufferData *query, int p) +{ + int64 part_size = (naccounts * (int64) scale + partitions - 1) / partitions; + + appendPQExpBufferStr(query, " for values from ("); + if (p == 1) + appendPQExpBufferStr(query, "minvalue"); + else + appendPQExpBuffer(query, INT64_FORMAT, (p - 1) * part_size + 1); + appendPQExpBufferStr(query, ") to ("); + if (p < partitions) + appendPQExpBuffer(query, INT64_FORMAT, p * part_size + 1); + else + appendPQExpBufferStr(query, "maxvalue"); + appendPQExpBufferChar(query, ')'); +} + +static void +getAccountsPartitionRows(int p, int64 *start_row, int64 *end_row) +{ + int64 total_rows = (int64) naccounts * scale; + int64 part_size = (total_rows + partitions - 1) / partitions; + + *start_row = (int64) (p - 1) * part_size; + *end_row = (p == partitions) ? total_rows : (int64) p * part_size; +} + /* * Create "pgbench_accounts" partitions if needed. * @@ -4839,33 +4867,17 @@ createPartitions(PGconn *con) { if (partition_method == PART_RANGE) { - int64 part_size = (naccounts * (int64) scale + partitions - 1) / partitions; - - printfPQExpBuffer(&query, - "create%s table pgbench_accounts_%d\n" - " partition of pgbench_accounts\n" - " for values from (", - unlogged_tables ? " unlogged" : "", p); - /* * For RANGE, we use open-ended partitions at the beginning and * end to allow any valid value for the primary key. Although the * actual minimum and maximum values can be derived from the * scale, it is more generic and the performance is better. 
*/ - if (p == 1) - appendPQExpBufferStr(&query, "minvalue"); - else - appendPQExpBuffer(&query, INT64_FORMAT, (p - 1) * part_size + 1); - - appendPQExpBufferStr(&query, ") to ("); - - if (p < partitions) - appendPQExpBuffer(&query, INT64_FORMAT, p * part_size + 1); - else - appendPQExpBufferStr(&query, "maxvalue"); - - appendPQExpBufferChar(&query, ')'); + printfPQExpBuffer(&query, + "create%s table pgbench_accounts_%d\n" + " partition of pgbench_accounts", + unlogged_tables ? " unlogged" : "", p); + appendAccountsRangeForValues(&query, p); } else if (partition_method == PART_HASH) printfPQExpBuffer(&query, @@ -4889,6 +4901,62 @@ createPartitions(PGconn *con) termPQExpBuffer(&query); } +static void +createStandalonePartitions(PGconn *con, int part_start, int part_end) +{ + PQExpBufferData query; + const char *aid_type = (scale >= SCALE_32BIT_THRESHOLD) ? "bigint" : "int"; + + Assert(partitions > 0); + Assert(partition_method == PART_RANGE); + + initPQExpBuffer(&query); + + for (int p = part_start; p <= part_end; p++) + { + printfPQExpBuffer(&query, + "create%s table pgbench_accounts_%d\n" + " (aid %s not null,\n" + " bid int,\n" + " abalance int,\n" + " filler character(84))\n" + " with (fillfactor=%d)", + unlogged_tables ? 
" unlogged" : "", p, + aid_type, fillfactor); + + executeStatement(con, query.data); + } + + termPQExpBuffer(&query); +} + +static void +attachStandalonePartitions(PGconn *con) +{ + PQExpBufferData query; + + Assert(partitions > 0); + Assert(partition_method == PART_RANGE); + + initPQExpBuffer(&query); + + executeStatement(con, "begin"); + + for (int p = 1; p <= partitions; p++) + { + printfPQExpBuffer(&query, + "alter table pgbench_accounts\n" + " attach partition pgbench_accounts_%d", + p); + appendAccountsRangeForValues(&query, p); + executeStatement(con, query.data); + } + + executeStatement(con, "commit"); + + termPQExpBuffer(&query); +} + /* * Create pgbench's standard tables */ @@ -4981,7 +5049,17 @@ initCreateTables(PGconn *con) termPQExpBuffer(&query); if (partition_method != PART_NONE) + { + /* + * In the parallel range-partitioned case, partitions are created by + * the worker threads (so each one can use COPY FREEZE in its own + * transaction) and attached afterwards. + */ + if (partition_method == PART_RANGE && nthreads > 1) + return; + createPartitions(con); + } } /* @@ -5143,6 +5221,121 @@ initPopulateTable(PGconn *con, const char *table, int64 base, termPQExpBuffer(&sql); } +static void +initPopulatePartition(PGconn *con, int partno) +{ + int64 start_row; + int64 end_row; + char copy_stmt[256]; + PGresult *res; + PQExpBufferData sql; + int64 row; + + getAccountsPartitionRows(partno, &start_row, &end_row); + + snprintf(copy_stmt, sizeof(copy_stmt), + PQserverVersion(con) >= 140000 ? 
+ "copy pgbench_accounts_%d from stdin with (freeze on)" : + "copy pgbench_accounts_%d from stdin", + partno); + + res = PQexec(con, copy_stmt); + if (PQresultStatus(res) != PGRES_COPY_IN) + pg_fatal("could not start COPY for partition %d: %s", + partno, PQerrorMessage(con)); + PQclear(res); + + initPQExpBuffer(&sql); + + for (row = start_row; row < end_row; row++) + { + initAccount(&sql, row); + if (PQputCopyData(con, sql.data, sql.len) <= 0) + pg_fatal("PQputCopyData failed for partition %d", partno); + } + + if (PQputCopyEnd(con, NULL) <= 0) + pg_fatal("PQputCopyEnd failed for partition %d", partno); + + res = PQgetResult(con); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("COPY failed for partition %d: %s", partno, PQerrorMessage(con)); + PQclear(res); + + termPQExpBuffer(&sql); +} + +typedef struct PartitionWorkerData +{ + int thread_id; + int part_start; + int part_end; +} PartitionWorkerData; + +static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC +initPartitionWorkerThread(void *arg) +{ + PartitionWorkerData *data = (PartitionWorkerData *) arg; + PGconn *con = doConnect(); + int p; + + if (con == NULL) + pg_fatal("could not create connection for partition worker (parts %d-%d)", + data->part_start, data->part_end); + + executeStatement(con, "begin"); + createStandalonePartitions(con, data->part_start, data->part_end); + for (p = data->part_start; p <= data->part_end; p++) + { + pg_time_usec_t start = pg_time_now(); + + initPopulatePartition(con, p); + fprintf(stderr, "partition %d loaded by thread %d (in %.2f s)\n", + p, data->thread_id, + PG_TIME_GET_DOUBLE(pg_time_now() - start)); + } + executeStatement(con, "commit"); + + PQfinish(con); + THREAD_FUNC_RETURN; +} + +static void +initLoadAccountsParallel(void) +{ + THREAD_T *threads; + PartitionWorkerData *data; + int parts_per_worker = partitions / nthreads; + int extra_parts = partitions % nthreads; + int next_part = 1; + int i; + + fprintf(stderr, "creating %d partitions...\n", partitions); + 
fprintf(stderr, "loading pgbench_accounts with %d threads...\n", nthreads); + + threads = pg_malloc_array(THREAD_T, nthreads); + data = pg_malloc_array(PartitionWorkerData, nthreads); + + for (i = 0; i < nthreads; i++) + { + data[i].thread_id = i; + data[i].part_start = next_part; + data[i].part_end = next_part + parts_per_worker - 1 + + (i < extra_parts ? 1 : 0); + next_part = data[i].part_end + 1; + + errno = THREAD_CREATE(&threads[i], initPartitionWorkerThread, &data[i]); + if (errno != 0) + pg_fatal("could not create thread for worker %d: %m", i); + } + + for (i = 0; i < nthreads; i++) + THREAD_JOIN(threads[i]); + + free(threads); + free(data); +} + /* * Fill the standard tables with some data generated and sent from the client. * @@ -5155,8 +5348,11 @@ initGenerateDataClientSide(PGconn *con) fprintf(stderr, "generating data (client-side)...\n"); /* - * we do all of this in one transaction to enable the backend's - * data-loading optimizations + * For serial loading, do everything in one transaction to enable the + * backend's data-loading optimizations. For parallel loading + * (range-partitioned, -j > 1), load branches and tellers in one + * transaction, then load accounts in parallel with each worker in its own + * transaction. 
*/ executeStatement(con, "begin"); @@ -5169,9 +5365,18 @@ initGenerateDataClientSide(PGconn *con) */ initPopulateTable(con, "pgbench_branches", nbranches, initBranch); initPopulateTable(con, "pgbench_tellers", ntellers, initTeller); - initPopulateTable(con, "pgbench_accounts", naccounts, initAccount); - executeStatement(con, "commit"); + if (partition_method == PART_RANGE && nthreads > 1) + { + executeStatement(con, "commit"); + initLoadAccountsParallel(); + attachStandalonePartitions(con); + } + else + { + initPopulateTable(con, "pgbench_accounts", naccounts, initAccount); + executeStatement(con, "commit"); + } } /* @@ -6944,7 +7149,6 @@ main(int argc, char **argv) initialization_option_set = true; break; case 'j': /* jobs */ - benchmarking_option_set = true; if (!option_parse_int(optarg, "-j/--jobs", 1, INT_MAX, &nthreads)) { @@ -7176,7 +7380,7 @@ main(int argc, char **argv) * optimization; throttle_delay is calculated incorrectly below if some * threads have no clients assigned to them.) */ - if (nthreads > nclients) + if (nthreads > nclients && !is_init_mode) nthreads = nclients; /* diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl index b7685ea5d2..29ee28d616 100644 --- a/src/bin/pgbench/t/001_pgbench_with_server.pl +++ b/src/bin/pgbench/t/001_pgbench_with_server.pl @@ -164,6 +164,35 @@ $node->pgbench( # Check data state, after server-side data generation. check_data_state($node, 'server-side'); +# Test parallel initialization with range partitions (client-side generation). +# Use -j to control the number of worker threads; partitions must be >= -j. 
+$node->pgbench( + '-i -j 2 -s 1 --partitions=4 --partition-method=range', + 0, + [qr{^$}], + [ + qr{creating tables}, + qr{creating 4 partitions}, + qr{loading pgbench_accounts with 2 threads}, + qr{partition \d loaded by thread \d \(in \d+\.\d\d s\)}, + qr{vacuuming}, + qr{creating primary keys}, + qr{done in \d+\.\d\d s } + ], + 'pgbench parallel initialization with range partitions'); + +check_data_state($node, 'parallel-range-partitions'); + +# Uneven distribution: 5 partitions across 2 threads (3 + 2). +$node->pgbench( + '-i -j 2 -s 1 --partitions=5 --partition-method=range', + 0, + [qr{^$}], + [ qr{loading pgbench_accounts with 2 threads}, qr{done in \d+\.\d\d s } ], + 'pgbench parallel init with uneven partition distribution'); + +check_data_state($node, 'parallel-range-uneven'); + # Run all builtin scripts, for a few transactions each $node->pgbench( '--transactions=5 -Dfoo=bla --client=2 --protocol=simple --builtin=t' -- 2.39.5 (Apple Git-154)