From a5555f339d88ad2a77e567cfd249cb948871b796 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20V=C3=A9rit=C3=A9?=
Date: Mon, 20 Nov 2023 18:42:41 +0100
Subject: [PATCH v4 2/2] Reimplement FETCH_COUNT with the chunked mode in
 libpq instead of cursors.

Cursors were used only when the command started with the keyword "SELECT",
excluding queries starting with "WITH", "UPDATE", or "INSERT", which may
also return large result sets.
Also, cursors imply more commands sent to the server (begin / declare
cursor / repeated fetch / close cursor / commit), whereas in chunked mode
only the actual user query is sent, resulting in fewer round-trips.

This also fixes the bug that combined queries (query1 \; query2;) were not
handled correctly with FETCH_COUNT set, because cursors do not support
multiple queries.
---
 src/bin/psql/common.c              | 545 ++++++++++-------------------
 src/bin/psql/t/001_basic.pl        |   6 +-
 src/test/regress/expected/psql.out |   9 +-
 src/test/regress/sql/psql.sql      |   4 +-
 4 files changed, 196 insertions(+), 368 deletions(-)

diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index daabf6f12b..adb915e5c2 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -31,7 +31,6 @@
 #include "settings.h"
 
 static bool DescribeQuery(const char *query, double *elapsed_msec);
-static bool ExecQueryUsingCursor(const char *query, double *elapsed_msec);
 static int	ExecQueryAndProcessResults(const char *query,
 									   double *elapsed_msec,
 									   bool *svpt_gone_p,
@@ -40,8 +39,6 @@ static int ExecQueryAndProcessResults(const char *query,
 									   const printQueryOpt *opt,
 									   FILE *printQueryFout);
 static bool command_no_begin(const char *query);
-static bool is_select_command(const char *query);
-
 
 /*
  * openQueryOutputFile --- attempt to open a query output file
@@ -373,6 +370,7 @@ AcceptResult(const PGresult *result, bool show_error)
 	{
 		case PGRES_COMMAND_OK:
 		case PGRES_TUPLES_OK:
+		case PGRES_TUPLES_CHUNK:
 		case PGRES_EMPTY_QUERY:
 		case PGRES_COPY_IN:
 		case PGRES_COPY_OUT:
@@ -1131,16 +1129,10 @@ SendQuery(const char *query)
 		/* Describe query's result columns, without executing it */
 		OK = DescribeQuery(query, &elapsed_msec);
 	}
-	else if (pset.fetch_count <= 0 || pset.gexec_flag ||
-			 pset.crosstab_flag || !is_select_command(query))
-	{
-		/* Default fetch-it-all-and-print mode */
-		OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0);
-	}
 	else
 	{
-		/* Fetch-in-segments mode */
-		OK = ExecQueryUsingCursor(query, &elapsed_msec);
+		/* Default fetch-and-print mode */
+		OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0);
 	}
 
 	if (!OK && pset.echo == PSQL_ECHO_ERRORS)
@@ -1392,6 +1384,47 @@ DescribeQuery(const char *query, double *elapsed_msec)
 	return OK;
 }
 
+/*
+ * Check if an output stream for \g needs to be opened, and if
+ * yes, open it.
+ * Return false if an error occurred, true otherwise.
+ */ +static bool +SetupGOutput(PGresult *result, FILE **gfile_fout, bool *is_pipe) +{ + ExecStatusType status = PQresultStatus(result); + if (pset.gfname != NULL && /* there is a \g file or program */ + *gfile_fout == NULL && /* and it's not already opened */ + (status == PGRES_TUPLES_OK || + status == PGRES_TUPLES_CHUNK || + status == PGRES_COPY_OUT)) + { + if (openQueryOutputFile(pset.gfname, gfile_fout, is_pipe)) + { + if (is_pipe) + disable_sigpipe_trap(); + } + else + return false; + } + return true; +} + +static void +CloseGOutput(FILE *gfile_fout, bool is_pipe) +{ + /* close \g file if we opened it */ + if (gfile_fout) + { + if (is_pipe) + { + SetShellResultVariables(pclose(gfile_fout)); + restore_sigpipe_trap(); + } + else + fclose(gfile_fout); + } +} /* * ExecQueryAndProcessResults: utility function for use by SendQuery() @@ -1424,10 +1457,16 @@ ExecQueryAndProcessResults(const char *query, bool return_early = false; instr_time before, after; + int fetch_count = pset.fetch_count; PGresult *result; + FILE *gfile_fout = NULL; bool gfile_is_pipe = false; + int64 total_tuples = 0; + int flush_error = 0; + bool is_pager = false; + if (timing) INSTR_TIME_SET_CURRENT(before); else @@ -1450,6 +1489,29 @@ ExecQueryAndProcessResults(const char *query, return -1; } + /* + * If FETCH_COUNT is set and the context allows it, use the single row + * mode to fetch results and have no more than FETCH_COUNT rows in + * memory. + */ + if (fetch_count > 0 && !pset.crosstab_flag && !pset.gexec_flag && !is_watch + && !pset.gset_prefix && pset.show_all_results) + { + /* + * The row-by-chunks fetch is not enabled when SHOW_ALL_RESULTS is false, + * since we would need to accumulate all rows before knowing + * whether they need to be discarded or displayed, which contradicts + * FETCH_COUNT. + */ + if (!PQsetChunkedRowsMode(pset.db, fetch_count)) + { + pg_log_warning("fetching results in chunks mode is unavailable"); + fetch_count = 0; + } + } + else + fetch_count = 0; /* fetch one resultset per query */ + /* * If SIGINT is sent while the query is processing, the interrupt will be * consumed. The user's intention, though, is to cancel the entire watch @@ -1473,6 +1535,8 @@ ExecQueryAndProcessResults(const char *query, ExecStatusType result_status; PGresult *next_result; bool last; + /* whether the output starts before results are fully fetched */ + bool partial_display = false; if (!AcceptResult(result, false)) { @@ -1568,20 +1632,9 @@ ExecQueryAndProcessResults(const char *query, } else if (pset.gfname) { - /* send to \g file, which we may have opened already */ - if (gfile_fout == NULL) - { - if (openQueryOutputFile(pset.gfname, - &gfile_fout, &gfile_is_pipe)) - { - if (gfile_is_pipe) - disable_sigpipe_trap(); - copy_stream = gfile_fout; - } - else - success = false; - } - else + /* COPY followed by \g filename or \g |program */ + success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe); + if (success) copy_stream = gfile_fout; } else @@ -1599,6 +1652,90 @@ ExecQueryAndProcessResults(const char *query, success &= HandleCopyResult(&result, copy_stream); } + if (fetch_count > 0 && result_status == PGRES_TUPLES_CHUNK) + { + FILE *tuples_fout = printQueryFout ? 
printQueryFout : stdout; + printQueryOpt my_popt = pset.popt; + + total_tuples = 0; + partial_display = true; + + success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe); + if (gfile_fout) + tuples_fout = gfile_fout; + + /* initialize print options for partial table output */ + my_popt.topt.start_table = true; + my_popt.topt.stop_table = false; + my_popt.topt.prior_records = 0; + + while (success) + { + /* pager: open at most once per resultset */ + if (tuples_fout == stdout && !is_pager) + { + tuples_fout = PageOutput(INT_MAX, &(my_popt.topt)); + is_pager = true; + } + /* display the current chunk of results unless the output stream is not working */ + if (!flush_error) + { + printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile); + flush_error = fflush(tuples_fout); + } + + /* after the first result set, disallow header decoration */ + my_popt.topt.start_table = false; + my_popt.topt.prior_records += PQntuples(result); + total_tuples += PQntuples(result); + + ClearOrSaveResult(result); + + result = PQgetResult(pset.db); + if (result == NULL) + { + /* + * Error. We expect a PGRES_TUPLES_OK result with + * zero tuple in it to finish the fetch sequence. + */ + success = false; + if (is_pager) + ClosePager(tuples_fout); + break; + } + else if (PQresultStatus(result) == PGRES_TUPLES_OK) + { + /* + * The last row has been read. Display the footer. + */ + my_popt.topt.stop_table = true; + printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile); + total_tuples += PQntuples(result); + + if (is_pager) + ClosePager(tuples_fout); + ClearOrSaveResult(result); + result = NULL; + break; + } + else if (PQresultStatus(result) != PGRES_TUPLES_CHUNK) + { + /* + * Error. We expect either PGRES_TUPLES_CHUNK or + * PGRES_TUPLES_OK. + */ + if (is_pager) + ClosePager(tuples_fout); + success = false; + AcceptResult(result, true); /* display error whenever appropriate */ + SetResultVariables(result, success); + break; + } + } + } + else + partial_display = false; + /* * Check PQgetResult() again. In the typical case of a single-command * string, it will return NULL. Otherwise, we'll have other results @@ -1627,7 +1764,7 @@ ExecQueryAndProcessResults(const char *query, } /* this may or may not print something depending on settings */ - if (result != NULL) + if (result != NULL && !partial_display) { /* * If results need to be printed into the file specified by \g, @@ -1636,32 +1773,33 @@ ExecQueryAndProcessResults(const char *query, * tuple output, but it's still used for status output. */ FILE *tuples_fout = printQueryFout; - bool do_print = true; - - if (PQresultStatus(result) == PGRES_TUPLES_OK && - pset.gfname) - { - if (gfile_fout == NULL) - { - if (openQueryOutputFile(pset.gfname, - &gfile_fout, &gfile_is_pipe)) - { - if (gfile_is_pipe) - disable_sigpipe_trap(); - } - else - success = do_print = false; - } + success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe); + if (gfile_fout) tuples_fout = gfile_fout; - } - if (do_print) + if (success) success &= PrintQueryResult(result, last, opt, tuples_fout, printQueryFout); } /* set variables from last result */ if (!is_watch && last) - SetResultVariables(result, success); + { + if (!partial_display) + SetResultVariables(result, success); + else if (success) + { + /* + * fake SetResultVariables(). If an error occurred when + * retrieving chunks, these variables have been set already. 
+ */ + char buf[32]; + + SetVariable(pset.vars, "ERROR", "false"); + SetVariable(pset.vars, "SQLSTATE", "00000"); + snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples); + SetVariable(pset.vars, "ROW_COUNT", buf); + } + } ClearOrSaveResult(result); result = next_result; @@ -1673,17 +1811,7 @@ ExecQueryAndProcessResults(const char *query, } } - /* close \g file if we opened it */ - if (gfile_fout) - { - if (gfile_is_pipe) - { - SetShellResultVariables(pclose(gfile_fout)); - restore_sigpipe_trap(); - } - else - fclose(gfile_fout); - } + CloseGOutput(gfile_fout, gfile_is_pipe); /* may need this to recover from conn loss during COPY */ if (!CheckConnection()) @@ -1696,274 +1824,6 @@ ExecQueryAndProcessResults(const char *query, } -/* - * ExecQueryUsingCursor: run a SELECT-like query using a cursor - * - * This feature allows result sets larger than RAM to be dealt with. - * - * Returns true if the query executed successfully, false otherwise. - * - * If pset.timing is on, total query time (exclusive of result-printing) is - * stored into *elapsed_msec. - */ -static bool -ExecQueryUsingCursor(const char *query, double *elapsed_msec) -{ - bool OK = true; - PGresult *result; - PQExpBufferData buf; - printQueryOpt my_popt = pset.popt; - bool timing = pset.timing; - FILE *fout; - bool is_pipe; - bool is_pager = false; - bool started_txn = false; - int64 total_tuples = 0; - int ntuples; - int fetch_count; - char fetch_cmd[64]; - instr_time before, - after; - int flush_error; - - *elapsed_msec = 0; - - /* initialize print options for partial table output */ - my_popt.topt.start_table = true; - my_popt.topt.stop_table = false; - my_popt.topt.prior_records = 0; - - if (timing) - INSTR_TIME_SET_CURRENT(before); - else - INSTR_TIME_SET_ZERO(before); - - /* if we're not in a transaction, start one */ - if (PQtransactionStatus(pset.db) == PQTRANS_IDLE) - { - result = PQexec(pset.db, "BEGIN"); - OK = AcceptResult(result, true) && - (PQresultStatus(result) == PGRES_COMMAND_OK); - ClearOrSaveResult(result); - if (!OK) - return false; - started_txn = true; - } - - /* Send DECLARE CURSOR */ - initPQExpBuffer(&buf); - appendPQExpBuffer(&buf, "DECLARE _psql_cursor NO SCROLL CURSOR FOR\n%s", - query); - - result = PQexec(pset.db, buf.data); - OK = AcceptResult(result, true) && - (PQresultStatus(result) == PGRES_COMMAND_OK); - if (!OK) - SetResultVariables(result, OK); - ClearOrSaveResult(result); - termPQExpBuffer(&buf); - if (!OK) - goto cleanup; - - if (timing) - { - INSTR_TIME_SET_CURRENT(after); - INSTR_TIME_SUBTRACT(after, before); - *elapsed_msec += INSTR_TIME_GET_MILLISEC(after); - } - - /* - * In \gset mode, we force the fetch count to be 2, so that we will throw - * the appropriate error if the query returns more than one row. 
- */ - if (pset.gset_prefix) - fetch_count = 2; - else - fetch_count = pset.fetch_count; - - snprintf(fetch_cmd, sizeof(fetch_cmd), - "FETCH FORWARD %d FROM _psql_cursor", - fetch_count); - - /* prepare to write output to \g argument, if any */ - if (pset.gfname) - { - if (!openQueryOutputFile(pset.gfname, &fout, &is_pipe)) - { - OK = false; - goto cleanup; - } - if (is_pipe) - disable_sigpipe_trap(); - } - else - { - fout = pset.queryFout; - is_pipe = false; /* doesn't matter */ - } - - /* clear any pre-existing error indication on the output stream */ - clearerr(fout); - - for (;;) - { - if (timing) - INSTR_TIME_SET_CURRENT(before); - - /* get fetch_count tuples at a time */ - result = PQexec(pset.db, fetch_cmd); - - if (timing) - { - INSTR_TIME_SET_CURRENT(after); - INSTR_TIME_SUBTRACT(after, before); - *elapsed_msec += INSTR_TIME_GET_MILLISEC(after); - } - - if (PQresultStatus(result) != PGRES_TUPLES_OK) - { - /* shut down pager before printing error message */ - if (is_pager) - { - ClosePager(fout); - is_pager = false; - } - - OK = AcceptResult(result, true); - Assert(!OK); - SetResultVariables(result, OK); - ClearOrSaveResult(result); - break; - } - - if (pset.gset_prefix) - { - /* StoreQueryTuple will complain if not exactly one row */ - OK = StoreQueryTuple(result); - ClearOrSaveResult(result); - break; - } - - /* - * Note we do not deal with \gdesc, \gexec or \crosstabview modes here - */ - - ntuples = PQntuples(result); - total_tuples += ntuples; - - if (ntuples < fetch_count) - { - /* this is the last result set, so allow footer decoration */ - my_popt.topt.stop_table = true; - } - else if (fout == stdout && !is_pager) - { - /* - * If query requires multiple result sets, hack to ensure that - * only one pager instance is used for the whole mess - */ - fout = PageOutput(INT_MAX, &(my_popt.topt)); - is_pager = true; - } - - printQuery(result, &my_popt, fout, is_pager, pset.logfile); - - ClearOrSaveResult(result); - - /* after the first result set, disallow header decoration */ - my_popt.topt.start_table = false; - my_popt.topt.prior_records += ntuples; - - /* - * Make sure to flush the output stream, so intermediate results are - * visible to the client immediately. We check the results because if - * the pager dies/exits/etc, there's no sense throwing more data at - * it. - */ - flush_error = fflush(fout); - - /* - * Check if we are at the end, if a cancel was pressed, or if there - * were any errors either trying to flush out the results, or more - * generally on the output stream at all. If we hit any errors - * writing things to the stream, we presume $PAGER has disappeared and - * stop bothering to pull down more data. - */ - if (ntuples < fetch_count || cancel_pressed || flush_error || - ferror(fout)) - break; - } - - if (pset.gfname) - { - /* close \g argument file/pipe */ - if (is_pipe) - { - SetShellResultVariables(pclose(fout)); - restore_sigpipe_trap(); - } - else - fclose(fout); - } - else if (is_pager) - { - /* close transient pager */ - ClosePager(fout); - } - - if (OK) - { - /* - * We don't have a PGresult here, and even if we did it wouldn't have - * the right row count, so fake SetResultVariables(). In error cases, - * we already set the result variables above. 
- */ - char buf[32]; - - SetVariable(pset.vars, "ERROR", "false"); - SetVariable(pset.vars, "SQLSTATE", "00000"); - snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples); - SetVariable(pset.vars, "ROW_COUNT", buf); - } - -cleanup: - if (timing) - INSTR_TIME_SET_CURRENT(before); - - /* - * We try to close the cursor on either success or failure, but on failure - * ignore the result (it's probably just a bleat about being in an aborted - * transaction) - */ - result = PQexec(pset.db, "CLOSE _psql_cursor"); - if (OK) - { - OK = AcceptResult(result, true) && - (PQresultStatus(result) == PGRES_COMMAND_OK); - ClearOrSaveResult(result); - } - else - PQclear(result); - - if (started_txn) - { - result = PQexec(pset.db, OK ? "COMMIT" : "ROLLBACK"); - OK &= AcceptResult(result, true) && - (PQresultStatus(result) == PGRES_COMMAND_OK); - ClearOrSaveResult(result); - } - - if (timing) - { - INSTR_TIME_SET_CURRENT(after); - INSTR_TIME_SUBTRACT(after, before); - *elapsed_msec += INSTR_TIME_GET_MILLISEC(after); - } - - return OK; -} - - /* * Advance the given char pointer over white space and SQL comments. */ @@ -2243,43 +2103,6 @@ command_no_begin(const char *query) } -/* - * Check whether the specified command is a SELECT (or VALUES). - */ -static bool -is_select_command(const char *query) -{ - int wordlen; - - /* - * First advance over any whitespace, comments and left parentheses. - */ - for (;;) - { - query = skip_white_space(query); - if (query[0] == '(') - query++; - else - break; - } - - /* - * Check word length (since "selectx" is not "select"). - */ - wordlen = 0; - while (isalpha((unsigned char) query[wordlen])) - wordlen += PQmblenBounded(&query[wordlen], pset.encoding); - - if (wordlen == 6 && pg_strncasecmp(query, "select", 6) == 0) - return true; - - if (wordlen == 6 && pg_strncasecmp(query, "values", 6) == 0) - return true; - - return false; -} - - /* * Test if the current user is a database superuser. */ diff --git a/src/bin/psql/t/001_basic.pl b/src/bin/psql/t/001_basic.pl index 95f4e60ab2..62a5d0f383 100644 --- a/src/bin/psql/t/001_basic.pl +++ b/src/bin/psql/t/001_basic.pl @@ -161,7 +161,7 @@ psql_like( '\errverbose with no previous error'); # There are three main ways to run a query that might affect -# \errverbose: The normal way, using a cursor by setting FETCH_COUNT, +# \errverbose: The normal way, piecemeal retrieval using FETCH_COUNT, # and using \gdesc. Test them all. 
like( @@ -184,10 +184,10 @@ like( "\\set FETCH_COUNT 1\nSELECT error;\n\\errverbose", on_error_stop => 0))[2], qr/\A^psql::2: ERROR: .*$ -^LINE 2: SELECT error;$ +^LINE 1: SELECT error;$ ^ *^.*$ ^psql::3: error: ERROR: [0-9A-Z]{5}: .*$ -^LINE 2: SELECT error;$ +^LINE 1: SELECT error;$ ^ *^.*$ ^LOCATION: .*$/m, '\errverbose after FETCH_COUNT query with error'); diff --git a/src/test/regress/expected/psql.out b/src/test/regress/expected/psql.out index 13e4f6db7b..aa53f11682 100644 --- a/src/test/regress/expected/psql.out +++ b/src/test/regress/expected/psql.out @@ -4754,7 +4754,7 @@ number of rows: 0 last error message: syntax error at end of input \echo 'last error code:' :LAST_ERROR_SQLSTATE last error code: 42601 --- check row count for a cursor-fetched query +-- check row count for a query with chunked results \set FETCH_COUNT 10 select unique2 from tenk1 order by unique2 limit 19; unique2 @@ -4786,7 +4786,7 @@ error: false error code: 00000 \echo 'number of rows:' :ROW_COUNT number of rows: 19 --- cursor-fetched query with an error after the first group +-- chunked results with an error after the first chunk select 1/(15-unique2) from tenk1 order by unique2 limit 19; ?column? ---------- @@ -4800,6 +4800,11 @@ select 1/(15-unique2) from tenk1 order by unique2 limit 19; 0 0 0 + 0 + 0 + 0 + 0 + 1 ERROR: division by zero \echo 'error:' :ERROR error: true diff --git a/src/test/regress/sql/psql.sql b/src/test/regress/sql/psql.sql index 695c72d866..3c4e6962ba 100644 --- a/src/test/regress/sql/psql.sql +++ b/src/test/regress/sql/psql.sql @@ -1160,14 +1160,14 @@ SELECT 4 AS \gdesc \echo 'last error message:' :LAST_ERROR_MESSAGE \echo 'last error code:' :LAST_ERROR_SQLSTATE --- check row count for a cursor-fetched query +-- check row count for a query with chunked results \set FETCH_COUNT 10 select unique2 from tenk1 order by unique2 limit 19; \echo 'error:' :ERROR \echo 'error code:' :SQLSTATE \echo 'number of rows:' :ROW_COUNT --- cursor-fetched query with an error after the first group +-- chunked results with an error after the first chunk select 1/(15-unique2) from tenk1 order by unique2 limit 19; \echo 'error:' :ERROR \echo 'error code:' :SQLSTATE -- 2.34.1
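
Note for reviewers: the retrieval protocol that ExecQueryAndProcessResults()
now follows is the chunked mode added by patch 1/2 of this series. The
standalone sketch below shows that protocol in isolation, assuming the API
from that patch (PQsetChunkedRowsMode() and the PGRES_TUPLES_CHUNK result
status); the connection string, query, and chunk size of 100 are
illustrative placeholders, not part of the patch.

/*
 * Minimal sketch of the chunked-rows retrieval loop, assuming the libpq
 * API introduced by patch 1/2 (PQsetChunkedRowsMode, PGRES_TUPLES_CHUNK).
 */
#include <stdio.h>
#include "libpq-fe.h"

int
main(void)
{
	PGconn	   *conn = PQconnectdb("dbname=postgres");

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		PQfinish(conn);
		return 1;
	}

	/* send the query without accumulating the whole result set */
	if (!PQsendQuery(conn, "SELECT generate_series(1, 1000000)"))
	{
		fprintf(stderr, "send failed: %s", PQerrorMessage(conn));
		PQfinish(conn);
		return 1;
	}

	/* ask libpq to hand back results in chunks of at most 100 rows */
	if (!PQsetChunkedRowsMode(conn, 100))
		fprintf(stderr, "chunked mode unavailable, falling back to a single result\n");

	for (;;)
	{
		PGresult   *res = PQgetResult(conn);

		if (res == NULL)
			break;				/* no more results for this query string */

		switch (PQresultStatus(res))
		{
			case PGRES_TUPLES_CHUNK:
				/* at most 100 rows are held in memory at any time */
				for (int i = 0; i < PQntuples(res); i++)
					printf("%s\n", PQgetvalue(res, i, 0));
				break;

			case PGRES_TUPLES_OK:
				/* zero-row terminator: this result set is complete */
				break;

			default:
				/* error, possibly in mid-stream after some chunks */
				fprintf(stderr, "%s", PQerrorMessage(conn));
				break;
		}
		PQclear(res);
	}

	PQfinish(conn);
	return 0;
}

As in the psql code above, the loop keeps no more than one chunk of rows in
memory at a time, and a zero-row PGRES_TUPLES_OK result marks the end of
each result set.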