diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 1515ed405ba..b462dcc8348 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -35,6 +35,7 @@ #include #include +#include #include #include #include @@ -237,6 +238,11 @@ static const char *const PARTITION_METHOD[] = {"none", "range", "hash"}; /* random seed used to initialize base_random_sequence */ static int64 random_seed = -1; +/* LISTEN/NOTIFY benchmark mode parameters */ +static bool listen_notify_mode = false; /* enable LISTEN/NOTIFY benchmark */ +static int notify_round_trips = 100; /* number of round-trips per iteration */ +static int notify_idle_step = 10; /* idle listeners to add per iteration */ + /* * end of configurable parameters *********************************************************************/ @@ -930,6 +936,10 @@ usage(void) " (same as \"-b simple-update\")\n" " -S, --select-only perform SELECT-only transactions\n" " (same as \"-b select-only\")\n" + " --listen-notify-benchmark\n" + " run LISTEN/NOTIFY round-trip benchmark\n" + " --notify-round-trips=NUM number of round-trips per iteration (default: 100)\n" + " --notify-idle-step=NUM idle listeners to add per iteration (default: 10)\n" "\nBenchmarking options:\n" " -c, --client=NUM number of concurrent database clients (default: 1)\n" " -C, --connect establish new connection for each transaction\n" @@ -6689,6 +6699,216 @@ set_random_seed(const char *seed) return true; } +/* + * Run LISTEN/NOTIFY round-trip benchmark + * + * This benchmark measures the round-trip time between two processes that + * ping-pong NOTIFY messages while adding idle listening connections. + */ +static void +runListenNotifyBenchmark(void) +{ + PGconn *conn1 = NULL; + PGconn *conn2 = NULL; + PGconn **idle_conns = NULL; + int num_idle = 0; + int max_idle = 10000; /* reasonable upper limit */ + PGresult *res; + char channel1[] = "pgbench_channel_1"; + char channel2[] = "pgbench_channel_2"; + char notify_cmd[256]; + bool first_failure = false; + + pg_log_info("starting LISTEN/NOTIFY round-trip benchmark"); + pg_log_info("round-trips per iteration: %d", notify_round_trips); + pg_log_info("idle listeners added per iteration: %d", notify_idle_step); + printf("\n%14s %19s %19s\n", "idle_listeners", "round_trips_per_sec", "max_latency_usec"); + + /* Allocate array for idle connections */ + idle_conns = (PGconn **) pg_malloc0(max_idle * sizeof(PGconn *)); + + /* Create two active connections for ping-pong */ + conn1 = doConnect(); + if (conn1 == NULL) + pg_fatal("failed to create connection 1"); + + conn2 = doConnect(); + if (conn2 == NULL) + pg_fatal("failed to create connection 2"); + + /* Set up LISTEN on both connections */ + snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", channel1); + res = PQexec(conn1, notify_cmd); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("LISTEN failed on connection 1: %s", PQerrorMessage(conn1)); + PQclear(res); + + snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", channel2); + res = PQexec(conn2, notify_cmd); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("LISTEN failed on connection 2: %s", PQerrorMessage(conn2)); + PQclear(res); + + /* Main benchmark loop: measure round-trips then add idle connections */ + while (num_idle < max_idle) + { + int i; + int64 total_latency = 0; + int64 max_latency = 0; + + /* Perform round-trip measurements */ + for (i = 0; i < notify_round_trips; i++) + { + pg_time_usec_t start_time, + end_time; + int64 latency; + PGnotify *notify; + int sock; + fd_set input_mask; + struct timeval tv; + + /* Clear any pending notifications */ + PQconsumeInput(conn1); + while ((notify = PQnotifies(conn1)) != NULL) + PQfreemem(notify); + PQconsumeInput(conn2); + while ((notify = PQnotifies(conn2)) != NULL) + PQfreemem(notify); + + /* Start timer and send notification from conn1 */ + start_time = pg_time_now(); + snprintf(notify_cmd, sizeof(notify_cmd), "NOTIFY %s", channel2); + res = PQexec(conn1, notify_cmd); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("NOTIFY failed: %s", PQerrorMessage(conn1)); + PQclear(res); + + /* Wait for notification on conn2 */ + sock = PQsocket(conn2); + notify = NULL; + while (notify == NULL) + { + PQconsumeInput(conn2); + notify = PQnotifies(conn2); + if (notify == NULL) + { + /* Wait for data on socket */ + FD_ZERO(&input_mask); + FD_SET(sock, &input_mask); + tv.tv_sec = 10; /* 10 second timeout */ + tv.tv_usec = 0; + if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0) + pg_fatal("select() failed: %m"); + } + } + PQfreemem(notify); + + /* Send notification back from conn2 */ + snprintf(notify_cmd, sizeof(notify_cmd), "NOTIFY %s", channel1); + res = PQexec(conn2, notify_cmd); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("NOTIFY failed: %s", PQerrorMessage(conn2)); + PQclear(res); + + /* Wait for notification on conn1 */ + sock = PQsocket(conn1); + notify = NULL; + while (notify == NULL) + { + PQconsumeInput(conn1); + notify = PQnotifies(conn1); + if (notify == NULL) + { + /* Wait for data on socket */ + FD_ZERO(&input_mask); + FD_SET(sock, &input_mask); + tv.tv_sec = 10; /* 10 second timeout */ + tv.tv_usec = 0; + if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0) + pg_fatal("select() failed: %m"); + } + } + PQfreemem(notify); + + /* End timer */ + end_time = pg_time_now(); + + /* Calculate individual round-trip latency */ + latency = end_time - start_time; + + /* Accumulate total latency and track maximum */ + total_latency += latency; + if (latency > max_latency) + max_latency = latency; + } + + /* Calculate and report round-trips per second and max latency */ + fprintf(stdout, "%14d %19.1f %19" PRId64 "\n", + num_idle, + 1000000.0 * notify_round_trips / total_latency, + max_latency); + fflush(stdout); + + /* Stop if we hit connection limit */ + if (first_failure) + break; + + /* Add idle listening connections */ + for (i = 0; i < notify_idle_step && num_idle < max_idle; i++) + { + PGconn *idle_conn; + char idle_channel[256]; + + idle_conn = doConnect(); + if (idle_conn == NULL) + { + if (!first_failure) + { + pg_log_info("reached max_connections at %d idle listeners", num_idle); + first_failure = true; + } + break; + } + + /* Each idle connection listens on a unique channel */ + snprintf(idle_channel, sizeof(idle_channel), "idle_%d", num_idle); + snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", idle_channel); + + res = PQexec(idle_conn, notify_cmd); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_warning("LISTEN failed on idle connection %d: %s", + num_idle, PQerrorMessage(idle_conn)); + PQfinish(idle_conn); + PQclear(res); + first_failure = true; + break; + } + PQclear(res); + + idle_conns[num_idle] = idle_conn; + num_idle++; + } + + /* Stop if we couldn't add any connections */ + if (first_failure && i == 0) + break; + } + + /* Clean up */ + pg_log_info("cleaning up connections"); + PQfinish(conn1); + PQfinish(conn2); + for (int i = 0; i < num_idle; i++) + { + if (idle_conns[i]) + PQfinish(idle_conns[i]); + } + pg_free(idle_conns); + + pg_log_info("LISTEN/NOTIFY benchmark completed"); +} + int main(int argc, char **argv) { @@ -6739,6 +6959,9 @@ main(int argc, char **argv) {"verbose-errors", no_argument, NULL, 15}, {"exit-on-abort", no_argument, NULL, 16}, {"debug", no_argument, NULL, 17}, + {"listen-notify-benchmark", no_argument, NULL, 18}, + {"notify-round-trips", required_argument, NULL, 19}, + {"notify-idle-step", required_argument, NULL, 20}, {NULL, 0, NULL, 0} }; @@ -7092,6 +7315,22 @@ main(int argc, char **argv) case 17: /* debug */ pg_logging_increase_verbosity(); break; + case 18: /* listen-notify-benchmark */ + listen_notify_mode = true; + benchmarking_option_set = true; + break; + case 19: /* notify-round-trips */ + benchmarking_option_set = true; + if (!option_parse_int(optarg, "--notify-round-trips", 1, INT_MAX, + ¬ify_round_trips)) + exit(1); + break; + case 20: /* notify-idle-step */ + benchmarking_option_set = true; + if (!option_parse_int(optarg, "--notify-idle-step", 1, INT_MAX, + ¬ify_idle_step)) + exit(1); + break; default: /* getopt_long already emitted a complaint */ pg_log_error_hint("Try \"%s --help\" for more information.", progname); @@ -7210,6 +7449,20 @@ main(int argc, char **argv) pg_fatal("some of the specified options cannot be used in benchmarking mode"); } + /* Handle LISTEN/NOTIFY benchmark mode */ + if (listen_notify_mode) + { + /* Establish a database connection for setup */ + if ((con = doConnect()) == NULL) + pg_fatal("could not connect to database"); + + /* Run the LISTEN/NOTIFY benchmark */ + runListenNotifyBenchmark(); + + PQfinish(con); + exit(0); + } + if (nxacts > 0 && duration > 0) pg_fatal("specify either a number of transactions (-t) or a duration (-T), not both");