commit dd71004519237e9449bb1902de3004a01f695365[m
Author:     Alvaro Herrera <alvherre@alvh.no-ip.org>
AuthorDate: Tue Jan 26 10:12:51 2016 -0300
CommitDate: Wed Jan 27 02:57:18 2016 +0100

    apply 13-b, fix conflicts

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index d5f242c..305c319 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -166,10 +166,8 @@ int			agg_interval;		/* log aggregates instead of individual
 								 * transactions */
 int			progress = 0;		/* thread progress report every this seconds */
 bool		progress_timestamp = false; /* progress report with Unix time */
-int			progress_nclients = 0;		/* number of clients for progress
-										 * report */
-int			progress_nthreads = 0;		/* number of threads for progress
-										 * report */
+int			nclients = 1;		/* number of clients */
+int			nthreads = 1;		/* number of threads */
 bool		is_connect;			/* establish connection for each transaction */
 bool		is_latencies;		/* report per-command latencies */
 int			main_pid;			/* main process id used in log filename */
@@ -193,6 +191,35 @@ typedef struct
 #define SHELL_COMMAND_SIZE	256 /* maximum size allowed for shell command */
 
 /*
+ * Simple data structure to keep stats about something.
+ *
+ * XXX probably the first value should be kept and used as an offset for
+ * better numerical stability...
+ */
+typedef struct
+{
+	int64		count;			/* how many values were encountered */
+	double		min;			/* the minimum seen */
+	double		max;			/* the maximum seen */
+	double		sum;			/* sum of values */
+	double		sum2;			/* sum of squared values */
+} SimpleStats;
+
+/*
+ * Data structure to hold various statistics, used for interval statistics as
+ * well as file statistics.
+ */
+typedef struct
+{
+	long		start_time;		/* interval start time, for aggregates */
+	int64		cnt;			/* number of transactions */
+	int64		skipped;		/* number of transactions skipped under --rate
+								 * and --latency-limit */
+	SimpleStats latency;
+	SimpleStats lag;
+} StatsData;
+
+/*
  * Connection state
  */
 typedef struct
@@ -213,10 +240,8 @@ typedef struct
 	bool		prepared[MAX_SCRIPTS];	/* whether client prepared the script */
 
 	/* per client collected stats */
-	int			cnt;			/* xacts count */
+	int64		cnt;			/* transaction count */
 	int			ecnt;			/* error count */
-	int64		txn_latencies;	/* cumulated latencies */
-	int64		txn_sqlats;		/* cumulated square latencies */
 } CState;
 
 /*
@@ -228,19 +253,14 @@ typedef struct
 	pthread_t	thread;			/* thread handle */
 	CState	   *state;			/* array of CState */
 	int			nstate;			/* length of state[] */
-	instr_time	start_time;		/* thread start time */
-	instr_time *exec_elapsed;	/* time spent executing cmds (per Command) */
-	int		   *exec_count;		/* number of cmd executions (per Command) */
 	unsigned short random_state[3];		/* separate randomness for each thread */
 	int64		throttle_trigger;		/* previous/next throttling (us) */
 
 	/* per thread collected stats */
+	instr_time	start_time;		/* thread start time */
 	instr_time	conn_time;
-	int64		throttle_lag;	/* total transaction lag behind throttling */
-	int64		throttle_lag_max;		/* max transaction lag */
-	int64		throttle_latency_skipped;		/* lagging transactions
-												 * skipped */
-	int64		latency_late;	/* late transactions */
+	StatsData	stats;
+	int64		latency_late;	/* executed but late transactions */
 } TState;
 
 #define INVALID_THREAD		((pthread_t) 0)
@@ -272,33 +292,14 @@ typedef struct
 	char	   *argv[MAX_ARGS]; /* command word list */
 	int			cols[MAX_ARGS]; /* corresponding column starting from 1 */
 	PgBenchExpr *expr;			/* parsed expression */
+	SimpleStats stats;			/* time spent in this command */
 } Command;
 
