From f569b0da822e9ad0bef521b7f4f8532a45948577 Mon Sep 17 00:00:00 2001 From: Matheus Alcantara Date: Wed, 28 Jan 2026 19:55:48 -0300 Subject: [PATCH v11] postgres_fdw: Use COPY as remote SQL when possible Previously when an user execute a COPY on a foreign table, postgres_fdw send a INSERT as a remote SQL to the foreign server. This commit introduce the ability to use the COPY command instead. The COPY command will only be used when an user execute a COPY on a foreign table and also the foreign table should not have triggers because remote triggers might modify the inserted row and since COPY does not support a RETURNING clause, we cannot synchronize the local TupleTableSlot with those changes for use in local AFTER triggers, so if the foreign table has any trigger INSERT will be used. Author: Matheus Alcantara Reviewed-By: Tomas Vondra Reviewed-By: Jakub Wartak Reviewed-By: jian he Reviewed-By: Dewei Dai Reviewed-By: Masahiko Sawada Discussion: https://www.postgresql.org/message-id/flat/DDIZJ217OUDK.2R5WE4OGL5PTY%40gmail.com --- contrib/postgres_fdw/deparse.c | 36 ++++ .../postgres_fdw/expected/postgres_fdw.out | 33 +++- contrib/postgres_fdw/postgres_fdw.c | 179 +++++++++++++++++- contrib/postgres_fdw/postgres_fdw.h | 1 + contrib/postgres_fdw/sql/postgres_fdw.sql | 27 +++ 5 files changed, 268 insertions(+), 8 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index ebe2c3a596a..04829b6ee45 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -2177,6 +2177,42 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, withCheckOptionList, returningList, retrieved_attrs); } +/* + * Build a COPY FROM STDIN statement using the TEXT format + */ +void +deparseCopySql(StringInfo buf, Relation rel, List *target_attrs) +{ + TupleDesc tupdesc = RelationGetDescr(rel); + bool first = true; + int nattrs = list_length(target_attrs); + + appendStringInfo(buf, "COPY "); + deparseRelation(buf, rel); + if (nattrs > 0) + appendStringInfo(buf, "("); + + foreach_int(attnum, target_attrs) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + + if (attr->attgenerated) + continue; + + if (!first) + appendStringInfoString(buf, ", "); + + first = false; + + appendStringInfoString(buf, quote_identifier(NameStr(attr->attname))); + } + if (nattrs > 0) + appendStringInfoString(buf, ") FROM STDIN"); + else + appendStringInfoString(buf, " FROM STDIN"); +} + + /* * rebuild remote INSERT statement * diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 2ccb72c539a..22c2dcdb6b1 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -7603,6 +7603,27 @@ select * from grem1; (2 rows) delete from grem1; +-- test that fdw also use COPY FROM as a remote sql +set client_min_messages to 'log'; +create function insert_or_copy() returns trigger as $$ +declare query text; +begin + query := current_query(); + if query ~* '^COPY' then + raise notice 'COPY command'; + elsif query ~* '^INSERT' then + raise notice 'INSERT command'; + end if; +return new; +end; +$$ language plpgsql; +CREATE TRIGGER trig_row_before +BEFORE INSERT OR UPDATE OR DELETE ON gloc1 +FOR EACH ROW EXECUTE PROCEDURE insert_or_copy(); +copy grem1 from stdin; +LOG: received message via remote connection: NOTICE: COPY command +drop trigger trig_row_before on gloc1; +reset client_min_messages; -- test batch insert alter server loopback options (add batch_size '10'); explain (verbose, costs off) @@ -7620,16 +7641,18 @@ insert into grem1 (a) values (1), (2); select * from gloc1; a | b | c ---+---+--- + 3 | 6 | 1 | 2 | 2 | 4 | -(2 rows) +(3 rows) select * from grem1; a | b | c ---+---+--- + 3 | 6 | 9 1 | 2 | 3 2 | 4 | 6 -(2 rows) +(3 rows) delete from grem1; -- batch insert with foreign partitions. @@ -9544,7 +9567,8 @@ copy rem2 from stdin; copy rem2 from stdin; -- ERROR ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive" DETAIL: Failing row contains (-1, xyzzy). -CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2) +CONTEXT: COPY loc2, line 1: "-1 xyzzy" +remote SQL command: COPY public.loc2(f1, f2) FROM STDIN COPY rem2, line 1: "-1 xyzzy" select * from rem2; f1 | f2 @@ -9701,7 +9725,8 @@ copy rem2 from stdin; copy rem2 from stdin; -- ERROR ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive" DETAIL: Failing row contains (-1, xyzzy). -CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2) +CONTEXT: COPY loc2, line 1: "-1 xyzzy" +remote SQL command: COPY public.loc2(f1, f2) FROM STDIN COPY rem2 select * from rem2; f1 | f2 diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 60d90329a65..6dd22e4043d 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -63,6 +63,9 @@ PG_MODULE_MAGIC_EXT( /* If no remote estimates, assume a sort costs 20% extra */ #define DEFAULT_FDW_SORT_MULTIPLIER 1.2 +/* Buffer size to send COPY IN data*/ +#define COPYBUFSIZ 8192 + /* * Indexes of FDW-private information stored in fdw_private lists. * @@ -198,6 +201,8 @@ typedef struct PgFdwModifyState bool has_returning; /* is there a RETURNING clause? */ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ + bool use_copy; + /* info about parameters for prepared statement */ AttrNumber ctidAttno; /* attnum of input resjunk ctid column */ int p_nums; /* number of parameters to transmit */ @@ -545,6 +550,12 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i); static int get_batch_size_option(Relation rel); +static TupleTableSlot **execute_foreign_modify_using_copy(PgFdwModifyState *fmstate, + TupleTableSlot **slots, + int *numSlots); +static void convert_slot_to_copy_text(StringInfo buf, + PgFdwModifyState *fmstate, + TupleTableSlot *slot); /* @@ -2170,6 +2181,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, List *targetAttrs = NIL; List *retrieved_attrs = NIL; bool doNothing = false; + bool useCopy = false; /* * If the foreign table we are about to insert routed rows into is also an @@ -2247,11 +2259,41 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, rte = exec_rt_fetch(resultRelation, estate); } + /* + * We can use COPY for remote inserts only if all the following conditions + * are met: + * + * Direct Execution: The command is a COPY FROM on the foreign table + * itself, not part of a partitioned table's tuple routing. ( + * resultRelInfo->ri_RootResultRelInfo == NULL) + * + * No Local AFTER Triggers: There are no AFTER ROW triggers defined locally + * on the foreign table. + * + * Remote triggers might modify the inserted row. Because the COPY protocol + * does not support a RETURNING clause, we cannot retrieve those changes to + * synchronize the local TupleTableSlot required by local AFTER triggers. + */ + if (resultRelInfo->ri_RootResultRelInfo == NULL) + { + /* There is no RETURNING clause on COPY */ + Assert(resultRelInfo->ri_returningList == NIL); + + useCopy = (resultRelInfo->ri_TrigDesc == NULL || + !resultRelInfo->ri_TrigDesc->trig_insert_after_row); + } + /* Construct the SQL command string. */ - deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, - resultRelInfo->ri_WithCheckOptions, - resultRelInfo->ri_returningList, - &retrieved_attrs, &values_end_len); + if (useCopy) + { + deparseCopySql(&sql, rel, targetAttrs); + values_end_len = 0; /* Keep compiler quiet */ + } + else + deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, + resultRelInfo->ri_WithCheckOptions, + resultRelInfo->ri_returningList, + &retrieved_attrs, &values_end_len); /* Construct an execution state. */ fmstate = create_foreign_modify(mtstate->ps.state, @@ -2264,6 +2306,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, values_end_len, retrieved_attrs != NIL, retrieved_attrs); + fmstate->use_copy = useCopy; /* * If the given resultRelInfo already has PgFdwModifyState set, it means @@ -4093,6 +4136,9 @@ execute_foreign_modify(EState *estate, operation == CMD_UPDATE || operation == CMD_DELETE); + if (fmstate->use_copy) + return execute_foreign_modify_using_copy(fmstate, slots, numSlots); + /* First, process a pending asynchronous request, if any. */ if (fmstate->conn_state->pendingAreq) process_pending_request(fmstate->conn_state->pendingAreq); @@ -7886,3 +7932,128 @@ get_batch_size_option(Relation rel) return batch_size; } + +/* + * execute_foreign_modify_using_copy + * Perform foreign-table modification using the COPY command. + */ +static TupleTableSlot ** +execute_foreign_modify_using_copy(PgFdwModifyState *fmstate, + TupleTableSlot **slots, + int *numSlots) +{ + PGresult *res; + StringInfoData copy_data; + int n_rows; + int i; + + Assert(fmstate->use_copy == true); + + /* Send COPY command */ + if (!PQsendQuery(fmstate->conn, fmstate->query)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + + /* get the COPY result */ + res = pgfdw_get_result(fmstate->conn); + if (PQresultStatus(res) != PGRES_COPY_IN) + pgfdw_report_error(res, fmstate->conn, fmstate->query); + + /* Clean up the COPY command result */ + PQclear(res); + + /* Convert the TupleTableSlot data into a TEXT-formatted line */ + initStringInfo(©_data); + for (i = 0; i < *numSlots; i++) + { + convert_slot_to_copy_text(©_data, fmstate, slots[i]); + + /* + * Send initial COPY data if the buffer reaches the limit to avoid + * large memory usage. + */ + if (copy_data.len >= COPYBUFSIZ) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + resetStringInfo(©_data); + } + } + + /* Send the remaining COPY data */ + if (copy_data.len > 0) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + } + + pfree(copy_data.data); + + /* End the COPY operation */ + if (PQputCopyEnd(fmstate->conn, NULL) < 0 || PQflush(fmstate->conn)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + + /* + * Get the result, and check for success. + */ + res = pgfdw_get_result(fmstate->conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(res, fmstate->conn, fmstate->query); + + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + MemoryContextReset(fmstate->temp_cxt); + + *numSlots = n_rows; + + /* + * Return NULL if nothing was inserted on the remote end + */ + return (n_rows > 0) ? slots : NULL; +} + +/* + * Write target attribute values from fmstate into buf buffer to be sent as + * COPY FROM STDIN data + */ +static void +convert_slot_to_copy_text(StringInfo buf, + PgFdwModifyState *fmstate, + TupleTableSlot *slot) +{ + TupleDesc tupdesc = RelationGetDescr(fmstate->rel); + bool first = true; + int i = 0; + + foreach_int(attnum, fmstate->target_attrs) + { + CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1); + Datum datum; + bool isnull; + + /* Ignore generated columns; they are set to DEFAULT */ + if (attr->attgenerated) + continue; + + if (!first) + appendStringInfoCharMacro(buf, '\t'); + first = false; + + datum = slot_getattr(slot, attnum, &isnull); + + if (isnull) + appendStringInfoString(buf, "\\N"); + else + { + const char *value = OutputFunctionCall(&fmstate->p_flinfo[i], + datum); + + appendStringInfoString(buf, value); + } + i++; + } + + appendStringInfoCharMacro(buf, '\n'); +} diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index a2bb1ff352c..fc6922ddd4f 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel, char *orig_query, List *target_attrs, int values_end_len, int num_params, int num_rows); +extern void deparseCopySql(StringInfo buf, Relation rel, List *target_attrs); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 72d2d9c311b..90246ddbd02 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -1929,6 +1929,33 @@ select * from gloc1; select * from grem1; delete from grem1; +-- test that fdw also use COPY FROM as a remote sql +set client_min_messages to 'log'; + +create function insert_or_copy() returns trigger as $$ +declare query text; +begin + query := current_query(); + if query ~* '^COPY' then + raise notice 'COPY command'; + elsif query ~* '^INSERT' then + raise notice 'INSERT command'; + end if; +return new; +end; +$$ language plpgsql; + +CREATE TRIGGER trig_row_before +BEFORE INSERT OR UPDATE OR DELETE ON gloc1 +FOR EACH ROW EXECUTE PROCEDURE insert_or_copy(); + +copy grem1 from stdin; +3 +\. + +drop trigger trig_row_before on gloc1; +reset client_min_messages; + -- test batch insert alter server loopback options (add batch_size '10'); explain (verbose, costs off) -- 2.52.0