diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c
index dbae18d..8e27b03 100644
--- a/src/backend/utils/cache/plancache.c
+++ b/src/backend/utils/cache/plancache.c
@@ -1278,6 +1278,160 @@ ReleaseCachedPlan(CachedPlan *plan, bool useResOwner)
 }
 
 /*
+ * CachedPlanAllowsSimpleValidityCheck: can we use CachedPlanIsSimplyValid?
+ *
+ * This function, together with CachedPlanIsSimplyValid, provides a fast path
+ * for revalidating "simple" generic plans.  The core requirement to be simple
+ * is that the plan must not require taking any locks, which translates to
+ * not touching any tables; this happens to match up well with an important
+ * use-case in PL/pgSQL.  This function tests whether that's true, along
+ * with checking some other corner cases that we'd rather not bother with
+ * handling in the fast path.  (Note that it's still possible for such a plan
+ * to be invalidated, for example due to a change in a function that was
+ * inlined into the plan.)
+ *
+ * This must only be called on known-valid generic plans (eg, ones just
+ * returned by GetCachedPlan).  If it returns true, the caller may re-use
+ * the cached plan as long as CachedPlanIsSimplyValid returns true; that
+ * check is much cheaper than the full revalidation done by GetCachedPlan.
+ * Nonetheless, no required checks are omitted.
+ */
+bool
+CachedPlanAllowsSimpleValidityCheck(CachedPlanSource *plansource,
+									CachedPlan *plan)
+{
+	ListCell   *lc;
+
+	/* Sanity-check that the caller gave us a validated generic plan. */
+	Assert(plansource->magic == CACHEDPLANSOURCE_MAGIC);
+	Assert(plan->magic == CACHEDPLAN_MAGIC);
+	Assert(plansource->is_valid);
+	Assert(plan->is_valid);
+	Assert(plan == plansource->gplan);
+
+	/* We don't support oneshot plans here. */
+	if (plansource->is_oneshot)
+		return false;
+	Assert(!plan->is_oneshot);
+
+	/*
+	 * If the plan is dependent on RLS considerations, or it's transient,
+	 * reject.  These things probably can't ever happen for table-free
+	 * queries, but for safety's sake let's check.
+	 */
+	if (plansource->dependsOnRLS)
+		return false;
+	if (plan->dependsOnRole)
+		return false;
+	if (TransactionIdIsValid(plan->saved_xmin))
+		return false;
+
+	/*
+	 * Reject if AcquirePlannerLocks would have anything to do.  This is
+	 * simplistic, but there's no need to inquire any more carefully; indeed,
+	 * for current callers it shouldn't even be possible to hit any of these
+	 * checks.
+	 */
+	foreach(lc, plansource->query_list)
+	{
+		Query	   *query = lfirst_node(Query, lc);
+
+		if (query->commandType == CMD_UTILITY)
+			return false;
+		if (query->rtable || query->cteList || query->hasSubLinks)
+			return false;
+	}
+
+	/*
+	 * Reject if AcquireExecutorLocks would have anything to do.  This is
+	 * probably unnecessary given the previous check, but let's be safe.
+	 */
+	foreach(lc, plan->stmt_list)
+	{
+		PlannedStmt *plannedstmt = lfirst_node(PlannedStmt, lc);
+		ListCell   *lc2;
+
+		if (plannedstmt->commandType == CMD_UTILITY)
+			return false;
+
+		/*
+		 * We have to grovel through the rtable because it's likely to contain
+		 * an RTE_RESULT relation, rather than being totally empty.
+		 */
+		foreach(lc2, plannedstmt->rtable)
+		{
+			RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc2);
+
+			if (rte->rtekind == RTE_RELATION)
+				return false;
+		}
+	}
+
+	/*
+	 * Okay, it's simple.  Note that what we've primarily established here is
+	 * that no locks need be taken before checking the plan's is_valid flag.
+	 */
+	return true;
+}
+
+/*
+ * CachedPlanIsSimplyValid: quick check for plan still being valid
+ *
+ * This function must not be used unless CachedPlanAllowsSimpleValidityCheck
+ * previously said it was OK.
+ *
+ * If the plan is valid, and "owner" is not NULL, record a refcount on
+ * the plan in that resowner before returning.  It is caller's responsibility
+ * to be sure that a refcount is held on any plan that's being actively used.
+ *
+ * The code here is unconditionally safe as long as the only use of this
+ * CachedPlanSource is in connection with the particular CachedPlan pointer
+ * that's passed in.  If the plansource were being used for other purposes,
+ * it's possible that its generic plan could be invalidated and regenerated
+ * while the current caller wasn't looking, and then there could be a chance
+ * collision of address between this caller's now-stale plan pointer and the
+ * actual address of the new generic plan.  For current uses, that scenario
+ * can't happen; but with a plansource shared across multiple uses, it'd be
+ * advisable to also save plan->generation and verify that that still matches.
+ */
+bool
+CachedPlanIsSimplyValid(CachedPlanSource *plansource, CachedPlan *plan,
+						ResourceOwner owner)
+{
+	/*
+	 * Careful here: since the caller doesn't necessarily hold a refcount on
+	 * the plan to start with, it's possible that "plan" is a dangling
+	 * pointer.  Don't dereference it until we've verified that it still
+	 * matches the plansource's gplan (which is either valid or NULL).
+	 */
+	Assert(plansource->magic == CACHEDPLANSOURCE_MAGIC);
+
+	/*
+	 * Has cache invalidation fired on this plan?  We can check this right
+	 * away since there are no locks that we'd need to acquire first.
+	 */
+	if (!plansource->is_valid || plan != plansource->gplan || !plan->is_valid)
+		return false;
+
+	Assert(plan->magic == CACHEDPLAN_MAGIC);
+
+	/* Is the search_path still the same as when we made it? */
+	Assert(plansource->search_path != NULL);
+	if (!OverrideSearchPathMatchesCurrent(plansource->search_path))
+		return false;
+
+	/* It's still good.  Bump refcount if requested. */
+	if (owner)
+	{
+		ResourceOwnerEnlargePlanCacheRefs(owner);
+		plan->refcount++;
+		ResourceOwnerRememberPlanCacheRef(owner, plan);
+	}
+
+	return true;
+}
+
+/*
  * CachedPlanSetParentContext: move a CachedPlanSource to a new memory context
  *
  * This can only be applied to unsaved plans; once saved, a plan always
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 3c39e48..8bc2c4e 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -679,6 +679,30 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 }
 
 /*
+ * ResourceOwnerReleaseAllPlanCacheRefs
+ *		Release the plancache references (only) held by this owner.
+ *
+ * We might eventually add similar functions for other resource types,
+ * but for now, only this is needed.
+ */
+void
+ResourceOwnerReleaseAllPlanCacheRefs(ResourceOwner owner)
+{
+	ResourceOwner save;
+	Datum		foundres;
+
+	save = CurrentResourceOwner;
+	CurrentResourceOwner = owner;
+	while (ResourceArrayGetAny(&(owner->planrefarr), &foundres))
+	{
+		CachedPlan *res = (CachedPlan *) DatumGetPointer(foundres);
+
+		ReleaseCachedPlan(res, true);
+	}
+	CurrentResourceOwner = save;
+}
+
+/*
  * ResourceOwnerDelete
  *		Delete an owner object and its descendants.
  *
diff --git a/src/include/utils/plancache.h b/src/include/utils/plancache.h
index 6a5953c..037edd0 100644
--- a/src/include/utils/plancache.h
+++ b/src/include/utils/plancache.h
@@ -20,6 +20,8 @@
 #include "nodes/params.h"
 #include "tcop/cmdtag.h"
 #include "utils/queryenvironment.h"
+#include "utils/resowner.h"
+
 
 /* Forward declaration, to avoid including parsenodes.h here */
 struct RawStmt;
