SVN Commit by dpage: r4219 - in trunk/pgadmin3/xtra/pgagent: . include

From: svn(at)pgadmin(dot)org
To: pgadmin-hackers(at)postgresql(dot)org
Subject: SVN Commit by dpage: r4219 - in trunk/pgadmin3/xtra/pgagent: . include
Date: 2005-05-19 14:53:32
Message-ID: 200505191453.j4JErWtx014110@developer.pgadmin.org
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgadmin-hackers

Author: dpage
Date: 2005-05-19 15:53:32 +0100 (Thu, 19 May 2005)
New Revision: 4219

Modified:
trunk/pgadmin3/xtra/pgagent/connection.cpp
trunk/pgadmin3/xtra/pgagent/include/connection.h
trunk/pgadmin3/xtra/pgagent/include/job.h
trunk/pgadmin3/xtra/pgagent/include/pgAgent.h
trunk/pgadmin3/xtra/pgagent/job.cpp
trunk/pgadmin3/xtra/pgagent/misc.cpp
trunk/pgadmin3/xtra/pgagent/pgAgent.cpp
trunk/pgadmin3/xtra/pgagent/pgAgent.dsp
trunk/pgadmin3/xtra/pgagent/unix.cpp
trunk/pgadmin3/xtra/pgagent/win32.cpp
Log:
Dynamic connection pooling in preparation for threaded operation.

Modified: trunk/pgadmin3/xtra/pgagent/connection.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/connection.cpp 2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/connection.cpp 2005-05-19 14:53:32 UTC (rev 4219)
@@ -13,18 +13,15 @@
#include <libpq-fe.h>
#include <time.h>

-
-// entries in the connection pool
-int connPoolCount=5;
-
-
-DBconn **DBconn::pool=0;
+DBconn *DBconn::primaryConn;
string DBconn::basicConnectString;

-
DBconn::DBconn(const string &name)
{
dbname = name;
+ inUse = false;
+ next=0;
+ prev=0;
Connect(basicConnectString + " dbname=" + dbname);
}

@@ -32,6 +29,9 @@
DBconn::DBconn(const string &connectString, const string &name)
{
dbname = name;
+ inUse = false;
+ next=0;
+ prev=0;
Connect(connectString);
}

@@ -65,21 +65,12 @@

DBconn *DBconn::InitConnection(const string &connectString)
{
- if (!pool)
- {
- pool = new DBconn*[connPoolCount];
- if (pool)
- memset(pool, 0, sizeof(DBconn*) * connPoolCount);
- }
- if (!pool)
- LogMessage("Out of memory for connection pool", LOG_ERROR);
-
basicConnectString=connectString;
string dbname;

int pos=basicConnectString.find("dbname=");
if (pos == -1)
- dbname = "dba";
+ dbname = "pgadmin";
else
{
dbname = basicConnectString.substr(pos+7);
@@ -93,89 +84,84 @@
dbname = dbname.substr(0, pos);
}
}
- pool[0] = new DBconn(connectString, dbname);
- pool[0]->primary = true;
+ primaryConn = new DBconn(connectString, dbname);

- return pool[0];
+ if (!primaryConn)
+ LogMessage("Failed to create primary connection!", LOG_ERROR);
+
+ primaryConn->inUse = true;
+
+ return primaryConn;
}