-typedef struct
-{
-
-	long		start_time;		/* when does the interval start */
-	int			cnt;			/* number of transactions */
-	int			skipped;		/* number of transactions skipped under --rate
-								 * and --latency-limit */
-
-	double		min_latency;	/* min/max latencies */
-	double		max_latency;
-	double		sum_latency;	/* sum(latency), sum(latency^2) - for
-								 * estimates */
-	double		sum2_latency;
-
-	double		min_lag;
-	double		max_lag;
-	double		sum_lag;		/* sum(lag) */
-	double		sum2_lag;		/* sum(lag*lag) */
-} AggVals;
-
 static struct
 {
 	const char *name;
-	Command	 **commands;
-} sql_script[MAX_SCRIPTS];		/* SQL script files */
+	Command   **commands;
+}	sql_script[MAX_SCRIPTS];	/* SQL script files */
 static int	num_scripts;		/* number of scripts in sql_script[] */
 static int	num_commands = 0;	/* total number of Command structs */
 static int	debug = 0;			/* debug flag */
@@ -361,9 +362,8 @@ static struct
 /* Function prototypes */
 static void setalarm(int seconds);
 static void *threadRun(void *arg);
+static void doTxStats(TState *, CState *, instr_time *, bool, FILE *, StatsData *);
 
-static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
-	  AggVals *agg, bool skipped);
 
 static void
 usage(void)
@@ -602,6 +602,63 @@ getPoissonRand(TState *thread, int64 center)
 	return (int64) (-log(uniform) * ((double) center) + 0.5);
 }
 
+/*
+ * Initialize the given SimpleStats struct to all zeroes
+ */
+static void
+initSimpleStats(SimpleStats *ss)
+{
+	memset(ss, 0, sizeof(SimpleStats));
+}
+
+/*
+ * Accumulate one value into a SimpleStats struct.
+ */
+static void
+addToSimpleStats(SimpleStats *ss, double val)
+{
+	if (ss->count == 0 || val < ss->min)
+		ss->min = val;
+	if (ss->count == 0 || val > ss->max)
+		ss->max = val;
+	ss->count++;
+	ss->sum += val;
+	ss->sum2 += val * val;
+}
+
+/*
+ * Merge two SimpleStats objects
+ */
+static void
+mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
+{
+	if (acc->count == 0 || ss->min < acc->min)
+		acc->min = ss->min;
+	if (acc->count == 0 || ss->max > acc->max)
+		acc->max = ss->max;
+	acc->count += ss->count;
+	acc->sum += ss->sum;
+	acc->sum2 += ss->sum2;
+}
+
+/*
+ * Initialize a StatsData struct to all zeroes.
+ *
+ * If the given start_time is different from 0.0, it is used; otherwise
+ * the original value is retained.
+ */
+static void
+initStats(StatsData *sd, double start_time)
+{
+	if (start_time != 0.0)
+		sd->start_time = start_time;
+
+	sd->cnt = 0;
+	sd->skipped = 0;
+	initSimpleStats(&sd->latency);
+	initSimpleStats(&sd->lag);
+}
+
 /* call PQexec() and exit() on failure */
 static void
 executeStatement(PGconn *con, const char *sql)
@@ -1121,30 +1178,6 @@ clientDone(CState *st, bool ok)
 	return false;				/* always false */
 }
 
