>From 0fb68b2e2649ab81fae8c86c45c1522f1b0d56ab Mon Sep 17 00:00:00 2001 From: Jim Nasby Date: Tue, 28 Feb 2017 21:54:45 -0600 Subject: [PATCH 2/2] Minimal adoption of SPI callbacks in plpython --- src/pl/plpython/plpy_main.c | 13 ++ src/pl/plpython/plpy_main.h | 3 + src/pl/plpython/plpy_spi.c | 294 ++++++++++++++++++++++++++++++++------------ 3 files changed, 231 insertions(+), 79 deletions(-) diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c index 860b804e54..07501f18f2 100644 --- a/src/pl/plpython/plpy_main.c +++ b/src/pl/plpython/plpy_main.c @@ -403,6 +403,19 @@ PLy_current_execution_context(void) return PLy_execution_contexts; } +PLyExecutionContext * +PLy_switch_execution_context(PLyExecutionContext *new) +{ + PLyExecutionContext *last = PLy_execution_contexts; + + if (PLy_execution_contexts == NULL) + elog(ERROR, "no Python function is currently executing"); + + PLy_execution_contexts = new; + + return last; +} + MemoryContext PLy_get_scratch_context(PLyExecutionContext *context) { diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h index 10426c4323..7cbe0a8d35 100644 --- a/src/pl/plpython/plpy_main.h +++ b/src/pl/plpython/plpy_main.h @@ -25,6 +25,9 @@ typedef struct PLyExecutionContext /* Get the current execution context */ extern PLyExecutionContext *PLy_current_execution_context(void); +/* Get switch execution contexts */ +extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext *new); + /* Get the scratch memory context for specified execution context */ extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context); diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c index 07ab6a087e..6f48e70329 100644 --- a/src/pl/plpython/plpy_spi.c +++ b/src/pl/plpython/plpy_spi.c @@ -28,10 +28,30 @@ #include "plpy_procedure.h" #include "plpy_resultobject.h" +typedef struct +{ + DestReceiver pub; + PLyExecutionContext *exec_ctx; + MemoryContext parent_ctx; + MemoryContext cb_ctx; + TupleDesc desc; + PLyTypeInfo *args; + + PyObject *result; +} CallbackState; + + +void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo); +void PLy_CSDestroy(DestReceiver *self); +static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self); +static CallbackState *PLy_Callback_New(PLyExecutionContext *exec_ctx); +static CallbackState *PLy_Callback_Free(CallbackState *callback); +static PLyResultObject *PLyCSNewResult(CallbackState *myState); + static PyObject *PLy_spi_execute_query(char *query, long limit); static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit); -static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable, +static PyObject *PLy_spi_execute_fetch_result(CallbackState *callback, uint64 rows, int status); static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata); @@ -196,6 +216,7 @@ PLy_spi_execute(PyObject *self, PyObject *args) static PyObject * PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit) { + CallbackState *callback; volatile int nargs; int i, rv; @@ -289,9 +310,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit) } } - rv = SPI_execute_plan(plan->plan, plan->values, nulls, - exec_ctx->curr_proc->fn_readonly, limit); - ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv); + callback = PLy_Callback_New(exec_ctx); + rv = SPI_execute_plan_callback(plan->plan, plan->values, nulls, + exec_ctx->curr_proc->fn_readonly, limit, + (DestReceiver *) callback); + ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv); if (nargs > 0) pfree(nulls); @@ -316,9 +339,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit) } PLy_spi_subtransaction_abort(oldcontext, oldowner); + PLy_Callback_Free(callback); return NULL; } PG_END_TRY(); + callback = PLy_Callback_Free(callback); for (i = 0; i < nargs; i++) { @@ -344,6 +369,8 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit) static PyObject * PLy_spi_execute_query(char *query, long limit) { + PLyExecutionContext *exec_ctx = PLy_current_execution_context(); + CallbackState *callback; int rv; volatile MemoryContext oldcontext; volatile ResourceOwner oldowner; @@ -356,20 +383,23 @@ PLy_spi_execute_query(char *query, long limit) PG_TRY(); { - PLyExecutionContext *exec_ctx = PLy_current_execution_context(); - pg_verifymbstr(query, strlen(query), false); - rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit); - ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv); + callback = PLy_Callback_New(exec_ctx); + rv = SPI_execute_callback(query, exec_ctx->curr_proc->fn_readonly, limit, + (DestReceiver *) callback); + + ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv); PLy_spi_subtransaction_commit(oldcontext, oldowner); } PG_CATCH(); { PLy_spi_subtransaction_abort(oldcontext, oldowner); + PLy_Callback_Free(callback); return NULL; } PG_END_TRY(); + callback = PLy_Callback_Free(callback); if (rv < 0) { @@ -383,94 +413,200 @@ PLy_spi_execute_query(char *query, long limit) return ret; } -static PyObject * -PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status) +static CallbackState * +PLy_Callback_New(PLyExecutionContext *exec_ctx) { + volatile MemoryContext oldcontext, cb_ctx; + CallbackState *callback; + + callback = palloc0(sizeof(CallbackState)); + + /* + * Use a new context to make cleanup easier. Allocate it in the current + * context so we don't have to worry about cleaning it up if there's an + * error. + */ + cb_ctx = AllocSetContextCreate(CurrentMemoryContext, + "PL/Python callback context", + ALLOCSET_DEFAULT_SIZES); + + oldcontext = MemoryContextSwitchTo(cb_ctx); + callback->parent_ctx = oldcontext; + callback->cb_ctx = cb_ctx; + memcpy(&(callback->pub), CreateDestReceiver(DestSPICallback), sizeof(DestReceiver)); + callback->pub.receiveSlot = PLy_CSreceive; + callback->pub.rStartup = PLy_CSStartup; + callback->pub.rDestroy = PLy_CSDestroy; + callback->exec_ctx = exec_ctx; + + MemoryContextSwitchTo(oldcontext); + + return callback; +} + +static CallbackState * +PLy_Callback_Free(CallbackState *callback) +{ + if (callback) + { + if (callback->cb_ctx) + (callback->pub.rDestroy) ((DestReceiver *) callback); + + pfree(callback); + } + + return (CallbackState *) NULL; +} + +static PLyResultObject * +PLyCSNewResult(CallbackState *myState) +{ + MemoryContext oldctx; + + /* The result info needs to be in the parent context */ + oldctx = MemoryContextSwitchTo(myState->parent_ctx); + myState->result = PLy_result_new(); + if (myState->result == NULL) + PLy_elog(ERROR, "could not create new result object"); + + MemoryContextSwitchTo(oldctx); + return (PLyResultObject *) myState->result; +} + +void +PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo) +{ + PLyExecutionContext *old_exec_ctx; + CallbackState *myState = (CallbackState *) self; PLyResultObject *result; - volatile MemoryContext oldcontext; + PLyTypeInfo *args; + MemoryContext mctx, old_mctx; - result = (PLyResultObject *) PLy_result_new(); - Py_DECREF(result->status); - result->status = PyInt_FromLong(status); + /* + * We may be in a different execution context when we're called, so switch + * back to our original one. + */ + mctx = myState->cb_ctx; + old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx); + old_mctx = MemoryContextSwitchTo(mctx); + + /* We need to store this because the TupleDesc the receive function gets has no names. */ + myState->desc = typeinfo; + + /* Setup type conversion info */ + myState->args = args = palloc0(sizeof(PLyTypeInfo)); + PLy_typeinfo_init(args, mctx); + PLy_input_tuple_funcs(args, typeinfo); + + result = PLyCSNewResult(myState); + + /* + * Save tuple descriptor for later use by result set metadata + * functions. Save it in TopMemoryContext so that it survives + * outside of an SPI context. We trust that PLy_result_dealloc() + * will clean it up when the time is right. XXX This might result in a leak + * if an error happens and the result doesn't get dereferenced. + */ + MemoryContextSwitchTo(TopMemoryContext); + result->tupdesc = CreateTupleDescCopy(typeinfo); + + MemoryContextSwitchTo(old_mctx); + PLy_switch_execution_context(old_exec_ctx); +} - if (status > 0 && tuptable == NULL) +void +PLy_CSDestroy(DestReceiver *self) +{ + CallbackState *myState = (CallbackState *) self; + MemoryContext cb_ctx = myState->cb_ctx; + + MemoryContextDelete(cb_ctx); + myState->cb_ctx = 0; +} + +static bool +PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self) +{ + TupleDesc slotdesc = slot->tts_tupleDescriptor; + CallbackState *myState = (CallbackState *) self; + TupleDesc desc = myState->desc; + PLyTypeInfo *args = myState->args; + PLyResultObject *result = (PLyResultObject *) myState->result; + PLyExecutionContext *old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx); + MemoryContext scratch_context = PLy_get_scratch_context(myState->exec_ctx); + MemoryContext oldcontext = CurrentMemoryContext; + int rv; + PyObject *row; + + /* Verify saved state matches incoming slot */ + Assert(desc->tdtypeid == slotdesc->tdtypeid); + Assert(args->in.r.natts == slotdesc->natts); + + /* Make sure the tuple is fully deconstructed */ + slot_getallattrs(slot); + + /* + * Do the work in the scratch context to avoid leaking memory from the + * datatype output function calls. + */ + MemoryContextSwitchTo(scratch_context); + + PG_TRY(); { - Py_DECREF(result->nrows); - result->nrows = (rows > (uint64) LONG_MAX) ? - PyFloat_FromDouble((double) rows) : - PyInt_FromLong((long) rows); + row = PLyDict_FromTuple(args, ExecFetchSlotTuple(slot), desc); } - else if (status > 0 && tuptable != NULL) + PG_CATCH(); { - PLyTypeInfo args; - MemoryContext cxt; + Py_XDECREF(row); + MemoryContextSwitchTo(oldcontext); + PLy_switch_execution_context(old_exec_ctx); + PG_RE_THROW(); + } + PG_END_TRY(); - Py_DECREF(result->nrows); - result->nrows = (rows > (uint64) LONG_MAX) ? - PyFloat_FromDouble((double) rows) : - PyInt_FromLong((long) rows); + /* + * If we tried to do this in the PG_CATCH we'd have to mark value + * as volatile, but that won't work with PyList_Append, so just + * test the error code after doing Py_DECREF(). + */ + rv = PyList_Append(result->rows, row); + Py_DECREF(row); + + if (rv != 0) + ereport(ERROR, + (errmsg("unable to append value to list"))); - cxt = AllocSetContextCreate(CurrentMemoryContext, - "PL/Python temp context", - ALLOCSET_DEFAULT_SIZES); - PLy_typeinfo_init(&args, cxt); + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(scratch_context); + PLy_switch_execution_context(old_exec_ctx); - oldcontext = CurrentMemoryContext; - PG_TRY(); - { - MemoryContext oldcontext2; + return true; +} - if (rows) - { - uint64 i; - - /* - * PyList_New() and PyList_SetItem() use Py_ssize_t for list - * size and list indices; so we cannot support a result larger - * than PY_SSIZE_T_MAX. - */ - if (rows > (uint64) PY_SSIZE_T_MAX) - ereport(ERROR, - (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), - errmsg("query result has too many rows to fit in a Python list"))); - - Py_DECREF(result->rows); - result->rows = PyList_New(rows); - - PLy_input_tuple_funcs(&args, tuptable->tupdesc); - for (i = 0; i < rows; i++) - { - PyObject *row = PLyDict_FromTuple(&args, - tuptable->vals[i], - tuptable->tupdesc); - PyList_SetItem(result->rows, i, row); - } - } +static PyObject * +PLy_spi_execute_fetch_result(CallbackState *callback, uint64 rows, int status) +{ + PLyResultObject *result = (PLyResultObject *) callback->result; + /* If status < 0 this stuff would just get thrown away anyway. */ + if (status > 0) + { + if (!result) + { /* - * Save tuple descriptor for later use by result set metadata - * functions. Save it in TopMemoryContext so that it survives - * outside of an SPI context. We trust that PLy_result_dealloc() - * will clean it up when the time is right. (Do this as late as - * possible, to minimize the number of ways the tupdesc could get - * leaked due to errors.) + * This happens if the command returned no results. Create a dummy result set. */ - oldcontext2 = MemoryContextSwitchTo(TopMemoryContext); - result->tupdesc = CreateTupleDescCopy(tuptable->tupdesc); - MemoryContextSwitchTo(oldcontext2); - } - PG_CATCH(); - { - MemoryContextSwitchTo(oldcontext); - MemoryContextDelete(cxt); - Py_DECREF(result); - PG_RE_THROW(); + result = PLyCSNewResult(callback); + callback->result = (PyObject *) result; } - PG_END_TRY(); - MemoryContextDelete(cxt); - SPI_freetuptable(tuptable); + Py_DECREF(result->status); + result->status = PyInt_FromLong(status); + Py_DECREF(result->nrows); + result->nrows = (rows > (uint64) LONG_MAX) ? + PyFloat_FromDouble((double) rows) : + PyInt_FromLong((long) rows); } return (PyObject *) result; -- 2.11.1