From 18d91ec9c22d43522dc1cd83c16359c36b3dc58d Mon Sep 17 00:00:00 2001
From: Mircea Cadariu
Date: Sun, 9 Nov 2025 10:41:51 +0000
Subject: [PATCH v1] pgbench: parallelize client-side data generation with -j

During initialization (-i), load pgbench_accounts using the -j worker
threads: each worker COPYs its own row range, batching rows into large
buffers.  With range partitioning, each worker creates and loads whole
standalone partitions with COPY FREEZE; the partitions are attached to
the parent table once loading completes.

---
 src/bin/pgbench/pgbench.c | 455 +++++++++++++++++++++++++++++++++++---
 1 file changed, 420 insertions(+), 35 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index a425176ecd..ef4e05678a 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -78,6 +78,8 @@
 #define ERRCODE_T_R_DEADLOCK_DETECTED "40P01"
 #define ERRCODE_UNDEFINED_TABLE "42P01"
 
+#define COPY_BATCH_SIZE (1024 * 1024)
+
 /*
  * Hashing constants
  */
@@ -825,7 +827,6 @@ static const BuiltinScript builtin_script[] =
 	}
 };
 
-
 /* Function prototypes */
 static void setNullValue(PgBenchValue *pv);
 static void setBoolValue(PgBenchValue *pv, bool bval);
@@ -848,6 +849,8 @@ static void clear_socket_set(socket_set *sa);
 static void add_socket_to_set(socket_set *sa, int fd, int idx);
 static int	wait_on_socket_set(socket_set *sa, int64 usecs);
 static bool socket_has_input(socket_set *sa, int fd, int idx);
+static void createPartitions(PGconn *con, int part_start, int part_end);
+static void attachPartitions(PGconn *con);
 
 /* callback used to build rows for COPY during data loading */
 typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
@@ -856,6 +859,19 @@ static const PsqlScanCallbacks pgbench_callbacks = {
 	NULL,						/* don't need get_variable functionality */
 };
 
+/* Worker thread data for parallel table loading */
+typedef struct WorkerTaskDescription
+{
+	PGconn	   *con;
+	const char *table;
+	int64		start_row;
+	int64		end_row;
+	initRowMethod append_row;
+	int			worker_id;
+	int			part_start;
+	int			part_end;
+	int64		part_size;
+} WorkerTaskDescription;
 
 static char
 get_table_relkind(PGconn *con, const char *table)
@@ -1631,6 +1647,277 @@ doConnect(void)
 	return conn;
 }
 
+/*
+ * Truncate the specified table(s).  tableName can be a single table or a
+ * comma-separated list of tables.
+ */
+static void
+truncateTable(PGconn *con, const char *tableName)
+{
+	PQExpBufferData query;
+
+	initPQExpBuffer(&query);
+	printfPQExpBuffer(&query, "TRUNCATE TABLE %s", tableName);
+	executeStatement(con, query.data);
+	termPQExpBuffer(&query);
+}
+
+/*
+ * Parameters needed for COPY operations.
+ */
+typedef struct CopyTarget
+{
+	const char *table_name;
+	int64		start_row;
+	int64		end_row;
+	bool		use_freeze;
+} CopyTarget;
+
+/*
+ * Perform a COPY operation for a single table or partition.
+ * Batches rows into larger buffers before sending to reduce overhead.
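+ *
+ * The append_row callback accumulates rows in a PQExpBuffer; once the
+ * buffer reaches COPY_BATCH_SIZE bytes it is flushed with a single
+ * PQputCopyData() call, rather than one libpq call per row.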
+ */
+static void
+performCopy(PGconn *conn, WorkerTaskDescription *wtd, CopyTarget *target)
+{
+	PGresult   *res;
+	char		copy_statement[NAMEDATALEN + 32];
+	int64		row;
+	PQExpBufferData batch_buffer;
+
+	/* Build the COPY command */
+	if (target->use_freeze)
+		snprintf(copy_statement, sizeof(copy_statement),
+				 "COPY %s FROM STDIN (FREEZE ON)", target->table_name);
+	else
+		snprintf(copy_statement, sizeof(copy_statement),
+				 "COPY %s FROM STDIN", target->table_name);
+
+	/* Initiate COPY mode */
+	res = PQexec(conn, copy_statement);
+	if (PQresultStatus(res) != PGRES_COPY_IN)
+		pg_fatal("COPY command failed for table \"%s\": %s",
+				 target->table_name, PQerrorMessage(conn));
+	PQclear(res);
+
+	/* Pre-allocate the buffer to avoid repeated reallocs */
+	initPQExpBuffer(&batch_buffer);
+	enlargePQExpBuffer(&batch_buffer, COPY_BATCH_SIZE);
+
+	/* Generate and send rows in batches using append_row */
+	for (row = target->start_row; row < target->end_row; row++)
+	{
+		/* Use append_row to accumulate multiple rows in the buffer */
+		wtd->append_row(&batch_buffer, row);
+
+		/* Send the batch when the buffer reaches the size threshold */
+		if (batch_buffer.len >= COPY_BATCH_SIZE)
+		{
+			if (PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0)
+				pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn));
+
+			resetPQExpBuffer(&batch_buffer);
+		}
+	}
+
+	/* Send any remaining buffered data */
+	if (batch_buffer.len > 0)
+	{
+		if (PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0)
+			pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn));
+	}
+
+	/* Finalize the COPY operation */
+	if (PQputCopyEnd(conn, NULL) <= 0)
+		pg_fatal("error in PQputCopyEnd: %s", PQerrorMessage(conn));
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("COPY failed for table \"%s\": %s",
+				 target->table_name, PQerrorMessage(conn));
+	PQclear(res);
+
+	termPQExpBuffer(&batch_buffer);
+}
+
+/*
+ * Divide total_rows evenly among the workers; the last worker also takes
+ * any remainder.
+ */
+static void
+assignWorkerRows(WorkerTaskDescription *wtd, int num_workers, int64 total_rows)
+{
+	int64		rows_per_worker = total_rows / num_workers;
+
+	wtd->start_row = wtd->worker_id * rows_per_worker;
+	wtd->end_row = (wtd->worker_id == num_workers - 1) ?
+		total_rows :
+		(wtd->worker_id + 1) * rows_per_worker;
+}
+
+/*
+ * Assign whole partitions to each worker.  For now this handles only the
+ * case num_workers <= partitions, i.e. one or more complete partitions per
+ * worker.  Because each worker loads complete partitions independently, it
+ * can use COPY FREEZE.
+ */
+static void
+assignWorkerPartitions(WorkerTaskDescription *wtd, int num_workers, int64 total_rows,
+					   int num_parts)
+{
+	int			parts_per_worker = num_parts / num_workers;
+	int			extra_parts = num_parts % num_workers;
+
+	wtd->part_start = wtd->worker_id * parts_per_worker + 1 +
+		(wtd->worker_id < extra_parts ? wtd->worker_id : extra_parts);
+	wtd->part_end = wtd->part_start + parts_per_worker - 1 +
+		(wtd->worker_id < extra_parts ? 1 : 0);
+
+	wtd->start_row = (wtd->part_start - 1) * wtd->part_size;
+	wtd->end_row = (wtd->part_end == num_parts) ?
+		total_rows :
+		wtd->part_end * wtd->part_size;
+}
+
+/* Load data into a partitioned table, one partition at a time */
+static void
+loadPartitionedTable(PGconn *conn, WorkerTaskDescription *wtd)
+{
+	int			p;
+
+	for (p = wtd->part_start; p <= wtd->part_end; p++)
+	{
+		CopyTarget	target;
+		int64		part_start_row = (p - 1) * wtd->part_size;
+		int64		part_end_row = (p == partitions) ?
+			(naccounts * (int64) scale) : (p * wtd->part_size);
+		char		partition_table[NAMEDATALEN];
+
+		snprintf(partition_table, sizeof(partition_table),
+				 "pgbench_accounts_%d", p);
+
+		target.table_name = partition_table;
+		target.start_row = part_start_row;
+		target.end_row = part_end_row;
+		target.use_freeze = true;
+
+		performCopy(conn, wtd, &target);
+	}
+}
+
+/* Load data into a non-partitioned table */
+static void
+loadRegularTable(PGconn *conn, WorkerTaskDescription *wtd)
+{
+	CopyTarget	target;
+
+	target.table_name = wtd->table;
+	target.start_row = wtd->start_row;
+	target.end_row = wtd->end_row;
+	target.use_freeze = (wtd->worker_id == 0);
+
+	performCopy(conn, wtd, &target);
+}
+
+static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
+initWorkerThread(void *arg)
+{
+	WorkerTaskDescription *wtd = (WorkerTaskDescription *) arg;
+	PGconn	   *conn;
+
+	/* The connection is pre-created; just use it */
+	conn = wtd->con;
+
+	/*
+	 * Start a new transaction for this worker, except for worker 0 on
+	 * non-partitioned tables.  Worker 0 continues the transaction from the
+	 * main thread that already did the truncate (to enable COPY FREEZE).
+	 */
+	if (wtd->part_start > 0 || wtd->worker_id > 0)
+		executeStatement(conn, "begin");
+
+	if (wtd->part_start > 0)
+	{
+		createPartitions(conn, wtd->part_start, wtd->part_end);
+		loadPartitionedTable(conn, wtd);
+	}
+	else
+		loadRegularTable(conn, wtd);
+
+	executeStatement(conn, "commit");
+
+	THREAD_FUNC_RETURN;
+}
+
+static void
+initPopulateTableParallel(PGconn *connection, int num_workers,
+						  const char *table, int64 total_rows,
+						  initRowMethod append_row)
+{
+	THREAD_T   *worker_threads;
+	WorkerTaskDescription *worker_data;
+	PGconn	  **connections;
+	bool		is_partitioned;
+	int			i;
+
+	/* Allocate the worker data and threads */
+	worker_threads = pg_malloc(num_workers * sizeof(THREAD_T));
+	worker_data = pg_malloc0(num_workers * sizeof(WorkerTaskDescription));
+	connections = pg_malloc(num_workers * sizeof(PGconn *));
+
+	/* Reuse the main connection for worker 0, create new ones for the others */
+	connections[0] = connection;
+	for (i = 1; i < num_workers; i++)
+		connections[i] = doConnect();
+
+	/* Parallel partition loading works only for pgbench_accounts with range partitioning */
+	is_partitioned = strcmp(table, "pgbench_accounts") == 0 &&
+		partition_method == PART_RANGE;
+
+	/* For partitioned tables, we handle only num_workers <= partitions for now */
+	if (is_partitioned && num_workers > partitions)
+		pg_fatal("number of threads (%d) must not exceed the number of partitions (%d)",
+				 num_workers, partitions);
+
+	executeStatement(connections[0], "begin");
+	truncateTable(connections[0], table);
+
+	/*
+	 * For partitioned tables, commit right away: every worker (including
+	 * worker 0) runs its own transaction.
+	 */
+	if (is_partitioned)
+		executeStatement(connections[0], "commit");
+
+	/* Create and start the worker threads */
+	for (i = 0; i < num_workers; i++)
+	{
+		worker_data[i].con = connections[i];
+		worker_data[i].table = table;
+		worker_data[i].append_row = append_row;
+		worker_data[i].worker_id = i;
+
+		if (!is_partitioned)
+			assignWorkerRows(&worker_data[i], num_workers, total_rows);
+		else
+		{
+			worker_data[i].part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
+			assignWorkerPartitions(&worker_data[i], num_workers, total_rows,
+								   partitions);
+		}
+
+		errno = THREAD_CREATE(&worker_threads[i], initWorkerThread,
+							  &worker_data[i]);
+		if (errno != 0)
+			pg_fatal("could not create thread: %m");
+	}
+
+	for (i = 0; i < num_workers; i++)
+		THREAD_JOIN(worker_threads[i]);
+
+	/*
+	 * For partitioned tables, attach all partitions now that data is loaded.
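+	 * Loading into standalone tables first lets every worker use COPY
+	 * FREEZE, and the ATTACH PARTITION lock on the parent is taken only
+	 * here, once the bulk load is done.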
+	 */
+	if (is_partitioned)
+		attachPartitions(connection);
+
+	/* Clean up worker connections (skip index 0, which is the main connection) */
+	for (i = 1; i < num_workers; i++)
+		PQfinish(connections[i]);
+
+	free(connections);
+	free(worker_threads);
+	free(worker_data);
+}
+
 /* qsort comparator for Variable array */
 static int
 compareVariableNames(const void *v1, const void *v2)
@@ -4869,14 +5156,58 @@ initDropTables(PGconn *con)
  * with a known size, so we choose to partition it.
  */
 static void
-createPartitions(PGconn *con)
+createPartitions(PGconn *con, int part_start, int part_end)
 {
 	PQExpBufferData query;
 
 	/* we must have to create some partitions */
 	Assert(partitions > 0);
 
-	fprintf(stderr, "creating %d partitions...\n", partitions);
+	/* If called with -1, create all partitions */
+	if (part_start == -1)
+	{
+		part_start = 1;
+		part_end = partitions;
+		fprintf(stderr, "creating %d partitions...\n", partitions);
+	}
 
 	initPQExpBuffer(&query);
 
+	for (int p = part_start; p <= part_end; p++)
+	{
+		/*
+		 * Create standalone tables (not attached to the parent yet).  This
+		 * avoids an AccessExclusiveLock on the parent table, allowing
+		 * parallel creation.  The tables are attached after data loading.
+		 */
+		printfPQExpBuffer(&query,
+						  "create%s table pgbench_accounts_%d\n"
+						  "  (aid int not null,\n"
+						  "  bid int,\n"
+						  "  abalance int,\n"
+						  "  filler character(84))\n"
+						  "  with (fillfactor=%d)",
+						  unlogged_tables ? " unlogged" : "", p,
+						  fillfactor);
+
+		executeStatement(con, query.data);
+	}
+
+	termPQExpBuffer(&query);
+}
+
+/*
+ * Attach standalone partition tables to the parent table.
+ * Called after all data has been loaded in parallel.
+ */
+static void
+attachPartitions(PGconn *con)
+{
+	PQExpBufferData query;
+
+	Assert(partitions > 0);
+
+	fprintf(stderr, "attaching %d partitions...\n", partitions);
 
 	initPQExpBuffer(&query);
 
@@ -4884,13 +5215,12 @@ createPartitions(PGconn *con)
 	{
 		if (partition_method == PART_RANGE)
 		{
-			int64		part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
+			int64		part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
 
 			printfPQExpBuffer(&query,
-							  "create%s table pgbench_accounts_%d\n"
-							  "  partition of pgbench_accounts\n"
-							  "  for values from (",
-							  unlogged_tables ? " unlogged" : "", p);
+							  "alter table pgbench_accounts\n"
+							  "  attach partition pgbench_accounts_%d\n"
+							  "  for values from (", p);
 
 			/*
 			 * For RANGE, we use open-ended partitions at the beginning and
@@ -4913,21 +5243,16 @@
 			appendPQExpBufferChar(&query, ')');
 		}
 		else if (partition_method == PART_HASH)
+		{
 			printfPQExpBuffer(&query,
-							  "create%s table pgbench_accounts_%d\n"
-							  "  partition of pgbench_accounts\n"
+							  "alter table pgbench_accounts\n"
+							  "  attach partition pgbench_accounts_%d\n"
 							  "  for values with (modulus %d, remainder %d)",
-							  unlogged_tables ? " unlogged" : "", p,
-							  partitions, p - 1);
+							  p, partitions, p - 1);
+		}
 		else					/* cannot get there */
 			Assert(0);
 
-		/*
-		 * Per ddlinfo in initCreateTables, fillfactor is needed on table
-		 * pgbench_accounts.
-		 */
-		appendPQExpBuffer(&query, " with (fillfactor=%d)", fillfactor);
-
 		executeStatement(con, query.data);
 	}
 
@@ -5025,8 +5350,8 @@ initCreateTables(PGconn *con)
 
 	termPQExpBuffer(&query);
 
-	if (partition_method != PART_NONE)
-		createPartitions(con);
+	if (partition_method != PART_NONE && (nthreads == 1 || partition_method == PART_HASH))
+		createPartitions(con, -1, -1);
 }
 
 /*
@@ -5035,11 +5360,7 @@ initCreateTables(PGconn *con)
 static void
 initTruncateTables(PGconn *con)
 {
-	executeStatement(con, "truncate table "
-					 "pgbench_accounts, "
-					 "pgbench_branches, "
-					 "pgbench_history, "
-					 "pgbench_tellers");
+	truncateTable(con, "pgbench_accounts, pgbench_branches, pgbench_history, pgbench_tellers");
 }
 
 static void
@@ -5069,8 +5390,40 @@ initAccount(PQExpBufferData *sql, int64 curr)
 					  curr + 1, curr / naccounts + 1);
 }
 
+/*
+ * Append-based versions of the row builders, to enable batching.
+ * These use appendPQExpBuffer instead of printfPQExpBuffer so that
+ * multiple rows can be accumulated in a single buffer.
+ */
+static void
+appendBranch(PQExpBufferData *sql, int64 curr)
+{
+	/* "filler" column uses NULL */
+	appendPQExpBuffer(sql,
+					  INT64_FORMAT "\t0\t\\N\n",
+					  curr + 1);
+}
+
+static void
+appendTeller(PQExpBufferData *sql, int64 curr)
+{
+	/* "filler" column uses NULL */
+	appendPQExpBuffer(sql,
+					  INT64_FORMAT "\t" INT64_FORMAT "\t0\t\\N\n",
+					  curr + 1, curr / ntellers + 1);
+}
+
 static void
-initPopulateTable(PGconn *con, const char *table, int64 base,
+appendAccount(PQExpBufferData *sql, int64 curr)
+{
+	/* "filler" column defaults to blank padded empty string */
+	appendPQExpBuffer(sql,
+					  INT64_FORMAT "\t" INT64_FORMAT "\t0\t\n",
+					  curr + 1, curr / naccounts + 1);
+}
+
+static void
+initPopulateTableSerial(PGconn *con, const char *table, int64 base,
 				  initRowMethod init_row)
 {
 	int			n;
 	int64		k;
 	int			chars = 0;
 	int			prev_chars = 0;
 	PGresult   *res;
 	PQExpBufferData sql;
-	char		copy_statement[256];
+	char		copy_statement[NAMEDATALEN + 32];
 	const char *copy_statement_fmt = "copy %s from stdin";
 	int64		total = base * scale;
 
@@ -5188,6 +5541,27 @@
 	termPQExpBuffer(&sql);
 }
 
+static void
+initPopulateTable(PGconn *con, const char *table, int64 base,
+				  initRowMethod init_row, initRowMethod append_row,
+				  bool use_parallel)
+{
+	bool		is_accounts = (strcmp(table, "pgbench_accounts") == 0);
+
+	if (use_parallel && nthreads > 1)
+		initPopulateTableParallel(con, nthreads, table, base * scale,
+								  append_row);
+	else
+	{
+		/*
+		 * In single-threaded mode with partitioned tables, attach the
+		 * partitions before loading data so COPY to the parent table can
+		 * route rows to them.
+		 */
+		if (is_accounts && partitions > 0 && partition_method != PART_NONE)
+			attachPartitions(con);
+
+		initPopulateTableSerial(con, table, base, init_row);
+	}
+}
+
 /*
  * Fill the standard tables with some data generated and sent from the client.
  *
@@ -5200,8 +5574,9 @@ initGenerateDataClientSide(PGconn *con)
 	fprintf(stderr, "generating data (client-side)...\n");
 
 	/*
-	 * we do all of this in one transaction to enable the backend's
-	 * data-loading optimizations
+	 * For single-threaded mode, do everything in one transaction.
+	 * For multi-threaded mode, do branches/tellers/history in one transaction,
+	 * then accounts in parallel (each thread handles its own transaction).
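+	 * The per-worker transactions matter because COPY FREEZE requires the
+	 * table (or partition) to have been created or truncated in the same
+	 * transaction that runs the COPY.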
*/ executeStatement(con, "begin"); @@ -5212,11 +5587,16 @@ initGenerateDataClientSide(PGconn *con) * fill branches, tellers, accounts in that order in case foreign keys * already exist */ - initPopulateTable(con, "pgbench_branches", nbranches, initBranch); - initPopulateTable(con, "pgbench_tellers", ntellers, initTeller); - initPopulateTable(con, "pgbench_accounts", naccounts, initAccount); + initPopulateTable(con, "pgbench_branches", nbranches, initBranch, appendBranch, false); + initPopulateTable(con, "pgbench_tellers", ntellers, initTeller, appendTeller, false); - executeStatement(con, "commit"); + if (nthreads > 1) + executeStatement(con, "commit"); + + initPopulateTable(con, "pgbench_accounts", naccounts, initAccount, appendAccount, nthreads > 1); + + if (nthreads == 1) + executeStatement(con, "commit"); } /* @@ -5242,6 +5622,9 @@ initGenerateDataServerSide(PGconn *con) /* truncate away any old data */ initTruncateTables(con); + if (partitions > 0 && partition_method != PART_NONE) + attachPartitions(con); + initPQExpBuffer(&sql); printfPQExpBuffer(&sql, @@ -6989,7 +7372,6 @@ main(int argc, char **argv) initialization_option_set = true; break; case 'j': /* jobs */ - benchmarking_option_set = true; if (!option_parse_int(optarg, "-j/--jobs", 1, INT_MAX, &nthreads)) { @@ -7221,7 +7603,7 @@ main(int argc, char **argv) * optimization; throttle_delay is calculated incorrectly below if some * threads have no clients assigned to them.) */ - if (nthreads > nclients) + if (nthreads > nclients && !is_init_mode) nthreads = nclients; /* @@ -7266,6 +7648,9 @@ main(int argc, char **argv) if (partitions > 0 && partition_method == PART_NONE) partition_method = PART_RANGE; + if (partition_method == PART_HASH && nthreads > 1) + pg_fatal("parallel data loading (-j) is not supported with hash partitioning"); + if (initialize_steps == NULL) initialize_steps = pg_strdup(DEFAULT_INIT_STEPS); -- 2.39.5 (Apple Git-154)