From df2cf502909886fbfc86f93f36b2daba03f785e4 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Sun, 28 Jun 2020 14:31:18 +0200 Subject: [PATCH] patch --- contrib/postgres_fdw/deparse.c | 73 ++++++++ contrib/postgres_fdw/postgres_fdw.c | 261 ++++++++++++++++++++++++---- contrib/postgres_fdw/postgres_fdw.h | 5 + 3 files changed, 305 insertions(+), 34 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index ad37a74221..374d2f5dbb 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -1758,6 +1758,79 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, withCheckOptionList, returningList, retrieved_attrs); } +/* + * deparse remote batch INSERT statement + * + * The statement text is appended to buf, and we also create an integer List + * of the columns being retrieved by WITH CHECK OPTION or RETURNING (if any), + * which is returned to *retrieved_attrs. + */ +void +deparseBatchInsertSql(StringInfo buf, RangeTblEntry *rte, + Index rtindex, Relation rel, + List *targetAttrs, bool doNothing, + List *withCheckOptionList, List *returningList, + List **retrieved_attrs, int batchSize) +{ + AttrNumber pindex; + bool first; + ListCell *lc; + int i; + + appendStringInfoString(buf, "INSERT INTO "); + deparseRelation(buf, rel); + + if (targetAttrs) + { + appendStringInfoChar(buf, '('); + + first = true; + foreach(lc, targetAttrs) + { + int attnum = lfirst_int(lc); + + if (!first) + appendStringInfoString(buf, ", "); + first = false; + + deparseColumnRef(buf, rtindex, attnum, rte, false); + } + + appendStringInfoString(buf, ") VALUES "); + + pindex = 1; + for (i = 0; i < batchSize; i++) + { + if (i > 0) + appendStringInfoString(buf, ", "); + + appendStringInfoString(buf, "("); + + first = true; + foreach(lc, targetAttrs) + { + if (!first) + appendStringInfoString(buf, ", "); + first = false; + + appendStringInfo(buf, "$%d", pindex); + pindex++; + } + + appendStringInfoChar(buf, ')'); + } + } + else + appendStringInfoString(buf, " DEFAULT VALUES"); + + if (doNothing) + appendStringInfoString(buf, " ON CONFLICT DO NOTHING"); + + deparseReturningList(buf, rte, rtindex, rel, + rel->trigdesc && rel->trigdesc->trig_insert_after_row, + withCheckOptionList, returningList, retrieved_attrs); +} + /* * deparse remote UPDATE statement * diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 9fc53cad68..17421f6b65 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -56,6 +56,8 @@ PG_MODULE_MAGIC; /* If no remote estimates, assume a sort costs 20% extra */ #define DEFAULT_FDW_SORT_MULTIPLIER 1.2 +#define BATCH_SIZE 100 + /* * Indexes of FDW-private information stored in fdw_private lists. * @@ -93,6 +95,8 @@ enum FdwModifyPrivateIndex { /* SQL statement to execute remotely (as a String node) */ FdwModifyPrivateUpdateSql, + /* SQL statement to execute remotely (as a String node) */ + FdwModifyPrivateUpdateBatchSql, /* Integer list of target attribute numbers for INSERT/UPDATE */ FdwModifyPrivateTargetAttnums, /* has-returning flag (as an integer Value node) */ @@ -172,9 +176,11 @@ typedef struct PgFdwModifyState /* for remote query execution */ PGconn *conn; /* connection for the scan */ char *p_name; /* name of prepared statement, if created */ + char *p_name_batch; /* name of prepared batch statement, if created */ /* extracted fdw_private data */ char *query; /* text of INSERT/UPDATE/DELETE command */ + char *batch_query; /* text of INSERT/UPDATE/DELETE command */ List *target_attrs; /* list of target attribute numbers */ bool has_returning; /* is there a RETURNING clause? */ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ @@ -187,6 +193,12 @@ typedef struct PgFdwModifyState /* working memory context */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ + /* batching of values */ + MemoryContext batch_cxt; + int maxbatched; + int nbatched; + const char **values; + /* for update row movement if subplan result rel */ struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if * created */ @@ -427,6 +439,7 @@ static PgFdwModifyState *create_foreign_modify(EState *estate, CmdType operation, Plan *subplan, char *query, + char *batch_query, List *target_attrs, bool has_returning, List *retrieved_attrs); @@ -435,6 +448,11 @@ static TupleTableSlot *execute_foreign_modify(EState *estate, CmdType operation, TupleTableSlot *slot, TupleTableSlot *planSlot); +static TupleTableSlot *flush_foreign_modify(EState *estate, + ResultRelInfo *resultRelInfo, + CmdType operation, + TupleTableSlot *slot, + TupleTableSlot *planSlot); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, @@ -1659,13 +1677,16 @@ postgresPlanForeignModify(PlannerInfo *root, RangeTblEntry *rte = planner_rt_fetch(resultRelation, root); Relation rel; StringInfoData sql; + StringInfoData batch_sql; List *targetAttrs = NIL; List *withCheckOptionList = NIL; List *returningList = NIL; List *retrieved_attrs = NIL; bool doNothing = false; + List *priv = NIL; initStringInfo(&sql); + initStringInfo(&batch_sql); /* * Core code already has some lock on each rel being planned, so we can @@ -1752,6 +1773,11 @@ postgresPlanForeignModify(PlannerInfo *root, targetAttrs, doNothing, withCheckOptionList, returningList, &retrieved_attrs); + + deparseBatchInsertSql(&batch_sql, rte, resultRelation, rel, + targetAttrs, doNothing, + withCheckOptionList, returningList, + &retrieved_attrs, BATCH_SIZE); break; case CMD_UPDATE: deparseUpdateSql(&sql, rte, resultRelation, rel, @@ -1775,10 +1801,13 @@ postgresPlanForeignModify(PlannerInfo *root, * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwModifyPrivateIndex, above. */ - return list_make4(makeString(sql.data), - targetAttrs, - makeInteger((retrieved_attrs != NIL)), - retrieved_attrs); + priv = lappend(priv, makeString(sql.data)); + priv = lappend(priv, makeString(batch_sql.data)); + priv = lappend(priv, targetAttrs); + priv = lappend(priv, makeInteger((retrieved_attrs != NIL))); + priv = lappend(priv, retrieved_attrs); + + return priv; } /* @@ -1794,6 +1823,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, { PgFdwModifyState *fmstate; char *query; + char *batch_query; List *target_attrs; bool has_returning; List *retrieved_attrs; @@ -1809,6 +1839,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate, /* Deconstruct fdw_private data. */ query = strVal(list_nth(fdw_private, FdwModifyPrivateUpdateSql)); + batch_query = strVal(list_nth(fdw_private, + FdwModifyPrivateUpdateBatchSql)); target_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateTargetAttnums); has_returning = intVal(list_nth(fdw_private, @@ -1827,6 +1859,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, mtstate->operation, mtstate->mt_plans[subplan_index]->plan, query, + batch_query, target_attrs, has_returning, retrieved_attrs); @@ -1925,6 +1958,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, TupleDesc tupdesc = RelationGetDescr(rel); int attnum; StringInfoData sql; + StringInfoData batch_sql; List *targetAttrs = NIL; List *retrieved_attrs = NIL; bool doNothing = false; @@ -1946,6 +1980,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, RelationGetRelationName(rel)))); initStringInfo(&sql); + initStringInfo(&batch_sql); /* We transmit all columns that are defined in the foreign table. */ for (attnum = 1; attnum <= tupdesc->natts; attnum++) @@ -2002,6 +2037,12 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, resultRelInfo->ri_returningList, &retrieved_attrs); + /* Construct the SQL command string. */ + deparseBatchInsertSql(&batch_sql, rte, resultRelation, rel, targetAttrs, doNothing, + resultRelInfo->ri_WithCheckOptions, + resultRelInfo->ri_returningList, + &retrieved_attrs, BATCH_SIZE); + /* Construct an execution state. */ fmstate = create_foreign_modify(mtstate->ps.state, rte, @@ -2009,6 +2050,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, CMD_INSERT, NULL, sql.data, + batch_sql.data, targetAttrs, retrieved_attrs != NIL, retrieved_attrs); @@ -2040,6 +2082,9 @@ postgresEndForeignInsert(EState *estate, Assert(fmstate != NULL); + /* Send over remaining data to insert. */ + flush_foreign_modify(estate, resultRelInfo, CMD_INSERT, NULL, NULL); + /* * If the fmstate has aux_fmstate set, get the aux_fmstate (see * postgresBeginForeignInsert()) @@ -3536,6 +3581,7 @@ create_foreign_modify(EState *estate, CmdType operation, Plan *subplan, char *query, + char *batch_query, List *target_attrs, bool has_returning, List *retrieved_attrs) @@ -3571,15 +3617,24 @@ create_foreign_modify(EState *estate, /* Set up remote query information. */ fmstate->query = query; + fmstate->batch_query = batch_query; fmstate->target_attrs = target_attrs; fmstate->has_returning = has_returning; fmstate->retrieved_attrs = retrieved_attrs; + fmstate->nbatched = 0; + fmstate->maxbatched = BATCH_SIZE * list_length(target_attrs); + fmstate->values = palloc(fmstate->maxbatched * sizeof(char *)); + /* Create context for per-tuple temp workspace. */ fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw temporary data", ALLOCSET_SMALL_SIZES); + fmstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, + "postgres_fdw batch data", + ALLOCSET_DEFAULT_SIZES); + /* Prepare for input conversion of RETURNING results. */ if (fmstate->has_returning) fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); @@ -3646,7 +3701,9 @@ execute_foreign_modify(EState *estate, ItemPointer ctid = NULL; const char **p_values; PGresult *res; - int n_rows; + int n_rows = 0; + MemoryContext oldctx; + int i; /* The operation should be INSERT, UPDATE, or DELETE */ Assert(operation == CMD_INSERT || @@ -3677,48 +3734,163 @@ execute_foreign_modify(EState *estate, /* Convert parameters needed by prepared statement to text form */ p_values = convert_prep_stmt_params(fmstate, ctid, slot); - /* - * Execute the prepared statement. - */ - if (!PQsendQueryPrepared(fmstate->conn, - fmstate->p_name, - fmstate->p_nums, - p_values, - NULL, - NULL, - 0)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + /* copy the parameters to the batch */ + oldctx = MemoryContextSwitchTo(fmstate->batch_cxt); + + for (i = 0; i < fmstate->p_nums; i++) + if (p_values[i] == NULL) + fmstate->values[fmstate->nbatched++] = NULL; + else + fmstate->values[fmstate->nbatched++] = pstrdup(p_values[i]); + + MemoryContextSwitchTo(oldctx); + + Assert(fmstate->nbatched <= fmstate->maxbatched); + + /* if the batch is "full" we need to flush it */ + if (fmstate->nbatched == fmstate->maxbatched || fmstate->has_returning) + { + /* + * Execute the prepared statement. + */ + if (fmstate->has_returning) + { + if (!PQsendQueryPrepared(fmstate->conn, + fmstate->p_name, + fmstate->nbatched, + fmstate->values, + NULL, + NULL, + 0)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + } + else if (!PQsendQueryPrepared(fmstate->conn, + fmstate->p_name_batch, + fmstate->nbatched, + fmstate->values, + NULL, + NULL, + 0)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->batch_query); + + /* + * Get the result, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_get_result(fmstate->conn, fmstate->query); + if (PQresultStatus(res) != + (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + + /* Check number of rows affected, and fetch RETURNING tuple if any */ + if (fmstate->has_returning) + { + n_rows = PQntuples(res); + if (n_rows > 0) + store_returning_result(fmstate, slot, res); + } + else + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + MemoryContextReset(fmstate->batch_cxt); + + fmstate->nbatched = 0; + } + else + /* XXX we don't know if the future insert succeeds */ + n_rows = 1; + + MemoryContextReset(fmstate->temp_cxt); /* - * Get the result, and check for success. - * - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. + * Return NULL if nothing was inserted/updated/deleted on the remote end */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); - if (PQresultStatus(res) != - (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + return (n_rows > 0) ? slot : NULL; +} - /* Check number of rows affected, and fetch RETURNING tuple if any */ - if (fmstate->has_returning) +/* + * flush_foreign_modify + * Perform foreign-table modification as required, and fetch RETURNING + * result if any. (This is the shared guts of postgresExecForeignInsert, + * postgresExecForeignUpdate, and postgresExecForeignDelete.) + */ +static TupleTableSlot * +flush_foreign_modify(EState *estate, + ResultRelInfo *resultRelInfo, + CmdType operation, + TupleTableSlot *slot, + TupleTableSlot *planSlot) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + PGresult *res; + int n_rows = 0; + int i; + + /* The operation should be INSERT, UPDATE, or DELETE */ + Assert(operation == CMD_INSERT || + operation == CMD_UPDATE || + operation == CMD_DELETE); + + /* Set up the prepared statement on the remote server, if we didn't yet */ + if (!fmstate->p_name) + prepare_foreign_modify(fmstate); + + /* if the batch is "full" we need to flush it */ + i = 0; + while (i < fmstate->nbatched) { - n_rows = PQntuples(res); - if (n_rows > 0) - store_returning_result(fmstate, slot, res); + /* + * Execute the prepared statement. + */ + if (!PQsendQueryPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + &fmstate->values[i], + NULL, + NULL, + 0)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + + /* + * Get the result, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_get_result(fmstate->conn, fmstate->query); + if (PQresultStatus(res) != + (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + + /* Check number of rows affected, and fetch RETURNING tuple if any */ + if (fmstate->has_returning) + { + n_rows = PQntuples(res); + if (n_rows > 0) + store_returning_result(fmstate, slot, res); + } + else + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + i += fmstate->p_nums; } - else - n_rows = atoi(PQcmdTuples(res)); - /* And clean up */ - PQclear(res); + Assert(i == fmstate->nbatched); MemoryContextReset(fmstate->temp_cxt); /* * Return NULL if nothing was inserted/updated/deleted on the remote end */ - return (n_rows > 0) ? slot : NULL; + return NULL; } /* @@ -3729,7 +3901,9 @@ static void prepare_foreign_modify(PgFdwModifyState *fmstate) { char prep_name[NAMEDATALEN]; + char prep_name_batch[NAMEDATALEN]; char *p_name; + char *p_name_batch; PGresult *res; /* Construct name we'll use for the prepared statement. */ @@ -3737,6 +3911,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) GetPrepStmtNumber(fmstate->conn)); p_name = pstrdup(prep_name); + /* Construct name we'll use for the batch prepared statement. */ + snprintf(prep_name_batch, sizeof(prep_name_batch), "pgsql_fdw_prep_%u", + GetPrepStmtNumber(fmstate->conn)); + p_name_batch = pstrdup(prep_name_batch); + /* * We intentionally do not specify parameter types here, but leave the * remote server to derive them by default. This avoids possible problems @@ -3762,8 +3941,22 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); PQclear(res); + if (fmstate->batch_query && + !PQsendPrepare(fmstate->conn, + p_name_batch, + fmstate->batch_query, + 0, + NULL)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->batch_query); + + res = pgfdw_get_result(fmstate->conn, fmstate->batch_query); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->batch_query); + PQclear(res); + /* This action shows that the prepare has been done. */ fmstate->p_name = p_name; + fmstate->p_name_batch = p_name_batch; } /* diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index eef410db39..7e4342cab6 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -162,6 +162,11 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte, List *targetAttrs, bool doNothing, List *withCheckOptionList, List *returningList, List **retrieved_attrs); +extern void deparseBatchInsertSql(StringInfo buf, RangeTblEntry *rte, + Index rtindex, Relation rel, + List *targetAttrs, bool doNothing, + List *withCheckOptionList, List *returningList, + List **retrieved_attrs, int batchSize); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, -- 2.25.4