From 4fafc870690b7de5aeab7e583780546e8170f6d0 Mon Sep 17 00:00:00 2001
From: Sokolov Yura <funny.falcon@postgrespro.ru>
Date: Mon, 15 May 2017 14:23:39 +0300
Subject: [PATCH 1/2] Improve compactify_tuples

Items passed to compactify_tuples are almost sorted. So call to general
qsort makes unnecessary overhead on function calls (swapfunc, med,
itemoffcompare).

This patch implements bucket sort:
- one pass of stable prefix sort on high 8 bits of offset
- and insertion sort for buckets larger than 1 element

Also for smaller arrays shell sort is used.

Insertion and Shell sort are implemented using macroses.

This approach allows to save 3% of cpu in degenerate case
(highly intensive HOT random updates on unlogged table with
 synchronized_commit=off), and speeds up WAL replaying (as were
found by Heikki Linnakangas).

Same approach were implemented by Heikki Linnakangas some time ago with
several distinctions.
---
 src/backend/storage/page/bufpage.c | 86 ++++++++++++++++++++++++++++++++++++--
 src/include/c.h                    | 59 ++++++++++++++++++++++++++
 2 files changed, 142 insertions(+), 3 deletions(-)

diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index fdf045a45b..c1bb11c354 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -434,6 +434,85 @@ itemoffcompare(const void *itemidp1, const void *itemidp2)
 }
 
 /*
+ * Sort an array of itemIdSort's on itemoff, descending.
+ *
+ * It uses Shell sort. Given array is small and itemoffcompare is inlined,
+ * it is much faster than call to qsort.
+ */
+static void
+sort_itemIds_small(itemIdSort itemidbase, int nitems)
+{
+	pg_shell_sort(itemIdSortData, itemidbase, nitems, itemoffcompare);
+}
+
+/*
+ * Sort an array of itemIdSort's on itemoff, descending.
+ *
+ * It uses bucket sort:
+ * - single pass of stable prefix sort on high 8 bits
+ * - and insertion sort on buckets larger than 1 element
+ */
+static void
+sort_itemIds(itemIdSort itemidbase, int nitems)
+{
+	itemIdSortData copy[Max(MaxIndexTuplesPerPage, MaxHeapTuplesPerPage)];
+#define NSPLIT 256
+#define PREFDIV (BLCKSZ / NSPLIT)
+	/* two extra elements to emulate offset on previous step */
+	uint16		count[NSPLIT + 2] = {0};
+	int			i,
+				max,
+				total,
+				pos,
+				highbits;
+
+	Assert(nitems <= MaxIndexTuplesPerPage);
+	for (i = 0; i < nitems; i++)
+	{
+		highbits = itemidbase[i].itemoff / PREFDIV;
+		count[highbits]++;
+	}
+	/* sort in decreasing order */
+	max = total = count[NSPLIT - 1];
+	for (i = NSPLIT - 2; i >= 0; i--)
+	{
+		max |= count[i];
+		total += count[i];
+		count[i] = total;
+	}
+
+	/*
+	 * count[k+1] is start of bucket k, count[k] is end of bucket k, and
+	 * count[k] - count[k+1] is length of bucket k.
+	 */
+	Assert(count[0] == nitems);
+	for (i = 0; i < nitems; i++)
+	{
+		highbits = itemidbase[i].itemoff / PREFDIV;
+		pos = count[highbits + 1];
+		count[highbits + 1]++;
+		copy[pos] = itemidbase[i];
+	}
+	Assert(count[1] == nitems);
+
+	if (max == 1)
+		goto end;
+	/*
+	 * count[k+2] is start of bucket k, count[k+1] is end of bucket k, and
+	 * count[k+1]-count[k+2] is length of bucket k.
+	 */
+	for (i = NSPLIT; i > 0; i--)
+	{
+		pg_insertion_sort(itemIdSortData,
+						  copy + count[i + 1],
+						  count[i] - count[i + 1],
+						  itemoffcompare);
+	}
+end:
+	memcpy(itemidbase, copy, sizeof(itemIdSortData) * nitems);
+}
+
+/*
  * After removing or marking some line pointers unused, move the tuples to
  * remove the gaps caused by the removed items.
  */
@@ -444,9 +523,10 @@ compactify_tuples(itemIdSort itemidbase, int nitems, Page page)
 	Offset		upper;
 	int			i;
 
-	/* sort itemIdSortData array into decreasing itemoff order */
-	qsort((char *) itemidbase, nitems, sizeof(itemIdSortData),
-		  itemoffcompare);
+	if (nitems > 48)
+		sort_itemIds(itemidbase, nitems);
+	else
+		sort_itemIds_small(itemidbase, nitems);
 
 	upper = phdr->pd_special;
 	for (i = 0; i < nitems; i++)
diff --git a/src/include/c.h b/src/include/c.h
index fba07c651f..837940d5cf 100644
--- a/src/include/c.h
+++ b/src/include/c.h
@@ -962,6 +962,65 @@ typedef NameData *Name;
 #define unlikely(x) ((x) != 0)
 #endif
 
+/*
+ * pg_shell_sort - sort for small arrays with inlinable comparator.
+ * Since it is implemented as a macros it could be optimized together with
+ * comparison function.
+ * Gaps are "gap(i) = smallest prime number below e^i". They are close to
+ * Incerpi & Sedwick gaps, but looks to be better in average.
+ * It is best used with array smaller than 200 elements, and could be safely
+ * used upto 1000 elements. But it degrades fast after that, and it is
+ * better to use qsort.
+ *
+ * Arguments:
+ *	 elem_t - type of array element (for declaring temporary variables)
+ *	 array	- pointer to elements to be sorted
+ *	 nitems - amount of elements to be sorted
+ *	 cmp	- comparison function that accepts address of elements
+ *			  (it is compatible with qsort comparison function).
+ *	 Note: `cmp` argument evaluates many times.
+ *	 Other arguments evaluates once
+ */
+#define pg_shell_sort_pass(elem_t, cmp, off) \
+	do { \
+		int _i, _j; \
+		elem_t _temp; \
+		for (_i = off; _i < _n; _i += off) \
+		{ \
+			if ((cmp)(_arr+_i, _arr+_i-off) >= 0) \
+				continue; \
+			_temp = _arr[_i]; \
+			_j = _i; \
+			do { \
+				_arr[_j] = _arr[_j - off]; \
+				_j -= off; \
+			} while (_j >= off && (cmp)(_arr+_j-off, &_temp) > 0); \
+			_arr[_j] = _temp; \
+		} \
+	} while (0)
+#define pg_shell_sort(elem_t, array, nitems, cmp) \
+	do { \
+		int _n = (nitems); \
+		elem_t* _arr = (array); \
+		static const int _offsets[] = {401, 139, 53, 19, 7, 3}; \
+		int _noff, _off; \
+		for (_noff = 0; _noff < lengthof(_offsets); _noff++) { \
+			_off = _offsets[_noff]; \
+			pg_shell_sort_pass(elem_t, (cmp), _off); \
+		} \
+		pg_shell_sort_pass(elem_t, (cmp), 1); \
+	} while(0)
+/*
+ * pg_insertion_sort - just insertion sort for smallest array, or if
+ * array was almost sorted already. Uses same arguments as pg_shell_sort.
+ */
+#define pg_insertion_sort(elem_t, array, nitems, cmp) \
+	do { \
+		int _n = (nitems); \
+		elem_t* _arr = (array); \
+		pg_shell_sort_pass(elem_t, (cmp), 1); \
+	} while (0)
+
 
 /* ----------------------------------------------------------------
  *				Section 8:	random stuff
-- 
2.11.0