@@ -220,6 +222,12 @@ extern CachedPlan *GetCachedPlan(CachedPlanSource *plansource,
 								 QueryEnvironment *queryEnv);
 extern void ReleaseCachedPlan(CachedPlan *plan, bool useResOwner);
 
+extern bool CachedPlanAllowsSimpleValidityCheck(CachedPlanSource *plansource,
+												CachedPlan *plan);
+extern bool CachedPlanIsSimplyValid(CachedPlanSource *plansource,
+									CachedPlan *plan,
+									ResourceOwner owner);
+
 extern CachedExpression *GetCachedExpression(Node *expr);
 extern void FreeCachedExpression(CachedExpression *cexpr);
 
diff --git a/src/include/utils/resowner.h b/src/include/utils/resowner.h
index 850bf69..878f39c 100644
--- a/src/include/utils/resowner.h
+++ b/src/include/utils/resowner.h
@@ -71,6 +71,7 @@ extern void ResourceOwnerRelease(ResourceOwner owner,
 								 ResourceReleasePhase phase,
 								 bool isCommit,
 								 bool isTopLevel);
+extern void ResourceOwnerReleaseAllPlanCacheRefs(ResourceOwner owner);
 extern void ResourceOwnerDelete(ResourceOwner owner);
 extern ResourceOwner ResourceOwnerGetParent(ResourceOwner owner);
 extern void ResourceOwnerNewParent(ResourceOwner owner,
diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c
index d3ad4fa..6f8ae2c 100644
--- a/src/pl/plpgsql/src/pl_exec.c
+++ b/src/pl/plpgsql/src/pl_exec.c
@@ -84,6 +84,13 @@ typedef struct
  * has its own simple-expression EState, which is cleaned up at exit from
  * plpgsql_inline_handler().  DO blocks still use the simple_econtext_stack,
  * though, so that subxact abort cleanup does the right thing.
+ *
+ * (However, if a DO block executes COMMIT or ROLLBACK, then exec_stmt_commit
+ * or exec_stmt_rollback will unlink it from the DO's simple-expression EState
+ * and create a new shared EState that will be used thenceforth.  The original
+ * EState will be cleaned up when we get back to plpgsql_inline_handler.  This
+ * is a bit ugly, but it isn't worth doing better, since scenarios like this
+ * can't result in indefinite accumulation of state trees.)
  */
 typedef struct SimpleEcontextStackEntry
 {
@@ -96,6 +103,15 @@ static EState *shared_simple_eval_estate = NULL;
 static SimpleEcontextStackEntry *simple_econtext_stack = NULL;
 
 /*
+ * In addition to the shared simple-eval EState, we have a shared resource
+ * owner that holds refcounts on the CachedPlans for any "simple" expressions
+ * we have evaluated in the current transaction.  This allows us to avoid
+ * continually grabbing and releasing a plan refcount when a simple expression
+ * is used over and over.
+ */
+static ResourceOwner shared_simple_eval_resowner = NULL;
+
+/*
  * Memory management within a plpgsql function generally works with three
  * contexts:
  *
@@ -314,7 +330,8 @@ static int	exec_stmt_set(PLpgSQL_execstate *estate,
 static void plpgsql_estate_setup(PLpgSQL_execstate *estate,
 								 PLpgSQL_function *func,
 								 ReturnSetInfo *rsi,
-								 EState *simple_eval_estate);
+								 EState *simple_eval_estate,
+								 ResourceOwner simple_eval_resowner);
 static void exec_eval_cleanup(PLpgSQL_execstate *estate);
 
 static void exec_prepare_plan(PLpgSQL_execstate *estate,
@@ -440,16 +457,19 @@ static char *format_preparedparamsdata(PLpgSQL_execstate *estate,
  *
  * This is also used to execute inline code blocks (DO blocks).  The only
  * difference that this code is aware of is that for a DO block, we want
- * to use a private simple_eval_estate, which is created and passed in by
- * the caller.  For regular functions, pass NULL, which implies using
- * shared_simple_eval_estate.  (When using a private simple_eval_estate,
+ * to use a private simple_eval_estate and a private simple_eval_resowner,
+ * which are created and passed in by the caller.  For regular functions,
+ * pass NULL, which implies using shared_simple_eval_estate and
+ * shared_simple_eval_resowner.  (When using a private simple_eval_estate,
  * we must also use a private cast hashtable, but that's taken care of
  * within plpgsql_estate_setup.)
  * ----------
  */
 Datum
 plpgsql_exec_function(PLpgSQL_function *func, FunctionCallInfo fcinfo,
-					  EState *simple_eval_estate, bool atomic)
+					  EState *simple_eval_estate,
+					  ResourceOwner simple_eval_resowner,
+					  bool atomic)
 {
 	PLpgSQL_execstate estate;
 	ErrorContextCallback plerrcontext;
@@ -460,7 +480,7 @@ plpgsql_exec_function(PLpgSQL_function *func, FunctionCallInfo fcinfo,
 	 * Setup the execution state
 	 */
 	plpgsql_estate_setup(&estate, func, (ReturnSetInfo *) fcinfo->resultinfo,
-						 simple_eval_estate);
+						 simple_eval_estate, simple_eval_resowner);
 	estate.atomic = atomic;
 
 	/*
@@ -897,7 +917,7 @@ plpgsql_exec_trigger(PLpgSQL_function *func,
 	/*
 	 * Setup the execution state
 	 */
-	plpgsql_estate_setup(&estate, func, NULL, NULL);
+	plpgsql_estate_setup(&estate, func, NULL, NULL, NULL);
 	estate.trigdata = trigdata;
 
 	/*
@@ -1135,7 +1155,7 @@ plpgsql_exec_event_trigger(PLpgSQL_function *func, EventTriggerData *trigdata)
 	/*
 	 * Setup the execution state
 	 */
-	plpgsql_estate_setup(&estate, func, NULL, NULL);
+	plpgsql_estate_setup(&estate, func, NULL, NULL, NULL);
 	estate.evtrigdata = trigdata;
 
 	/*
@@ -2319,6 +2339,7 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt)
 		 * some internal state.
 		 */
 		estate->simple_eval_estate = NULL;
+		estate->simple_eval_resowner = NULL;
 		plpgsql_create_econtext(estate);
 	}
 
@@ -3874,7 +3895,8 @@ static void
 plpgsql_estate_setup(PLpgSQL_execstate *estate,
 					 PLpgSQL_function *func,
 					 ReturnSetInfo *rsi,
-					 EState *simple_eval_estate)
+					 EState *simple_eval_estate,
+					 ResourceOwner simple_eval_resowner)
 {
 	HASHCTL		ctl;
 
@@ -3965,6 +3987,11 @@ plpgsql_estate_setup(PLpgSQL_execstate *estate,
 		estate->cast_hash = shared_cast_hash;
 		estate->cast_hash_context = shared_cast_context;
 	}
+	/* likewise for the simple-expression resource owner */
+	if (simple_eval_resowner)
+		estate->simple_eval_resowner = simple_eval_resowner;
+	else
+		estate->simple_eval_resowner = shared_simple_eval_resowner;
 
 	/*
 	 * We start with no stmt_mcontext; one will be created only if needed.
@@ -4825,6 +4852,7 @@ exec_stmt_commit(PLpgSQL_execstate *estate, PLpgSQL_stmt_commit *stmt)
 	}
 
 	estate->simple_eval_estate = NULL;
+	estate->simple_eval_resowner = NULL;
 	plpgsql_create_econtext(estate);
 
 	return PLPGSQL_RC_OK;
@@ -4847,6 +4875,7 @@ exec_stmt_rollback(PLpgSQL_execstate *estate, PLpgSQL_stmt_rollback *stmt)
 	}
 
 	estate->simple_eval_estate = NULL;
+	estate->simple_eval_resowner = NULL;
 	plpgsql_create_econtext(estate);
 
 	return PLPGSQL_RC_OK;
@@ -6059,8 +6088,6 @@ loop_exit:
  * someone might redefine a SQL function that had been inlined into the simple
  * expression.  That cannot cause a simple expression to become non-simple (or
  * vice versa), but we do have to handle replacing the expression tree.
- * Fortunately it's normally inexpensive to call SPI_plan_get_cached_plan for
- * a simple expression.
  *
  * Note: if pass-by-reference, the result is in the eval_mcontext.
  * It will be freed when exec_eval_cleanup is done.
@@ -6076,7 +6103,7 @@ exec_eval_simple_expr(PLpgSQL_execstate *estate,
 {
 	ExprContext *econtext = estate->eval_econtext;
 	LocalTransactionId curlxid = MyProc->lxid;
-	CachedPlan *cplan;
+	ParamListInfo paramLI;
 	void	   *save_setup_arg;
 	bool		need_snapshot;
 	MemoryContext oldcontext;
@@ -6090,29 +6117,92 @@ exec_eval_simple_expr(PLpgSQL_execstate *estate,
 	/*
 	 * If expression is in use in current xact, don't touch it.
 	 */
-	if (expr->expr_simple_in_use && expr->expr_simple_lxid == curlxid)
+	if (unlikely(expr->expr_simple_in_use) &&
+		expr->expr_simple_lxid == curlxid)
 		return false;
 
 	/*
-	 * Revalidate cached plan, so that we will notice if it became stale. (We
-	 * need to hold a refcount while using the plan, anyway.)  If replanning
-	 * is needed, do that work in the eval_mcontext.
+	 * Check to see if the cached plan has been invalidated.  If not, and this
+	 * is the first use in the current transaction, save a plan refcount in
+	 * the simple-expression resowner.
 	 */
-	oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate));
-	cplan = SPI_plan_get_cached_plan(expr->plan);
-	MemoryContextSwitchTo(oldcontext);
+	if (likely(CachedPlanIsSimplyValid(expr->expr_simple_plansource,
+									   expr->expr_simple_plan,
+									   (expr->expr_simple_plan_lxid != curlxid ?
+										estate->simple_eval_resowner : NULL))))
+	{
+		/*
+		 * It's still good, so just remember that we have a refcount on the
+		 * plan in the current transaction.  (If we already had one, this
+		 * assignment is a no-op.)
+		 */
+		expr->expr_simple_plan_lxid = curlxid;
+	}
+	else
+	{
+		/* Need to replan */
+		CachedPlan *cplan;
 
-	/*
-	 * We can't get a failure here, because the number of CachedPlanSources in
-	 * the SPI plan can't change from what exec_simple_check_plan saw; it's a
-	 * property of the raw parsetree generated from the query text.
-	 */
-	Assert(cplan != NULL);
+		/*
+		 * If we have a valid refcount on some previous version of the plan,
+		 * release it, so we don't leak plans intra-transaction.
+		 */
+		if (expr->expr_simple_plan_lxid == curlxid)
+		{
+			ResourceOwner saveResourceOwner = CurrentResourceOwner;
 
-	/* If it got replanned, update our copy of the simple expression */
-	if (cplan->generation != expr->expr_simple_generation)
-	{
+			CurrentResourceOwner = estate->simple_eval_resowner;
+			ReleaseCachedPlan(expr->expr_simple_plan, true);
+			CurrentResourceOwner = saveResourceOwner;
+			expr->expr_simple_plan = NULL;
+			expr->expr_simple_plan_lxid = InvalidLocalTransactionId;
+		}
+
+		/* Do the replanning work in the eval_mcontext */
+		oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate));
+		cplan = SPI_plan_get_cached_plan(expr->plan);
+		MemoryContextSwitchTo(oldcontext);
+
+		/*
+		 * We can't get a failure here, because the number of
+		 * CachedPlanSources in the SPI plan can't change from what
+		 * exec_simple_check_plan saw; it's a property of the raw parsetree
+		 * generated from the query text.
+		 */
+		Assert(cplan != NULL);
+
+		/*
+		 * These tests probably can't fail either, but if they do, cope by
+		 * declaring the plan to be non-simple.  On success, we'll acquire a
+		 * refcount on the new plan, stored in simple_eval_resowner.
+		 */
+		if (CachedPlanAllowsSimpleValidityCheck(expr->expr_simple_plansource,
+												cplan) &&
+			CachedPlanIsSimplyValid(expr->expr_simple_plansource, cplan,
+									estate->simple_eval_resowner))
+		{
+			/* Remember that we have the refcount */
+			expr->expr_simple_plan = cplan;
+			expr->expr_simple_plan_lxid = curlxid;
+		}
+		else
+		{
+			/* Release SPI_plan_get_cached_plan's refcount */
+			ReleaseCachedPlan(cplan, true);
+			/* Mark expression as non-simple, and fail */
+			expr->expr_simple_expr = NULL;
+			return false;
+		}
+
+		/*
+		 * SPI_plan_get_cached_plan acquired a plan refcount stored in the
+		 * active resowner.  We don't need that anymore, so release it.
+		 */
+		ReleaseCachedPlan(cplan, true);
+
+		/* Extract desired scalar expression from cached plan */
 		exec_save_simple_expr(expr, cplan);
