From dd4f3e2d7dbae6b008157f4928287056fd0a82b9 Mon Sep 17 00:00:00 2001
From: Mircea Cadariu
Date: Wed, 8 Apr 2026 15:35:31 +0100
Subject: [PATCH] pgbench: parallelize account loading for range-partitioned
 tables

When initializing with range partitioning, spawn one worker thread per
partition to load pgbench_accounts in parallel.  Each worker opens its
own connection, truncates its partition within a transaction, and loads
its rows using COPY FREEZE, which avoids a separate freeze pass during
the subsequent vacuum step.

Non-partitioned and hash-partitioned tables are unaffected and continue
to use serial loading.
---
 src/bin/pgbench/pgbench.c                    | 147 ++++++++++++++++++-
 src/bin/pgbench/t/001_pgbench_with_server.pl |  18 +++
 2 files changed, 161 insertions(+), 4 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1dae918cc0..f537d46393 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -5143,6 +5143,133 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
 	termPQExpBuffer(&sql);
 }
 
+/*
+ * initPopulatePartition - load one range partition of pgbench_accounts.
+ *
+ * Truncates pgbench_accounts_<partno> and fills it with its share of the
+ * rows via COPY (with FREEZE on servers that support it).  Must run inside
+ * a transaction: COPY FREEZE requires the table to have been created or
+ * truncated in the current (sub)transaction.
+ */
+static void
+initPopulatePartition(PGconn *con, int partno)
+{
+	int64		total_rows = (int64) naccounts * scale;
+	int64		part_size = (total_rows + partitions - 1) / partitions;
+	int64		start_row = (int64) (partno - 1) * part_size;
+	int64		end_row = (partno == partitions) ?
+		total_rows : (int64) partno * part_size;
+	char		table_name[NAMEDATALEN];
+	char		truncate_stmt[256];
+	char		copy_stmt[256];
+	int			n;
+	PGresult   *res;
+	PQExpBufferData sql;
+	int64		row;
+
+	snprintf(table_name, sizeof(table_name), "pgbench_accounts_%d", partno);
+	snprintf(truncate_stmt, sizeof(truncate_stmt), "truncate %s", table_name);
+
+	/* use COPY FREEZE only on v14+, mirroring initPopulateTable's gate */
+	if (PQserverVersion(con) >= 140000)
+		n = pg_snprintf(copy_stmt, sizeof(copy_stmt),
+						"copy %s from stdin with (freeze on)", table_name);
+	else
+		n = pg_snprintf(copy_stmt, sizeof(copy_stmt),
+						"copy %s from stdin", table_name);
+
+	if (n >= sizeof(copy_stmt))
+		pg_fatal("invalid buffer size: must be at least %d characters long", n);
+	else if (n == -1)
+		pg_fatal("invalid format string");
+
+	/* truncate in this transaction so that COPY FREEZE is accepted */
+	executeStatement(con, truncate_stmt);
+
+	res = PQexec(con, copy_stmt);
+	if (PQresultStatus(res) != PGRES_COPY_IN)
+		pg_fatal("could not start COPY for partition %d: %s",
+				 partno, PQerrorMessage(con));
+	PQclear(res);
+
+	initPQExpBuffer(&sql);
+
+	for (row = start_row; row < end_row; row++)
+	{
+		initAccount(&sql, row);
+		if (PQputCopyData(con, sql.data, sql.len) <= 0)
+			pg_fatal("PQputCopyData failed for partition %d", partno);
+	}
+
+	if (PQputCopyEnd(con, NULL) <= 0)
+		pg_fatal("PQputCopyEnd failed for partition %d", partno);
+
+	res = PQgetResult(con);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("COPY failed for partition %d: %s", partno, PQerrorMessage(con));
+	PQclear(res);
+
+	termPQExpBuffer(&sql);
+}
+
+/*
+ * initPartitionWorkerThread - worker entry point for parallel loading.
+ *
+ * Opens its own connection and loads one partition inside its own
+ * transaction.  Any failure aborts the whole program via pg_fatal().
+ */
+static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
+initPartitionWorkerThread(void *arg)
+{
+	int			partno = *(int *) arg;
+	PGconn	   *con = doConnect();
+
+	if (con == NULL)
+		pg_fatal("could not create connection for partition worker %d", partno);
+
+	executeStatement(con, "begin");
+	initPopulatePartition(con, partno);
+	executeStatement(con, "commit");
+
+	PQfinish(con);
+	THREAD_FUNC_RETURN;
+}
+
+/*
+ * initLoadAccountsParallel - load pgbench_accounts with one worker thread
+ * per range partition, then wait for all of them to finish.
+ */
+static void
+initLoadAccountsParallel(void)
+{
+	THREAD_T   *threads;
+	int		   *partno;
+	int			i;
+
+	fprintf(stderr, "loading pgbench_accounts with %d threads...\n", partitions);
+
+	threads = pg_malloc_array(THREAD_T, partitions);
+	partno = pg_malloc_array(int, partitions);
+
+	for (i = 0; i < partitions; i++)
+	{
+		partno[i] = i + 1;
+		errno = THREAD_CREATE(&threads[i], initPartitionWorkerThread, &partno[i]);
+		if (errno != 0)
+			pg_fatal("could not create thread for partition %d: %m", i + 1);
+	}
+
+	/* THREAD_JOIN reports failure via its return value; check it */
+	for (i = 0; i < partitions; i++)
+	{
+		errno = THREAD_JOIN(threads[i]);
+		if (errno != 0)
+			pg_fatal("could not join thread for partition %d: %m", i + 1);
+	}
+
+	pg_free(threads);
+	pg_free(partno);
+}
+
 /*
 * Fill the standard tables with some data generated and sent from the client.
 *
@@ -5155,8 +5282,11 @@ initGenerateDataClientSide(PGconn *con)
 	fprintf(stderr, "generating data (client-side)...\n");
 
 	/*
-	 * we do all of this in one transaction to enable the backend's
-	 * data-loading optimizations
+	 * For the non-partitioned and hash-partitioned cases, do everything in
+	 * one transaction to enable the backend's data-loading optimizations. For
+	 * range-partitioned tables, branches and tellers are loaded in one
+	 * transaction, then accounts are loaded in parallel with one thread per
+	 * partition, each in its own transaction.
 	 */
 	executeStatement(con, "begin");
 
@@ -5169,9 +5299,18 @@ initGenerateDataClientSide(PGconn *con)
 	 */
 	initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
 	initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
-	initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
 
-	executeStatement(con, "commit");
+	if (partition_method == PART_RANGE)
+	{
+		executeStatement(con, "commit");
+		initLoadAccountsParallel();
+	}
+	else
+	{
+		/* hash partitioning and non-partitioned tables use serial loading */
+		initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
+		executeStatement(con, "commit");
+	}
 }
 
 /*
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index b7685ea5d2..b59c181c2a 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -164,6 +164,24 @@ $node->pgbench(
 # Check data state, after server-side data generation.
 check_data_state($node, 'server-side');
 
+# Test parallel initialization with range partitions (client-side generation).
+# One thread per partition is spawned automatically.
+$node->pgbench(
+	'-i -s 1 --partitions=4 --partition-method=range',
+	0,
+	[qr{^$}],
+	[
+		qr{creating tables},
+		qr{creating 4 partitions},
+		qr{loading pgbench_accounts with 4 threads},
+		qr{vacuuming},
+		qr{creating primary keys},
+		qr{done in \d+\.\d\d s }
+	],
+	'pgbench parallel initialization with range partitions');
+
+check_data_state($node, 'parallel-range-partitions');
+
 # Run all builtin scripts, for a few transactions each
 $node->pgbench(
 	'--transactions=5 -Dfoo=bla --client=2 --protocol=simple --builtin=t'
-- 
2.39.5 (Apple Git-154)