From a4ce235442d38ebf751c4711bdd4b685db0c35ef Mon Sep 17 00:00:00 2001 From: Matheus Alcantara Date: Wed, 28 Jan 2026 19:55:48 -0300 Subject: [PATCH v15 2/2] postgres_fdw: Use COPY as remote SQL when possible When a user executes COPY on a foreign table, postgres_fdw previously sent INSERT statements to the foreign server. This commit enables using the COPY protocol instead, which is significantly faster for bulk inserts. The COPY protocol is used when copying data to a foreign table that has no triggers. Triggers are excluded because they might modify the inserted row, and since COPY does not support a RETURNING clause, we cannot synchronize the local TupleTableSlot with those changes for use in local AFTER triggers. This uses the CopyEscapeText() function introduced on 051b56b4f95 to properly escape text values for the COPY protocol. Author: Matheus Alcantara Reviewed-By: Tomas Vondra Reviewed-By: Jakub Wartak Reviewed-By: jian he Reviewed-By: Dewei Dai Reviewed-By: Masahiko Sawada Discussion: https://www.postgresql.org/message-id/flat/DDIZJ217OUDK.2R5WE4OGL5PTY%40gmail.com --- contrib/postgres_fdw/deparse.c | 56 +++++ .../postgres_fdw/expected/postgres_fdw.out | 75 ++++++- contrib/postgres_fdw/postgres_fdw.c | 202 +++++++++++++++++- contrib/postgres_fdw/postgres_fdw.h | 1 + contrib/postgres_fdw/sql/postgres_fdw.sql | 82 +++++++ 5 files changed, 407 insertions(+), 9 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index 2dcc6c8af1b..b34792e7dc1 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -2177,6 +2177,62 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, withCheckOptionList, returningList, retrieved_attrs); } +/* + * Build a COPY FROM STDIN statement using the TEXT format + */ +void +deparseCopySql(StringInfo buf, Relation rel, List *target_attrs) +{ + Oid relid = RelationGetRelid(rel); + TupleDesc tupdesc = RelationGetDescr(rel); + bool first = true; + int nattrs = 
list_length(target_attrs); + + appendStringInfo(buf, "COPY "); + deparseRelation(buf, rel); + if (nattrs > 0) + appendStringInfoChar(buf, '('); + + foreach_int(attnum, target_attrs) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + char *colname; + List *options; + ListCell *lc; + + if (attr->attgenerated) + continue; + + if (!first) + appendStringInfoString(buf, ", "); + + first = false; + + /* Use attribute name or column_name option. */ + colname = NameStr(attr->attname); + options = GetForeignColumnOptions(relid, attnum); + foreach(lc, options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "column_name") == 0) + { + colname = defGetString(def); + break; + } + } + + appendStringInfoString(buf, quote_identifier(colname)); + } + if (nattrs > 0) + appendStringInfoString(buf, ") FROM STDIN"); + else + appendStringInfoString(buf, " FROM STDIN"); + + appendStringInfoString(buf, " (FORMAT TEXT)"); +} + + /* * rebuild remote INSERT statement * diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index e90289e4ab1..ae655545074 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -7654,6 +7654,28 @@ select * from grem1; (2 rows) delete from grem1; +-- test that fdw also use COPY FROM as a remote sql +set client_min_messages to 'log'; +create function insert_or_copy() returns trigger as $$ +declare query text; +begin + query := current_query(); + raise notice '%', query; +return new; +end; +$$ language plpgsql; +CREATE TRIGGER trig_row_before +BEFORE INSERT OR UPDATE OR DELETE ON gloc1 +FOR EACH ROW EXECUTE PROCEDURE insert_or_copy(); +copy grem1 from stdin; +LOG: received message via remote connection: NOTICE: COPY public.gloc1(a) FROM STDIN (FORMAT TEXT) +drop trigger trig_row_before on gloc1; +reset client_min_messages; +-- test that copy does not fail with column_name alias +create table gloc2(xxx int); 
+create foreign table grem2(a int) server loopback options(table_name 'gloc2'); +alter foreign table grem2 alter column a options (column_name 'xxx'); +copy grem2 from stdin; -- test batch insert alter server loopback options (add batch_size '10'); explain (verbose, costs off) @@ -7671,16 +7693,18 @@ insert into grem1 (a) values (1), (2); select * from gloc1; a | b | c ---+---+--- + 3 | 6 | 1 | 2 | 2 | 4 | -(2 rows) +(3 rows) select * from grem1; a | b | c ---+---+--- + 3 | 6 | 9 1 | 2 | 3 2 | 4 | 6 -(2 rows) +(3 rows) delete from grem1; -- batch insert with foreign partitions. @@ -7705,6 +7729,12 @@ select count(*) from tab_batch_sharded; drop table tab_batch_local; drop table tab_batch_sharded; drop table tab_batch_sharded_p1_remote; +-- test batch insert using copy +set client_min_messages to 'debug1'; +copy grem1 from stdin; +DEBUG: foreign modify with COPY batch_size: 10 +DEBUG: foreign modify with COPY batch_size: 10 +reset client_min_messages; alter server loopback options (drop batch_size); -- =================================================================== -- test local triggers @@ -9595,7 +9625,8 @@ copy rem2 from stdin; copy rem2 from stdin; -- ERROR ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive" DETAIL: Failing row contains (-1, xyzzy). -CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2) +CONTEXT: COPY loc2, line 1: "-1 xyzzy" +remote SQL command: COPY public.loc2(f1, f2) FROM STDIN (FORMAT TEXT) COPY rem2, line 1: "-1 xyzzy" select * from rem2; f1 | f2 @@ -9752,7 +9783,8 @@ copy rem2 from stdin; copy rem2 from stdin; -- ERROR ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive" DETAIL: Failing row contains (-1, xyzzy). 
-CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2) +CONTEXT: COPY loc2, line 1: "-1 xyzzy" +remote SQL command: COPY public.loc2(f1, f2) FROM STDIN (FORMAT TEXT) COPY rem2 select * from rem2; f1 | f2 @@ -9791,6 +9823,41 @@ select * from rem2; drop trigger trig_null on loc2; delete from rem2; +-- Test COPY FROM with column list and special characters +copy rem2 (f1, f2) from stdin; +select * from rem2; + f1 | f2 +----+------- + 1 | hello+ + | world +(1 row) + +delete from rem2; +-- Test COPY with NULL and special characters +copy rem2 from stdin; +select * from rem2; + f1 | f2 +----+----- + 1 | + | bar + 3 | a"b +(3 rows) + +delete from rem2; +-- Test that float numbers do not loose precision when sending to the foreign +-- server +create table f(a float); +create foreign table f_fdw(a float) server loopback options(table_name 'f'); +set extra_float_digits = 0; +copy f_fdw from stdin; +reset extra_float_digits; +select * from f; + a +-------------------- + 1.0000000000000002 +(1 row) + +drop table f; -- Check with zero-column foreign table; batch insert will be disabled alter table loc2 drop column f1; alter table loc2 drop column f2; diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 0a589f8db74..7bde40ffe92 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -18,6 +18,7 @@ #include "access/sysattr.h" #include "access/table.h" #include "catalog/pg_opfamily.h" +#include "commands/copy.h" #include "commands/defrem.h" #include "commands/explain_format.h" #include "commands/explain_state.h" @@ -27,6 +28,7 @@ #include "executor/spi.h" #include "foreign/fdwapi.h" #include "funcapi.h" +#include "mb/pg_wchar.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" @@ -67,6 +69,9 @@ PG_MODULE_MAGIC_EXT( /* If no remote estimates, assume a sort costs 20% extra */ #define DEFAULT_FDW_SORT_MULTIPLIER 1.2 +/* Buffer size to send COPY IN 
data */ +#define COPYBUFSIZ 8192 + /* * Indexes of FDW-private information stored in fdw_private lists. * @@ -202,6 +207,8 @@ typedef struct PgFdwModifyState bool has_returning; /* is there a RETURNING clause? */ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ + bool use_copy; /* use COPY protocol for ExecForeignInsert? */ + /* info about parameters for prepared statement */ AttrNumber ctidAttno; /* attnum of input resjunk ctid column */ int p_nums; /* number of parameters to transmit */ @@ -760,6 +767,13 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i); static int get_batch_size_option(Relation rel); +static TupleTableSlot **execute_foreign_modify_using_copy(PgFdwModifyState *fmstate, + TupleTableSlot **slots, + int *numSlots); +static void convert_slot_to_copy_text(StringInfo buf, + PgFdwModifyState *fmstate, + TupleTableSlot *slot); +static void appendStringInfoText(StringInfo buf, const char *string); /* @@ -2381,11 +2395,12 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, RangeTblEntry *rte; TupleDesc tupdesc = RelationGetDescr(rel); int attnum; - int values_end_len; + int values_end_len = 0; StringInfoData sql; List *targetAttrs = NIL; List *retrieved_attrs = NIL; bool doNothing = false; + bool useCopy = false; /* * If the foreign table we are about to insert routed rows into is also an @@ -2463,11 +2478,38 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, rte = exec_rt_fetch(resultRelation, estate); } + /* + * We can use COPY for remote inserts only if all the following conditions + * are met: + * + * Direct Execution: The command is a COPY FROM on the foreign table + * itself, not part of a partitioned table's tuple routing. + * + * No Local AFTER Triggers: There are no AFTER ROW triggers defined + * locally on the foreign table. + * + * Remote triggers might modify the inserted row.
Because the COPY + * protocol does not support a RETURNING clause, we cannot retrieve those + * changes to synchronize the local TupleTableSlot required by local AFTER + * triggers. + */ + if (resultRelInfo->ri_RootResultRelInfo == NULL) + { + /* There is no RETURNING clause on COPY */ + Assert(resultRelInfo->ri_returningList == NIL); + + useCopy = (resultRelInfo->ri_TrigDesc == NULL || + !resultRelInfo->ri_TrigDesc->trig_insert_after_row); + } + /* Construct the SQL command string. */ - deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, - resultRelInfo->ri_WithCheckOptions, - resultRelInfo->ri_returningList, - &retrieved_attrs, &values_end_len); + if (useCopy) + deparseCopySql(&sql, rel, targetAttrs); + else + deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, + resultRelInfo->ri_WithCheckOptions, + resultRelInfo->ri_returningList, + &retrieved_attrs, &values_end_len); /* Construct an execution state. */ fmstate = create_foreign_modify(mtstate->ps.state, @@ -2480,6 +2522,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, values_end_len, retrieved_attrs != NIL, retrieved_attrs); + fmstate->use_copy = useCopy; /* * If the given resultRelInfo already has PgFdwModifyState set, it means @@ -4309,6 +4352,9 @@ execute_foreign_modify(EState *estate, operation == CMD_UPDATE || operation == CMD_DELETE); + if (fmstate->use_copy) + return execute_foreign_modify_using_copy(fmstate, slots, numSlots); + /* First, process a pending asynchronous request, if any. */ if (fmstate->conn_state->pendingAreq) process_pending_request(fmstate->conn_state->pendingAreq); @@ -8835,3 +8881,149 @@ get_batch_size_option(Relation rel) return batch_size; } + +/* + * execute_foreign_modify_using_copy + * Perform foreign-table modification using the COPY command. 
+ */ +static TupleTableSlot ** +execute_foreign_modify_using_copy(PgFdwModifyState *fmstate, + TupleTableSlot **slots, + int *numSlots) +{ + PGresult *res; + StringInfoData copy_data; + int n_rows; + int i; + int nestlevel; + + Assert(fmstate->use_copy == true); + + elog(DEBUG1, "foreign modify with COPY batch_size: %d", fmstate->batch_size); + + /* Make sure any constants in the slots are printed portably */ + nestlevel = set_transmission_modes(); + + /* Send COPY command */ + if (!PQsendQuery(fmstate->conn, fmstate->query)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + + /* get the COPY result */ + res = pgfdw_get_result(fmstate->conn); + if (PQresultStatus(res) != PGRES_COPY_IN) + pgfdw_report_error(res, fmstate->conn, fmstate->query); + + /* Clean up the COPY command result */ + PQclear(res); + + /* Convert the TupleTableSlot data into a TEXT-formatted line */ + initStringInfo(&copy_data); + for (i = 0; i < *numSlots; i++) + { + convert_slot_to_copy_text(&copy_data, fmstate, slots[i]); + + /* + * Send initial COPY data if the buffer reaches the limit to avoid + * large memory usage. + */ + if (copy_data.len >= COPYBUFSIZ) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + resetStringInfo(&copy_data); + } + } + + /* Send the remaining COPY data */ + if (copy_data.len > 0) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + } + + pfree(copy_data.data); + + /* End the COPY operation */ + if (PQputCopyEnd(fmstate->conn, NULL) < 0 || PQflush(fmstate->conn)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + + /* + * Get the result, and check for success.
+ */ + res = pgfdw_get_result(fmstate->conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(res, fmstate->conn, fmstate->query); + + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + reset_transmission_modes(nestlevel); + + MemoryContextReset(fmstate->temp_cxt); + + *numSlots = n_rows; + + /* + * Return NULL if nothing was inserted on the remote end + */ + return (n_rows > 0) ? slots : NULL; +} + +/* + * Write target attribute values from fmstate into buf buffer to be sent as + * COPY FROM STDIN data + */ +static void +convert_slot_to_copy_text(StringInfo buf, + PgFdwModifyState *fmstate, + TupleTableSlot *slot) +{ + TupleDesc tupdesc = RelationGetDescr(fmstate->rel); + bool first = true; + int i = 0; + + foreach_int(attnum, fmstate->target_attrs) + { + CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1); + Datum datum; + bool isnull; + + /* Ignore generated columns; they are set to DEFAULT */ + if (attr->attgenerated) + continue; + + if (!first) + appendStringInfoCharMacro(buf, '\t'); + first = false; + + datum = slot_getattr(slot, attnum, &isnull); + + if (isnull) + appendStringInfoString(buf, "\\N"); + else + { + const char *value = OutputFunctionCall(&fmstate->p_flinfo[i], + datum); + + /* Escape the value if needed */ + appendStringInfoText(buf, value); + } + i++; + } + + appendStringInfoCharMacro(buf, '\n'); +} + +/* Append a string to buf, escaping special characters for COPY TEXT format. 
*/ +static void +appendStringInfoText(StringInfo buf, const char *string) +{ + CopyEscapeText(buf, + string, + '\t', + PG_UTF8, + false, + false); +} diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index a2bb1ff352c..fc6922ddd4f 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel, char *orig_query, List *target_attrs, int values_end_len, int num_params, int num_rows); +extern void deparseCopySql(StringInfo buf, Relation rel, List *target_attrs); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index dfc58beb0d2..db95b148ca8 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -1970,6 +1970,37 @@ select * from gloc1; select * from grem1; delete from grem1; +-- test that fdw also use COPY FROM as a remote sql +set client_min_messages to 'log'; + +create function insert_or_copy() returns trigger as $$ +declare query text; +begin + query := current_query(); + raise notice '%', query; +return new; +end; +$$ language plpgsql; + +CREATE TRIGGER trig_row_before +BEFORE INSERT OR UPDATE OR DELETE ON gloc1 +FOR EACH ROW EXECUTE PROCEDURE insert_or_copy(); + +copy grem1 from stdin; +3 +\. + +drop trigger trig_row_before on gloc1; +reset client_min_messages; + +-- test that copy does not fail with column_name alias +create table gloc2(xxx int); +create foreign table grem2(a int) server loopback options(table_name 'gloc2'); +alter foreign table grem2 alter column a options (column_name 'xxx'); +copy grem2 from stdin; +1 +\. 
+ -- test batch insert alter server loopback options (add batch_size '10'); explain (verbose, costs off) @@ -1996,6 +2027,24 @@ drop table tab_batch_local; drop table tab_batch_sharded; drop table tab_batch_sharded_p1_remote; +-- test batch insert using copy +set client_min_messages to 'debug1'; +copy grem1 from stdin; +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +\. +reset client_min_messages; + alter server loopback options (drop batch_size); -- =================================================================== @@ -3060,6 +3109,39 @@ drop trigger trig_null on loc2; delete from rem2; +-- Test COPY FROM with column list and special characters +copy rem2 (f1, f2) from stdin; +1 hello\nworld +\. +select * from rem2; + +delete from rem2; + +-- Test COPY with NULL and special characters +copy rem2 from stdin; +1 \N +\N bar +3 a"b +\. +select * from rem2; + +delete from rem2; + +-- Test that float numbers do not loose precision when sending to the foreign +-- server +create table f(a float); +create foreign table f_fdw(a float) server loopback options(table_name 'f'); + +set extra_float_digits = 0; +copy f_fdw from stdin; +1.0000000000000002 +\. + +reset extra_float_digits; +select * from f; + +drop table f; + -- Check with zero-column foreign table; batch insert will be disabled alter table loc2 drop column f1; alter table loc2 drop column f2; -- 2.50.1 (Apple Git-155)