-static void
-agg_vals_init(AggVals *aggs, instr_time start)
-{
-	/* basic counters */
-	aggs->cnt = 0;				/* number of transactions (includes skipped) */
-	aggs->skipped = 0;			/* xacts skipped under --rate --latency-limit */
-
-	aggs->sum_latency = 0;		/* SUM(latency) */
-	aggs->sum2_latency = 0;		/* SUM(latency*latency) */
-
-	/* min and max transaction duration */
-	aggs->min_latency = 0;
-	aggs->max_latency = 0;
-
-	/* schedule lag counters */
-	aggs->sum_lag = 0;
-	aggs->sum2_lag = 0;
-	aggs->min_lag = 0;
-	aggs->max_lag = 0;
-
-	/* start of the current interval */
-	aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
-}
-
 static int
 chooseScript(TState *thread)
 {
@@ -1156,7 +1189,7 @@ chooseScript(TState *thread)
 
 /* return false iff client should be disconnected */
 static bool
-doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
+doCustom(TState *thread, CState *st, FILE *logfile, StatsData *agg)
 {
 	PGresult   *res;
 	Command   **commands;
@@ -1210,11 +1243,8 @@ top:
 			now_us = INSTR_TIME_GET_MICROSEC(now);
 			while (thread->throttle_trigger < now_us - latency_limit)
 			{
-				thread->throttle_latency_skipped++;
-
-				if (logfile)
-					doLog(thread, st, logfile, &now, agg, true);
-
+				doTxStats(thread, st, &now, true, logfile, agg);
+				/* next rendez-vous */
 				wait = getPoissonRand(thread, throttle_delay);
 				thread->throttle_trigger += wait;
 				st->txn_scheduled = thread->throttle_trigger;
@@ -1231,28 +1261,13 @@ top:
 
 	if (st->sleeping)
 	{							/* are we sleeping? */
-		int64		now_us;
-
 		if (INSTR_TIME_IS_ZERO(now))
 			INSTR_TIME_SET_CURRENT(now);
-		now_us = INSTR_TIME_GET_MICROSEC(now);
-		if (st->txn_scheduled <= now_us)
-		{
-			/* Done sleeping, go ahead with next command */
-			st->sleeping = false;
-			if (st->throttling)
-			{
-				/* Measure lag of throttled transaction relative to target */
-				int64		lag = now_us - st->txn_scheduled;
-
-				thread->throttle_lag += lag;
-				if (lag > thread->throttle_lag_max)
-					thread->throttle_lag_max = lag;
-				st->throttling = false;
-			}
-		}
-		else
+		if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
 			return true;		/* Still sleeping, nothing to do here */
+		/* Else done sleeping, go ahead with next command */
+		st->sleeping = 0;
+		st->throttling = false;
 	}
 
 	if (st->listen)
@@ -1276,47 +1291,27 @@ top:
 		 */
 		if (is_latencies)
 		{
-			int			cnum = commands[st->state]->command_num;
-
 			if (INSTR_TIME_IS_ZERO(now))
 				INSTR_TIME_SET_CURRENT(now);
-			INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
-								  now, st->stmt_begin);
-			thread->exec_count[cnum]++;
+
+			/*
+			 * XXX When doing multiple threads, more than one could try to
+			 * update the stats for one command simultaneously.  Instead of
+			 * adding the overhead of a mutex, we accept the very small
+			 * probability of getting slightly wrong values.
+			 */
+			addToSimpleStats(&commands[st->state]->stats,
+							 INSTR_TIME_GET_DOUBLE(now) -
+							 INSTR_TIME_GET_DOUBLE(st->stmt_begin));
 		}
 
 		/* transaction finished: calculate latency and log the transaction */
 		if (commands[st->state + 1] == NULL)
 		{
-			/* only calculate latency if an option is used that needs it */
-			if (progress || throttle_delay || latency_limit)
-			{
-				int64		latency;
-
-				if (INSTR_TIME_IS_ZERO(now))
-					INSTR_TIME_SET_CURRENT(now);
-
-				latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled;
-
-				st->txn_latencies += latency;
-
-				/*
-				 * XXX In a long benchmark run of high-latency transactions,
-				 * this int64 addition eventually overflows.  For example, 100
-				 * threads running 10s transactions will overflow it in 2.56
-				 * hours.  With a more-typical OLTP workload of .1s
-				 * transactions, overflow would take 256 hours.
-				 */
-				st->txn_sqlats += latency * latency;
-
-				/* record over the limit transactions if needed. */
-				if (latency_limit && latency > latency_limit)
-					thread->latency_late++;
-			}
-
-			/* record the time it took in the log */
-			if (logfile)
-				doLog(thread, st, logfile, &now, agg, false);
+			if (progress || throttle_delay || latency_limit || logfile)
+				doTxStats(thread, st, &now, false, logfile, agg);
+			else
+				thread->stats.cnt++;
 		}
 
 		if (commands[st->state]->type == SQL_COMMAND)
@@ -1391,7 +1386,7 @@ top:
 			return clientDone(st, false);
 		}
 		INSTR_TIME_SET_CURRENT(end);
-		INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
+		INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
 	}
 
 	/*
@@ -1734,22 +1729,41 @@ top:
 			else	/* succeeded */
 				st->listen = true;
 		}
+
+		/* after a meta command, immediately proceed with next command */
 		goto top;
 	}
 
 	return true;
 }
 
+static void
+doStats(StatsData *stats, bool skipped, double lat, double lag)
+{
+	stats->cnt++;
+
+	if (skipped)
+	{
+		/* no latency to record on skipped transactions */
+		stats->skipped++;
+	}
+	else
+	{
+		addToSimpleStats(&stats->latency, lat);
+
+		/* and possibly the same for schedule lag */
+		if (throttle_delay)
+			addToSimpleStats(&stats->lag, lag);
+	}
+}
+
 /*
  * print log entry after completing one transaction.
  */
 static void
-doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
-	  bool skipped)
+doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
+	  StatsData *agg, bool skipped, double latency, double lag)
 {
-	double		lag;
-	double		latency;
-
 	/*
 	 * Skip the log entry if sampling is enabled and this row doesn't belong
 	 * to the random sample.
@@ -1758,15 +1772,6 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
 		pg_erand48(thread->random_state) > sample_rate)
 		return;
 
-	if (INSTR_TIME_IS_ZERO(*now))
-		INSTR_TIME_SET_CURRENT(*now);
-
-	latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled);
-	if (skipped)
-		lag = latency;
-	else
-		lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
-
 	/* should we aggregate the results or not? */
 	if (agg_interval > 0)
 	{
@@ -1776,39 +1781,7 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
 		 */
 		if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now))
 		{
-			agg->cnt += 1;
-			if (skipped)
-			{
-				/*
-				 * there is no latency to record if the transaction was
-				 * skipped
-				 */
-				agg->skipped += 1;
-			}
-			else
-			{
-				agg->sum_latency += latency;
-				agg->sum2_latency += latency * latency;
-
-				/* first in this aggregation interval */
-				if ((agg->cnt == 1) || (latency < agg->min_latency))
-					agg->min_latency = latency;
-
-				if ((agg->cnt == 1) || (latency > agg->max_latency))
-					agg->max_latency = latency;
-
-				/* and the same for schedule lag */
-				if (throttle_delay)
-				{
-					agg->sum_lag += lag;
-					agg->sum2_lag += lag * lag;
-
-					if ((agg->cnt == 1) || (lag < agg->min_lag))
-						agg->min_lag = lag;
-					if ((agg->cnt == 1) || (lag > agg->max_lag))
-						agg->max_lag = lag;
-				}
-			}
+			doStats(agg, skipped, latency, lag);
 		}
 		else
 		{
@@ -1823,52 +1796,34 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
 				 * usage), so we don't need to handle this in a special way
 				 * (see below).
 				 */
-				fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f",
+				fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
 						agg->start_time,
 						agg->cnt,
-						agg->sum_latency,
-						agg->sum2_latency,
-						agg->min_latency,
-						agg->max_latency);
+						agg->latency.sum,
+						agg->latency.sum2,
+						agg->latency.min,
+						agg->latency.max);
 				if (throttle_delay)
 				{
 					fprintf(logfile, " %.0f %.0f %.0f %.0f",
-							agg->sum_lag,
-							agg->sum2_lag,
-							agg->min_lag,
-							agg->max_lag);
+							agg->lag.sum,
+							agg->lag.sum2,
+							agg->lag.min,
+							agg->lag.max);
 					if (latency_limit)
-						fprintf(logfile, " %d", agg->skipped);
+						fprintf(logfile, " " INT64_FORMAT, agg->skipped);
 				}
 				fputc('\n', logfile);
 
-				/* move to the next inteval */
-				agg->start_time = agg->start_time + agg_interval;
+				/* move to the next interval */
+				agg->start_time += agg_interval;
 
 				/* reset for "no transaction" intervals */
-				agg->cnt = 0;
-				agg->skipped = 0;
-				agg->min_latency = 0;
-				agg->max_latency = 0;
-				agg->sum_latency = 0;
-				agg->sum2_latency = 0;
-				agg->min_lag = 0;
-				agg->max_lag = 0;
-				agg->sum_lag = 0;
-				agg->sum2_lag = 0;
+				initStats(agg, 0.0);
 			}
 
 			/* reset the values to include only the current transaction. */
-			agg->cnt = 1;
-			agg->skipped = skipped ? 1 : 0;
-			agg->min_latency = latency;
-			agg->max_latency = latency;
-			agg->sum_latency = skipped ? 0.0 : latency;
-			agg->sum2_latency = skipped ? 0.0 : latency * latency;
-			agg->min_lag = lag;
-			agg->max_lag = lag;
-			agg->sum_lag = lag;
-			agg->sum2_lag = lag * lag;
+			doStats(agg, skipped, latency, lag);
 		}
 	}
 	else
@@ -1878,21 +1833,21 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
 
 		/* This is more than we really ought to know about instr_time */
 		if (skipped)
-			fprintf(logfile, "%d %d skipped %d %ld %ld",
+			fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
 					st->id, st->cnt, st->use_file,
 					(long) now->tv_sec, (long) now->tv_usec);
 		else
-			fprintf(logfile, "%d %d %.0f %d %ld %ld",
+			fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
 					st->id, st->cnt, latency, st->use_file,
 					(long) now->tv_sec, (long) now->tv_usec);
 #else
 
 		/* On Windows, instr_time doesn't provide a timestamp anyway */
 		if (skipped)
-			fprintf(logfile, "%d %d skipped %d 0 0",
+			fprintf(logfile, "%d " INT64_FORMAT " skipped %d 0 0",
 					st->id, st->cnt, st->use_file);
 		else
-			fprintf(logfile, "%d %d %.0f %d 0 0",
+			fprintf(logfile, "%d " INT64_FORMAT " %.0f %d 0 0",
 					st->id, st->cnt, latency, st->use_file);
 #endif
 		if (throttle_delay)
@@ -1901,6 +1856,42 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
 	}
 }
 
+/*
+ * end of transaction statistics
+ */
+static void
+doTxStats(TState *thread, CState *st, instr_time *now,
+		  bool skipped, FILE *logfile, StatsData *agg)
+{
+	double		latency = 0.0,
+				lag = 0.0;
+
+	if ((!skipped || agg_interval) && INSTR_TIME_IS_ZERO(*now))
+		INSTR_TIME_SET_CURRENT(*now);
+
+	if (!skipped)
+	{
+		/* compute latency & lag if needed */
+		latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
+		lag = INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled;
+	}
+
+	if (progress || throttle_delay || latency_limit)
+	{
+		doStats(&thread->stats, skipped, latency, lag);
+
+		/* record over the limit transactions if needed. */
+		if (latency_limit && latency > latency_limit)
+			thread->latency_late++;
+	}
+	else
+		thread->stats.cnt++;
+
+	if (use_log)
+		doLog(thread, st, logfile, now, agg, skipped, latency, lag);
+}
+
+
 /* discard connections */
 static void
 disconnect_all(CState *state, int length)
@@ -2297,6 +2288,7 @@ process_commands(char *buf, const char *source, const int lineno)
 	my_commands->command_num = num_commands++;
 	my_commands->type = 0;		/* until set */
 	my_commands->argc = 0;
+	initSimpleStats(&my_commands->stats);
 
 	if (*p == '\\')
 	{
@@ -2641,7 +2633,7 @@ process_builtin(const char *tb, const char *source)
 static void
 listAvailableScripts(void)
 {
-	int		i;
+	int			i;
 
 	fprintf(stderr, "Available builtin scripts:\n");
 	for (i = 0; i < N_BUILTIN; i++)
@@ -2689,22 +2681,29 @@ addScript(const char *name, Command **commands)
 	num_scripts++;
 }
 
+static void
+printSimpleStats(char *prefix, SimpleStats *ss)
+{
+	/* print NaN if no transactions where executed */
+	double		latency = ss->sum / ss->count;
+	double		stddev = sqrt(ss->sum2 / ss->count - latency * latency);
+
+	printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
+	printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
+}
+
 /* print out results */
 static void
-printResults(int64 normal_xacts, int nclients,
-			 TState *threads, int nthreads,
-			 instr_time total_time, instr_time conn_total_time,
-			 int64 total_latencies, int64 total_sqlats,
-			 int64 throttle_lag, int64 throttle_lag_max,
-			 int64 throttle_latency_skipped, int64 latency_late)
+printResults(TState *threads, StatsData *total, instr_time total_time,
+			 instr_time conn_total_time, int latency_late)
 {
 	double		time_include,
 				tps_include,
 				tps_exclude;
 
 	time_include = INSTR_TIME_GET_DOUBLE(total_time);
-	tps_include = normal_xacts / time_include;
-	tps_exclude = normal_xacts / (time_include -
+	tps_include = total->cnt / time_include;
+	tps_exclude = total->cnt / (time_include -
 						(INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
 
 	printf("transaction type: %s\n",
@@ -2716,46 +2715,36 @@ printResults(int64 normal_xacts, int nclients,
 	if (duration <= 0)
 	{
 		printf("number of transactions per client: %d\n", nxacts);
-		printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n",
-			   normal_xacts, (int64) nxacts * nclients);
+		printf("number of transactions actually processed: " INT64_FORMAT "/%d\n",
+			   total->cnt, nxacts * nclients);
 	}
 	else
 	{
 		printf("duration: %d s\n", duration);
 		printf("number of transactions actually processed: " INT64_FORMAT "\n",
-			   normal_xacts);
+			   total->cnt);
 	}
 
 	/* Remaining stats are nonsensical if we failed to execute any xacts */
-	if (normal_xacts <= 0)
+	if (total->cnt <= 0)
 		return;
 
 	if (throttle_delay && latency_limit)
 		printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
-			   throttle_latency_skipped,
-			   100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts));
+			   total->skipped,
+			   100.0 * total->skipped / (total->skipped + total->cnt));
 
 	if (latency_limit)
-		printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n",
+		printf("number of transactions above the %.1f ms latency limit: %d (%.3f %%)\n",
 			   latency_limit / 1000.0, latency_late,
-		   100.0 * latency_late / (throttle_latency_skipped + normal_xacts));
+			   100.0 * latency_late / (total->skipped + total->cnt));
 
 	if (throttle_delay || progress || latency_limit)
-	{
-		/* compute and show latency average and standard deviation */
-		double		latency = 0.001 * total_latencies / normal_xacts;
-		double		sqlat = (double) total_sqlats / normal_xacts;
-
-		printf("latency average: %.3f ms\n"
-			   "latency stddev: %.3f ms\n",
-			   latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency));
-	}
+		printSimpleStats("latency", &total->latency);
 	else
