*** a/src/backend/catalog/pg_proc.c --- b/src/backend/catalog/pg_proc.c *************** *** 832,838 **** fmgr_sql_validator(PG_FUNCTION_ARGS) proc->proargtypes.values, proc->pronargs); (void) check_sql_fn_retval(funcoid, proc->prorettype, ! querytree_list, NULL, NULL); } else --- 832,838 ---- proc->proargtypes.values, proc->pronargs); (void) check_sql_fn_retval(funcoid, proc->prorettype, ! llast(querytree_list), NULL, NULL); } else *** a/src/backend/executor/functions.c --- b/src/backend/executor/functions.c *************** *** 90,107 **** typedef struct ParamListInfo paramLI; /* Param list representing current args */ Tuplestorestate *tstore; /* where we accumulate result tuples */ JunkFilter *junkFilter; /* will be NULL if function returns VOID */ ! /* head of linked list of execution_state records */ ! execution_state *func_state; } SQLFunctionCache; typedef SQLFunctionCache *SQLFunctionCachePtr; /* non-export function prototypes */ ! static execution_state *init_execution_state(List *queryTree_list, SQLFunctionCachePtr fcache, bool lazyEvalOK); static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK); --- 90,107 ---- ParamListInfo paramLI; /* Param list representing current args */ Tuplestorestate *tstore; /* where we accumulate result tuples */ + Snapshot snapshot; JunkFilter *junkFilter; /* will be NULL if function returns VOID */ ! List *func_state; } SQLFunctionCache; typedef SQLFunctionCache *SQLFunctionCachePtr; /* non-export function prototypes */ ! static List *init_execution_state(List *queryTree_list, SQLFunctionCachePtr fcache, bool lazyEvalOK); static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK); *************** *** 123,183 **** static void sqlfunction_destroy(DestReceiver *self); /* Set up the list of per-query execution_state records for a SQL function */ ! static execution_state * init_execution_state(List *queryTree_list, SQLFunctionCachePtr fcache, bool lazyEvalOK) { ! execution_state *firstes = NULL; ! execution_state *preves = NULL; execution_state *lasttages = NULL; ! ListCell *qtl_item; ! foreach(qtl_item, queryTree_list) { ! Query *queryTree = (Query *) lfirst(qtl_item); ! Node *stmt; ! execution_state *newes; ! Assert(IsA(queryTree, Query)); ! if (queryTree->commandType == CMD_UTILITY) ! stmt = queryTree->utilityStmt; ! else ! stmt = (Node *) pg_plan_query(queryTree, 0, NULL); ! /* Precheck all commands for validity in a function */ ! if (IsA(stmt, TransactionStmt)) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! /* translator: %s is a SQL statement name */ ! errmsg("%s is not allowed in a SQL function", ! CreateCommandTag(stmt)))); ! if (fcache->readonly_func && !CommandIsReadOnly(stmt)) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! /* translator: %s is a SQL statement name */ ! errmsg("%s is not allowed in a non-volatile function", ! CreateCommandTag(stmt)))); ! newes = (execution_state *) palloc(sizeof(execution_state)); ! if (preves) ! preves->next = newes; ! else ! firstes = newes; ! newes->next = NULL; ! newes->status = F_EXEC_START; ! newes->setsResult = false; /* might change below */ ! newes->lazyEval = false; /* might change below */ ! newes->stmt = stmt; ! newes->qd = NULL; ! if (queryTree->canSetTag) ! lasttages = newes; ! preves = newes; } /* --- 123,200 ---- /* Set up the list of per-query execution_state records for a SQL function */ ! static List * init_execution_state(List *queryTree_list, SQLFunctionCachePtr fcache, bool lazyEvalOK) { ! execution_state *firstes; ! execution_state *preves; execution_state *lasttages = NULL; ! List *eslist; ! ListCell *lc1; ! ListCell *lc2; ! List *qtlist; ! Query *queryTree; ! ! ! eslist = NIL; ! foreach(lc1, queryTree_list) { ! qtlist = (List *) lfirst(lc1); ! firstes = NULL; ! preves = NULL; ! foreach(lc2, qtlist) ! { ! Node *stmt; ! execution_state *newes; ! queryTree = (Query *) lfirst(lc2); ! Assert(IsA(queryTree, Query)); ! if (queryTree->commandType == CMD_UTILITY) ! stmt = queryTree->utilityStmt; ! else ! stmt = (Node *) pg_plan_query(queryTree, 0, NULL); ! /* Precheck all commands for validity in a function */ ! if (IsA(stmt, TransactionStmt)) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! /* translator: %s is a SQL statement name */ ! errmsg("%s is not allowed in a SQL function", ! CreateCommandTag(stmt)))); ! if (fcache->readonly_func && !CommandIsReadOnly(stmt)) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! /* translator: %s is a SQL statement name */ ! errmsg("%s is not allowed in a non-volatile function", ! CreateCommandTag(stmt)))); ! ! newes = (execution_state *) palloc(sizeof(execution_state)); ! if (preves) ! preves->next = newes; ! else ! firstes = newes; ! newes->next = NULL; ! newes->status = F_EXEC_START; ! newes->setsResult = false; /* might change below */ ! newes->lazyEval = false; /* might change below */ ! newes->stmt = stmt; ! newes->qd = NULL; ! if (queryTree->canSetTag) ! lasttages = newes; ! ! preves = newes; ! } ! ! eslist = lappend(eslist, firstes); } /* *************** *** 210,216 **** init_execution_state(List *queryTree_list, } } ! return firstes; } /* Initialize the SQLFunctionCache for a SQL function */ --- 227,233 ---- } } ! return eslist; } /* Initialize the SQLFunctionCache for a SQL function */ *************** *** 342,348 **** init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK) */ fcache->returnsTuple = check_sql_fn_retval(foid, rettype, ! queryTree_list, NULL, &fcache->junkFilter); --- 359,365 ---- */ fcache->returnsTuple = check_sql_fn_retval(foid, rettype, ! llast(queryTree_list), NULL, &fcache->junkFilter); *************** *** 374,397 **** init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK) static void postquel_start(execution_state *es, SQLFunctionCachePtr fcache) { - Snapshot snapshot; DestReceiver *dest; Assert(es->qd == NULL); ! /* ! * In a read-only function, use the surrounding query's snapshot; ! * otherwise take a new snapshot for each query. The snapshot should ! * include a fresh command ID so that all work to date in this transaction ! * is visible. ! */ ! if (fcache->readonly_func) ! snapshot = GetActiveSnapshot(); ! else ! { ! CommandCounterIncrement(); ! snapshot = GetTransactionSnapshot(); ! } /* * If this query produces the function result, send its output to the --- 391,401 ---- static void postquel_start(execution_state *es, SQLFunctionCachePtr fcache) { DestReceiver *dest; Assert(es->qd == NULL); ! Assert(ActiveSnapshotSet()); /* * If this query produces the function result, send its output to the *************** *** 415,427 **** postquel_start(execution_state *es, SQLFunctionCachePtr fcache) if (IsA(es->stmt, PlannedStmt)) es->qd = CreateQueryDesc((PlannedStmt *) es->stmt, fcache->src, ! snapshot, InvalidSnapshot, dest, fcache->paramLI, 0); else es->qd = CreateUtilityQueryDesc(es->stmt, fcache->src, ! snapshot, dest, fcache->paramLI); --- 419,432 ---- if (IsA(es->stmt, PlannedStmt)) es->qd = CreateQueryDesc((PlannedStmt *) es->stmt, fcache->src, ! GetActiveSnapshot(), ! InvalidSnapshot, dest, fcache->paramLI, 0); else es->qd = CreateUtilityQueryDesc(es->stmt, fcache->src, ! GetActiveSnapshot(), dest, fcache->paramLI); *************** *** 617,622 **** fmgr_sql(PG_FUNCTION_ARGS) --- 622,629 ---- execution_state *es; TupleTableSlot *slot; Datum result; + List *eslist; + ListCell *eslc; /* * Switch to context in which the fcache lives. This ensures that *************** *** 668,680 **** fmgr_sql(PG_FUNCTION_ARGS) init_sql_fcache(fcinfo->flinfo, lazyEvalOK); fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra; } ! es = fcache->func_state; /* * Convert params to appropriate format if starting a fresh execution. (If * continuing execution, we can re-use prior params.) */ ! if (es && es->status == F_EXEC_START) postquel_sub_params(fcache, fcinfo); /* --- 675,687 ---- init_sql_fcache(fcinfo->flinfo, lazyEvalOK); fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra; } ! eslist = fcache->func_state; /* * Convert params to appropriate format if starting a fresh execution. (If * continuing execution, we can re-use prior params.) */ ! if (linitial(eslist) && ((execution_state *) linitial(eslist))->status == F_EXEC_START) postquel_sub_params(fcache, fcinfo); /* *************** *** 687,694 **** fmgr_sql(PG_FUNCTION_ARGS) /* * Find first unfinished query in function. */ ! while (es && es->status == F_EXEC_DONE) ! es = es->next; /* * Execute each command in the function one after another until we either --- 694,709 ---- /* * Find first unfinished query in function. */ ! foreach(eslc, eslist) ! { ! es = (execution_state *) lfirst(eslc); ! ! while (es && es->status == F_EXEC_DONE) ! es = es->next; ! ! if (es) ! break; ! } /* * Execute each command in the function one after another until we either *************** *** 699,706 **** fmgr_sql(PG_FUNCTION_ARGS) --- 714,744 ---- bool completed; if (es->status == F_EXEC_START) + { + if (!fcache->readonly_func) + { + /* + * In a read-only function, use the surrounding query's snapshot; + * otherwise take a new snapshot if we don't have one yet. The + * snapshot should include a fresh command ID so that all work to + * date in this transaction is visible. + */ + if (!fcache->snapshot) + { + CommandCounterIncrement(); + fcache->snapshot = RegisterSnapshot(GetTransactionSnapshot()); + PushActiveSnapshot(fcache->snapshot); + } + else + PushUpdatedSnapshot(fcache->snapshot); + } + postquel_start(es, fcache); + if (!fcache->readonly_func) + PopActiveSnapshot(); + } + completed = postquel_getnext(es, fcache); /* *************** *** 726,731 **** fmgr_sql(PG_FUNCTION_ARGS) --- 764,788 ---- if (es->status != F_EXEC_DONE) break; es = es->next; + + if (!es) + { + eslc = lnext(eslc); + if (!eslc) + break; + + es = (execution_state *) lfirst(eslc); + + /* make sure we take a new snapshot for this query list */ + if (!fcache->readonly_func) + { + Assert(fcache->snapshot != InvalidSnapshot); + UnregisterSnapshot(fcache->snapshot); + fcache->snapshot = InvalidSnapshot; + } + else + Assert(fcache->snapshot == InvalidSnapshot); + } } /* *************** *** 794,799 **** fmgr_sql(PG_FUNCTION_ARGS) --- 851,861 ---- PointerGetDatum(fcache)); fcache->shutdown_reg = false; } + + /* Unregister snapshot if we have one */ + if (fcache->snapshot != InvalidSnapshot) + UnregisterSnapshot(fcache->snapshot); + fcache->snapshot = InvalidSnapshot; } else { *************** *** 820,825 **** fmgr_sql(PG_FUNCTION_ARGS) --- 882,892 ---- PointerGetDatum(fcache)); fcache->shutdown_reg = false; } + + /* Unregister snapshot if we have one */ + if (fcache->snapshot != InvalidSnapshot) + UnregisterSnapshot(fcache->snapshot); + fcache->snapshot = InvalidSnapshot; } } else *************** *** 850,855 **** fmgr_sql(PG_FUNCTION_ARGS) --- 917,927 ---- /* Clear the tuplestore, but keep it for next time */ tuplestore_clear(fcache->tstore); + + /* Unregister snapshot if we have one */ + if (fcache->snapshot != InvalidSnapshot) + UnregisterSnapshot(fcache->snapshot); + fcache->snapshot = InvalidSnapshot; } /* *************** *** 858,868 **** fmgr_sql(PG_FUNCTION_ARGS) */ if (es == NULL) { ! es = fcache->func_state; ! while (es) { ! es->status = F_EXEC_START; ! es = es->next; } } --- 930,943 ---- */ if (es == NULL) { ! foreach(eslc, fcache->func_state) { ! es = (execution_state *) lfirst(eslc); ! while (es) ! { ! es->status = F_EXEC_START; ! es = es->next; ! } } } *************** *** 913,931 **** sql_exec_error_callback(void *arg) { execution_state *es; int query_num; - es = fcache->func_state; query_num = 1; ! while (es) { ! if (es->qd) { ! errcontext("SQL function \"%s\" statement %d", ! fcache->fname, query_num); ! break; } - es = es->next; - query_num++; } if (es == NULL) { --- 988,1011 ---- { execution_state *es; int query_num; + ListCell *lc; query_num = 1; ! ! foreach(lc, fcache->func_state) { ! es = (execution_state *) lfirst(lc); ! while (es) { ! if (es->qd) ! { ! errcontext("SQL function \"%s\" statement %d", ! fcache->fname, query_num); ! break; ! } ! es = es->next; ! query_num++; } } if (es == NULL) { *************** *** 956,973 **** static void ShutdownSQLFunction(Datum arg) { SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg); ! execution_state *es = fcache->func_state; ! while (es != NULL) { ! /* Shut down anything still running */ ! if (es->status == F_EXEC_RUN) ! postquel_end(es); ! /* Reset states to START in case we're called again */ ! es->status = F_EXEC_START; ! es = es->next; } /* Release tuplestore if we have one */ if (fcache->tstore) tuplestore_end(fcache->tstore); --- 1036,1064 ---- ShutdownSQLFunction(Datum arg) { SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg); ! execution_state *es; ! ListCell *lc; ! foreach(lc, fcache->func_state) { ! es = (execution_state *) lfirst(lc); ! ! while (es) ! { ! /* Shut down anything still running */ ! if (es->status == F_EXEC_RUN) ! postquel_end(es); ! /* Reset states to START in case we're called again */ ! es->status = F_EXEC_START; ! es = es->next; ! } } + /* Unregister snapshot if we have one */ + if (fcache->snapshot != InvalidSnapshot) + UnregisterSnapshot(fcache->snapshot); + fcache->snapshot = InvalidSnapshot; + /* Release tuplestore if we have one */ if (fcache->tstore) tuplestore_end(fcache->tstore); *** a/src/backend/executor/spi.c --- b/src/backend/executor/spi.c *************** *** 1769,1774 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, --- 1769,1775 ---- SPITupleTable *my_tuptable = NULL; int res = 0; bool have_active_snap = ActiveSnapshotSet(); + bool registered_snap = false; ErrorContextCallback spierrcontext; CachedPlan *cplan = NULL; ListCell *lc1; *************** *** 1872,1879 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, } else { ! PushActiveSnapshot(GetTransactionSnapshot()); pushed_active_snap = true; } } else --- 1873,1882 ---- } else { ! snapshot = RegisterSnapshot(GetTransactionSnapshot()); ! PushActiveSnapshot(snapshot); pushed_active_snap = true; + registered_snap = true; } } else *************** *** 1966,1975 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, --- 1969,1991 ---- */ if (!read_only) CommandCounterIncrement(); + + /* + * If we took a new snapshot for this query list, unregister it and + * make sure we take a new one for the next list. + */ + if (registered_snap) + { + UnregisterSnapshot(snapshot); + snapshot = InvalidSnapshot; + } } fail: + if (registered_snap) + UnregisterSnapshot(snapshot); + /* We no longer need the cached plan refcount, if any */ if (cplan) ReleaseCachedPlan(cplan, true); *** a/src/backend/tcop/postgres.c --- b/src/backend/tcop/postgres.c *************** *** 537,547 **** pg_parse_and_rewrite(const char *query_string, /* string to execute */ { Node *parsetree = (Node *) lfirst(list_item); ! querytree_list = list_concat(querytree_list, ! pg_analyze_and_rewrite(parsetree, ! query_string, ! paramTypes, ! numParams)); } return querytree_list; --- 537,547 ---- { Node *parsetree = (Node *) lfirst(list_item); ! querytree_list = lappend(querytree_list, ! pg_analyze_and_rewrite(parsetree, ! query_string, ! paramTypes, ! numParams)); } return querytree_list; *** a/src/backend/tcop/pquery.c --- b/src/backend/tcop/pquery.c *************** *** 170,180 **** ProcessQuery(PlannedStmt *plan, elog(DEBUG3, "ProcessQuery"); /* - * Must always set a snapshot for plannable queries. - */ - PushActiveSnapshot(GetTransactionSnapshot()); - - /* * Create the QueryDesc object */ queryDesc = CreateQueryDesc(plan, sourceText, --- 170,175 ---- *************** *** 234,241 **** ProcessQuery(PlannedStmt *plan, /* Now take care of any queued AFTER triggers */ AfterTriggerEndQuery(queryDesc->estate); - PopActiveSnapshot(); - /* * Now, we close down all the scans and free allocated resources. */ --- 229,234 ---- *************** *** 1220,1225 **** PortalRunMulti(Portal portal, bool isTopLevel, --- 1213,1219 ---- char *completionTag) { ListCell *stmtlist_item; + Snapshot snapshot = InvalidSnapshot; /* * If the destination is DestRemoteExecute, change to DestNone. The *************** *** 1262,1267 **** PortalRunMulti(Portal portal, bool isTopLevel, --- 1256,1270 ---- if (log_executor_stats) ResetUsage(); + /* if no snapshot is set, grab a new one and register it */ + if (snapshot == InvalidSnapshot) + { + snapshot = RegisterSnapshot(GetTransactionSnapshot()); + PushActiveSnapshot(snapshot); + } + else + PushUpdatedSnapshot(snapshot); + if (pstmt->canSetTag) { /* statement can set tag string */ *************** *** 1279,1284 **** PortalRunMulti(Portal portal, bool isTopLevel, --- 1282,1289 ---- altdest, NULL); } + PopActiveSnapshot(); + if (log_executor_stats) ShowUsage("EXECUTOR STATISTICS"); *************** *** 1291,1301 **** PortalRunMulti(Portal portal, bool isTopLevel, --- 1296,1320 ---- * * These are assumed canSetTag if they're the only stmt in the * portal. + * + * NotifyStmt is the only utility statement allowed in a list of + * rewritten queries, and it doesn't need a snapshot so we don't + * need to worry about it. However, if the list has only one + * statement and it's a utility statement, we are not allowed to + * take a snapshot. See the first comment in PortalRunUtility(). */ if (list_length(portal->stmts) == 1) + { + Assert(snapshot == InvalidSnapshot); + PortalRunUtility(portal, stmt, isTopLevel, dest, completionTag); + } else + { + Assert(IsA(stmt, NotifyStmt)); + PortalRunUtility(portal, stmt, isTopLevel, altdest, NULL); + } } /* *************** *** 1313,1318 **** PortalRunMulti(Portal portal, bool isTopLevel, --- 1332,1340 ---- MemoryContextDeleteChildren(PortalGetHeapMemory(portal)); } + if (snapshot != InvalidSnapshot) + UnregisterSnapshot(snapshot); + /* * If a command completion tag was supplied, use it. Otherwise use the * portal's commandTag as the default completion tag.