diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index 7bd37283b7..97585d272e 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan); static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, Snapshot snapshot, Snapshot crosscheck_snapshot, - bool read_only, bool fire_triggers, uint64 tcount); + bool read_only, bool fire_triggers, uint64 tcount, + DestReceiver *callback); static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes, Datum *Values, const char *Nulls); @@ -320,7 +321,34 @@ SPI_execute(const char *src, bool read_only, long tcount) res = _SPI_execute_plan(&plan, NULL, InvalidSnapshot, InvalidSnapshot, - read_only, true, tcount); + read_only, true, tcount, NULL); + + _SPI_end_call(true); + return res; +} +int +SPI_execute_callback(const char *src, bool read_only, long tcount, + DestReceiver *callback) +{ + _SPI_plan plan; + int res; + + if (src == NULL || tcount < 0) + return SPI_ERROR_ARGUMENT; + + res = _SPI_begin_call(true); + if (res < 0) + return res; + + memset(&plan, 0, sizeof(_SPI_plan)); + plan.magic = _SPI_PLAN_MAGIC; + plan.cursor_options = 0; + + _SPI_prepare_oneshot_plan(src, &plan); + + res = _SPI_execute_plan(&plan, NULL, + InvalidSnapshot, InvalidSnapshot, + read_only, true, tcount, callback); _SPI_end_call(true); return res; @@ -354,7 +382,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls, _SPI_convert_params(plan->nargs, plan->argtypes, Values, Nulls), InvalidSnapshot, InvalidSnapshot, - read_only, true, tcount); + read_only, true, tcount, NULL); + + _SPI_end_call(true); + return res; +} + +/* Execute a previously prepared plan with a callback Destination */ +int +SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls, + bool read_only, long tcount, DestReceiver *callback) +{ + int res; + + if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0) + return SPI_ERROR_ARGUMENT; + + if (plan->nargs > 0 && Values == NULL) + return SPI_ERROR_PARAM; + + res = _SPI_begin_call(true); + if (res < 0) + return res; + + res = _SPI_execute_plan(plan, + _SPI_convert_params(plan->nargs, plan->argtypes, + Values, Nulls), + InvalidSnapshot, InvalidSnapshot, + read_only, true, tcount, callback); _SPI_end_call(true); return res; @@ -383,7 +438,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params, res = _SPI_execute_plan(plan, params, InvalidSnapshot, InvalidSnapshot, - read_only, true, tcount); + read_only, true, tcount, NULL); _SPI_end_call(true); return res; @@ -424,7 +479,7 @@ SPI_execute_snapshot(SPIPlanPtr plan, _SPI_convert_params(plan->nargs, plan->argtypes, Values, Nulls), snapshot, crosscheck_snapshot, - read_only, fire_triggers, tcount); + read_only, fire_triggers, tcount, NULL); _SPI_end_call(true); return res; @@ -471,7 +526,7 @@ SPI_execute_with_args(const char *src, res = _SPI_execute_plan(&plan, paramLI, InvalidSnapshot, InvalidSnapshot, - read_only, true, tcount); + read_only, true, tcount, NULL); _SPI_end_call(true); return res; @@ -1892,7 +1947,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan) static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, Snapshot snapshot, Snapshot crosscheck_snapshot, - bool read_only, bool fire_triggers, uint64 tcount) + bool read_only, bool fire_triggers, uint64 tcount, + DestReceiver *callback) { int my_res = 0; uint64 my_processed = 0; @@ -1903,6 +1959,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, ErrorContextCallback spierrcontext; CachedPlan *cplan = NULL; ListCell *lc1; + DestReceiver *dest = callback; /* * Setup error traceback support for ereport() @@ -2020,7 +2077,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, { PlannedStmt *stmt = (PlannedStmt *) lfirst(lc2); bool canSetTag = stmt->canSetTag; - DestReceiver *dest; _SPI_current->processed = 0; _SPI_current->lastoid = InvalidOid; @@ -2065,7 +2121,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, UpdateActiveSnapshotCommandId(); } - dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone); + if (!callback) + dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone); if (stmt->utilityStmt == NULL) { @@ -2090,6 +2147,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, { char completionTag[COMPLETION_TAG_BUFSIZE]; + // XXX throw error if callback is set ProcessUtility(stmt, plansource->query_string, PROCESS_UTILITY_QUERY, diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c index da39f43f38..85141a0a0f 100644 --- a/src/backend/tcop/dest.c +++ b/src/backend/tcop/dest.c @@ -81,6 +81,11 @@ static DestReceiver spi_printtupDR = { DestSPI }; +static DestReceiver spi_callbackDR = { + donothingReceive, donothingStartup, donothingCleanup, donothingCleanup, + DestSPICallback +}; + /* Globally available receiver for DestNone */ DestReceiver *None_Receiver = &donothingDR; @@ -117,6 +122,9 @@ CreateDestReceiver(CommandDest dest) case DestSPI: return &spi_printtupDR; + case DestSPICallback: + return &spi_callbackDR; + case DestTuplestore: return CreateTuplestoreDestReceiver(); @@ -162,6 +170,7 @@ EndCommand(const char *commandTag, CommandDest dest) case DestNone: case DestDebug: case DestSPI: + case DestSPICallback: case DestTuplestore: case DestIntoRel: case DestCopyOut: @@ -205,6 +214,7 @@ NullCommand(CommandDest dest) case DestNone: case DestDebug: case DestSPI: + case DestSPICallback: case DestTuplestore: case DestIntoRel: case DestCopyOut: @@ -250,6 +260,7 @@ ReadyForQuery(CommandDest dest) case DestNone: case DestDebug: case DestSPI: + case DestSPICallback: case DestTuplestore: case DestIntoRel: case DestCopyOut: diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h index a18ae63245..d779511130 100644 --- a/src/include/executor/spi.h +++ b/src/include/executor/spi.h @@ -74,11 +74,15 @@ extern PGDLLIMPORT int SPI_result; extern int SPI_connect(void); extern int SPI_finish(void); extern int SPI_execute(const char *src, bool read_only, long tcount); +extern int SPI_execute_callback(const char *src, bool read_only, long tcount, + DestReceiver *callback); extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls, bool read_only, long tcount); extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params, bool read_only, long tcount); +extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls, + bool read_only, long tcount, DestReceiver *callback); extern int SPI_exec(const char *src, long tcount); extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls, long tcount); diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h index 93f9b7463a..9a8c98e267 100644 --- a/src/include/tcop/dest.h +++ b/src/include/tcop/dest.h @@ -90,6 +90,7 @@ typedef enum DestRemote, /* results sent to frontend process */ DestRemoteExecute, /* sent to frontend, in Execute command */ DestSPI, /* results sent to SPI manager */ + DestSPICallback, /* results sent to user-specified callback function */ DestTuplestore, /* results sent to Tuplestore */ DestIntoRel, /* results sent to relation (SELECT INTO) */ DestCopyOut, /* results sent to COPY TO code */