From 54f1194ef096b74ecf2405cc6afff95902885189 Mon Sep 17 00:00:00 2001 From: Matheus Alcantara Date: Wed, 28 Jan 2026 19:55:48 -0300 Subject: [PATCH v10] postgres_fdw: Use COPY as remote SQL when possible Previously when an user execute a COPY on a foreign table, postgres_fdw send a INSERT as a remote SQL to the foreign server. This commit introduce the ability to use the COPY command instead. The COPY command will only be used when an user execute a COPY on a foreign table and also the foreign table should not have triggers because remote triggers might modify the inserted row and since COPY does not support a RETURNING clause, we cannot synchronize the local TupleTableSlot with those changes for use in local AFTER triggers, so if the foreign table has any trigger INSERT will be used. Discussion: https://www.postgresql.org/message-id/flat/DDIZJ217OUDK.2R5WE4OGL5PTY%40gmail.com --- contrib/postgres_fdw/deparse.c | 36 ++++ .../postgres_fdw/expected/postgres_fdw.out | 6 +- contrib/postgres_fdw/postgres_fdw.c | 169 +++++++++++++++++- contrib/postgres_fdw/postgres_fdw.h | 1 + 4 files changed, 206 insertions(+), 6 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index ebe2c3a596a..04829b6ee45 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -2177,6 +2177,42 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, withCheckOptionList, returningList, retrieved_attrs); } +/* + * Build a COPY FROM STDIN statement using the TEXT format + */ +void +deparseCopySql(StringInfo buf, Relation rel, List *target_attrs) +{ + TupleDesc tupdesc = RelationGetDescr(rel); + bool first = true; + int nattrs = list_length(target_attrs); + + appendStringInfo(buf, "COPY "); + deparseRelation(buf, rel); + if (nattrs > 0) + appendStringInfo(buf, "("); + + foreach_int(attnum, target_attrs) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + + if (attr->attgenerated) + continue; + + if (!first) + appendStringInfoString(buf, ", "); + + first = false; + + appendStringInfoString(buf, quote_identifier(NameStr(attr->attname))); + } + if (nattrs > 0) + appendStringInfoString(buf, ") FROM STDIN"); + else + appendStringInfoString(buf, " FROM STDIN"); +} + + /* * rebuild remote INSERT statement * diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 6066510c7c0..2725f067223 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -9533,7 +9533,8 @@ copy rem2 from stdin; copy rem2 from stdin; -- ERROR ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive" DETAIL: Failing row contains (-1, xyzzy). -CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2) +CONTEXT: COPY loc2, line 1: "-1 xyzzy" +remote SQL command: COPY public.loc2(f1, f2) FROM STDIN COPY rem2, line 1: "-1 xyzzy" select * from rem2; f1 | f2 @@ -9690,7 +9691,8 @@ copy rem2 from stdin; copy rem2 from stdin; -- ERROR ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive" DETAIL: Failing row contains (-1, xyzzy). -CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2) +CONTEXT: COPY loc2, line 1: "-1 xyzzy" +remote SQL command: COPY public.loc2(f1, f2) FROM STDIN COPY rem2 select * from rem2; f1 | f2 diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 3572689e33b..9630bae3146 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -63,6 +63,9 @@ PG_MODULE_MAGIC_EXT( /* If no remote estimates, assume a sort costs 20% extra */ #define DEFAULT_FDW_SORT_MULTIPLIER 1.2 +/* Buffer size to send COPY IN data*/ +#define COPYBUFSIZ 8192 + /* * Indexes of FDW-private information stored in fdw_private lists. * @@ -198,6 +201,8 @@ typedef struct PgFdwModifyState bool has_returning; /* is there a RETURNING clause? */ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ + bool use_copy; + /* info about parameters for prepared statement */ AttrNumber ctidAttno; /* attnum of input resjunk ctid column */ int p_nums; /* number of parameters to transmit */ @@ -545,6 +550,12 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i); static int get_batch_size_option(Relation rel); +static TupleTableSlot **execute_foreign_modify_using_copy(PgFdwModifyState *fmstate, + TupleTableSlot **slots, + int *numSlots); +static void convert_slot_to_copy_text(StringInfo buf, + PgFdwModifyState *fmstate, + TupleTableSlot *slot); /* @@ -2170,6 +2181,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, List *targetAttrs = NIL; List *retrieved_attrs = NIL; bool doNothing = false; + bool useCopy = false; /* * If the foreign table we are about to insert routed rows into is also an @@ -2247,11 +2259,31 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, rte = exec_rt_fetch(resultRelation, estate); } + /* + * We can use COPY for remote inserts only if the following conditions are + * met: 1. The insert is not part of a partitioned table's tuple routing + * (ri_RootResultRelInfo == NULL). 2. There are no local triggers on the + * foreign table (ri_TrigDesc == NULL). Remote triggers might modify the + * inserted row; since COPY does not support a RETURNING clause, we cannot + * synchronize the local TupleTableSlot with those changes for use in + * local AFTER triggers. 3. There is no explicit RETURNING clause + * requested by the query. + */ + useCopy = resultRelInfo->ri_RootResultRelInfo == NULL + && resultRelInfo->ri_TrigDesc == NULL + && resultRelInfo->ri_returningList == NIL; + /* Construct the SQL command string. */ - deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, - resultRelInfo->ri_WithCheckOptions, - resultRelInfo->ri_returningList, - &retrieved_attrs, &values_end_len); + if (useCopy) + { + deparseCopySql(&sql, rel, targetAttrs); + values_end_len = 0; /* Keep compiler quiet */ + } + else + deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, + resultRelInfo->ri_WithCheckOptions, + resultRelInfo->ri_returningList, + &retrieved_attrs, &values_end_len); /* Construct an execution state. */ fmstate = create_foreign_modify(mtstate->ps.state, @@ -2264,6 +2296,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, values_end_len, retrieved_attrs != NIL, retrieved_attrs); + fmstate->use_copy = useCopy; /* * If the given resultRelInfo already has PgFdwModifyState set, it means @@ -4093,6 +4126,9 @@ execute_foreign_modify(EState *estate, operation == CMD_UPDATE || operation == CMD_DELETE); + if (fmstate->use_copy) + return execute_foreign_modify_using_copy(fmstate, slots, numSlots); + /* First, process a pending asynchronous request, if any. */ if (fmstate->conn_state->pendingAreq) process_pending_request(fmstate->conn_state->pendingAreq); @@ -7886,3 +7922,128 @@ get_batch_size_option(Relation rel) return batch_size; } + +/* + * execute_foreign_modify_using_copy + * Perform foreign-table modification using the COPY command. + */ +static TupleTableSlot ** +execute_foreign_modify_using_copy(PgFdwModifyState *fmstate, + TupleTableSlot **slots, + int *numSlots) +{ + PGresult *res; + StringInfoData copy_data; + int n_rows; + int i; + + Assert(fmstate->use_copy == true); + + /* Send COPY command */ + if (!PQsendQuery(fmstate->conn, fmstate->query)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + + /* get the COPY result */ + res = pgfdw_get_result(fmstate->conn); + if (PQresultStatus(res) != PGRES_COPY_IN) + pgfdw_report_error(res, fmstate->conn, fmstate->query); + + /* Clean up the COPY command result */ + PQclear(res); + + /* Convert the TupleTableSlot data into a TEXT-formatted line */ + initStringInfo(©_data); + for (i = 0; i < *numSlots; i++) + { + convert_slot_to_copy_text(©_data, fmstate, slots[i]); + + /* + * Send initial COPY data if the buffer reaches the limit to avoid + * large memory usage. + */ + if (copy_data.len >= COPYBUFSIZ) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + resetStringInfo(©_data); + } + } + + /* Send the remaining COPY data */ + if (copy_data.len > 0) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + } + + pfree(copy_data.data); + + /* End the COPY operation */ + if (PQputCopyEnd(fmstate->conn, NULL) < 0 || PQflush(fmstate->conn)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + + /* + * Get the result, and check for success. + */ + res = pgfdw_get_result(fmstate->conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(res, fmstate->conn, fmstate->query); + + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + MemoryContextReset(fmstate->temp_cxt); + + *numSlots = n_rows; + + /* + * Return NULL if nothing was inserted on the remote end + */ + return (n_rows > 0) ? slots : NULL; +} + +/* + * Write target attribute values from fmstate into buf buffer to be sent as + * COPY FROM STDIN data + */ +static void +convert_slot_to_copy_text(StringInfo buf, + PgFdwModifyState *fmstate, + TupleTableSlot *slot) +{ + TupleDesc tupdesc = RelationGetDescr(fmstate->rel); + bool first = true; + int i = 0; + + foreach_int(attnum, fmstate->target_attrs) + { + CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1); + Datum datum; + bool isnull; + + /* Ignore generated columns; they are set to DEFAULT */ + if (attr->attgenerated) + continue; + + if (!first) + appendStringInfoCharMacro(buf, '\t'); + first = false; + + datum = slot_getattr(slot, attnum, &isnull); + + if (isnull) + appendStringInfoString(buf, "\\N"); + else + { + const char *value = OutputFunctionCall(&fmstate->p_flinfo[i], + datum); + + appendStringInfoString(buf, value); + } + i++; + } + + appendStringInfoCharMacro(buf, '\n'); +} diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index a2bb1ff352c..fc6922ddd4f 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel, char *orig_query, List *target_attrs, int values_end_len, int num_params, int num_rows); +extern void deparseCopySql(StringInfo buf, Relation rel, List *target_attrs); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, -- 2.51.2