From 6ad23129ae0318d09b95bb01c181c89a46a052c2 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Wed, 2 Mar 2022 18:02:03 -0800
Subject: [PATCH v65 01/11] dshash: Add sequential scan support.

Dshash did not allow scanning all of its entries sequentially. This adds
that functionality. The interface is similar to, but a bit different
from, both that of dynahash and the simple dshash search functions. One
of the most significant differences is that the sequential scan
interface of dshash always needs a call to dshash_seq_term when a scan
ends. Another is locking. Dshash holds a partition lock when returning
an entry; dshash_seq_next() also holds the lock when returning an entry,
but callers shouldn't release it, since the lock is essential to
continue the scan. The seqscan interface allows entries to be deleted
during a scan. In-scan deletion should be performed by
dshash_delete_current().

Author: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
---
 src/include/lib/dshash.h         |  22 +++++
 src/backend/lib/dshash.c         | 162 ++++++++++++++++++++++++++++++-
 src/tools/pgindent/typedefs.list |   1 +
 3 files changed, 184 insertions(+), 1 deletion(-)

diff --git a/src/include/lib/dshash.h b/src/include/lib/dshash.h
index f3c57e76bfe..8ff4dbb59a0 100644
--- a/src/include/lib/dshash.h
+++ b/src/include/lib/dshash.h
@@ -59,6 +59,21 @@ typedef struct dshash_parameters
 struct dshash_table_item;
 typedef struct dshash_table_item dshash_table_item;
 
+/*
+ * Sequential scan state. The definition is exposed so that callers know the
+ * storage size, but callers should treat it as an opaque type.
+ */
+typedef struct dshash_seq_status
+{
+	dshash_table	   *hash_table;	/* dshash table being scanned */
+	int					curbucket;	/* index of the bucket we are at */
+	int					nbuckets;	/* total number of buckets, set by the first dshash_seq_next() */
+	dshash_table_item  *curitem;	/* item most recently returned, NULL before the first one */
+	dsa_pointer			pnextitem;	/* dsa_pointer to the item following curitem */
+	int					curpartition;	/* partition whose lock we hold, -1 if none yet */
+	bool				exclusive;	/* true: take LW_EXCLUSIVE locks; false: LW_SHARED */
+} dshash_seq_status;
+
 /* Creating, sharing and destroying from hash tables. */
 extern dshash_table *dshash_create(dsa_area *area,
 								   const dshash_parameters *params,
@@ -80,6 +95,13 @@ extern bool dshash_delete_key(dshash_table *hash_table, const void *key);
 extern void dshash_delete_entry(dshash_table *hash_table, void *entry);
 extern void dshash_release_lock(dshash_table *hash_table, void *entry);
 
+/* Sequential scan support; every scan must end with dshash_seq_term(). */
+extern void dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table,
+							bool exclusive);
+extern void *dshash_seq_next(dshash_seq_status *status);
+extern void dshash_seq_term(dshash_seq_status *status);
+extern void dshash_delete_current(dshash_seq_status *status);
+extern void *dshash_get_current(dshash_seq_status *status);
 /* Convenience hash and compare functions wrapping memcmp and tag_hash. */
 extern int	dshash_memcmp(const void *a, const void *b, size_t size, void *arg);
 extern dshash_hash dshash_memhash(const void *v, size_t size, void *arg);
diff --git a/src/backend/lib/dshash.c b/src/backend/lib/dshash.c
index decedb2605b..6a7201dd5a9 100644
--- a/src/backend/lib/dshash.c
+++ b/src/backend/lib/dshash.c
@@ -127,6 +127,10 @@ struct dshash_table
 #define NUM_SPLITS(size_log2)					\
 	(size_log2 - DSHASH_NUM_PARTITIONS_LOG2)
 
+/* How many buckets are there in total at a given size? */
+#define NUM_BUCKETS(size_log2)		\
+	(((size_t) 1) << (size_log2))
+
 /* How many buckets are there in each partition at a given size? */
 #define BUCKETS_PER_PARTITION(size_log2)		\
 	(((size_t) 1) << NUM_SPLITS(size_log2))
@@ -153,6 +157,10 @@ struct dshash_table
 #define BUCKET_INDEX_FOR_PARTITION(partition, size_log2)	\
 	((partition) << NUM_SPLITS(size_log2))
 
