diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index f6cb5d4..f93673e 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -269,7 +269,8 @@ typedef enum * * CSTATE_START_COMMAND starts the execution of a command. On a SQL * command, the command is sent to the server, and we move to - * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set, + * CSTATE_WAIT_RESULT state unless in batch mode. + * On a \sleep meta-command, the timer is set, * and we enter the CSTATE_SLEEP state to wait for it to expire. Other * meta-commands are executed immediately. * @@ -1882,11 +1883,24 @@ sendCommand(CState *st, Command *command) if (commands[j]->type != SQL_COMMAND) continue; preparedStatementName(name, st->use_file, j); - res = PQprepare(st->con, name, - commands[j]->argv[0], commands[j]->argc - 1, NULL); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - fprintf(stderr, "%s", PQerrorMessage(st->con)); - PQclear(res); + if (PQbatchStatus(st->con) == PQBATCH_MODE_OFF) + { + res = PQprepare(st->con, name, + commands[j]->argv[0], commands[j]->argc - 1, NULL); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + fprintf(stderr, "%s", PQerrorMessage(st->con)); + PQclear(res); + } + else + { + /* + * In batch mode, we use asynchronous functions. If a server-side + * error occurs, it will be processed later among the other results. + */ + if (!PQsendPrepare(st->con, name, + commands[j]->argv[0], commands[j]->argc - 1, NULL)) + fprintf(stderr, "%s", PQerrorMessage(st->con)); + } } st->prepared[st->use_file] = true; } @@ -2165,7 +2179,13 @@ doCustom(TState *thread, CState *st, StatsData *agg) return; } else - st->state = CSTATE_WAIT_RESULT; + { + /* Wait for results, unless in batch mode */ + if (PQbatchStatus(st->con) == PQBATCH_MODE_OFF) + st->state = CSTATE_WAIT_RESULT; + else + st->state = CSTATE_END_COMMAND; + } } else if (command->type == META_COMMAND) { @@ -2207,7 +2227,51 @@ doCustom(TState *thread, CState *st, StatsData *agg) } else { - if (pg_strcasecmp(argv[0], "set") == 0) + if (pg_strcasecmp(argv[0], "beginbatch") == 0) + { + /* + * In batch mode, we use a workflow based on libpq batch + * functions. + */ + if (querymode == QUERY_SIMPLE) + { + commandFailed(st, "cannot use batch mode with the simple query protocol"); + st->state = CSTATE_ABORTED; + break; + } + + if (PQbatchStatus(st->con) != PQBATCH_MODE_OFF) + { + commandFailed(st, "already in batch mode"); + break; + } + if (PQbatchBegin(st->con) == 0) + { + commandFailed(st, "failed to start a batch"); + break; + } + } + else if (pg_strcasecmp(argv[0], "endbatch") == 0) + { + if (PQbatchStatus(st->con) != PQBATCH_MODE_ON) + { + commandFailed(st, "not in batch mode"); + break; + } + if (!PQbatchQueueSync(st->con)) + { + commandFailed(st, "failed to end the batch"); + st->state = CSTATE_ABORTED; + break; + } + if (PQbatchEnd(st->con) == 0) + { + /* collect pending results before getting out of batch mode */ + st->state = CSTATE_WAIT_RESULT; + break; + } + } + else if (pg_strcasecmp(argv[0], "set") == 0) { PgBenchExpr *expr = command->expr; PgBenchValue result; @@ -2279,6 +2343,7 @@ doCustom(TState *thread, CState *st, StatsData *agg) } break; + /* * Wait for the current SQL command to complete */ @@ -2295,6 +2360,13 @@ doCustom(TState *thread, CState *st, StatsData *agg) if (PQisBusy(st->con)) return; /* don't have the whole result yet */ + if (PQbatchStatus(st->con) == PQBATCH_MODE_ON && + !PQbatchQueueProcess(st->con)) + { + /* no complete result yet in batch mode*/ + return; + } + /* * Read and discard the query result; */ @@ -2307,7 +2379,22 @@ doCustom(TState *thread, CState *st, StatsData *agg) /* OK */ PQclear(res); discard_response(st); - st->state = CSTATE_END_COMMAND; + /* + * In non-batch mode, only one result per command is expected. + * In batch mode, keep waiting for results until getting + * PGRES_BATCH_END. + */ + if (PQbatchStatus(st->con) != PQBATCH_MODE_ON) + st->state = CSTATE_END_COMMAND; + break; + case PGRES_BATCH_END: + if (PQbatchEnd(st->con) == 1) + { + /* all results collected, exit out of command and batch mode */ + st->state = CSTATE_END_COMMAND; + } + else + fprintf(stderr, "client %d to exit batch mode", st->id); break; default: commandFailed(st, PQerrorMessage(st->con)); @@ -3173,6 +3260,13 @@ process_backslash_command(PsqlScanState sstate, const char *source) syntax_error(source, lineno, my_command->line, my_command->argv[0], "missing command", NULL, -1); } + else if (pg_strcasecmp(my_command->argv[0], "beginbatch") == 0 || + pg_strcasecmp(my_command->argv[0], "endbatch") == 0 ) + { + if (my_command->argc > 1) + syntax_error(source, lineno, my_command->line, my_command->argv[0], + "unexpected argument", NULL, -1); + } else { syntax_error(source, lineno, my_command->line, my_command->argv[0],