From 91f9f5351e8842940714dcce6fdcb12a995fc7c1 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Wed, 7 Jul 2021 16:39:30 +1200
Subject: [PATCH v15] Fix bugs in pgbench log output.

Commit 547f04e switched from using instr_time to pg_time_usec_t, but
introduced a couple of bugs:

1.  The log previously included the current Unix epoch time, but it was
inadvertantly changed to use a system-dependent epoch.  Restore the Unix
epoch-based reporting, and make sure to begin aggregates on wall clock
second boundaries as before.

2.  The aggregation interval needed to be scaled appropriately for the
new accounting unit to avoid logging bogus empty aggregates.  Also force
a final partial aggregate to be logged.

Also add a simple new TAP tests to verify the logged output format.

Back-patch to 14.

Author: Fabien COELHO <coelho@cri.ensmp.fr>
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Reviewed-by: Yugo NAGATA <nagata@sraoss.co.jp>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
Reported-by: YoungHwan Joo <rulyox@gmail.com>
Reported-by: Gregory Smith <gregsmithpgsql@gmail.com>
Discussion: https://postgr.es/m/CAF7igB1r6wRfSCEAB-iZBKxkowWY6%2BdFF2jObSdd9%2BiVK%2BvHJg%40mail.gmail.com
Discussion: https://postgr.es/m/CAHLJuCW_8Vpcr0%3Dt6O_gozrg3wXXWXZXDioYJd3NhvKriqgpfQ%40mail.gmail.com
---
 src/bin/pgbench/pgbench.c                    | 103 +++++++++++++------
 src/bin/pgbench/t/001_pgbench_with_server.pl |  40 ++++++-
 2 files changed, 112 insertions(+), 31 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 4aeccd93af..3846ef8f40 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -343,6 +343,12 @@ typedef struct StatsData
 	SimpleStats lag;
 } StatsData;
 
+/*
+ * For displaying Unix epoch timestamps, as some time functions may have
+ * another reference.
+ */
+pg_time_usec_t epoch_shift;
+
 /*
  * Struct to keep random state.
  */
@@ -649,7 +655,7 @@ static void setDoubleValue(PgBenchValue *pv, double dval);
 static bool evaluateExpr(CState *st, PgBenchExpr *expr,
 						 PgBenchValue *retval);
 static ConnectionStateEnum executeMetaCommand(CState *st, pg_time_usec_t *now);
-static void doLog(TState *thread, CState *st,
+static void doLog(TState *thread, CState *st, bool final,
 				  StatsData *agg, bool skipped, double latency, double lag);
 static void processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
 							 bool skipped, StatsData *agg);
@@ -3768,16 +3774,46 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
 	return CSTATE_END_COMMAND;
 }
 
+/* print aggregated report to logfile */
+static void
+logAgg(FILE *logfile, StatsData *agg)
+{
+	fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
+			(agg->start_time + epoch_shift) / 1000000,
+			agg->cnt,
+			agg->latency.sum,
+			agg->latency.sum2,
+			agg->latency.min,
+			agg->latency.max);
+	if (throttle_delay)
+	{
+		fprintf(logfile, " %.0f %.0f %.0f %.0f",
+				agg->lag.sum,
+				agg->lag.sum2,
+				agg->lag.min,
+				agg->lag.max);
+		if (latency_limit)
+			fprintf(logfile, " " INT64_FORMAT, agg->skipped);
+	}
+	fputc('\n', logfile);
+}
+
 /*
  * Print log entry after completing one transaction.
  *
+ * The function behavior changes depending on sample_rate (a fraction of
+ * transaction is reported) and agg_interval (transactions are aggregated
+ * and reported once every agg_interval seconds).  For aggregation, "final"
+ * indicates that no transaction has occurred, but the final aggregate
+ * should be logged.
+ *
  * We print Unix-epoch timestamps in the log, so that entries can be
  * correlated against other logs.  On some platforms this could be obtained
  * from the caller, but rather than get entangled with that, we just eat
  * the cost of an extra syscall in all cases.
  */
 static void
-doLog(TState *thread, CState *st,
+doLog(TState *thread, CState *st, bool final,
 	  StatsData *agg, bool skipped, double latency, double lag)
 {
 	FILE	   *logfile = thread->logfile;
@@ -3796,43 +3832,36 @@ doLog(TState *thread, CState *st,
 	/* should we aggregate the results or not? */
 	if (agg_interval > 0)
 	{
+		pg_time_usec_t	next;
+
 		/*
 		 * Loop until we reach the interval of the current moment, and print
 		 * any empty intervals in between (this may happen with very low tps,
 		 * e.g. --rate=0.1).
 		 */
-
-		while (agg->start_time + agg_interval <= now)
+		while ((next = agg->start_time + agg_interval * INT64CONST(1000000)) <= now)
 		{
-			/* print aggregated report to logfile */
-			fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
-					agg->start_time,
-					agg->cnt,
-					agg->latency.sum,
-					agg->latency.sum2,
-					agg->latency.min,
-					agg->latency.max);
-			if (throttle_delay)
-			{
-				fprintf(logfile, " %.0f %.0f %.0f %.0f",
-						agg->lag.sum,
-						agg->lag.sum2,
-						agg->lag.min,
-						agg->lag.max);
-				if (latency_limit)
-					fprintf(logfile, " " INT64_FORMAT, agg->skipped);
-			}
-			fputc('\n', logfile);
+			logAgg(logfile, agg);
 
 			/* reset data and move to next interval */
-			initStats(agg, agg->start_time + agg_interval);
+			initStats(agg, next);
 		}
 
-		/* accumulate the current transaction */
-		accumStats(agg, skipped, latency, lag);
+		if (!final)
+			/* accumulate the current transaction */
+			accumStats(agg, skipped, latency, lag);
+		else if (agg->cnt > 0)
+			/* final call to show the last aggregate */
+			logAgg(logfile, agg);
 	}
 	else
 	{
+		/* switch to Unix epoch */
+		now += epoch_shift;
+
+		/* final only used for aggregated data */
+		Assert(!final);
+
 		/* no, print raw transactions */
 		if (skipped)
 			fprintf(logfile, "%d " INT64_FORMAT " skipped %d " INT64_FORMAT " "
@@ -3892,7 +3921,7 @@ processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
 	st->cnt++;
 
 	if (use_log)
-		doLog(thread, st, agg, skipped, latency, lag);
+		doLog(thread, st, false, agg, skipped, latency, lag);
 
 	/* XXX could use a mutex here, but we choose not to */
 	if (per_script_stats)
@@ -5458,7 +5487,8 @@ printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now,
 
 	if (progress_timestamp)
 	{
-		snprintf(tbuf, sizeof(tbuf), "%.3f s", PG_TIME_GET_DOUBLE(now));
+		snprintf(tbuf, sizeof(tbuf), "%.3f s",
+				 PG_TIME_GET_DOUBLE(now + epoch_shift));
 	}
 	else
 	{
@@ -5808,6 +5838,11 @@ main(int argc, char **argv)
 	char	   *env;
 
 	int			exit_code = 0;
+	struct timeval tv;
+
+	/* record shift between Unix epoch and now() */
+	gettimeofday(&tv, NULL);
+	epoch_shift = tv.tv_sec * INT64CONST(1000000) + tv.tv_usec - pg_time_now();
 
 	pg_logging_init(argv[0]);
 	progname = get_progname(argv[0]);
@@ -6637,7 +6672,14 @@ threadRun(void *arg)
 	thread->bench_start = start;
 	thread->throttle_trigger = start;
 
-	initStats(&aggs, start);
+	/*
+	 * The log format currently has Unix epoch timestamps with whole numbers of
+	 * seconds.  Round the first aggregate's start time down to the nearest
+	 * Unix epoch second (the very first aggregate might really have started a
+	 * fraction of a second later, but later aggregates are measured from the
+	 * whole number time that is actually logged).
+	 */
+	initStats(&aggs, start - (start + epoch_shift) % 1000000);
 	last = aggs;
 
 	/* loop till all clients have terminated */
@@ -6830,8 +6872,9 @@ done:
 		if (agg_interval > 0)
 		{
 			/* log aggregated but not yet reported transactions */
-			doLog(thread, state, &aggs, false, 0, 0);
+			doLog(thread, state, true, &aggs, false, 0.0, 0.0);
 		}
+
 		fclose(thread->logfile);
 		thread->logfile = NULL;
 	}
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index 3aa9d5d753..70bd097b80 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -1187,7 +1187,7 @@ sub check_pgbench_logs
 
 	# $prefix is simple enough, thus does not need escaping
 	my @logs = list_files($dir, qr{^$prefix\..*$});
-	ok(@logs == $nb, "number of log files");
+	ok(@logs == $nb, "number of log files (@logs)");
 	ok(grep(/\/$prefix\.\d+(\.\d+)?$/, @logs) == $nb, "file name format");
 
 	my $log_number = 0;
@@ -1220,6 +1220,44 @@ sub check_pgbench_logs
 my $bdir = $node->basedir;
 
 # Run with sampling rate, 2 clients with 50 transactions each.
+#
+# Test time-sensitive features on a light read-only transaction:
+#
+#   -T: benchmark duration, 2 seconds to exercise progress & logs
+#   -P: progress report
+#   --aggregate-interval: periodic aggregated logs
+#   --rate: schedule load
+#   --latency-limit: max delay, not exercised much
+#
+# note: the --rate behavior is probabilistic in nature.
+# note: --progress-timestamp is not tested.
+pgbench(
+	'-T 2 -P 1 -l --aggregate-interval=1 -S -b se@2'
+	. ' --rate=20 --latency-limit=1000 -j ' . $nthreads
+	. ' -c 3 -r',
+	0,
+	[   qr{type: multiple},
+		qr{clients: 3},
+		qr{threads: $nthreads},
+		qr{duration: 2 s},
+		qr{script 1: .* select only},
+		qr{script 2: .* select only},
+		qr{statement latencies in milliseconds},
+		qr{FROM pgbench_accounts} ],
+	[ qr{vacuum}, qr{progress: 1\b} ],
+	'pgbench progress', undef,
+	"--log-prefix=$bdir/001_pgbench_log_1");
+
+# $nthreads threads, 2 seconds, but due to timing imprecision we might get
+# only 1 or as many as 3 progress reports per thread.
+# aggregate log format is:
+#   epoch #tx sum sum2 min max [sum sum2 min max [skipped]]
+# first series about latency; second about lag (--rate);
+# skipped only if --latency-limit is set.
+check_pgbench_logs($bdir, '001_pgbench_log_1', $nthreads, 1, 3,
+	qr{^\d{10,} \d{1,2} \d+ \d+ \d+ \d+ \d+ \d+ \d+ \d+ \d+$});
+
+# with sampling rate, 2 clients with 50 tx each
 pgbench(
 	"-n -S -t 50 -c 2 --log --sampling-rate=0.5", 0,
 	[ qr{select only}, qr{processed: 100/100} ], [qr{^$}],
-- 
2.30.2