-	{
 		/* only an average latency computed from the duration is available */
 		printf("latency average: %.3f ms\n",
-			   1000.0 * duration * nclients / normal_xacts);
-	}
+			   1000.0 * duration * nclients / total->cnt);
 
 	if (throttle_delay)
 	{
@@ -2766,7 +2755,7 @@ printResults(int64 normal_xacts, int nclients,
 		 * the database load, or the Poisson throttling process.
 		 */
 		printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
-			   0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
+			   0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
 	}
 
 	printf("tps = %f (including connections establishing)\n", tps_include);
@@ -2785,33 +2774,9 @@ printResults(int64 normal_xacts, int nclients,
 			printf(" - statement latencies in milliseconds:\n");
 
 			for (commands = sql_script[i].commands; *commands != NULL; commands++)
-			{
-				Command    *command = *commands;
-				int			cnum = command->command_num;
-				double		total_time;
-				instr_time	total_exec_elapsed;
-				int			total_exec_count;
-				int			t;
-
-				/* Accumulate per-thread data for command */
-				INSTR_TIME_SET_ZERO(total_exec_elapsed);
-				total_exec_count = 0;
-				for (t = 0; t < nthreads; t++)
-				{
-					TState	   *thread = &threads[t];
-
-					INSTR_TIME_ADD(total_exec_elapsed,
-								   thread->exec_elapsed[cnum]);
-					total_exec_count += thread->exec_count[cnum];
-				}
-
-				if (total_exec_count > 0)
-					total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
-				else
-					total_time = 0.0;
-
-				printf("\t%f\t%s\n", total_time, command->line);
-			}
+				printf("   %11.3f  %s\n",
+				   1000.0 * (*commands)->stats.sum / (*commands)->stats.count,
+					   (*commands)->line);
 		}
 	}
 }
@@ -2860,8 +2825,6 @@ main(int argc, char **argv)
 	};
 
 	int			c;
-	int			nclients = 1;	/* default number of simulated clients */
-	int			nthreads = 1;	/* default number of threads */
 	int			is_init_mode = 0;		/* initialize mode? */
 	int			is_no_vacuum = 0;		/* no vacuum at all before testing? */
 	int			do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
@@ -2878,13 +2841,8 @@ main(int argc, char **argv)
 	instr_time	start_time;		/* start up time */
 	instr_time	total_time;
 	instr_time	conn_total_time;
-	int64		total_xacts = 0;
-	int64		total_latencies = 0;
-	int64		total_sqlats = 0;
-	int64		throttle_lag = 0;
-	int64		throttle_lag_max = 0;
-	int64		throttle_latency_skipped = 0;
 	int64		latency_late = 0;
+	StatsData	stats;
 	char	   *desc;
 
 	int			i;
@@ -3071,14 +3029,14 @@ main(int argc, char **argv)
 			case 'S':
 				addScript(desc,
 						  process_builtin(findBuiltin("select-only", &desc),
-										 desc));
+										  desc));
 				benchmarking_option_set = true;
 				internal_script_used = true;
 				break;
 			case 'N':
 				addScript(desc,
 						  process_builtin(findBuiltin("simple-update", &desc),
-										 desc));
+										  desc));
 				benchmarking_option_set = true;
 				internal_script_used = true;
 				break;
@@ -3311,8 +3269,6 @@ main(int argc, char **argv)
 	 * changed after fork.
 	 */
 	main_pid = (int) getpid();
-	progress_nclients = nclients;
-	progress_nthreads = nthreads;
 
 	if (nclients > 1)
 	{
@@ -3454,32 +3410,10 @@ main(int argc, char **argv)
 		thread->random_state[0] = random();
 		thread->random_state[1] = random();
 		thread->random_state[2] = random();
-		thread->throttle_latency_skipped = 0;
 		thread->latency_late = 0;
+		initStats(&thread->stats, 0.0);
 
 		nclients_dealt += thread->nstate;
-
-		if (is_latencies)
-		{
-			/* Reserve memory for the thread to store per-command latencies */
-			int			t;
-
-			thread->exec_elapsed = (instr_time *)
-				pg_malloc(sizeof(instr_time) * num_commands);
-			thread->exec_count = (int *)
-				pg_malloc(sizeof(int) * num_commands);
-
-			for (t = 0; t < num_commands; t++)
-			{
-				INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
-				thread->exec_count[t] = 0;
-			}
-		}
-		else
-		{
-			thread->exec_elapsed = NULL;
-			thread->exec_count = NULL;
-		}
 	}
 
 	/* all clients must be assigned to a thread */
@@ -3522,11 +3456,11 @@ main(int argc, char **argv)
 #endif   /* ENABLE_THREAD_SAFETY */
 
 	/* wait for threads and accumulate results */
+	initStats(&stats, 0.0);
 	INSTR_TIME_SET_ZERO(conn_total_time);
 	for (i = 0; i < nthreads; i++)
 	{
 		TState	   *thread = &threads[i];
-		int			j;
 
 #ifdef ENABLE_THREAD_SAFETY
 		if (threads[i].thread == INVALID_THREAD)
@@ -3539,21 +3473,13 @@ main(int argc, char **argv)
 		(void) threadRun(thread);
 #endif   /* ENABLE_THREAD_SAFETY */
 
-		/* thread level stats */
-		throttle_lag += thread->throttle_lag;
-		throttle_latency_skipped += threads->throttle_latency_skipped;
+		/* aggregate thread level stats */
+		mergeSimpleStats(&stats.latency, &thread->stats.latency);
+		mergeSimpleStats(&stats.lag, &thread->stats.lag);
+		stats.cnt += thread->stats.cnt;
+		stats.skipped += thread->stats.skipped;
 		latency_late += thread->latency_late;
-		if (throttle_lag_max > thread->throttle_lag_max)
-			throttle_lag_max = thread->throttle_lag_max;
 		INSTR_TIME_ADD(conn_total_time, thread->conn_time);
-
-		/* client-level stats */
-		for (j = 0; j < thread->nstate; j++)
-		{
-			total_xacts += thread->state[j].cnt;
-			total_latencies += thread->state[j].txn_latencies;
-			total_sqlats += thread->state[j].txn_sqlats;
-		}
 	}
 	disconnect_all(state, nclients);
 
@@ -3569,10 +3495,7 @@ main(int argc, char **argv)
 	 */
 	INSTR_TIME_SET_CURRENT(total_time);
 	INSTR_TIME_SUBTRACT(total_time, start_time);
-	printResults(total_xacts, nclients, threads, nthreads,
-				 total_time, conn_total_time, total_latencies, total_sqlats,
-				 throttle_lag, throttle_lag_max, throttle_latency_skipped,
-				 latency_late);
+	printResults(threads, &stats, total_time, conn_total_time, latency_late);
 
 	return 0;
 }
@@ -3593,13 +3516,8 @@ threadRun(void *arg)
 	int64		thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
 	int64		last_report = thread_start;
 	int64		next_report = last_report + (int64) progress * 1000000;