+
 		/* better recheck r/w safety, as it could change due to inlining */
 		if (expr->rwparam >= 0)
 			exec_check_rw_parameter(expr, expr->rwparam);
@@ -6128,16 +6218,24 @@ exec_eval_simple_expr(PLpgSQL_execstate *estate,
 	 * Set up ParamListInfo to pass to executor.  For safety, save and restore
 	 * estate->paramLI->parserSetupArg around our use of the param list.
 	 */
-	save_setup_arg = estate->paramLI->parserSetupArg;
+	paramLI = estate->paramLI;
+	save_setup_arg = paramLI->parserSetupArg;
 
-	econtext->ecxt_param_list_info = setup_param_list(estate, expr);
+	/*
+	 * We can skip using setup_param_list() in favor of just doing this
+	 * unconditionally, because there's no need for the optimization of
+	 * possibly setting ecxt_param_list_info to NULL; we've already forced use
+	 * of a generic plan.
+	 */
+	paramLI->parserSetupArg = (void *) expr;
+	econtext->ecxt_param_list_info = paramLI;
 
 	/*
 	 * Prepare the expression for execution, if it's not been done already in
 	 * the current transaction.  (This will be forced to happen if we called
 	 * exec_save_simple_expr above.)
 	 */
-	if (expr->expr_simple_lxid != curlxid)
+	if (unlikely(expr->expr_simple_lxid != curlxid))
 	{
 		oldcontext = MemoryContextSwitchTo(estate->simple_eval_estate->es_query_cxt);
 		expr->expr_simple_state =
@@ -6185,7 +6283,7 @@ exec_eval_simple_expr(PLpgSQL_execstate *estate,
 
 	econtext->ecxt_param_list_info = NULL;
 
-	estate->paramLI->parserSetupArg = save_setup_arg;
+	paramLI->parserSetupArg = save_setup_arg;
 
 	if (need_snapshot)
 		PopActiveSnapshot();
@@ -6193,11 +6291,6 @@ exec_eval_simple_expr(PLpgSQL_execstate *estate,
 	MemoryContextSwitchTo(oldcontext);
 
 	/*
-	 * Now we can release our refcount on the cached plan.
-	 */
-	ReleaseCachedPlan(cplan, true);
-
-	/*
 	 * That's it.
 	 */
 	return true;
@@ -7984,10 +8077,35 @@ exec_simple_check_plan(PLpgSQL_execstate *estate, PLpgSQL_expr *expr)
 	/* Can't fail, because we checked for a single CachedPlanSource above */
 	Assert(cplan != NULL);
 
-	/* Share the remaining work with replan code path */
-	exec_save_simple_expr(expr, cplan);
+	/*
+	 * Verify that plancache.c thinks the plan is simple enough to use
+	 * CachedPlanIsSimplyValid.  Given the restrictions above, it's unlikely
+	 * that this could fail, but if it does, just treat plan as not simple.
+	 */
+	if (CachedPlanAllowsSimpleValidityCheck(plansource, cplan))
+	{
+		/*
+		 * OK, use CachedPlanIsSimplyValid to save a refcount on the plan in
+		 * the simple-expression resowner.  This shouldn't fail either, but if
+		 * somehow it does, again we can cope by treating plan as not simple.
+		 */
+		if (CachedPlanIsSimplyValid(plansource, cplan,
+									estate->simple_eval_resowner))
+		{
+			/* Remember that we have the refcount */
+			expr->expr_simple_plansource = plansource;
+			expr->expr_simple_plan = cplan;
+			expr->expr_simple_plan_lxid = MyProc->lxid;
+
+			/* Share the remaining work with the replan code path */
+			exec_save_simple_expr(expr, cplan);
+		}
+	}
 
-	/* Release our plan refcount */
+	/*
+	 * Release the plan refcount obtained by SPI_plan_get_cached_plan.  (This
+	 * refcount is held by the wrong resowner, so we can't just repurpose it.)
+	 */
 	ReleaseCachedPlan(cplan, true);
 }
 
@@ -8060,7 +8178,6 @@ exec_save_simple_expr(PLpgSQL_expr *expr, CachedPlan *cplan)
 	 * current transaction".
 	 */
 	expr->expr_simple_expr = tle_expr;
-	expr->expr_simple_generation = cplan->generation;
 	expr->expr_simple_state = NULL;
 	expr->expr_simple_in_use = false;
 	expr->expr_simple_lxid = InvalidLocalTransactionId;
@@ -8196,7 +8313,7 @@ exec_set_found(PLpgSQL_execstate *estate, bool state)
  *
  * We may need to create a new shared_simple_eval_estate too, if there's not
  * one already for the current transaction.  The EState will be cleaned up at
- * transaction end.
+ * transaction end.  Ditto for shared_simple_eval_resowner.
  */
 static void
 plpgsql_create_econtext(PLpgSQL_execstate *estate)
@@ -8208,8 +8325,11 @@ plpgsql_create_econtext(PLpgSQL_execstate *estate)
 	 * one already in the current transaction.  The EState is made a child of
 	 * TopTransactionContext so it will have the right lifespan.
 	 *
-	 * Note that this path is never taken when executing a DO block; the
-	 * required EState was already made by plpgsql_inline_handler.
+	 * Note that this path is never taken when beginning a DO block; the
+	 * required EState was already made by plpgsql_inline_handler.  However,
+	 * if the DO block executes COMMIT or ROLLBACK, then we'll come here and
+	 * make a shared EState to use for the rest of the DO block.  That's OK;
+	 * see the comments above struct SimpleEcontextStackEntry.
 	 */
 	if (estate->simple_eval_estate == NULL)
 	{
@@ -8225,6 +8345,18 @@ plpgsql_create_econtext(PLpgSQL_execstate *estate)
 	}
 
 	/*
+	 * Likewise for the simple-expression resource owner.
+	 */
+	if (estate->simple_eval_resowner == NULL)
+	{
+		if (shared_simple_eval_resowner == NULL)
+			shared_simple_eval_resowner =
+				ResourceOwnerCreate(TopTransactionResourceOwner,
+									"PL/pgSQL simple expressions");
+		estate->simple_eval_resowner = shared_simple_eval_resowner;
+	}
+
+	/*
 	 * Create a child econtext for the current function.
 	 */
 	estate->eval_econtext = CreateExprContext(estate->simple_eval_estate);
@@ -8270,29 +8402,40 @@ plpgsql_destroy_econtext(PLpgSQL_execstate *estate)
  * plpgsql_xact_cb --- post-transaction-commit-or-abort cleanup
  *
  * If a simple-expression EState was created in the current transaction,
- * it has to be cleaned up.
+ * it has to be cleaned up.  The same for the simple-expression resowner.
  */
 void
 plpgsql_xact_cb(XactEvent event, void *arg)
 {
 	/*
-	 * If we are doing a clean transaction shutdown, free the EState (so that
-	 * any remaining resources will be released correctly). In an abort, we
-	 * expect the regular abort recovery procedures to release everything of
-	 * interest.
+	 * If we are doing a clean transaction shutdown, free the EState and tell
+	 * the resowner to release whatever plancache references it has, so that
+	 * all remaining resources will be released correctly.  (We don't need to
+	 * actually delete the resowner here; deletion of the
+	 * TopTransactionResourceOwner will take care of that.)
+	 *
+	 * In an abort, we expect the regular abort recovery procedures to release
+	 * everything of interest, so just clear our pointers.
 	 */
-	if (event == XACT_EVENT_COMMIT || event == XACT_EVENT_PREPARE)
+	if (event == XACT_EVENT_COMMIT ||
+		event == XACT_EVENT_PARALLEL_COMMIT ||
+		event == XACT_EVENT_PREPARE)
 	{
 		simple_econtext_stack = NULL;
 
 		if (shared_simple_eval_estate)
 			FreeExecutorState(shared_simple_eval_estate);
 		shared_simple_eval_estate = NULL;
+		if (shared_simple_eval_resowner)
+			ResourceOwnerReleaseAllPlanCacheRefs(shared_simple_eval_resowner);
+		shared_simple_eval_resowner = NULL;
 	}
-	else if (event == XACT_EVENT_ABORT)
+	else if (event == XACT_EVENT_ABORT ||
+			 event == XACT_EVENT_PARALLEL_ABORT)
 	{
 		simple_econtext_stack = NULL;
 		shared_simple_eval_estate = NULL;
+		shared_simple_eval_resowner = NULL;
 	}
 }
 
diff --git a/src/pl/plpgsql/src/pl_handler.c b/src/pl/plpgsql/src/pl_handler.c
index b434818..97cf849 100644
--- a/src/pl/plpgsql/src/pl_handler.c
+++ b/src/pl/plpgsql/src/pl_handler.c
@@ -262,7 +262,9 @@ plpgsql_call_handler(PG_FUNCTION_ARGS)
 			retval = (Datum) 0;
 		}
 		else
-			retval = plpgsql_exec_function(func, fcinfo, NULL, !nonatomic);
+			retval = plpgsql_exec_function(func, fcinfo,
+										   NULL, NULL,
+										   !nonatomic);
 	}
 	PG_FINALLY();
 	{
@@ -297,6 +299,7 @@ plpgsql_inline_handler(PG_FUNCTION_ARGS)
 	PLpgSQL_function *func;
 	FmgrInfo	flinfo;
 	EState	   *simple_eval_estate;
+	ResourceOwner simple_eval_resowner;
 	Datum		retval;
 	int			rc;
 
@@ -323,22 +326,33 @@ plpgsql_inline_handler(PG_FUNCTION_ARGS)
 	flinfo.fn_oid = InvalidOid;
 	flinfo.fn_mcxt = CurrentMemoryContext;
 
-	/* Create a private EState for simple-expression execution */
+	/*
+	 * Create a private EState and resowner for simple-expression execution.
+	 * Notice that these are NOT tied to transaction-level resources; they
+	 * must survive any COMMIT/ROLLBACK the DO block executes, since we will
+	 * unconditionally try to clean them up below.  (Hence, be wary of adding
+	 * anything that could fail between here and the PG_TRY block.)
+	 */
 	simple_eval_estate = CreateExecutorState();
+	simple_eval_resowner =
+		ResourceOwnerCreate(NULL, "PL/pgSQL DO block simple expressions");
 
 	/* And run the function */
 	PG_TRY();
 	{
-		retval = plpgsql_exec_function(func, fake_fcinfo, simple_eval_estate, codeblock->atomic);
+		retval = plpgsql_exec_function(func, fake_fcinfo,
+									   simple_eval_estate,
+									   simple_eval_resowner,
+									   codeblock->atomic);
 	}
 	PG_CATCH();
 	{
 		/*
 		 * We need to clean up what would otherwise be long-lived resources
 		 * accumulated by the failed DO block, principally cached plans for
-		 * statements (which can be flushed with plpgsql_free_function_memory)
-		 * and execution trees for simple expressions, which are in the
-		 * private EState.
+		 * statements (which can be flushed by plpgsql_free_function_memory),
+		 * execution trees for simple expressions, which are in the private
+		 * EState, and cached-plan refcounts held by the private resowner.
 		 *
 		 * Before releasing the private EState, we must clean up any
 		 * simple_econtext_stack entries pointing into it, which we can do by
@@ -351,8 +365,10 @@ plpgsql_inline_handler(PG_FUNCTION_ARGS)
 						   GetCurrentSubTransactionId(),
 						   0, NULL);
 
-		/* Clean up the private EState */
+		/* Clean up the private EState and resowner */
 		FreeExecutorState(simple_eval_estate);
+		ResourceOwnerReleaseAllPlanCacheRefs(simple_eval_resowner);
+		ResourceOwnerDelete(simple_eval_resowner);
 
 		/* Function should now have no remaining use-counts ... */
 		func->use_count--;
@@ -366,8 +382,10 @@ plpgsql_inline_handler(PG_FUNCTION_ARGS)
 	}
 	PG_END_TRY();
 
-	/* Clean up the private EState */
+	/* Clean up the private EState and resowner */
 	FreeExecutorState(simple_eval_estate);
+	ResourceOwnerReleaseAllPlanCacheRefs(simple_eval_resowner);
+	ResourceOwnerDelete(simple_eval_resowner);
 
 	/* Function should now have no remaining use-counts ... */
 	func->use_count--;
diff --git a/src/pl/plpgsql/src/plpgsql.h b/src/pl/plpgsql/src/plpgsql.h
index 69df330..1af2595 100644
--- a/src/pl/plpgsql/src/plpgsql.h
+++ b/src/pl/plpgsql/src/plpgsql.h
@@ -231,12 +231,21 @@ typedef struct PLpgSQL_expr
 
 	/* fields for "simple expression" fast-path execution: */
 	Expr	   *expr_simple_expr;	/* NULL means not a simple expr */
-	int			expr_simple_generation; /* plancache generation we checked */
 	Oid			expr_simple_type;	/* result type Oid, if simple */
 	int32		expr_simple_typmod; /* result typmod, if simple */
 	bool		expr_simple_mutable;	/* true if simple expr is mutable */
 
 	/*
+	 * If the expression was ever determined to be simple, we remember its
+	 * CachedPlanSource and CachedPlan here.  If expr_simple_plan_lxid matches
+	 * current LXID, then we hold a refcount on expr_simple_plan in the
+	 * current transaction.  Otherwise we need to get one before re-using it.
+	 */
+	CachedPlanSource *expr_simple_plansource;	/* extracted from "plan" */
+	CachedPlan *expr_simple_plan;	/* extracted from "plan" */
+	LocalTransactionId expr_simple_plan_lxid;
+
+	/*
 	 * if expr is simple AND prepared in current transaction,
 	 * expr_simple_state and expr_simple_in_use are valid. Test validity by
 	 * seeing if expr_simple_lxid matches current LXID.  (If not,
@@ -1082,8 +1091,9 @@ typedef struct PLpgSQL_execstate
 	 */
 	ParamListInfo paramLI;
 
-	/* EState to use for "simple" expression evaluation */
+	/* EState and resowner to use for "simple" expression evaluation */
 	EState	   *simple_eval_estate;
+	ResourceOwner simple_eval_resowner;
 
 	/* lookup table to use for executing type casts */
 	HTAB	   *cast_hash;
@@ -1268,6 +1278,7 @@ extern void _PG_init(void);
 extern Datum plpgsql_exec_function(PLpgSQL_function *func,
 								   FunctionCallInfo fcinfo,
 								   EState *simple_eval_estate,
+								   ResourceOwner simple_eval_resowner,
 								   bool atomic);
 extern HeapTuple plpgsql_exec_trigger(PLpgSQL_function *func,
 									  TriggerData *trigdata);
