From 043b92f519630cb1fd7c84c1461e6ab420f5d7eb Mon Sep 17 00:00:00 2001
From: Peter Geoghegan <peter@peter.laptop>
Date: Mon, 26 Sep 2011 03:02:19 +0100
Subject: [PATCH] Added various optimizations to tuplesort quicksort.

We now take advantage of optimizations from knowing which comparator to
use at compile time, as well as from avoiding using the general SQL
callable function machinery for comparators
---
 src/backend/utils/sort/tuplesort.c     |   55 +++++++-
 src/include/utils/template_qsort_arg.h |  217 ++++++++++++++++++++++++++++++++
 2 files changed, 266 insertions(+), 6 deletions(-)
 create mode 100644 src/include/utils/template_qsort_arg.h

diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 3505236..b006c3e 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -112,6 +112,7 @@
 #include "utils/memutils.h"
 #include "utils/pg_rusage.h"
 #include "utils/rel.h"
+#include "utils/template_qsort_arg.h"
 #include "utils/tuplesort.h"
 
 
@@ -1225,7 +1226,27 @@ puttuple_common(Tuplesortstate *state, SortTuple *tuple)
 }
 
 /*
- * All tuples have been provided; finish the sort.
+ * Manufacture type specific sorting specialisations
+ * with inline comparators
+ */
+static inline int
+compare_int4(Datum first, Datum second)
+{
+	int32		first_int = DatumGetInt32(first);
+	int32		second_int = DatumGetInt32(second);
+
+	if (first_int > second_int)
+		return 1;
+	else if (first_int == second_int)
+		return 0;
+	else
+		return -1;
+}
+
+TEMPLATE_QSORT_ARG(int4, compare_int4);
+
+/*
+ * All tuples have been provided; perform the sort.
  */
 void
 tuplesort_performsort(Tuplesortstate *state)
@@ -1247,11 +1268,30 @@ tuplesort_performsort(Tuplesortstate *state)
 			 * amount of memory.  Just qsort 'em and we're done.
 			 */
 			if (state->memtupcount > 1)
-				qsort_arg((void *) state->memtuples,
-						  state->memtupcount,
-						  sizeof(SortTuple),
-						  (qsort_arg_comparator) state->comparetup,
-						  (void *) state);
+			{
+				/*
+				 * Choose between various type-specific qsort variants.
+				 * Do so to avail of per-type compiler optimizations, in
+				 * particular, inlining of comparators.
+				 */
+
+				if (state->scanKeys && state->scanKeys->sk_func.fn_oid == 351 &&
+					state->comparetup == &comparetup_heap)
+					int4_qsort_arg((void *) state->memtuples,
+								  state->memtupcount,
+								  sizeof(SortTuple),
+								  (void *) state);
+				else
+					/*
+					 * Finally, fall back on type-neutral qsort with
+					 * function-pointer comparator
+					 */
+					qsort_arg((void *) state->memtuples,
+								  state->memtupcount,
+								  sizeof(SortTuple),
+								  (qsort_arg_comparator) state->comparetup,
+								  (void *) state);
+			}
 			state->current = 0;
 			state->eof_reached = false;
 			state->markpos_offset = 0;
@@ -2657,6 +2697,9 @@ myFunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
  * and return a 3-way comparison result.  This takes care of handling
  * reverse-sort and NULLs-ordering properly.  We assume that DESC and
  * NULLS_FIRST options are encoded in sk_flags the same way btree does it.
+ *
+ * Note that this logic is more-or-less duplicated in template_qsort_arg.h,
+ * and "instantiated" per-type there.
  */
 static inline int32
 inlineApplySortFunction(FmgrInfo *sortFunction, int sk_flags, Oid collation,
diff --git a/src/include/utils/template_qsort_arg.h b/src/include/utils/template_qsort_arg.h
new file mode 100644
index 0000000..41f8237
--- /dev/null
+++ b/src/include/utils/template_qsort_arg.h
@@ -0,0 +1,217 @@
+/*-------------------------------------------------------------------------
+ *	template_qsort_arg.h: "template" version of qsort_arg.c
+ *
+ *	This version of qsort_arg is exclusively used within tuplesort.c to
+ *	more efficiently sort common types such as integers and floats. In 
+ *	providing this version, we seek to take advantage of compile-time 
+ *	optimisations for specific types, in particular, the inlining of 
+ *	comparators, as well as reducing the overhead of comparisons that
+ *	the general SQL-callable-function API imposes.
+ *
+ *	It is assumed that any types that use this infrastructure have at 
+ *	most one element in their ScanKeys array.
+ *
+ *	CAUTION: if you change this file, see also qsort_arg.c as well as 
+ *	qsort.c. qsort_arg.c should be considered authoratative.
+ *
+ *	Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *	Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *	src/include/utils/template_qsort_arg.h
+ *-------------------------------------------------------------------------
+ */
+
+#include "c.h"
+
+#define swapcode(TYPE, parmi, parmj, n) 	\
+do {						\
+	size_t i = (n) / sizeof (TYPE);		\
+	TYPE *pi = (TYPE *)(void *)(parmi);	\
+	TYPE *pj = (TYPE *)(void *)(parmj);	\
+	do {					\
+		TYPE	t = *pi;		\
+		*pi++ = *pj;			\
+		*pj++ = t;			\
+		} while (--i > 0);		\
+} while (0)
+
+#define SWAPINIT(a, es) swaptype = ((char *)(a) - (char *)0) % sizeof(long) || \
+	(es) % sizeof(long) ? 2 : (es) == sizeof(long)? 0 : 1;
+
+#define thistype_vecswap(TYPE, a, b, n) if ((n) > 0) TYPE##_swapfunc((a), (b), (size_t)(n), swaptype) 	
+
+#define thistype_swap(TYPE, a, b)				\
+if (swaptype == 0) {						\
+		long t = *(long *)(void *)(a);			\
+		*(long *)(void *)(a) = *(long *)(void *)(b);	\
+		*(long *)(void *)(b) = t;			\
+	} else							\
+		TYPE##_swapfunc(a, b, es, swaptype)	
+
+/* 
+ * This macro manufactures a type-specific implementation of qsort_arg with 
+ * the comparator, COMPAR, known at compile time. COMPAR is typically an
+ * inline function.
+ *
+ * COMPAR should take as its arguments two Datums, and return an int, in
+ * line with standard qsort convention.
+ * 
+ * We have void* parameters for TYPE##AppSort just to shut up the compiler. 
+ * They could be SortTuple pointers instead, but that would make it more 
+ * difficult to keep template_qsort_arg.h consistent with qsort_arg.c. 		
+ *
+ */
+
+#define TEMPLATE_QSORT_ARG(TYPE, COMPAR)				\
+void TYPE##_qsort_arg(void *a, size_t n, size_t es, void *arg);		\
+												\
+static inline int32										\
+TYPE##AppSort(const void *a, const void *b, Tuplesortstate *state)				\
+{												\
+	int32		compare;								\
+	const SortTuple* aT = a;								\
+	const SortTuple* bT = b;								\
+	/* Allow interrupting long sorts */							\
+	CHECK_FOR_INTERRUPTS();									\
+												\
+	Assert(state->nKeys == 1);								\
+	Assert(state->scanKeys);								\
+												\
+	if ( aT->isnull1)									\
+	{											\
+		if (bT->isnull1)								\
+			compare = 0;		/* NULL "=" NULL */				\
+		else if (state->scanKeys->sk_flags & SK_BT_NULLS_FIRST)				\
+			compare = -1;		/* NULL "<" NOT_NULL */				\
+		else										\
+			compare = 1;		/* NULL ">" NOT_NULL */				\
+	}											\
+	else if (bT->isnull1)									\
+	{											\
+		if (state->scanKeys->sk_flags & SK_BT_NULLS_FIRST)				\
+			compare = 1;		/* NOT_NULL ">" NULL */				\
+		else										\
+			compare = -1;		/* NOT_NULL "<" NULL */				\
+	}											\
+	else											\
+	{											\
+		compare = COMPAR(aT->datum1, bT->datum1);					\
+												\
+		if (state->scanKeys->sk_flags & SK_BT_DESC)					\
+			compare = -compare;							\
+	}											\
+												\
+	return compare;										\
+}												\
+									\
+static inline void							\
+TYPE##_swapfunc(char *a, char *b, size_t n, int swaptype)		\
+{									\
+	if (swaptype <= 1)						\
+		swapcode(long, a, b, n); 				\
+	else								\
+		swapcode(char, a, b, n);				\
+}									\
+									\
+static inline char *										\
+TYPE##_med3(char *a, char *b, char *c, void *arg)						\
+{												\
+	return TYPE##AppSort(a, b, arg) < 0 ?							\
+		(TYPE##AppSort(b, c, arg) < 0 ? b : (TYPE##AppSort(a, c, arg) < 0 ? c : a))	\
+		: (TYPE##AppSort(b, c, arg) > 0 ? b : (TYPE##AppSort(a, c, arg) < 0 ? a : c));	\
+}												\
+												\
+void 												\
+TYPE##_qsort_arg(void *a, size_t n, size_t es, void *arg)					\
+{												\
+	char	   *pa,										\
+			   *pb,									\
+			   *pc,									\
+			   *pd,									\
+			   *pl,									\
+			   *pm,									\
+			   *pn;									\
+	int			d,								\
+				r,								\
+				swaptype,							\
+				presorted;							\
+												\
+loop:SWAPINIT(a, es); 										\
+	if (n < 7)										\
+	{											\
+		for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)			\
+			for (pl = pm; pl > (char *) a && TYPE##AppSort(pl - es, pl, arg) > 0;	\
+				 pl -= es)							\
+				thistype_swap(TYPE, pl, pl - es);				\
+		return;										\
+	}											\
+	presorted = 1;										\
+	for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)				\
+	{											\
+		if (TYPE##AppSort(pm - es, pm, arg) > 0)					\
+		{										\
+			presorted = 0;								\
+			break;									\
+		}										\
+	}											\
+	if (presorted)										\
+		return;										\
+	pm = (char *) a + (n / 2) * es;								\
+	if (n > 7)										\
+	{											\
+		pl = (char *) a;								\
+		pn = (char *) a + (n - 1) * es;							\
+		if (n > 40)									\
+		{										\
+			d = (n / 8) * es;							\
+			pl = TYPE##_med3(pl, pl + d, pl + 2 * d, arg);				\
+			pm = TYPE##_med3(pm - d, pm, pm + d, arg);				\
+			pn = TYPE##_med3(pn - 2 * d, pn - d, pn, arg);				\
+		}										\
+		pm = TYPE##_med3(pl, pm, pn, arg);						\
+	}											\
+	thistype_swap(TYPE, a, pm);								\
+	pa = pb = (char *) a + es;								\
+	pc = pd = (char *) a + (n - 1) * es;							\
+	for (;;)										\
+	{											\
+		while (pb <= pc && (r = TYPE##AppSort(pb, a, arg)) <= 0)			\
+		{										\
+			if (r == 0)								\
+			{									\
+				thistype_swap(TYPE, pa, pb);					\
+				pa += es;							\
+			}									\
+			pb += es;								\
+		}										\
+		while (pb <= pc && (r = TYPE##AppSort(pc, a, arg)) >= 0)			\
+		{										\
+			if (r == 0)								\
+			{									\
+				thistype_swap(TYPE, pc, pd);					\
+				pd -= es;							\
+			}									\
+			pc -= es;								\
+		}										\
+		if (pb > pc)									\
+			break;									\
+		thistype_swap(TYPE, pb, pc);							\
+		pb += es;									\
+		pc -= es;									\
+	}											\
+	pn = (char *) a + n * es;								\
+	r = Min(pa - (char *) a, pb - pa);							\
+	thistype_vecswap(TYPE, a, pb - r, r);							\
+	r = Min(pd - pc, pn - pd - es);								\
+	thistype_vecswap(TYPE, pb, pn - r, r);							\
+	if ((r = pb - pa) > es)									\
+		TYPE##_qsort_arg(a, r / es, es, arg);						\
+	if ((r = pd - pc) > es)									\
+	{											\
+		/* Iterate rather than recurse to save stack space */				\
+		a = pn - r;									\
+		n = r / es;									\
+		goto loop;									\
+	}											\
+}												\
+
-- 
1.7.6
