From 0a2ef661f55a047763a43b0eebd7483760e4a427 Mon Sep 17 00:00:00 2001 From: Jim Nasby Date: Wed, 5 Apr 2017 20:52:39 -0500 Subject: [PATCH 1/2] Add SPI_execute_callback Instead of placing results in a tuplestore, this method of execution uses the supplied callback when creating the Portal for a query. --- src/backend/executor/spi.c | 80 ++++++++++++++++++++++++++++++++++++++++------ src/backend/tcop/dest.c | 15 +++++++++ src/include/executor/spi.h | 4 +++ src/include/tcop/dest.h | 1 + 4 files changed, 90 insertions(+), 10 deletions(-) diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index ca547dc6d9..4f6c3011f9 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan); static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, Snapshot snapshot, Snapshot crosscheck_snapshot, - bool read_only, bool fire_triggers, uint64 tcount); + bool read_only, bool fire_triggers, uint64 tcount, + DestReceiver *callback); static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes, Datum *Values, const char *Nulls); @@ -321,7 +322,35 @@ SPI_execute(const char *src, bool read_only, long tcount) res = _SPI_execute_plan(&plan, NULL, InvalidSnapshot, InvalidSnapshot, - read_only, true, tcount); + read_only, true, tcount, NULL); + + _SPI_end_call(true); + return res; +} + +int +SPI_execute_callback(const char *src, bool read_only, long tcount, + DestReceiver *callback) +{ + _SPI_plan plan; + int res; + + if (src == NULL || tcount < 0) + return SPI_ERROR_ARGUMENT; + + res = _SPI_begin_call(true); + if (res < 0) + return res; + + memset(&plan, 0, sizeof(_SPI_plan)); + plan.magic = _SPI_PLAN_MAGIC; + plan.cursor_options = 0; + + _SPI_prepare_oneshot_plan(src, &plan); + + res = _SPI_execute_plan(&plan, NULL, + InvalidSnapshot, InvalidSnapshot, + read_only, true, tcount, callback); _SPI_end_call(true); return res; @@ -355,7 +384,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls, _SPI_convert_params(plan->nargs, plan->argtypes, Values, Nulls), InvalidSnapshot, InvalidSnapshot, - read_only, true, tcount); + read_only, true, tcount, NULL); + + _SPI_end_call(true); + return res; +} + +/* Execute a previously prepared plan with a callback */ +int +SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls, + bool read_only, long tcount, DestReceiver *callback) +{ + int res; + + if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0) + return SPI_ERROR_ARGUMENT; + + if (plan->nargs > 0 && Values == NULL) + return SPI_ERROR_PARAM; + + res = _SPI_begin_call(true); + if (res < 0) + return res; + + res = _SPI_execute_plan(plan, + _SPI_convert_params(plan->nargs, plan->argtypes, + Values, Nulls), + InvalidSnapshot, InvalidSnapshot, + read_only, true, tcount, callback); _SPI_end_call(true); return res; @@ -384,7 +440,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params, res = _SPI_execute_plan(plan, params, InvalidSnapshot, InvalidSnapshot, - read_only, true, tcount); + read_only, true, tcount, NULL); _SPI_end_call(true); return res; @@ -425,7 +481,7 @@ SPI_execute_snapshot(SPIPlanPtr plan, _SPI_convert_params(plan->nargs, plan->argtypes, Values, Nulls), snapshot, crosscheck_snapshot, - read_only, fire_triggers, tcount); + read_only, fire_triggers, tcount, NULL); _SPI_end_call(true); return res; @@ -472,7 +528,7 @@ SPI_execute_with_args(const char *src, res = _SPI_execute_plan(&plan, paramLI, InvalidSnapshot, InvalidSnapshot, - read_only, true, tcount); + read_only, true, tcount, NULL); _SPI_end_call(true); return res; @@ -1907,7 +1963,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan) static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, Snapshot snapshot, Snapshot crosscheck_snapshot, - bool read_only, bool fire_triggers, uint64 tcount) + bool read_only, bool fire_triggers, uint64 tcount, + DestReceiver *callback) { int my_res = 0; uint64 my_processed = 0; @@ -1918,6 +1975,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, ErrorContextCallback spierrcontext; CachedPlan *cplan = NULL; ListCell *lc1; + DestReceiver *dest = callback; /* * Setup error traceback support for ereport() @@ -2037,7 +2095,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, { PlannedStmt *stmt = castNode(PlannedStmt, lfirst(lc2)); bool canSetTag = stmt->canSetTag; - DestReceiver *dest; _SPI_current->processed = 0; _SPI_current->lastoid = InvalidOid; @@ -2082,7 +2139,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, UpdateActiveSnapshotCommandId(); } - dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone); + if (!callback) + dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone); if (stmt->utilityStmt == NULL) { @@ -2108,6 +2166,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, { char completionTag[COMPLETION_TAG_BUFSIZE]; + Assert(!callback); ProcessUtility(stmt, plansource->query_string, PROCESS_UTILITY_QUERY, @@ -2281,7 +2340,8 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount) switch (operation) { case CMD_SELECT: - if (queryDesc->dest->mydest != DestSPI) + if (queryDesc->dest->mydest != DestSPI && + queryDesc->dest->mydest != DestSPICallback) { /* Don't return SPI_OK_SELECT if we're discarding result */ res = SPI_OK_UTILITY; diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c index 28081c3765..f68b6e1b51 100644 --- a/src/backend/tcop/dest.c +++ b/src/backend/tcop/dest.c @@ -87,6 +87,15 @@ static DestReceiver spi_printtupDR = { DestSPI }; +/* + * This is strictly a starting point for creating a callback. It should not + * actually be used. + */ +static DestReceiver spi_callbackDR = { + donothingReceive, donothingStartup, donothingCleanup, donothingCleanup, + DestSPICallback +}; + /* Globally available receiver for DestNone */ DestReceiver *None_Receiver = &donothingDR; @@ -126,6 +135,9 @@ CreateDestReceiver(CommandDest dest) case DestSPI: return &spi_printtupDR; + case DestSPICallback: + return &spi_callbackDR; + case DestTuplestore: return CreateTuplestoreDestReceiver(); @@ -172,6 +184,7 @@ EndCommand(const char *commandTag, CommandDest dest) case DestNone: case DestDebug: case DestSPI: + case DestSPICallback: case DestTuplestore: case DestIntoRel: case DestCopyOut: @@ -216,6 +229,7 @@ NullCommand(CommandDest dest) case DestNone: case DestDebug: case DestSPI: + case DestSPICallback: case DestTuplestore: case DestIntoRel: case DestCopyOut: @@ -262,6 +276,7 @@ ReadyForQuery(CommandDest dest) case DestNone: case DestDebug: case DestSPI: + case DestSPICallback: case DestTuplestore: case DestIntoRel: case DestCopyOut: diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h index 94a805d477..13719e1df5 100644 --- a/src/include/executor/spi.h +++ b/src/include/executor/spi.h @@ -80,11 +80,15 @@ extern PGDLLIMPORT int SPI_result; extern int SPI_connect(void); extern int SPI_finish(void); extern int SPI_execute(const char *src, bool read_only, long tcount); +extern int SPI_execute_callback(const char *src, bool read_only, long tcount, + DestReceiver *callback); extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls, bool read_only, long tcount); extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params, bool read_only, long tcount); +extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls, + bool read_only, long tcount, DestReceiver *callback); extern int SPI_exec(const char *src, long tcount); extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls, long tcount); diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h index c459af2e13..1d1d641ae0 100644 --- a/src/include/tcop/dest.h +++ b/src/include/tcop/dest.h @@ -91,6 +91,7 @@ typedef enum DestRemoteExecute, /* sent to frontend, in Execute command */ DestRemoteSimple, /* sent to frontend, w/no catalog access */ DestSPI, /* results sent to SPI manager */ + DestSPICallback, /* results sent to user-specified callback function */ DestTuplestore, /* results sent to Tuplestore */ DestIntoRel, /* results sent to relation (SELECT INTO) */ DestCopyOut, /* results sent to COPY TO code */ -- 2.11.1