*** a/contrib/postgres_fdw/deparse.c --- b/contrib/postgres_fdw/deparse.c *************** *** 189,198 **** is_foreign_expr(PlannerInfo *root, if (!foreign_expr_walker((Node *) expr, &glob_cxt, &loc_cxt)) return false; - /* Expressions examined here should be boolean, ie noncollatable */ - Assert(loc_cxt.collation == InvalidOid); - Assert(loc_cxt.state == FDW_COLLATE_NONE); - /* * An expression which includes any mutable functions can't be sent over * because its result is not stable. For example, sending now() remote --- 189,194 ---- *************** *** 928,933 **** deparseUpdateSql(StringInfo buf, PlannerInfo *root, --- 924,982 ---- } /* + * deparse remote UPDATE statement + * + * The statement text is appended to buf, and we also create an integer List + * of the columns being retrieved by RETURNING (if any), which is returned + * to *retrieved_attrs. + */ + void + deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root, + Index rtindex, Relation rel, + List *remote_conds, + List *targetlist, + List *targetAttrs, List *returningList, + List **retrieved_attrs) + { + RelOptInfo *baserel = root->simple_rel_array[rtindex]; + List *params_list = NIL; + deparse_expr_cxt context; + bool first; + ListCell *lc; + + /* Set up context struct for recursion */ + context.root = root; + context.foreignrel = baserel; + context.buf = buf; + context.params_list = NULL; + + appendStringInfoString(buf, "UPDATE "); + deparseRelation(buf, rel); + appendStringInfoString(buf, " SET "); + + first = true; + foreach(lc, targetAttrs) + { + int attnum = lfirst_int(lc); + TargetEntry *tle = get_tle_by_resno(targetlist, attnum); + + if (!first) + appendStringInfoString(buf, ", "); + first = false; + + deparseColumnRef(buf, rtindex, attnum, root); + appendStringInfo(buf, " = "); + deparseExpr((Expr *) tle->expr, &context); + } + if (remote_conds) + appendWhereClause(buf, root, baserel, remote_conds, + true, ¶ms_list); + + deparseReturningList(buf, root, rtindex, rel, false, + returningList, retrieved_attrs); + } + + /* * deparse remote DELETE statement * * The statement text is appended to buf, and we also create an integer List *************** *** 950,955 **** deparseDeleteSql(StringInfo buf, PlannerInfo *root, --- 999,1031 ---- } /* + * deparse remote DELETE statement + * + * The statement text is appended to buf, and we also create an integer List + * of the columns being retrieved by RETURNING (if any), which is returned + * to *retrieved_attrs. + */ + void + deparseDirectDeleteSql(StringInfo buf, PlannerInfo *root, + Index rtindex, Relation rel, + List *remote_conds, + List *returningList, + List **retrieved_attrs) + { + RelOptInfo *baserel = root->simple_rel_array[rtindex]; + List *params_list = NIL; + + appendStringInfoString(buf, "DELETE FROM "); + deparseRelation(buf, rel); + if (remote_conds) + appendWhereClause(buf, root, baserel, remote_conds, + true, ¶ms_list); + + deparseReturningList(buf, root, rtindex, rel, false, + returningList, retrieved_attrs); + } + + /* * Add a RETURNING clause, if needed, to an INSERT/UPDATE/DELETE. */ static void *** a/contrib/postgres_fdw/expected/postgres_fdw.out --- b/contrib/postgres_fdw/expected/postgres_fdw.out *************** *** 1124,1138 **** UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9; EXPLAIN (verbose, costs off) DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4; ! QUERY PLAN ! ---------------------------------------------------------------------------------------- Delete on public.ft2 Output: c1, c4 - Remote SQL: DELETE FROM "S 1"."T 1" WHERE ctid = $1 RETURNING "C 1", c4 -> Foreign Scan on public.ft2 Output: ctid ! Remote SQL: SELECT ctid FROM "S 1"."T 1" WHERE ((("C 1" % 10) = 5)) FOR UPDATE ! (6 rows) DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4; c1 | c4 --- 1124,1137 ---- FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9; EXPLAIN (verbose, costs off) DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4; ! QUERY PLAN ! -------------------------------------------------------------------------------------------- Delete on public.ft2 Output: c1, c4 -> Foreign Scan on public.ft2 Output: ctid ! Remote SQL: DELETE FROM "S 1"."T 1" WHERE ((("C 1" % 10) = 5)) RETURNING "C 1", c4 ! (5 rows) DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4; c1 | c4 *************** *** 2227,2233 **** CONTEXT: Remote SQL command: INSERT INTO "S 1"."T 1"("C 1", c2, c3, c4, c5, c6, UPDATE ft1 SET c2 = -c2 WHERE c1 = 1; -- c2positive ERROR: new row for relation "T 1" violates check constraint "c2positive" DETAIL: Failing row contains (1, -1, 00001_trig_update, 1970-01-02 08:00:00+00, 1970-01-02 00:00:00, 1, 1 , foo). ! CONTEXT: Remote SQL command: UPDATE "S 1"."T 1" SET c2 = $2 WHERE ctid = $1 -- Test savepoint/rollback behavior select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1; c2 | count --- 2226,2232 ---- UPDATE ft1 SET c2 = -c2 WHERE c1 = 1; -- c2positive ERROR: new row for relation "T 1" violates check constraint "c2positive" DETAIL: Failing row contains (1, -1, 00001_trig_update, 1970-01-02 08:00:00+00, 1970-01-02 00:00:00, 1, 1 , foo). ! CONTEXT: Remote SQL command: UPDATE "S 1"."T 1" SET c2 = (- c2) WHERE (("C 1" = 1)) -- Test savepoint/rollback behavior select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1; c2 | count *************** *** 2386,2392 **** savepoint s3; update ft2 set c2 = -2 where c2 = 42 and c1 = 10; -- fail on remote side ERROR: new row for relation "T 1" violates check constraint "c2positive" DETAIL: Failing row contains (10, -2, 00010_trig_update_trig_update, 1970-01-11 08:00:00+00, 1970-01-11 00:00:00, 0, 0 , foo). ! CONTEXT: Remote SQL command: UPDATE "S 1"."T 1" SET c2 = $2 WHERE ctid = $1 rollback to savepoint s3; select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1; c2 | count --- 2385,2391 ---- update ft2 set c2 = -2 where c2 = 42 and c1 = 10; -- fail on remote side ERROR: new row for relation "T 1" violates check constraint "c2positive" DETAIL: Failing row contains (10, -2, 00010_trig_update_trig_update, 1970-01-11 08:00:00+00, 1970-01-11 00:00:00, 0, 0 , foo). ! CONTEXT: Remote SQL command: UPDATE "S 1"."T 1" SET c2 = (-2) WHERE ((c2 = 42)) AND (("C 1" = 10)) rollback to savepoint s3; select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1; c2 | count *** a/contrib/postgres_fdw/postgres_fdw.c --- b/contrib/postgres_fdw/postgres_fdw.c *************** *** 87,93 **** typedef struct PgFdwRelationInfo * planner to executor. Currently we store: * * 1) SELECT statement text to be sent to the remote server ! * 2) Integer list of attribute numbers retrieved by the SELECT * * These items are indexed with the enum FdwScanPrivateIndex, so an item * can be fetched with list_nth(). For example, to get the SELECT statement: --- 87,98 ---- * planner to executor. Currently we store: * * 1) SELECT statement text to be sent to the remote server ! * 2) List of restriction clauses that can be executed remotely ! * 3) Integer list of attribute numbers retrieved by the SELECT ! * 4) UPDATE/DELETE statement text to be sent to the remote server ! * 5) Boolean flag showing if we set the command es_processed ! * 6) Boolean flag showing if the remote query has a RETURNING clause ! * 7) Integer list of attribute numbers retrieved by RETURNING, if any * * These items are indexed with the enum FdwScanPrivateIndex, so an item * can be fetched with list_nth(). For example, to get the SELECT statement: *************** *** 97,106 **** enum FdwScanPrivateIndex { /* SQL statement to execute remotely (as a String node) */ FdwScanPrivateSelectSql, ! /* Integer list of attribute numbers retrieved by the SELECT */ ! FdwScanPrivateRetrievedAttrs }; /* * Similarly, this enum describes what's kept in the fdw_private list for * a ModifyTable node referencing a postgres_fdw foreign table. We store: --- 102,124 ---- { /* SQL statement to execute remotely (as a String node) */ FdwScanPrivateSelectSql, ! /* List of restriction clauses that can be executed remotely */ ! FdwScanPrivateRemoteConds, ! /* Integer list of attribute numbers retrieved by SELECT */ ! FdwScanPrivateRetrievedAttrsBySelect, ! /* UPDATE/DELETE statement to execute remotely (as a String node) */ ! FdwScanPrivateUpdateSql, ! /* set-processed flag (as an integer Value node) */ ! FdwScanPrivateSetProcessed, ! /* has-returning flag (as an integer Value node) */ ! FdwScanPrivateHasReturning, ! /* Integer list of attribute numbers retrieved by RETURNING */ ! FdwScanPrivateRetrievedAttrsByReturning }; + #define MinFdwScanFdwPrivateLength 3 + #define MaxFdwScanFdwPrivateLength 7 + /* * Similarly, this enum describes what's kept in the fdw_private list for * a ModifyTable node referencing a postgres_fdw foreign table. We store: *************** *** 132,139 **** typedef struct PgFdwScanState AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ /* extracted fdw_private data */ ! char *query; /* text of SELECT command */ List *retrieved_attrs; /* list of retrieved attribute numbers */ /* for remote query execution */ PGconn *conn; /* connection for the scan */ --- 150,159 ---- AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ /* extracted fdw_private data */ ! char *query; /* text of SELECT or UPDATE/DELETE command */ List *retrieved_attrs; /* list of retrieved attribute numbers */ + bool set_processed; /* do we set the command es_processed? */ + bool has_returning; /* is there a RETURNING clause? */ /* for remote query execution */ PGconn *conn; /* connection for the scan */ *************** *** 153,158 **** typedef struct PgFdwScanState --- 173,183 ---- int fetch_ct_2; /* Min(# of fetches done, 2) */ bool eof_reached; /* true if last fetch reached EOF */ + /* for direct update */ + bool direct_update; /* do we update the foreign table directly? */ + PGresult *result; /* result of an UPDATE/DELETE query */; + TupleTableSlot *rslot; /* slot containing the result tuple */ + /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ *************** *** 181,186 **** typedef struct PgFdwModifyState --- 206,215 ---- int p_nums; /* number of parameters to transmit */ FmgrInfo *p_flinfo; /* output conversion functions for them */ + /* for direct update */ + bool direct_update; /* do we update the foreign table directly? */ + PgFdwScanState *fsstate; /* execution state of a foreign scan */ + /* working memory context */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ } PgFdwModifyState; *************** *** 307,318 **** static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, static void create_cursor(ForeignScanState *node); static void fetch_more_data(ForeignScanState *node); static void close_cursor(PGconn *conn, unsigned int cursor_number); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot *slot); ! static void store_returning_result(PgFdwModifyState *fmstate, ! TupleTableSlot *slot, PGresult *res); static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, --- 336,364 ---- static void create_cursor(ForeignScanState *node); static void fetch_more_data(ForeignScanState *node); static void close_cursor(PGconn *conn, unsigned int cursor_number); + static bool check_direct_update(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index, + Relation rel, + List *targetAttrs); + static List *plan_direct_update(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index, + Relation rel, + List *targetAttrs); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot *slot); ! static void store_returning_result(TupleTableSlot *slot, ! PGresult *res, ! int row, ! Relation rel, ! AttInMetadata *attinmeta, ! List *retrieved_attrs, ! MemoryContext temp_context); static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, *************** *** 848,854 **** postgresGetForeignPlan(PlannerInfo *root, * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwScanPrivateIndex, above. */ ! fdw_private = list_make2(makeString(sql.data), retrieved_attrs); /* --- 894,901 ---- * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwScanPrivateIndex, above. */ ! fdw_private = list_make3(makeString(sql.data), ! remote_conds, retrieved_attrs); /* *************** *** 910,931 **** postgresBeginForeignScan(ForeignScanState *node, int eflags) server = GetForeignServer(table->serverid); user = GetUserMapping(userid, server->serverid); /* * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ fsstate->conn = GetConnection(server, user, false); - /* Assign a unique ID for my cursor */ - fsstate->cursor_number = GetCursorNumber(fsstate->conn); - fsstate->cursor_exists = false; - - /* Get private info created by planner functions. */ - fsstate->query = strVal(list_nth(fsplan->fdw_private, - FdwScanPrivateSelectSql)); - fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, - FdwScanPrivateRetrievedAttrs); - /* Create contexts for batches of tuples and per-tuple temp workspace. */ fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw tuple data", --- 957,996 ---- server = GetForeignServer(table->serverid); user = GetUserMapping(userid, server->serverid); + /* Get private info created by planner functions. */ + if (list_length(fsplan->fdw_private) >= MaxFdwScanFdwPrivateLength) + { + Assert(list_length(fsplan->fdw_private) == MaxFdwScanFdwPrivateLength); + + fsstate->query = strVal(list_nth(fsplan->fdw_private, + FdwScanPrivateUpdateSql)); + fsstate->set_processed = intVal(list_nth(fsplan->fdw_private, + FdwScanPrivateSetProcessed)); + fsstate->has_returning = intVal(list_nth(fsplan->fdw_private, + FdwScanPrivateHasReturning)); + fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, + FdwScanPrivateRetrievedAttrsByReturning); + + fsstate->direct_update = true; + } + else + { + Assert(list_length(fsplan->fdw_private) == MinFdwScanFdwPrivateLength); + + fsstate->query = strVal(list_nth(fsplan->fdw_private, + FdwScanPrivateSelectSql)); + fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, + FdwScanPrivateRetrievedAttrsBySelect); + + fsstate->direct_update = false; + } + /* * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ fsstate->conn = GetConnection(server, user, false); /* Create contexts for batches of tuples and per-tuple temp workspace. */ fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw tuple data", *************** *** 941,946 **** postgresBeginForeignScan(ForeignScanState *node, int eflags) --- 1006,1037 ---- /* Get info we'll need for input data conversion. */ fsstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel)); + if (fsstate->direct_update) + { + /* + * Execute the update statement, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + fsstate->result = PQexec(fsstate->conn, fsstate->query); + if (PQresultStatus(fsstate->result) != + (fsstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, fsstate->result, fsstate->conn, true, + fsstate->query); + + /* Check number of rows affected. */ + if (fsstate->has_returning) + fsstate->num_tuples = PQntuples(fsstate->result); + else + fsstate->num_tuples = atoi(PQcmdTuples(fsstate->result)); + return; + } + + /* Assign a unique ID for my cursor */ + fsstate->cursor_number = GetCursorNumber(fsstate->conn); + fsstate->cursor_exists = false; + /* Prepare for output conversion of parameters used in remote query. */ numParams = list_length(fsplan->fdw_exprs); fsstate->numParams = numParams; *************** *** 990,995 **** postgresIterateForeignScan(ForeignScanState *node) --- 1081,1129 ---- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + if (fsstate->direct_update) + { + MemoryContext oldcontext; + + if (!fsstate->has_returning) + { + EState *estate = node->ss.ps.state; + + if (fsstate->set_processed) + estate->es_processed += fsstate->num_tuples; + return ExecClearTuple(slot); + } + + /* If we didn't get any tuples, must be end of data. */ + if (fsstate->next_tuple >= fsstate->num_tuples) + return ExecClearTuple(slot); + + /* We'll store RETURNING tuples in the batch_cxt. */ + oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt); + + /* Fetch the next tuple. */ + store_returning_result(slot, + fsstate->result, + fsstate->next_tuple, + fsstate->rel, + fsstate->attinmeta, + fsstate->retrieved_attrs, + fsstate->temp_cxt); + + MemoryContextSwitchTo(oldcontext); + + /* Save the result. */ + fsstate->rslot = slot; + fsstate->next_tuple++; + + /* + * Return slot. Note that this is safe because that there are no local + * conditions and because that the tuple contained in the slot is + * projected safely and then ignored (see also plan_direct_update). + */ + return slot; + } + /* * If this is the first call after Begin or ReScan, we need to create the * cursor on the remote side. *************** *** 1090,1095 **** postgresEndForeignScan(ForeignScanState *node) --- 1224,1236 ---- if (fsstate == NULL) return; + if (fsstate->direct_update) + { + /* Clean up */ + if (fsstate->result) + PQclear(fsstate->result); + } + /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) close_cursor(fsstate->conn, fsstate->cursor_number); *************** *** 1206,1211 **** postgresPlanForeignModify(PlannerInfo *root, --- 1347,1378 ---- } /* + * For an UPDATE/DELETE command, if there are no local conditions or joins + * needed (see check_direct_update for more details), let the scan node do + * UPDATE/DELETE RETURNING and then do nothing at ModifyTable. + */ + if (operation == CMD_UPDATE || operation == CMD_DELETE) + { + /* Check whether it's safe to do direct update. */ + if (check_direct_update(root, plan, + resultRelation, + subplan_index, + rel, targetAttrs)) + { + List *fdw_private; + + /* OK, generate a plan to do direct update. */ + fdw_private = plan_direct_update(root, plan, + resultRelation, + subplan_index, + rel, targetAttrs); + + heap_close(rel, NoLock); + return fdw_private; + } + } + + /* * Extract the relevant RETURNING list if any. */ if (plan->returningLists) *************** *** 1284,1289 **** postgresBeginForeignModify(ModifyTableState *mtstate, --- 1451,1489 ---- fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState)); fmstate->rel = rel; + /* Deconstruct fdw_private data. */ + fmstate->query = strVal(list_nth(fdw_private, + FdwModifyPrivateUpdateSql)); + fmstate->target_attrs = (List *) list_nth(fdw_private, + FdwModifyPrivateTargetAttnums); + fmstate->has_returning = intVal(list_nth(fdw_private, + FdwModifyPrivateHasReturning)); + fmstate->retrieved_attrs = (List *) list_nth(fdw_private, + FdwModifyPrivateRetrievedAttrs); + + if (fmstate->query == NULL) + { + PlanState *node = mtstate->mt_plans[subplan_index]; + PgFdwScanState *fsstate; + + Assert(fmstate->target_attrs == NIL); + Assert(fmstate->has_returning == false); + Assert(fmstate->retrieved_attrs == NIL); + + Assert(nodeTag(node) == T_ForeignScanState); + fsstate = ((ForeignScanState *) node)->fdw_state; + + Assert(fsstate->direct_update); + fmstate->direct_update = true; + if (fsstate->has_returning) + { + fmstate->has_returning = true; + fmstate->fsstate = fsstate; + } + resultRelInfo->ri_FdwState = fmstate; + return; + } + /* * Identify which user to do the remote access as. This should match what * ExecCheckRTEPerms() does. *************** *** 1300,1315 **** postgresBeginForeignModify(ModifyTableState *mtstate, fmstate->conn = GetConnection(server, user, true); fmstate->p_name = NULL; /* prepared statement not made yet */ - /* Deconstruct fdw_private data. */ - fmstate->query = strVal(list_nth(fdw_private, - FdwModifyPrivateUpdateSql)); - fmstate->target_attrs = (List *) list_nth(fdw_private, - FdwModifyPrivateTargetAttnums); - fmstate->has_returning = intVal(list_nth(fdw_private, - FdwModifyPrivateHasReturning)); - fmstate->retrieved_attrs = (List *) list_nth(fdw_private, - FdwModifyPrivateRetrievedAttrs); - /* Create context for per-tuple temp workspace. */ fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw temporary data", --- 1500,1505 ---- *************** *** 1407,1413 **** postgresExecForeignInsert(EState *estate, { n_rows = PQntuples(res); if (n_rows > 0) ! store_returning_result(fmstate, slot, res); } else n_rows = atoi(PQcmdTuples(res)); --- 1597,1607 ---- { n_rows = PQntuples(res); if (n_rows > 0) ! store_returning_result(slot, res, 0, ! fmstate->rel, ! fmstate->attinmeta, ! fmstate->retrieved_attrs, ! fmstate->temp_cxt); } else n_rows = atoi(PQcmdTuples(res)); *************** *** 1438,1443 **** postgresExecForeignUpdate(EState *estate, --- 1632,1645 ---- PGresult *res; int n_rows; + /* Return slot created in the ForeignScan node when doing direct update. */ + if (fmstate->direct_update) + { + Assert(fmstate->has_returning); + Assert(fmstate->fsstate->rslot); + return fmstate->fsstate->rslot; + } + /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); *************** *** 1477,1483 **** postgresExecForeignUpdate(EState *estate, { n_rows = PQntuples(res); if (n_rows > 0) ! store_returning_result(fmstate, slot, res); } else n_rows = atoi(PQcmdTuples(res)); --- 1679,1689 ---- { n_rows = PQntuples(res); if (n_rows > 0) ! store_returning_result(slot, res, 0, ! fmstate->rel, ! fmstate->attinmeta, ! fmstate->retrieved_attrs, ! fmstate->temp_cxt); } else n_rows = atoi(PQcmdTuples(res)); *************** *** 1508,1513 **** postgresExecForeignDelete(EState *estate, --- 1714,1727 ---- PGresult *res; int n_rows; + /* Return slot created in the ForeignScan node when doing direct update. */ + if (fmstate->direct_update) + { + Assert(fmstate->has_returning); + Assert(fmstate->fsstate->rslot); + return fmstate->fsstate->rslot; + } + /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); *************** *** 1547,1553 **** postgresExecForeignDelete(EState *estate, { n_rows = PQntuples(res); if (n_rows > 0) ! store_returning_result(fmstate, slot, res); } else n_rows = atoi(PQcmdTuples(res)); --- 1761,1771 ---- { n_rows = PQntuples(res); if (n_rows > 0) ! store_returning_result(slot, res, 0, ! fmstate->rel, ! fmstate->attinmeta, ! fmstate->retrieved_attrs, ! fmstate->temp_cxt); } else n_rows = atoi(PQcmdTuples(res)); *************** *** 1575,1580 **** postgresEndForeignModify(EState *estate, --- 1793,1802 ---- if (fmstate == NULL) return; + /* If doing direct update, there is nothing to do */ + if (fmstate->direct_update) + return; + /* If we created a prepared statement, destroy it */ if (fmstate->p_name) { *************** *** 1653,1663 **** postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) { List *fdw_private; char *sql; if (es->verbose) { fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; ! sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); ExplainPropertyText("Remote SQL", sql, es); } } --- 1875,1900 ---- { List *fdw_private; char *sql; + bool direct_update; if (es->verbose) { fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; ! if (list_length(fdw_private) >= MaxFdwScanFdwPrivateLength) ! { ! Assert(list_length(fdw_private) == MaxFdwScanFdwPrivateLength); ! direct_update = true; ! } ! else ! { ! Assert(list_length(fdw_private) == MinFdwScanFdwPrivateLength); ! direct_update = false; ! } ! ! if (direct_update) ! sql = strVal(list_nth(fdw_private, FdwScanPrivateUpdateSql)); ! else ! sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); ExplainPropertyText("Remote SQL", sql, es); } } *************** *** 1678,1684 **** postgresExplainForeignModify(ModifyTableState *mtstate, char *sql = strVal(list_nth(fdw_private, FdwModifyPrivateUpdateSql)); ! ExplainPropertyText("Remote SQL", sql, es); } } --- 1915,1922 ---- char *sql = strVal(list_nth(fdw_private, FdwModifyPrivateUpdateSql)); ! if (sql != NULL) ! ExplainPropertyText("Remote SQL", sql, es); } } *************** *** 1907,1912 **** ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, --- 2145,2338 ---- } /* + * Check whether it's safe to update the foreign table directly. + */ + static bool + check_direct_update(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index, + Relation rel, + List *targetAttrs) + { + RelOptInfo *baserel = root->simple_rel_array[resultRelation]; + Plan *subplan = (Plan *) list_nth(plan->plans, subplan_index); + ListCell *lc; + + if (rel->trigdesc && + (rel->trigdesc->trig_update_after_row || + rel->trigdesc->trig_update_before_row)) + return false; + + if (nodeTag(subplan) != T_ForeignScan || subplan->qual != NIL) + return false; + + foreach(lc, targetAttrs) + { + int attnum = lfirst_int(lc); + TargetEntry *tle = get_tle_by_resno(subplan->targetlist, + attnum); + + if (!is_foreign_expr(root, baserel, (Expr *) tle->expr)) + return false; + } + + return true; + } + + /* + * Generate a plan to update the foreign table directly. + */ + static List * + plan_direct_update(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index, + Relation rel, + List *targetAttrs) + { + CmdType operation = plan->operation; + bool canSetTag = plan->canSetTag; + Plan *subplan = (Plan *) list_nth(plan->plans, subplan_index); + ForeignScan *fscan = (ForeignScan *) subplan; + StringInfoData sql; + List *remote_conds; + List *returningList = NIL; + List *retrieved_attrs = NIL; + List *new_tlist = NIL; + List *fdw_private; + + Assert(operation == CMD_UPDATE || operation == CMD_DELETE); + + initStringInfo(&sql); + + /* + * Extract the baserestrictinfo clauses that can be evaluated remotely. + */ + remote_conds = (List *) list_nth(fscan->fdw_private, + FdwScanPrivateRemoteConds); + + /* + * Extract the relevant RETURNING list if any. + */ + if (plan->returningLists) + returningList = (List *) list_nth(plan->returningLists, subplan_index); + + /* + * Construct the SQL command string. + */ + if (operation == CMD_UPDATE) + { + List *targetlist = subplan->targetlist; + + deparseDirectUpdateSql(&sql, root, resultRelation, rel, + remote_conds, + targetlist, + targetAttrs, + returningList, + &retrieved_attrs); + } + else + { + Assert(operation == CMD_DELETE); + + deparseDirectDeleteSql(&sql, root, resultRelation, rel, + remote_conds, + returningList, + &retrieved_attrs); + } + + /* + * Update the fdw_private list that will be available to the executor. + * Items in the list must match enum FdwScanPrivateIndex, above. + */ + fscan->fdw_private = lappend(fscan->fdw_private, makeString(sql.data)); + fscan->fdw_private = lappend(fscan->fdw_private, makeInteger(canSetTag)); + fscan->fdw_private = lappend(fscan->fdw_private, + makeInteger((retrieved_attrs != NIL))); + fscan->fdw_private = lappend(fscan->fdw_private, retrieved_attrs); + + /* + * Rrewrite the targetlist for an UPDATE command for safety of ExecProject. + * Note we ignore and do not reference result tuples in direct update case. + */ + if (operation == CMD_UPDATE) + { + ListCell *lc; + int attrno = 1; + int numattrs = RelationGetNumberOfAttributes(rel); + + foreach(lc, subplan->targetlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + + if (tle->resjunk) + { + new_tlist = lappend(new_tlist, tle); + continue; + } + + if (attrno > numattrs) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("table row type and query-specified row type do not match"), + errdetail("Query has too many columns."))); + + if (!list_member_int(targetAttrs, attrno)) + new_tlist = lappend(new_tlist, tle); + else + { + Form_pg_attribute attr; + Oid atttype; + int32 atttypmod; + Oid attcollation; + Node *new_expr; + TargetEntry *new_tle; + + attr = rel->rd_att->attrs[attrno - 1]; + + Assert(!attr->attisdropped); + atttype = attr->atttypid; + atttypmod = attr->atttypmod; + attcollation = attr->attcollation; + + new_expr = (Node *) makeVar(resultRelation, + attrno, + atttype, + atttypmod, + attcollation, + 0); + + new_tle = makeTargetEntry((Expr *) new_expr, + attrno, + pstrdup(NameStr(attr->attname)), + false); + + new_tlist = lappend(new_tlist, new_tle); + } + + attrno++; + } + + if (attrno != numattrs + 1) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("table row type and query-specified row type do not match"), + errdetail("Query has too few columns."))); + + subplan->targetlist = new_tlist; + } + + /* + * Build the fdw_private list that will be available to the executor. + * Items in the list must match enum FdwModifyPrivateIndex, above. + */ + fdw_private = list_make4(makeString(NULL), NIL, makeInteger(false), NIL); + + return fdw_private; + } + + /* * Create cursor for node's query with current parameter values. */ static void *************** *** 2254,2272 **** convert_prep_stmt_params(PgFdwModifyState *fmstate, * have PG_TRY blocks to ensure this happens. */ static void ! store_returning_result(PgFdwModifyState *fmstate, ! TupleTableSlot *slot, PGresult *res) { /* PGresult must be released before leaving this function. */ PG_TRY(); { HeapTuple newtup; ! newtup = make_tuple_from_result_row(res, 0, ! fmstate->rel, ! fmstate->attinmeta, ! fmstate->retrieved_attrs, ! fmstate->temp_cxt); /* tuple will be deleted when it is cleared from the slot */ ExecStoreTuple(newtup, slot, InvalidBuffer, true); } --- 2680,2703 ---- * have PG_TRY blocks to ensure this happens. */ static void ! store_returning_result(TupleTableSlot *slot, ! PGresult *res, ! int row, ! Relation rel, ! AttInMetadata *attinmeta, ! List *retrieved_attrs, ! MemoryContext temp_context) { /* PGresult must be released before leaving this function. */ PG_TRY(); { HeapTuple newtup; ! newtup = make_tuple_from_result_row(res, row, ! rel, ! attinmeta, ! retrieved_attrs, ! temp_context); /* tuple will be deleted when it is cleared from the slot */ ExecStoreTuple(newtup, slot, InvalidBuffer, true); } *** a/contrib/postgres_fdw/postgres_fdw.h --- b/contrib/postgres_fdw/postgres_fdw.h *************** *** 66,75 **** extern void deparseUpdateSql(StringInfo buf, PlannerInfo *root, --- 66,87 ---- Index rtindex, Relation rel, List *targetAttrs, List *returningList, List **retrieved_attrs); + extern void deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root, + Index rtindex, Relation rel, + List *remote_conds, + List *targetlist, + List *targetAttrs, + List *returningList, + List **retrieved_attrs); extern void deparseDeleteSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *returningList, List **retrieved_attrs); + extern void deparseDirectDeleteSql(StringInfo buf, PlannerInfo *root, + Index rtindex, Relation rel, + List *remote_conds, + List *returningList, + List **retrieved_attrs); extern void deparseAnalyzeSizeSql(StringInfo buf, Relation rel); extern void deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs);