From e4619e03cf3c30653c2de4eafaaf9391eba34f3f Mon Sep 17 00:00:00 2001 From: Mircea Cadariu Date: Sun, 8 Mar 2026 20:05:41 +0000 Subject: [PATCH 1/2] Add parallel data loading support to pgbench Add support for parallel data loading during pgbench initialization (-i mode) using multiple threads (-j/--jobs). When multiple threads are specified, pgbench_accounts data generation is split across worker threads, each loading its portion via a separate connection. Parallel loading is currently supported for non-partitioned tables only; partitioned tables fall back to serial loading. --- doc/src/sgml/ref/pgbench.sgml | 7 + src/bin/pgbench/pgbench.c | 299 +++++++++++++++++-- src/bin/pgbench/t/001_pgbench_with_server.pl | 20 ++ src/tools/pgindent/typedefs.list | 2 + 4 files changed, 311 insertions(+), 17 deletions(-) diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml index 2e401d1ceb..41772442d1 100644 --- a/doc/src/sgml/ref/pgbench.sgml +++ b/doc/src/sgml/ref/pgbench.sgml @@ -502,6 +502,13 @@ pgbench options d Clients are distributed as evenly as possible among available threads. Default is 1. + + In initialization mode (-i), this option controls + the number of threads used to load data into + pgbench_accounts in parallel. + Parallel data loading is currently supported for + non-partitioned tables only. 
+ diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 1dae918cc0..1ecd44c4ab 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -78,6 +78,8 @@ #define ERRCODE_T_R_DEADLOCK_DETECTED "40P01" #define ERRCODE_UNDEFINED_TABLE "42P01" +#define COPY_BATCH_SIZE (1024 * 1024) + /* * Hashing constants */ @@ -825,7 +827,6 @@ static const BuiltinScript builtin_script[] = } }; - /* Function prototypes */ static void setNullValue(PgBenchValue *pv); static void setBoolValue(PgBenchValue *pv, bool bval); @@ -857,6 +858,17 @@ static const PsqlScanCallbacks pgbench_callbacks = { NULL, /* don't need get_variable functionality */ }; +/* Per-worker state for parallel table loading; rows [start_row, end_row) are generated with append_row and sent over con */ +typedef struct WorkerTask +{ + PGconn *con; + const char *table; + int64 start_row; + int64 end_row; + initRowMethod append_row; + int worker_id; +} WorkerTask; + static char get_table_relkind(PGconn *con, const char *table) { @@ -1586,6 +1598,216 @@ doConnect(void) return conn; } +/* + * Truncate the specified table(s) by executing TRUNCATE TABLE on con. + * tableName is pasted into the statement unquoted, so it may be a single table or a comma-separated list of tables. + */ +static void +truncateTable(PGconn *con, const char *tableName) +{ + PQExpBufferData query; + initPQExpBuffer(&query); + printfPQExpBuffer(&query, "TRUNCATE TABLE %s", tableName); + executeStatement(con, query.data); + termPQExpBuffer(&query); +} + +/* + * Parameters for one COPY: target table, row range [start_row, end_row), and whether to request FREEZE. + */ +typedef struct CopyTarget +{ + const char *table_name; + int64 start_row; + int64 end_row; + bool use_freeze; +} CopyTarget; + +/* + * Perform COPY FROM STDIN on conn for rows [ct->start_row, ct->end_row) of ct->table_name, generating each row with wt->append_row. + * Rows are accumulated and sent once the buffer reaches COPY_BATCH_SIZE bytes to reduce overhead; any failure is reported with pg_fatal. 
+ */ +static void +performCopy(PGconn *conn, WorkerTask *wt, CopyTarget *ct) +{ + PGresult *res; + char copy_statement[NAMEDATALEN + 32]; + int64 row; + PQExpBufferData batch_buffer; + + /* Build the COPY command (snprintf truncates silently if table_name does not fit) */ + if (ct->use_freeze) + snprintf(copy_statement, sizeof(copy_statement), + "COPY %s FROM STDIN (FREEZE ON)", ct->table_name); + else + snprintf(copy_statement, sizeof(copy_statement), + "COPY %s FROM STDIN", ct->table_name); + + /* Initiate COPY mode */ + res = PQexec(conn, copy_statement); + if (PQresultStatus(res) != PGRES_COPY_IN) + pg_fatal("COPY command failed for table \"%s\": %s", + ct->table_name, PQerrorMessage(conn)); + PQclear(res); + + /* Pre-allocate buffer to avoid repeated reallocs; the result of enlargePQExpBuffer is not checked here */ + initPQExpBuffer(&batch_buffer); + enlargePQExpBuffer(&batch_buffer, COPY_BATCH_SIZE); + + /* Generate and send rows in batches using append_row */ + for (row = ct->start_row; row < ct->end_row; row++) + { + /* Use append_row to accumulate multiple rows in the buffer */ + wt->append_row(&batch_buffer, row); + + /* Send batch when buffer reaches size threshold */ + if (batch_buffer.len >= COPY_BATCH_SIZE) + { + if (PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0) + pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn)); + + resetPQExpBuffer(&batch_buffer); + } + } + + /* Send any remaining buffered data */ + if (batch_buffer.len > 0) + { + if (PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0) + pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn)); + } + + /* Finalize the COPY operation */ + if (PQputCopyEnd(conn, NULL) <= 0) + pg_fatal("error in PQputCopyEnd: %s", PQerrorMessage(conn)); + + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("COPY failed for table \"%s\": %s", + ct->table_name, PQerrorMessage(conn)); + PQclear(res); + + termPQExpBuffer(&batch_buffer); +} + +static void +assignWorkerRows(WorkerTask *wt, int num_workers, int64 total_rows) +{ + int64 rows_per_worker = 
+ total_rows / num_workers; + + wt->start_row = wt->worker_id * rows_per_worker; + wt->end_row = (wt->worker_id == num_workers - 1) ? + total_rows : + (wt->worker_id + 1) * rows_per_worker; +} + +/* + * Load this worker's row range into a non-partitioned table. + * + * Only worker 0 can use COPY FREEZE, because it inherits the transaction + * that truncated the table. Other workers use plain COPY in their own + * transactions. NOTE(review): that truncating transaction stays open until worker 0 commits; confirm the other workers' COPY is not blocked behind its table lock. + */ +static void +loadRegularTable(PGconn *conn, WorkerTask *wt) +{ + CopyTarget target; + + target.table_name = wt->table; + target.start_row = wt->start_row; + target.end_row = wt->end_row; + target.use_freeze = (wt->worker_id == 0); + + performCopy(conn, wt, &target); +} + +static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC +initWorkerThread(void *arg) +{ + WorkerTask *wt = (WorkerTask *) arg; + PGconn *conn; + + /* Connection is pre-created, just use it */ + conn = wt->con; + + /* + * Start a new transaction for this worker, except for worker 0. + * Worker 0 continues the transaction from the main thread that already + * did the truncate (to enable COPY FREEZE). 
+ */ + if (wt->worker_id > 0) + executeStatement(conn, "begin"); + + loadRegularTable(conn, wt); + + executeStatement(conn, "commit"); + + THREAD_FUNC_RETURN; +} + +static void +initPopulateTableParallel(PGconn *connection, int num_workers, + const char *table, int64 total_rows, + initRowMethod append_row) +{ + THREAD_T *worker_threads; + WorkerTask *worker_data; + PGconn **connections; + int i; + + /* Allocate worker data and threads */ + worker_threads = pg_malloc(num_workers * sizeof(THREAD_T)); + worker_data = pg_malloc0(num_workers * sizeof(WorkerTask)); + connections = pg_malloc(num_workers * sizeof(PGconn *)); + + /* Reuse main connection for worker 0, create new ones for others */ + connections[0] = connection; + for (i = 1; i < num_workers; i++) + { + connections[i] = doConnect(); + if (connections[i] == NULL) + pg_fatal("could not create connection for worker %d", i); + } + + executeStatement(connections[0], "begin"); + truncateTable(connections[0], table); + + fprintf(stderr, "loading %s with %d threads...\n", table, num_workers); + + /* Create and start worker threads */ + for (i = 0; i < num_workers; i++) + { + worker_data[i].con = connections[i]; + worker_data[i].table = table; + worker_data[i].append_row = append_row; + worker_data[i].worker_id = i; + + assignWorkerRows(&worker_data[i], num_workers, total_rows); + + errno = THREAD_CREATE(&worker_threads[i], initWorkerThread, &worker_data[i]); + if (errno != 0) + pg_fatal("could not create thread for worker %d: %m", i); + } + + /* + * Wait for all workers to finish. Any worker failure calls pg_fatal(), + * which terminates the process, so if we get here all workers succeeded. 
+ */ + for (i = 0; i < num_workers; i++) + THREAD_JOIN(worker_threads[i]); + + /* + * Close the extra worker connections (skip index 0, which is the caller's main + * connection and stays open) + */ + for (i = 1; i < num_workers; i++) + PQfinish(connections[i]); + + free(connections); + free(worker_threads); + free(worker_data); +} + /* qsort comparator for Variable array */ static int compareVariableNames(const void *v1, const void *v2) @@ -4990,11 +5212,7 @@ initCreateTables(PGconn *con) static void initTruncateTables(PGconn *con) { - executeStatement(con, "truncate table " - "pgbench_accounts, " - "pgbench_branches, " - "pgbench_history, " - "pgbench_tellers"); + truncateTable(con, "pgbench_accounts, pgbench_branches, pgbench_history, pgbench_tellers"); } static void @@ -5024,9 +5242,41 @@ initAccount(PQExpBufferData *sql, int64 curr) curr + 1, curr / naccounts + 1); } +/* + * Append-based versions to enable batching. + * These use appendPQExpBuffer instead of printfPQExpBuffer to allow + * multiple rows to be accumulated in a single buffer. NOTE(review): initPopulateTable only uses append_row on the parallel path, which is requested for pgbench_accounts alone, so appendBranch and appendTeller look unused at run time. 
+ */ static void -initPopulateTable(PGconn *con, const char *table, int64 base, - initRowMethod init_row) +appendBranch(PQExpBufferData *sql, int64 curr) +{ + /* "filler" column uses NULL */ + appendPQExpBuffer(sql, + INT64_FORMAT "\t0\t\\N\n", + curr + 1); +} + +static void +appendTeller(PQExpBufferData *sql, int64 curr) +{ + /* "filler" column uses NULL */ + appendPQExpBuffer(sql, + INT64_FORMAT "\t" INT64_FORMAT "\t0\t\\N\n", + curr + 1, curr / ntellers + 1); +} + +static void +appendAccount(PQExpBufferData *sql, int64 curr) +{ + /* "filler" column defaults to blank padded empty string */ + appendPQExpBuffer(sql, + INT64_FORMAT "\t" INT64_FORMAT "\t0\t\n", + curr + 1, curr / naccounts + 1); +} + +static void +initPopulateTableSerial(PGconn *con, const char *table, int64 base, + initRowMethod init_row) { int n; int64 k; @@ -5034,7 +5284,7 @@ initPopulateTable(PGconn *con, const char *table, int64 base, int prev_chars = 0; PGresult *res; PQExpBufferData sql; - char copy_statement[256]; + char copy_statement[NAMEDATALEN + 32]; const char *copy_statement_fmt = "copy %s from stdin"; int64 total = base * scale; @@ -5143,6 +5393,16 @@ initPopulateTable(PGconn *con, const char *table, int64 base, termPQExpBuffer(&sql); } +static void +initPopulateTable(PGconn *con, const char *table, int64 total_rows, + initRowMethod init_row, initRowMethod append_row, bool use_parallel) +{ + if (use_parallel && nthreads > 1 && partition_method == PART_NONE) + initPopulateTableParallel(con, nthreads, table, total_rows * scale, append_row); + else + initPopulateTableSerial(con, table, total_rows, init_row); +} + /* * Fill the standard tables with some data generated and sent from the client. * @@ -5155,8 +5415,9 @@ initGenerateDataClientSide(PGconn *con) fprintf(stderr, "generating data (client-side)...\n"); /* - * we do all of this in one transaction to enable the backend's - * data-loading optimizations + * For single-threaded mode, do everything in one transaction. 
For + * multi-threaded mode, do branches/tellers/history in one transaction, + * then accounts in parallel (each thread handles its own transaction). NOTE(review): with more than one thread and partition_method other than PART_NONE, the accounts load falls back to the serial path after the commit below, i.e. outside this transaction. */ executeStatement(con, "begin"); @@ -5167,11 +5428,16 @@ initGenerateDataClientSide(PGconn *con) * fill branches, tellers, accounts in that order in case foreign keys * already exist */ - initPopulateTable(con, "pgbench_branches", nbranches, initBranch); - initPopulateTable(con, "pgbench_tellers", ntellers, initTeller); - initPopulateTable(con, "pgbench_accounts", naccounts, initAccount); + initPopulateTable(con, "pgbench_branches", nbranches, initBranch, appendBranch, false); + initPopulateTable(con, "pgbench_tellers", ntellers, initTeller, appendTeller, false); - executeStatement(con, "commit"); + if (nthreads > 1) + executeStatement(con, "commit"); + + initPopulateTable(con, "pgbench_accounts", naccounts, initAccount, appendAccount, nthreads > 1); + + if (nthreads == 1) + executeStatement(con, "commit"); } /* @@ -6944,7 +7210,6 @@ main(int argc, char **argv) initialization_option_set = true; break; case 'j': /* jobs */ - benchmarking_option_set = true; if (!option_parse_int(optarg, "-j/--jobs", 1, INT_MAX, &nthreads)) { @@ -7176,7 +7441,7 @@ main(int argc, char **argv) * optimization; throttle_delay is calculated incorrectly below if some * threads have no clients assigned to them.) 
+ */ - if (nthreads > nclients) + if (nthreads > nclients && !is_init_mode) nthreads = nclients; /* diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl index b7685ea5d2..b0a4b973b4 100644 --- a/src/bin/pgbench/t/001_pgbench_with_server.pl +++ b/src/bin/pgbench/t/001_pgbench_with_server.pl @@ -217,6 +217,26 @@ my $nthreads = 2; $nthreads = 1 if $stderr =~ m/threads are not supported on this platform/; } +# Test parallel initialization (requires thread support) +if ($nthreads > 1) +{ + # Parallel init without partitions + $node->pgbench( + '-i -j 2 --scale=1', + 0, + [qr{^$}], + [ + qr{creating tables}, + qr{loading pgbench_accounts with 2 threads}, + qr{vacuuming}, + qr{creating primary keys}, + qr{done in \d+\.\d\d s } + ], + 'pgbench parallel initialization without partitions'); + + check_data_state($node, 'parallel-no-partitions'); +} + # run custom scripts $node->pgbench( "-t 100 -c 1 -j $nthreads -M prepared -n", diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 3da19d4141..f2ba9c75a6 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -541,6 +541,7 @@ CopyOnErrorChoice CopySeqResult CopySource CopyStmt +CopyTarget CopyToRoutine CopyToState CopyToStateData @@ -3383,6 +3384,7 @@ WorkerInfoData WorkerInstrumentation WorkerJobDumpPtrType WorkerJobRestorePtrType +WorkerTask Working_State WriteBufPtrType WriteBytePtrType -- 2.39.5 (Apple Git-154)