>From 7ef3e944c1ee8266d70fafae080afc6beb492102 Mon Sep 17 00:00:00 2001 From: Jim Nasby Date: Wed, 25 Jan 2017 12:57:40 -0600 Subject: [PATCH 1/2] Add SPI_execute_callback() and callback-based DestReceiver. Instead of placing results in a tuplestore, this method of execution uses the supplied callback when creating the Portal for a query. --- src/backend/executor/spi.c | 76 ++++++++++++++++++++++++++++++++++++++++------ src/backend/tcop/dest.c | 11 +++++++ src/include/executor/spi.h | 4 +++ src/include/tcop/dest.h | 1 + 4 files changed, 83 insertions(+), 9 deletions(-) diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index 55f97b14e6..d55e06509f 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan); static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, Snapshot snapshot, Snapshot crosscheck_snapshot, - bool read_only, bool fire_triggers, uint64 tcount); + bool read_only, bool fire_triggers, uint64 tcount, + DestReceiver *callback); static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes, Datum *Values, const char *Nulls); @@ -320,7 +321,34 @@ SPI_execute(const char *src, bool read_only, long tcount) res = _SPI_execute_plan(&plan, NULL, InvalidSnapshot, InvalidSnapshot, - read_only, true, tcount); + read_only, true, tcount, NULL); + + _SPI_end_call(true); + return res; +} +int +SPI_execute_callback(const char *src, bool read_only, long tcount, + DestReceiver *callback) +{ + _SPI_plan plan; + int res; + + if (src == NULL || tcount < 0) + return SPI_ERROR_ARGUMENT; + + res = _SPI_begin_call(true); + if (res < 0) + return res; + + memset(&plan, 0, sizeof(_SPI_plan)); + plan.magic = _SPI_PLAN_MAGIC; + plan.cursor_options = 0; + + _SPI_prepare_oneshot_plan(src, &plan); + + res = _SPI_execute_plan(&plan, NULL, + InvalidSnapshot, InvalidSnapshot, + read_only, true, tcount, callback); _SPI_end_call(true); return res; @@ -354,7 +382,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls, _SPI_convert_params(plan->nargs, plan->argtypes, Values, Nulls), InvalidSnapshot, InvalidSnapshot, - read_only, true, tcount); + read_only, true, tcount, NULL); + + _SPI_end_call(true); + return res; +} + +/* Execute a previously prepared plan with a callback Destination */ +int +SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls, + bool read_only, long tcount, DestReceiver *callback) +{ + int res; + + if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0) + return SPI_ERROR_ARGUMENT; + + if (plan->nargs > 0 && Values == NULL) + return SPI_ERROR_PARAM; + + res = _SPI_begin_call(true); + if (res < 0) + return res; + + res = _SPI_execute_plan(plan, + _SPI_convert_params(plan->nargs, plan->argtypes, + Values, Nulls), + InvalidSnapshot, InvalidSnapshot, + read_only, true, tcount, callback); _SPI_end_call(true); return res; @@ -383,7 +438,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params, res = _SPI_execute_plan(plan, params, InvalidSnapshot, InvalidSnapshot, - read_only, true, tcount); + read_only, true, tcount, NULL); _SPI_end_call(true); return res; @@ -424,7 +479,7 @@ SPI_execute_snapshot(SPIPlanPtr plan, _SPI_convert_params(plan->nargs, plan->argtypes, Values, Nulls), snapshot, crosscheck_snapshot, - read_only, fire_triggers, tcount); + read_only, fire_triggers, tcount, NULL); _SPI_end_call(true); return res; @@ -471,7 +526,7 @@ SPI_execute_with_args(const char *src, res = _SPI_execute_plan(&plan, paramLI, InvalidSnapshot, InvalidSnapshot, - read_only, true, tcount); + read_only, true, tcount, NULL); _SPI_end_call(true); return res; @@ -1892,7 +1947,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan) static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, Snapshot snapshot, Snapshot crosscheck_snapshot, - bool read_only, bool fire_triggers, uint64 tcount) + bool read_only, bool fire_triggers, uint64 tcount, + DestReceiver *callback) { int my_res = 0; uint64 my_processed = 0; @@ -1903,6 +1959,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, ErrorContextCallback spierrcontext; CachedPlan *cplan = NULL; ListCell *lc1; + DestReceiver *dest = callback; /* * Setup error traceback support for ereport() @@ -2020,7 +2077,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, { PlannedStmt *stmt = castNode(PlannedStmt, lfirst(lc2)); bool canSetTag = stmt->canSetTag; - DestReceiver *dest; _SPI_current->processed = 0; _SPI_current->lastoid = InvalidOid; @@ -2065,7 +2121,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, UpdateActiveSnapshotCommandId(); } - dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone); + if (!callback) + dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone); if (stmt->utilityStmt == NULL) { @@ -2090,6 +2147,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, { char completionTag[COMPLETION_TAG_BUFSIZE]; + // XXX throw error if callback is set ProcessUtility(stmt, plansource->query_string, PROCESS_UTILITY_QUERY, diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c index 28081c3765..bd671e0b26 100644 --- a/src/backend/tcop/dest.c +++ b/src/backend/tcop/dest.c @@ -87,6 +87,11 @@ static DestReceiver spi_printtupDR = { DestSPI }; +static DestReceiver spi_callbackDR = { + donothingReceive, donothingStartup, donothingCleanup, donothingCleanup, + DestSPICallback +}; + /* Globally available receiver for DestNone */ DestReceiver *None_Receiver = &donothingDR; @@ -126,6 +131,9 @@ CreateDestReceiver(CommandDest dest) case DestSPI: return &spi_printtupDR; + case DestSPICallback: + return &spi_callbackDR; + case DestTuplestore: return CreateTuplestoreDestReceiver(); @@ -172,6 +180,7 @@ EndCommand(const char *commandTag, CommandDest dest) case DestNone: case DestDebug: case DestSPI: + case DestSPICallback: case DestTuplestore: case DestIntoRel: case DestCopyOut: @@ -216,6 +225,7 @@ NullCommand(CommandDest dest) case DestNone: case DestDebug: case DestSPI: + case DestSPICallback: case DestTuplestore: case DestIntoRel: case DestCopyOut: @@ -262,6 +272,7 @@ ReadyForQuery(CommandDest dest) case DestNone: case DestDebug: case DestSPI: + case DestSPICallback: case DestTuplestore: case DestIntoRel: case DestCopyOut: diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h index a18ae63245..d779511130 100644 --- a/src/include/executor/spi.h +++ b/src/include/executor/spi.h @@ -74,11 +74,15 @@ extern PGDLLIMPORT int SPI_result; extern int SPI_connect(void); extern int SPI_finish(void); extern int SPI_execute(const char *src, bool read_only, long tcount); +extern int SPI_execute_callback(const char *src, bool read_only, long tcount, + DestReceiver *callback); extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls, bool read_only, long tcount); extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params, bool read_only, long tcount); +extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls, + bool read_only, long tcount, DestReceiver *callback); extern int SPI_exec(const char *src, long tcount); extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls, long tcount); diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h index c459af2e13..1d1d641ae0 100644 --- a/src/include/tcop/dest.h +++ b/src/include/tcop/dest.h @@ -91,6 +91,7 @@ typedef enum DestRemoteExecute, /* sent to frontend, in Execute command */ DestRemoteSimple, /* sent to frontend, w/no catalog access */ DestSPI, /* results sent to SPI manager */ + DestSPICallback, /* results sent to user-specified callback function */ DestTuplestore, /* results sent to Tuplestore */ DestIntoRel, /* results sent to relation (SELECT INTO) */ DestCopyOut, /* results sent to COPY TO code */ -- 2.11.1