From f8ed235dbb60a79b4f59dc8a4af014b2ca698772 Mon Sep 17 00:00:00 2001
From: Sokolov Yura aka funny_falcon <funny.falcon@gmail.com>
Date: Sun, 14 May 2017 20:57:00 +0300
Subject: [PATCH] storage/page/bufpage.c: improve compactify_tuples

Items passed to compactify_tuples are almost sorted. So call to general
qsort makes unnecessary overhead on function calls (swapfunc, med,
itemoffcompare).

This patch add one pass of prefix sort + insertion sort for large array
of items, and shell sort for smaller arrays.

Insertion and Shell sort are implemented using macroses.

This approach allows to save 3% of cpu in degenerate case
(highly intensive HOT random updates on unlogged table with
 synchronized_commit=off).
---
 src/backend/storage/page/bufpage.c | 42 +++++++++++++++++++++++++--
 src/include/c.h                    | 59 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 98 insertions(+), 3 deletions(-)

diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index fdf045a..b7c6392 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -433,6 +433,36 @@ itemoffcompare(const void *itemidp1, const void *itemidp2)
 		((itemIdSort) itemidp1)->itemoff;
 }
 
+#define NSPLIT 256
+#define PREFDIV (BLCKSZ / NSPLIT)
+/* one pass of prefix sort for high 8 bits of itemoff.*/
+static void
+prefix_presort(itemIdSort itemidbase, int nitems)
+{
+	itemIdSortData copy[MaxIndexTuplesPerPage];
+	int	count[NSPLIT+1] = { 0 };
+	int i, pos, highbits;
+
+	Assert(nitems <= MaxIndexTuplesPerPage);
+	for (i = 0; i < nitems; i++)
+	{
+		highbits = itemidbase[i].itemoff / PREFDIV;
+		count[highbits]++;
+	}
+	/* sort in decreasing order */
+	for (i = NSPLIT-1; i != 0; i--)
+		count[i-1] += count[i];
+	Assert(count[0] == nitems);
+	for (i = 0; i < nitems; i++)
+	{
+		highbits = itemidbase[i].itemoff / PREFDIV;
+		pos = count[highbits+1]++;
+		copy[pos] = itemidbase[i];
+	}
+	Assert(count[1] == nitems);
+	memcpy(itemidbase, copy, sizeof(itemIdSortData) * nitems);
+}
+
 /*
  * After removing or marking some line pointers unused, move the tuples to
  * remove the gaps caused by the removed items.
@@ -444,9 +474,15 @@ compactify_tuples(itemIdSort itemidbase, int nitems, Page page)
 	Offset		upper;
 	int			i;
 
-	/* sort itemIdSortData array into decreasing itemoff order */
-	qsort((char *) itemidbase, nitems, sizeof(itemIdSortData),
-		  itemoffcompare);
+	if (nitems > 48)
+	{
+		prefix_presort(itemidbase, nitems);
+		pg_insertion_sort(itemIdSortData, itemidbase, nitems, itemoffcompare);
+	}
+	else
+	{
+		pg_shell_sort(itemIdSortData, itemidbase, nitems, itemoffcompare);
+	}
 
 	upper = phdr->pd_special;
 	for (i = 0; i < nitems; i++)
diff --git a/src/include/c.h b/src/include/c.h
index fba07c6..837940d 100644
--- a/src/include/c.h
+++ b/src/include/c.h
@@ -962,6 +962,65 @@ typedef NameData *Name;
 #define unlikely(x) ((x) != 0)
 #endif
 
+/*
+ * pg_shell_sort - sort for small arrays with inlinable comparator.
+ * Since it is implemented as a macros it could be optimized together with
+ * comparison function.
+ * Gaps are "gap(i) = smallest prime number below e^i". They are close to
+ * Incerpi & Sedwick gaps, but looks to be better in average.
+ * It is best used with array smaller than 200 elements, and could be safely
+ * used upto 1000 elements. But it degrades fast after that, and it is
+ * better to use qsort.
+ *
+ * Arguments:
+ *	 elem_t - type of array element (for declaring temporary variables)
+ *	 array	- pointer to elements to be sorted
+ *	 nitems - amount of elements to be sorted
+ *	 cmp	- comparison function that accepts address of elements
+ *			  (it is compatible with qsort comparison function).
+ *	 Note: `cmp` argument evaluates many times.
+ *	 Other arguments evaluates once
+ */
+#define pg_shell_sort_pass(elem_t, cmp, off) \
+	do { \
+		int _i, _j; \
+		elem_t _temp; \
+		for (_i = off; _i < _n; _i += off) \
+		{ \
+			if ((cmp)(_arr+_i, _arr+_i-off) >= 0) \
+				continue; \
+			_temp = _arr[_i]; \
+			_j = _i; \
+			do { \
+				_arr[_j] = _arr[_j - off]; \
+				_j -= off; \
+			} while (_j >= off && (cmp)(_arr+_j-off, &_temp) > 0); \
+			_arr[_j] = _temp; \
+		} \
+	} while (0)
+#define pg_shell_sort(elem_t, array, nitems, cmp) \
+	do { \
+		int _n = (nitems); \
+		elem_t* _arr = (array); \
+		static const int _offsets[] = {401, 139, 53, 19, 7, 3}; \
+		int _noff, _off; \
+		for (_noff = 0; _noff < lengthof(_offsets); _noff++) { \
+			_off = _offsets[_noff]; \
+			pg_shell_sort_pass(elem_t, (cmp), _off); \
+		} \
+		pg_shell_sort_pass(elem_t, (cmp), 1); \
+	} while(0)
+/*
+ * pg_insertion_sort - just insertion sort for smallest array, or if
+ * array was almost sorted already. Uses same arguments as pg_shell_sort.
+ */
+#define pg_insertion_sort(elem_t, array, nitems, cmp) \
+	do { \
+		int _n = (nitems); \
+		elem_t* _arr = (array); \
+		pg_shell_sort_pass(elem_t, (cmp), 1); \
+	} while (0)
+
 
 /* ----------------------------------------------------------------
  *				Section 8:	random stuff
-- 
2.9.3

