From d4041814ba377475a1fa36b6972d5cf5f989a38f Mon Sep 17 00:00:00 2001
From: Matheus Alcantara
Date: Fri, 10 Oct 2025 16:07:08 -0300
Subject: [PATCH v1] postgres_fdw: Use COPY to speed up batch inserts

---
 contrib/postgres_fdw/deparse.c      |  38 ++++++
 contrib/postgres_fdw/postgres_fdw.c | 183 ++++++++++++++++++++++++++++
 contrib/postgres_fdw/postgres_fdw.h |   1 +
 3 files changed, 222 insertions(+)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index e5b5e1a5f51..cd80bb7306a 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -2238,6 +2238,44 @@ rebuildInsertSql(StringInfo buf, Relation rel,
 	appendStringInfoString(buf, orig_query + values_end_len);
 }
 
+/*
+ * Build a COPY FROM STDIN statement using the TEXT format
+ *
+ * The column list names the non-generated columns in target_attrs, in list
+ * order; generated columns are skipped.
+ * The caller must make sure at least one column remains, because an empty
+ * column list is not valid COPY syntax.  Rows sent afterwards must use ','
+ * as the field delimiter.
+ */
+void
+buildCopySql(StringInfo buf, Relation rel, List *target_attrs)
+{
+	ListCell   *lc;
+	TupleDesc	tupdesc = RelationGetDescr(rel);
+	bool		first = true;
+
+	appendStringInfoString(buf, "COPY ");
+	deparseRelation(buf, rel);
+	appendStringInfoChar(buf, '(');
+
+	foreach(lc, target_attrs)
+	{
+		int			attnum = lfirst_int(lc);
+		Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+
+		if (attr->attgenerated)
+			continue;
+
+		if (!first)
+			appendStringInfoString(buf, ", ");
+
+		first = false;
+
+		appendStringInfoString(buf, quote_identifier(NameStr(attr->attname)));
+	}
+	appendStringInfoString(buf, ") FROM STDIN (FORMAT TEXT, DELIMITER ',')");
+}
+
 /*
  * deparse remote UPDATE statement
  *
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 456b267f70b..985c9bc5be7 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -4066,6 +4066,106 @@ create_foreign_modify(EState *estate,
 	return fmstate;
 }
 
+/*
+ * Append one column value to buf, escaped for COPY's TEXT format.
+ *
+ * Backslash, the ',' field delimiter and the control characters that COPY
+ * treats specially are backslash-escaped, so that a value can neither be
+ * split into several fields or rows nor be mistaken for the \N null marker.
+ */
+static void
+append_copy_text_value(StringInfo buf, const char *value)
+{
+	const char *ptr;
+
+	for (ptr = value; *ptr != '\0'; ptr++)
+	{
+		switch (*ptr)
+		{
+			case '\b':
+				appendStringInfoString(buf, "\\b");
+				break;
+			case '\f':
+				appendStringInfoString(buf, "\\f");
+				break;
+			case '\n':
+				appendStringInfoString(buf, "\\n");
+				break;
+			case '\r':
+				appendStringInfoString(buf, "\\r");
+				break;
+			case '\t':
+				appendStringInfoString(buf, "\\t");
+				break;
+			case '\v':
+				appendStringInfoString(buf, "\\v");
+				break;
+			case '\\':
+			case ',':
+				/* A backslash makes the following character literal */
+				appendStringInfoChar(buf, '\\');
+				appendStringInfoChar(buf, *ptr);
+				break;
+			default:
+				appendStringInfoChar(buf, *ptr);
+				break;
+		}
+	}
+}
+
+/*
+ * Write target attribute values from slot into buf as one line of COPY FROM
+ * STDIN data in TEXT format
+ *
+ * Fields are separated by ',' to match the statement built by buildCopySql.
+ * The caller is expected to have set the transmission modes, as is done for
+ * prepared-statement parameters.
+ */
+static void
+convert_slot_to_copy_text(StringInfo buf,
+						  PgFdwModifyState *fmstate,
+						  TupleTableSlot *slot)
+{
+	ListCell   *lc;
+	TupleDesc	tupdesc = RelationGetDescr(fmstate->rel);
+	int			pindex = 0;
+
+	foreach(lc, fmstate->target_attrs)
+	{
+		int			attnum = lfirst_int(lc);
+		CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1);
+		Datum		datum;
+		bool		isnull;
+
+		/* Ignore generated columns; they are set to DEFAULT */
+		if (attr->attgenerated)
+			continue;
+
+		if (pindex > 0)
+			appendStringInfoChar(buf, ',');
+
+		datum = slot_getattr(slot, attnum, &isnull);
+
+		if (isnull)
+			appendStringInfoString(buf, "\\N");
+		else
+		{
+			/*
+			 * p_flinfo has one entry per transmitted column, in
+			 * target_attrs order, so it is indexed by the running
+			 * parameter position rather than by attribute number.
+			 */
+			const char *value = OutputFunctionCall(&fmstate->p_flinfo[pindex],
+												   datum);
+
+			append_copy_text_value(buf, value);
+		}
+		pindex++;
+	}
+
+	appendStringInfoChar(buf, '\n');
+}
+
 /*
  * execute_foreign_modify
  *		Perform foreign-table modification as required, and fetch RETURNING
@@ -4097,6 +4197,89 @@ execute_foreign_modify(EState *estate,
 	if (fmstate->conn_state->pendingAreq)
 		process_pending_request(fmstate->conn_state->pendingAreq);
 
+	/*
+	 * Use the COPY command for a batch insert if the original query doesn't
+	 * include a RETURNING clause.  COPY needs a non-empty column list, so
+	 * fall through to the regular INSERT path when no column is transmitted.
+	 */
+	if (operation == CMD_INSERT && *numSlots > 1 &&
+		!fmstate->has_returning && fmstate->p_nums > 0)
+	{
+		int			i;
+		int			nestlevel;
+		StringInfoData copy_data;
+		MemoryContext oldcontext;
+
+		/*
+		 * Build the command and the data in the short-lived context, so
+		 * that both are released by the reset below.  Doing all the
+		 * conversion before sending anything also means that an error
+		 * thrown by an output function cannot leave the connection in the
+		 * middle of a COPY.
+		 */
+		oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
+
+		/* Build COPY command */
+		initStringInfo(&sql);
+		buildCopySql(&sql, fmstate->rel, fmstate->target_attrs);
+
+		/*
+		 * Convert the TupleTableSlot data into TEXT-formatted lines, using
+		 * the same transmission modes as for prepared-statement parameters
+		 * so that the remote server parses the values unambiguously.
+		 *
+		 * XXX(matheus): Should we have a COPYBUFSIZ limit to send large
+		 * data in batches instead of growing the buffer too much?
+		 */
+		initStringInfo(&copy_data);
+		nestlevel = set_transmission_modes();
+		for (i = 0; i < *numSlots; i++)
+			convert_slot_to_copy_text(&copy_data, fmstate, slots[i]);
+		reset_transmission_modes(nestlevel);
+
+		MemoryContextSwitchTo(oldcontext);
+
+		/* Send COPY command */
+		if (!PQsendQuery(fmstate->conn, sql.data))
+			pgfdw_report_error(NULL, fmstate->conn, sql.data);
+
+		/* Get the COPY result; the server must be ready to receive data */
+		res = pgfdw_get_result(fmstate->conn);
+		if (PQresultStatus(res) != PGRES_COPY_IN)
+			pgfdw_report_error(res, fmstate->conn, sql.data);
+		PQclear(res);
+
+		/* Send COPY data */
+		if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0)
+			pgfdw_report_error(NULL, fmstate->conn, sql.data);
+
+		/* End the COPY operation */
+		if (PQputCopyEnd(fmstate->conn, NULL) <= 0 || PQflush(fmstate->conn))
+			pgfdw_report_error(NULL, fmstate->conn, sql.data);
+
+		/*
+		 * Get the result, and check for success.
+		 */
+		res = pgfdw_get_result(fmstate->conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pgfdw_report_error(res, fmstate->conn, sql.data);
+
+		n_rows = atoi(PQcmdTuples(res));
+
+		/* And clean up */
+		PQclear(res);
+
+		MemoryContextReset(fmstate->temp_cxt);
+
+		*numSlots = n_rows;
+
+		/*
+		 * Return NULL if nothing was inserted/updated/deleted on the remote
+		 * end
+		 */
+		return (n_rows > 0) ? slots : NULL;
+	}
+
 	/*
 	 * If the existing query was deparsed and prepared for a different number
 	 * of rows, rebuild it for the proper number.
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index e69735298d7..c0198b865f3 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel,
 							 char *orig_query, List *target_attrs,
 							 int values_end_len, int num_params,
 							 int num_rows);
+extern void buildCopySql(StringInfo buf, Relation rel, List *target_attrs);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs,
-- 
2.51.0