From 766bbe84def2db494f646caeaf29eefeba893c1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20V=C3=A9rit=C3=A9?= Date: Mon, 20 Nov 2023 17:24:55 +0100 Subject: [PATCH v4 1/2] Implement retrieval of results in chunks with libpq. This mode is similar to the single-row mode except that chunks of results contain up to N rows instead of a single row. It is meant to reduce the overhead of the row-by-row allocations for large result sets. The mode is selected with PQsetChunkedRowsMode(int maxRows) and results have the new status code PGRES_TUPLES_CHUNK. --- doc/src/sgml/libpq.sgml | 96 +++++++++++++++++++------ src/bin/pg_amcheck/pg_amcheck.c | 1 + src/interfaces/libpq/exports.txt | 1 + src/interfaces/libpq/fe-exec.c | 118 +++++++++++++++++++++++++------ src/interfaces/libpq/libpq-fe.h | 4 +- src/interfaces/libpq/libpq-int.h | 7 +- 6 files changed, 183 insertions(+), 44 deletions(-) diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index ed88ac001a..8007bf67d8 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -3537,7 +3537,20 @@ ExecStatusType PQresultStatus(const PGresult *res); The PGresult contains a single result tuple from the current command. This status occurs only when single-row mode has been selected for the query - (see ). + (see ). + + + + + + PGRES_TUPLES_CHUNK + + + The PGresult contains several tuples + from the current command. The count of tuples cannot exceed + the maximum passed to . + This status occurs only when the chunked mode has been selected + for the query (see ). @@ -5187,8 +5200,8 @@ PGresult *PQgetResult(PGconn *conn); Another frequently-desired feature that can be obtained with and - is retrieving large query results a row at a time. This is discussed - in . + is retrieving large query results a limited number of rows at a time. This is discussed + in . @@ -5551,12 +5564,13 @@ int PQflush(PGconn *conn); - To enter single-row mode, call PQsetSingleRowMode - before retrieving results with PQgetResult. - This mode selection is effective only for the query currently - being processed. For more information on the use of - PQsetSingleRowMode, - refer to . + To enter single-row or chunked modes, call + respectively PQsetSingleRowMode + or PQsetChunkedRowsMode before retrieving results + with PQgetResult. This mode selection is effective + only for the query currently being processed. For more information on the + use of these functions refer + to . @@ -5895,10 +5909,10 @@ UPDATE mytable SET x = x + 1 WHERE id = 42; - - Retrieving Query Results Row-by-Row + + Retrieving Query Results by chunks - + libpq single-row mode @@ -5909,13 +5923,15 @@ UPDATE mytable SET x = x + 1 WHERE id = 42; PGresult. This can be unworkable for commands that return a large number of rows. For such cases, applications can use and in - single-row mode. In this mode, the result row(s) are - returned to the application one at a time, as they are received from the - server. + single-row mode or chunked mode. + In these modes, the result row(s) are returned to the application one at a + time for the single-row mode and by chunks for the chunked mode, as they + are received from the server. - To enter single-row mode, call + To enter these modes, call + or immediately after a successful call of (or a sibling function). This mode selection is effective only for the currently executing query. Then call @@ -5923,7 +5939,8 @@ UPDATE mytable SET x = x + 1 WHERE id = 42; linkend="libpq-async"/>. If the query returns any rows, they are returned as individual PGresult objects, which look like normal query results except for having status code - PGRES_SINGLE_TUPLE instead of + PGRES_SINGLE_TUPLE for the single-row mode and + PGRES_TUPLES_CHUNK for the chunked mode, instead of PGRES_TUPLES_OK. After the last row, or immediately if the query returns zero rows, a zero-row object with status PGRES_TUPLES_OK is returned; this is the signal that no @@ -5936,9 +5953,9 @@ UPDATE mytable SET x = x + 1 WHERE id = 42; - When using pipeline mode, single-row mode needs to be activated for each - query in the pipeline before retrieving results for that query - with PQgetResult. + When using pipeline mode, the single-row or chunked mode need to be + activated for each query in the pipeline before retrieving results for that + query with PQgetResult. See for more information. @@ -5972,14 +5989,49 @@ int PQsetSingleRowMode(PGconn *conn); + + + + PQsetChunkedRowsMode + PQsetChunkedRowsMode + + + Select the mode retrieving results in chunks for the currently-executing query. + + + int PQsetChunkedRowsMode(PGconn *conn, + int maxRows); + + + + + This function is similar to , + except that it can retrieve a user-specified number of rows + per call to , instead of a single row. + This function can only be called immediately after + or one of its sibling functions, + before any other operation on the connection such as + or + . If called at the correct time, + the function activates the chunked mode for the current query and + returns 1. Otherwise the mode stays unchanged and the function + returns 0. In any case, the mode reverts to normal after + completion of the current query. + + + + + + While processing a query, the server may return some rows and then encounter an error, causing the query to be aborted. Ordinarily, libpq discards any such rows and reports only the - error. But in single-row mode, those rows will have already been + error. But in single-row or chunked modes, those rows will have already been returned to the application. Hence, the application will see some - PGRES_SINGLE_TUPLE PGresult + PGRES_SINGLE_TUPLE or PGRES_TUPLES_CHUNK + PGresult objects followed by a PGRES_FATAL_ERROR object. For proper transactional behavior, the application must be designed to discard or undo whatever has been done with the previously-processed diff --git a/src/bin/pg_amcheck/pg_amcheck.c b/src/bin/pg_amcheck/pg_amcheck.c index a6b3b56457..9c8a0916c7 100644 --- a/src/bin/pg_amcheck/pg_amcheck.c +++ b/src/bin/pg_amcheck/pg_amcheck.c @@ -989,6 +989,7 @@ should_processing_continue(PGresult *res) case PGRES_COPY_IN: case PGRES_COPY_BOTH: case PGRES_SINGLE_TUPLE: + case PGRES_TUPLES_CHUNK: case PGRES_PIPELINE_SYNC: case PGRES_PIPELINE_ABORTED: return false; diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 850734ac96..ae7c84247b 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -191,3 +191,4 @@ PQclosePrepared 188 PQclosePortal 189 PQsendClosePrepared 190 PQsendClosePortal 191 +PQsetChunkedRowsMode 192 diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 04610ccf5e..2e96d1b538 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -41,7 +41,8 @@ char *const pgresStatus[] = { "PGRES_COPY_BOTH", "PGRES_SINGLE_TUPLE", "PGRES_PIPELINE_SYNC", - "PGRES_PIPELINE_ABORTED" + "PGRES_PIPELINE_ABORTED", + "PGRES_TUPLES_CHUNK" }; /* We return this if we're unable to make a PGresult at all */ @@ -82,7 +83,7 @@ static int PQsendTypedCommand(PGconn *conn, char command, char type, static int check_field_number(const PGresult *res, int field_num); static void pqPipelineProcessQueue(PGconn *conn); static int pqPipelineFlush(PGconn *conn); - +static bool canChangeRowMode(PGconn *conn); /* ---------------- * Space management for PGresult. @@ -199,6 +200,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) case PGRES_COPY_IN: case PGRES_COPY_BOTH: case PGRES_SINGLE_TUPLE: + case PGRES_TUPLES_CHUNK: /* non-error cases */ break; default: @@ -910,8 +912,9 @@ pqPrepareAsyncResult(PGconn *conn) /* * Replace conn->result with next_result, if any. In the normal case * there isn't a next result and we're just dropping ownership of the - * current result. In single-row mode this restores the situation to what - * it was before we created the current single-row result. + * current result. In single-row and chunked modes this restores the + * situation to what it was before we created the current single-row or + * chunk-of-rows result. */ conn->result = conn->next_result; conn->error_result = false; /* next_result is never an error */ @@ -1197,10 +1200,11 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value) * (Such a string should already be translated via libpq_gettext().) * If it is left NULL, the error is presumed to be "out of memory". * - * In single-row mode, we create a new result holding just the current row, - * stashing the previous result in conn->next_result so that it becomes - * active again after pqPrepareAsyncResult(). This allows the result metadata - * (column descriptions) to be carried forward to each result row. + * In single-row or chunked mode, we create a new result holding just the + * current set of rows, stashing the previous result in conn->next_result so + * that it becomes active again after pqPrepareAsyncResult(). This allows the + * result metadata (column descriptions) to be carried forward to each result + * row. */ int pqRowProcessor(PGconn *conn, const char **errmsgp) @@ -1225,6 +1229,28 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) if (!res) return 0; } + else if (conn->rowsChunkSize > 0) + { + /* + * In chunked mode, make a new PGresult that will hold N rows; the + * original conn->result is left unchanged, as in the single-row mode. + */ + if (!conn->chunk_result) + { + /* Allocate and initialize the result to hold a chunk of rows */ + res = PQcopyResult(res, + PG_COPYRES_ATTRS | PG_COPYRES_EVENTS | + PG_COPYRES_NOTICEHOOKS); + if (!res) + return 0; + /* Change result status to special chunk-of-rows value */ + res->resultStatus = PGRES_TUPLES_CHUNK; + /* Keep this result to reuse for the next rows of the chunk */ + conn->chunk_result = res; + } + else + res = conn->chunk_result; /* Use the current chunk */ + } /* * Basically we just allocate space in the PGresult for each field and @@ -1287,6 +1313,21 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) conn->asyncStatus = PGASYNC_READY_MORE; } + /* + * In chunked mode, if the count has reached the requested limit, make the + * rows of the current chunk available immediately. + */ + else if (conn->rowsChunkSize > 0 && res->ntups >= conn->rowsChunkSize) + { + /* Stash old result for re-use later */ + conn->next_result = conn->result; + conn->result = res; + /* Do not reuse that chunk of results */ + conn->chunk_result = NULL; + /* And mark the result ready to return */ + conn->asyncStatus = PGASYNC_READY_MORE; + } + return 1; fail: @@ -1742,8 +1783,9 @@ PQsendQueryStart(PGconn *conn, bool newQuery) */ pqClearAsyncResult(conn); - /* reset single-row processing mode */ + /* reset row-by-row and chunked processing modes */ conn->singleRowMode = false; + conn->rowsChunkSize = 0; } /* ready to send command message */ @@ -1927,25 +1969,51 @@ sendFailed: */ int PQsetSingleRowMode(PGconn *conn) +{ + if (canChangeRowMode(conn)) + { + conn->singleRowMode = true; + return 1; + } + else + return 0; +} + +/* + * Select chunked results processing mode + */ +int +PQsetChunkedRowsMode(PGconn *conn, int chunkSize) +{ + if (chunkSize >= 0 && canChangeRowMode(conn)) + { + conn->rowsChunkSize = chunkSize; + return 1; + } + else + return 0; +} + +static +bool +canChangeRowMode(PGconn *conn) { /* - * Only allow setting the flag when we have launched a query and not yet - * received any results. + * Only allow setting the row-by-row or by-chunks modes when we have + * launched a query and not yet received any results. */ if (!conn) - return 0; + return false; if (conn->asyncStatus != PGASYNC_BUSY) - return 0; + return false; if (!conn->cmd_queue_head || (conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE && conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED)) - return 0; + return false; if (pgHavePendingResult(conn)) - return 0; + return false; - /* OK, set flag */ - conn->singleRowMode = true; - return 1; + return true; } /* @@ -2113,6 +2181,16 @@ PQgetResult(PGconn *conn) case PGASYNC_READY: + /* + * If there is a pending chunk of results, return it + */ + if (conn->chunk_result != NULL) + { + res = conn->chunk_result; + conn->chunk_result = NULL; + break; + } + /* * For any query type other than simple query protocol, we advance * the command queue here. This is because for simple query @@ -3151,10 +3229,10 @@ pqPipelineProcessQueue(PGconn *conn) } /* - * Reset single-row processing mode. (Client has to set it up for each + * Reset processing mode in chunks. (Client has to set it up for each * query, if desired.) */ - conn->singleRowMode = false; + conn->rowsChunkSize = 0; /* * If there are no further commands to process in the queue, get us in diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 97762d56f5..002ed772c8 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -109,8 +109,9 @@ typedef enum PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */ PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */ PGRES_PIPELINE_SYNC, /* pipeline synchronization point */ - PGRES_PIPELINE_ABORTED /* Command didn't run because of an abort + PGRES_PIPELINE_ABORTED, /* Command didn't run because of an abort * earlier in a pipeline */ + PGRES_TUPLES_CHUNK /* set of tuples from larger resultset */ } ExecStatusType; typedef enum @@ -463,6 +464,7 @@ extern int PQsendQueryPrepared(PGconn *conn, const int *paramFormats, int resultFormat); extern int PQsetSingleRowMode(PGconn *conn); +extern int PQsetChunkedRowsMode(PGconn *conn, int chunkSize); extern PGresult *PQgetResult(PGconn *conn); /* Routines for managing an asynchronous query */ diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index c745facfec..7786bd2435 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -431,6 +431,8 @@ struct pg_conn * sending semantics */ PGpipelineStatus pipelineStatus; /* status of pipeline mode */ bool singleRowMode; /* return current query result row-by-row? */ + int rowsChunkSize; /* non-zero to return query results by chunks + * not exceeding that number of rows */ char copy_is_binary; /* 1 = copy binary, 0 = copy text */ int copy_already_done; /* # bytes already returned in COPY OUT */ PGnotify *notifyHead; /* oldest unreported Notify msg */ @@ -536,7 +538,10 @@ struct pg_conn */ PGresult *result; /* result being constructed */ bool error_result; /* do we need to make an ERROR result? */ - PGresult *next_result; /* next result (used in single-row mode) */ + PGresult *next_result; /* next result (used in single-row and + * by-chunks modes) */ + PGresult *chunk_result; /* current chunk of results (limited to + * rowsChunkSize) */ /* Assorted state for SASL, SSL, GSS, etc */ const pg_fe_sasl_mech *sasl; -- 2.34.1