*** pgbench/pgbench.c 2004-11-09 15:09:31.000000000 +0900 --- pgbench-new/pgbench.c 2005-09-27 14:31:34.000000000 +0900 *************** *** 41,46 **** --- 41,49 ---- #include #endif /* ! WIN32 */ + #include + #include + extern char *optarg; extern int optind; *************** *** 72,77 **** --- 75,83 ---- #define ntellers 10 #define naccounts 100000 + #define SQL_COMMAND 1 + #define META_COMMAND 2 + FILE *LOGFILE = NULL; bool use_log; /* log transaction latencies to a file */ *************** *** 91,96 **** --- 97,108 ---- typedef struct { + char *name; + char *value; + } Variable; + + typedef struct + { PGconn *con; /* connection handle to DB */ int id; /* client No. */ int state; /* state No. */ *************** *** 103,115 **** int tid; /* teller id for this transaction */ int delta; int abalance; struct timeval txn_begin; /* used for measuring latencies */ } CState; static void usage(void) { ! fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-n][-C][-v][-S][-N][-l][-U login][-P password][-d][dbname]\n"); fprintf(stderr, "(initialize mode): pgbench -i [-h hostname][-p port][-s scaling_factor][-U login][-P password][-d][dbname]\n"); } --- 115,137 ---- int tid; /* teller id for this transaction */ int delta; int abalance; + void *variables; struct timeval txn_begin; /* used for measuring latencies */ } CState; + typedef struct + { + int type; + int argc; + char **argv; + } Command; + + Command **commands = NULL; + static void usage(void) { ! fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-n][-C][-v][-S][-N][-f filename][-l][-U login][-P password][-d][dbname]\n"); fprintf(stderr, "(initialize mode): pgbench -i [-h hostname][-p port][-s scaling_factor][-U login][-P password][-d][dbname]\n"); } *************** *** 190,195 **** --- 212,326 ---- return (0); /* OK */ } + static int + compareVariables(const void *v1, const void *v2) + { + return strcmp(((Variable *)v1)->name, ((Variable *)v2)->name); + } + + static char * + getVariable(CState * st, char *name) + { + Variable key = { name }, *var; + + var = tfind(&key, &st->variables, compareVariables); + if (var != NULL) + return (*(Variable **)var)->value; + else + return NULL; + } + + static int + putVariable(CState * st, char *name, char *value) + { + Variable key = { name }, *var; + + var = tfind(&key, &st->variables, compareVariables); + if (var == NULL) + { + if ((var = malloc(sizeof(Variable))) == NULL) + return false; + + var->name = NULL; + var->value = NULL; + + if ((var->name = strdup(name)) == NULL + || (var->value = strdup(value)) == NULL + || tsearch(var, &st->variables, compareVariables) == NULL) + { + free(var->name); + free(var->value); + free(var); + return false; + } + } + else + { + free((*(Variable **)var)->value); + if (((*(Variable **)var)->value = strdup(value)) == NULL) + return false; + } + + return true; + } + + static char * + assignVariables(CState * st, char *sql) + { + int i, j; + char *p, *name, *val; + void *tmp; + + i = 0; + while ((p = strchr(&sql[i], ':')) != NULL) + { + i = j = p - sql; + do + i++; + while (isalnum(sql[i]) != 0 || sql[i] == '_'); + if (i == j + 1) + continue; + + if ((name = strndup(&sql[j + 1], i - (j + 1))) == NULL) + return NULL; + val = getVariable(st, name); + free(name); + if (val == NULL) + continue; + + if (strlen(val) > i - j) + { + tmp = realloc(sql, strlen(sql) - (i - j) + strlen(val) + 1); + if (tmp == NULL) + { + free(sql); + return NULL; + } + sql = tmp; + } + + if (strlen(val) != i - j) + memmove(&sql[j + strlen(val)], &sql[i], strlen(&sql[i]) + 1); + + strncpy(&sql[j], val, strlen(val)); + + if (strlen(val) < i - j) + { + tmp = realloc(sql, strlen(sql) + 1); + if (tmp == NULL) + { + free(sql); + return NULL; + } + sql = tmp; + } + + i = j + strlen(val); + } + + return sql; + } + /* process a transaction */ static void doOne(CState * state, int n, int debug, int ttype) *************** *** 465,470 **** --- 596,765 ---- } } + static void + doCustom(CState * state, int n, int debug) + { + PGresult *res; + CState *st = &state[n]; + + if (st->listen) + { /* are we receiver? */ + if (commands[st->state]->type == SQL_COMMAND) + { + if (debug) + fprintf(stderr, "client %d receiving\n", n); + if (!PQconsumeInput(st->con)) + { /* there's something wrong */ + fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state); + remains--; /* I've aborted */ + PQfinish(st->con); + st->con = NULL; + return; + } + if (PQisBusy(st->con)) + return; /* don't have the whole result yet */ + } + + /* + * transaction finished: record the time it took in the + * log + */ + if (use_log && commands[st->state + 1] == NULL) + { + double diff; + struct timeval now; + + gettimeofday(&now, NULL); + diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 + + (int) (now.tv_usec - st->txn_begin.tv_usec); + + fprintf(LOGFILE, "%d %d %.0f\n", st->id, st->cnt, diff); + } + + if (commands[st->state]->type == SQL_COMMAND) + { + res = PQgetResult(st->con); + if (strncasecmp(commands[st->state]->argv[0], "select", 6) != 0) + { + if (check(state, res, n, PGRES_COMMAND_OK)) + return; + } + else + { + if (check(state, res, n, PGRES_TUPLES_OK)) + return; + } + PQclear(res); + discard_response(st); + } + + if (commands[st->state + 1] == NULL) + { + if (is_connect) + { + PQfinish(st->con); + st->con = NULL; + } + + if (++st->cnt >= nxacts) + { + remains--; /* I'm done */ + if (st->con != NULL) + { + PQfinish(st->con); + st->con = NULL; + } + return; + } + } + + /* increment state counter */ + st->state++; + if (commands[st->state] == NULL) + st->state = 0; + } + + if (st->con == NULL) + { + if ((st->con = doConnect()) == NULL) + { + fprintf(stderr, "Client %d aborted in establishing connection.\n", + n); + remains--; /* I've aborted */ + PQfinish(st->con); + st->con = NULL; + return; + } + } + + if (use_log && st->state == 0) + gettimeofday(&(st->txn_begin), NULL); + + if (commands[st->state]->type == SQL_COMMAND) + { + char *sql; + + if ((sql = strdup(commands[st->state]->argv[0])) == NULL + || (sql = assignVariables(st, sql)) == NULL) + { + fprintf(stderr, "out of memory\n"); + st->ecnt++; + return; + } + + if (debug) + fprintf(stderr, "client %d sending %s\n", n, sql); + if (PQsendQuery(st->con, sql) == 0) + { + if (debug) + fprintf(stderr, "PQsendQuery(%s)failed\n", sql); + st->ecnt++; + } + else + { + st->listen++; /* flags that should be listened */ + } + } + else if (commands[st->state]->type == META_COMMAND) + { + int argc = commands[st->state]->argc, i; + char **argv = commands[st->state]->argv; + + if (debug) + { + fprintf(stderr, "client %d executing \\%s", n, argv[0]); + for (i = 1; i < argc; i++) + fprintf(stderr, " %s", argv[i]); + fprintf(stderr, "\n"); + } + + if (strcasecmp(argv[0], "setrandom") == 0) + { + char *val; + + if ((val = malloc(strlen(argv[3]) + 1)) == NULL) + { + fprintf(stderr, "%s: out of memory\n", argv[0]); + st->ecnt++; + return; + } + + sprintf(val, "%d", getrand(atoi(argv[2]), atoi(argv[3]))); + + if (putVariable(st, argv[1], val) == false) + { + fprintf(stderr, "%s: out of memory\n", argv[0]); + free(val); + st->ecnt++; + return; + } + + free(val); + st->listen++; + } + } + } + /* discard connections */ static void disconnect_all(CState * state) *************** *** 644,649 **** --- 939,1098 ---- PQfinish(con); } + static int + process_file(char *filename) + { + const char delim[] = " \f\n\r\t\v"; + + FILE *fd; + int lineno, i, j; + char buf[BUFSIZ], *p, *tok; + void *tmp; + + if (strcmp(filename, "-") == 0) + fd = stdin; + else if ((fd = fopen(filename, "r")) == NULL) + { + fprintf(stderr, "%s: %s\n", strerror(errno), filename); + return false; + } + + fprintf(stderr, "processing file...\n"); + + lineno = 1; + i = 0; + while (fgets(buf, sizeof(buf), fd) != NULL) + { + if ((p = strchr(buf, '\n')) != NULL) + *p = '\0'; + p = buf; + while (isspace(*p)) + p++; + if (*p == '\0' || strncmp(p, "--", 2) == 0) + { + lineno++; + continue; + } + + if ((tmp = realloc(commands, sizeof(Command *) * (i + 1))) == NULL) + { + i--; + goto error; + } + commands = tmp; + + if ((commands[i] = malloc(sizeof(Command))) == NULL) + goto error; + + commands[i]->argv = NULL; + commands[i]->argc = 0; + + if (*p == '\\') + { + commands[i]->type = META_COMMAND; + + j = 0; + tok = strtok(++p, delim); + while (tok != NULL) + { + tmp = realloc(commands[i]->argv, sizeof(char *) * (j + 1)); + if (tmp == NULL) + goto error; + commands[i]->argv = tmp; + + if ((commands[i]->argv[j] = strdup(tok)) == NULL) + goto error; + + commands[i]->argc++; + + j++; + tok = strtok(NULL, delim); + } + + if (strcasecmp(commands[i]->argv[0], "setrandom") == 0) + { + int min, max; + + if (commands[i]->argc < 4) + { + fprintf(stderr, "%s: %d: \\%s: missing argument\n", filename, lineno, commands[i]->argv[0]); + goto error; + } + + for (j = 4; j < commands[i]->argc; j++) + fprintf(stderr, "%s: %d: \\%s: extra argument \"%s\" ignored\n", filename, lineno, commands[i]->argv[0], commands[i]->argv[j]); + + if ((min = atoi(commands[i]->argv[2])) < 0) + { + fprintf(stderr, "%s: %d: \\%s: invalid minimum number %s\n", filename, lineno, commands[i]->argv[0], commands[i]->argv[2]); + goto error; + } + + if ((max = atoi(commands[i]->argv[3])) < min || max > RAND_MAX) + { + fprintf(stderr, "%s: %d: \\%s: invalid maximum number %s\n", filename, lineno, commands[i]->argv[0], commands[i]->argv[3]); + goto error; + } + } + else + { + fprintf(stderr, "%s: %d: invalid command \\%s\n", filename, lineno, commands[i]->argv[0]); + goto error; + } + } + else + { + commands[i]->type = SQL_COMMAND; + + if ((commands[i]->argv = malloc(sizeof(char *))) == NULL) + goto error; + + if ((commands[i]->argv[0] = strdup(p)) == NULL) + goto error; + + commands[i]->argc++; + } + + i++; + lineno++; + } + fclose(fd); + + if ((tmp = realloc(commands, sizeof(Command *) * (i + 1))) == NULL) + goto error; + commands = tmp; + + commands[i] = NULL; + + return true; + + error: + if (errno == ENOMEM) + fprintf(stderr, "%s: %d: out of memory\n", filename, lineno); + + fclose(fd); + + if (commands == NULL) + return false; + + while (i >= 0) + { + if (commands[i] != NULL) + { + for (j = 0; j < commands[i]->argc; j++) + free(commands[i]->argv[j]); + + free(commands[i]->argv); + free(commands[i]); + } + + i--; + } + free(commands); + + return false; + } + /* print out results */ static void printResults( *************** *** 670,677 **** s = "TPC-B (sort of)"; else if (ttype == 2) s = "Update only accounts"; ! else s = "SELECT only"; printf("transaction type: %s\n", s); printf("scaling factor: %d\n", tps); --- 1119,1128 ---- s = "TPC-B (sort of)"; else if (ttype == 2) s = "Update only accounts"; ! else if (ttype == 1) s = "SELECT only"; + else + s = "Custom query"; printf("transaction type: %s\n", s); printf("scaling factor: %d\n", tps); *************** *** 695,700 **** --- 1146,1152 ---- int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT * only, 2: skip update of branches and * tellers */ + char *filename = NULL; static CState *state; /* status of clients */ *************** *** 724,730 **** else if ((env = getenv("PGUSER")) != NULL && *env != '\0') login = env; ! while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CNSl")) != -1) { switch (c) { --- 1176,1182 ---- else if ((env = getenv("PGUSER")) != NULL && *env != '\0') login = env; ! while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CNSlf:")) != -1) { switch (c) { *************** *** 806,811 **** --- 1258,1267 ---- case 'l': use_log = true; break; + case 'f': + ttype = 3; + filename = optarg; + break; default: usage(); exit(1); *************** *** 868,941 **** exit(1); } ! /* ! * get the scaling factor that should be same as count(*) from ! * branches... ! */ ! res = PQexec(con, "select count(*) from branches"); ! if (PQresultStatus(res) != PGRES_TUPLES_OK) { ! fprintf(stderr, "%s", PQerrorMessage(con)); ! exit(1); ! } ! tps = atoi(PQgetvalue(res, 0, 0)); ! if (tps < 0) ! { ! fprintf(stderr, "count(*) from branches invalid (%d)\n", tps); ! exit(1); } ! PQclear(res); ! ! if (!is_no_vacuum) { ! fprintf(stderr, "starting vacuum..."); ! res = PQexec(con, "vacuum branches"); ! if (PQresultStatus(res) != PGRES_COMMAND_OK) { fprintf(stderr, "%s", PQerrorMessage(con)); exit(1); } ! PQclear(res); ! ! res = PQexec(con, "vacuum tellers"); ! if (PQresultStatus(res) != PGRES_COMMAND_OK) { ! fprintf(stderr, "%s", PQerrorMessage(con)); exit(1); } PQclear(res); ! res = PQexec(con, "delete from history"); ! if (PQresultStatus(res) != PGRES_COMMAND_OK) { ! fprintf(stderr, "%s", PQerrorMessage(con)); ! exit(1); ! } ! PQclear(res); ! res = PQexec(con, "vacuum history"); ! if (PQresultStatus(res) != PGRES_COMMAND_OK) ! { ! fprintf(stderr, "%s", PQerrorMessage(con)); ! exit(1); ! } ! PQclear(res); ! fprintf(stderr, "end.\n"); ! if (is_full_vacuum) ! { ! fprintf(stderr, "starting full vacuum..."); ! res = PQexec(con, "vacuum analyze accounts"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { fprintf(stderr, "%s", PQerrorMessage(con)); exit(1); } PQclear(res); fprintf(stderr, "end.\n"); } } - PQfinish(con); /* set random seed */ gettimeofday(&tv1, NULL); --- 1324,1406 ---- exit(1); } ! if (ttype == 3) { ! PQfinish(con); ! if (process_file(filename) == false) ! exit(1); } ! else { ! /* ! * get the scaling factor that should be same as count(*) from ! * branches... ! */ ! res = PQexec(con, "select count(*) from branches"); ! if (PQresultStatus(res) != PGRES_TUPLES_OK) { fprintf(stderr, "%s", PQerrorMessage(con)); exit(1); } ! tps = atoi(PQgetvalue(res, 0, 0)); ! if (tps < 0) { ! fprintf(stderr, "count(*) from branches invalid (%d)\n", tps); exit(1); } PQclear(res); ! if (!is_no_vacuum) { ! fprintf(stderr, "starting vacuum..."); ! res = PQexec(con, "vacuum branches"); ! if (PQresultStatus(res) != PGRES_COMMAND_OK) ! { ! fprintf(stderr, "%s", PQerrorMessage(con)); ! exit(1); ! } ! PQclear(res); ! res = PQexec(con, "vacuum tellers"); ! if (PQresultStatus(res) != PGRES_COMMAND_OK) ! { ! fprintf(stderr, "%s", PQerrorMessage(con)); ! exit(1); ! } ! PQclear(res); ! res = PQexec(con, "delete from history"); ! if (PQresultStatus(res) != PGRES_COMMAND_OK) ! { ! fprintf(stderr, "%s", PQerrorMessage(con)); ! exit(1); ! } ! PQclear(res); ! res = PQexec(con, "vacuum history"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { fprintf(stderr, "%s", PQerrorMessage(con)); exit(1); } PQclear(res); + fprintf(stderr, "end.\n"); + + if (is_full_vacuum) + { + fprintf(stderr, "starting full vacuum..."); + res = PQexec(con, "vacuum analyze accounts"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, "%s", PQerrorMessage(con)); + exit(1); + } + PQclear(res); + fprintf(stderr, "end.\n"); + } } + PQfinish(con); } /* set random seed */ gettimeofday(&tv1, NULL); *************** *** 965,970 **** --- 1430,1437 ---- doOne(state, i, debug, ttype); else if (ttype == 1) doSelectOnly(state, i, debug); + else if (ttype == 3) + doCustom(state, i, debug); } for (;;) *************** *** 982,997 **** FD_ZERO(&input_mask); ! maxsock = 0; for (i = 0; i < nclients; i++) { ! if (state[i].con) { int sock = PQsocket(state[i].con); if (sock < 0) { - fprintf(stderr, "Client %d: PQsocket failed\n", i); disconnect_all(state); exit(1); } --- 1449,1464 ---- FD_ZERO(&input_mask); ! maxsock = -1; for (i = 0; i < nclients; i++) { ! if (state[i].con && ! (ttype != 3 || commands[state[i].state]->type != META_COMMAND)) { int sock = PQsocket(state[i].con); if (sock < 0) { disconnect_all(state); exit(1); } *************** *** 1001,1036 **** } } ! if ((nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL, ! (fd_set *) NULL, (struct timeval *) NULL)) < 0) { ! if (errno == EINTR) ! continue; ! /* must be something wrong */ ! disconnect_all(state); ! fprintf(stderr, "select failed: %s\n", strerror(errno)); ! exit(1); ! } ! else if (nsocks == 0) ! { /* timeout */ ! fprintf(stderr, "select timeout\n"); ! for (i = 0; i < nclients; i++) { ! fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n", ! i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen); } - exit(0); } /* ok, backend returns reply */ for (i = 0; i < nclients; i++) { ! if (state[i].con && FD_ISSET(PQsocket(state[i].con), &input_mask)) { if (ttype == 0 || ttype == 2) doOne(state, i, debug, ttype); else if (ttype == 1) doSelectOnly(state, i, debug); } } } --- 1468,1510 ---- } } ! if (maxsock != -1) { ! if ((nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL, ! (fd_set *) NULL, (struct timeval *) NULL)) < 0) { ! if (errno == EINTR) ! continue; ! /* must be something wrong */ ! disconnect_all(state); ! fprintf(stderr, "select failed: %s\n", strerror(errno)); ! exit(1); ! } ! else if (nsocks == 0) ! { /* timeout */ ! fprintf(stderr, "select timeout\n"); ! for (i = 0; i < nclients; i++) ! { ! fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n", ! i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen); ! } ! exit(0); } } /* ok, backend returns reply */ for (i = 0; i < nclients; i++) { ! if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask) ! || (ttype == 3 ! && commands[state[i].state]->type == META_COMMAND))) { if (ttype == 0 || ttype == 2) doOne(state, i, debug, ttype); else if (ttype == 1) doSelectOnly(state, i, debug); + else if (ttype == 3) + doCustom(state, i, debug); } } }