From 01caabbe159af6b6efd607e08ad5d7fbc77e513e Mon Sep 17 00:00:00 2001
From: Mircea Cadariu
Date: Sun, 9 Nov 2025 10:41:51 +0000
Subject: [PATCH v1] Add parallel data loading support to pgbench

pgbench initialization (pgbench -i) now honors the -j option, loading
the pgbench_accounts table in parallel with multiple threads, each
using its own connection.

For range-partitioned tables, each worker creates and loads its
assigned partitions as standalone tables; the main thread then attaches
them to the parent after all data is loaded. This avoids
AccessExclusiveLock contention on the parent table during parallel
loading, and allows each worker to use COPY FREEZE.

For non-partitioned tables, worker 0 reuses the main connection (which
did the TRUNCATE) and can therefore use COPY FREEZE, while additional
workers use separate connections with regular COPY.

Implementation details:
- COPY data is batched into 1MB buffers before sending, to reduce
  overhead
- Hash partitioning does not support parallel loading
- The number of threads must not exceed the number of partitions

Author: Mircea Cadariu
Reviewed-by: Lakshmi G
---
 doc/src/sgml/ref/pgbench.sgml                |  11 +
 src/bin/pgbench/pgbench.c                    | 475 +++++++++++++++++--
 src/bin/pgbench/t/001_pgbench_with_server.pl |  45 ++
 src/bin/pgbench/t/002_pgbench_no_server.pl   |   5 +
 4 files changed, 501 insertions(+), 35 deletions(-)

diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 2e401d1ceb..58b3ccd379 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -502,6 +502,17 @@ pgbench options d
        Clients are distributed as evenly as possible among available
        threads.  Default is 1.
+
+       In initialization mode (-i), this option controls
+       the number of threads used to load data into
+       pgbench_accounts in parallel.
+       With --partitions using
+       range partitioning, each thread loads one or more
+       complete partitions independently.
+       The number of threads must not exceed the number of partitions.
+       Parallel data loading is not currently supported with
+       hash partitioning.
+
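An illustrative aside before the implementation, not part of the patch:
under range partitioning, each worker effectively executes the sequence
below on its own connection. It creates its partition as a standalone
table, fills it with COPY ... (FREEZE ON) in the same transaction, and
leaves the ATTACH PARTITION step to the main thread. The following
minimal libpq sketch shows one worker's share under assumed values; the
connection string, the two partitions of 25000 rows, and the dummy row
contents are illustrative assumptions, not taken from the patch.

    #include <stdio.h>
    #include <stdlib.h>
    #include <libpq-fe.h>

    static void
    run(PGconn *conn, const char *sql)
    {
        PGresult   *res = PQexec(conn, sql);

        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
            fprintf(stderr, "%s", PQerrorMessage(conn));
            exit(1);
        }
        PQclear(res);
    }

    /* Stream rows [start, end) with COPY FREEZE; row contents are dummies. */
    static void
    load_rows(PGconn *conn, const char *table, long start, long end)
    {
        PGresult   *res;
        char        sql[128];
        char        row[64];

        snprintf(sql, sizeof(sql), "COPY %s FROM STDIN (FREEZE ON)", table);
        res = PQexec(conn, sql);
        if (PQresultStatus(res) != PGRES_COPY_IN)
        {
            fprintf(stderr, "%s", PQerrorMessage(conn));
            exit(1);
        }
        PQclear(res);

        for (long i = start; i < end; i++)
        {
            /* aid, bid, abalance, filler (empty; char(84) pads it) */
            int         n = snprintf(row, sizeof(row), "%ld\t1\t0\t\n", i + 1);

            if (PQputCopyData(conn, row, n) <= 0)
            {
                fprintf(stderr, "%s", PQerrorMessage(conn));
                exit(1);
            }
        }
        if (PQputCopyEnd(conn, NULL) <= 0)
        {
            fprintf(stderr, "%s", PQerrorMessage(conn));
            exit(1);
        }
        res = PQgetResult(conn);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
            fprintf(stderr, "%s", PQerrorMessage(conn));
            exit(1);
        }
        PQclear(res);
    }

    int
    main(void)
    {
        /* Assumed connection string. */
        PGconn     *conn = PQconnectdb("dbname=pgbench");

        if (PQstatus(conn) != CONNECTION_OK)
        {
            fprintf(stderr, "%s", PQerrorMessage(conn));
            return 1;
        }

        /* This worker's share: partitions 1 and 2, 25000 rows each. */
        for (int p = 1; p <= 2; p++)
        {
            char        sql[256];
            char        table[64];

            snprintf(table, sizeof(table), "pgbench_accounts_%d", p);

            run(conn, "begin");
            /* Standalone table: no AccessExclusiveLock on the parent. */
            snprintf(sql, sizeof(sql),
                     "create table %s (aid int not null, bid int, "
                     "abalance int, filler character(84))", table);
            run(conn, sql);
            /* Table was created in this transaction, so FREEZE is allowed. */
            load_rows(conn, table, (long) (p - 1) * 25000, (long) p * 25000);
            run(conn, "commit");
        }
        PQfinish(conn);
        return 0;
    }

Once all workers finish, the main thread would run
ALTER TABLE pgbench_accounts ATTACH PARTITION ... FOR VALUES FROM ... TO ...
for each piece, which is what attachPartitions() in the patch below does.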
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 58735871c1..b94da2567d 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -78,6 +78,8 @@
 #define ERRCODE_T_R_DEADLOCK_DETECTED "40P01"
 #define ERRCODE_UNDEFINED_TABLE "42P01"
 
+#define COPY_BATCH_SIZE (1024 * 1024)
+
 /*
  * Hashing constants
  */
@@ -825,7 +827,6 @@ static const BuiltinScript builtin_script[] =
 	}
 };
 
-
 /* Function prototypes */
 static void setNullValue(PgBenchValue *pv);
 static void setBoolValue(PgBenchValue *pv, bool bval);
@@ -848,6 +849,8 @@ static void clear_socket_set(socket_set *sa);
 static void add_socket_to_set(socket_set *sa, int fd, int idx);
 static int	wait_on_socket_set(socket_set *sa, int64 usecs);
 static bool socket_has_input(socket_set *sa, int fd, int idx);
+static void createPartitions(PGconn *con, int part_start, int part_end);
+static void attachPartitions(PGconn *con);
 
 /* callback used to build rows for COPY during data loading */
 typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
@@ -857,6 +860,20 @@ static const PsqlScanCallbacks pgbench_callbacks = {
 	NULL,						/* don't need get_variable functionality */
 };
 
+/* Worker thread data for parallel table loading */
+typedef struct WorkerTask
+{
+	PGconn	   *con;
+	const char *table;
+	int64		start_row;
+	int64		end_row;
+	initRowMethod append_row;
+	int			worker_id;
+	int			part_start;
+	int			part_end;
+	int64		part_size;
+} WorkerTask;
+
 static char
 get_table_relkind(PGconn *con, const char *table)
 {
@@ -1586,6 +1603,301 @@ doConnect(void)
 	return conn;
 }
 
+/*
+ * Truncate the specified table(s).
+ * tableName can be a single table name or a comma-separated list of tables.
+ */
+static void
+truncateTable(PGconn *con, const char *tableName)
+{
+	PQExpBufferData query;
+
+	initPQExpBuffer(&query);
+	printfPQExpBuffer(&query, "TRUNCATE TABLE %s", tableName);
+	executeStatement(con, query.data);
+	termPQExpBuffer(&query);
+}
+
+/*
+ * Parameters needed for COPY operations.
+ */
+typedef struct CopyTarget
+{
+	const char *table_name;
+	int64		start_row;
+	int64		end_row;
+	bool		use_freeze;
+} CopyTarget;
+
+/*
+ * Perform a COPY operation for a single table or partition.
+ * Batches rows into larger buffers before sending to reduce overhead.
+ */
+static void
+performCopy(PGconn *conn, WorkerTask * wtd, CopyTarget * target)
+{
+	PGresult   *res;
+	char		copy_statement[NAMEDATALEN + 32];
+	int64		row;
+	PQExpBufferData batch_buffer;
+
+	/* Build the COPY command */
+	if (target->use_freeze)
+		snprintf(copy_statement, sizeof(copy_statement),
+				 "COPY %s FROM STDIN (FREEZE ON)", target->table_name);
+	else
+		snprintf(copy_statement, sizeof(copy_statement),
+				 "COPY %s FROM STDIN", target->table_name);
+
+	/* Initiate COPY mode */
+	res = PQexec(conn, copy_statement);
+	if (PQresultStatus(res) != PGRES_COPY_IN)
+		pg_fatal("COPY command failed for table \"%s\": %s",
+				 target->table_name, PQerrorMessage(conn));
+	PQclear(res);
+
+	/* Pre-allocate buffer to avoid repeated reallocs */
+	initPQExpBuffer(&batch_buffer);
+	enlargePQExpBuffer(&batch_buffer, COPY_BATCH_SIZE);
+
+	/* Generate and send rows in batches using append_row */
+	for (row = target->start_row; row < target->end_row; row++)
+	{
+		/* Use append_row to accumulate multiple rows in the buffer */
+		wtd->append_row(&batch_buffer, row);
+
+		/* Send batch when buffer reaches size threshold */
+		if (batch_buffer.len >= COPY_BATCH_SIZE)
+		{
+			if (PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0)
+				pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn));
+
+			resetPQExpBuffer(&batch_buffer);
+		}
+	}
+
+	/* Send any remaining buffered data */
+	if (batch_buffer.len > 0)
+	{
+		if (PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0)
+			pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn));
+	}
+
+	/* Finalize the COPY operation */
+	if (PQputCopyEnd(conn, NULL) <= 0)
+		pg_fatal("error in PQputCopyEnd: %s", PQerrorMessage(conn));
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("COPY failed for table \"%s\": %s",
+				 target->table_name, PQerrorMessage(conn));
+	PQclear(res);
+
+	termPQExpBuffer(&batch_buffer);
+}
+
+
+/*
+ * Divide total_rows evenly among workers; the last worker also takes any
+ * remainder rows.
+ */
+static void
+assignWorkerRows(WorkerTask * wtd, int num_workers, int64 total_rows)
+{
+	int64		rows_per_worker = total_rows / num_workers;
+
+	wtd->start_row = wtd->worker_id * rows_per_worker;
+	wtd->end_row = (wtd->worker_id == num_workers - 1) ?
+		total_rows :
+		(wtd->worker_id + 1) * rows_per_worker;
+}
+
+/*
+ * Assign each worker one or more whole partitions; only the case
+ * num_workers <= partitions is supported for now. Each worker loads its
+ * complete partitions independently and can use COPY FREEZE.
+ */
+static void
+assignWorkerPartitions(WorkerTask * wtd, int num_workers, int64 total_rows,
+					   int num_parts)
+{
+	int			parts_per_worker = num_parts / num_workers;
+	int			extra_parts = num_parts % num_workers;
+
+	wtd->part_start = wtd->worker_id * parts_per_worker + 1 +
+		(wtd->worker_id < extra_parts ? wtd->worker_id : extra_parts);
+	wtd->part_end = wtd->part_start + parts_per_worker - 1 +
+		(wtd->worker_id < extra_parts ? 1 : 0);
+
+	wtd->start_row = (wtd->part_start - 1) * wtd->part_size;
+	wtd->end_row = (wtd->part_end == num_parts) ?
+		total_rows :
+		wtd->part_end * wtd->part_size;
+}
+
+
+/* Load data into a partitioned table */
+static void
+loadPartitionedTable(PGconn *conn, WorkerTask * wtd)
+{
+	int			p;
+
+	for (p = wtd->part_start; p <= wtd->part_end; p++)
+	{
+		CopyTarget	target;
+		int64		part_start_row = (p - 1) * wtd->part_size;
+		int64		part_end_row = (p == partitions) ?
(naccounts * (int64) scale) : (p * wtd->part_size); + char partition_table[NAMEDATALEN]; + + snprintf(partition_table, sizeof(partition_table), "pgbench_accounts_%d", p); + + target.table_name = partition_table; + target.start_row = part_start_row; + target.end_row = part_end_row; + target.use_freeze = true; + + performCopy(conn, wtd, &target); + } +} + +/* + * Load data into non-partitioned table. + * + * Only worker 0 can use COPY FREEZE, because it inherits the transaction + * that truncated the table. Other workers use plain COPY in their own + * transactions. + */ +static void +loadRegularTable(PGconn *conn, WorkerTask * wtd) +{ + CopyTarget target; + + target.table_name = wtd->table; + target.start_row = wtd->start_row; + target.end_row = wtd->end_row; + target.use_freeze = (wtd->worker_id == 0); + + performCopy(conn, wtd, &target); +} + +static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC +initWorkerThread(void *arg) +{ + WorkerTask *wtd = (WorkerTask *) arg; + PGconn *conn; + + /* Connection is pre-created, just use it */ + conn = wtd->con; + + /* + * Start a new transaction for this worker, except for worker 0 on + * non-partitioned tables. Worker 0 continues the transaction from the + * main thread that already did the truncate (to enable COPY FREEZE). + */ + if (wtd->part_start > 0 || wtd->worker_id > 0) + executeStatement(conn, "begin"); + + if (wtd->part_start > 0) + { + createPartitions(conn, wtd->part_start, wtd->part_end); + loadPartitionedTable(conn, wtd); + } + else + loadRegularTable(conn, wtd); + + executeStatement(conn, "commit"); + + THREAD_FUNC_RETURN; +} + +static void +initPopulateTableParallel(PGconn *connection, int num_workers, + const char *table, int64 total_rows, + initRowMethod append_row) +{ + THREAD_T *worker_threads; + WorkerTask *worker_data; + PGconn **connections; + bool is_partitioned; + int i; + + /* Allocate worker data and threads */ + worker_threads = pg_malloc(num_workers * sizeof(THREAD_T)); + worker_data = pg_malloc0(num_workers * sizeof(WorkerTask)); + connections = pg_malloc(num_workers * sizeof(PGconn *)); + + /* Reuse main connection for worker 0, create new ones for others */ + connections[0] = connection; + for (i = 1; i < num_workers; i++) + { + connections[i] = doConnect(); + if (connections[i] == NULL) + pg_fatal("could not create connection for worker %d", i); + } + + /* Works only for pgbench_accounts and the range partitioning option */ + is_partitioned = strcmp(table, "pgbench_accounts") == 0 && partition_method == PART_RANGE; + + /* + * For partitioned tables, we handle only num_workers <= partitions for + * now + */ + if (is_partitioned && num_workers > partitions) + pg_fatal("number of threads (%d) must not exceed the number of partitions (%d)", + num_workers, partitions); + + executeStatement(connections[0], "begin"); + truncateTable(connections[0], table); + + if (is_partitioned) + { + executeStatement(connections[0], "commit"); + } + + fprintf(stderr, "loading %s with %d threads...\n", table, num_workers); + + /* Create and start worker threads */ + for (i = 0; i < num_workers; i++) + { + worker_data[i].con = connections[i]; + worker_data[i].table = table; + worker_data[i].append_row = append_row; + worker_data[i].worker_id = i; + + if (!is_partitioned) + assignWorkerRows(&worker_data[i], num_workers, total_rows); + else + { + worker_data[i].part_size = (naccounts * (int64) scale + partitions - 1) / partitions; + assignWorkerPartitions(&worker_data[i], num_workers, total_rows, + partitions); + } + + errno = 
THREAD_CREATE(&worker_threads[i], initWorkerThread, &worker_data[i]); + if (errno != 0) + pg_fatal("could not create thread for worker %d: %m", i); + } + + /* + * Wait for all workers to finish. Any worker failure calls pg_fatal(), + * which terminates the process, so if we get here all workers succeeded. + */ + for (i = 0; i < num_workers; i++) + THREAD_JOIN(worker_threads[i]); + + /* + * For partitioned tables, attach all partitions now that data is loaded. + */ + if (is_partitioned) + attachPartitions(connection); + + /* + * Clean up worker connections (skip index 0, which is the main + * connection) + */ + for (i = 1; i < num_workers; i++) + PQfinish(connections[i]); + + free(connections); + free(worker_threads); + free(worker_data); +} + /* qsort comparator for Variable array */ static int compareVariableNames(const void *v1, const void *v2) @@ -4824,14 +5136,50 @@ initDropTables(PGconn *con) * with a known size, so we choose to partition it. */ static void -createPartitions(PGconn *con) +createPartitions(PGconn *con, int part_start, int part_end) { PQExpBufferData query; /* we must have to create some partitions */ Assert(partitions > 0); - fprintf(stderr, "creating %d partitions...\n", partitions); + initPQExpBuffer(&query); + + for (int p = part_start; p <= part_end; p++) + { + /* + * Create standalone tables (not attached to parent yet). This avoids + * AccessExclusiveLock on the parent table, allowing parallel + * creation. Tables will be attached after data loading. + */ + printfPQExpBuffer(&query, + "create%s table pgbench_accounts_%d\n" + " (aid int not null,\n" + " bid int,\n" + " abalance int,\n" + " filler character(84))\n" + " with (fillfactor=%d)", + unlogged_tables ? " unlogged" : "", p, + fillfactor); + + executeStatement(con, query.data); + } + + termPQExpBuffer(&query); +} + +/* + * Attach standalone partition tables to the parent table. + * Called after all data has been loaded in parallel. + */ +static void +attachPartitions(PGconn *con) +{ + PQExpBufferData query; + + Assert(partitions > 0); + + fprintf(stderr, "attaching %d partitions...\n", partitions); initPQExpBuffer(&query); @@ -4842,10 +5190,9 @@ createPartitions(PGconn *con) int64 part_size = (naccounts * (int64) scale + partitions - 1) / partitions; printfPQExpBuffer(&query, - "create%s table pgbench_accounts_%d\n" - " partition of pgbench_accounts\n" - " for values from (", - unlogged_tables ? " unlogged" : "", p); + "alter table pgbench_accounts\n" + " attach partition pgbench_accounts_%d\n" + " for values from (", p); /* * For RANGE, we use open-ended partitions at the beginning and @@ -4868,21 +5215,16 @@ createPartitions(PGconn *con) appendPQExpBufferChar(&query, ')'); } else if (partition_method == PART_HASH) + { printfPQExpBuffer(&query, - "create%s table pgbench_accounts_%d\n" - " partition of pgbench_accounts\n" + "alter table pgbench_accounts\n" + " attach partition pgbench_accounts_%d\n" " for values with (modulus %d, remainder %d)", - unlogged_tables ? " unlogged" : "", p, - partitions, p - 1); + p, partitions, p - 1); + } else /* cannot get there */ Assert(0); - /* - * Per ddlinfo in initCreateTables, fillfactor is needed on table - * pgbench_accounts. 
- */ - appendPQExpBuffer(&query, " with (fillfactor=%d)", fillfactor); - executeStatement(con, query.data); } @@ -4980,8 +5322,11 @@ initCreateTables(PGconn *con) termPQExpBuffer(&query); - if (partition_method != PART_NONE) - createPartitions(con); + if (partition_method != PART_NONE && (nthreads == 1 || partition_method == PART_HASH)) + { + fprintf(stderr, "creating %d partitions...\n", partitions); + createPartitions(con, 1, partitions); + } } /* @@ -4990,11 +5335,7 @@ initCreateTables(PGconn *con) static void initTruncateTables(PGconn *con) { - executeStatement(con, "truncate table " - "pgbench_accounts, " - "pgbench_branches, " - "pgbench_history, " - "pgbench_tellers"); + truncateTable(con, "pgbench_accounts, pgbench_branches, pgbench_history, pgbench_tellers"); } static void @@ -5024,9 +5365,41 @@ initAccount(PQExpBufferData *sql, int64 curr) curr + 1, curr / naccounts + 1); } +/* + * Append-based versions to enable batching. + * These use appendPQExpBuffer instead of printfPQExpBuffer to allow + * multiple rows to be accumulated in a single buffer. + */ +static void +appendBranch(PQExpBufferData *sql, int64 curr) +{ + /* "filler" column uses NULL */ + appendPQExpBuffer(sql, + INT64_FORMAT "\t0\t\\N\n", + curr + 1); +} + static void -initPopulateTable(PGconn *con, const char *table, int64 base, - initRowMethod init_row) +appendTeller(PQExpBufferData *sql, int64 curr) +{ + /* "filler" column uses NULL */ + appendPQExpBuffer(sql, + INT64_FORMAT "\t" INT64_FORMAT "\t0\t\\N\n", + curr + 1, curr / ntellers + 1); +} + +static void +appendAccount(PQExpBufferData *sql, int64 curr) +{ + /* "filler" column defaults to blank padded empty string */ + appendPQExpBuffer(sql, + INT64_FORMAT "\t" INT64_FORMAT "\t0\t\n", + curr + 1, curr / naccounts + 1); +} + +static void +initPopulateTableSerial(PGconn *con, const char *table, int64 base, + initRowMethod init_row) { int n; int64 k; @@ -5034,7 +5407,7 @@ initPopulateTable(PGconn *con, const char *table, int64 base, int prev_chars = 0; PGresult *res; PQExpBufferData sql; - char copy_statement[256]; + char copy_statement[NAMEDATALEN + 32]; const char *copy_statement_fmt = "copy %s from stdin"; int64 total = base * scale; @@ -5143,6 +5516,27 @@ initPopulateTable(PGconn *con, const char *table, int64 base, termPQExpBuffer(&sql); } +static void +initPopulateTable(PGconn *con, const char *table, int64 total_rows, + initRowMethod init_row, initRowMethod append_row, bool use_parallel) +{ + bool is_accounts = (strcmp(table, "pgbench_accounts") == 0); + + if (use_parallel && nthreads > 1) + initPopulateTableParallel(con, nthreads, table, total_rows * scale, append_row); + else + { + /* + * For single-threaded mode with partitioned tables, attach partitions + * before loading data so COPY to the parent table can route rows. + */ + if (is_accounts && partitions > 0 && partition_method != PART_NONE) + attachPartitions(con); + + initPopulateTableSerial(con, table, total_rows, init_row); + } +} + /* * Fill the standard tables with some data generated and sent from the client. * @@ -5155,8 +5549,9 @@ initGenerateDataClientSide(PGconn *con) fprintf(stderr, "generating data (client-side)...\n"); /* - * we do all of this in one transaction to enable the backend's - * data-loading optimizations + * For single-threaded mode, do everything in one transaction. For + * multi-threaded mode, do branches/tellers/history in one transaction, + * then accounts in parallel (each thread handles its own transaction). 
*/ executeStatement(con, "begin"); @@ -5167,11 +5562,16 @@ initGenerateDataClientSide(PGconn *con) * fill branches, tellers, accounts in that order in case foreign keys * already exist */ - initPopulateTable(con, "pgbench_branches", nbranches, initBranch); - initPopulateTable(con, "pgbench_tellers", ntellers, initTeller); - initPopulateTable(con, "pgbench_accounts", naccounts, initAccount); + initPopulateTable(con, "pgbench_branches", nbranches, initBranch, appendBranch, false); + initPopulateTable(con, "pgbench_tellers", ntellers, initTeller, appendTeller, false); - executeStatement(con, "commit"); + if (nthreads > 1) + executeStatement(con, "commit"); + + initPopulateTable(con, "pgbench_accounts", naccounts, initAccount, appendAccount, nthreads > 1); + + if (nthreads == 1) + executeStatement(con, "commit"); } /* @@ -5197,6 +5597,9 @@ initGenerateDataServerSide(PGconn *con) /* truncate away any old data */ initTruncateTables(con); + if (partitions > 0 && partition_method != PART_NONE) + attachPartitions(con); + initPQExpBuffer(&sql); printfPQExpBuffer(&sql, @@ -6944,7 +7347,6 @@ main(int argc, char **argv) initialization_option_set = true; break; case 'j': /* jobs */ - benchmarking_option_set = true; if (!option_parse_int(optarg, "-j/--jobs", 1, INT_MAX, &nthreads)) { @@ -7176,7 +7578,7 @@ main(int argc, char **argv) * optimization; throttle_delay is calculated incorrectly below if some * threads have no clients assigned to them.) */ - if (nthreads > nclients) + if (nthreads > nclients && !is_init_mode) nthreads = nclients; /* @@ -7221,6 +7623,9 @@ main(int argc, char **argv) if (partitions > 0 && partition_method == PART_NONE) partition_method = PART_RANGE; + if (partition_method == PART_HASH && nthreads > 1) + pg_fatal("parallel data loading (-j) is not supported with hash partitioning"); + if (initialize_steps == NULL) initialize_steps = pg_strdup(DEFAULT_INIT_STEPS); diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl index b7685ea5d2..7b214c9030 100644 --- a/src/bin/pgbench/t/001_pgbench_with_server.pl +++ b/src/bin/pgbench/t/001_pgbench_with_server.pl @@ -217,6 +217,51 @@ my $nthreads = 2; $nthreads = 1 if $stderr =~ m/threads are not supported on this platform/; } +# Test parallel initialization (requires thread support) +if ($nthreads > 1) +{ + # Parallel init without partitions + $node->pgbench( + '-i -j 2 --scale=1', + 0, + [qr{^$}], + [ + qr{creating tables}, + qr{loading pgbench_accounts with 2 threads}, + qr{vacuuming}, + qr{creating primary keys}, + qr{done in \d+\.\d\d s } + ], + 'pgbench parallel initialization without partitions'); + + check_data_state($node, 'parallel-no-partitions'); + + # Parallel init with range partitions + $node->pgbench( + '-i -j 2 --scale=1 --partitions=4 --partition-method=range', + 0, + [qr{^$}], + [ + qr{creating tables}, + qr{loading pgbench_accounts with 2 threads}, + qr{attaching 4 partitions}, + qr{vacuuming}, + qr{creating primary keys}, + qr{done in \d+\.\d\d s } + ], + 'pgbench parallel initialization with range partitions'); + + check_data_state($node, 'parallel-range-partitions'); + + # Error: more threads than partitions + $node->pgbench( + '-i -j 3 --scale=1 --partitions=2 --partition-method=range', + 1, + [qr{^$}], + [qr{number of threads \(3\) must not exceed the number of partitions \(2\)}], + 'pgbench parallel init fails when threads exceed partitions'); +} + # run custom scripts $node->pgbench( "-t 100 -c 1 -j $nthreads -M prepared -n", diff --git 
a/src/bin/pgbench/t/002_pgbench_no_server.pl b/src/bin/pgbench/t/002_pgbench_no_server.pl index e694e9ef0f..d67f26e422 100644 --- a/src/bin/pgbench/t/002_pgbench_no_server.pl +++ b/src/bin/pgbench/t/002_pgbench_no_server.pl @@ -187,6 +187,11 @@ my @options = ( '-i --partition-method=hash', [qr{partition-method requires greater than zero --partitions}] ], + [ + 'parallel data loading with hash partitioning', + '-i -j 2 --partitions=4 --partition-method=hash', + [qr{parallel data loading \(-j\) is not supported with hash partitioning}] + ], [ 'bad maximum number of tries', '--max-tries -10', -- 2.39.5 (Apple Git-154)
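
A closing note on the assignment arithmetic in assignWorkerPartitions():
partitions are spread as evenly as possible, with the first
(num_parts % num_workers) workers taking one extra partition, and each
worker's row range is then derived from its partition range using the
same ceiling-division part_size as the partition DDL. The standalone
sketch below replays those formulas outside pgbench; the row, partition,
and worker counts are made-up illustration values.

    #include <stdio.h>
    #include <stdint.h>

    int
    main(void)
    {
        const int64_t total_rows = 100000;  /* naccounts * scale, scale=1 */
        const int   num_parts = 4;
        const int   num_workers = 2;
        /* Same ceiling division the patch uses for part_size. */
        const int64_t part_size = (total_rows + num_parts - 1) / num_parts;

        int         parts_per_worker = num_parts / num_workers;
        int         extra_parts = num_parts % num_workers;

        for (int id = 0; id < num_workers; id++)
        {
            int         part_start = id * parts_per_worker + 1 +
                (id < extra_parts ? id : extra_parts);
            int         part_end = part_start + parts_per_worker - 1 +
                (id < extra_parts ? 1 : 0);
            int64_t     start_row = (int64_t) (part_start - 1) * part_size;
            int64_t     end_row = (part_end == num_parts) ?
                total_rows : part_end * part_size;

            printf("worker %d: partitions %d..%d, rows [%lld, %lld)\n",
                   id, part_start, part_end,
                   (long long) start_row, (long long) end_row);
        }
        return 0;
    }

With these inputs it prints "worker 0: partitions 1..2, rows [0, 50000)"
and "worker 1: partitions 3..4, rows [50000, 100000)", matching what the
patch would assign for --scale=1 --partitions=4 -j 2.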