From 0175f829cc2944eb596f18d701b64c2d90d8e2bb Mon Sep 17 00:00:00 2001 From: Matheus Alcantara Date: Fri, 10 Oct 2025 16:07:08 -0300 Subject: [PATCH v3] postgres_fdw: Use COPY to speed up batch inserts --- contrib/postgres_fdw/deparse.c | 32 ++++++ contrib/postgres_fdw/postgres_fdw.c | 159 +++++++++++++++++++++++++++- contrib/postgres_fdw/postgres_fdw.h | 1 + 3 files changed, 190 insertions(+), 2 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index f2fb0051843..afd1cc636d7 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -2236,6 +2236,38 @@ rebuildInsertSql(StringInfo buf, Relation rel, appendStringInfoString(buf, orig_query + values_end_len); } +/* + * Build a COPY FROM STDIN statement using the TEXT format + */ +void +buildCopySql(StringInfo buf, Relation rel, List *target_attrs) +{ + ListCell *lc; + TupleDesc tupdesc = RelationGetDescr(rel); + bool first = true; + + appendStringInfo(buf, "COPY "); + deparseRelation(buf, rel); + appendStringInfo(buf, "("); + + foreach(lc, target_attrs) + { + int attnum = lfirst_int(lc); + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + + if (attr->attgenerated) + continue; + + if (!first) + appendStringInfoString(buf, ", "); + + first = false; + + appendStringInfoString(buf, quote_identifier(NameStr(attr->attname))); + } + appendStringInfoString(buf, ") FROM STDIN"); +} + /* * deparse remote UPDATE statement * diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 456b267f70b..0ccbff8e390 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -63,6 +63,9 @@ PG_MODULE_MAGIC_EXT( /* If no remote estimates, assume a sort costs 20% extra */ #define DEFAULT_FDW_SORT_MULTIPLIER 1.2 +/* Buffer size to send COPY IN data*/ +#define COPYBUFSIZ 8192 + /* * Indexes of FDW-private information stored in fdw_private lists. * @@ -192,6 +195,7 @@ typedef struct PgFdwModifyState /* extracted fdw_private data */ char *query; /* text of INSERT/UPDATE/DELETE command */ char *orig_query; /* original text of INSERT command */ + char *copy_query; /* text of COPY command if it's being used */ List *target_attrs; /* list of target attribute numbers */ int values_end; /* length up to the end of VALUES */ int batch_size; /* value of FDW option "batch_size" */ @@ -545,6 +549,9 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i); static int get_batch_size_option(Relation rel); +static TupleTableSlot **execute_foreign_insert_using_copy(PgFdwModifyState *fmstate, + TupleTableSlot **slots, + int *numSlots); /* @@ -2942,8 +2949,23 @@ postgresExplainForeignModify(ModifyTableState *mtstate, { if (es->verbose) { - char *sql = strVal(list_nth(fdw_private, - FdwModifyPrivateUpdateSql)); + char *sql = NULL; + + /* + * We only have ri_FdwState during EXPLAIN(ANALYZE), so check if the + * COPY was used during query execution and show it as a Remote SQL. + */ + if (rinfo->ri_FdwState != NULL) + { + PgFdwModifyState *fmstate = (PgFdwModifyState *) rinfo->ri_FdwState; + + if (fmstate->copy_query != NULL) + sql = fmstate->copy_query; + } + + if (sql == NULL) + sql = strVal(list_nth(fdw_private, + FdwModifyPrivateUpdateSql)); ExplainPropertyText("Remote SQL", sql, es); @@ -4066,6 +4088,50 @@ create_foreign_modify(EState *estate, return fmstate; } +/* + * Write target attribute values from fmstate into buf buffer to be sent as + * COPY FROM STDIN data + */ +static void +convert_slot_to_copy_text(StringInfo buf, + PgFdwModifyState *fmstate, + TupleTableSlot *slot) +{ + ListCell *lc; + TupleDesc tupdesc = RelationGetDescr(fmstate->rel); + bool first = true; + + foreach(lc, fmstate->target_attrs) + { + int attnum = lfirst_int(lc); + CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1); + Datum datum; + bool isnull; + + /* Ignore generated columns; they are set to DEFAULT */ + if (attr->attgenerated) + continue; + + if (!first) + appendStringInfoCharMacro(buf, '\t'); + first = false; + + datum = slot_getattr(slot, attnum, &isnull); + + if (isnull) + appendStringInfoString(buf, "\\N"); + else + { + const char *value = OutputFunctionCall(&fmstate->p_flinfo[attnum - 1], + datum); + + appendStringInfoString(buf, value); + } + } + + appendStringInfoCharMacro(buf, '\n'); +} + /* * execute_foreign_modify * Perform foreign-table modification as required, and fetch RETURNING @@ -4097,6 +4163,13 @@ execute_foreign_modify(EState *estate, if (fmstate->conn_state->pendingAreq) process_pending_request(fmstate->conn_state->pendingAreq); + /* + * Use COPY command for batch insert if the original query don't include a + * RETURNING clause + */ + if (operation == CMD_INSERT && *numSlots > 1 && !fmstate->has_returning) + return execute_foreign_insert_using_copy(fmstate, slots, numSlots); + /* * If the existing query was deparsed and prepared for a different number * of rows, rebuild it for the proper number. @@ -7886,3 +7959,85 @@ get_batch_size_option(Relation rel) return batch_size; } + +/* Execute a batch insert into a foreign table using the COPY command */ +static TupleTableSlot ** +execute_foreign_insert_using_copy(PgFdwModifyState *fmstate, + TupleTableSlot **slots, + int *numSlots) +{ + PGresult *res; + StringInfoData sql; + StringInfoData copy_data; + int n_rows; + int i; + + if (fmstate->copy_query == NULL) + { + /* Build COPY command */ + initStringInfo(&sql); + buildCopySql(&sql, fmstate->rel, fmstate->target_attrs); + + /* Cache for reuse. */ + fmstate->copy_query = sql.data; + } + + /* Send COPY command */ + if (!PQsendQuery(fmstate->conn, fmstate->copy_query)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query); + + /* get the COPY result */ + res = pgfdw_get_result(fmstate->conn); + if (PQresultStatus(res) != PGRES_COPY_IN) + pgfdw_report_error(res, fmstate->conn, fmstate->copy_query); + + /* Convert the TupleTableSlot data into a TEXT-formatted line */ + initStringInfo(©_data); + for (i = 0; i < *numSlots; i++) + { + convert_slot_to_copy_text(©_data, fmstate, slots[i]); + + /* + * Send initial COPY data if the buffer reach the limit to avoid large + * memory usage. + */ + if (copy_data.len >= COPYBUFSIZ) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query); + resetStringInfo(©_data); + } + } + + /* Send the remaining COPY data */ + if (copy_data.len > 0) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query); + } + + /* End the COPY operation */ + if (PQputCopyEnd(fmstate->conn, NULL) < 0 || PQflush(fmstate->conn)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query); + + /* + * Get the result, and check for success. + */ + res = pgfdw_get_result(fmstate->conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(res, fmstate->conn, fmstate->copy_query); + + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + MemoryContextReset(fmstate->temp_cxt); + + *numSlots = n_rows; + + /* + * Return NULL if nothing was inserted on the remote end + */ + return (n_rows > 0) ? slots : NULL; +} diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index e69735298d7..c0198b865f3 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel, char *orig_query, List *target_attrs, int values_end_len, int num_params, int num_rows); +extern void buildCopySql(StringInfo buf, Relation rel, List *target_attrs); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, -- 2.51.0