Re: Error on PQputline()

From: "Dann Corbit" <DCorbit(at)connx(dot)com>
To: "Tom Lane" <tgl(at)sss(dot)pgh(dot)pa(dot)us>
Cc: <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Error on PQputline()
Date: 2002-05-17 23:18:16
Message-ID: D90A5A6C612A39408103E6ECDD77B82920CE77@voyager.corporate.connx.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

> -----Original Message-----
> From: Tom Lane [mailto:tgl(at)sss(dot)pgh(dot)pa(dot)us]
> Sent: Friday, May 17, 2002 4:10 PM
> To: Dann Corbit
> Cc: pgsql-hackers(at)postgresql(dot)org
> Subject: Re: [HACKERS] Error on PQputline()
>
>
> "Dann Corbit" <DCorbit(at)connx(dot)com> writes:
> > The contents of the error message are:
> > conn->errorMessage.data 0x00312440 "pqFlush() -- couldn't send data:
> > errno=0
> > No error A non-blocking socket operation could not be completed
> > immediately.
>
> You're running libpq with the nonblocking mode selected?

Actually no. It should be the default mode for a connection made by
PQconnectdb(). That's what made the error so puzzling.

> > What is the correct recovery action?
>
> Redesign libpq's nonblock mode :-(. It's a mess; a quick hack that
> doesn't even try to cover all cases, and is unreliable in the ones it
> does cover. You can find my previous rants on the subject in the
> archives from a couple years back (around Jan '00 I believe). IMHO
> we should never have accepted that patch at all.
>
> Short of that, don't use the COPY code with nonblock.

I am trying to figure out whether it is faster to bulk copy from a file on
the server or to use an API from the client. It boils down to this:

"Would it be faster to write a file to disk and read it again on the
local host for the server or to send the calls via libpq client
messages?"

It could be that the TCP/IP overhead exceeds the overhead of writing the
file to disk and reading it again.

I have a data statement (in test.h) that consists of 1.6 million rows of
data to spin into the database.

Here is the complete program:

#include <windows.h>
#include <stdlib.h>
#include <time.h>
#include "libpq-fe.h"
#include "glob.h" /* member variables in the objects */

#include "test.h"

/*
 * Start up Winsock, asking for version 2.2.
 *
 * Returns 1 when WSAStartup() reports success (a zero return code) and
 * 0 otherwise, i.e. when no usable WinSock DLL could be initialised.
 * Nothing is printed here; reporting the failure is left to the caller.
 */
int init_comm(void)
{
    WSADATA winsock_info;
    const WORD wanted_version = MAKEWORD(2, 2);

    return (WSAStartup(wanted_version, &winsock_info) == 0) ? 1 : 0;
}

/* Forward declaration: ExecuteImmediate() calls ProcessTuples() before its
 * definition appears further down in this file. */
void ProcessTuples(void);

/*
 * Run one SQL command on the global connection `conn` and classify the
 * outcome.
 *
 * command  SQL text handed to PQexec() unchanged.
 * q_t      kind of statement; QUERY_TYPE_SELECT and QUERY_TYPE_INSERT get
 *          special treatment, every other value is handled generically.
 *
 * Returns 0 when the command is considered successful and 1 when there is
 * a problem.  The status code is always left in the global `rc`.
 *
 * Lifetime of the global `result`:
 *   - on success it is released and set to NULL before returning;
 *   - on failure it is deliberately kept alive so that the caller can pass
 *     control to HandleProblem(), which reads the error text out of it.
 *     It is released at the start of the next call to this function.
 * Previously the result was always PQclear()'d here, so HandleProblem()
 * ended up reading a freed PGresult.
 *
 * NOTE(review): this relies on `result` (declared outside this file
 * section, presumably in glob.h) being NULL or a live PGresult on entry
 * -- confirm it is not initialised to anything else.
 */
int ExecuteImmediate(char *command, Qtype q_t)
{
    int problem = 0;

    /* Drop whatever an earlier failed call left behind; PQclear(NULL) is a
     * no-op, so this is safe on the first call as well. */
    PQclear(result);
    result = NULL;

#ifdef _DEBUG
    printf("%s\n", command);
#endif
    result = PQexec(conn, command);
    rc = PQresultStatus(result);

    switch (rc) {
    case PGRES_TUPLES_OK:       /* Data set successfully created */
        /* Not expected in normal operation: tuple processing is done at a
         * lower level to pass data back to CONNX.  Left in for debugging. */
#ifdef _DEBUG
        printf("#rows affected %s\n", PQcmdTuples(result));
#endif
        ProcessTuples();
        break;

    case PGRES_EMPTY_QUERY:     /* Empty query supplied -- do nothing */
    case PGRES_COMMAND_OK:      /* Query succeeded but returned no results */
        /* A SELECT should at least yield a result set of empty tuples. */
        if (q_t == QUERY_TYPE_SELECT)
            problem = 1;
        break;

    case PGRES_BAD_RESPONSE:
    case PGRES_NONFATAL_ERROR:
    case PGRES_FATAL_ERROR:
        problem = 1;
        break;

    default:
        /* Any other status (e.g. the COPY states) is not flagged here,
         * which matches the previous behaviour. */
        break;
    }

    if (q_t == QUERY_TYPE_INSERT) {
        InsertedOID = PQoidValue(result);
#ifdef _DEBUG
        printf("OID of inserted row is %lu\n", (unsigned long) InsertedOID);
#endif
    }

    if (!problem) {
        PQclear(result);
        result = NULL;          /* never leave a dangling global behind */
    }
    return problem;
}

/*
 * Report the most recent failure recorded in the globals `rc` and `result`.
 *
 * The status name comes from PQresStatus(rc) and the detail text from
 * PQresultErrorMessage(result), so `result` must still be valid when this
 * is called.
 *
 * When compiled as C++ the two texts are concatenated and thrown as a
 * CPOSTGRESQLException; the debug printing below is then never reached.
 * When compiled as C with _DEBUG defined, both texts go to stdout.
 * In any other configuration the function has no visible effect.
 */
void HandleProblem(void)
{
    const char *status_text = PQresStatus(rc);
    const char *detail_text = PQresultErrorMessage(result);

#ifdef __cplusplus
    String combined = status_text;
    combined = combined + detail_text;
    throw Mcnew CPOSTGRESQLException(conn, rc, (LPCSTR) combined, szSQLState);
#endif
#ifdef _DEBUG
    printf("status is %s\n", status_text);
    printf("result message: %s\n", detail_text);
#endif
}

/*
 * Open a transaction by sending "BEGIN work"; if ExecuteImmediate()
 * reports a problem, hand over to HandleProblem().
 */
void BeginTrans(void)
{
    if (ExecuteImmediate("BEGIN work", QUERY_TYPE_TRANSACT) != 0) {
        HandleProblem();
    }
}

/*
 * Commit the current transaction by sending "COMMIT work"; if
 * ExecuteImmediate() reports a problem, hand over to HandleProblem().
 */
void CommitTrans(void)
{
    if (ExecuteImmediate("COMMIT work", QUERY_TYPE_TRANSACT) != 0) {
        HandleProblem();
    }
}

/*
 * Abandon the current transaction by sending "ROLLBACK work"; if
 * ExecuteImmediate() reports a problem, hand over to HandleProblem().
 */
void RollbackTrans(void)
{
    if (ExecuteImmediate("ROLLBACK work", QUERY_TYPE_TRANSACT) != 0) {
        HandleProblem();
    }
}

/*
 * Dump the tuples held in the global `result` to stdout.
 *
 * The row and field counts are stored in the globals `nrows` and
 * `nfields`; the globals `r` and `n` serve as the row and field cursors.
 * Each row is written on one line as a run of " name = value(length),"
 * items.  The two count lines are only printed in _DEBUG builds, whereas
 * the row dump itself is unconditional.
 */
void ProcessTuples()
{
    nrows = PQntuples(result);
    nfields = PQnfields(result);
#ifdef _DEBUG
    printf("number of rows returned = %d\n", nrows);
    printf("number of fields returned = %d\n", nfields);
#endif

    r = 0;
    while (r < nrows) {
        n = 0;
        while (n < nfields) {
            printf(" %s = %s(%d),",
                   PQfname(result, n),
                   PQgetvalue(result, r, n),
                   PQgetlength(result, r, n));
            n++;
        }
        printf("\n");
        r++;
    }
}

/* NOTE(review): not referenced anywhere in the visible program --
 * presumably a leftover; confirm before removing. */
static long cursor_number = 0;

int main(void)
{
int problem;
int i = 0;

struct tm *newtime;
time_t aclock;

if (init_comm()) {
conn = PQconnectdb("dbname=connxdatasync host=dannfast");
if (PQstatus(conn) == CONNECTION_OK) {
char insert_sql[256];
printf("connection made\n");
} else {
printf("connection failed\n");
return EXIT_FAILURE;
}

puts("DROP TABLE cnx_ds_sis_bill_detl_tb started");
problem = ExecuteImmediate("DROP TABLE cnx_ds_sis_bill_detl_tb",
QUERY_TYPE_OTHER);
if (problem)
HandleProblem();

puts("DROP TABLE cnx_ds_sis_bill_detl_tb finished");
puts("CREATE TABLE cnx_ds_sis_bill_detl_tb started");
problem = ExecuteImmediate("CREATE TABLE cnx_ds_sis_bill_detl_tb
( extr_stu_id char(10), term_cyt char(5), subcode char(5), tran_seq
int2, crc int8)", QUERY_TYPE_OTHER);
if (problem)
HandleProblem();

puts("CREATE TABLE cnx_ds_sis_bill_detl_tb finished");
puts("going to start bulk copy...");

time(&aclock);
newtime = localtime(&aclock);
puts(asctime(newtime));
result = PQexec(conn, "COPY cnx_ds_sis_bill_detl_tb FROM STDIN
DELIMITERS '|'");
problem = 0;
switch (rc = PQresultStatus(result)) {
case PGRES_BAD_RESPONSE:
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
{
problem = 1;
}
}

if (problem)
HandleProblem();

puts("done with initialization...");

while (pszBCPdata[i])
{
if (PQputline(conn, pszBCPdata[i++]) == EOF)
printf("Error inserting data on row %d\n",
i-1);
}

PQputline(conn, "\\.\n");

PQendcopy(conn);

puts("finished with bulk copy...");

time(&aclock);
newtime = localtime(&aclock);
puts(asctime(newtime));

return EXIT_SUCCESS;
}
puts("initialization of winsock failed.");
return EXIT_FAILURE;
}

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Tom Lane 2002-05-17 23:22:59 Re: Updated CREATE FUNCTION syntax
Previous Message Tom Lane 2002-05-17 23:10:04 Re: Error on PQputline()