From 1856fba0c49d5aa7b164debb90b376c72cfa3e02 Mon Sep 17 00:00:00 2001 From: Matheus Alcantara Date: Fri, 10 Oct 2025 16:07:08 -0300 Subject: [PATCH v4] postgres_fdw: Use COPY to speed up batch inserts --- contrib/postgres_fdw/deparse.c | 30 ++++ .../postgres_fdw/expected/postgres_fdw.out | 120 +++++++++++-- contrib/postgres_fdw/postgres_fdw.c | 159 +++++++++++++++++- contrib/postgres_fdw/postgres_fdw.h | 1 + contrib/postgres_fdw/sql/postgres_fdw.sql | 52 ++++++ 5 files changed, 350 insertions(+), 12 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index f2fb0051843..113e6fb7d91 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -2236,6 +2236,36 @@ rebuildInsertSql(StringInfo buf, Relation rel, appendStringInfoString(buf, orig_query + values_end_len); } +/* + * Build a COPY FROM STDIN statement using the TEXT format + */ +void +buildCopySql(StringInfo buf, Relation rel, List *target_attrs) +{ + TupleDesc tupdesc = RelationGetDescr(rel); + bool first = true; + + appendStringInfo(buf, "COPY "); + deparseRelation(buf, rel); + appendStringInfo(buf, "("); + + foreach_int(attnum, target_attrs) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + + if (attr->attgenerated) + continue; + + if (!first) + appendStringInfoString(buf, ", "); + + first = false; + + appendStringInfoString(buf, quote_identifier(NameStr(attr->attname))); + } + appendStringInfoString(buf, ") FROM STDIN"); +} + /* * deparse remote UPDATE statement * diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index cd28126049d..dd507ad6186 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -50,6 +50,18 @@ CREATE TABLE "S 1"."T 4" ( c3 text, CONSTRAINT t4_pkey PRIMARY KEY (c1) ); +CREATE TABLE "S 1"."T 5"( + x int +); +CREATE TABLE "S 1"."T 6"( + id int not null, + note text, + value int NOT NULL +); +CREATE TABLE "S 1"."T 7"( + id int, + t text +); -- Disable autovacuum for these tables to avoid unexpected effects of that ALTER TABLE "S 1"."T 1" SET (autovacuum_enabled = 'false'); ALTER TABLE "S 1"."T 2" SET (autovacuum_enabled = 'false'); @@ -132,6 +144,21 @@ CREATE FOREIGN TABLE ft7 ( c2 int NOT NULL, c3 text ) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4'); +CREATE FOREIGN TABLE ft8 ( + x int +) +SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 5', batch_size '10'); +CREATE FOREIGN TABLE ft9 ( + id int not null, + note text, + value int NOT NULL +) +SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 6', batch_size '10'); +CREATE FOREIGN TABLE ft10 ( + id int, + t text +) +SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 7', batch_size '10'); -- =================================================================== -- tests for validator -- =================================================================== @@ -205,16 +232,19 @@ ALTER FOREIGN TABLE ft2 OPTIONS (schema_name 'S 1', table_name 'T 1'); ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); \det+ - List of foreign tables - Schema | Table | Server | FDW options | Description ---------+-------+-----------+---------------------------------------+------------- - public | ft1 | loopback | (schema_name 'S 1', table_name 'T 1') | - public | ft2 | loopback | (schema_name 'S 1', table_name 'T 1') | - public | ft4 | loopback | (schema_name 'S 1', table_name 'T 3') | - public | ft5 | loopback | (schema_name 'S 1', table_name 'T 4') | - public | ft6 | loopback2 | (schema_name 'S 1', table_name 'T 4') | - public | ft7 | loopback3 | (schema_name 'S 1', table_name 'T 4') | -(6 rows) + List of foreign tables + Schema | Table | Server | FDW options | Description +--------+-------+-----------+--------------------------------------------------------+------------- + public | ft1 | loopback | (schema_name 'S 1', table_name 'T 1') | + public | ft10 | loopback | (schema_name 'S 1', table_name 'T 7', batch_size '10') | + public | ft2 | loopback | (schema_name 'S 1', table_name 'T 1') | + public | ft4 | loopback | (schema_name 'S 1', table_name 'T 3') | + public | ft5 | loopback | (schema_name 'S 1', table_name 'T 4') | + public | ft6 | loopback2 | (schema_name 'S 1', table_name 'T 4') | + public | ft7 | loopback3 | (schema_name 'S 1', table_name 'T 4') | + public | ft8 | loopback | (schema_name 'S 1', table_name 'T 5', batch_size '10') | + public | ft9 | loopback | (schema_name 'S 1', table_name 'T 6', batch_size '10') | +(9 rows) -- Test that alteration of server options causes reconnection -- Remote's errors might be non-English, so hide them to ensure stable results @@ -12664,6 +12694,76 @@ ANALYZE analyze_ftable; -- cleanup DROP FOREIGN TABLE analyze_ftable; DROP TABLE analyze_table; +-- =================================================================== +-- test for batch insert using COPY +-- =================================================================== +ALTER FOREIGN TABLE ft8 DROP COLUMN x; +ALTER FOREIGN TABLE ft8 add COLUMN x int; +INSERT INTO ft8 SELECT * FROM generate_series(1, 10) i; +SELECT * FROM ft8; + x +---- + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10 +(10 rows) + +EXPLAIN(ANALYZE, VERBOSE, COSTS OFF, SUMMARY OFF, BUFFERS OFF, TIMING OFF) INSERT INTO ft9 (id, value, note) +SELECT g, + g * 2, + 'batch insert test data' || g +FROM generate_series(1, 20) g; + QUERY PLAN +--------------------------------------------------------------------------------- + Insert on public.ft9 (actual rows=0.00 loops=1) + Remote SQL: COPY "S 1"."T 6"(id, note, value) FROM STDIN + Batch Size: 10 + -> Function Scan on pg_catalog.generate_series g (actual rows=20.00 loops=1) + Output: g.g, ('batch insert test data'::text || (g.g)::text), (g.g * 2) + Function Call: generate_series(1, 20) +(6 rows) + +SELECT * FROM ft9; + id | note | value +----+--------------------------+------- + 1 | batch insert test data1 | 2 + 2 | batch insert test data2 | 4 + 3 | batch insert test data3 | 6 + 4 | batch insert test data4 | 8 + 5 | batch insert test data5 | 10 + 6 | batch insert test data6 | 12 + 7 | batch insert test data7 | 14 + 8 | batch insert test data8 | 16 + 9 | batch insert test data9 | 18 + 10 | batch insert test data10 | 20 + 11 | batch insert test data11 | 22 + 12 | batch insert test data12 | 24 + 13 | batch insert test data13 | 26 + 14 | batch insert test data14 | 28 + 15 | batch insert test data15 | 30 + 16 | batch insert test data16 | 32 + 17 | batch insert test data17 | 34 + 18 | batch insert test data18 | 36 + 19 | batch insert test data19 | 38 + 20 | batch insert test data20 | 40 +(20 rows) + +-- Test buffer limit of copy data on COPYBUFSIZ +INSERT INTO ft10 (id, t) +SELECT s, repeat(md5(s::text), 10000) from generate_series(100, 103) s; +SELECT COUNT(*) FROM ft10; + count +------- + 4 +(1 row) + -- =================================================================== -- test for postgres_fdw_get_connections function with check_conn = true -- =================================================================== diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 456b267f70b..d8e13e78938 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -63,6 +63,9 @@ PG_MODULE_MAGIC_EXT( /* If no remote estimates, assume a sort costs 20% extra */ #define DEFAULT_FDW_SORT_MULTIPLIER 1.2 +/* Buffer size to send COPY IN data*/ +#define COPYBUFSIZ 8192 + /* * Indexes of FDW-private information stored in fdw_private lists. * @@ -192,6 +195,7 @@ typedef struct PgFdwModifyState /* extracted fdw_private data */ char *query; /* text of INSERT/UPDATE/DELETE command */ char *orig_query; /* original text of INSERT command */ + char *copy_query; /* text of COPY command if it's being used */ List *target_attrs; /* list of target attribute numbers */ int values_end; /* length up to the end of VALUES */ int batch_size; /* value of FDW option "batch_size" */ @@ -545,6 +549,9 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i); static int get_batch_size_option(Relation rel); +static TupleTableSlot **execute_foreign_insert_using_copy(PgFdwModifyState *fmstate, + TupleTableSlot **slots, + int *numSlots); /* @@ -2942,8 +2949,23 @@ postgresExplainForeignModify(ModifyTableState *mtstate, { if (es->verbose) { - char *sql = strVal(list_nth(fdw_private, - FdwModifyPrivateUpdateSql)); + char *sql = NULL; + + /* + * We only have ri_FdwState during EXPLAIN(ANALYZE), so check if the + * COPY was used during query execution and show it as a Remote SQL. + */ + if (rinfo->ri_FdwState != NULL) + { + PgFdwModifyState *fmstate = (PgFdwModifyState *) rinfo->ri_FdwState; + + if (fmstate->copy_query != NULL) + sql = fmstate->copy_query; + } + + if (sql == NULL) + sql = strVal(list_nth(fdw_private, + FdwModifyPrivateUpdateSql)); ExplainPropertyText("Remote SQL", sql, es); @@ -4066,6 +4088,50 @@ create_foreign_modify(EState *estate, return fmstate; } +/* + * Write target attribute values from fmstate into buf buffer to be sent as + * COPY FROM STDIN data + */ +static void +convert_slot_to_copy_text(StringInfo buf, + PgFdwModifyState *fmstate, + TupleTableSlot *slot) +{ + TupleDesc tupdesc = RelationGetDescr(fmstate->rel); + bool first = true; + int i = 0; + + foreach_int(attnum, fmstate->target_attrs) + { + CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1); + Datum datum; + bool isnull; + + /* Ignore generated columns; they are set to DEFAULT */ + if (attr->attgenerated) + continue; + + if (!first) + appendStringInfoCharMacro(buf, '\t'); + first = false; + + datum = slot_getattr(slot, attnum, &isnull); + + if (isnull) + appendStringInfoString(buf, "\\N"); + else + { + const char *value = OutputFunctionCall(&fmstate->p_flinfo[i], + datum); + + appendStringInfoString(buf, value); + } + i++; + } + + appendStringInfoCharMacro(buf, '\n'); +} + /* * execute_foreign_modify * Perform foreign-table modification as required, and fetch RETURNING @@ -4097,6 +4163,13 @@ execute_foreign_modify(EState *estate, if (fmstate->conn_state->pendingAreq) process_pending_request(fmstate->conn_state->pendingAreq); + /* + * Use COPY command for batch insert if the original query don't include a + * RETURNING clause + */ + if (operation == CMD_INSERT && *numSlots > 1 && !fmstate->has_returning) + return execute_foreign_insert_using_copy(fmstate, slots, numSlots); + /* * If the existing query was deparsed and prepared for a different number * of rows, rebuild it for the proper number. @@ -7886,3 +7959,85 @@ get_batch_size_option(Relation rel) return batch_size; } + +/* Execute a batch insert into a foreign table using the COPY command */ +static TupleTableSlot ** +execute_foreign_insert_using_copy(PgFdwModifyState *fmstate, + TupleTableSlot **slots, + int *numSlots) +{ + PGresult *res; + StringInfoData sql; + StringInfoData copy_data; + int n_rows; + int i; + + if (fmstate->copy_query == NULL) + { + /* Build COPY command */ + initStringInfo(&sql); + buildCopySql(&sql, fmstate->rel, fmstate->target_attrs); + + /* Cache for reuse. */ + fmstate->copy_query = sql.data; + } + + /* Send COPY command */ + if (!PQsendQuery(fmstate->conn, fmstate->copy_query)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query); + + /* get the COPY result */ + res = pgfdw_get_result(fmstate->conn); + if (PQresultStatus(res) != PGRES_COPY_IN) + pgfdw_report_error(res, fmstate->conn, fmstate->copy_query); + + /* Convert the TupleTableSlot data into a TEXT-formatted line */ + initStringInfo(©_data); + for (i = 0; i < *numSlots; i++) + { + convert_slot_to_copy_text(©_data, fmstate, slots[i]); + + /* + * Send initial COPY data if the buffer reach the limit to avoid large + * memory usage. + */ + if (copy_data.len >= COPYBUFSIZ) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query); + resetStringInfo(©_data); + } + } + + /* Send the remaining COPY data */ + if (copy_data.len > 0) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query); + } + + /* End the COPY operation */ + if (PQputCopyEnd(fmstate->conn, NULL) < 0 || PQflush(fmstate->conn)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query); + + /* + * Get the result, and check for success. + */ + res = pgfdw_get_result(fmstate->conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(res, fmstate->conn, fmstate->copy_query); + + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + MemoryContextReset(fmstate->temp_cxt); + + *numSlots = n_rows; + + /* + * Return NULL if nothing was inserted on the remote end + */ + return (n_rows > 0) ? slots : NULL; +} diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index e69735298d7..c0198b865f3 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel, char *orig_query, List *target_attrs, int values_end_len, int num_params, int num_rows); +extern void buildCopySql(StringInfo buf, Relation rel, List *target_attrs); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 9a8f9e28135..79f4f305641 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -54,6 +54,18 @@ CREATE TABLE "S 1"."T 4" ( c3 text, CONSTRAINT t4_pkey PRIMARY KEY (c1) ); +CREATE TABLE "S 1"."T 5"( + x int +); +CREATE TABLE "S 1"."T 6"( + id int not null, + note text, + value int NOT NULL +); +CREATE TABLE "S 1"."T 7"( + id int, + t text +); -- Disable autovacuum for these tables to avoid unexpected effects of that ALTER TABLE "S 1"."T 1" SET (autovacuum_enabled = 'false'); @@ -146,6 +158,24 @@ CREATE FOREIGN TABLE ft7 ( c3 text ) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4'); +CREATE FOREIGN TABLE ft8 ( + x int +) +SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 5', batch_size '10'); + +CREATE FOREIGN TABLE ft9 ( + id int not null, + note text, + value int NOT NULL +) +SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 6', batch_size '10'); + +CREATE FOREIGN TABLE ft10 ( + id int, + t text +) +SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 7', batch_size '10'); + -- =================================================================== -- tests for validator -- =================================================================== @@ -4379,6 +4409,28 @@ ANALYZE analyze_ftable; DROP FOREIGN TABLE analyze_ftable; DROP TABLE analyze_table; +-- =================================================================== +-- test for batch insert using COPY +-- =================================================================== +ALTER FOREIGN TABLE ft8 DROP COLUMN x; +ALTER FOREIGN TABLE ft8 add COLUMN x int; + +INSERT INTO ft8 SELECT * FROM generate_series(1, 10) i; +SELECT * FROM ft8; + +EXPLAIN(ANALYZE, VERBOSE, COSTS OFF, SUMMARY OFF, BUFFERS OFF, TIMING OFF) INSERT INTO ft9 (id, value, note) +SELECT g, + g * 2, + 'batch insert test data' || g +FROM generate_series(1, 20) g; + +SELECT * FROM ft9; + +-- Test buffer limit of copy data on COPYBUFSIZ +INSERT INTO ft10 (id, t) +SELECT s, repeat(md5(s::text), 10000) from generate_series(100, 103) s; +SELECT COUNT(*) FROM ft10; + -- =================================================================== -- test for postgres_fdw_get_connections function with check_conn = true -- =================================================================== -- 2.51.0