diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index c3bd4f9..366f278 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -4656,6 +4656,526 @@ int PQflush(PGconn *conn); + + Batch mode and query pipelining + + + libpq + batch mode + + + + libpq + pipelining + + + + libpq supports queueing up queries into + a pipeline to be executed as a batch on the server. Batching queries allows + applications to avoid a client/server round-trip after each query to get + the results before issuing the next query. + + + + An example of batch use may be found in the source distribution in + src/test/modules/test_libpq/testlibpqbatch.c. + + + + When to use batching + + + Much like asynchronous query mode, there is no performance disadvantage to + using batching and pipelining. It increases client application complexity + and extra caution is required to prevent client/server deadlocks but + can sometimes offer considerable performance improvements. + + + + Batching is most useful when the server is distant, i.e. network latency + (ping time) is high, and when many small operations are being performed in + rapid sequence. There is usually less benefit in using batches when each + query takes many multiples of the client/server round-trip time to execute. + A 100-statement operation run on a server 300ms round-trip-time away would take + 30 seconds in network latency alone without batching; with batching it may spend + as little as 0.3s waiting for results from the server. + + + + Use batches when your application does lots of small + INSERT, UPDATE and + DELETE operations that can't easily be transformed into + operations on sets or into a + COPY operation. + + + + Batching is less useful when information from one operation is required by the + client before it knows enough to send the next operation. The client must + introduce a synchronisation point and wait for a full client/server + round-trip to get the results it needs. However, it's often possible to + adjust the client design to exchange the required information server-side. + Read-modify-write cycles are especially good candidates; for example: + + BEGIN; + SELECT x FROM mytable WHERE id = 42 FOR UPDATE; + -- result: x=2 + -- client adds 1 to x: + UPDATE mytable SET x = 3 WHERE id = 42; + COMMIT; + + could be much more efficiently done with: + + UPDATE mytable SET x = x + 1 WHERE id = 42; + + + + + + The batch API was introduced in PostgreSQL 10.0, but clients using PostgresSQL 10.0 version of libpq can + use batches on server versions 8.4 and newer. Batching works on any server + that supports the v3 extended query protocol. + + + + + + + Using batch mode + + + To issue batches the application must switch + a connection into batch mode. Enter batch mode with PQenterBatchMode(conn) or test + whether batch mode is active with PQbatchStatus(conn). In batch mode only asynchronous operations are permitted, and + COPY is not recommended as it most likely will trigger failure in batch processing. + Using any synchronous command execution functions such as PQfn, + PQexec or one of its sibling functions are error conditions. + Functions allowed in batch mode are described in . + + + + The client uses libpq's asynchronous query functions to dispatch work, + marking the end of each batch with PQbatchSyncQueue. + And to get results, it uses PQgetResult and + PQbatchProcessQueue. It may eventually exit + batch mode with PQexitBatchMode once all results are + processed. + + + + + It is best to use batch mode with libpq in + non-blocking mode. If used in + blocking mode it is possible for a client/server deadlock to occur. The + client will block trying to send queries to the server, but the server will + block trying to send results from queries it has already processed to the + client. This only occurs when the client sends enough queries to fill its + output buffer and the server's receive buffer before switching to + processing input from the server, but it's hard to predict exactly when + that'll happen so it's best to always use non-blocking mode. + + + + + Issuing queries + + + After entering batch mode the application dispatches requests + using normal asynchronous libpq functions such as + PQsendQueryParams, PQsendPrepare, + PQsendQueryPrepared, PQsendDescribePortal, + PQsendDescribePrepared. + The asynchronous requests are followed by a PQbatchSyncQueue(conn) call to mark + the end of the batch. The client does not need to call + PQgetResult immediately after dispatching each + operation. Result processing + is handled separately. + + + + Batched operations will be executed by the server in the order the client + sends them. The server will send the results in the order the statements + executed. The server may begin executing the batch before all commands + in the batch are queued and the end of batch command is sent. If any + statement encounters an error the server aborts the current transaction and + skips processing the rest of the batch. Query processing resumes after the + end of the failed batch. + + + + It's fine for one operation to depend on the results of a + prior one. One query may define a table that the next query in the same + batch uses; similarly, an application may create a named prepared statement + then execute it with later statements in the same batch. + + + + + + Processing results + + + The client interleaves result + processing with sending batch queries, or for small batches may + process all results after sending the whole batch. + + + + To get the result of the first batch entry the client must call PQbatchProcessQueue. It must then call + PQgetResult and handle the results until + PQgetResult returns null. The result from the next batch entry + may then be retrieved using PQbatchProcessQueue and the cycle repeated. The + application handles individual statement results as normal. + + + + To enter single-row mode, call PQsetSingleRowMode immediately after a + successful call of PQbatchProcessQueue. This mode selection is effective + only for the query currently being processed. For more information on the use of PQsetSingleRowMode + , refer to . + + + + + PQgetResult behaves the same as for normal asynchronous + processing except that it may contain the new PGresult types + PGRES_BATCH_END and PGRES_BATCH_ABORTED. + PGRES_BATCH_END is reported exactly once for each + PQbatchSyncQueue call at the corresponding point in + the result stream and at no other time. PGRES_BATCH_ABORTED + is emitted during error handling; see + error handling. + + + + PQisBusy, PQconsumeInput, etc + operate as normal when processing batch results. + + + + libpq does not provide any information to the + application about the query currently being processed. The application + must keep track of the order in which it sent queries and the expected + results. Applications will typically use a state machine or a FIFO queue + for this. + + + + + + Error handling + + + When a query in a batch causes an ERROR the server + skips processing all subsequent messages until the end-of-batch message. + The open transaction is aborted. + + + + From the client perspective, after the client gets a + PGRES_FATAL_ERROR return from + PQresultStatus the batch is flagged as aborted. + libpq will report + PGRES_BATCH_ABORTED result for each remaining queued + operation in an aborted batch. The result for + PQbatchSyncQueue is reported as + PGRES_BATCH_END to signal the end of the aborted batch + and resumption of normal result processing. + + + + The client must process results with + PQbatchProcessQueue(...) and + PQgetResult during error recovery. + + + + If the batch used an implicit transaction then operations that have + already executed are rolled back and operations that were queued for after + the failed operation are skipped entirely. The same behaviour holds if the + batch starts and commits a single explicit transaction (i.e. the first + statement is BEGIN and the last is + COMMIT) except that the session remains in an aborted + transaction state at the end of the batch. If a batch contains + multiple explicit transactions, all transactions that committed + prior to the error remain committed, the currently in-progress transaction + is aborted and all subsequent operations in the current and all later + transactions in the same batch are skipped completely. + + + + + The client must not assume that work is committed when it + sends a COMMIT, only when the + corresponding result is received to confirm the commit is complete. + Because errors arrive asynchronously the application needs to be able to + restart from the last received committed change and + resend work done after that point if something goes wrong. + + + + + + + Interleaving result processing and query dispatch + + + To avoid deadlocks on large batches the client should be structured around + a nonblocking I/O loop using a function like select, + poll, epoll, + WaitForMultipleObjectEx, etc. + + + + The client application should generally maintain a queue of work still to + be dispatched and a queue of work that has been dispatched but not yet had + its results processed. When the socket is writable it should dispatch more + work. When the socket is readable it should read results and process them, + matching them up to the next entry in its expected results queue. Batches + should be scoped to logical units of work, usually (but not always) one + transaction per batch. There's no need to exit batch mode and re-enter it + between batches or to wait for one batch to finish before sending the next. + + + + An example using select() and a simple state machine + to track sent and received work is in + src/test/modules/test_libpq/testlibpqbatch.c in the PostgreSQL + source distribution. + + + + + + Ending batch mode + + + Once all dispatched commands have had their results processed and the end batch + result has been consumed the application may return to non-batched mode with + PQexitBatchMode(conn). + + + + + + + Functions associated with batch mode + + + + + + PQbatchStatus + + PQbatchStatus + + + + + + Returns current batch mode status of the libpq connection. + +int PQbatchStatus(PGconn *conn); + + + + + + PQBATCH_MODE_ON + + + + + Returns PQBATCH_MODE_ON if libpq connection is in batch mode. + + + + + + + PQBATCH_MODE_OFF + + + + + Returns PQBATCH_MODE_OFF if libpq connection is not in batch mode. + + + + + + + PQBATCH_MODE_ABORTED + + + + Returns PQBATCH_MODE_ABORTED if libpq connection is in + aborted status. The aborted flag is cleared as soon as the result of the + PQbatchSyncQueue at the end of the aborted batch is + processed. Clients don't usually need this function to verify aborted status + as they can tell that the batch is aborted from PGRES_BATCH_ABORTED + result codes. + + + + + + + + + + + + PQenterBatchMode + + PQenterBatchMode + + + + + + Causes a connection to enter batch mode if it is currently idle or + already in batch mode. + + +int PQenterBatchMode(PGconn *conn); + + + + + Returns 1 for success. Returns 0 and has no + effect if the connection is not currently idle, i.e. it has a result + ready, is waiting for more input from the server, etc. This function + does not actually send anything to the server, it just changes the + libpq connection state. + + + + + + + + PQexitBatchMode + + PQexitBatchMode + + + + + + Causes a connection to exit batch mode if it is currently in batch mode + with an empty queue and no pending results. + +int PQexitBatchMode(PGconn *conn); + + + Returns 1 for success. + Returns 1 and takes no action if not in batch mode. If the connection has + pending batch items in the queue for reading with + PQbatchProcessQueue, the current statement isn't finished + processing or there are results pending for collection with + PQgetResult, returns 0 and does nothing. + + + + + + + + PQbatchSyncQueue + + PQbatchSyncQueue + + + + + + Delimits the end of a set of a batched commands by sending a sync message and flushing + the send buffer. The end of a batch serves as + the delimiter of an implicit transaction and + an error recovery point; see + error handling. + + +int PQbatchSyncQueue(PGconn *conn); + + + Returns 1 for success. Returns 0 if the connection is not in batch mode + or sending a sync message is failed. + + + + + + + + PQbatchProcessQueue + + PQbatchProcessQueue + + + + + + Causes the connection to start processing the next queued query's + results. + + + +int PQbatchProcessQueue(PGconn *conn); + + + + Returns 1 if a new query was popped from the result queue + for processing. Returns 0 and has no effect if there are no query results + pending, batch mode is not enabled, or if the query currently processed + is incomplete or still has pending results. Reason for these failures can + be verified with PQbatchQueueCount, PQbatchStatus + and PQgetResult. + See processing results. + + + + + + + + PQbatchQueueCount + + PQbatchQueueCount + + + + + + Returns the number of queries still in the queue for this batch, not + including any query that's currently having results being processed. + This is the number of times PQbatchProcessQueue has to be + called before the query queue is empty again. + + +int PQbatchQueueCount(PGconn *conn); + + + + + + + + + + + + + Retrieving Query Results Row-By-Row @@ -4696,6 +5216,14 @@ int PQflush(PGconn *conn); Each object should be freed with PQclear as usual. + + + On using batch mode, call PQsetSingleRowMode + immediately after a successful call of PQbatchProcessQueue + See for more information. + + + diff --git a/doc/src/sgml/lobj.sgml b/doc/src/sgml/lobj.sgml index 7757e1e..db8523d 100644 --- a/doc/src/sgml/lobj.sgml +++ b/doc/src/sgml/lobj.sgml @@ -130,6 +130,10 @@ libpq library. + + Client applications cannot use these functions while libpq connection is in batch mode. + + Creating a Large Object diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index f6fa0e4..d4ee576 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -936,6 +936,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, walres->status = WALRCV_ERROR; walres->err = pchomp(PQerrorMessage(conn->streamConn)); break; + default: + /* This is just to keep compiler quiet */ + break; } PQclear(pgres); diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index d6a38d0..49871f5 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -172,3 +172,9 @@ PQsslAttribute 169 PQsetErrorContextVisibility 170 PQresultVerboseErrorMessage 171 PQencryptPasswordConn 172 +PQbatchQueueCount 173 +PQenterBatchMode 174 +PQexitBatchMode 175 +PQbatchSyncQueue 176 +PQbatchProcessQueue 177 +PQbatchStatus 178 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 02ec8f0..a92f4bc 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -3483,6 +3483,25 @@ sendTerminateConn(PGconn *conn) } /* + * PQfreeCommandQueue + * Free all the entries of PGcommandQueueEntry queue passed. + */ +static void +PQfreeCommandQueue(PGcommandQueueEntry *queue) +{ + + while (queue != NULL) + { + PGcommandQueueEntry *prev = queue; + + queue = queue->next; + if (prev->query) + free(prev->query); + free(prev); + } +} + +/* * closePGconn * - properly close a connection to the backend * @@ -3494,6 +3513,7 @@ static void closePGconn(PGconn *conn) { PGnotify *notify; + PGcommandQueueEntry *queue; pgParameterStatus *pstatus; sendTerminateConn(conn); @@ -3526,6 +3546,14 @@ closePGconn(PGconn *conn) free(prev); } conn->notifyHead = conn->notifyTail = NULL; + queue = conn->cmd_queue_head; + PQfreeCommandQueue(queue); + conn->cmd_queue_head = conn->cmd_queue_tail = NULL; + + queue = conn->cmd_queue_recycle; + PQfreeCommandQueue(queue); + + conn->cmd_queue_recycle = NULL; pstatus = conn->pstatus; while (pstatus != NULL) { diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 9decd53..2a3928b 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -39,7 +39,9 @@ char *const pgresStatus[] = { "PGRES_NONFATAL_ERROR", "PGRES_FATAL_ERROR", "PGRES_COPY_BOTH", - "PGRES_SINGLE_TUPLE" + "PGRES_SINGLE_TUPLE", + "PGRES_BATCH_END", + "PGRES_BATCH_ABORTED" }; /* @@ -69,6 +71,9 @@ static PGresult *PQexecFinish(PGconn *conn); static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target); static int check_field_number(const PGresult *res, int field_num); +static PGcommandQueueEntry *PQmakePipelinedCommand(PGconn *conn); +static void PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry); +static void PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry); /* ---------------- @@ -1108,7 +1113,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) conn->next_result = conn->result; conn->result = res; /* And mark the result ready to return */ - conn->asyncStatus = PGASYNC_READY; + conn->asyncStatus = PGASYNC_READY_MORE; } return 1; @@ -1131,6 +1136,13 @@ fail: int PQsendQuery(PGconn *conn, const char *query) { + if (conn->batch_status != PQBATCH_MODE_OFF) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("cannot PQsendQuery in batch mode, use PQsendQueryParams\n")); + return false; + } + if (!PQsendQueryStart(conn)) return 0; @@ -1229,6 +1241,10 @@ PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes) { + PGcommandQueueEntry *pipeCmd = NULL; + char **last_query; + PGQueryClass *queryclass; + if (!PQsendQueryStart(conn)) return 0; @@ -1287,18 +1303,34 @@ PQsendPrepare(PGconn *conn, goto sendFailed; /* construct the Sync message */ - if (pqPutMsgStart('S', false, conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + if (conn->batch_status == PQBATCH_MODE_OFF) + { + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + + last_query = &conn->last_query; + queryclass = &conn->queryclass; + } + else + { + pipeCmd = PQmakePipelinedCommand(conn); + + if (pipeCmd == NULL) + return 0; /* error msg already set */ + + last_query = &pipeCmd->query; + queryclass = &pipeCmd->queryclass; + } /* remember we are doing just a Parse */ - conn->queryclass = PGQUERY_PREPARE; + *queryclass = PGQUERY_PREPARE; /* and remember the query text too, if possible */ /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); - conn->last_query = strdup(query); + if (*last_query) + free(*last_query); + *last_query = strdup(query); /* * Give the data a push. In nonblock mode, don't complain if we're unable @@ -1308,10 +1340,14 @@ PQsendPrepare(PGconn *conn, goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->batch_status != PQBATCH_MODE_OFF) + PQappendPipelinedCommand(conn, pipeCmd); + else + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + PQrecyclePipelinedCommand(conn, pipeCmd); pqHandleSendFailure(conn); return 0; } @@ -1359,7 +1395,80 @@ PQsendQueryPrepared(PGconn *conn, } /* - * Common startup code for PQsendQuery and sibling routines + * PQmakePipelinedCommand + * Get a new command queue entry, allocating it if required. Doesn't add it to + * the tail of the queue yet, use PQappendPipelinedCommand once the command has + * been written for that. If a command fails once it's called this, it should + * use PQrecyclePipelinedCommand to put it on the freelist or release it. + * + * If allocation fails sets the error message and returns null. + */ +static PGcommandQueueEntry * +PQmakePipelinedCommand(PGconn *conn) +{ + PGcommandQueueEntry *entry; + + if (conn->cmd_queue_recycle == NULL) + { + entry = (PGcommandQueueEntry *) malloc(sizeof(PGcommandQueueEntry)); + if (entry == NULL) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("out of memory\n")); + return NULL; + } + } + else + { + entry = conn->cmd_queue_recycle; + conn->cmd_queue_recycle = entry->next; + } + entry->next = NULL; + entry->query = NULL; + + return entry; +} + +/* + * PQappendPipelinedCommand + * Append a precreated command queue entry to the queue after it's been + * sent successfully. + */ +static void +PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry) +{ + if (conn->cmd_queue_head == NULL) + conn->cmd_queue_head = entry; + else + conn->cmd_queue_tail->next = entry; + conn->cmd_queue_tail = entry; +} + +/* + * PQrecyclePipelinedCommand + * Push a command queue entry onto the freelist. It must be a dangling entry + * with null next pointer and not referenced by any other entry's next pointer. + */ +static void +PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry) +{ + if (entry == NULL) + return; + if (entry->next != NULL) + { + fprintf(stderr, libpq_gettext("tried to recycle non-dangling command queue entry")); + abort(); + } + if (entry->query) + free(entry->query); + + entry->next = conn->cmd_queue_recycle; + conn->cmd_queue_recycle = entry; +} + +/* + * PQsendQueryStart + * Common startup code for PQsendQuery and sibling routines */ static bool PQsendQueryStart(PGconn *conn) @@ -1377,20 +1486,60 @@ PQsendQueryStart(PGconn *conn) libpq_gettext("no connection to the server\n")); return false; } - /* Can't send while already busy, either. */ - if (conn->asyncStatus != PGASYNC_IDLE) + + /* Can't send while already busy, either, unless enqueuing for later */ + if (conn->asyncStatus != PGASYNC_IDLE && conn->batch_status == PQBATCH_MODE_OFF) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("another command is already in progress\n")); return false; } - /* initialize async result-accumulation state */ - pqClearAsyncResult(conn); + if (conn->batch_status != PQBATCH_MODE_OFF) + { + /* + * When enqueuing a message we don't change much of the connection + * state since it's already in use for the current command. The + * connection state will get updated when PQbatchQueueProcess(...) + * advances to start processing the queued message. + * + * Just make sure we can safely enqueue given the current connection + * state. We can enqueue behind another queue item, or behind a + * non-queue command (one that sends its own sync), but we can't + * enqueue if the connection is in a copy state. + */ + switch (conn->asyncStatus) + { + case PGASYNC_QUEUED: + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + /* ok to queue */ + break; + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("cannot queue commands during COPY\n")); + return false; + break; + case PGASYNC_IDLE: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, idle state in batch mode")); + break; + } + } + else + { + /* This command's results will come in immediately. + * Initialize async result-accumulation state + */ + pqClearAsyncResult(conn); - /* reset single-row processing mode */ - conn->singleRowMode = false; + /* reset single-row processing mode */ + conn->singleRowMode = false; + } /* ready to send command message */ return true; } @@ -1414,6 +1563,10 @@ PQsendQueryGuts(PGconn *conn, int resultFormat) { int i; + PGcommandQueueEntry *pipeCmd = NULL; + char **last_query; + PGQueryClass *queryclass; + /* This isn't gonna work on a 2.0 server */ if (PG_PROTOCOL_MAJOR(conn->pversion) < 3) @@ -1423,6 +1576,23 @@ PQsendQueryGuts(PGconn *conn, return 0; } + if (conn->batch_status != PQBATCH_MODE_OFF) + { + pipeCmd = PQmakePipelinedCommand(conn); + + if (pipeCmd == NULL) + return 0; /* error msg already set */ + + last_query = &pipeCmd->query; + queryclass = &pipeCmd->queryclass; + } + else + { + last_query = &conn->last_query; + queryclass = &conn->queryclass; + } + + /* * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync, * using specified statement name and the unnamed portal. @@ -1535,22 +1705,25 @@ PQsendQueryGuts(PGconn *conn, pqPutMsgEnd(conn) < 0) goto sendFailed; - /* construct the Sync message */ - if (pqPutMsgStart('S', false, conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + if (conn->batch_status == PQBATCH_MODE_OFF) + { + /* construct the Sync message */ + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + } /* remember we are using extended query protocol */ - conn->queryclass = PGQUERY_EXTENDED; + *queryclass = PGQUERY_EXTENDED; /* and remember the query text too, if possible */ /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); + if (*last_query) + free(*last_query); if (command) - conn->last_query = strdup(command); + *last_query = strdup(command); else - conn->last_query = NULL; + *last_query = NULL; /* * Give the data a push. In nonblock mode, don't complain if we're unable @@ -1560,10 +1733,14 @@ PQsendQueryGuts(PGconn *conn, goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->batch_status != PQBATCH_MODE_OFF) + PQappendPipelinedCommand(conn, pipeCmd); + else + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + PQrecyclePipelinedCommand(conn, pipeCmd); pqHandleSendFailure(conn); return 0; } @@ -1690,6 +1867,302 @@ PQisBusy(PGconn *conn) return conn->asyncStatus == PGASYNC_BUSY; } +/* + * PQbatchQueueCount + * Return number of queries currently pending in batch mode + */ +int +PQbatchQueueCount(PGconn *conn) +{ + int count = 0; + PGcommandQueueEntry *entry; + + if (PQbatchStatus(conn) == PQBATCH_MODE_OFF) + return 0; + + entry = conn->cmd_queue_head; + while (entry != NULL) + { + ++count; + entry = entry->next; + } + return count; +} + +/* + * PQbatchStatus + * Returns current batch mode status + */ +int +PQbatchStatus(PGconn *conn) +{ + if (!conn) + return FALSE; + + return conn->batch_status; +} + +/* + * PQbatchBegin + * Put an idle connection in batch mode. Commands submitted after this + * can be pipelined on the connection, there's no requirement to wait for + * one to finish before the next is dispatched. + * + * Queuing of new query or syncing during COPY is not allowed. + * + * A set of commands is terminated by a PQbatchQueueSync. Multiple sets of batched + * commands may be sent while in batch mode. Batch mode can be exited by + * calling PQbatchEnd() once all results are processed. + * + * This doesn't actually send anything on the wire, it just puts libpq + * into a state where it can pipeline work. + */ +int +PQenterBatchMode(PGconn *conn) +{ + if (!conn) + return false; + + if (conn->batch_status != PQBATCH_MODE_OFF) + return true; + + if (conn->asyncStatus != PGASYNC_IDLE) + return false; + + conn->batch_status = PQBATCH_MODE_ON; + conn->asyncStatus = PGASYNC_QUEUED; + + return true; +} + +/* + * PQbatchEnd + * End batch mode and return to normal command mode. + * + * Has no effect unless the client has processed all results + * from all outstanding batches and the connection is idle. + * + * Returns true if batch mode ended. + */ +int +PQexitBatchMode(PGconn *conn) +{ + if (!conn) + return false; + + if (conn->batch_status == PQBATCH_MODE_OFF) + return true; + + switch (conn->asyncStatus) + { + case PGASYNC_IDLE: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, IDLE in batch mode")); + break; + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, COPY in batch mode")); + break; + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + /* can't end batch while busy */ + return false; + case PGASYNC_QUEUED: + break; + } + + /* still work to process */ + if (conn->cmd_queue_head != NULL) + return false; + + conn->batch_status = PQBATCH_MODE_OFF; + conn->asyncStatus = PGASYNC_IDLE; + + return true; +} + +/* + * PQbatchQueueSync + * End a batch submission by sending a protocol sync. The connection will + * remain in batch mode and unavailable for new non-batch commands until all + * results from the batch are processed by the client. + * + * It's legal to start submitting another batch immediately, without waiting + * for the results of the current batch. There's no need to end batch mode + * and start it again. + * + * If a command in a batch fails, every subsequent command up to and including + * the PQbatchQueueSync command result gets set to PGRES_BATCH_ABORTED state. If the + * whole batch is processed without error, a PGresult with PGRES_BATCH_END is + * produced. + */ +int +PQbatchSyncQueue(PGconn *conn) +{ + PGcommandQueueEntry *entry; + + if (!conn) + return false; + + if (conn->batch_status == PQBATCH_MODE_OFF) + return false; + + switch (conn->asyncStatus) + { + case PGASYNC_IDLE: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, IDLE in batch mode")); + break; + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, COPY in batch mode")); + break; + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + case PGASYNC_QUEUED: + /* can send sync to end this batch of cmds */ + break; + } + + entry = PQmakePipelinedCommand(conn); + if (entry == NULL) + return false; /* error msg already set */ + + entry->queryclass = PGQUERY_SYNC; + entry->query = NULL; + + /* construct the Sync message */ + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + + PQappendPipelinedCommand(conn, entry); + + /* + * Give the data a push. In nonblock mode, don't complain if we're unable + * to send it all; PQgetResult() will do any additional flushing needed. + */ + if (PQflush(conn) < 0) + goto sendFailed; + + return true; + +sendFailed: + PQrecyclePipelinedCommand(conn, entry); + pqHandleSendFailure(conn); + return false; +} + +/* + * PQbatchQueueProcess + * In batch mode, start processing the next query in the queue. + * + * Returns true if the next query was popped from the queue and can + * be processed by PQconsumeInput, PQgetResult, etc. + * + * Returns false if the current query isn't done yet, the connection + * is not in a batch, or there are no more queries to process. + */ +int +PQbatchProcessQueue(PGconn *conn) +{ + PGcommandQueueEntry *next_query; + + if (!conn) + return false; + + if (conn->batch_status == PQBATCH_MODE_OFF) + return false; + + switch (conn->asyncStatus) + { + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, COPY in batch mode")); + break; + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + /* client still has to process current query or results */ + return false; + break; + case PGASYNC_IDLE: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, IDLE in batch mode")); + break; + case PGASYNC_QUEUED: + /* next query please */ + break; + } + + if (conn->cmd_queue_head == NULL) + { + /* + * In batch mode but nothing left on the queue; caller can submit more + * work or PQbatchEnd() now. + */ + return false; + } + + /* + * Pop the next query from the queue and set up the connection state as if + * it'd just been dispatched from a non-batched call + */ + next_query = conn->cmd_queue_head; + conn->cmd_queue_head = next_query->next; + next_query->next = NULL; + + /* This command's results will come in immediately. + * Initialize async result-accumulation state */ + pqClearAsyncResult(conn); + + /* reset single-row processing mode */ + conn->singleRowMode = false; + + + conn->last_query = next_query->query; + next_query->query = NULL; + conn->queryclass = next_query->queryclass; + + PQrecyclePipelinedCommand(conn, next_query); + + if (conn->batch_status == PQBATCH_MODE_ABORTED && conn->queryclass != PGQUERY_SYNC) + { + /* + * In an aborted batch we don't get anything from the server for each + * result; we're just discarding input until we get to the next sync + * from the server. The client needs to know its queries got aborted + * so we create a fake PGresult to return immediately from + * PQgetResult. + */ + conn->result = PQmakeEmptyPGresult(conn, + PGRES_BATCH_ABORTED); + if (!conn->result) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("out of memory")); + pqSaveErrorResult(conn); + return false; + } + conn->asyncStatus = PGASYNC_READY; + } + else + { + /* allow parsing to continue */ + conn->asyncStatus = PGASYNC_BUSY; + } + + return true; +} + /* * PQgetResult @@ -1749,10 +2222,32 @@ PQgetResult(PGconn *conn) switch (conn->asyncStatus) { case PGASYNC_IDLE: + case PGASYNC_QUEUED: res = NULL; /* query is complete */ break; case PGASYNC_READY: res = pqPrepareAsyncResult(conn); + if (conn->batch_status != PQBATCH_MODE_OFF) + { + /* + * batched queries aren't followed by a Sync to put us back in + * PGASYNC_IDLE state, and when we do get a sync we could + * still have another batch coming after this one. + * + * The connection isn't idle since we can't submit new + * nonbatched commands. It isn't also busy since the current + * command is done and we need to process a new one. + */ + conn->asyncStatus = PGASYNC_QUEUED; + } + else + { + /* Set the state back to BUSY, allowing parsing to proceed. */ + conn->asyncStatus = PGASYNC_BUSY; + } + break; + case PGASYNC_READY_MORE: + res = pqPrepareAsyncResult(conn); /* Set the state back to BUSY, allowing parsing to proceed. */ conn->asyncStatus = PGASYNC_BUSY; break; @@ -1932,6 +2427,13 @@ PQexecStart(PGconn *conn) if (!conn) return false; + if (conn->asyncStatus == PGASYNC_QUEUED || conn->batch_status != PQBATCH_MODE_OFF) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("Synchronous command execution functions are not allowed in batch mode\n")); + return false; + } + /* * Silently discard any prior query result that application didn't eat. * This is probably poor design, but it's here for backward compatibility. @@ -2126,6 +2628,9 @@ PQsendDescribePortal(PGconn *conn, const char *portal) static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) { + PGcommandQueueEntry *pipeCmd = NULL; + PGQueryClass *queryclass; + /* Treat null desc_target as empty string */ if (!desc_target) desc_target = ""; @@ -2141,6 +2646,20 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) return 0; } + if (conn->batch_status != PQBATCH_MODE_OFF) + { + pipeCmd = PQmakePipelinedCommand(conn); + + if (pipeCmd == NULL) + return 0; /* error msg already set */ + + queryclass = &pipeCmd->queryclass; + } + else + { + queryclass = &conn->queryclass; + } + /* construct the Describe message */ if (pqPutMsgStart('D', false, conn) < 0 || pqPutc(desc_type, conn) < 0 || @@ -2149,15 +2668,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) goto sendFailed; /* construct the Sync message */ - if (pqPutMsgStart('S', false, conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + if (conn->batch_status == PQBATCH_MODE_OFF) + { + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + } /* remember we are doing a Describe */ - conn->queryclass = PGQUERY_DESCRIBE; + *queryclass = PGQUERY_DESCRIBE; /* reset last-query string (not relevant now) */ - if (conn->last_query) + if (conn->last_query && conn->batch_status != PQBATCH_MODE_OFF) { free(conn->last_query); conn->last_query = NULL; @@ -2171,10 +2693,14 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->batch_status != PQBATCH_MODE_OFF) + PQappendPipelinedCommand(conn, pipeCmd); + else + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + PQrecyclePipelinedCommand(conn, pipeCmd); pqHandleSendFailure(conn); return 0; } @@ -2569,6 +3095,13 @@ PQfn(PGconn *conn, /* clear the error string */ resetPQExpBuffer(&conn->errorMessage); + if (conn->batch_status != PQBATCH_MODE_OFF) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("Synchronous command execution functions are not allowed in batch mode\n")); + return NULL; + } + if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE || conn->result != NULL) { diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index 3b0500f..c01f1a2 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -412,6 +412,12 @@ pqParseInput2(PGconn *conn) { char id; + if (conn->asyncStatus == PGASYNC_QUEUED || conn->batch_status != PQBATCH_MODE_OFF) + { + fprintf(stderr, "internal error, attempt to read v2 protocol in batch mode"); + abort(); + } + /* * Loop to parse successive complete messages available in the buffer. */ diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 53776e2..e24d7ce 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -220,10 +220,18 @@ pqParseInput3(PGconn *conn) return; conn->asyncStatus = PGASYNC_READY; break; - case 'Z': /* backend is ready for new query */ + case 'Z': /* sync response, backend is ready for new query */ if (getReadyForQuery(conn)) return; - conn->asyncStatus = PGASYNC_IDLE; + if (conn->batch_status != PQBATCH_MODE_OFF) + { + conn->batch_status = PQBATCH_MODE_ON; + conn->result = PQmakeEmptyPGresult(conn, + PGRES_BATCH_END); + conn->asyncStatus = PGASYNC_READY; + } + else + conn->asyncStatus = PGASYNC_IDLE; break; case 'I': /* empty query */ if (conn->result == NULL) @@ -880,6 +888,9 @@ pqGetErrorNotice3(PGconn *conn, bool isError) PQExpBufferData workBuf; char id; + if (isError && conn->batch_status != PQBATCH_MODE_OFF) + conn->batch_status = PQBATCH_MODE_ABORTED; + /* * Since the fields might be pretty long, we create a temporary * PQExpBuffer rather than using conn->workBuffer. workBuffer is intended diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index e7496c5..321fc60 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -95,7 +95,10 @@ typedef enum PGRES_NONFATAL_ERROR, /* notice or warning message */ PGRES_FATAL_ERROR, /* query failed */ PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */ - PGRES_SINGLE_TUPLE /* single tuple from larger resultset */ + PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */ + PGRES_BATCH_END, /* end of a batch of commands */ + PGRES_BATCH_ABORTED, /* Command didn't run because of an abort + * earlier in a batch */ } ExecStatusType; typedef enum @@ -134,6 +137,17 @@ typedef enum PQPING_NO_ATTEMPT /* connection not attempted (bad params) */ } PGPing; +/* + * PQBatchStatus - Current status of batch mode + */ + +typedef enum +{ + PQBATCH_MODE_OFF, + PQBATCH_MODE_ON, + PQBATCH_MODE_ABORTED +} PQBatchStatus; + /* PGconn encapsulates a connection to the backend. * The contents of this struct are not supposed to be known to applications. */ @@ -425,6 +439,14 @@ extern PGresult *PQgetResult(PGconn *conn); extern int PQisBusy(PGconn *conn); extern int PQconsumeInput(PGconn *conn); +/* Routines for batch mode management */ +extern int PQbatchStatus(PGconn *conn); +extern int PQbatchQueueCount(PGconn *conn); +extern int PQenterBatchMode(PGconn *conn); +extern int PQexitBatchMode(PGconn *conn); +extern int PQbatchSyncQueue(PGconn *conn); +extern int PQbatchProcessQueue(PGconn *conn); + /* LISTEN/NOTIFY support */ extern PGnotify *PQnotifies(PGconn *conn); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 335568b..619f5c0 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -215,10 +215,15 @@ typedef enum { PGASYNC_IDLE, /* nothing's happening, dude */ PGASYNC_BUSY, /* query in progress */ - PGASYNC_READY, /* result ready for PQgetResult */ + PGASYNC_READY, /* query done, waiting for client to fetch + * result */ + PGASYNC_READY_MORE, /* query done, waiting for client to fetch + * result, More results expected from this + * query */ PGASYNC_COPY_IN, /* Copy In data transfer in progress */ PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */ - PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */ + PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */ + PGASYNC_QUEUED /* Current query done, more in queue */ } PGAsyncStatusType; /* PGQueryClass tracks which query protocol we are now executing */ @@ -227,7 +232,8 @@ typedef enum PGQUERY_SIMPLE, /* simple Query protocol (PQexec) */ PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */ PGQUERY_PREPARE, /* Parse only (PQprepare) */ - PGQUERY_DESCRIBE /* Describe Statement or Portal */ + PGQUERY_DESCRIBE, /* Describe Statement or Portal */ + PGQUERY_SYNC /* A protocol sync to end a batch */ } PGQueryClass; /* PGSetenvStatusType defines the state of the PQSetenv state machine */ @@ -297,6 +303,22 @@ typedef enum pg_conn_host_type CHT_UNIX_SOCKET } pg_conn_host_type; +/* An entry in the pending command queue. Used by batch mode to keep track + * of the expected results of future commands we've dispatched. + * + * Note that entries in this list are reused by being zeroed and appended to + * the tail when popped off the head. The entry with null next pointer is not + * the end of the list of expected commands, that's the tail pointer in + * pg_conn. + */ +typedef struct pgCommandQueueEntry +{ + PGQueryClass queryclass; /* Query type; PGQUERY_SYNC for sync msg */ + char *query; /* SQL command, or NULL if unknown */ + struct pgCommandQueueEntry *next; +} PGcommandQueueEntry; + + /* * pg_conn_host stores all information about one of possibly several hosts * mentioned in the connection string. Derived by splitting the pghost @@ -384,6 +406,7 @@ struct pg_conn bool options_valid; /* true if OK to attempt connection */ bool nonblocking; /* whether this connection is using nonblock * sending semantics */ + PQBatchStatus batch_status; /* Batch(pipelining) mode status of connection */ bool singleRowMode; /* return current query result row-by-row? */ char copy_is_binary; /* 1 = copy binary, 0 = copy text */ int copy_already_done; /* # bytes already returned in COPY @@ -396,6 +419,16 @@ struct pg_conn int whichhost; /* host we're currently considering */ pg_conn_host *connhost; /* details about each possible host */ + /* + * The command queue + * + * head is the next pending cmd, tail is where we append new commands. + * Freed entries for recycling go on the recycle linked list. + */ + PGcommandQueueEntry *cmd_queue_head; + PGcommandQueueEntry *cmd_queue_tail; + PGcommandQueueEntry *cmd_queue_recycle; + /* Connection data */ pgsocket sock; /* FD for socket, PGINVALID_SOCKET if * unconnected */ @@ -684,6 +717,8 @@ extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigne #define libpq_ngettext(s, p, n) ((n) == 1 ? (s) : (p)) #endif +#define libpq_gettext_noop(x) (x) + /* * These macros are needed to let error-handling code be portable between * Unix and Windows. (ugh)