diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 9ce32fb39b..2a94f8f6b9 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -3061,6 +3061,30 @@ ExecStatusType PQresultStatus(const PGresult *res); + + + PGRES_BATCH_END + + + The PGresult represents the end of a batch. + This status occurs only when batch mode has been selected. + + + + + + PGRES_BATCH_ABORTED + + + The PGresult represents a batch that's + received an error from the server. PQgetResult + must be called repeatedly, and it will return this status code, + until the end of the current batch, at which point it will return + PGRES_BATCH_END and normal processing can resume. + + + + If the result status is PGRES_TUPLES_OK or @@ -4819,6 +4843,482 @@ int PQflush(PGconn *conn); + + Batch Mode + + + libpq + batch mode + + + + pipelining + in libpq + + + + libpq batch mode allows applications to + send a query without having to read the result of the previously + sent query. Taking advantage of the batch mode, a client will wait + less for the server, since multiple queries/results can be sent/ + received in a single network transaction. + + + + While batch mode provides a significant performance boost, writing + clients using the batch mode is more complex because it involves + managing a queue of pending queries and finding which result + corresponds to which query in the queue. + + + + Using Batch Mode + + + To issue batches the application must switch a connection into batch mode. + Enter batch mode with + or test whether batch mode is active with + . + In batch mode, only asynchronous operations + are permitted, and COPY is not recommended as it + may trigger failure in batch processing. Using any synchronous + command execution functions such as PQfn, + PQexec or one of its sibling functions are error + conditions. + + + + + It is best to use batch mode with libpq in + non-blocking mode. If used + in blocking mode it is possible for a client/server deadlock to occur. + + + The client will block trying to send queries to the server, but the + server will block trying to send results to the client from queries + it has already processed. This only occurs when the client sends + enough queries to fill its output buffer and the server's receive + buffer before switching to processing input from the server, + but it's hard to predict exactly when that will happen. + + + + + Batch mode consumes more memory when send/receive is not done as required, + even in non-blocking mode. + + + + + Issuing Queries + + + After entering batch mode the application dispatches requests using + normal asynchronous libpq functions such as + PQsendQueryParams, PQsendPrepare, + PQsendQueryPrepared, PQsendDescribePortal, + PQsendDescribePrepared. + The asynchronous requests are followed by a + + call to mark the end of the batch. The client needs not + call PQgetResult immediately after + dispatching each operation. + Result processing + is handled separately. + + + + The server executes statements, and returns results, in the order the + client sends them. The server may begin executing the batch before all + commands in the batch are queued and the end of batch command is sent. + If any statement encounters an error the server aborts the current + transaction and skips processing the rest of the batch. + Query processing resumes after the end of the failed batch. + + + + It's fine for one operation to depend on the results of a + prior one. One query may define a table that the next query in the same + batch uses; similarly, an application may create a named prepared statement + then execute it with later statements in the same batch. + + + + + Processing Results + + + To process the result of one batch query, the application calls + PQgetResult repeatedly and handles each result + until PQgetResult returns null. + The result from the next batch query may then be retrieved using + PQgetResult again and the cycle repeated. + The application handles individual statement results as normal. + When the results of all the queries in the batch have been + returned, PQgetResult returns a result + containing the status value PGRES_BATCH_END. + + + + The client may choose to defer result processing until the complete + batch has been sent, or interleave that with sending further batch + queries; see . + + + + To enter single-row mode, call PQsetSingleRowMode + before retrieving results with PQgetResult. + This mode selection is effective only for the query currently + being processed. For more information on the use of + PQsetSingleRowMode, + refer to . + + + + PQgetResult behaves the same as for normal + asynchronous processing except that it may contain the new + PGresult types PGRES_BATCH_END + and PGRES_BATCH_ABORTED. + PGRES_BATCH_END is reported exactly once for each + PQbatchSendQueue call at the corresponding point in + the result stream and at no other time. + PGRES_BATCH_ABORTED is emitted during error handling; + see . + + + + PQisBusy, PQconsumeInput, etc + operate as normal when processing batch results. + + + + libpq does not provide any information to the + application about the query currently being processed (except that + PQgetResult returns null to indicate that we start + returning the results of next query). The application must keep track + of the order in which it sent queries and the expected results. + Applications will typically use a state machine or a FIFO queue for this. + + + + + + Error Handling + + + When a query in a batch causes an ERROR the server + skips processing all subsequent messages until the end-of-batch message. + The open transaction is aborted. + + + + From the client perspective, after the client gets a + PGRES_FATAL_ERROR return from + PQresultStatus the batch is flagged as aborted. + libpq will report + PGRES_BATCH_ABORTED result for each remaining queued + operation in an aborted batch. The result for + PQbatchSendQueue is reported as + PGRES_BATCH_END to signal the end of the aborted batch + and resumption of normal result processing. + + + + The client must process results with + PQgetResult during error recovery. + + + + If the batch used an implicit transaction then operations that have + already executed are rolled back and operations that were queued for after + the failed operation are skipped entirely. The same behaviour holds if the + batch starts and commits a single explicit transaction (i.e. the first + statement is BEGIN and the last is + COMMIT) except that the session remains in an aborted + transaction state at the end of the batch. If a batch contains + multiple explicit transactions, all transactions that committed + prior to the error remain committed, the currently in-progress transaction + is aborted and all subsequent operations in the current and all later + transactions in the same batch are skipped completely. + + + + + The client must not assume that work is committed when it + sends a COMMIT, only when the + corresponding result is received to confirm the commit is complete. + Because errors arrive asynchronously the application needs to be able to + restart from the last received committed change and + resend work done after that point if something goes wrong. + + + + + + Interleaving Result Processing and Query Dispatch + + + To avoid deadlocks on large batches the client should be structured + around a non-blocking event loop using operating system facilities + such as select, poll, + WaitForMultipleObjectEx, etc. + + + + The client application should generally maintain a queue of work + still to be dispatched and a queue of work that has been dispatched + but not yet had its results processed. When the socket is writable + it should dispatch more work. When the socket is readable it should + read results and process them, matching them up to the next entry in + its expected results queue. Based on available memory, results from + socket should be read frequently and there's no need to wait till the + batch end to read the results. Batches should be scoped to logical + units of work, usually (but not necessarily) one transaction per batch. + There's no need to exit batch mode and re-enter it between batches + or to wait for one batch to finish before sending the next. + + + + An example using select() and a simple state + machine to track sent and received work is in + src/test/modules/test_libpq/testlibpqbatch.c + in the PostgreSQL source distribution. + + + + + Ending Batch Mode + + + Once all dispatched commands have had their results processed and + the end batch result has been consumed the application may return + to non-batched mode with . + + + + + + Functions Associated with Batch Mode + + + + + + PQbatchStatus + + PQbatchStatus + + + + + + Returns current batch mode status of the libpq + connection. + +int PQbatchStatus(const PGconn *conn); + + + + + PQbatchStatus can return one of the following values: + + + + + PQBATCH_MODE_ON + + + + The libpq connection is in + batch mode. + + + + + + + PQBATCH_MODE_OFF + + + + The libpq connection is + not in batch mode. + + + + + + + PQBATCH_MODE_ABORTED + + + + The libpq connection is in aborted + batch status. The aborted flag is cleared as soon as the result + of the PQbatchSendQueue at the end of the aborted + batch is processed. Clients don't usually need this function to + verify aborted status, as they can tell that the batch is aborted + from the PGRES_BATCH_ABORTED result code. + + + + + + + + + + + + PQenterBatchMode + + PQenterBatchMode + + + + + + Causes a connection to enter batch mode if it is currently idle or + already in batch mode. + + +int PQenterBatchMode(PGconn *conn); + + + + + Returns 1 for success. + Returns 0 and has no effect if the connection is not currently + idle, i.e., it has a result ready, or it is waiting for more + input from the server, etc. + This function does not actually send anything to the server, + it just changes the libpq connection + state. + + + + + + + PQexitBatchMode + + PQexitBatchMode + + + + + + Causes a connection to exit batch mode if it is currently in batch mode + with an empty queue and no pending results. + +int PQexitBatchMode(PGconn *conn); + + + + Returns 1 for success. Returns 1 and takes no action if not in + batch mode. If the current statement isn't finished processing + or there are results pending for collection with + PQgetResult, returns 0 and does nothing. + + + + + + + PQbatchSendQueue + + PQbatchSendQueue + + + + + + Delimits the end of a set of a batched commands by sending a + sync message + and flushing the send buffer. The end of a batch serves as + the delimiter of an implicit transaction and an error recovery + point; see . + + +int PQbatchSendQueue(PGconn *conn); + + + + Returns 1 for success. Returns 0 if the connection is not in + batch mode or sending a + sync message + is failed. + + + + + + + + When to Use Batching + + + Much like asynchronous query mode, there is no performance disadvantage to + using batching and pipelining. It increases client application complexity + and extra caution is required to prevent client/server deadlocks, but + pipelining can sometimes offer considerable performance improvements. + + + + Batching is most useful when the server is distant, i.e., network latency + (ping time) is high, and also when many small operations + are being performed in rapid sequence. There is usually less benefit + in using batches when each query takes many multiples of the client/server + round-trip time to execute. A 100-statement operation run on a server + 300ms round-trip-time away would take 30 seconds in network latency alone + without batching; with batching it may spend as little as 0.3s waiting for + results from the server. + + + + Use batches when your application does lots of small + INSERT, UPDATE and + DELETE operations that can't easily be transformed + into operations on sets or into a COPY operation. + + + + Batching is not useful when information from one operation is required by + the client to produce the next operation. In such cases, the client + must introduce a synchronization point and wait for a full client/server + round-trip to get the results it needs. However, it's often possible to + adjust the client design to exchange the required information server-side. + Read-modify-write cycles are especially good candidates; for example: + +BEGIN; +SELECT x FROM mytable WHERE id = 42 FOR UPDATE; +-- result: x=2 +-- client adds 1 to x: +UPDATE mytable SET x = 3 WHERE id = 42; +COMMIT; + + could be much more efficiently done with: + +UPDATE mytable SET x = x + 1 WHERE id = 42; + + + + + Batching is less useful, and more complex, when a single batch contains + multiple transactions (see ). + + + + + The batch API was introduced in PostgreSQL 14.0, but clients using + the PostgreSQL 14 version of libpq can use + batches on server versions 7.4 and newer. Batching works on any server + that supports the v3 extended query protocol. + + + + + Retrieving Query Results Row-by-Row @@ -4859,6 +5359,17 @@ int PQflush(PGconn *conn); Each object should be freed with as usual. + + + + When using batch mode, activate the single-row mode on the current + batch query by calling PQsetSingleRowMode + before retrieving results with PQgetResult. + See for more information. + + + diff --git a/doc/src/sgml/lobj.sgml b/doc/src/sgml/lobj.sgml index 6329cf0796..49be8b1dbe 100644 --- a/doc/src/sgml/lobj.sgml +++ b/doc/src/sgml/lobj.sgml @@ -130,6 +130,10 @@ libpq library. + + Client applications cannot use these functions while libpq connection is in batch mode. + + Creating a Large Object diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml index 7180fedd65..921e5dccd9 100644 --- a/doc/src/sgml/ref/pgbench.sgml +++ b/doc/src/sgml/ref/pgbench.sgml @@ -1058,6 +1058,20 @@ pgbench options d + + \beginbatch + \endbatch + + + + These commands delimit the start and end of a batch of SQL statements. + In a batch, statements are sent to server without waiting for the results + of previous statements (see ). + Batching requires the extended query protocol. + + + + \gset [prefix] @@ -1086,6 +1100,12 @@ pgbench options d row, the last value is kept. + + \gset and \aset cannot be used + inside a batch section, since query results are not immediately + fetched in this mode. + + The following example puts the final account balance from the first query into variable abalance, and fills variables diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 24f8b3e42e..9ae67387a5 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -1026,6 +1026,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, walres->status = WALRCV_ERROR; walres->err = pchomp(PQerrorMessage(conn->streamConn)); break; + default: + /* This is just to keep compiler quiet */ + break; } PQclear(pgres); diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 3057665bbe..5846e9153e 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -351,7 +351,8 @@ typedef enum * * CSTATE_START_COMMAND starts the execution of a command. On a SQL * command, the command is sent to the server, and we move to - * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set, + * CSTATE_WAIT_RESULT state unless in batch mode. + * On a \sleep meta-command, the timer is set, * and we enter the CSTATE_SLEEP state to wait for it to expire. Other * meta-commands are executed immediately. If the command about to start * is actually beyond the end of the script, advance to CSTATE_END_TX. @@ -485,7 +486,9 @@ typedef enum MetaCommand META_IF, /* \if */ META_ELIF, /* \elif */ META_ELSE, /* \else */ - META_ENDIF /* \endif */ + META_ENDIF, /* \endif */ + META_BEGINBATCH, /* \beginbatch */ + META_ENDBATCH /* \endbatch */ } MetaCommand; typedef enum QueryMode @@ -2492,6 +2495,10 @@ getMetaCommand(const char *cmd) mc = META_GSET; else if (pg_strcasecmp(cmd, "aset") == 0) mc = META_ASET; + else if (pg_strcasecmp(cmd, "beginbatch") == 0) + mc = META_BEGINBATCH; + else if (pg_strcasecmp(cmd, "endbatch") == 0) + mc = META_ENDBATCH; else mc = META_NONE; return mc; @@ -2681,11 +2688,24 @@ sendCommand(CState *st, Command *command) if (commands[j]->type != SQL_COMMAND) continue; preparedStatementName(name, st->use_file, j); - res = PQprepare(st->con, name, - commands[j]->argv[0], commands[j]->argc - 1, NULL); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pg_log_error("%s", PQerrorMessage(st->con)); - PQclear(res); + if (PQbatchStatus(st->con) == PQBATCH_MODE_OFF) + { + res = PQprepare(st->con, name, + commands[j]->argv[0], commands[j]->argc - 1, NULL); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_log_error("%s", PQerrorMessage(st->con)); + PQclear(res); + } + else + { + /* + * In batch mode, we use asynchronous functions. If a server-side + * error occurs, it will be processed later among the other results. + */ + if (!PQsendPrepare(st->con, name, + commands[j]->argv[0], commands[j]->argc - 1, NULL)) + pg_log_error("%s", PQerrorMessage(st->con)); + } } st->prepared[st->use_file] = true; } @@ -2799,6 +2819,12 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix) /* otherwise the result is simply thrown away by PQclear below */ break; + case PGRES_BATCH_END: + pg_log_debug("client %d batch ending", st->id); + if (PQexitBatchMode(st->con) != 1) + pg_log_error("client %d failed to exit batch mode", st->id); + break; + default: /* anything else is unexpected */ pg_log_error("client %d script %d aborted in command %d query %d: %s", @@ -3057,13 +3083,27 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) /* Execute the command */ if (command->type == SQL_COMMAND) { + if ((command->meta == META_GSET || command->meta == META_ASET) + && PQbatchStatus(st->con) != PQBATCH_MODE_OFF) + { + commandFailed(st, "SQL", "\\gset and \\aset are not allowed in a batch section"); + st->state = CSTATE_ABORTED; + break; + } + if (!sendCommand(st, command)) { commandFailed(st, "SQL", "SQL command send failed"); st->state = CSTATE_ABORTED; } else - st->state = CSTATE_WAIT_RESULT; + { + /* Wait for results, unless in batch mode */ + if (PQbatchStatus(st->con) == PQBATCH_MODE_OFF) + st->state = CSTATE_WAIT_RESULT; + else + st->state = CSTATE_END_COMMAND; + } } else if (command->type == META_COMMAND) { @@ -3184,6 +3224,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) } break; + /* * Wait for the current SQL command to complete */ @@ -3203,7 +3244,14 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) if (readCommandResponse(st, sql_script[st->use_file].commands[st->command]->meta, sql_script[st->use_file].commands[st->command]->varprefix)) - st->state = CSTATE_END_COMMAND; + { + /* + * outside of batch mode: stop reading results. + * batch mode: continue reading results until an end of batch response. + */ + if (PQbatchStatus(st->con) != PQBATCH_MODE_ON) + st->state = CSTATE_END_COMMAND; + } else st->state = CSTATE_ABORTED; break; @@ -3447,6 +3495,45 @@ executeMetaCommand(CState *st, instr_time *now) return CSTATE_ABORTED; } } + else if (command->meta == META_BEGINBATCH) + { + /* + * In batch mode, we use a workflow based on libpq batch + * functions. + */ + if (querymode == QUERY_SIMPLE) + { + commandFailed(st, "beginbatch", "cannot use batch mode with the simple query protocol"); + st->state = CSTATE_ABORTED; + return CSTATE_ABORTED; + } + + if (PQbatchStatus(st->con) != PQBATCH_MODE_OFF) + { + commandFailed(st, "beginbatch", "already in batch mode"); + return CSTATE_ABORTED; + } + if (PQenterBatchMode(st->con) == 0) + { + commandFailed(st, "beginbatch", "failed to start a batch"); + return CSTATE_ABORTED; + } + } + else if (command->meta == META_ENDBATCH) + { + if (PQbatchStatus(st->con) != PQBATCH_MODE_ON) + { + commandFailed(st, "endbatch", "not in batch mode"); + return CSTATE_ABORTED; + } + if (!PQbatchSendQueue(st->con)) + { + commandFailed(st, "endbatch", "failed to end the batch"); + return CSTATE_ABORTED; + } + /* collect pending results before getting out of batch mode */ + return CSTATE_WAIT_RESULT; + } /* * executing the expression or shell command might have taken a @@ -4686,6 +4773,12 @@ process_backslash_command(PsqlScanState sstate, const char *source) syntax_error(source, lineno, my_command->first_line, my_command->argv[0], "too many arguments", NULL, -1); } + else if (my_command->meta == META_BEGINBATCH || my_command->meta == META_ENDBATCH) + { + if (my_command->argc != 1) + syntax_error(source, lineno, my_command->first_line, my_command->argv[0], + "unexpected argument", NULL, -1); + } else { /* my_command->meta == META_NONE */ diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index bbc1f90481..ca86f55652 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -179,3 +179,7 @@ PQgetgssctx 176 PQsetSSLKeyPassHook_OpenSSL 177 PQgetSSLKeyPassHook_OpenSSL 178 PQdefaultSSLKeyPassHook_OpenSSL 179 +PQenterBatchMode 180 +PQexitBatchMode 181 +PQbatchSendQueue 182 +PQbatchStatus 183 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index e7781d010f..c673bc405f 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -536,6 +536,23 @@ pqDropConnection(PGconn *conn, bool flushInput) } } +/* + * pqFreeCommandQueue + * Free all the entries of PGcommandQueueEntry queue passed. + */ +static void +pqFreeCommandQueue(PGcommandQueueEntry *queue) +{ + while (queue != NULL) + { + PGcommandQueueEntry *cur = queue; + + queue = cur->next; + if (cur->query) + free(cur->query); + free(cur); + } +} /* * pqDropServerData @@ -555,6 +572,7 @@ pqDropServerData(PGconn *conn) { PGnotify *notify; pgParameterStatus *pstatus; + PGcommandQueueEntry *queue; /* Forget pending notifies */ notify = conn->notifyHead; @@ -567,6 +585,14 @@ pqDropServerData(PGconn *conn) } conn->notifyHead = conn->notifyTail = NULL; + queue = conn->cmd_queue_head; + pqFreeCommandQueue(queue); + conn->cmd_queue_head = conn->cmd_queue_tail = NULL; + + queue = conn->cmd_queue_recycle; + pqFreeCommandQueue(queue); + conn->cmd_queue_recycle = NULL; + /* Reset ParameterStatus data, as well as variables deduced from it */ pstatus = conn->pstatus; while (pstatus != NULL) @@ -6699,6 +6725,15 @@ PQbackendPID(const PGconn *conn) return conn->be_pid; } +int +PQbatchStatus(const PGconn *conn) +{ + if (!conn) + return false; + + return conn->batch_status; +} + int PQconnectionNeedsPassword(const PGconn *conn) { diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index eea0237c3a..93971857d5 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -39,7 +39,9 @@ char *const pgresStatus[] = { "PGRES_NONFATAL_ERROR", "PGRES_FATAL_ERROR", "PGRES_COPY_BOTH", - "PGRES_SINGLE_TUPLE" + "PGRES_SINGLE_TUPLE", + "PGRES_BATCH_END", + "PGRES_BATCH_ABORTED" }; /* @@ -70,6 +72,11 @@ static PGresult *PQexecFinish(PGconn *conn); static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target); static int check_field_number(const PGresult *res, int field_num); +static PGcommandQueueEntry *pqMakePipelineCmd(PGconn *conn); +static void pqAppendPipelineCmd(PGconn *conn, PGcommandQueueEntry *entry); +static void pqRecyclePipelineCmd(PGconn *conn, PGcommandQueueEntry *entry); +static int pqBatchProcessQueue(PGconn *conn); +static int pqBatchFlush(PGconn *conn); /* ---------------- @@ -1210,7 +1217,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) conn->next_result = conn->result; conn->result = res; /* And mark the result ready to return */ - conn->asyncStatus = PGASYNC_READY; + conn->asyncStatus = PGASYNC_READY_MORE; } return 1; @@ -1233,6 +1240,13 @@ fail: int PQsendQuery(PGconn *conn, const char *query) { + if (conn->batch_status != PQBATCH_MODE_OFF) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("cannot PQsendQuery in batch mode, use PQsendQueryParams\n")); + return 0; + } + if (!PQsendQueryStart(conn)) return 0; @@ -1263,10 +1277,11 @@ PQsendQuery(PGconn *conn, const char *query) conn->last_query = strdup(query); /* - * Give the data a push. In nonblock mode, don't complain if we're unable - * to send it all; PQgetResult() will do any additional flushing needed. + * Give the data a push (in batch mode, only if we're past the size + * threshold). In nonblock mode, don't complain if we're unable to send + * it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqBatchFlush(conn) < 0) { /* error message should be set up already */ return 0; @@ -1331,6 +1346,10 @@ PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes) { + PGcommandQueueEntry *pipeCmd = NULL; + char **last_query; + PGQueryClass *queryclass; + if (!PQsendQueryStart(conn)) return 0; @@ -1362,6 +1381,15 @@ PQsendPrepare(PGconn *conn, return 0; } + /* Alloc batch memory before doing anything */ + if (conn->batch_status != PQBATCH_MODE_OFF) + { + pipeCmd = pqMakePipelineCmd(conn); + + if (pipeCmd == NULL) + return 0; /* error msg already set */ + } + /* construct the Parse message */ if (pqPutMsgStart('P', false, conn) < 0 || pqPuts(stmtName, conn) < 0 || @@ -1389,31 +1417,47 @@ PQsendPrepare(PGconn *conn, goto sendFailed; /* construct the Sync message */ - if (pqPutMsgStart('S', false, conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + if (conn->batch_status == PQBATCH_MODE_OFF) + { + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + + last_query = &conn->last_query; + queryclass = &conn->queryclass; + } + else + { + last_query = &pipeCmd->query; + queryclass = &pipeCmd->queryclass; + } /* remember we are doing just a Parse */ - conn->queryclass = PGQUERY_PREPARE; + *queryclass = PGQUERY_PREPARE; /* and remember the query text too, if possible */ /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); - conn->last_query = strdup(query); + if (*last_query) + free(*last_query); + *last_query = strdup(query); /* - * Give the data a push. In nonblock mode, don't complain if we're unable - * to send it all; PQgetResult() will do any additional flushing needed. + * Give the data a push (in batch mode, only if we're past the size + * threshold). In nonblock mode, don't complain if we're unable to send + * it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqBatchFlush(conn) < 0) goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->batch_status != PQBATCH_MODE_OFF) + pqAppendPipelineCmd(conn, pipeCmd); + else + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + pqRecyclePipelineCmd(conn, pipeCmd); /* error message should be set up already */ return 0; } @@ -1461,7 +1505,8 @@ PQsendQueryPrepared(PGconn *conn, } /* - * Common startup code for PQsendQuery and sibling routines + * PQsendQueryStart + * Common startup code for PQsendQuery and sibling routines */ static bool PQsendQueryStart(PGconn *conn) @@ -1479,20 +1524,62 @@ PQsendQueryStart(PGconn *conn) libpq_gettext("no connection to the server\n")); return false; } - /* Can't send while already busy, either. */ - if (conn->asyncStatus != PGASYNC_IDLE) + + /* Can't send while already busy, either, unless enqueuing for later */ + if (conn->asyncStatus != PGASYNC_IDLE && + conn->batch_status == PQBATCH_MODE_OFF) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("another command is already in progress\n")); return false; } - /* initialize async result-accumulation state */ - pqClearAsyncResult(conn); + if (conn->batch_status != PQBATCH_MODE_OFF) + { + /* + * When enqueuing a message we don't change much of the connection + * state since it's already in use for the current command. The + * connection state will get updated when PQbatchQueueProcess(...) + * advances to start processing the queued message. + * + * Just make sure we can safely enqueue given the current connection + * state. We can enqueue behind another queue item, or behind a + * non-queue command (one that sends its own sync), but we can't + * enqueue if the connection is in a copy state. + */ + switch (conn->asyncStatus) + { + case PGASYNC_QUEUED: + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + /* ok to queue */ + break; + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("cannot queue commands during COPY\n")); + return false; + break; + case PGASYNC_IDLE: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("internal error, idle state in batch mode")); + break; + } + } + else + { + /* + * This command's results will come in immediately. Initialize async + * result-accumulation state + */ + pqClearAsyncResult(conn); - /* reset single-row processing mode */ - conn->singleRowMode = false; + /* reset single-row processing mode */ + conn->singleRowMode = false; + } /* ready to send command message */ return true; } @@ -1516,6 +1603,10 @@ PQsendQueryGuts(PGconn *conn, int resultFormat) { int i; + PGcommandQueueEntry *pipeCmd = NULL; + char **last_query; + PGQueryClass *queryclass; + /* This isn't gonna work on a 2.0 server */ if (PG_PROTOCOL_MAJOR(conn->pversion) < 3) @@ -1525,6 +1616,23 @@ PQsendQueryGuts(PGconn *conn, return 0; } + if (conn->batch_status != PQBATCH_MODE_OFF) + { + pipeCmd = pqMakePipelineCmd(conn); + + if (pipeCmd == NULL) + return 0; /* error msg already set */ + + last_query = &pipeCmd->query; + queryclass = &pipeCmd->queryclass; + } + else + { + last_query = &conn->last_query; + queryclass = &conn->queryclass; + } + + /* * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync, * using specified statement name and the unnamed portal. @@ -1637,35 +1745,43 @@ PQsendQueryGuts(PGconn *conn, pqPutMsgEnd(conn) < 0) goto sendFailed; - /* construct the Sync message */ - if (pqPutMsgStart('S', false, conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + if (conn->batch_status == PQBATCH_MODE_OFF) + { + /* construct the Sync message */ + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + } /* remember we are using extended query protocol */ - conn->queryclass = PGQUERY_EXTENDED; + *queryclass = PGQUERY_EXTENDED; /* and remember the query text too, if possible */ /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); + if (*last_query) + free(*last_query); if (command) - conn->last_query = strdup(command); + *last_query = strdup(command); else - conn->last_query = NULL; + *last_query = NULL; /* - * Give the data a push. In nonblock mode, don't complain if we're unable - * to send it all; PQgetResult() will do any additional flushing needed. + * Give the data a push (in batch mode, only if we're past the size + * threshold). In nonblock mode, don't complain if we're unable to send + * it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqBatchFlush(conn) < 0) goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->batch_status != PQBATCH_MODE_OFF) + pqAppendPipelineCmd(conn, pipeCmd); + else + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + pqRecyclePipelineCmd(conn, pipeCmd); /* error message should be set up already */ return 0; } @@ -1766,14 +1882,17 @@ PQisBusy(PGconn *conn) return conn->asyncStatus == PGASYNC_BUSY || conn->write_failed; } - /* * PQgetResult * Get the next PGresult produced by a query. Returns NULL if no * query work remains or an error has occurred (e.g. out of * memory). + * + * In batch mode, once all the result of a query have been returned, + * PQgetResult returns NULL to let the user know that the next batched + * query is being processed. At the end of the batch, returns a + * end-of-batch result with PQresultStatus(result) == PGRES_BATCH_END. */ - PGresult * PQgetResult(PGconn *conn) { @@ -1842,9 +1961,38 @@ PQgetResult(PGconn *conn) switch (conn->asyncStatus) { case PGASYNC_IDLE: + case PGASYNC_QUEUED: res = NULL; /* query is complete */ + if (conn->batch_status != PQBATCH_MODE_OFF) + { + /* + * In batch mode, we prepare the processing of the results of + * the next query. + */ + pqBatchProcessQueue(conn); + } break; case PGASYNC_READY: + res = pqPrepareAsyncResult(conn); + if (conn->batch_status != PQBATCH_MODE_OFF) + { + /* + * In batch mode, query execution state cannot be IDLE as + * there can be other queries or results waiting in the queue + * + * The connection isn't idle since we can't submit new + * nonbatched commands. It isn't also busy since the current + * command is done and we need to process a new one. + */ + conn->asyncStatus = PGASYNC_QUEUED; + } + else + { + /* Set the state back to BUSY, allowing parsing to proceed. */ + conn->asyncStatus = PGASYNC_BUSY; + } + break; + case PGASYNC_READY_MORE: res = pqPrepareAsyncResult(conn); /* Set the state back to BUSY, allowing parsing to proceed. */ conn->asyncStatus = PGASYNC_BUSY; @@ -2025,6 +2173,13 @@ PQexecStart(PGconn *conn) if (!conn) return false; + if (conn->batch_status != PQBATCH_MODE_OFF) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("Synchronous command execution functions are not allowed in batch mode\n")); + return false; + } + /* * Silently discard any prior query result that application didn't eat. * This is probably poor design, but it's here for backward compatibility. @@ -2219,6 +2374,9 @@ PQsendDescribePortal(PGconn *conn, const char *portal) static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) { + PGcommandQueueEntry *pipeCmd = NULL; + PGQueryClass *queryclass; + /* Treat null desc_target as empty string */ if (!desc_target) desc_target = ""; @@ -2234,6 +2392,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) return 0; } + if (conn->batch_status != PQBATCH_MODE_OFF) + { + pipeCmd = pqMakePipelineCmd(conn); + + if (pipeCmd == NULL) + return 0; /* error msg already set */ + + queryclass = &pipeCmd->queryclass; + } + else + queryclass = &conn->queryclass; + /* construct the Describe message */ if (pqPutMsgStart('D', false, conn) < 0 || pqPutc(desc_type, conn) < 0 || @@ -2242,32 +2412,40 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) goto sendFailed; /* construct the Sync message */ - if (pqPutMsgStart('S', false, conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + if (conn->batch_status == PQBATCH_MODE_OFF) + { + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + } /* remember we are doing a Describe */ - conn->queryclass = PGQUERY_DESCRIBE; + *queryclass = PGQUERY_DESCRIBE; - /* reset last_query string (not relevant now) */ - if (conn->last_query) + /* reset last-query string (not relevant now) */ + if (conn->last_query && conn->batch_status != PQBATCH_MODE_OFF) { free(conn->last_query); conn->last_query = NULL; } /* - * Give the data a push. In nonblock mode, don't complain if we're unable - * to send it all; PQgetResult() will do any additional flushing needed. + * Give the data a push (in batch mode, only if we're past the size + * threshold). In nonblock mode, don't complain if we're unable to send + * it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqBatchFlush(conn) < 0) goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->batch_status != PQBATCH_MODE_OFF) + pqAppendPipelineCmd(conn, pipeCmd); + else + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + pqRecyclePipelineCmd(conn, pipeCmd); /* error message should be set up already */ return 0; } @@ -2665,6 +2843,13 @@ PQfn(PGconn *conn, /* clear the error string */ resetPQExpBuffer(&conn->errorMessage); + if (conn->batch_status != PQBATCH_MODE_OFF) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("PQfn not allowed in batch mode\n")); + return NULL; + } + if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE || conn->result != NULL) { @@ -2685,6 +2870,377 @@ PQfn(PGconn *conn, args, nargs); } +/* ====== Batch mode support ======== */ + +/* + * PQenterBatchMode + * Put an idle connection in batch mode. + * + * Returns 1 on success. On failure, errorMessage is set and 0 is returned. + * + * Commands submitted after this can be pipelined on the connection; + * there's no requirement to wait for one to finish before the next is + * dispatched. + * + * Queuing of a new query or syncing during COPY is not allowed. + * + * A set of commands is terminated by a PQbatchQueueSync. Multiple sets of + * batched commands may be sent while in batch mode. Batch mode can be exited + * by calling PQexitBatchMode() once all results are processed. + * + * This doesn't actually send anything on the wire, it just puts libpq + * into a state where it can pipeline work. + */ +int +PQenterBatchMode(PGconn *conn) +{ + if (!conn) + return 0; + + /* succeed with no action if already in batch mode */ + if (conn->batch_status != PQBATCH_MODE_OFF) + return 1; + + if (conn->asyncStatus != PGASYNC_IDLE) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("cannot enter batch mode, connection not idle\n")); + return 0; + } + + conn->batch_status = PQBATCH_MODE_ON; + conn->asyncStatus = PGASYNC_QUEUED; + + return 1; +} + +/* + * PQexitBatchMode + * End batch mode and return to normal command mode. + * + * Returns 1 in success (batch mode was ended, or not in batch mode). + * + * Returns 0 if in batch mode and cannot be ended yet. + * Error message will be set. + */ +int +PQexitBatchMode(PGconn *conn) +{ + if (!conn) + return 0; + + if (conn->batch_status == PQBATCH_MODE_OFF) + return 1; + + switch (conn->asyncStatus) + { + case PGASYNC_READY: + case PGASYNC_READY_MORE: + /* there are some uncollected results */ + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("cannot exit batch mode with uncollected results\n")); + return 0; + + case PGASYNC_BUSY: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("cannot exit batch mode while busy\n")); + return 0; + + default: + /* OK */ + break; + } + + /* still work to process */ + if (conn->cmd_queue_head != NULL) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("command queue not clean")); + return 0; + } + + conn->batch_status = PQBATCH_MODE_OFF; + conn->asyncStatus = PGASYNC_IDLE; + + /* Flush any pending data in out buffer */ + if (pqFlush(conn) < 0) + return 0; /* error message is setup already */ + return 1; +} + +/* + * internal function pqBatchProcessQueue + * + * In batch mode, start processing the next query in the queue. + * + * Returns 1 if the next query was popped from the queue and can + * be processed by PQconsumeInput, PQgetResult, etc. + * + * Returns 0 if the current query isn't done yet, the connection + * is not in a batch, or there are no more queries to process. + */ +static int +pqBatchProcessQueue(PGconn *conn) +{ + PGcommandQueueEntry *next_query; + + if (!conn) + return 0; + + if (conn->batch_status == PQBATCH_MODE_OFF) + return 0; + + switch (conn->asyncStatus) + { + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + /* should be unreachable */ + printfPQExpBuffer(&conn->errorMessage, + "internal error, COPY in batch mode"); + break; + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + /* client still has to process current query or results */ + return 0; + break; + case PGASYNC_IDLE: + /* should be unreachable */ + printfPQExpBuffer(&conn->errorMessage, + "internal error, IDLE in batch mode"); + break; + case PGASYNC_QUEUED: + /* next query please */ + break; + } + + if (conn->cmd_queue_head == NULL) + { + /* + * In batch mode but nothing left on the queue; caller can submit more + * work or PQexitBatchMode() now. + */ + return 0; + } + + /* + * Pop the next query from the queue and set up the connection state as if + * it'd just been dispatched from a non-batched call + */ + next_query = conn->cmd_queue_head; + conn->cmd_queue_head = next_query->next; + next_query->next = NULL; + + /* Initialize async result-accumulation state */ + pqClearAsyncResult(conn); + + /* reset single-row processing mode XXX why?? */ + conn->singleRowMode = false; + + + conn->last_query = next_query->query; + next_query->query = NULL; + conn->queryclass = next_query->queryclass; + + pqRecyclePipelineCmd(conn, next_query); + + if (conn->batch_status == PQBATCH_MODE_ABORTED && + conn->queryclass != PGQUERY_SYNC) + { + /* + * In an aborted batch we don't get anything from the server for each + * result; we're just discarding input until we get to the next sync + * from the server. The client needs to know its queries got aborted + * so we create a fake PGresult to return immediately from + * PQgetResult. + */ + conn->result = + PQmakeEmptyPGresult(conn, PGRES_BATCH_ABORTED); + if (!conn->result) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("out of memory")); + pqSaveErrorResult(conn); + return 0; + } + conn->asyncStatus = PGASYNC_READY; + } + else + { + /* allow parsing to continue */ + conn->asyncStatus = PGASYNC_BUSY; + } + + return 1; +} + +/* + * PQbatchSendQueue + * End a batch submission. + * + * It's legal to start submitting another batch immediately, without + * waiting for the results of the current batch. There's no need to end batch + * mode and start it again. + * + * If a command in a batch fails, every subsequent command up to and + * including the PQbatchQueueSync command result gets set to PGRES_BATCH_ABORTED + * state. If the whole batch is processed without error, a PGresult with + * PGRES_BATCH_END is produced. + * + * Queries can already have been sent before PQbatchSendQueue is called, but + * PQbatchSendQueue need to be called before retrieving command results. + * + * The connection will remain in batch mode and unavailable for new synchronous + * command execution functions until all results from the batch are processed + * by the client. + */ +int +PQbatchSendQueue(PGconn *conn) +{ + PGcommandQueueEntry *entry; + + if (!conn) + return 0; + + if (conn->batch_status == PQBATCH_MODE_OFF) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("cannot send batch when not in batch mode\n")); + return 0; + } + + switch (conn->asyncStatus) + { + case PGASYNC_IDLE: + /* should be unreachable */ + printfPQExpBuffer(&conn->errorMessage, + "internal error: cannot send batch when idle\n"); + return 0; + break; + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + /* should be unreachable */ + printfPQExpBuffer(&conn->errorMessage, + "internal error: cannot send batch while in COPY\n"); + return 0; + break; + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + case PGASYNC_QUEUED: + /* can send sync to end this batch of cmds */ + break; + } + + entry = pqMakePipelineCmd(conn); + if (entry == NULL) + return 0; /* error msg already set */ + + entry->queryclass = PGQUERY_SYNC; + entry->query = NULL; + + /* construct the Sync message */ + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + + pqAppendPipelineCmd(conn, entry); + + /* + * Give the data a push. In nonblock mode, don't complain if we're unable + * to send it all; PQgetResult() will do any additional flushing needed. + */ + if (PQflush(conn) < 0) + goto sendFailed; + + /* + * Call pqBatchProcessQueue so the user can call start calling getResult. + */ + pqBatchProcessQueue(conn); + + return 1; + +sendFailed: + pqRecyclePipelineCmd(conn, entry); + /* error message should be set up already */ + return 0; +} + +/* + * pqMakePipelineCmd + * Get a command queue entry for caller to fill. + * + * If the recycle queue has a free element, that is returned; if not, a + * fresh one is allocated. Caller is responsible for adding it to the + * command queue (pqAppendPipelineCmd) once the struct is filled in, or + * releasing the memory (pqRecyclePipelineCmd) if an error occurs. + * + * If allocation fails, sets the error message and returns NULL. + */ +static PGcommandQueueEntry * +pqMakePipelineCmd(PGconn *conn) +{ + PGcommandQueueEntry *entry; + + if (conn->cmd_queue_recycle == NULL) + { + entry = (PGcommandQueueEntry *) malloc(sizeof(PGcommandQueueEntry)); + if (entry == NULL) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("out of memory\n")); + return NULL; + } + } + else + { + entry = conn->cmd_queue_recycle; + conn->cmd_queue_recycle = entry->next; + } + entry->next = NULL; + entry->query = NULL; + + return entry; +} + +/* + * pqAppendPipelineCmd + * Append a precreated command queue entry to the queue after it's been + * sent successfully. + */ +static void +pqAppendPipelineCmd(PGconn *conn, PGcommandQueueEntry *entry) +{ + if (conn->cmd_queue_head == NULL) + conn->cmd_queue_head = entry; + else + conn->cmd_queue_tail->next = entry; + conn->cmd_queue_tail = entry; +} + +/* + * pqRecyclePipelineCmd + * Push a command queue entry onto the freelist. It must be an entry + * with null next pointer and not referenced by any other entry's next + * pointer. + */ +static void +pqRecyclePipelineCmd(PGconn *conn, PGcommandQueueEntry *entry) +{ + if (entry == NULL) + return; + + Assert(entry->next == NULL); + + if (entry->query) + free(entry->query); + + entry->next = conn->cmd_queue_recycle; + conn->cmd_queue_recycle = entry; +} + /* ====== accessor funcs for PGresult ======== */ @@ -3285,6 +3841,23 @@ PQflush(PGconn *conn) return pqFlush(conn); } +/* + * pqBatchFlush + * + * In batch mode, data will be flushed only when the out buffer reaches the + * threshold value. In non-batch mode, data will be flushed all the time. + * + * Returns 0 on success. + */ +static int +pqBatchFlush(PGconn *conn) +{ + if ((conn->batch_status == PQBATCH_MODE_OFF) || + (conn->outCount >= OUTBUFFER_THRESHOLD)) + return (pqFlush(conn)); + return 0; +} + /* * PQfreemem - safely frees memory allocated diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index 9360c541be..2ff3fa4883 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -406,6 +406,12 @@ pqParseInput2(PGconn *conn) { char id; + if (conn->batch_status != PQBATCH_MODE_OFF) + { + fprintf(stderr, "internal error, attempt to read v2 protocol in batch mode"); + abort(); + } + /* * Loop to parse successive complete messages available in the buffer. */ diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 1696525475..da38e6aed1 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -217,10 +217,19 @@ pqParseInput3(PGconn *conn) return; conn->asyncStatus = PGASYNC_READY; break; - case 'Z': /* backend is ready for new query */ + case 'Z': /* sync response, backend is ready for new + * query */ if (getReadyForQuery(conn)) return; - conn->asyncStatus = PGASYNC_IDLE; + if (conn->batch_status != PQBATCH_MODE_OFF) + { + conn->batch_status = PQBATCH_MODE_ON; + conn->result = PQmakeEmptyPGresult(conn, + PGRES_BATCH_END); + conn->asyncStatus = PGASYNC_READY; + } + else + conn->asyncStatus = PGASYNC_IDLE; break; case 'I': /* empty query */ if (conn->result == NULL) @@ -875,6 +884,9 @@ pqGetErrorNotice3(PGconn *conn, bool isError) PQExpBufferData workBuf; char id; + if (isError && conn->batch_status != PQBATCH_MODE_OFF) + conn->batch_status = PQBATCH_MODE_ABORTED; + /* * If this is an error message, pre-emptively clear any incomplete query * result we may have. We'd just throw it away below anyway, and diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 3b6a9fbce3..fcdd887958 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -97,7 +97,10 @@ typedef enum PGRES_NONFATAL_ERROR, /* notice or warning message */ PGRES_FATAL_ERROR, /* query failed */ PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */ - PGRES_SINGLE_TUPLE /* single tuple from larger resultset */ + PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */ + PGRES_BATCH_END, /* end of a batch of commands */ + PGRES_BATCH_ABORTED, /* Command didn't run because of an abort + * earlier in a batch */ } ExecStatusType; typedef enum @@ -137,6 +140,17 @@ typedef enum PQPING_NO_ATTEMPT /* connection not attempted (bad params) */ } PGPing; +/* + * PQBatchStatus - Current status of batch mode + */ + +typedef enum +{ + PQBATCH_MODE_OFF, + PQBATCH_MODE_ON, + PQBATCH_MODE_ABORTED +} PQBatchStatus; + /* PGconn encapsulates a connection to the backend. * The contents of this struct are not supposed to be known to applications. */ @@ -328,6 +342,7 @@ extern int PQserverVersion(const PGconn *conn); extern char *PQerrorMessage(const PGconn *conn); extern int PQsocket(const PGconn *conn); extern int PQbackendPID(const PGconn *conn); +extern int PQbatchStatus(const PGconn *conn); extern int PQconnectionNeedsPassword(const PGconn *conn); extern int PQconnectionUsedPassword(const PGconn *conn); extern int PQclientEncoding(const PGconn *conn); @@ -435,6 +450,11 @@ extern PGresult *PQgetResult(PGconn *conn); extern int PQisBusy(PGconn *conn); extern int PQconsumeInput(PGconn *conn); +/* Routines for batch mode management */ +extern int PQenterBatchMode(PGconn *conn); +extern int PQexitBatchMode(PGconn *conn); +extern int PQbatchSendQueue(PGconn *conn); + /* LISTEN/NOTIFY support */ extern PGnotify *PQnotifies(PGconn *conn); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 1de91ae295..d2b26f2299 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -217,10 +217,15 @@ typedef enum { PGASYNC_IDLE, /* nothing's happening, dude */ PGASYNC_BUSY, /* query in progress */ - PGASYNC_READY, /* result ready for PQgetResult */ + PGASYNC_READY, /* query done, waiting for client to fetch + * result */ + PGASYNC_READY_MORE, /* query done, waiting for client to fetch + * result, More results expected from this + * query */ PGASYNC_COPY_IN, /* Copy In data transfer in progress */ PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */ - PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */ + PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */ + PGASYNC_QUEUED /* Current query done, more in queue */ } PGAsyncStatusType; /* PGQueryClass tracks which query protocol we are now executing */ @@ -229,7 +234,8 @@ typedef enum PGQUERY_SIMPLE, /* simple Query protocol (PQexec) */ PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */ PGQUERY_PREPARE, /* Parse only (PQprepare) */ - PGQUERY_DESCRIBE /* Describe Statement or Portal */ + PGQUERY_DESCRIBE, /* Describe Statement or Portal */ + PGQUERY_SYNC /* A protocol sync to end a batch */ } PGQueryClass; /* PGSetenvStatusType defines the state of the pqSetenv state machine */ @@ -301,6 +307,22 @@ typedef enum pg_conn_host_type CHT_UNIX_SOCKET } pg_conn_host_type; +/* An entry in the pending command queue. Used by batch mode to keep track + * of the expected results of future commands we've dispatched. + * + * Note that entries in this list are reused by being zeroed and appended to + * the tail when popped off the head. The entry with null next pointer is not + * the end of the list of expected commands, that's the tail pointer in + * pg_conn. + */ +typedef struct pgCommandQueueEntry +{ + PGQueryClass queryclass; /* Query type; PGQUERY_SYNC for sync msg */ + char *query; /* SQL command, or NULL if unknown */ + struct pgCommandQueueEntry *next; +} PGcommandQueueEntry; + + /* * pg_conn_host stores all information about each of possibly several hosts * mentioned in the connection string. Most fields are derived by splitting @@ -394,6 +416,7 @@ struct pg_conn bool options_valid; /* true if OK to attempt connection */ bool nonblocking; /* whether this connection is using nonblock * sending semantics */ + PQBatchStatus batch_status; /* Batch(pipelining) mode status of connection */ bool singleRowMode; /* return current query result row-by-row? */ char copy_is_binary; /* 1 = copy binary, 0 = copy text */ int copy_already_done; /* # bytes already returned in COPY OUT */ @@ -406,6 +429,16 @@ struct pg_conn pg_conn_host *connhost; /* details about each named host */ char *connip; /* IP address for current network connection */ + /* + * The command queue + * + * head is the next pending cmd, tail is where we append new commands. + * Freed entries for recycling go on the recycle linked list. + */ + PGcommandQueueEntry *cmd_queue_head; + PGcommandQueueEntry *cmd_queue_tail; + PGcommandQueueEntry *cmd_queue_recycle; + /* Connection data */ pgsocket sock; /* FD for socket, PGINVALID_SOCKET if * unconnected */ @@ -798,6 +831,11 @@ extern ssize_t pg_GSS_read(PGconn *conn, void *ptr, size_t len); */ #define pqIsnonblocking(conn) ((conn)->nonblocking) +/* + * Connection's outbuffer threshold. + */ +#define OUTBUFFER_THRESHOLD 65536 + #ifdef ENABLE_NLS extern char *libpq_gettext(const char *msgid) pg_attribute_format_arg(1); extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigned long n) pg_attribute_format_arg(1) pg_attribute_format_arg(2); diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index a6d2ffbf9e..287214c544 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -17,6 +17,7 @@ SUBDIRS = \ test_extensions \ test_ginpostinglist \ test_integerset \ + test_libpq \ test_misc \ test_parser \ test_pg_dump \ diff --git a/src/test/modules/test_libpq/.gitignore b/src/test/modules/test_libpq/.gitignore new file mode 100644 index 0000000000..11e8463984 --- /dev/null +++ b/src/test/modules/test_libpq/.gitignore @@ -0,0 +1,5 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ +/testlibpqbatch diff --git a/src/test/modules/test_libpq/Makefile b/src/test/modules/test_libpq/Makefile new file mode 100644 index 0000000000..6d3a0ea4d5 --- /dev/null +++ b/src/test/modules/test_libpq/Makefile @@ -0,0 +1,20 @@ +# src/test/modules/test_libpq/Makefile + +PROGRAM = testlibpqbatch +OBJS = testlibpqbatch.o + +PG_CPPFLAGS = -I$(libpq_srcdir) +PG_LIBS += $(libpq) + +TAP_TESTS = 1 + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_libpq +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_libpq/README b/src/test/modules/test_libpq/README new file mode 100644 index 0000000000..d8174dd579 --- /dev/null +++ b/src/test/modules/test_libpq/README @@ -0,0 +1 @@ +Test programs and libraries for libpq diff --git a/src/test/modules/test_libpq/t/001_libpq_async.pl b/src/test/modules/test_libpq/t/001_libpq_async.pl new file mode 100644 index 0000000000..0b27896a2a --- /dev/null +++ b/src/test/modules/test_libpq/t/001_libpq_async.pl @@ -0,0 +1,26 @@ +use strict; +use warnings; + +use Config; +use PostgresNode; +use TestLib; +use Test::More tests => 6; +use Cwd; + +my $node = get_new_node('main'); +$node->init; +$node->start; + +my $numrows = 10000; +my @tests = + qw(disallowed_in_batch simple_batch multi_batch batch_abort + batch_insert singlerow); +$ENV{PATH} = "$ENV{PATH}:" . getcwd(); +for my $testname (@tests) +{ + $node->command_ok( + [ 'testlibpqbatch', $testname, $node->connstr('postgres'), $numrows ], + "testlibpqbatch $testname"); +} + +$node->stop('fast'); diff --git a/src/test/modules/test_libpq/testlibpqbatch.c b/src/test/modules/test_libpq/testlibpqbatch.c new file mode 100644 index 0000000000..7157e6cb90 --- /dev/null +++ b/src/test/modules/test_libpq/testlibpqbatch.c @@ -0,0 +1,963 @@ +/* + * src/test/modules/test_libpq/testlibpqbatch.c + * Verify libpq batch execution functionality + */ +#include "postgres_fe.h" + +#include + +#include "catalog/pg_type_d.h" +#include "common/fe_memutils.h" +#include "libpq-fe.h" +#include "portability/instr_time.h" + + +static void exit_nicely(PGconn *conn); + +const char *const progname = "testlibpqbatch"; + + +#define DEBUG +#ifdef DEBUG +#define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0) +#else +#define pg_debug(...) +#endif + +static const char *const drop_table_sql = +"DROP TABLE IF EXISTS batch_demo"; +static const char *const create_table_sql = +"CREATE UNLOGGED TABLE batch_demo(id serial primary key, itemno integer);"; +static const char *const insert_sql = +"INSERT INTO batch_demo(itemno) VALUES ($1);"; + +/* max char length of an int32, plus sign and null terminator */ +#define MAXINTLEN 12 + +static void +exit_nicely(PGconn *conn) +{ + PQfinish(conn); + exit(1); +} + +/* + * Print an error to stderr and terminate the program. + */ +#define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__) +static void +pg_fatal_impl(int line, const char *fmt,...) +{ + va_list args; + + fprintf(stderr, "\n"); /* XXX hack */ + fprintf(stderr, "%s:%d: ", progname, line); + + va_start(args, fmt); + vfprintf(stderr, fmt, args); + va_end(args); + printf("Failure, exiting\n"); + exit(1); +} + +static void +test_disallowed_in_batch(PGconn *conn) +{ + PGresult *res = NULL; + + fprintf(stderr, "test error cases... "); + + if (PQisnonblocking(conn)) + pg_fatal("Expected blocking connection mode\n"); + + if (PQenterBatchMode(conn) != 1) + pg_fatal("Unable to enter batch mode\n"); + + if (PQbatchStatus(conn) == PQBATCH_MODE_OFF) + pg_fatal("Batch mode not activated properly\n"); + + /* PQexec should fail in batch mode */ + res = PQexec(conn, "SELECT 1"); + if (PQresultStatus(res) != PGRES_FATAL_ERROR) + pg_fatal("PQexec should fail in batch mode but succeeded\n"); + + /* So should PQsendQuery */ + if (PQsendQuery(conn, "SELECT 1") != 0) + pg_fatal("PQsendQuery should fail in batch mode but succeeded\n"); + + /* Entering batch mode when already in batch mode is OK */ + if (PQenterBatchMode(conn) != 1) + pg_fatal("re-entering batch mode should be a no-op but failed\n"); + + if (PQisBusy(conn) != 0) + pg_fatal("PQisBusy should return 0 when idle in batch, returned 1\n"); + + /* ok, back to normal command mode */ + if (PQexitBatchMode(conn) != 1) + pg_fatal("couldn't exit idle empty batch mode\n"); + + if (PQbatchStatus(conn) != PQBATCH_MODE_OFF) + pg_fatal("Batch mode not terminated properly\n"); + + /* exiting batch mode when not in batch mode should be a no-op */ + if (PQexitBatchMode(conn) != 1) + pg_fatal("batch mode exit when not in batch mode should succeed but failed\n"); + + /* can now PQexec again */ + res = PQexec(conn, "SELECT 1"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("PQexec should succeed after exiting batch mode but failed with: %s\n", + PQerrorMessage(conn)); + + fprintf(stderr, "ok\n"); +} + +static void +test_simple_batch(PGconn *conn) +{ + PGresult *res = NULL; + const char *dummy_params[1] = {"1"}; + Oid dummy_param_oids[1] = {INT4OID}; + + fprintf(stderr, "simple batch... "); + + /* + * Enter batch mode and dispatch a set of operations, which we'll then + * process the results of as they come in. + * + * For a simple case we should be able to do this without interim + * processing of results since our out buffer will give us enough slush to + * work with and we won't block on sending. So blocking mode is fine. + */ + if (PQisnonblocking(conn)) + pg_fatal("Expected blocking connection mode\n"); + + if (PQenterBatchMode(conn) != 1) + pg_fatal("failed to enter batch mode: %s\n", PQerrorMessage(conn)); + + if (PQsendQueryParams(conn, "SELECT $1", + 1, dummy_param_oids, dummy_params, + NULL, NULL, 0) != 1) + pg_fatal("dispatching SELECT failed: %s\n", PQerrorMessage(conn)); + + if (PQexitBatchMode(conn) != 0) + pg_fatal("exiting batch mode with work in progress should fail, but succeeded\n"); + + if (PQbatchSendQueue(conn) != 1) + pg_fatal("Ending a batch failed: %s\n", PQerrorMessage(conn)); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when there's a batch item: %s\n", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("Unexpected result code %s from first batch item\n", + PQresStatus(PQresultStatus(res))); + + PQclear(res); + res = NULL; + + if (PQgetResult(conn) != NULL) + pg_fatal("PQgetResult returned something extra after first query result.\n"); + + /* + * Even though we've processed the result there's still a sync to come and + * we can't exit batch mode yet + */ + if (PQexitBatchMode(conn) != 0) + pg_fatal("exiting batch mode after query but before sync succeeded incorrectly\n"); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when sync result PGRES_BATCH_END expected: %s\n", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_BATCH_END) + pg_fatal("Unexpected result code %s instead of sync result, error: %s\n", + PQresStatus(PQresultStatus(res)), PQerrorMessage(conn)); + + PQclear(res); + res = NULL; + + if (PQgetResult(conn) != NULL) + pg_fatal("PQgetResult returned something extra after end batch call\n"); + + /* We're still in a batch... */ + if (PQbatchStatus(conn) == PQBATCH_MODE_OFF) + pg_fatal("Fell out of batch mode somehow\n"); + + /* ... until we end it, which we can safely do now */ + if (PQexitBatchMode(conn) != 1) + pg_fatal("attempt to exit batch mode failed when it should've succeeded: %s\n", + PQerrorMessage(conn)); + + if (PQbatchStatus(conn) != PQBATCH_MODE_OFF) + pg_fatal("Exiting batch mode didn't seem to work\n"); + + fprintf(stderr, "ok\n"); +} + +static void +test_multi_batch(PGconn *conn) +{ + PGresult *res = NULL; + const char *dummy_params[1] = {"1"}; + Oid dummy_param_oids[1] = {INT4OID}; + + fprintf(stderr, "multi batch... "); + + /* + * Queue up a couple of small batches and process each without returning + * to command mode first. + */ + if (PQenterBatchMode(conn) != 1) + pg_fatal("failed to enter batch mode: %s\n", PQerrorMessage(conn)); + + if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids, + dummy_params, NULL, NULL, 0) != 1) + pg_fatal("dispatching first SELECT failed: %s\n", PQerrorMessage(conn)); + + if (PQbatchSendQueue(conn) != 1) + pg_fatal("Ending first batch failed: %s\n", PQerrorMessage(conn)); + + if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids, + dummy_params, NULL, NULL, 0) != 1) + pg_fatal("dispatching second SELECT failed: %s\n", PQerrorMessage(conn)); + + if (PQbatchSendQueue(conn) != 1) + pg_fatal("Ending second batch failed: %s\n", PQerrorMessage(conn)); + + /* OK, start processing the batch results */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when there's a batch item: %s\n", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("Unexpected result code %s from first batch item\n", + PQresStatus(PQresultStatus(res))); + PQclear(res); + res = NULL; + + if (PQgetResult(conn) != NULL) + pg_fatal("PQgetResult returned something extra after first result\n"); + + if (PQexitBatchMode(conn) != 0) + pg_fatal("exiting batch mode after query but before sync succeeded incorrectly\n"); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when sync result expected: %s\n", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_BATCH_END) + pg_fatal("Unexpected result code %s instead of sync result, error: %s\n", + PQresStatus(PQresultStatus(res)), PQerrorMessage(conn)); + + PQclear(res); + + res = PQgetResult(conn); + if (res != NULL) + pg_fatal("Expected null result, got %s\n", + PQresStatus(PQresultStatus(res))); + + /* second batch */ + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when there's a batch item: %s\n", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("Unexpected result code %s from second batch item\n", + PQresStatus(PQresultStatus(res))); + + res = PQgetResult(conn); + if (res != NULL) + pg_fatal("Expected null result, got %s\n", + PQresStatus(PQresultStatus(res))); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when there's a batch item: %s\n", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_BATCH_END) + pg_fatal("Unexpected result code %s from second end batch\n", + PQresStatus(PQresultStatus(res))); + + /* We're still in a batch... */ + if (PQbatchStatus(conn) == PQBATCH_MODE_OFF) + pg_fatal("Fell out of batch mode somehow\n"); + + /* until we end it, which we can safely do now */ + if (PQexitBatchMode(conn) != 1) + pg_fatal("attempt to exit batch mode failed when it should've succeeded: %s\n", + PQerrorMessage(conn)); + + if (PQbatchStatus(conn) != PQBATCH_MODE_OFF) + pg_fatal("exiting batch mode didn't seem to work\n"); + + fprintf(stderr, "ok\n"); +} + +/* + * When an operation in a batch fails the rest of the batch is flushed. We + * still have to get results for each batch item, but the item will just be + * a PGRES_BATCH_ABORTED code. + * + * This intentionally doesn't use a transaction to wrap the batch. You should + * usually use an xact, but in this case we want to observe the effects of each + * statement. + */ +static void +test_batch_abort(PGconn *conn) +{ + PGresult *res = NULL; + const char *dummy_params[1] = {"1"}; + Oid dummy_param_oids[1] = {INT4OID}; + int i; + + fprintf(stderr, "aborted batch... "); + + res = PQexec(conn, drop_table_sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("dispatching DROP TABLE failed: %s\n", PQerrorMessage(conn)); + + res = PQexec(conn, create_table_sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("dispatching CREATE TABLE failed: %s\n", PQerrorMessage(conn)); + + /* + * Queue up a couple of small batches and process each without returning + * to command mode first. Make sure the second operation in the first + * batch ERRORs. + */ + if (PQenterBatchMode(conn) != 1) + pg_fatal("failed to enter batch mode: %s\n", PQerrorMessage(conn)); + + dummy_params[0] = "1"; + if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids, + dummy_params, NULL, NULL, 0) != 1) + pg_fatal("dispatching first insert failed: %s\n", PQerrorMessage(conn)); + + if (PQsendQueryParams(conn, "SELECT no_such_function($1)", + 1, dummy_param_oids, dummy_params, + NULL, NULL, 0) != 1) + pg_fatal("dispatching error select failed: %s\n", PQerrorMessage(conn)); + + dummy_params[0] = "2"; + if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids, + dummy_params, NULL, NULL, 0) != 1) + pg_fatal("dispatching second insert failed: %s\n", PQerrorMessage(conn)); + + if (PQbatchSendQueue(conn) != 1) + pg_fatal("Sending first batch failed: %s\n", PQerrorMessage(conn)); + + dummy_params[0] = "3"; + if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids, + dummy_params, NULL, NULL, 0) != 1) + pg_fatal("dispatching second-batch insert failed: %s\n", + PQerrorMessage(conn)); + + if (PQbatchSendQueue(conn) != 1) + pg_fatal("Ending second batch failed: %s\n", PQerrorMessage(conn)); + + /* + * OK, start processing the batch results. + * + * We should get a command-ok for the first query, then a fatal error and + * a batch aborted message for the second insert, a batch-end, then a + * command-ok and a batch-ok for the second batch operation. + */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("Unexpected result status %s: %s\n", + PQresStatus(PQresultStatus(res)), + PQresultErrorMessage(res)); + PQclear(res); + + /* NULL result to signal end-of-results for this command */ + if ((res = PQgetResult(conn)) != NULL) + pg_fatal("Expected null result, got %s\n", + PQresStatus(PQresultStatus(res))); + + /* Second query caused error, so we expect an error next */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_FATAL_ERROR) + pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s\n", + PQresStatus(PQresultStatus(res))); + PQclear(res); + + /* NULL result to signal end-of-results for this command */ + if ((res = PQgetResult(conn)) != NULL) + pg_fatal("Expected null result, got %s\n", + PQresStatus(PQresultStatus(res))); + + /* + * batch should now be aborted. + * + * Note that we could still queue more queries at this point if we wanted; + * they'd get added to a new third batch since we've already sent a + * second. The aborted flag relates only to the batch being received. + */ + if (PQbatchStatus(conn) != PQBATCH_MODE_ABORTED) + pg_fatal("batch should be flagged as aborted but isn't\n"); + + /* third query in batch, the second insert */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_BATCH_ABORTED) + pg_fatal("Unexpected result code -- expected PGRES_BATCH_ABORTED, got %s\n", + PQresStatus(PQresultStatus(res))); + PQclear(res); + + /* NULL result to signal end-of-results for this command */ + if ((res = PQgetResult(conn)) != NULL) + pg_fatal("Expected null result, got %s\n", PQresStatus(PQresultStatus(res))); + + if (PQbatchStatus(conn) != PQBATCH_MODE_ABORTED) + pg_fatal("batch should be flagged as aborted but isn't\n"); + + /* Ensure we're still in batch */ + if (PQbatchStatus(conn) == PQBATCH_MODE_OFF) + pg_fatal("Fell out of batch mode somehow\n"); + + /* + * The end of a failed batch is a PGRES_BATCH_END. + * + * (This is so clients know to start processing results normally again and + * can tell the difference between skipped commands and the sync.) + */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_BATCH_END) + pg_fatal("Unexpected result code from first batch sync\n" + "Expected PGRES_BATCH_END, got %s\n", + PQresStatus(PQresultStatus(res))); + PQclear(res); + + /* XXX why do we have a NULL result after PGRES_BATCH_END? */ + res = PQgetResult(conn); + if (res != NULL) + pg_fatal("Expected null result, got %s\n", PQresStatus(PQresultStatus(res))); + + if (PQbatchStatus(conn) == PQBATCH_MODE_ABORTED) + pg_fatal("sync should've cleared the aborted flag but didn't\n"); + + /* We're still in a batch... */ + if (PQbatchStatus(conn) == PQBATCH_MODE_OFF) + pg_fatal("Fell out of batch mode somehow\n"); + + /* the insert from the second batch */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("Unexpected result code %s from first item in second batch\n", + PQresStatus(PQresultStatus(res))); + PQclear(res); + + /* Read the NULL result at the end of the command */ + if ((res = PQgetResult(conn)) != NULL) + pg_fatal("Expected null result, got %s\n", PQresStatus(PQresultStatus(res))); + + /* the second batch sync */ + if ((res = PQgetResult(conn)) == NULL) + pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_BATCH_END) + pg_fatal("Unexpected result code %s from second batch sync\n", + PQresStatus(PQresultStatus(res))); + PQclear(res); + + if ((res = PQgetResult(conn)) != NULL) + pg_fatal("Expected null result, got %s: %s\n", + PQresStatus(PQresultStatus(res)), + PQerrorMessage(conn)); + + /* We're still in a batch... */ + if (PQbatchStatus(conn) == PQBATCH_MODE_OFF) + pg_fatal("Fell out of batch mode somehow\n"); + + /* until we end it, which we can safely do now */ + if (PQexitBatchMode(conn) != 1) + pg_fatal("attempt to exit batch mode failed when it should've succeeded: %s\n", + PQerrorMessage(conn)); + + if (PQbatchStatus(conn) != PQBATCH_MODE_OFF) + pg_fatal("exiting batch mode didn't seem to work\n"); + + fprintf(stderr, "ok\n"); + + /*- + * Since we fired the batches off without a surrounding xact, the results + * should be: + * + * - Implicit xact started by server around 1st batch + * - First insert applied + * - Second statement aborted xact + * - Third insert skipped + * - Sync rolled back first implicit xact + * - Implicit xact created by server around 2nd batch + * - insert applied from 2nd batch + * - Sync commits 2nd xact + * + * So we should only have the value 3 that we inserted. + */ + res = PQexec(conn, "SELECT itemno FROM batch_demo"); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("Expected tuples, got %s: %s\n", + PQresStatus(PQresultStatus(res)), PQerrorMessage(conn)); + if (PQntuples(res) != 1) + pg_fatal("expected 1 result, got %d\n", PQntuples(res)); + for (i = 0; i < PQntuples(res); i++) + { + const char *val = PQgetvalue(res, i, 0); + + if (strcmp(val, "3") != 0) + pg_fatal("expected only insert with value 3, got %s", val); + } + + PQclear(res); +} + +/* State machine enum for test_batch_insert */ +typedef enum BatchInsertStep +{ + BI_BEGIN_TX, + BI_DROP_TABLE, + BI_CREATE_TABLE, + BI_PREPARE, + BI_INSERT_ROWS, + BI_COMMIT_TX, + BI_SYNC, + BI_DONE +} BatchInsertStep; + +static void +test_batch_insert(PGconn *conn, int n_rows) +{ + const char *insert_params[1]; + Oid insert_param_oids[1] = {INT4OID}; + char insert_param_0[MAXINTLEN]; + BatchInsertStep send_step = BI_BEGIN_TX, + recv_step = BI_BEGIN_TX; + int rows_to_send, + rows_to_receive; + + insert_params[0] = &insert_param_0[0]; + + rows_to_send = rows_to_receive = n_rows; + + /* + * Do a batched insert into a table created at the start of the batch + */ + if (PQenterBatchMode(conn) != 1) + pg_fatal("failed to enter batch mode: %s\n", PQerrorMessage(conn)); + + while (send_step != BI_PREPARE) + { + const char *sql; + + switch (send_step) + { + case BI_BEGIN_TX: + sql = "BEGIN TRANSACTION"; + send_step = BI_DROP_TABLE; + break; + + case BI_DROP_TABLE: + sql = drop_table_sql; + send_step = BI_CREATE_TABLE; + break; + + case BI_CREATE_TABLE: + sql = create_table_sql; + send_step = BI_PREPARE; + break; + + default: + pg_fatal("invalid state"); + } + + pg_debug("sending: %s\n", sql); + if (PQsendQueryParams(conn, sql, + 0, NULL, NULL, NULL, NULL, 0) != 1) + pg_fatal("dispatching %s failed: %s\n", sql, PQerrorMessage(conn)); + } + + Assert(send_step == BI_PREPARE); + pg_debug("sending: %s\n", insert_sql); + if (PQsendPrepare(conn, "my_insert", insert_sql, 1, insert_param_oids) != 1) + pg_fatal("dispatching PREPARE failed: %s\n", PQerrorMessage(conn)); + send_step = BI_INSERT_ROWS; + + /* + * Now we start inserting. We'll be sending enough data that we could fill + * our out buffer, so to avoid deadlocking we need to enter nonblocking + * mode and consume input while we send more output. As results of each + * query are processed we should pop them to allow processing of the next + * query. There's no need to finish the batch before processing results. + */ + if (PQsetnonblocking(conn, 1) != 0) + pg_fatal("failed to set nonblocking mode: %s\n", PQerrorMessage(conn)); + + while (recv_step != BI_DONE) + { + int sock; + fd_set input_mask; + fd_set output_mask; + + sock = PQsocket(conn); + + if (sock < 0) + break; /* shouldn't happen */ + + FD_ZERO(&input_mask); + FD_SET(sock, &input_mask); + FD_ZERO(&output_mask); + FD_SET(sock, &output_mask); + + if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0) + { + fprintf(stderr, "select() failed: %s\n", strerror(errno)); + exit_nicely(conn); + } + + /* + * Process any results, so we keep the server's out buffer free + * flowing and it can continue to process input + */ + if (FD_ISSET(sock, &input_mask)) + { + PQconsumeInput(conn); + + /* Read until we'd block if we tried to read */ + while (!PQisBusy(conn) && recv_step < BI_DONE) + { + PGresult *res; + const char *cmdtag; + const char *description = ""; + int status; + + /* + * Read next result. If no more results from this query, + * advance to the next query + */ + res = PQgetResult(conn); + if (res == NULL) + continue; + + status = PGRES_COMMAND_OK; + switch (recv_step) + { + case BI_BEGIN_TX: + cmdtag = "BEGIN"; + recv_step++; + break; + case BI_DROP_TABLE: + cmdtag = "DROP TABLE"; + recv_step++; + break; + case BI_CREATE_TABLE: + cmdtag = "CREATE TABLE"; + recv_step++; + break; + case BI_PREPARE: + cmdtag = ""; + description = "PREPARE"; + recv_step++; + break; + case BI_INSERT_ROWS: + cmdtag = "INSERT"; + rows_to_receive--; + if (rows_to_receive == 0) + recv_step++; + break; + case BI_COMMIT_TX: + cmdtag = "COMMIT"; + recv_step++; + break; + case BI_SYNC: + cmdtag = ""; + description = "SYNC"; + status = PGRES_BATCH_END; + recv_step++; + break; + case BI_DONE: + /* unreachable */ + description = ""; + abort(); + } + + if (PQresultStatus(res) != status) + pg_fatal("%s reported status %s, expected %s\n" + "Error message: \"%s\"\n", + description, PQresStatus(PQresultStatus(res)), + PQresStatus(status), PQerrorMessage(conn)); + + if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0) + pg_fatal("%s expected command tag '%s', got '%s'\n", + description, cmdtag, PQcmdStatus(res)); + + pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description); + + PQclear(res); + } + } + + /* Write more rows and/or the end batch message, if needed */ + if (FD_ISSET(sock, &output_mask)) + { + PQflush(conn); + + if (send_step == BI_INSERT_ROWS) + { + snprintf(&insert_param_0[0], MAXINTLEN, "%d", rows_to_send); + + if (PQsendQueryPrepared(conn, "my_insert", + 1, insert_params, NULL, NULL, 0) == 1) + { + pg_debug("sent row %d\n", rows_to_send); + + rows_to_send--; + if (rows_to_send == 0) + send_step = BI_COMMIT_TX; + } + else + { + /* + * in nonblocking mode, so it's OK for an insert to fail + * to send + */ + fprintf(stderr, "WARNING: failed to send insert #%d: %s\n", + rows_to_send, PQerrorMessage(conn)); + } + } + else if (send_step == BI_COMMIT_TX) + { + if (PQsendQueryParams(conn, "COMMIT", + 0, NULL, NULL, NULL, NULL, 0) == 1) + { + pg_debug("sent COMMIT\n"); + send_step = BI_SYNC; + } + else + { + fprintf(stderr, "WARNING: failed to send commit: %s\n", + PQerrorMessage(conn)); + } + } + else if (send_step == BI_SYNC) + { + if (PQbatchSendQueue(conn) == 1) + { + fprintf(stdout, "Dispatched end batch message\n"); + send_step = BI_DONE; + } + else + { + fprintf(stderr, "WARNING: Ending a batch failed: %s\n", + PQerrorMessage(conn)); + } + } + } + } + + /* We've got the sync message and the batch should be done */ + if (PQexitBatchMode(conn) != 1) + pg_fatal("attempt to exit batch mode failed when it should've succeeded: %s\n", + PQerrorMessage(conn)); + + if (PQsetnonblocking(conn, 0) != 0) + pg_fatal("failed to clear nonblocking mode: %s\n", PQerrorMessage(conn)); +} + +static void +test_singlerowmode(PGconn *conn) +{ + PGresult *res; + int i; + bool batch_ended = false; + + /* 1 batch, 3 queries in it */ + if (PQenterBatchMode(conn) != 1) + pg_fatal("failed to enter batch mode: %s\n", + PQerrorMessage(conn)); + + for (i = 0; i < 3; i++) + { + char *param[1]; + + param[0] = psprintf("%d", 44 + i); + + if (PQsendQueryParams(conn, + "SELECT generate_series(42, $1)", + 1, + NULL, + (const char **) param, + NULL, + NULL, + 0) != 1) + pg_fatal("failed to send query: %s\n", + PQerrorMessage(conn)); + pfree(param[0]); + } + PQbatchSendQueue(conn); + + for (i = 0; !batch_ended; i++) + { + bool first = true; + bool saw_ending_tuplesok; + bool isSingleTuple = false; + + /* Set single row mode for only first 2 SELECT queries */ + if (i < 2) + { + if (PQsetSingleRowMode(conn) != 1) + pg_fatal("PQsetSingleRowMode() failed for i=%d\n", i); + } + + /* Consume rows for this query */ + saw_ending_tuplesok = false; + while ((res = PQgetResult(conn)) != NULL) + { + ExecStatusType est = PQresultStatus(res); + + if (est == PGRES_BATCH_END) + { + fprintf(stderr, "end of batch reached\n"); + batch_ended = true; + PQclear(res); + if (i != 3) + pg_fatal("Expected three results, got %d\n", i); + break; + } + + /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */ + if (first) + { + if (i <= 1 && est != PGRES_SINGLE_TUPLE) + pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s\n", + i, PQresStatus(est)); + if (i >= 2 && est != PGRES_TUPLES_OK) + pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s\n", + i, PQresStatus(est)); + first = false; + } + + fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i); + switch (est) + { + case PGRES_TUPLES_OK: + fprintf(stderr, ", tuples: %d\n", PQntuples(res)); + saw_ending_tuplesok = true; + if (isSingleTuple) + { + if (PQntuples(res) == 0) + fprintf(stderr, "all tuples received in query %d\n", i); + else + pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, " + "but received PGRES_TUPLES_OK directly instead\n"); + } + break; + + case PGRES_SINGLE_TUPLE: + fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0)); + break; + + default: + pg_fatal("unexpected\n"); + } + PQclear(res); + } + if (!batch_ended && !saw_ending_tuplesok) + pg_fatal("didn't get expected terminating TUPLES_OK\n"); + } + + if (PQexitBatchMode(conn) != 1) + pg_fatal("failed to end batch mode: %s\n", PQerrorMessage(conn)); +} + +static void +usage(const char *progname) +{ + fprintf(stderr, "%s tests libpq's batch-mode.\n\n", progname); + fprintf(stderr, "Usage:\n"); + fprintf(stderr, " %s testname [conninfo [number_of_rows]]\n", progname); + fprintf(stderr, "Tests:\n"); + fprintf(stderr, " disallowed_in_batch|simple_batch|multi_batch|batch_abort|\n"); + fprintf(stderr, " singlerow|batch_insert\n"); +} + +int +main(int argc, char **argv) +{ + const char *conninfo = ""; + PGconn *conn; + int numrows = 10000; + + /* + * The testname parameter is mandatory; it can be followed by a conninfo + * string and number of rows. + */ + if (argc < 2 || argc > 4) + { + usage(argv[0]); + exit(1); + } + + if (argc >= 3) + conninfo = pg_strdup(argv[2]); + + if (argc >= 4) + { + errno = 0; + numrows = strtol(argv[3], NULL, 10); + if (errno != 0 || numrows <= 0) + { + fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n", argv[3]); + exit(1); + } + } + + /* Make a connection to the database */ + conn = PQconnectdb(conninfo); + if (PQstatus(conn) != CONNECTION_OK) + { + fprintf(stderr, "Connection to database failed: %s\n", + PQerrorMessage(conn)); + exit_nicely(conn); + } + + if (strcmp(argv[1], "disallowed_in_batch") == 0) + test_disallowed_in_batch(conn); + else if (strcmp(argv[1], "simple_batch") == 0) + test_simple_batch(conn); + else if (strcmp(argv[1], "multi_batch") == 0) + test_multi_batch(conn); + else if (strcmp(argv[1], "batch_abort") == 0) + test_batch_abort(conn); + else if (strcmp(argv[1], "batch_insert") == 0) + test_batch_insert(conn, numrows); + else if (strcmp(argv[1], "singlerow") == 0) + test_singlerowmode(conn); + else + { + fprintf(stderr, "\"%s\" is not a recognized test name\n", argv[1]); + usage(argv[0]); + exit(1); + } + + /* close the connection to the database and cleanup */ + PQfinish(conn); + return 0; +} diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm index 90594bd41b..634e48ec85 100644 --- a/src/tools/msvc/Mkvcbuild.pm +++ b/src/tools/msvc/Mkvcbuild.pm @@ -50,7 +50,8 @@ my @contrib_excludes = ( 'pgcrypto', 'sepgsql', 'brin', 'test_extensions', 'test_misc', 'test_pg_dump', - 'snapshot_too_old', 'unsafe_tests'); + 'snapshot_too_old', 'unsafe_tests', + 'test_libpq'); # Set of variables for frontend modules my $frontend_defines = { 'initdb' => 'FRONTEND' }; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 03c4e0fe5b..9b75db962b 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -215,6 +215,7 @@ BackgroundWorkerHandle BackgroundWorkerSlot Barrier BaseBackupCmd +BatchInsertStep BeginDirectModify_function BeginForeignInsert_function BeginForeignModify_function @@ -1544,6 +1545,7 @@ PG_Locale_Strategy PG_Lock_Status PG_init_t PGcancel +PGcommandQueueEntry PGconn PGdataValue PGlobjfuncs @@ -1656,6 +1658,7 @@ PMSignalReason PMState POLYGON PQArgBlock +PQBatchStatus PQEnvironmentOption PQExpBuffer PQExpBufferData