From c344c5ba3d3d7c0e0e9f47e279442cbc7f252a6d Mon Sep 17 00:00:00 2001 From: Matheus Alcantara Date: Wed, 28 Jan 2026 19:55:48 -0300 Subject: [PATCH v13] postgres_fdw: Use COPY as remote SQL when possible Previously when an user execute a COPY on a foreign table, postgres_fdw send a INSERT as a remote SQL to the foreign server. This commit introduce the ability to use the COPY command instead. The COPY command will only be used when an user execute a COPY on a foreign table and also the foreign table should not have triggers because remote triggers might modify the inserted row and since COPY does not support a RETURNING clause, we cannot synchronize the local TupleTableSlot with those changes for use in local AFTER triggers, so if the foreign table has any trigger INSERT will be used. Author: Matheus Alcantara Reviewed-By: Tomas Vondra Reviewed-By: Jakub Wartak Reviewed-By: jian he Reviewed-By: Dewei Dai Reviewed-By: Masahiko Sawada Discussion: https://www.postgresql.org/message-id/flat/DDIZJ217OUDK.2R5WE4OGL5PTY%40gmail.com --- contrib/postgres_fdw/deparse.c | 56 ++++ .../postgres_fdw/expected/postgres_fdw.out | 64 ++++- contrib/postgres_fdw/postgres_fdw.c | 265 +++++++++++++++++- contrib/postgres_fdw/postgres_fdw.h | 1 + contrib/postgres_fdw/sql/postgres_fdw.sql | 72 +++++ 5 files changed, 449 insertions(+), 9 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index c159ecd1558..9ca0b05c771 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -2177,6 +2177,62 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, withCheckOptionList, returningList, retrieved_attrs); } +/* + * Build a COPY FROM STDIN statement using the TEXT format + */ +void +deparseCopySql(StringInfo buf, Relation rel, List *target_attrs) +{ + Oid relid = RelationGetRelid(rel); + TupleDesc tupdesc = RelationGetDescr(rel); + bool first = true; + int nattrs = list_length(target_attrs); + + appendStringInfo(buf, "COPY "); + deparseRelation(buf, rel); + if (nattrs > 0) + appendStringInfoChar(buf, '('); + + foreach_int(attnum, target_attrs) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + char *colname; + List *options; + ListCell *lc; + + if (attr->attgenerated) + continue; + + if (!first) + appendStringInfoString(buf, ", "); + + first = false; + + /* Use attribute name or column_name option. */ + colname = NameStr(attr->attname); + options = GetForeignColumnOptions(relid, attnum); + foreach(lc, options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "column_name") == 0) + { + colname = defGetString(def); + break; + } + } + + appendStringInfoString(buf, quote_identifier(colname)); + } + if (nattrs > 0) + appendStringInfoString(buf, ") FROM STDIN"); + else + appendStringInfoString(buf, " FROM STDIN"); + + appendStringInfoString(buf, " (FORMAT TEXT)"); +} + + /* * rebuild remote INSERT statement * diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 0f5271d476e..d4e3dff47a0 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -7611,6 +7611,28 @@ select * from grem1; (2 rows) delete from grem1; +-- test that fdw also use COPY FROM as a remote sql +set client_min_messages to 'log'; +create function insert_or_copy() returns trigger as $$ +declare query text; +begin + query := current_query(); + raise notice '%', query; +return new; +end; +$$ language plpgsql; +CREATE TRIGGER trig_row_before +BEFORE INSERT OR UPDATE OR DELETE ON gloc1 +FOR EACH ROW EXECUTE PROCEDURE insert_or_copy(); +copy grem1 from stdin; +LOG: received message via remote connection: NOTICE: COPY public.gloc1(a) FROM STDIN (FORMAT TEXT) +drop trigger trig_row_before on gloc1; +reset client_min_messages; +-- test that copy does not fail with column_name alias +create table gloc2(xxx int); +create foreign table grem2(a int) server loopback options(table_name 'gloc2'); +alter foreign table grem2 alter column a options (column_name 'xxx'); +copy grem2 from stdin; -- test batch insert alter server loopback options (add batch_size '10'); explain (verbose, costs off) @@ -7628,16 +7650,18 @@ insert into grem1 (a) values (1), (2); select * from gloc1; a | b | c ---+---+--- + 3 | 6 | 1 | 2 | 2 | 4 | -(2 rows) +(3 rows) select * from grem1; a | b | c ---+---+--- + 3 | 6 | 9 1 | 2 | 3 2 | 4 | 6 -(2 rows) +(3 rows) delete from grem1; -- batch insert with foreign partitions. @@ -7662,6 +7686,12 @@ select count(*) from tab_batch_sharded; drop table tab_batch_local; drop table tab_batch_sharded; drop table tab_batch_sharded_p1_remote; +-- test batch insert using copy +set client_min_messages to 'debug1'; +copy grem1 from stdin; +DEBUG: foreign modify with COPY batch_size: 10 +DEBUG: foreign modify with COPY batch_size: 10 +reset client_min_messages; alter server loopback options (drop batch_size); -- =================================================================== -- test local triggers @@ -9552,7 +9582,8 @@ copy rem2 from stdin; copy rem2 from stdin; -- ERROR ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive" DETAIL: Failing row contains (-1, xyzzy). -CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2) +CONTEXT: COPY loc2, line 1: "-1 xyzzy" +remote SQL command: COPY public.loc2(f1, f2) FROM STDIN (FORMAT TEXT) COPY rem2, line 1: "-1 xyzzy" select * from rem2; f1 | f2 @@ -9709,7 +9740,8 @@ copy rem2 from stdin; copy rem2 from stdin; -- ERROR ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive" DETAIL: Failing row contains (-1, xyzzy). -CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2) +CONTEXT: COPY loc2, line 1: "-1 xyzzy" +remote SQL command: COPY public.loc2(f1, f2) FROM STDIN (FORMAT TEXT) COPY rem2 select * from rem2; f1 | f2 @@ -9748,6 +9780,30 @@ select * from rem2; drop trigger trig_null on loc2; delete from rem2; +-- Test COPY FROM with column list and special characters +copy rem2 (f1, f2) from stdin; +select * from rem2; + f1 | f2 +----+------- + 1 | hello+ + | world +(1 row) + +delete from rem2; +-- Test that float numbers do not loose precision when sending to the foreign +-- server +create table f(a float); +create foreign table f_fdw(a float) server loopback options(table_name 'f'); +set extra_float_digits = 0; +copy f_fdw from stdin; +reset extra_float_digits; +select * from f; + a +-------------------- + 1.0000000000000002 +(1 row) + +drop table f; -- Check with zero-column foreign table; batch insert will be disabled alter table loc2 drop column f1; alter table loc2 drop column f2; diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 41e47cc795b..c0e7c5d6fde 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -64,6 +64,9 @@ PG_MODULE_MAGIC_EXT( /* If no remote estimates, assume a sort costs 20% extra */ #define DEFAULT_FDW_SORT_MULTIPLIER 1.2 +/* Buffer size to send COPY IN data*/ +#define COPYBUFSIZ 8192 + /* * Indexes of FDW-private information stored in fdw_private lists. * @@ -199,6 +202,8 @@ typedef struct PgFdwModifyState bool has_returning; /* is there a RETURNING clause? */ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ + bool use_copy; + /* info about parameters for prepared statement */ AttrNumber ctidAttno; /* attnum of input resjunk ctid column */ int p_nums; /* number of parameters to transmit */ @@ -546,6 +551,13 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i); static int get_batch_size_option(Relation rel); +static TupleTableSlot **execute_foreign_modify_using_copy(PgFdwModifyState *fmstate, + TupleTableSlot **slots, + int *numSlots); +static void convert_slot_to_copy_text(StringInfo buf, + PgFdwModifyState *fmstate, + TupleTableSlot *slot); +static void appendStringInfoText(StringInfo buf, const char *string); /* @@ -2166,11 +2178,12 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, RangeTblEntry *rte; TupleDesc tupdesc = RelationGetDescr(rel); int attnum; - int values_end_len; + int values_end_len = 0; StringInfoData sql; List *targetAttrs = NIL; List *retrieved_attrs = NIL; bool doNothing = false; + bool useCopy = false; /* * If the foreign table we are about to insert routed rows into is also an @@ -2248,11 +2261,43 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, rte = exec_rt_fetch(resultRelation, estate); } + /* + * We can use COPY for remote inserts only if all the following conditions + * are met: + * + * Direct Execution: The command is a COPY FROM on the foreign table + * itself, not part of a partitioned table's tuple routing. ( + * resultRelInfo->ri_RootResultRelInfo == NULL) + * + * No Check Options: There are no WITH CHECK OPTION constraints or + * Row-Level Security policies that need to be enforced locally + * (resultRelInfo->ri_WithCheckOptions == NIL). + * + * No Local AFTER Triggers: There are no AFTER ROW triggers defined + * locally on the foreign table. + * + * Remote triggers might modify the inserted row. Because the COPY + * protocol does not support a RETURNING clause, we cannot retrieve those + * changes to synchronize the local TupleTableSlot required by local AFTER + * triggers. + */ + if (resultRelInfo->ri_RootResultRelInfo == NULL && resultRelInfo->ri_WithCheckOptions == NIL) + { + /* There is no RETURNING clause on COPY */ + Assert(resultRelInfo->ri_returningList == NIL); + + useCopy = (resultRelInfo->ri_TrigDesc == NULL || + !resultRelInfo->ri_TrigDesc->trig_insert_after_row); + } + /* Construct the SQL command string. */ - deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, - resultRelInfo->ri_WithCheckOptions, - resultRelInfo->ri_returningList, - &retrieved_attrs, &values_end_len); + if (useCopy) + deparseCopySql(&sql, rel, targetAttrs); + else + deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, + resultRelInfo->ri_WithCheckOptions, + resultRelInfo->ri_returningList, + &retrieved_attrs, &values_end_len); /* Construct an execution state. */ fmstate = create_foreign_modify(mtstate->ps.state, @@ -2265,6 +2310,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, values_end_len, retrieved_attrs != NIL, retrieved_attrs); + fmstate->use_copy = useCopy; /* * If the given resultRelInfo already has PgFdwModifyState set, it means @@ -4094,6 +4140,9 @@ execute_foreign_modify(EState *estate, operation == CMD_UPDATE || operation == CMD_DELETE); + if (fmstate->use_copy) + return execute_foreign_modify_using_copy(fmstate, slots, numSlots); + /* First, process a pending asynchronous request, if any. */ if (fmstate->conn_state->pendingAreq) process_pending_request(fmstate->conn_state->pendingAreq); @@ -7887,3 +7936,209 @@ get_batch_size_option(Relation rel) return batch_size; } + +/* + * execute_foreign_modify_using_copy + * Perform foreign-table modification using the COPY command. + */ +static TupleTableSlot ** +execute_foreign_modify_using_copy(PgFdwModifyState *fmstate, + TupleTableSlot **slots, + int *numSlots) +{ + PGresult *res; + StringInfoData copy_data; + int n_rows; + int i; + int nestlevel; + + Assert(fmstate->use_copy == true); + + elog(DEBUG1, "foreign modify with COPY batch_size: %d", fmstate->batch_size); + + /* Make sure any constants in the slots are printed portably */ + nestlevel = set_transmission_modes(); + + /* Send COPY command */ + if (!PQsendQuery(fmstate->conn, fmstate->query)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + + /* get the COPY result */ + res = pgfdw_get_result(fmstate->conn); + if (PQresultStatus(res) != PGRES_COPY_IN) + pgfdw_report_error(res, fmstate->conn, fmstate->query); + + /* Clean up the COPY command result */ + PQclear(res); + + /* Convert the TupleTableSlot data into a TEXT-formatted line */ + initStringInfo(©_data); + for (i = 0; i < *numSlots; i++) + { + convert_slot_to_copy_text(©_data, fmstate, slots[i]); + + /* + * Send initial COPY data if the buffer reaches the limit to avoid + * large memory usage. + */ + if (copy_data.len >= COPYBUFSIZ) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + resetStringInfo(©_data); + } + } + + /* Send the remaining COPY data */ + if (copy_data.len > 0) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + } + + pfree(copy_data.data); + + /* End the COPY operation */ + if (PQputCopyEnd(fmstate->conn, NULL) < 0 || PQflush(fmstate->conn)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + + /* + * Get the result, and check for success. + */ + res = pgfdw_get_result(fmstate->conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(res, fmstate->conn, fmstate->query); + + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + reset_transmission_modes(nestlevel); + + MemoryContextReset(fmstate->temp_cxt); + + *numSlots = n_rows; + + /* + * Return NULL if nothing was inserted on the remote end + */ + return (n_rows > 0) ? slots : NULL; +} + +/* + * Write target attribute values from fmstate into buf buffer to be sent as + * COPY FROM STDIN data + */ +static void +convert_slot_to_copy_text(StringInfo buf, + PgFdwModifyState *fmstate, + TupleTableSlot *slot) +{ + TupleDesc tupdesc = RelationGetDescr(fmstate->rel); + bool first = true; + int i = 0; + + foreach_int(attnum, fmstate->target_attrs) + { + CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1); + Datum datum; + bool isnull; + + /* Ignore generated columns; they are set to DEFAULT */ + if (attr->attgenerated) + continue; + + if (!first) + appendStringInfoCharMacro(buf, '\t'); + first = false; + + datum = slot_getattr(slot, attnum, &isnull); + + if (isnull) + appendStringInfoString(buf, "\\N"); + else + { + const char *value = OutputFunctionCall(&fmstate->p_flinfo[i], + datum); + + /* Escape the value if needed */ + appendStringInfoText(buf, value); + } + i++; + } + + appendStringInfoCharMacro(buf, '\n'); +} + +/* + * Append a string to buf, escaping special characters for COPY TEXT format. + * Similar to CopyAttributeOutText(). + */ +static void +appendStringInfoText(StringInfo buf, const char *string) +{ + const char *ptr; + const char *start; + char c; + + ptr = string; + start = ptr; + + while ((c = *ptr) != '\0') + { + if ((unsigned char) c < (unsigned char) 0x20) + { + /* + * We use C-like backslash notation for the common control + * characters; other control chars are passed through as-is since + * they won't confuse the COPY protocol. + */ + switch (c) + { + case '\b': + c = 'b'; + break; + case '\f': + c = 'f'; + break; + case '\n': + c = 'n'; + break; + case '\r': + c = 'r'; + break; + case '\t': + c = 't'; + break; + case '\v': + c = 'v'; + break; + default: + /* All ASCII control chars are length 1 */ + ptr++; + continue; + } + /* Dump literal portion before the control character */ + if (ptr > start) + appendBinaryStringInfo(buf, start, ptr - start); + appendStringInfoChar(buf, '\\'); + appendStringInfoChar(buf, c); + start = ++ptr; + } + else if (c == '\\') + { + /* Backslash must be escaped as \\ */ + if (ptr > start) + appendBinaryStringInfo(buf, start, ptr - start); + appendStringInfoChar(buf, '\\'); + start = ptr++; + } + else + ptr++; + } + + /* Dump remaining literal portion */ + if (ptr > start) + appendBinaryStringInfo(buf, start, ptr - start); +} diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index a2bb1ff352c..fc6922ddd4f 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel, char *orig_query, List *target_attrs, int values_end_len, int num_params, int num_rows); +extern void deparseCopySql(StringInfo buf, Relation rel, List *target_attrs); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 49ed797e8ef..9b0f2bc16a1 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -1936,6 +1936,37 @@ select * from gloc1; select * from grem1; delete from grem1; +-- test that fdw also use COPY FROM as a remote sql +set client_min_messages to 'log'; + +create function insert_or_copy() returns trigger as $$ +declare query text; +begin + query := current_query(); + raise notice '%', query; +return new; +end; +$$ language plpgsql; + +CREATE TRIGGER trig_row_before +BEFORE INSERT OR UPDATE OR DELETE ON gloc1 +FOR EACH ROW EXECUTE PROCEDURE insert_or_copy(); + +copy grem1 from stdin; +3 +\. + +drop trigger trig_row_before on gloc1; +reset client_min_messages; + +-- test that copy does not fail with column_name alias +create table gloc2(xxx int); +create foreign table grem2(a int) server loopback options(table_name 'gloc2'); +alter foreign table grem2 alter column a options (column_name 'xxx'); +copy grem2 from stdin; +1 +\. + -- test batch insert alter server loopback options (add batch_size '10'); explain (verbose, costs off) @@ -1962,6 +1993,24 @@ drop table tab_batch_local; drop table tab_batch_sharded; drop table tab_batch_sharded_p1_remote; +-- test batch insert using copy +set client_min_messages to 'debug1'; +copy grem1 from stdin; +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +\. +reset client_min_messages; + alter server loopback options (drop batch_size); -- =================================================================== @@ -3026,6 +3075,29 @@ drop trigger trig_null on loc2; delete from rem2; +-- Test COPY FROM with column list and special characters +copy rem2 (f1, f2) from stdin; +1 hello\nworld +\. +select * from rem2; + +delete from rem2; + +-- Test that float numbers do not loose precision when sending to the foreign +-- server +create table f(a float); +create foreign table f_fdw(a float) server loopback options(table_name 'f'); + +set extra_float_digits = 0; +copy f_fdw from stdin; +1.0000000000000002 +\. + +reset extra_float_digits; +select * from f; + +drop table f; + -- Check with zero-column foreign table; batch insert will be disabled alter table loc2 drop column f1; alter table loc2 drop column f2; -- 2.52.0