-	int64		last_count = 0,
-				last_lats = 0,
-				last_sqlats = 0,
-				last_lags = 0,
-				last_skipped = 0;
-
-	AggVals		aggs;
+	StatsData	last,
+				aggs;
 
 	/*
 	 * Initialize throttling rate target for all of the thread's clients.  It
@@ -3609,8 +3527,6 @@ threadRun(void *arg)
 	 */
 	INSTR_TIME_SET_CURRENT(start);
 	thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
-	thread->throttle_lag = 0;
-	thread->throttle_lag_max = 0;
 
 	INSTR_TIME_SET_ZERO(thread->conn_time);
 
@@ -3647,7 +3563,8 @@ threadRun(void *arg)
 	INSTR_TIME_SET_CURRENT(thread->conn_time);
 	INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
 
-	agg_vals_init(&aggs, thread->start_time);
+	initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
+	last = aggs;
 
 	/* send start up queries in async manner */
 	for (i = 0; i < nstate; i++)
@@ -3661,7 +3578,7 @@ threadRun(void *arg)
 		if (debug)
 			fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
 					sql_script[st->use_file].name);
-		if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
+		if (!doCustom(thread, st, logfile, &aggs))
 			remains--;			/* I've aborted */
 
 		if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
@@ -3800,7 +3717,7 @@ threadRun(void *arg)
 			if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
 							|| commands[st->state]->type == META_COMMAND))
 			{
-				if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
+				if (!doCustom(thread, st, logfile, &aggs))
 					remains--;	/* I've aborted */
 			}
 
@@ -3825,11 +3742,7 @@ threadRun(void *arg)
 			if (now >= next_report)
 			{
 				/* generate and show report */
-				int64		count = 0,
-							lats = 0,
-							sqlats = 0,
-							lags = 0,
-							skipped = 0;
+				StatsData	cur;
 				int64		run = now - last_report;
 				double		tps,
 							total_run,
@@ -3850,25 +3763,24 @@ threadRun(void *arg)
 				 * (If a read from a 64-bit integer is not atomic, you might
 				 * get a "torn" read and completely bogus latencies though!)
 				 */
-				for (i = 0; i < progress_nclients; i++)
+				initStats(&cur, 0.0);
+				for (i = 0; i < nthreads; i++)
 				{
-					count += state[i].cnt;
-					lats += state[i].txn_latencies;
-					sqlats += state[i].txn_sqlats;
-				}
-
-				for (i = 0; i < progress_nthreads; i++)
-				{
-					skipped += thread[i].throttle_latency_skipped;
-					lags += thread[i].throttle_lag;
+					mergeSimpleStats(&cur.latency, &thread[i].stats.latency);
+					mergeSimpleStats(&cur.lag, &thread[i].stats.lag);
+					cur.cnt += thread[i].stats.cnt;
+					cur.skipped += thread[i].stats.skipped;
 				}
 
 				total_run = (now - thread_start) / 1000000.0;
-				tps = 1000000.0 * (count - last_count) / run;
-				latency = 0.001 * (lats - last_lats) / (count - last_count);
-				sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
+				tps = 1000000.0 * (cur.cnt - last.cnt) / run;
+				latency = 0.001 * (cur.latency.sum - last.latency.sum) /
+					(cur.cnt - last.cnt);
+				sqlat = 1.0 * (cur.latency.sum2 - last.latency.sum2)
+					/ (cur.cnt - last.cnt);
 				stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
-				lag = 0.001 * (lags - last_lags) / (count - last_count);
+				lag = 0.001 * (cur.lag.sum - last.lag.sum) /
+					(cur.cnt - last.cnt);
 
 				if (progress_timestamp)
 					sprintf(tbuf, "%.03f s",
@@ -3885,16 +3797,12 @@ threadRun(void *arg)
 					fprintf(stderr, ", lag %.3f ms", lag);
 					if (latency_limit)
 						fprintf(stderr, ", " INT64_FORMAT " skipped",
-								skipped - last_skipped);
+								cur.skipped - last.skipped);
 				}
 				fprintf(stderr, "\n");
 
-				last_count = count;
-				last_lats = lats;
-				last_sqlats = sqlats;
-				last_lags = lags;
+				last = cur;
 				last_report = now;
-				last_skipped = skipped;
 
 				/*
 				 * Ensure that the next report is in the future, in case
