From fd324f39df941e2a692a9c950c8182ed07502140 Mon Sep 17 00:00:00 2001
From: Alexander Pyhalov <a.pyhalov@postgrespro.ru>
Date: Tue, 16 Dec 2025 18:46:27 +0300
Subject: [PATCH 2/3] Limit batch_size for foreign insert with work_mem

The "batch_size" option can be set at the foreign server level.
It can be overly optimistic even for a single table whose
tuples have varying lengths. To prevent excessive memory usage,
limit the effective batch size with work_mem.

When a tuple is larger than work_mem, we don't add it to a batch,
but fall through to a usual foreign insert. This means that the
executor can form a batch, insert it, and then finish with a
usual insert, so there will be no pending batch inserts left to
process. To avoid manipulating possibly long lists, we don't
clear the es_insert_pending_result_relations and
es_insert_pending_modifytables lists, but instead check in
ExecPendingInserts() whether there are any pending tuples.
---
 .../postgres_fdw/expected/postgres_fdw.out    |  11 ++
 contrib/postgres_fdw/sql/postgres_fdw.sql     |   9 ++
 src/backend/executor/nodeModifyTable.c        | 149 ++++++++++--------
 src/include/nodes/execnodes.h                 |   2 +
 4 files changed, 107 insertions(+), 64 deletions(-)

diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 6066510c7c0..cb3044c769a 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -7640,6 +7640,17 @@ select count(*) from tab_batch_sharded;
     45
 (1 row)
 
+delete from tab_batch_sharded;
+-- test batch insert with large tuples and switching from batch to usual insert
+set work_mem to 64;
+insert into tab_batch_sharded select 3, case when i%4 = 0 then lpad('a',65*1024,'a') else 'test'|| i end from generate_series(1, 100) i;
+select count(*) from tab_batch_sharded;
+ count 
+-------
+   100
+(1 row)
+
+reset work_mem;
 drop table tab_batch_local;
 drop table tab_batch_sharded;
 drop table tab_batch_sharded_p1_remote;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 4f7ab2ed0ac..a084afc8de7 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1944,10 +1944,19 @@ create foreign table tab_batch_sharded_p1 partition of tab_batch_sharded
   server loopback options (table_name 'tab_batch_sharded_p1_remote');
 insert into tab_batch_sharded select * from tab_batch_local;
 select count(*) from tab_batch_sharded;
+delete from tab_batch_sharded;
+-- test batch insert with large tuples and switching from batch to usual insert
+set work_mem to 64;
+insert into tab_batch_sharded select 3, case when i%4 = 0 then lpad('a',65*1024,'a') else 'test'|| i end from generate_series(1, 100) i;
+select count(*) from tab_batch_sharded;
+reset work_mem;
+
 drop table tab_batch_local;
 drop table tab_batch_sharded;
 drop table tab_batch_sharded_p1_remote;
 
