Re: Pipeline mode and PQpipelineSync()

From: Boris Kolpackov <boris(at)codesynthesis(dot)com>
To: Alvaro Herrera <alvaro(dot)herrera(at)2ndquadrant(dot)com>
Cc: pgsql-hackers(at)lists(dot)postgresql(dot)org
Subject: Re: Pipeline mode and PQpipelineSync()
Date: 2021-07-07 15:09:41
Message-ID: boris.20210707170157@codesynthesis.com
Lists: pgsql-hackers

Alvaro Herrera <alvaro(dot)herrera(at)2ndquadrant(dot)com> writes:

> On 2021-Jul-07, Boris Kolpackov wrote:
>
> > I don't get any difference in behavior with this patch. That is, I
> > still get the unexpected NULL result. Does it make a difference for
> > your reproducer?
>
> Yes, the behavior changes for my repro. Is it possible for you to
> share a full program I can compile and run, please?

Here is the test sans the connection setup:

-----------------------------------------------------------------------

#include <libpq-fe.h>

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <stddef.h>
#include <assert.h>
#include <sys/select.h>

// Note: hack.
//
#include <arpa/inet.h>
#define htonll(x) ((((long long)htonl(x)) << 32) + htonl((x) >> 32))

static const size_t columns = 3;

struct data
{
  long long id;
  long long idata;
  const char* sdata;
};

static char* values[columns];
static int lengths[columns];
static int formats[columns] = {1, 1, 1};

static const unsigned int types[columns] = {
  20, // int8
  20, // int8
  25  // text
};

static void
init (const struct data* d)
{
  values[0] = (char*)&d->id;
  lengths[0] = sizeof (d->id);

  values[1] = (char*)&d->idata;
  lengths[1] = sizeof (d->idata);

  values[2] = (char*)d->sdata;
  lengths[2] = strlen (d->sdata);
}

static void
execute (PGconn* conn, const struct data* ds, size_t n)
{
  int sock = PQsocket (conn);
  assert (sock != -1);

  if (PQsetnonblocking (conn, 1) == -1 ||
      PQenterPipelineMode (conn) == 0)
    assert (false);

  // True if we've written and read everything, respectively.
  //
  bool wdone = false;
  bool rdone = false;

  size_t wn = 0;
  size_t rn = 0;

  while (!rdone)
  {
    fd_set wds;
    if (!wdone)
    {
      FD_ZERO (&wds);
      FD_SET (sock, &wds);
    }

    fd_set rds;
    FD_ZERO (&rds);
    FD_SET (sock, &rds);

    if (select (sock + 1, &rds, wdone ? NULL : &wds, NULL, NULL) == -1)
    {
      if (errno == EINTR)
        continue;

      assert (false);
    }

    // Try to minimize the chance of blocking the server by first processing
    // the result and then sending more queries.
    //
    if (FD_ISSET (sock, &rds))
    {
      if (PQconsumeInput (conn) == 0)
        assert (false);

      while (PQisBusy (conn) == 0)
      {
        //fprintf (stderr, "PQgetResult %zu\n", rn);

        PGresult* res = PQgetResult (conn);
        assert (res != NULL);
        ExecStatusType stat = PQresultStatus (res);

        if (stat == PGRES_PIPELINE_SYNC)
        {
          assert (wdone && rn == n);
          PQclear (res);
          rdone = true;
          break;
        }

        if (stat == PGRES_FATAL_ERROR)
        {
          const char* s = PQresultErrorField (res, PG_DIAG_SQLSTATE);

          if (strcmp (s, "23505") == 0)
            fprintf (stderr, "duplicate id at %zu\n", rn);
        }

        PQclear (res);
        assert (rn != n);
        ++rn;

        // We get a NULL result after each query result.
        //
        {
          PGresult* end = PQgetResult (conn);
          assert (end == NULL);
        }
      }
    }

    if (!wdone && FD_ISSET (sock, &wds))
    {
      // Send queries until we get blocked (write-biased). This feels like
      // a better overall strategy to keep the server busy compared to
      // sending one query at a time and then re-checking if there is
      // anything to read because the results of INSERT/UPDATE/DELETE are
      // presumably small and quite a few of them can get buffered before
      // the server gets blocked.
      //
      for (;;)
      {
        if (wn != n)
        {
          //fprintf (stderr, "PQsendQueryPrepared %zu\n", wn);

          init (ds + wn);

          if (PQsendQueryPrepared (conn,
                                   "persist_object",
                                   (int)(columns),
                                   values,
                                   lengths,
                                   formats,
                                   1) == 0)
            assert (false);

          if (++wn == n)
          {
            if (PQpipelineSync (conn) == 0)
              assert (false);
          }
        }

        // PQflush() result:
        //
        //  0 -- success (queue is now empty)
        //  1 -- blocked
        // -1 -- error
        //
        int r = PQflush (conn);
        assert (r != -1);

        if (r == 0)
        {
          if (wn != n)
          {
            // If we continue here, then we are write-biased. And if we
            // break, then we are read-biased.
            //
#if 1
            break;
#else
            continue;
#endif
          }

          wdone = true;
        }

        break; // Blocked or done.
      }
    }
  }

  if (PQexitPipelineMode (conn) == 0 ||
      PQsetnonblocking (conn, 0) == -1)
    assert (false);
}

static void
test (PGconn* conn)
{
  const size_t batch = 500;
  struct data ds[batch];

  for (size_t i = 0; i != batch; ++i)
  {
    ds[i].id = htonll (i == batch / 2 ? i - 1 : i); // Cause duplicate PK.
    ds[i].idata = htonll (i);
    ds[i].sdata = "abc";
  }

  // Prepare the statement.
  //
  {
    PGresult* res = PQprepare (
      conn,
      "persist_object",
      "INSERT INTO \"pgsql_bulk_object\" "
      "(\"id\", "
      "\"idata\", "
      "\"sdata\") "
      "VALUES "
      "($1, $2, $3)",
      (int)(columns),
      types);
    assert (PQresultStatus (res) == PGRES_COMMAND_OK);
    PQclear (res);
  }

  // Begin transaction.
  //
  {
    PGresult* res = PQexec (conn, "begin");
    assert (PQresultStatus (res) == PGRES_COMMAND_OK);
    PQclear (res);
  }

  execute (conn, ds, batch);

  // Commit transaction.
  //
  {
    PGresult* res = PQexec (conn, "commit");
    assert (PQresultStatus (res) == PGRES_COMMAND_OK);
    PQclear (res);
  }
}

-----------------------------------------------------------------------
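
The connection setup that I've omitted is the usual boilerplate; a
minimal sketch of a driver, assuming a local server and a database
named "test" (the conninfo string is just a placeholder, adjust as
necessary), would be along these lines:

#include <libpq-fe.h>

#include <stdio.h>

int
main ()
{
  PGconn* conn = PQconnectdb ("dbname=test");

  if (PQstatus (conn) != CONNECTION_OK)
  {
    fprintf (stderr, "unable to connect: %s", PQerrorMessage (conn));
    PQfinish (conn);
    return 1;
  }

  test (conn); // Defined above.

  PQfinish (conn);
  return 0;
}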

Use the following statements to (re)create the table:

DROP TABLE IF EXISTS "pgsql_bulk_object" CASCADE;

CREATE TABLE "pgsql_bulk_object" (
  "id" BIGINT NOT NULL PRIMARY KEY,
  "idata" BIGINT NOT NULL,
  "sdata" TEXT NOT NULL);

It fails consistently for me when running against the local PostgreSQL
9.5 server (connecting via the UNIX socket):

duplicate id at 250
driver: driver.cxx:105: void execute(PGconn*, const data*, size_t): Assertion `res != NULL' failed.
