diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index 80fc4c4725..ace30d75f0 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan); static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, Snapshot snapshot, Snapshot crosscheck_snapshot, - bool read_only, bool fire_triggers, uint64 tcount); + bool read_only, bool fire_triggers, uint64 tcount, + DestReceiver *callback); static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes, Datum *Values, const char *Nulls); @@ -320,7 +321,34 @@ SPI_execute(const char *src, bool read_only, long tcount) res = _SPI_execute_plan(&plan, NULL, InvalidSnapshot, InvalidSnapshot, - read_only, true, tcount); + read_only, true, tcount, NULL); + + _SPI_end_call(true); + return res; +} +int +SPI_execute_callback(const char *src, bool read_only, long tcount, + DestReceiver *callback) +{ + _SPI_plan plan; + int res; + + if (src == NULL || tcount < 0) + return SPI_ERROR_ARGUMENT; + + res = _SPI_begin_call(true); + if (res < 0) + return res; + + memset(&plan, 0, sizeof(_SPI_plan)); + plan.magic = _SPI_PLAN_MAGIC; + plan.cursor_options = 0; + + _SPI_prepare_oneshot_plan(src, &plan); + + res = _SPI_execute_plan(&plan, NULL, + InvalidSnapshot, InvalidSnapshot, + read_only, true, tcount, callback); _SPI_end_call(true); return res; @@ -354,7 +382,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls, _SPI_convert_params(plan->nargs, plan->argtypes, Values, Nulls), InvalidSnapshot, InvalidSnapshot, - read_only, true, tcount); + read_only, true, tcount, NULL); + + _SPI_end_call(true); + return res; +} + +/* Execute a previously prepared plan with a callback Destination */ +int +SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls, + bool read_only, long tcount, DestReceiver *callback) +{ + int res; + + if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0) + return SPI_ERROR_ARGUMENT; + + if (plan->nargs > 0 && Values == NULL) + return SPI_ERROR_PARAM; + + res = _SPI_begin_call(true); + if (res < 0) + return res; + + res = _SPI_execute_plan(plan, + _SPI_convert_params(plan->nargs, plan->argtypes, + Values, Nulls), + InvalidSnapshot, InvalidSnapshot, + read_only, true, tcount, callback); _SPI_end_call(true); return res; @@ -383,7 +438,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params, res = _SPI_execute_plan(plan, params, InvalidSnapshot, InvalidSnapshot, - read_only, true, tcount); + read_only, true, tcount, NULL); _SPI_end_call(true); return res; @@ -424,7 +479,7 @@ SPI_execute_snapshot(SPIPlanPtr plan, _SPI_convert_params(plan->nargs, plan->argtypes, Values, Nulls), snapshot, crosscheck_snapshot, - read_only, fire_triggers, tcount); + read_only, fire_triggers, tcount, NULL); _SPI_end_call(true); return res; @@ -471,7 +526,7 @@ SPI_execute_with_args(const char *src, res = _SPI_execute_plan(&plan, paramLI, InvalidSnapshot, InvalidSnapshot, - read_only, true, tcount); + read_only, true, tcount, NULL); _SPI_end_call(true); return res; @@ -1892,7 +1947,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan) static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, Snapshot snapshot, Snapshot crosscheck_snapshot, - bool read_only, bool fire_triggers, uint64 tcount) + bool read_only, bool fire_triggers, uint64 tcount, + DestReceiver *callback) { int my_res = 0; uint64 my_processed = 0; @@ -1903,6 +1959,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, ErrorContextCallback spierrcontext; CachedPlan *cplan = NULL; ListCell *lc1; + DestReceiver *dest = callback; /* * Setup error traceback support for ereport() @@ -2020,7 +2077,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, { Node *stmt = (Node *) lfirst(lc2); bool canSetTag; - DestReceiver *dest; _SPI_current->processed = 0; _SPI_current->lastoid = InvalidOid; @@ -2072,7 +2128,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, UpdateActiveSnapshotCommandId(); } - dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone); + if (!callback) + dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone); if (IsA(stmt, PlannedStmt) && ((PlannedStmt *) stmt)->utilityStmt == NULL) @@ -2098,6 +2155,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, { char completionTag[COMPLETION_TAG_BUFSIZE]; + // XXX throw error if callback is set ProcessUtility(stmt, plansource->query_string, PROCESS_UTILITY_QUERY, diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c index 60a92801f1..212cfbd101 100644 --- a/src/backend/tcop/dest.c +++ b/src/backend/tcop/dest.c @@ -81,6 +81,11 @@ static DestReceiver spi_printtupDR = { DestSPI }; +static DestReceiver spi_callbackDR = { + donothingReceive, donothingStartup, donothingCleanup, donothingCleanup, + DestSPICallback +}; + /* Globally available receiver for DestNone */ DestReceiver *None_Receiver = &donothingDR; @@ -117,6 +122,9 @@ CreateDestReceiver(CommandDest dest) case DestSPI: return &spi_printtupDR; + case DestSPICallback: + return &spi_callbackDR; + case DestTuplestore: return CreateTuplestoreDestReceiver(); @@ -162,6 +170,7 @@ EndCommand(const char *commandTag, CommandDest dest) case DestNone: case DestDebug: case DestSPI: + case DestSPICallback: case DestTuplestore: case DestIntoRel: case DestCopyOut: @@ -205,6 +214,7 @@ NullCommand(CommandDest dest) case DestNone: case DestDebug: case DestSPI: + case DestSPICallback: case DestTuplestore: case DestIntoRel: case DestCopyOut: @@ -250,6 +260,7 @@ ReadyForQuery(CommandDest dest) case DestNone: case DestDebug: case DestSPI: + case DestSPICallback: case DestTuplestore: case DestIntoRel: case DestCopyOut: diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h index 76ba394a2b..15187c47e3 100644 --- a/src/include/executor/spi.h +++ b/src/include/executor/spi.h @@ -74,11 +74,15 @@ extern PGDLLIMPORT int SPI_result; extern int SPI_connect(void); extern int SPI_finish(void); extern int SPI_execute(const char *src, bool read_only, long tcount); +extern int SPI_execute_callback(const char *src, bool read_only, long tcount, + DestReceiver *callback); extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls, bool read_only, long tcount); extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params, bool read_only, long tcount); +extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls, + bool read_only, long tcount, DestReceiver *callback); extern int SPI_exec(const char *src, long tcount); extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls, long tcount); diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h index dd80433f74..5389f496a2 100644 --- a/src/include/tcop/dest.h +++ b/src/include/tcop/dest.h @@ -90,6 +90,7 @@ typedef enum DestRemote, /* results sent to frontend process */ DestRemoteExecute, /* sent to frontend, in Execute command */ DestSPI, /* results sent to SPI manager */ + DestSPICallback, /* results sent to user-specified callback function */ DestTuplestore, /* results sent to Tuplestore */ DestIntoRel, /* results sent to relation (SELECT INTO) */ DestCopyOut, /* results sent to COPY TO code */ diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c index 860b804e54..07501f18f2 100644 --- a/src/pl/plpython/plpy_main.c +++ b/src/pl/plpython/plpy_main.c @@ -403,6 +403,19 @@ PLy_current_execution_context(void) return PLy_execution_contexts; } +PLyExecutionContext * +PLy_switch_execution_context(PLyExecutionContext *new) +{ + PLyExecutionContext *last = PLy_execution_contexts; + + if (PLy_execution_contexts == NULL) + elog(ERROR, "no Python function is currently executing"); + + PLy_execution_contexts = new; + + return last; +} + MemoryContext PLy_get_scratch_context(PLyExecutionContext *context) { diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h index 10426c4323..7cbe0a8d35 100644 --- a/src/pl/plpython/plpy_main.h +++ b/src/pl/plpython/plpy_main.h @@ -25,6 +25,9 @@ typedef struct PLyExecutionContext /* Get the current execution context */ extern PLyExecutionContext *PLy_current_execution_context(void); +/* Get switch execution contexts */ +extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext *new); + /* Get the scratch memory context for specified execution context */ extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context); diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c index 07ab6a087e..ab303367d3 100644 --- a/src/pl/plpython/plpy_spi.c +++ b/src/pl/plpython/plpy_spi.c @@ -28,8 +28,27 @@ #include "plpy_procedure.h" #include "plpy_resultobject.h" +typedef struct +{ + DestReceiver pub; + PLyExecutionContext *exec_ctx; + MemoryContext mctx; + TupleDesc desc; + PLyTypeInfo *args; + + /* Dictionary of Lists */ + PyObject *dict; + PyObject **lists; +} CallbackState; + + + +void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo); +void PLy_CSDestroy(DestReceiver *self); +static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self); static PyObject *PLy_spi_execute_query(char *query, long limit); +static PyObject *PLy_spi_execute_query2(char *query, long limit); static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit); static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status); @@ -341,8 +360,159 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit) return ret; } +void +PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo) +{ + PLyExecutionContext *old_exec_ctx; + CallbackState *myState = (CallbackState *) self; + PLyTypeInfo *args; + + MemoryContext mctx, old_mctx; + PyObject *dict; + PyObject **lists; + int i; + + /* + * We may be in a different execution context when we're called, so switch + * back to our original one. + */ + mctx = myState->mctx; + old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx); + old_mctx = MemoryContextSwitchTo(mctx); + + /* Store this as a sanity check */ + myState->desc = typeinfo; + + /* Setup type conversion info */ + myState->args = args = palloc0(sizeof(PLyTypeInfo)); + PLy_typeinfo_init(args, mctx); + PLy_input_tuple_funcs(args, typeinfo); + + /* + * We never palloc python objects, but this is an array of object pointers, + * so it's OK. + */ + myState->lists = lists = palloc0(args->in.r.natts * sizeof(PyObject *)); + + myState->dict = dict = PyDict_New(); + if (dict == NULL) + PLy_elog(ERROR, "could not create new dictionary"); + + + for (i = 0; i < args->in.r.natts; i++) + { + char *key; + PyObject *value; + + if (typeinfo->attrs[i]->attisdropped) + continue; + + /* NOTE: If size is > 0 then the list must get initialized! */ + value = PyList_New(0); + if (value == NULL) + PLy_elog(ERROR, "could not create new list"); + + key = NameStr(typeinfo->attrs[i]->attname); + PyDict_SetItemString(dict, key, value); + Py_DECREF(value); + + /* We want fast access to the lists, so we store them in our array of pointers */ + lists[i] = value; + } + + MemoryContextSwitchTo(old_mctx); + PLy_switch_execution_context(old_exec_ctx); +} + +void +PLy_CSDestroy(DestReceiver *self) +{ + CallbackState *myState = (CallbackState *) self; + + MemoryContextDelete(myState->mctx); +} + +static bool +PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self) +{ + TupleDesc desc = slot->tts_tupleDescriptor; + CallbackState *myState = (CallbackState *) self; + PLyTypeInfo *args = myState->args; + PLyExecutionContext *old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx); + MemoryContext scratch_context = PLy_get_scratch_context(myState->exec_ctx); + MemoryContext oldcontext = CurrentMemoryContext; + int i, rv; + + /* Verify saved state matches incoming slot */ + Assert(myState->desc == desc); + Assert(args->in.r.natts == desc->natts); + + /* Make sure the tuple is fully deconstructed */ + slot_getallattrs(slot); + + MemoryContextSwitchTo(scratch_context); + + + /* + * Do the work in the scratch context to avoid leaking memory from the + * datatype output function calls. + */ + for (i = 0; i < desc->natts; i++) + { + PyObject * volatile value = NULL; + PLyDatumToOb * volatile atts = &args->in.r.atts[i]; + + if (desc->attrs[i]->attisdropped) + continue; + + if (myState->lists[i] == NULL) + ereport(ERROR, + (errmsg("missing list for attribute %d", i))); + /* XXX If the function can't be null, ditch that check */ + if (slot->tts_isnull[i] || atts->func == NULL) + { + Py_INCREF(Py_None); + value = Py_None; + } + else + { + PG_TRY(); + { + value = (atts->func) (atts, slot->tts_values[i]); + } + PG_CATCH(); + { + Py_XDECREF(value); + MemoryContextSwitchTo(oldcontext); + PLy_switch_execution_context(old_exec_ctx); + PG_RE_THROW(); + } + PG_END_TRY(); + } + + /* + * If we tried to do this in the PG_CATCH we'd have to mark value + * as volatile, but that won't work with PyList_Append, so just + * test the error code after doing Py_DECREF(). + */ + rv = PyList_Append(myState->lists[i], value); + Py_DECREF(value); + + if (rv != 0) + ereport(ERROR, + (errmsg("unable to append value to list"))); + } + MemoryContextSwitchTo(oldcontext); + /* Should we just do this once, for the whole tuple?? */ + MemoryContextReset(scratch_context); + PLy_switch_execution_context(old_exec_ctx); + + /* If we get here then we were successful */ + return true; +} + static PyObject * -PLy_spi_execute_query(char *query, long limit) +PLy_spi_execute_query2(char *query, long limit) { int rv; volatile MemoryContext oldcontext; @@ -384,6 +554,75 @@ PLy_spi_execute_query(char *query, long limit) } static PyObject * +PLy_spi_execute_query(char *query, long limit) +{ + PLyExecutionContext *exec_ctx = PLy_current_execution_context(); + int rv; + volatile MemoryContext oldcontext, cb_ctx; + volatile ResourceOwner oldowner; + PyObject *ret = NULL; + CallbackState callback; + + oldowner = CurrentResourceOwner; + + /* + * Use a new context to make cleanup easier. Allocate it in the current + * context so we don't have to worry about cleaning it up if there's an + * error. + */ + cb_ctx = AllocSetContextCreate(CurrentMemoryContext, + "PL/Python callback context", + ALLOCSET_DEFAULT_SIZES); + + oldcontext = MemoryContextSwitchTo(cb_ctx); + //callback = palloc0(sizeof(CallbackState)); + callback.mctx = cb_ctx; + memcpy(&(callback.pub), CreateDestReceiver(DestSPICallback), sizeof(DestReceiver)); + callback.pub.receiveSlot = PLy_CSreceive; + callback.pub.rStartup = PLy_CSStartup; + //callback.pub.rDestroy = PLy_CSDestroy; + callback.exec_ctx = exec_ctx; + + PLy_spi_subtransaction_begin(oldcontext, oldowner); + + PG_TRY(); + { + + pg_verifymbstr(query, strlen(query), false); + rv = SPI_execute_callback(query, exec_ctx->curr_proc->fn_readonly, limit, + (DestReceiver *) &callback); + /* + * callback.dict gets set in PLy_CSStartup, which happens during + * executor startup. It's not valid before then. + */ + ret = callback.dict; + + PLy_spi_subtransaction_commit(oldcontext, oldowner); + } + PG_CATCH(); + { + PLy_spi_subtransaction_abort(oldcontext, oldowner); + return NULL; + } + PG_END_TRY(); + + if (rv < 0) + { + Py_XDECREF(ret); + PLy_exception_set(PLy_exc_spi_error, + "SPI_execute failed: %s", + SPI_result_code_string(rv)); + return NULL; + } + + /* Free the callback context. */ + MemoryContextSwitchTo(oldcontext); + MemoryContextDelete(cb_ctx); + + return ret; +} + +static PyObject * PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status) { PLyResultObject *result;