-DBconn *DBconn::Get(const string &dbname, bool asPrimary)
+DBconn *DBconn::Get(const string &dbname)
{
- if (!pool)
- {
- pool = new DBconn*[connPoolCount];
- if (pool)
- memset(pool, 0, sizeof(DBconn*) * connPoolCount);
- }
- if (!pool)
- LogMessage("Out of memory for connection pool", LOG_ERROR);
+ DBconn *thisConn = primaryConn, *testConn;

- int i;
- DBconn **emptyConn=0, **oldestConn=0;
-
// find an existing connection
- for (i=0 ; i < connPoolCount ; i++)
+ do
{
- if (pool[i])
- {
- if (dbname == pool[i]->dbname)
- return pool[i];
+ if (dbname == thisConn->dbname && !thisConn->inUse)
+ {
+ LogMessage("Allocating existing connection to database " + thisConn->dbname, LOG_DEBUG);
+ thisConn->inUse = true;
+ return thisConn;
+ }

- // while searching, also mark the oldest non-primary connection
- if (!pool[i]->primary)
- {
- if (!oldestConn || pool[i]->timestamp < (*oldestConn)->timestamp)
- oldestConn=pool+i;
- }
- }
- else
- {
- // while searching, mark the first empty slot
- if (!emptyConn)
- emptyConn=pool+i;
- }
- }
- if (!emptyConn)
- {
- delete *oldestConn;
- emptyConn=oldestConn;
- *emptyConn=0;
- }
+ testConn = thisConn;
+ if (thisConn->next != 0)
+ thisConn = thisConn->next;

- DBconn *conn=new DBconn(dbname);
- if (conn->conn)
+ } while (testConn->next != 0);
+
+
+ // No suitable connection was found, so create a new one.
+ DBconn *newConn=new DBconn(dbname);
+ if (newConn->conn)
{
- *emptyConn=conn;
+ LogMessage("Allocating new connection to database " + newConn->dbname, LOG_DEBUG);
+ newConn->inUse = true;
+ newConn->prev = thisConn;
+ thisConn->next = newConn;
}
- return *emptyConn;
+ else
+ LogMessage("Failed to create new connection to database: " + dbname, LOG_ERROR);
+
+ return newConn;
}

+void DBconn::Return()
+{
+ LogMessage("Returning connection to database " + this->dbname, LOG_DEBUG);
+ inUse = false;
+}

void DBconn::ClearConnections(bool all)
{
- // clears all connections, except for the primary one.
- // if all is true, even the primary connection will be killed.
- if (pool)
- {
- int i;
- for (i=0 ; i < connPoolCount ; i++)
- {
- if (pool[i])
- {
- if (all || !pool[i]->primary)
- {
- delete pool[i];
- pool[i]=0;
- }
- }
- }
- if (all)
- {
- delete[] pool;
- pool=0;
- }
- }
+ if (all)
+ LogMessage("Clearing all connections", LOG_DEBUG);
+ else
+ LogMessage("Clearing all connections except the primary", LOG_DEBUG);
+
+ DBconn *thisConn=primaryConn, *deleteConn;
+
+ // Find the last connection
+ while (thisConn->next != 0)
+ thisConn = thisConn->next;
+
+ // Delete connections as required
+ while (thisConn->prev != 0)
+ {
+ deleteConn = thisConn;
+ thisConn = deleteConn->prev;
+ delete deleteConn;
+ thisConn->next = 0;
+ }
+
+ if (all)
+ delete thisConn;
+
}


@@ -205,6 +191,18 @@
return rows;
}

+string DBconn::GetLastError()
+{
+ // Return the last error message, minus any trailing line ends
+ if (lastError.substr(lastError.length()-2, 2) == "\r\n") // DOS
+ return lastError.substr(0, lastError.length()-2);
+ else if (lastError.substr(lastError.length()-1, 1) == "\n") // Unix
+ return lastError.substr(0, lastError.length()-1);
+ else if (lastError.substr(lastError.length()-1, 1) == "\r") // Mac
+ return lastError.substr(0, lastError.length()-1);
+ else
+ return lastError;
+}

///////////////////////////////////////////////////////7

Modified: trunk/pgadmin3/xtra/pgagent/include/connection.h
===================================================================
--- trunk/pgadmin3/xtra/pgagent/include/connection.h 2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/include/connection.h 2005-05-19 14:53:32 UTC (rev 4219)
@@ -16,7 +16,6 @@
#include <libpq-fe.h>

class DBresult;
-extern int connPoolCount;


class DBconn
@@ -27,29 +26,31 @@
~DBconn();

public:
- static DBconn *Get(const string &dbname, bool asPrimary=false);
+ static DBconn *Get(const string &dbname);
static DBconn *InitConnection(const string &connectString);

static void ClearConnections(bool allIncludingPrimary=false);
static void SetBasicConnectString(const string &bcs) { basicConnectString = bcs; }
- string GetLastError() { return lastError; }
+
+ string GetLastError();
string GetDBname() {return dbname; }
bool IsValid() { return conn != 0; }

DBresult *Execute(const string &query);
int ExecuteVoid(const string &query);
+ void Return();

private:
bool DBconn::Connect(const string &connectString);

protected:
static string basicConnectString;
- static DBconn **pool;
+ static DBconn *primaryConn;

string dbname, lastError;
PGconn *conn;
- long timestamp;
- bool primary;
+ DBconn *next, *prev;
+ bool inUse;

friend class DBresult;

