diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 41b756c089..369e321196 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -267,14 +267,22 @@ typedef enum * transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state * sleeps until that moment. (If throttling is not enabled, doCustom() * falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.) + * + * It may also detect that the next transaction would start beyond the end + * of run, and switch to CSTATE_FINISHED. */ CSTATE_START_THROTTLE, CSTATE_THROTTLE, /* * CSTATE_START_TX performs start-of-transaction processing. Establishes - * a new connection for the transaction, in --connect mode, and records - * the transaction start time. + * a new connection for the transaction in --connect mode, records + * the transaction start time, and proceed to the first command. + * + * Note: once a script is started, it will either error or run till + * its end, where it may be interrupted. It is not interrupted while + * running, so pgbench --time is to be understood as tx are allowed to + * start in that time, and will finish when their work is completed. */ CSTATE_START_TX, @@ -287,9 +295,6 @@ typedef enum * and we enter the CSTATE_SLEEP state to wait for it to expire. Other * meta-commands are executed immediately. * - * CSTATE_SKIP_COMMAND for conditional branches which are not executed, - * quickly skip commands that do not need any evaluation. - * * CSTATE_WAIT_RESULT waits until we get a result set back from the server * for the current command. * @@ -297,19 +302,25 @@ typedef enum * * CSTATE_END_COMMAND records the end-of-command timestamp, increments the * command counter, and loops back to CSTATE_START_COMMAND state. + * + * CSTATE_SKIP_COMMAND is used by conditional branches which are not + * executed. It quickly skip commands that do not need any evaluation. + * This state can move forward several commands, till there is something + * to do or the end of the script. */ CSTATE_START_COMMAND, - CSTATE_SKIP_COMMAND, CSTATE_WAIT_RESULT, CSTATE_SLEEP, CSTATE_END_COMMAND, + CSTATE_SKIP_COMMAND, /* - * CSTATE_END_TX performs end-of-transaction processing. Calculates - * latency, and logs the transaction. In --connect mode, closes the - * current connection. Chooses the next script to execute and starts over - * in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we have no - * more work to do. + * CSTATE_END_TX performs end-of-transaction processing. It calculates + * latency, and logs the transaction. In --connect mode, it closes the + * current connection. + * + * Then either starts over in CSTATE_CHOOSE_SCRIPT, or enters CSTATE_FINISHED + * if we have no more work to do. */ CSTATE_END_TX, @@ -2679,16 +2690,13 @@ evaluateSleep(CState *st, int argc, char **argv, int *usecs) /* * Advance the state machine of a connection, if possible. + * + * All state changes are performed within this function called + * by threadRun. */ static void doCustom(TState *thread, CState *st, StatsData *agg) { - PGresult *res; - Command *command; - instr_time now; - bool end_tx_processed = false; - int64 wait; - /* * gettimeofday() isn't free, so we get the current timestamp lazily the * first time it's needed, and reuse the same value throughout this @@ -2697,37 +2705,44 @@ doCustom(TState *thread, CState *st, StatsData *agg) * means "not set yet". Reset "now" when we execute shell commands or * expressions, which might take a non-negligible amount of time, though. */ + instr_time now; INSTR_TIME_SET_ZERO(now); /* * Loop in the state machine, until we have to wait for a result from the - * server (or have to sleep, for throttling or for \sleep). + * server or have to sleep for throttling or \sleep. * * Note: In the switch-statement below, 'break' will loop back here, * meaning "continue in the state machine". Return is used to return to - * the caller. + * the caller, giving the thread opportunity to move forward another client. */ for (;;) { + PGresult *res; + Command *command; + switch (st->state) { /* * Select transaction to run. */ case CSTATE_CHOOSE_SCRIPT: - st->use_file = chooseScript(thread); if (debug) fprintf(stderr, "client %d executing script \"%s\"\n", st->id, sql_script[st->use_file].desc); - if (throttle_delay > 0) + /* check stack consistency */ + Assert(conditional_stack_empty(st->cstack)); + + if (timer_exceeded) + st->state = CSTATE_FINISHED; + else if (throttle_delay > 0) st->state = CSTATE_START_THROTTLE; else st->state = CSTATE_START_TX; - /* check consistency */ - Assert(conditional_stack_empty(st->cstack)); + break; /* @@ -2745,21 +2760,10 @@ doCustom(TState *thread, CState *st, StatsData *agg) * away. */ Assert(throttle_delay > 0); - wait = getPoissonRand(thread, throttle_delay); - thread->throttle_trigger += wait; + thread->throttle_trigger += getPoissonRand(thread, throttle_delay); st->txn_scheduled = thread->throttle_trigger; - /* - * stop client if next transaction is beyond pgbench end of - * execution - */ - if (duration > 0 && st->txn_scheduled > end_time) - { - st->state = CSTATE_FINISHED; - break; - } - /* * If --latency-limit is used, and this slot is already late * so that the transaction will miss the latency limit even if @@ -2771,19 +2775,19 @@ doCustom(TState *thread, CState *st, StatsData *agg) { int64 now_us; - if (INSTR_TIME_IS_ZERO(now)) - INSTR_TIME_SET_CURRENT(now); + INSTR_TIME_SET_CURRENT_LAZY(now); now_us = INSTR_TIME_GET_MICROSEC(now); + while (thread->throttle_trigger < now_us - latency_limit && (nxacts <= 0 || st->cnt < nxacts)) { processXactStats(thread, st, &now, true, agg); /* next rendez-vous */ - wait = getPoissonRand(thread, throttle_delay); - thread->throttle_trigger += wait; + thread->throttle_trigger += getPoissonRand(thread, throttle_delay); st->txn_scheduled = thread->throttle_trigger; } - /* stop client if -t exceeded */ + + /* stop client if -t was exceeded in the previous skip loop */ if (nxacts > 0 && st->cnt >= nxacts) { st->state = CSTATE_FINISHED; @@ -2791,38 +2795,45 @@ doCustom(TState *thread, CState *st, StatsData *agg) } } + /* + * stop client if next transaction is beyond pgbench end of + * execution. + */ + if (duration > 0 && st->txn_scheduled > end_time) + { + st->state = CSTATE_FINISHED; + break; + } + st->state = CSTATE_THROTTLE; - if (debug) - fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n", - st->id, wait); break; /* * Wait until it's time to start next transaction. */ case CSTATE_THROTTLE: - if (INSTR_TIME_IS_ZERO(now)) - INSTR_TIME_SET_CURRENT(now); + + INSTR_TIME_SET_CURRENT_LAZY(now); + if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled) - return; /* Still sleeping, nothing to do here */ + return; /* still sleeping, nothing to do here */ - /* Else done sleeping, start the transaction */ - st->state = CSTATE_START_TX; + /* done sleeping, but do not start if transaction if we are done */ + if (timer_exceeded) + st->state = CSTATE_FINISHED; + else + st->state = CSTATE_START_TX; break; /* Start new transaction */ case CSTATE_START_TX: - /* - * Establish connection on first call, or if is_connect is - * true. - */ + /* establish connection if needed, i.e. under --connect */ if (st->con == NULL) { instr_time start; - if (INSTR_TIME_IS_ZERO(now)) - INSTR_TIME_SET_CURRENT(now); + INSTR_TIME_SET_CURRENT_LAZY(now); start = now; if ((st->con = doConnect()) == NULL) { @@ -2838,28 +2849,20 @@ doCustom(TState *thread, CState *st, StatsData *agg) memset(st->prepared, 0, sizeof(st->prepared)); } + /* record transaction start time. */ + INSTR_TIME_SET_CURRENT_LAZY(now); + st->txn_begin = now; + /* - * Record transaction start time under logging, progress or - * throttling. + * When not throttling, this is also the transaction's + * scheduled start time. */ - if (use_log || progress || throttle_delay || latency_limit || - per_script_stats) - { - if (INSTR_TIME_IS_ZERO(now)) - INSTR_TIME_SET_CURRENT(now); - st->txn_begin = now; - - /* - * When not throttling, this is also the transaction's - * scheduled start time. - */ - if (!throttle_delay) - st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now); - } + if (!throttle_delay) + st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now); /* Begin with the first command */ - st->command = 0; st->state = CSTATE_START_COMMAND; + st->command = 0; break; /* @@ -2878,17 +2881,11 @@ doCustom(TState *thread, CState *st, StatsData *agg) break; } - /* - * Record statement start time if per-command latencies are - * requested - */ - if (is_latencies) - { - if (INSTR_TIME_IS_ZERO(now)) - INSTR_TIME_SET_CURRENT(now); - st->stmt_begin = now; - } + /* record statement start time. */ + INSTR_TIME_SET_CURRENT_LAZY(now); + st->stmt_begin = now; + /* execute the command */ if (command->type == SQL_COMMAND) { if (!sendCommand(st, command)) @@ -2931,8 +2928,8 @@ doCustom(TState *thread, CState *st, StatsData *agg) break; } - if (INSTR_TIME_IS_ZERO(now)) - INSTR_TIME_SET_CURRENT(now); + INSTR_TIME_SET_CURRENT_LAZY(now); + st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec; st->state = CSTATE_SLEEP; break; @@ -2983,10 +2980,7 @@ doCustom(TState *thread, CState *st, StatsData *agg) } else /* elif */ { - /* - * we should get here only if the "elif" - * needed evaluation - */ + /* we should get here only if the "elif" needed evaluation */ Assert(conditional_stack_peek(st->cstack) == IFSTATE_FALSE); conditional_stack_poke(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE); } @@ -3018,43 +3012,23 @@ doCustom(TState *thread, CState *st, StatsData *agg) } else if (command->meta == META_SETSHELL) { - bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2); - - if (timer_exceeded) /* timeout */ - { - st->state = CSTATE_FINISHED; - break; - } - else if (!ret) /* on error */ + if (!runShellCommand(st, argv[1], argv + 2, argc - 2)) { commandFailed(st, "setshell", "execution of meta-command failed"); st->state = CSTATE_ABORTED; break; } - else - { - /* succeeded */ - } + /* else success */ } else if (command->meta == META_SHELL) { - bool ret = runShellCommand(st, NULL, argv + 1, argc - 1); - - if (timer_exceeded) /* timeout */ - { - st->state = CSTATE_FINISHED; - break; - } - else if (!ret) /* on error */ + if (!runShellCommand(st, NULL, argv + 1, argc - 1)) { commandFailed(st, "shell", "execution of meta-command failed"); st->state = CSTATE_ABORTED; break; } - else - { - /* succeeded */ - } + /* else success */ } move_to_end_command: @@ -3156,6 +3130,7 @@ doCustom(TState *thread, CState *st, StatsData *agg) } if (st->state != CSTATE_SKIP_COMMAND) + /* out of quick skip command loop */ break; } break; @@ -3205,10 +3180,9 @@ doCustom(TState *thread, CState *st, StatsData *agg) * instead of CSTATE_START_TX. */ case CSTATE_SLEEP: - if (INSTR_TIME_IS_ZERO(now)) - INSTR_TIME_SET_CURRENT(now); + INSTR_TIME_SET_CURRENT_LAZY(now); if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until) - return; /* Still sleeping, nothing to do here */ + return; /* still sleeping, nothing to do here */ /* Else done sleeping. */ st->state = CSTATE_END_COMMAND; break; @@ -3223,17 +3197,13 @@ doCustom(TState *thread, CState *st, StatsData *agg) * in thread-local data structure, if per-command latencies * are requested. */ - if (is_latencies) - { - if (INSTR_TIME_IS_ZERO(now)) - INSTR_TIME_SET_CURRENT(now); + INSTR_TIME_SET_CURRENT_LAZY(now); - /* XXX could use a mutex here, but we choose not to */ - command = sql_script[st->use_file].commands[st->command]; - addToSimpleStats(&command->stats, - INSTR_TIME_GET_DOUBLE(now) - - INSTR_TIME_GET_DOUBLE(st->stmt_begin)); - } + /* XXX could use a mutex here, but we choose not to */ + command = sql_script[st->use_file].commands[st->command]; + addToSimpleStats(&command->stats, + INSTR_TIME_GET_DOUBLE(now) - + INSTR_TIME_GET_DOUBLE(st->stmt_begin)); /* Go ahead with next command, to be executed or skipped */ st->command++; @@ -3242,19 +3212,15 @@ doCustom(TState *thread, CState *st, StatsData *agg) break; /* - * End of transaction. + * End of transaction (end of script, really). */ case CSTATE_END_TX: /* transaction finished: calculate latency and do log */ processXactStats(thread, st, &now, false, agg); - /* conditional stack must be empty */ - if (!conditional_stack_empty(st->cstack)) - { - fprintf(stderr, "end of script reached within a conditional, missing \\endif\n"); - exit(1); - } + /* missing \endif... cannot happen if CheckConditional was okay */ + Assert(conditional_stack_empty(st->cstack)); if (is_connect) { @@ -3268,26 +3234,17 @@ doCustom(TState *thread, CState *st, StatsData *agg) st->state = CSTATE_FINISHED; break; } + else + { + /* next transaction */ + st->state = CSTATE_CHOOSE_SCRIPT; - /* - * No transaction is underway anymore. - */ - st->state = CSTATE_CHOOSE_SCRIPT; - - /* - * If we paced through all commands in the script in this - * loop, without returning to the caller even once, do it now. - * This gives the thread a chance to process other - * connections, and to do progress reporting. This can - * currently only happen if the script consists entirely of - * meta-commands. - */ - if (end_tx_processed) + /* + * Ensure that we always return on this point, so as + * to avoid an infinite loop if the script only contains + * meta commands. + */ return; - else - { - end_tx_processed = true; - break; } /* @@ -3401,8 +3358,7 @@ processXactStats(TState *thread, CState *st, instr_time *now, if (detailed && !skipped) { - if (INSTR_TIME_IS_ZERO(*now)) - INSTR_TIME_SET_CURRENT(*now); + INSTR_TIME_SET_CURRENT_LAZY(*now); /* compute latency & lag */ latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled; @@ -5652,7 +5608,7 @@ threadRun(void *arg) if (!is_connect) { - /* make connections to the database */ + /* make connections to the database before starting */ for (i = 0; i < nstate; i++) { if ((state[i].con = doConnect()) == NULL) @@ -5686,14 +5642,7 @@ threadRun(void *arg) { CState *st = &state[i]; - if (st->state == CSTATE_THROTTLE && timer_exceeded) - { - /* interrupt client that has not started a transaction */ - st->state = CSTATE_FINISHED; - finishCon(st); - remains--; - } - else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE) + if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE) { /* a nap from the script, or under throttling */ int64 this_usec; diff --git a/src/include/portability/instr_time.h b/src/include/portability/instr_time.h index f968444671..c46f6825bb 100644 --- a/src/include/portability/instr_time.h +++ b/src/include/portability/instr_time.h @@ -20,6 +20,8 @@ * * INSTR_TIME_SET_CURRENT(t) set t to current time * + * INSTR_TIME_SET_CURRENT_LAZY(t) set t to current time if t is zero + * * INSTR_TIME_ADD(x, y) x += y * * INSTR_TIME_SUBTRACT(x, y) x -= y @@ -245,4 +247,9 @@ GetTimerFrequency(void) #endif /* WIN32 */ +/* same macro on all platforms */ +#define INSTR_TIME_SET_CURRENT_LAZY(t) \ + if (INSTR_TIME_IS_ZERO(t)) \ + INSTR_TIME_SET_CURRENT(t) + #endif /* INSTR_TIME_H */