+
+
 alter server loopback options (drop batch_size);
 
 -- ===================================================================
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 7d7411a7056..fcbffdd2dec 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -942,87 +942,100 @@ ExecInsert(ModifyTableContext *context,
 		 */
 		if (resultRelInfo->ri_BatchSize > 1)
 		{
-			bool		flushed = false;
+			Size		tuple_len;
+
+			/* Compute length of the current tuple */
+			slot_getallattrs(slot);
+			tuple_len = estimate_tuple_size(slot->tts_tupleDescriptor, slot->tts_values, slot->tts_isnull);
 
 			/*
-			 * When we've reached the desired batch size, perform the
-			 * insertion.
+			 * When we've reached the desired batch size or exceeded work_mem,
+			 * perform the insertion.
 			 */
-			if (resultRelInfo->ri_NumSlots == resultRelInfo->ri_BatchSize)
+			if (resultRelInfo->ri_NumSlots == resultRelInfo->ri_BatchSize ||
+				((tuple_len + resultRelInfo->ri_BatchMemoryUsed > work_mem * 1024) && (resultRelInfo->ri_NumSlots > 0)))
 			{
 				ExecBatchInsert(mtstate, resultRelInfo,
 								resultRelInfo->ri_Slots,
 								resultRelInfo->ri_PlanSlots,
 								resultRelInfo->ri_NumSlots,
 								estate, canSetTag);
-				flushed = true;
 			}
 
-			oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
-
-			if (resultRelInfo->ri_Slots == NULL)
+			if (tuple_len < work_mem * 1024)
 			{
-				resultRelInfo->ri_Slots = palloc_array(TupleTableSlot *, resultRelInfo->ri_BatchSize);
-				resultRelInfo->ri_PlanSlots = palloc_array(TupleTableSlot *, resultRelInfo->ri_BatchSize);
-			}
+				oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
 
-			/*
-			 * Initialize the batch slots. We don't know how many slots will
-			 * be needed, so we initialize them as the batch grows, and we
-			 * keep them across batches. To mitigate an inefficiency in how
-			 * resource owner handles objects with many references (as with
-			 * many slots all referencing the same tuple descriptor) we copy
-			 * the appropriate tuple descriptor for each slot.
-			 */
-			if (resultRelInfo->ri_NumSlots >= resultRelInfo->ri_NumSlotsInitialized)
-			{
-				TupleDesc	tdesc = CreateTupleDescCopy(slot->tts_tupleDescriptor);
-				TupleDesc	plan_tdesc =
-					CreateTupleDescCopy(planSlot->tts_tupleDescriptor);
+				if (resultRelInfo->ri_Slots == NULL)
+				{
+					resultRelInfo->ri_Slots = palloc_array(TupleTableSlot *, resultRelInfo->ri_BatchSize);
+					resultRelInfo->ri_PlanSlots = palloc_array(TupleTableSlot *, resultRelInfo->ri_BatchSize);
+				}
 
-				resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] =
-					MakeSingleTupleTableSlot(tdesc, slot->tts_ops);
+				/*
+				 * Initialize the batch slots. We don't know how many slots
+				 * will be needed, so we initialize them as the batch grows,
+				 * and we keep them across batches. To mitigate an
+				 * inefficiency in how resource owner handles objects with
+				 * many references (as with many slots all referencing the
+				 * same tuple descriptor) we copy the appropriate tuple
+				 * descriptor for each slot.
+				 */
+				if (resultRelInfo->ri_NumSlots >= resultRelInfo->ri_NumSlotsInitialized)
+				{
+					TupleDesc	tdesc = CreateTupleDescCopy(slot->tts_tupleDescriptor);
+					TupleDesc	plan_tdesc =
+						CreateTupleDescCopy(planSlot->tts_tupleDescriptor);
 
-				resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] =
-					MakeSingleTupleTableSlot(plan_tdesc, planSlot->tts_ops);
+					resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] =
+						MakeSingleTupleTableSlot(tdesc, slot->tts_ops);
 
-				/* remember how many batch slots we initialized */
-				resultRelInfo->ri_NumSlotsInitialized++;
-			}
+					resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] =
+						MakeSingleTupleTableSlot(plan_tdesc, planSlot->tts_ops);
 
-			ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
-						 slot);
+					/* remember how many batch slots we initialized */
+					resultRelInfo->ri_NumSlotsInitialized++;
+				}
 
-			ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
-						 planSlot);
+				ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
+							 slot);
 