Modified: trunk/pgadmin3/xtra/pgagent/include/job.h
===================================================================
--- trunk/pgadmin3/xtra/pgagent/include/job.h 2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/include/job.h 2005-05-19 14:53:32 UTC (rev 4219)
@@ -23,7 +23,7 @@
bool Runnable() { return status == "r"; }

protected:
- DBconn *serviceConn;
+ DBconn *threadConn;
string jobid, logid;
string status;
};

Modified: trunk/pgadmin3/xtra/pgagent/include/pgAgent.h
===================================================================
--- trunk/pgadmin3/xtra/pgagent/include/pgAgent.h 2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/include/pgAgent.h 2005-05-19 14:53:32 UTC (rev 4219)
@@ -29,6 +29,7 @@
extern long minLogLevel;
extern string connectString;
extern string serviceDBname;
+extern string backendPid;

// Log levels
enum

Modified: trunk/pgadmin3/xtra/pgagent/job.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/job.cpp 2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/job.cpp 2005-05-19 14:53:32 UTC (rev 4219)
@@ -15,23 +15,23 @@

Job::Job(DBconn *conn, const string &jid)
{
- serviceConn=conn;
+ threadConn=conn;
jobid=jid;
status="";

- int rc=serviceConn->ExecuteVoid(
- "UPDATE pgagent.pga_job SET jobagentid=pg_backend_pid(), joblastrun=now() "
+ int rc=threadConn->ExecuteVoid(
+ "UPDATE pgagent.pga_job SET jobagentid=" + backendPid + ", joblastrun=now() "
" WHERE jobagentid IS NULL AND jobid=" + jobid);

if (rc == 1)
{
- DBresult *id=serviceConn->Execute(
+ DBresult *id=threadConn->Execute(
"SELECT nextval('pgagent.pga_joblog_jlgid_seq') AS id");
if (id)
{
logid=id->GetString("id");

- DBresult *res=serviceConn->Execute(
+ DBresult *res=threadConn->Execute(
"INSERT INTO pgagent.pga_joblog(jlgid, jlgjobid, jlgstatus) "
"VALUES (" + logid + ", " + jobid + ", 'r')");
if (res)
@@ -49,7 +49,7 @@
{
if (status != "")
{
- serviceConn->ExecuteVoid(
+ threadConn->ExecuteVoid(
"UPDATE pgagent.pga_joblog "
" SET jlgstatus='" + status + "', jlgduration=now() - jlgstart "
" WHERE jlgid=" + logid + ";\n"
@@ -59,13 +59,14 @@
" WHERE jobid=" + jobid
);
}
+ threadConn->Return();
}


int Job::Execute()
{
int rc=0;
- DBresult *steps=serviceConn->Execute(
+ DBresult *steps=threadConn->Execute(
"SELECT jstid, jstkind, jstdbname, jstcode, jstonerror "
" FROM pgagent.pga_jobstep "
" WHERE jstenabled "
@@ -80,17 +81,17 @@

while (steps->HasData())
{
- DBconn *conn;
+ DBconn *stepConn;
string jslid, stepid, jpecode;

stepid = steps->GetString("jstid");

- DBresult *id=serviceConn->Execute(
+ DBresult *id=threadConn->Execute(
"SELECT nextval('pgagent.pga_jobsteplog_jslid_seq') AS id");
if (id)
{
jslid=id->GetString("id");
- DBresult *res=serviceConn->Execute(
+ DBresult *res=threadConn->Execute(
"INSERT INTO pgagent.pga_jobsteplog(jslid, jsljlgid, jsljstid, jslstatus) "
"SELECT " + jslid + ", " + logid + ", " + stepid + ", 'r'"
" FROM pgagent.pga_jobstep WHERE jstid=" + stepid);
@@ -115,11 +116,12 @@
{
case 's':
{
- conn=DBconn::Get(steps->GetString("jstdbname"));
- if (conn)
+ stepConn=DBconn::Get(steps->GetString("jstdbname"));
+ if (stepConn)
{
LogMessage("Executing step " + stepid + " on database " + steps->GetString("jstdbname"), LOG_DEBUG);
- rc=conn->ExecuteVoid(steps->GetString("jstcode"));
+ rc=stepConn->ExecuteVoid(steps->GetString("jstcode"));
+ stepConn->Return();
}
else
rc=-1;
@@ -144,7 +146,7 @@
else
stepstatus = steps->GetString("jstonerror");

- rc=serviceConn->ExecuteVoid(
+ rc=threadConn->ExecuteVoid(
"UPDATE pgagent.pga_jobsteplog "
" SET jslduration = now() - jslstart, "
" jslresult = " + NumToStr(rc) + ", jslstatus = '" + stepstatus + "' "

Modified: trunk/pgadmin3/xtra/pgagent/misc.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/misc.cpp 2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/misc.cpp 2005-05-19 14:53:32 UTC (rev 4219)
@@ -64,13 +64,6 @@
longWait = val;
break;
}
- case 'c':
- {
- int val = atoi(getArg(argc, argv).c_str());
- if (val >= 5)
- connPoolCount = val;
- break;
- }
case 'l':
{
int val = atoi(getArg(argc, argv).c_str());

Modified: trunk/pgadmin3/xtra/pgagent/pgAgent.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/pgAgent.cpp 2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/pgAgent.cpp 2005-05-19 14:53:32 UTC (rev 4219)
@@ -11,6 +11,8 @@

#include "pgAgent.h"

+#include <wx/thread.h>
+
#ifdef WIN32
#include <winsock2.h>
#else
@@ -19,12 +21,12 @@

string connectString;
string serviceDBname;
+string backendPid;
long longWait=30;
long shortWait=10;
long minLogLevel=LOG_ERROR;


-
int MainRestartLoop(DBconn *serviceConn)
{
// clean up old jobs
@@ -65,8 +67,6 @@
);
}

-
-
char hostname[255];
gethostname(hostname, 255);

@@ -96,18 +96,20 @@

if (jobid != "")
{
- Job job(serviceConn, jobid);
+ DBconn *threadConn=DBconn::Get(serviceDBname);
+ Job job(threadConn, jobid);

if (job.Runnable())
{
foundJobToExecute=true;
LogMessage("Running job: " + jobid, LOG_DEBUG);
job.Execute();
+ LogMessage("Completed job: " + jobid, LOG_DEBUG);
}
}
else
{
- LogMessage("No jobs to run - time for a pint :-)", LOG_DEBUG);
+ LogMessage("No jobs to run - sleeping...", LOG_DEBUG);
WaitAWhile();
}
}
@@ -124,28 +126,34 @@

void MainLoop()
{
- // Basic sanity check
- LogMessage("Database sanity check", LOG_DEBUG);
- DBconn *sanityConn=DBconn::Get(serviceDBname, true);
- DBresult *res=sanityConn->Execute("SELECT count(*) As count FROM pg_class cl JOIN pg_namespace ns ON ns.oid=relnamespace WHERE relname='pga_job' AND nspname='pgagent'");
- if (res)
- {
- string val=res->GetString("count");
-
- if (val == "0")
- LogMessage("Could not find the table 'pgagent.pga_job'. Have you run pgagent.sql on this database?", LOG_ERROR);
- }

// OK, let's get down to business
do
{
- DBconn *serviceConn=DBconn::Get(serviceDBname, true);
+ LogMessage("Creating primary connection", LOG_DEBUG);
+ DBconn *serviceConn=DBconn::InitConnection(connectString);
+
+ if (serviceConn && serviceConn->IsValid())
+ {
+ serviceDBname = serviceConn->GetDBname();

- if (serviceConn)
- {
+ // Basic sanity check, and a chance to get the serviceConn's PID
+ LogMessage("Database sanity check", LOG_DEBUG);
+ DBresult *res=serviceConn->Execute("SELECT count(*) As count, pg_backend_pid() AS pid FROM pg_class cl JOIN pg_namespace ns ON ns.oid=relnamespace WHERE relname='pga_job' AND nspname='pgagent'");
+ if (res)
+ {
+ string val=res->GetString("count");
+
+ if (val == "0")
+ LogMessage("Could not find the table 'pgagent.pga_job'. Have you run pgagent.sql on this database?", LOG_ERROR);
+
+ backendPid=res->GetString("pid");
+ }
+
MainRestartLoop(serviceConn);
}

+ LogMessage("Couldn't create connection: " + serviceConn->GetLastError(), LOG_WARNING);
DBconn::ClearConnections(true);
WaitAWhile(true);
}

Modified: trunk/pgadmin3/xtra/pgagent/pgAgent.dsp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/pgAgent.dsp 2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/pgAgent.dsp 2005-05-19 14:53:32 UTC (rev 4219)
@@ -70,7 +70,7 @@
# PROP Ignore_Export_Lib 0
# PROP Target_Dir ""
# ADD BASE CPP /nologo /W3 /Gm /GX /ZI /Od /I "include" /I "c:/program files/postgresql/8.0/include" /D "WIN32" /D "_DEBUG" /D "_WINDOWS" /D "_MBCS" /D "_MT" /Yu"pgAgent.h" /FD /GZ /c
-# ADD CPP /nologo /MT /W3 /GX /O2 /I "include" /I "c:/program files/postgresql/8.0/include" /I "c:/wxWidgets-2.6/include" /I "c:/wxWidgets-2.6/contrib/include" /D "WIN32" /D "_WINDOWS" /D "_MBCS" /D "_MT" /FR /Yu"pgamsgevent.h" /FD /c
+# ADD CPP /nologo /MT /W3 /GX /O2 /I "include" /I "c:/program files/postgresql/8.0/include" /I "c:/wxWidgets-2.6/include" /I "c:/wxWidgets-2.6/contrib/include" /D "WIN32" /D "_WINDOWS" /D "_MBCS" /D "_MT" /FR /Yu"pgAgent.h" /FD /c
# ADD BASE RSC /l 0x407 /d "_DEBUG"
# ADD RSC /l 0x809 /i "c:/wxWidgets-2.6/include"
BSC32=bscmake.exe

Modified: trunk/pgadmin3/xtra/pgagent/unix.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/unix.cpp 2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/unix.cpp 2005-05-19 14:53:32 UTC (rev 4219)
@@ -25,7 +25,6 @@
"options:\n"
"-t <poll time interval in seconds (default 10)>\n"
"-r <retry period after connection abort in seconds (>=10, default 30)>\n"
- "-c <connection pool size (>=5, default 5)>\n"
"-l <logging verbosity (ERROR=0, WARNING=1, DEBUG=2, default 0)>\n"
);
}
@@ -87,12 +86,6 @@

setOptions(argc, argv);

- DBconn *conn=DBconn::InitConnection(connectString);
- if (!conn->IsValid())
- LogMessage("Invalid connection: " + conn->GetLastError(), LOG_ERROR);
-
- serviceDBname = conn->GetDBname();
-
daemonize();

MainLoop();

Modified: trunk/pgadmin3/xtra/pgagent/win32.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/win32.cpp 2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/win32.cpp 2005-05-19 14:53:32 UTC (rev 4219)
@@ -328,7 +328,6 @@
"-d <displayname>\n"
"-t <poll time interval in seconds (default 10)>\n"
"-r <retry period after connection abort in seconds (>=10, default 30)>\n"
- "-c <connection pool size (>=5, default 5)>\n"
"-l <logging verbosity (ERROR=0, WARNING=1, DEBUG=2, default 0)>\n"
);
}
@@ -337,19 +336,16 @@

////////////////////////////////////////////////////////////

-void setupForRun(int argc, char **argv)
+void setupForRun(int argc, char **argv, bool debug=false)
{
- eventHandle = RegisterEventSource(0, serviceName.c_str());
- if (!eventHandle)
- LogMessage("Couldn't register event handle.", LOG_ERROR);
+ if (!debug)
+ {
+ eventHandle = RegisterEventSource(0, serviceName.c_str());
+ if (!eventHandle)
+ LogMessage("Couldn't register event handle.", LOG_ERROR);
+ }

setOptions(argc, argv);
-
- DBconn *conn=DBconn::InitConnection(connectString);
- if (!conn->IsValid())
- LogMessage("Invalid connection: " + conn->GetLastError(), LOG_ERROR);
-
- serviceDBname = conn->GetDBname();
}


@@ -368,7 +364,7 @@

if (command == "INSTALL")
{
- string displayname = "pgAgent " + serviceName;
+ string displayname = "PostgreSQL scheduling agent - " + serviceName;
string arg = executable + " RUN " + serviceName;

while (argc-- > 0)
@@ -416,7 +412,7 @@
}
else if (command == "DEBUG")
{
- setupForRun(argc, argv);
+ setupForRun(argc, argv, true);

initService();
#if START_SUSPENDED

Browse pgadmin-hackers by date

  From Date Subject
Next Message Raphaël Enrici 2005-05-19 17:07:55 Re: New acinclude.m4
Previous Message Adam H.Pendleton 2005-05-19 11:57:10 Re: New acinclude.m4