From 17bbb0b129adc96f595e2eb4641b4b601f9fbf6b Mon Sep 17 00:00:00 2001 From: Matheus Alcantara Date: Wed, 26 Nov 2025 16:34:46 -0300 Subject: [PATCH v7] postgres_fdw: speed up batch inserts using COPY Previously, when the user executed a COPY into a foreign table, the statement was translated into an INSERT statement to be executed on the foreign server. This commit introduces a new foreign table/server option "use_copy_for_batch_insert" to enable the usage of the COPY command instead of an INSERT. Another option, "copy_for_batch_insert_threshold", is also added to switch to the COPY command when the number of rows being inserted is >= the configured value. This logic is implemented in postgresBeginForeignInsert(), which is the implementation of the BeginForeignInsert() FDW routine. As this function is also called when inserting tuples into partitions, COPY can also be used to speed up batch inserts for table partitions that are postgres_fdw tables. --- contrib/postgres_fdw/deparse.c | 30 +++ .../postgres_fdw/expected/postgres_fdw.out | 13 + contrib/postgres_fdw/option.c | 13 +- contrib/postgres_fdw/postgres_fdw.c | 237 +++++++++++++++++- contrib/postgres_fdw/postgres_fdw.h | 1 + contrib/postgres_fdw/sql/postgres_fdw.sql | 13 + 6 files changed, 304 insertions(+), 3 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index f2fb0051843..1cdf1d8cc8d 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -2236,6 +2236,36 @@ rebuildInsertSql(StringInfo buf, Relation rel, appendStringInfoString(buf, orig_query + values_end_len); } +/* + * Build a COPY FROM STDIN statement using the TEXT format + */ +void +deparseCopySql(StringInfo buf, Relation rel, List *target_attrs) +{ + TupleDesc tupdesc = RelationGetDescr(rel); + bool first = true; + + appendStringInfo(buf, "COPY "); + deparseRelation(buf, rel); + appendStringInfo(buf, "("); + + foreach_int(attnum, target_attrs) + { + Form_pg_attribute attr = 
TupleDescAttr(tupdesc, attnum - 1); + + if (attr->attgenerated) + continue; + + if (!first) + appendStringInfoString(buf, ", "); + + first = false; + + appendStringInfoString(buf, quote_identifier(NameStr(attr->attname))); + } + appendStringInfoString(buf, ") FROM STDIN"); +} + /* * deparse remote UPDATE statement * diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index cd28126049d..9d06d9b6eb0 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -9524,6 +9524,19 @@ select * from rem2; 2 | bar (2 rows) +delete from rem2; +-- Test COPY with can_use_copy = true +alter foreign table rem2 options (add use_copy_for_batch_insert 'true', copy_for_batch_insert_threshold '2'); +-- Insert 3 rows so that the third row fallback to normal INSERT statement path +copy rem2 from stdin; +select * from rem2; + f1 | f2 +----+----- + 1 | foo + 2 | bar + 3 | baz +(3 rows) + delete from rem2; -- Test check constraints alter table loc2 add constraint loc2_f1positive check (f1 >= 0); diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index 04788b7e8b3..d56b6cc142d 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -157,7 +157,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS) (void) ExtractExtensionList(defGetString(def), true); } else if (strcmp(def->defname, "fetch_size") == 0 || - strcmp(def->defname, "batch_size") == 0) + strcmp(def->defname, "batch_size") == 0 || + strcmp(def->defname, "copy_for_batch_insert_threshold") == 0) { char *value; int int_val; @@ -263,6 +264,16 @@ InitPgFdwOptions(void) /* batch_size is available on both server and table */ {"batch_size", ForeignServerRelationId, false}, {"batch_size", ForeignTableRelationId, false}, + /* use_copy_for_batch_insert is available on both server and table */ + {"use_copy_for_batch_insert", ForeignServerRelationId, false}, + {"use_copy_for_batch_insert", 
ForeignTableRelationId, false}, + + /* + * copy_for_batch_insert_threshold is available on both server and + * table + */ + {"copy_for_batch_insert_threshold", ForeignServerRelationId, false}, + {"copy_for_batch_insert_threshold", ForeignTableRelationId, false}, /* async_capable is available on both server and table */ {"async_capable", ForeignServerRelationId, false}, {"async_capable", ForeignTableRelationId, false}, diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 06b52c65300..02c6312a655 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -63,6 +63,9 @@ PG_MODULE_MAGIC_EXT( /* If no remote estimates, assume a sort costs 20% extra */ #define DEFAULT_FDW_SORT_MULTIPLIER 1.2 +/* Buffer size to send COPY IN data*/ +#define COPYBUFSIZ 8192 + /* * Indexes of FDW-private information stored in fdw_private lists. * @@ -198,6 +201,12 @@ typedef struct PgFdwModifyState bool has_returning; /* is there a RETURNING clause? */ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ + /* COPY usage stuff */ + bool use_copy_for_batch_insert; /* COPY command is enabled to use? */ + int copy_for_batch_insert_threshold; /* # of rows to switch to + * use COPY */ + bool usingcopy; /* is COPY being used ? 
*/ + /* info about parameters for prepared statement */ AttrNumber ctidAttno; /* attnum of input resjunk ctid column */ int p_nums; /* number of parameters to transmit */ @@ -545,6 +554,11 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i); static int get_batch_size_option(Relation rel); +static bool get_use_copy_for_batch_insert(Relation rel); +static int get_copy_for_batch_insert_threshold(Relation rel); +static TupleTableSlot **execute_foreign_insert_using_copy(PgFdwModifyState *fmstate, + TupleTableSlot **slots, + int *numSlots); /* @@ -2265,6 +2279,10 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, retrieved_attrs != NIL, retrieved_attrs); + fmstate->use_copy_for_batch_insert = get_use_copy_for_batch_insert(rel); + if (fmstate->use_copy_for_batch_insert) + fmstate->copy_for_batch_insert_threshold = get_copy_for_batch_insert_threshold(rel); + /* * If the given resultRelInfo already has PgFdwModifyState set, it means * the foreign table is an UPDATE subplan result rel; in which case, store @@ -4066,6 +4084,50 @@ create_foreign_modify(EState *estate, return fmstate; } +/* + * Write target attribute values from fmstate into buf buffer to be sent as + * COPY FROM STDIN data + */ +static void +convert_slot_to_copy_text(StringInfo buf, + PgFdwModifyState *fmstate, + TupleTableSlot *slot) +{ + TupleDesc tupdesc = RelationGetDescr(fmstate->rel); + bool first = true; + int i = 0; + + foreach_int(attnum, fmstate->target_attrs) + { + CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1); + Datum datum; + bool isnull; + + /* Ignore generated columns; they are set to DEFAULT */ + if (attr->attgenerated) + continue; + + if (!first) + appendStringInfoCharMacro(buf, '\t'); + first = false; + + datum = slot_getattr(slot, attnum, &isnull); + + if (isnull) + appendStringInfoString(buf, "\\N"); + else + { + const char *value = OutputFunctionCall(&fmstate->p_flinfo[i], + datum); + + 
appendStringInfoString(buf, value); + } + i++; + } + + appendStringInfoCharMacro(buf, '\n'); +} + /* * execute_foreign_modify * Perform foreign-table modification as required, and fetch RETURNING @@ -4097,11 +4159,34 @@ execute_foreign_modify(EState *estate, if (fmstate->conn_state->pendingAreq) process_pending_request(fmstate->conn_state->pendingAreq); + /* + * Check if the COPY command can be used to speed up inserts. The COPY + * command can not be used if the original query has a RETURNING clause. + */ + if (operation == CMD_INSERT && + fmstate->use_copy_for_batch_insert && + !fmstate->has_returning && + *numSlots >= fmstate->copy_for_batch_insert_threshold) + { + + /* Build the COPY command if it's not already built */ + if (!fmstate->usingcopy) + { + pfree(fmstate->query); + initStringInfo(&sql); + deparseCopySql(&sql, fmstate->rel, fmstate->target_attrs); + fmstate->query = sql.data; + fmstate->usingcopy = true; + } + return execute_foreign_insert_using_copy(fmstate, slots, numSlots); + } + /* * If the existing query was deparsed and prepared for a different number - * of rows, rebuild it for the proper number. + * of rows or if COPY was being used on the previous execution, rebuild + * the INSERT statement and use the proper number. */ - if (operation == CMD_INSERT && fmstate->num_slots != *numSlots) + if ((operation == CMD_INSERT && fmstate->num_slots != *numSlots) || fmstate->usingcopy) { /* Destroy the prepared statement created previously */ if (fmstate->p_name) @@ -7886,3 +7971,151 @@ get_batch_size_option(Relation rel) return batch_size; } + +/* + * Determine if the usage of the COPY command to execute a INSERT into a foreign + * table is enabled. The option specified for a table has precedence. 
+ */ +static bool +get_use_copy_for_batch_insert(Relation rel) +{ + Oid foreigntableid = RelationGetRelid(rel); + List *options = NIL; + ListCell *lc; + ForeignTable *table; + ForeignServer *server; + bool can_use_copy = false; + + /* + * Load options for table and server. We append server options after table + * options, because table options take precedence. + */ + table = GetForeignTable(foreigntableid); + server = GetForeignServer(table->serverid); + + options = list_concat(options, table->options); + options = list_concat(options, server->options); + + /* See if either table or server specifies use_copy_for_batch_insert. */ + foreach(lc, options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "use_copy_for_batch_insert") == 0) + { + (void) parse_bool(defGetString(def), &can_use_copy); + break; + } + } + return can_use_copy; +} + +static int +get_copy_for_batch_insert_threshold(Relation rel) +{ + Oid foreigntableid = RelationGetRelid(rel); + List *options = NIL; + ListCell *lc; + ForeignTable *table; + ForeignServer *server; + + /* + * We use 1 as default, which means that COPY will be used once + * "use_copy_for_batch_insert" is set to true. + */ + int copy_for_batch_insert_threshold = 1; + + /* + * Load options for table and server. We append server options after table + * options, because table options take precedence. + */ + table = GetForeignTable(foreigntableid); + server = GetForeignServer(table->serverid); + + options = list_concat(options, table->options); + options = list_concat(options, server->options); + + /* See if either table or server specifies copy_for_batch_insert_threshold. 
*/ + foreach(lc, options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "copy_for_batch_insert_threshold") == 0) + { + (void) parse_int(defGetString(def), &copy_for_batch_insert_threshold, 0, NULL); + break; + } + } + return copy_for_batch_insert_threshold; +} + +/* Execute an insert into a foreign table using the COPY command */ +static TupleTableSlot ** +execute_foreign_insert_using_copy(PgFdwModifyState *fmstate, + TupleTableSlot **slots, + int *numSlots) +{ + PGresult *res; + StringInfoData copy_data; + int n_rows; + int i; + + /* Send COPY command */ + if (!PQsendQuery(fmstate->conn, fmstate->query)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + + /* get the COPY result */ + res = pgfdw_get_result(fmstate->conn); + if (PQresultStatus(res) != PGRES_COPY_IN) + pgfdw_report_error(res, fmstate->conn, fmstate->query); + + /* Convert the TupleTableSlot data into a TEXT-formatted line */ + initStringInfo(&copy_data); + for (i = 0; i < *numSlots; i++) + { + convert_slot_to_copy_text(&copy_data, fmstate, slots[i]); + + /* + * Send initial COPY data if the buffer reaches the limit to avoid + * large memory usage. + */ + if (copy_data.len >= COPYBUFSIZ) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + resetStringInfo(&copy_data); + } + } + + /* Send the remaining COPY data */ + if (copy_data.len > 0) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + } + + /* End the COPY operation */ + if (PQputCopyEnd(fmstate->conn, NULL) < 0 || PQflush(fmstate->conn)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->query); + + /* + * Get the result, and check for success. 
+ */ + res = pgfdw_get_result(fmstate->conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(res, fmstate->conn, fmstate->query); + + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + MemoryContextReset(fmstate->temp_cxt); + + *numSlots = n_rows; + + /* + * Return NULL if nothing was inserted on the remote end + */ + return (n_rows > 0) ? slots : NULL; +} diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index e69735298d7..aa54d6bba53 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel, char *orig_query, List *target_attrs, int values_end_len, int num_params, int num_rows); +extern void deparseCopySql(StringInfo buf, Relation rel, List *target_attrs); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 9a8f9e28135..fac00c55553 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -2807,6 +2807,19 @@ select * from rem2; delete from rem2; +-- Test COPY with can_use_copy = true +alter foreign table rem2 options (add use_copy_for_batch_insert 'true', copy_for_batch_insert_threshold '2'); + +-- Insert 3 rows so that the third row fallback to normal INSERT statement path +copy rem2 from stdin; +1 foo +2 bar +3 baz +\. +select * from rem2; + +delete from rem2; + -- Test check constraints alter table loc2 add constraint loc2_f1positive check (f1 >= 0); alter foreign table rem2 add constraint rem2_f1positive check (f1 >= 0); -- 2.51.2