+/* Which partition does a given bucket index belong to at a given size? */
+#define PARTITION_FOR_BUCKET_INDEX(bucket_idx, size_log2)				\
+	((bucket_idx) >> NUM_SPLITS(size_log2))
+
 /* The head of the active bucket for a given hash value (lvalue). */
 #define BUCKET_FOR_HASH(hash_table, hash)								\
 	(hash_table->buckets[												\
@@ -324,7 +332,7 @@ dshash_destroy(dshash_table *hash_table)
 	ensure_valid_bucket_pointers(hash_table);
 
 	/* Free all the entries. */
-	size = ((size_t) 1) << hash_table->size_log2;
+	size = NUM_BUCKETS(hash_table->size_log2);
 	for (i = 0; i < size; ++i)
 	{
 		dsa_pointer item_pointer = hash_table->buckets[i];
@@ -592,6 +600,158 @@ dshash_memhash(const void *v, size_t size, void *arg)
 	return tag_hash(v, size);
 }
 
+/*
+ * dshash_seq_init/_next/_term
+ *           Sequentially scan through a dshash table and return all the
+ *           elements one by one; return NULL when there are no more.
+ *
+ * dshash_seq_term should always be called when a scan finishes.
+ * The caller may delete returned elements in the midst of a scan by using
+ * dshash_delete_current(). exclusive must be true to delete elements.
+ */
+void
+dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table,
+				bool exclusive)
+{
+	status->hash_table = hash_table;
+	status->curbucket = 0;
+	status->nbuckets = 0;		/* filled in by the first dshash_seq_next() */
+	status->curitem = NULL;		/* NULL marks "no item returned yet" */
+	status->pnextitem = InvalidDsaPointer;
+	status->curpartition = -1;	/* no partition lock is held yet */
+	status->exclusive = exclusive;
+}
+
+/*
+ * Returns the next element, or NULL when there are no more.
+ *
+ * Returned elements are locked and the caller must not release the lock. It
+ * is released by a later dshash_seq_next() call or by dshash_seq_term().
+ */
+void *
+dshash_seq_next(dshash_seq_status *status)
+{
+	dsa_pointer next_item_pointer;
+
+	if (status->curitem == NULL)
+	{
+		int partition;
+
+		Assert(status->curbucket == 0);
+		Assert(!status->hash_table->find_locked);
+
+		/* First call: lock the first bucket's partition and start there. */
+		partition =
+			PARTITION_FOR_BUCKET_INDEX(status->curbucket,
+									   status->hash_table->size_log2);
+		LWLockAcquire(PARTITION_LOCK(status->hash_table, partition),
+					  status->exclusive ? LW_EXCLUSIVE : LW_SHARED);
+		status->curpartition = partition;
+
+		/* no resize can happen from now until the seq scan ends */
+		status->nbuckets =
+			NUM_BUCKETS(status->hash_table->control->size_log2);
+		ensure_valid_bucket_pointers(status->hash_table);
+
+		next_item_pointer = status->hash_table->buckets[status->curbucket];
+	}
+	else
+		next_item_pointer = status->pnextitem;
+
+	Assert(LWLockHeldByMeInMode(PARTITION_LOCK(status->hash_table,
+											   status->curpartition),
+								status->exclusive ? LW_EXCLUSIVE : LW_SHARED));
+
+	/* Advance to the next bucket whenever the current one is exhausted */
+	while (!DsaPointerIsValid(next_item_pointer))
+	{
+		int next_partition;
+
+		if (++status->curbucket >= status->nbuckets)
+		{
+			/* All buckets have been scanned; the scan is finished. */
+			return NULL;
+		}
+
+		/* Check whether this bucket belongs to a different partition */
+		next_partition =
+			PARTITION_FOR_BUCKET_INDEX(status->curbucket,
+									   status->hash_table->size_log2);
+
+		if (status->curpartition != next_partition)
+		{
+			/*
+			 * Move to the next partition. Lock the next partition before
+			 * releasing the current one, not the reverse, to avoid concurrent
+			 * resizing.  Avoid deadlock by taking locks in the same order
+			 * as resize().
+			 */
+			LWLockAcquire(PARTITION_LOCK(status->hash_table,
+										 next_partition),
+						  status->exclusive ? LW_EXCLUSIVE : LW_SHARED);
+			LWLockRelease(PARTITION_LOCK(status->hash_table,
+										 status->curpartition));
+			status->curpartition = next_partition;
+		}
+
+		next_item_pointer = status->hash_table->buckets[status->curbucket];
+	}
+
+	status->curitem =
+		dsa_get_address(status->hash_table->area, next_item_pointer);
+	status->hash_table->find_locked = true;
+	status->hash_table->find_exclusively_locked = status->exclusive;
+
+	/*
+	 * The caller may delete this item, so remember its successor now.
+	 */
+	status->pnextitem = status->curitem->next;
+
+	return ENTRY_FROM_ITEM(status->curitem);
+}
+
+/*
+ * Terminates the seqscan and releases the partition lock it holds, if any.
+ *
+ * Should always be called when finishing or exiting a seqscan.
+ */
+void
+dshash_seq_term(dshash_seq_status *status)
+{
+	status->hash_table->find_locked = false;
+	status->hash_table->find_exclusively_locked = false;
+
+	if (status->curpartition >= 0)
+		LWLockRelease(PARTITION_LOCK(status->hash_table, status->curpartition));
+}
+
+/* Remove the current entry during a seq scan; the scan must be exclusive. */
+void
+dshash_delete_current(dshash_seq_status *status)
+{
+	dshash_table	   *hash_table	= status->hash_table;
+	dshash_table_item  *item		= status->curitem;
+	size_t				partition PG_USED_FOR_ASSERTS_ONLY;
+
+	partition = PARTITION_FOR_HASH(item->hash);
+
+	Assert(status->exclusive);
+	Assert(hash_table->control->magic == DSHASH_MAGIC);
+	Assert(hash_table->find_locked);
+	Assert(hash_table->find_exclusively_locked);
+	Assert(LWLockHeldByMeInMode(PARTITION_LOCK(hash_table, partition),
+								LW_EXCLUSIVE));
+
+	delete_item(hash_table, item);
+}
+
+/* Get the entry most recently returned by dshash_seq_next(). */
+void *
+dshash_get_current(dshash_seq_status *status)
+{
+	return ENTRY_FROM_ITEM(status->curitem);
+}
+
 /*
  * Print debugging information about the internal state of the hash table to
  * stderr.  The caller must hold no partition locks.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d9b83f744fb..eaf3e7a8d44 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3103,6 +3103,7 @@ dshash_hash
 dshash_hash_function
 dshash_parameters
 dshash_partition
+dshash_seq_status
 dshash_table
 dshash_table_control
 dshash_table_handle
-- 
2.35.1.354.g715d08a9e5

