From 1b3318154540d0fe71480ca58938433ecfccabbd Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Tue, 31 Oct 2017 14:48:51 -0400 Subject: [PATCH v1] Transaction control in PL/Python procedures Add .commit, .rollback, and .start_transaction functions to plpy module to control transactions in a PL/Python function. Add similar underlying functions to SPI. Some additional cleanup so that transaction commit or abort doesn't blow away data structures still used by the procedure call. --- src/backend/commands/functioncmds.c | 3 + src/backend/executor/spi.c | 71 +++++++++++++++++------ src/backend/utils/mmgr/portalmem.c | 39 +++++-------- src/include/executor/spi.h | 4 ++ src/include/executor/spi_priv.h | 1 + src/pl/plpython/Makefile | 1 + src/pl/plpython/expected/plpython_test.out | 9 ++- src/pl/plpython/expected/plpython_transaction.out | 44 ++++++++++++++ src/pl/plpython/plpy_main.c | 2 +- src/pl/plpython/plpy_plpymodule.c | 38 ++++++++++++ src/pl/plpython/sql/plpython_transaction.sql | 36 ++++++++++++ 11 files changed, 201 insertions(+), 47 deletions(-) create mode 100644 src/pl/plpython/expected/plpython_transaction.out create mode 100644 src/pl/plpython/sql/plpython_transaction.sql diff --git a/src/backend/commands/functioncmds.c b/src/backend/commands/functioncmds.c index 1f3156d870..badef78fdf 100644 --- a/src/backend/commands/functioncmds.c +++ b/src/backend/commands/functioncmds.c @@ -65,6 +65,7 @@ #include "utils/fmgroids.h" #include "utils/guc.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/rel.h" #include "utils/syscache.h" #include "utils/tqual.h" @@ -2255,6 +2256,8 @@ ExecuteCallStmt(ParseState *pstate, CallStmt *stmt) FUNC_MAX_ARGS, FUNC_MAX_ARGS))); + MemoryContextSwitchTo(PortalContext); + fmgr_info(fexpr->funcid, &flinfo); InitFunctionCallInfoData(fcinfo, &flinfo, nargs, fexpr->inputcollid, NULL, NULL); diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index 40292b86c1..f097f15731 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -92,7 +92,7 @@ SPI_connect(void) elog(ERROR, "SPI stack corrupted"); newdepth = 16; _SPI_stack = (_SPI_connection *) - MemoryContextAlloc(TopTransactionContext, + MemoryContextAlloc(TopMemoryContext, newdepth * sizeof(_SPI_connection)); _SPI_stack_depth = newdepth; } @@ -133,10 +133,10 @@ SPI_connect(void) * Perhaps CurTransactionContext would do? For now it doesn't matter * because we clean up explicitly in AtEOSubXact_SPI(). */ - _SPI_current->procCxt = AllocSetContextCreate(TopTransactionContext, + _SPI_current->procCxt = AllocSetContextCreate(PortalContext, "SPI Proc", ALLOCSET_DEFAULT_SIZES); - _SPI_current->execCxt = AllocSetContextCreate(TopTransactionContext, + _SPI_current->execCxt = AllocSetContextCreate(_SPI_current->procCxt, "SPI Exec", ALLOCSET_DEFAULT_SIZES); /* ... and switch to procedure's context */ @@ -158,8 +158,6 @@ SPI_finish(void) MemoryContextSwitchTo(_SPI_current->savedcxt); /* Release memory used in procedure call (including tuptables) */ - MemoryContextDelete(_SPI_current->execCxt); - _SPI_current->execCxt = NULL; MemoryContextDelete(_SPI_current->procCxt); _SPI_current->procCxt = NULL; @@ -181,12 +179,58 @@ SPI_finish(void) return SPI_OK_FINISH; } +int +SPI_start_transaction(void) +{ + MemoryContext oldcontext = CurrentMemoryContext; + + StartTransactionCommand(); + MemoryContextSwitchTo(oldcontext); + return 0; +} + + +int +SPI_commit(void) +{ + MemoryContext oldcontext = CurrentMemoryContext; + + _SPI_current->internal_xact = true; + + if (ActiveSnapshotSet()) + PopActiveSnapshot(); + CommitTransactionCommand(); + MemoryContextSwitchTo(oldcontext); + + _SPI_current->internal_xact = false; + + return 0; +} + +int +SPI_rollback(void) +{ + MemoryContext oldcontext = CurrentMemoryContext; + + _SPI_current->internal_xact = true; + + AbortCurrentTransaction(); + MemoryContextSwitchTo(oldcontext); + + _SPI_current->internal_xact = false; + + return 0; +} + /* * Clean up SPI state at transaction commit or abort. */ void AtEOXact_SPI(bool isCommit) { + if (_SPI_current && _SPI_current->internal_xact) + return; + /* * Note that memory contexts belonging to SPI stack entries will be freed * automatically, so we can ignore them here. We just need to restore our @@ -224,21 +268,10 @@ AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid) if (connection->connectSubid != mySubid) break; /* couldn't be any underneath it either */ - found = true; + if (connection->internal_xact) + break; - /* - * Release procedure memory explicitly (see note in SPI_connect) - */ - if (connection->execCxt) - { - MemoryContextDelete(connection->execCxt); - connection->execCxt = NULL; - } - if (connection->procCxt) - { - MemoryContextDelete(connection->procCxt); - connection->procCxt = NULL; - } + found = true; /* * Pop the stack entry and reset global variables. Unlike diff --git a/src/backend/utils/mmgr/portalmem.c b/src/backend/utils/mmgr/portalmem.c index 89db08464f..d07dc060a5 100644 --- a/src/backend/utils/mmgr/portalmem.c +++ b/src/backend/utils/mmgr/portalmem.c @@ -736,11 +736,8 @@ PreCommit_Portals(bool isPrepare) /* * Abort processing for portals. * - * At this point we reset "active" status and run the cleanup hook if - * present, but we can't release the portal's memory until the cleanup call. - * - * The reason we need to reset active is so that we can replace the unnamed - * portal, else we'll fail to execute ROLLBACK when it arrives. + * At this point we run the cleanup hook if present, but we can't release the + * portal's memory until the cleanup call. */ void AtAbort_Portals(void) @@ -754,17 +751,6 @@ AtAbort_Portals(void) { Portal portal = hentry->portal; - /* - * See similar code in AtSubAbort_Portals(). This would fire if code - * orchestrating multiple top-level transactions within a portal, such - * as VACUUM, caught errors and continued under the same portal with a - * fresh transaction. No part of core PostgreSQL functions that way. - * XXX Such code would wish the portal to remain ACTIVE, as in - * PreCommit_Portals(). - */ - if (portal->status == PORTAL_ACTIVE) - MarkPortalFailed(portal); - /* * Do nothing else to cursors held over from a previous transaction. */ @@ -799,14 +785,6 @@ AtAbort_Portals(void) * PortalDrop. */ portal->resowner = NULL; - - /* - * Although we can't delete the portal data structure proper, we can - * release any memory in subsidiary contexts, such as executor state. - * The cleanup hook was the last thing that might have needed data - * there. - */ - MemoryContextDeleteChildren(PortalGetHeapMemory(portal)); } } @@ -826,6 +804,19 @@ AtCleanup_Portals(void) { Portal portal = hentry->portal; + /* + * Do not touch active portals --- this can only happen in the case of + * a multi-transaction command. + * + * Note however that any resource owner attached to such a portal is + * still going to go away, so don't leave a dangling pointer. + */ + if (portal->status == PORTAL_ACTIVE) + { + portal->resowner = NULL; + continue; + } + /* Do nothing to cursors held over from a previous transaction */ if (portal->createSubid == InvalidSubTransactionId) { diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h index acade7e92e..1bf73d8594 100644 --- a/src/include/executor/spi.h +++ b/src/include/executor/spi.h @@ -156,6 +156,10 @@ extern int SPI_register_relation(EphemeralNamedRelation enr); extern int SPI_unregister_relation(const char *name); extern int SPI_register_trigger_data(TriggerData *tdata); +extern int SPI_start_transaction(void); +extern int SPI_commit(void); +extern int SPI_rollback(void); + extern void AtEOXact_SPI(bool isCommit); extern void AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid); diff --git a/src/include/executor/spi_priv.h b/src/include/executor/spi_priv.h index 8fae755418..c12a465712 100644 --- a/src/include/executor/spi_priv.h +++ b/src/include/executor/spi_priv.h @@ -36,6 +36,7 @@ typedef struct MemoryContext savedcxt; /* context of SPI_connect's caller */ SubTransactionId connectSubid; /* ID of connecting subtransaction */ QueryEnvironment *queryEnv; /* query environment setup for SPI level */ + bool internal_xact; /* SPI-managed transaction boundary, skip cleanup */ } _SPI_connection; /* diff --git a/src/pl/plpython/Makefile b/src/pl/plpython/Makefile index cc91afebde..d09910835d 100644 --- a/src/pl/plpython/Makefile +++ b/src/pl/plpython/Makefile @@ -90,6 +90,7 @@ REGRESS = \ plpython_quote \ plpython_composite \ plpython_subtransaction \ + plpython_transaction \ plpython_drop REGRESS_PLPYTHON3_MANGLE := $(REGRESS) diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out index 847e4cc412..f0a10cc05f 100644 --- a/src/pl/plpython/expected/plpython_test.out +++ b/src/pl/plpython/expected/plpython_test.out @@ -43,11 +43,12 @@ contents.sort() return contents $$ LANGUAGE plpythonu; select module_contents(); - module_contents ------------------ + module_contents +------------------- Error Fatal SPIError + commit cursor debug error @@ -60,10 +61,12 @@ select module_contents(); quote_ident quote_literal quote_nullable + rollback spiexceptions + start_transaction subtransaction warning -(18 rows) +(21 rows) CREATE FUNCTION elog_test_basic() RETURNS void AS $$ diff --git a/src/pl/plpython/expected/plpython_transaction.out b/src/pl/plpython/expected/plpython_transaction.out new file mode 100644 index 0000000000..64e3abdd60 --- /dev/null +++ b/src/pl/plpython/expected/plpython_transaction.out @@ -0,0 +1,44 @@ +CREATE TABLE test1 (a int, b text); +CREATE PROCEDURE transaction_test1() +LANGUAGE plpythonu +AS $$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +$$; +CALL transaction_test1(); +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +TRUNCATE test1; +DO +LANGUAGE plpythonu +$$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +$$; +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +DROP TABLE test1; diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c index 7df50c09c8..0d3b4a8fbc 100644 --- a/src/pl/plpython/plpy_main.c +++ b/src/pl/plpython/plpy_main.c @@ -424,7 +424,7 @@ PLy_push_execution_context(void) PLyExecutionContext *context; context = (PLyExecutionContext *) - MemoryContextAlloc(TopTransactionContext, sizeof(PLyExecutionContext)); + MemoryContextAlloc(PortalContext, sizeof(PLyExecutionContext)); context->curr_proc = NULL; context->scratch_ctx = NULL; context->next = PLy_execution_contexts; diff --git a/src/pl/plpython/plpy_plpymodule.c b/src/pl/plpython/plpy_plpymodule.c index 759ad44932..a95e66bfee 100644 --- a/src/pl/plpython/plpy_plpymodule.c +++ b/src/pl/plpython/plpy_plpymodule.c @@ -6,8 +6,10 @@ #include "postgres.h" +#include "access/xact.h" #include "mb/pg_wchar.h" #include "utils/builtins.h" +#include "utils/snapmgr.h" #include "plpython.h" @@ -41,6 +43,9 @@ static PyObject *PLy_fatal(PyObject *self, PyObject *args, PyObject *kw); static PyObject *PLy_quote_literal(PyObject *self, PyObject *args); static PyObject *PLy_quote_nullable(PyObject *self, PyObject *args); static PyObject *PLy_quote_ident(PyObject *self, PyObject *args); +static PyObject *PLy_start_transaction(PyObject *self, PyObject *args); +static PyObject *PLy_commit(PyObject *self, PyObject *args); +static PyObject *PLy_rollback(PyObject *self, PyObject *args); /* A list of all known exceptions, generated from backend/utils/errcodes.txt */ @@ -95,6 +100,13 @@ static PyMethodDef PLy_methods[] = { */ {"cursor", PLy_cursor, METH_VARARGS, NULL}, + /* + * transaction control + */ + {"start_transaction", PLy_start_transaction, METH_NOARGS, NULL}, + {"commit", PLy_commit, METH_NOARGS, NULL}, + {"rollback", PLy_rollback, METH_NOARGS, NULL}, + {NULL, NULL, 0, NULL} }; @@ -577,3 +589,29 @@ PLy_output(volatile int level, PyObject *self, PyObject *args, PyObject *kw) */ Py_RETURN_NONE; } + +static PyObject * +PLy_start_transaction(PyObject *self, PyObject *args) +{ + SPI_start_transaction(); + + Py_RETURN_NONE; +} + +static PyObject * +PLy_commit(PyObject *self, PyObject *args) +{ + SPI_commit(); + SPI_start_transaction(); + + Py_RETURN_NONE; +} + +static PyObject * +PLy_rollback(PyObject *self, PyObject *args) +{ + SPI_rollback(); + SPI_start_transaction(); + + Py_RETURN_NONE; +} diff --git a/src/pl/plpython/sql/plpython_transaction.sql b/src/pl/plpython/sql/plpython_transaction.sql new file mode 100644 index 0000000000..42f191b008 --- /dev/null +++ b/src/pl/plpython/sql/plpython_transaction.sql @@ -0,0 +1,36 @@ +CREATE TABLE test1 (a int, b text); + + +CREATE PROCEDURE transaction_test1() +LANGUAGE plpythonu +AS $$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +$$; + +CALL transaction_test1(); + +SELECT * FROM test1; + + +TRUNCATE test1; + +DO +LANGUAGE plpythonu +$$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +$$; + +SELECT * FROM test1; + + +DROP TABLE test1; base-commit: ee4673ac071f8352c41cc673299b7ec695f079ff prerequisite-patch-id: 7b37b5c8905dcab64b9c6b8f98caf81049a569ec -- 2.14.3