From d615a3e2cdc6949f4fae83a4a7a7328937f7acbf Mon Sep 17 00:00:00 2001
From: Marina Polyakova <m.polyakova@postgrespro.ru>
Date: Tue, 4 Sep 2018 19:02:32 +0300
Subject: [PATCH v11 1/4] Pgbench errors: use the RandomState structure for
 thread/client random seed.

This is most important when it is used to reset a client's random seed during
the repeating of transactions after serialization/deadlock failures.

Use the random state of the client for random functions PGBENCH_RANDOM_* during
the execution of the script. Use the random state of the each thread option (to
choose the script / get the throttle delay / to log with a sample rate) to make
all of them independent of each other and therefore deterministic at the thread
level.
---
 src/bin/pgbench/pgbench.c | 104 +++++++++++++++++++++++++++-----------
 1 file changed, 74 insertions(+), 30 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 41b756c089..988e37bce5 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -250,6 +250,14 @@ typedef struct StatsData
 	SimpleStats lag;
 } StatsData;
 
+/*
+ * Data structure for thread/client random seed.
+ */
+typedef struct
+{
+	unsigned short xseed[3];
+} RandomState;
+
 /*
  * Connection state machine states.
  */
@@ -331,6 +339,12 @@ typedef struct
 	ConnectionStateEnum state;	/* state machine's current state. */
 	ConditionalStack cstack;	/* enclosing conditionals state */
 
+	/*
+	 * Separate randomness for each client. This is used for random functions
+	 * PGBENCH_RANDOM_* during the execution of the script.
+	 */
+	RandomState random_state;
+
 	int			use_file;		/* index in sql_script for this client */
 	int			command;		/* command number in script */
 
@@ -390,7 +404,16 @@ typedef struct
 	pthread_t	thread;			/* thread handle */
 	CState	   *state;			/* array of CState */
 	int			nstate;			/* length of state[] */
-	unsigned short random_state[3]; /* separate randomness for each thread */
+
+	/*
+	 * Separate randomness for each thread. Each thread option uses its own
+	 * random state to make all of them independent of each other and therefore
+	 * deterministic at the thread level.
+	 */
+	RandomState choose_script_rs;	/* random state for selecting a script */
+	RandomState throttling_rs;	/* random state for transaction throttling */
+	RandomState sampling_rs;	/* random state for log sampling */
+
 	int64		throttle_trigger;	/* previous/next throttling (us) */
 	FILE	   *logfile;		/* where to log, or NULL */
 	ZipfCache	zipf_cache;		/* for thread-safe  zipfian random number
@@ -694,7 +717,7 @@ gotdigits:
 
 /* random number generator: uniform distribution from min to max inclusive */
 static int64
-getrand(TState *thread, int64 min, int64 max)
+getrand(RandomState *random_state, int64 min, int64 max)
 {
 	/*
 	 * Odd coding is so that min and max have approximately the same chance of
@@ -705,7 +728,7 @@ getrand(TState *thread, int64 min, int64 max)
 	 * protected by a mutex, and therefore a bottleneck on machines with many
 	 * CPUs.
 	 */
-	return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
+	return min + (int64) ((max - min + 1) * pg_erand48(random_state->xseed));
 }
 
 /*
@@ -714,7 +737,8 @@ getrand(TState *thread, int64 min, int64 max)
  * value is exp(-parameter).
  */
 static int64
-getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
+getExponentialRand(RandomState *random_state, int64 min, int64 max,
+				   double parameter)
 {
 	double		cut,
 				uniform,
@@ -724,7 +748,7 @@ getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
 	Assert(parameter > 0.0);
 	cut = exp(-parameter);
 	/* erand in [0, 1), uniform in (0, 1] */
-	uniform = 1.0 - pg_erand48(thread->random_state);
+	uniform = 1.0 - pg_erand48(random_state->xseed);
 
 	/*
 	 * inner expression in (cut, 1] (if parameter > 0), rand in [0, 1)
@@ -737,7 +761,8 @@ getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
 
 /* random number generator: gaussian distribution from min to max inclusive */
 static int64
-getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
+getGaussianRand(RandomState *random_state, int64 min, int64 max,
+				double parameter)
 {
 	double		stdev;
 	double		rand;
@@ -765,8 +790,8 @@ getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
 		 * are expected in (0, 1] (see
 		 * https://en.wikipedia.org/wiki/Box-Muller_transform)
 		 */
-		double		rand1 = 1.0 - pg_erand48(thread->random_state);
-		double		rand2 = 1.0 - pg_erand48(thread->random_state);
+		double		rand1 = 1.0 - pg_erand48(random_state->xseed);
+		double		rand2 = 1.0 - pg_erand48(random_state->xseed);
 
 		/* Box-Muller basic form transform */
 		double		var_sqrt = sqrt(-2.0 * log(rand1));
@@ -793,7 +818,7 @@ getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
  * will approximate a Poisson distribution centered on the given value.
  */
 static int64
-getPoissonRand(TState *thread, int64 center)
+getPoissonRand(RandomState *random_state, int64 center)
 {
 	/*
 	 * Use inverse transform sampling to generate a value > 0, such that the
@@ -802,7 +827,7 @@ getPoissonRand(TState *thread, int64 center)
 	double		uniform;
 
 	/* erand in [0, 1), uniform in (0, 1] */
-	uniform = 1.0 - pg_erand48(thread->random_state);
+	uniform = 1.0 - pg_erand48(random_state->xseed);
 
 	return (int64) (-log(uniform) * ((double) center) + 0.5);
 }
@@ -880,7 +905,7 @@ zipfFindOrCreateCacheCell(ZipfCache *cache, int64 n, double s)
  * Luc Devroye, p. 550-551, Springer 1986.
  */
 static int64
-computeIterativeZipfian(TState *thread, int64 n, double s)
+computeIterativeZipfian(RandomState *random_state, int64 n, double s)
 {
 	double		b = pow(2.0, s - 1.0);
 	double		x,
@@ -891,8 +916,8 @@ computeIterativeZipfian(TState *thread, int64 n, double s)
 	while (true)
 	{
 		/* random variates */
-		u = pg_erand48(thread->random_state);
-		v = pg_erand48(thread->random_state);
+		u = pg_erand48(random_state->xseed);
+		v = pg_erand48(random_state->xseed);
 
 		x = floor(pow(u, -1.0 / (s - 1.0)));
 
@@ -910,10 +935,11 @@ computeIterativeZipfian(TState *thread, int64 n, double s)
  * Jim Gray et al, SIGMOD 1994
  */
 static int64
-computeHarmonicZipfian(TState *thread, int64 n, double s)
+computeHarmonicZipfian(ZipfCache *zipf_cache, RandomState *random_state,
+					   int64 n, double s)
 {
-	ZipfCell   *cell = zipfFindOrCreateCacheCell(&thread->zipf_cache, n, s);
-	double		uniform = pg_erand48(thread->random_state);
+	ZipfCell   *cell = zipfFindOrCreateCacheCell(zipf_cache, n, s);
+	double		uniform = pg_erand48(random_state->xseed);
 	double		uz = uniform * cell->harmonicn;
 
 	if (uz < 1.0)
@@ -925,7 +951,8 @@ computeHarmonicZipfian(TState *thread, int64 n, double s)
 
 /* random number generator: zipfian distribution from min to max inclusive */
 static int64
-getZipfianRand(TState *thread, int64 min, int64 max, double s)
+getZipfianRand(ZipfCache *zipf_cache, RandomState *random_state, int64 min,
+			   int64 max, double s)
 {
 	int64		n = max - min + 1;
 
@@ -934,8 +961,8 @@ getZipfianRand(TState *thread, int64 min, int64 max, double s)
 
 
 	return min - 1 + ((s > 1)
-					  ? computeIterativeZipfian(thread, n, s)
-					  : computeHarmonicZipfian(thread, n, s));
+					  ? computeIterativeZipfian(random_state, n, s)
+					  : computeHarmonicZipfian(zipf_cache, random_state, n, s));
 }
 
 /*
@@ -2209,7 +2236,7 @@ evalStandardFunc(TState *thread, CState *st,
 				if (func == PGBENCH_RANDOM)
 				{
 					Assert(nargs == 2);
-					setIntValue(retval, getrand(thread, imin, imax));
+					setIntValue(retval, getrand(&st->random_state, imin, imax));
 				}
 				else			/* gaussian & exponential */
 				{
@@ -2231,7 +2258,8 @@ evalStandardFunc(TState *thread, CState *st,
 						}
 
 						setIntValue(retval,
-									getGaussianRand(thread, imin, imax, param));
+									getGaussianRand(&st->random_state, imin,
+													imax, param));
 					}
 					else if (func == PGBENCH_RANDOM_ZIPFIAN)
 					{
@@ -2243,7 +2271,9 @@ evalStandardFunc(TState *thread, CState *st,
 							return false;
 						}
 						setIntValue(retval,
-									getZipfianRand(thread, imin, imax, param));
+									getZipfianRand(&thread->zipf_cache,
+												   &st->random_state, imin,
+												   imax, param));
 					}
 					else		/* exponential */
 					{
@@ -2256,7 +2286,8 @@ evalStandardFunc(TState *thread, CState *st,
 						}
 
 						setIntValue(retval,
-									getExponentialRand(thread, imin, imax, param));
+									getExponentialRand(&st->random_state, imin,
+													   imax, param));
 					}
 				}
 
@@ -2551,7 +2582,7 @@ chooseScript(TState *thread)
 	if (num_scripts == 1)
 		return 0;
 
-	w = getrand(thread, 0, total_weight - 1);
+	w = getrand(&thread->choose_script_rs, 0, total_weight - 1);
 	do
 	{
 		w -= sql_script[i++].weight;
@@ -2745,7 +2776,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 				 * away.
 				 */
 				Assert(throttle_delay > 0);
-				wait = getPoissonRand(thread, throttle_delay);
+				wait = getPoissonRand(&thread->throttling_rs, throttle_delay);
 
 				thread->throttle_trigger += wait;
 				st->txn_scheduled = thread->throttle_trigger;
@@ -2779,7 +2810,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 					{
 						processXactStats(thread, st, &now, true, agg);
 						/* next rendez-vous */
-						wait = getPoissonRand(thread, throttle_delay);
+						wait = getPoissonRand(&thread->throttling_rs,
+											  throttle_delay);
 						thread->throttle_trigger += wait;
 						st->txn_scheduled = thread->throttle_trigger;
 					}
@@ -3322,7 +3354,7 @@ doLog(TState *thread, CState *st,
 	 * to the random sample.
 	 */
 	if (sample_rate != 0.0 &&
-		pg_erand48(thread->random_state) > sample_rate)
+		pg_erand48(thread->sampling_rs.xseed) > sample_rate)
 		return;
 
 	/* should we aggregate the results or not? */
@@ -4750,6 +4782,17 @@ set_random_seed(const char *seed)
 	return true;
 }
 
+/*
+ * Initialize the random state of the client/thread.
+ */
+static void
+initRandomState(RandomState *random_state)
+{
+	random_state->xseed[0] = random();
+	random_state->xseed[1] = random();
+	random_state->xseed[2] = random();
+}
+
 
 int
 main(int argc, char **argv)
@@ -5358,6 +5401,7 @@ main(int argc, char **argv)
 	for (i = 0; i < nclients; i++)
 	{
 		state[i].cstack = conditional_stack_create();
+		initRandomState(&state[i].random_state);
 	}
 
 	if (debug)
@@ -5491,9 +5535,9 @@ main(int argc, char **argv)
 		thread->state = &state[nclients_dealt];
 		thread->nstate =
 			(nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
-		thread->random_state[0] = random();
-		thread->random_state[1] = random();
-		thread->random_state[2] = random();
+		initRandomState(&thread->choose_script_rs);
+		initRandomState(&thread->throttling_rs);
+		initRandomState(&thread->sampling_rs);
 		thread->logfile = NULL; /* filled in later */
 		thread->latency_late = 0;
 		thread->zipf_cache.nb_cells = 0;
-- 
2.17.1