-			/*
-			 * If these are the first tuples stored in the buffers, add the
-			 * target rel and the mtstate to the
-			 * es_insert_pending_result_relations and
-			 * es_insert_pending_modifytables lists respectively, except in
-			 * the case where flushing was done above, in which case they
-			 * would already have been added to the lists, so no need to do
-			 * this.
-			 */
-			if (resultRelInfo->ri_NumSlots == 0 && !flushed)
-			{
-				Assert(!list_member_ptr(estate->es_insert_pending_result_relations,
-										resultRelInfo));
-				estate->es_insert_pending_result_relations =
-					lappend(estate->es_insert_pending_result_relations,
-							resultRelInfo);
-				estate->es_insert_pending_modifytables =
-					lappend(estate->es_insert_pending_modifytables, mtstate);
-			}
-			Assert(list_member_ptr(estate->es_insert_pending_result_relations,
-								   resultRelInfo));
+				ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
+							 planSlot);
+
+				/*
+				 * If these are the first tuples stored in the buffers, add
+				 * the target rel and the mtstate to the
+				 * es_insert_pending_result_relations and
+				 * es_insert_pending_modifytables lists respectively, except
+				 * in the case where flushing was done above, in which case
+				 * they would already have been added to the lists, so no need
+				 * to do this.
+				 */
+				if (resultRelInfo->ri_NumSlots == 0 && !resultRelInfo->ri_ExecutorPendingStateModified)
+				{
+					Assert(!list_member_ptr(estate->es_insert_pending_result_relations,
+											resultRelInfo));
+					estate->es_insert_pending_result_relations =
+						lappend(estate->es_insert_pending_result_relations,
+								resultRelInfo);
+					estate->es_insert_pending_modifytables =
+						lappend(estate->es_insert_pending_modifytables, mtstate);
+					resultRelInfo->ri_ExecutorPendingStateModified = true;
+				}
+				Assert(list_member_ptr(estate->es_insert_pending_result_relations,
+									   resultRelInfo));
 
-			resultRelInfo->ri_NumSlots++;
+				resultRelInfo->ri_NumSlots++;
+				resultRelInfo->ri_BatchMemoryUsed += tuple_len;
+				/* The batch was flushed above if it had grown too big. */
+				Assert(resultRelInfo->ri_BatchMemoryUsed < work_mem * 1024);
 
-			MemoryContextSwitchTo(oldContext);
+				MemoryContextSwitchTo(oldContext);
 
-			return NULL;
+				return NULL;
+			}
+			/* else do usual foreign insert */
 		}
 
 		/*
@@ -1418,6 +1431,7 @@ ExecBatchInsert(ModifyTableState *mtstate,
 		ExecClearTuple(planSlots[i]);
 	}
 	resultRelInfo->ri_NumSlots = 0;
+	resultRelInfo->ri_BatchMemoryUsed = 0;
 }
 
 /*
@@ -1436,11 +1450,18 @@ ExecPendingInserts(EState *estate)
 		ModifyTableState *mtstate = (ModifyTableState *) lfirst(l2);
 
 		Assert(mtstate);
-		ExecBatchInsert(mtstate, resultRelInfo,
-						resultRelInfo->ri_Slots,
-						resultRelInfo->ri_PlanSlots,
-						resultRelInfo->ri_NumSlots,
-						estate, mtstate->canSetTag);
+
+		/*
+		 * A batch insert could have switched to a non-batched insert;
+		 * in that case there may be no filled slots.
+		 */
+		if (resultRelInfo->ri_NumSlots > 0)
+			ExecBatchInsert(mtstate, resultRelInfo,
+							resultRelInfo->ri_Slots,
+							resultRelInfo->ri_PlanSlots,
+							resultRelInfo->ri_NumSlots,
+							estate, mtstate->canSetTag);
+		resultRelInfo->ri_ExecutorPendingStateModified = false;
 	}
 
 	list_free(estate->es_insert_pending_result_relations);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index f8053d9e572..3220f9b2574 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -544,6 +544,8 @@ typedef struct ResultRelInfo
 	int			ri_NumSlots;	/* number of slots in the array */
 	int			ri_NumSlotsInitialized; /* number of initialized slots */
 	int			ri_BatchSize;	/* max slots inserted in a single batch */
+	int			ri_BatchMemoryUsed;	/* memory used by batch */
+	bool			ri_ExecutorPendingStateModified; /* true if member of estate->es_insert_pending_result_relations */
 	TupleTableSlot **ri_Slots;	/* input tuples for batch insert */
 	TupleTableSlot **ri_PlanSlots;
 
-- 
2.43.0

