Skip site navigation (1) Skip section navigation (2)

SVN Commit by dpage: r4219 - in trunk/pgadmin3/xtra/pgagent: . include

From: svn(at)pgadmin(dot)org
To: pgadmin-hackers(at)postgresql(dot)org
Subject: SVN Commit by dpage: r4219 - in trunk/pgadmin3/xtra/pgagent: . include
Date: 2005-05-19 14:53:32
Message-ID: 200505191453.j4JErWtx014110@developer.pgadmin.org (view raw, whole thread or download thread mbox)
Thread:
Lists: pgadmin-hackers
Author: dpage
Date: 2005-05-19 15:53:32 +0100 (Thu, 19 May 2005)
New Revision: 4219

Modified:
   trunk/pgadmin3/xtra/pgagent/connection.cpp
   trunk/pgadmin3/xtra/pgagent/include/connection.h
   trunk/pgadmin3/xtra/pgagent/include/job.h
   trunk/pgadmin3/xtra/pgagent/include/pgAgent.h
   trunk/pgadmin3/xtra/pgagent/job.cpp
   trunk/pgadmin3/xtra/pgagent/misc.cpp
   trunk/pgadmin3/xtra/pgagent/pgAgent.cpp
   trunk/pgadmin3/xtra/pgagent/pgAgent.dsp
   trunk/pgadmin3/xtra/pgagent/unix.cpp
   trunk/pgadmin3/xtra/pgagent/win32.cpp
Log:
Dynamic connection pooling in preparation for threaded operation.

Modified: trunk/pgadmin3/xtra/pgagent/connection.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/connection.cpp	2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/connection.cpp	2005-05-19 14:53:32 UTC (rev 4219)
@@ -13,18 +13,15 @@
 #include <libpq-fe.h>
 #include <time.h>
 
-
-// entries in the connection pool
-int connPoolCount=5;
-
-
-DBconn **DBconn::pool=0;
+DBconn *DBconn::primaryConn;
 string DBconn::basicConnectString;
 
-
 DBconn::DBconn(const string &name)
 {
     dbname = name;
+	inUse = false;
+	next=0;
+	prev=0;
     Connect(basicConnectString  + " dbname=" + dbname);
 }
 
@@ -32,6 +29,9 @@
 DBconn::DBconn(const string &connectString, const string &name)
 {
     dbname = name;
+	inUse = false;
+	next=0;
+	prev=0;
     Connect(connectString);
 }
 
@@ -65,21 +65,12 @@
 
 DBconn *DBconn::InitConnection(const string &connectString)
 {
-    if (!pool)
-    {
-        pool = new DBconn*[connPoolCount];
-        if (pool)
-            memset(pool, 0, sizeof(DBconn*) * connPoolCount);
-    }
-    if (!pool)
-        LogMessage("Out of memory for connection pool", LOG_ERROR);
-
     basicConnectString=connectString;
     string dbname;
 
     int pos=basicConnectString.find("dbname=");
     if (pos == -1)
-        dbname = "dba";
+        dbname = "pgadmin";
     else
     {
         dbname = basicConnectString.substr(pos+7);
@@ -93,89 +84,84 @@
             dbname = dbname.substr(0, pos);
         }
     }
-    pool[0] = new DBconn(connectString, dbname);
-    pool[0]->primary = true;
+    primaryConn = new DBconn(connectString, dbname);
 
-    return pool[0];
+    if (!primaryConn)
+        LogMessage("Failed to create primary connection!", LOG_ERROR);
+
+	primaryConn->inUse = true;
+
+    return primaryConn;
 }
 
 
-DBconn *DBconn::Get(const string &dbname, bool asPrimary)
+DBconn *DBconn::Get(const string &dbname)
 {
-    if (!pool)
-    {
-        pool = new DBconn*[connPoolCount];
-        if (pool)
-            memset(pool, 0, sizeof(DBconn*) * connPoolCount);
-    }
-    if (!pool)
-        LogMessage("Out of memory for connection pool", LOG_ERROR);
+	DBconn *thisConn = primaryConn, *testConn;
 
-    int i;
-    DBconn **emptyConn=0, **oldestConn=0;
-
     // find an existing connection
-    for (i=0 ; i < connPoolCount ; i++)
+    do
     {
-        if (pool[i])
-        {
-            if (dbname == pool[i]->dbname)
-                return pool[i];
+        if (dbname == thisConn->dbname && !thisConn->inUse)
+		{
+			LogMessage("Allocating existing connection to database " + thisConn->dbname, LOG_DEBUG);
+			thisConn->inUse = true;
+            return thisConn;
+		}
 
-            // while searching, also mark the oldest non-primary connection
-            if (!pool[i]->primary)
-            {
-                if (!oldestConn || pool[i]->timestamp < (*oldestConn)->timestamp)
-                    oldestConn=pool+i;
-            }
-        }
-        else
-        {
-            // while searching, mark the first empty slot
-            if (!emptyConn)
-                emptyConn=pool+i;
-        }
-    }
-    if (!emptyConn)
-    {
-        delete *oldestConn;
-        emptyConn=oldestConn;
-        *emptyConn=0;
-    }
+		testConn = thisConn;
+		if (thisConn->next != 0)
+		    thisConn = thisConn->next;
 
-    DBconn *conn=new DBconn(dbname);
-    if (conn->conn)
+    } while (testConn->next != 0);
+
+
+	// No suitable connection was found, so create a new one.
+    DBconn *newConn=new DBconn(dbname);
+    if (newConn->conn)
     {
-        *emptyConn=conn;
+		LogMessage("Allocating new connection to database " + newConn->dbname, LOG_DEBUG);
+		newConn->inUse = true;
+		newConn->prev = thisConn;
+		thisConn->next = newConn;
     }
-    return *emptyConn;
+	else
+		LogMessage("Failed to create new connection to database: " + dbname, LOG_ERROR);
+
+    return newConn;
 }
 
+void DBconn::Return()
+{
+	LogMessage("Returning connection to database " + this->dbname, LOG_DEBUG);
+	inUse = false;
+}
 
 void DBconn::ClearConnections(bool all)
 {
-    // clears all connections, except for the primary one.
-    // if all is true, even the primary connection will be killed.
-    if (pool)
-    {
-        int i;
-        for (i=0 ; i < connPoolCount ; i++)
-        {
-            if (pool[i])
-            {
-                if (all || !pool[i]->primary)
-                {
-                    delete pool[i];
-                    pool[i]=0;
-                }
-            }
-        }
-        if (all)
-        {
-            delete[] pool;
-            pool=0;
-        }
-    }
+	if (all)
+		LogMessage("Clearing all connections", LOG_DEBUG);
+	else
+		LogMessage("Clearing all connections except the primary", LOG_DEBUG);
+
+	DBconn *thisConn=primaryConn, *deleteConn;
+
+	// Find the last connection
+	while (thisConn->next != 0)
+		thisConn = thisConn->next;
+
+	// Delete connections as required
+	while (thisConn->prev != 0)
+	{
+		deleteConn = thisConn;
+		thisConn = deleteConn->prev;
+		delete deleteConn;
+		thisConn->next = 0;
+	}
+
+	if (all)
+		delete thisConn;
+
 }
 
 
@@ -205,6 +191,18 @@
     return rows;
 }
 
+string DBconn::GetLastError() 
+{ 
+	// Return the last error message, minus any trailing line ends
+	if (lastError.substr(lastError.length()-2, 2) == "\r\n") // DOS
+	    return lastError.substr(0, lastError.length()-2); 
+    else if (lastError.substr(lastError.length()-1, 1) == "\n") // Unix
+	    return lastError.substr(0, lastError.length()-1);
+    else if (lastError.substr(lastError.length()-1, 1) == "\r") // Mac
+	    return lastError.substr(0, lastError.length()-1);
+	else
+		return lastError;
+}
 
 ///////////////////////////////////////////////////////7
 

Modified: trunk/pgadmin3/xtra/pgagent/include/connection.h
===================================================================
--- trunk/pgadmin3/xtra/pgagent/include/connection.h	2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/include/connection.h	2005-05-19 14:53:32 UTC (rev 4219)
@@ -16,7 +16,6 @@
 #include <libpq-fe.h>
 
 class DBresult;
-extern int connPoolCount;
 
 
 class DBconn
@@ -27,29 +26,31 @@
     ~DBconn();
 
 public:
-    static DBconn *Get(const string &dbname, bool asPrimary=false);
+    static DBconn *Get(const string &dbname);
     static DBconn *InitConnection(const string &connectString);
 
     static void ClearConnections(bool allIncludingPrimary=false);
     static void SetBasicConnectString(const string &bcs) { basicConnectString = bcs; }
-    string GetLastError() { return lastError; }
+
+    string GetLastError();
     string GetDBname() {return dbname; }
     bool IsValid() { return conn != 0; }
 
     DBresult *Execute(const string &query);
     int ExecuteVoid(const string &query);
+	void Return();
 
 private:
     bool DBconn::Connect(const string &connectString);
 
 protected:
     static string basicConnectString;
-    static DBconn **pool;
+	static DBconn *primaryConn;
 
     string dbname, lastError;
     PGconn *conn;
-    long timestamp;
-    bool primary;
+	DBconn *next, *prev;
+    bool inUse;
 
     friend class DBresult;
 

Modified: trunk/pgadmin3/xtra/pgagent/include/job.h
===================================================================
--- trunk/pgadmin3/xtra/pgagent/include/job.h	2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/include/job.h	2005-05-19 14:53:32 UTC (rev 4219)
@@ -23,7 +23,7 @@
     bool Runnable() { return status == "r"; }
 
 protected:
-    DBconn *serviceConn;
+    DBconn *threadConn;
     string jobid, logid;
     string status;
 };

Modified: trunk/pgadmin3/xtra/pgagent/include/pgAgent.h
===================================================================
--- trunk/pgadmin3/xtra/pgagent/include/pgAgent.h	2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/include/pgAgent.h	2005-05-19 14:53:32 UTC (rev 4219)
@@ -29,6 +29,7 @@
 extern long minLogLevel;
 extern string connectString;
 extern string serviceDBname;
+extern string backendPid;
 
 // Log levels
 enum

Modified: trunk/pgadmin3/xtra/pgagent/job.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/job.cpp	2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/job.cpp	2005-05-19 14:53:32 UTC (rev 4219)
@@ -15,23 +15,23 @@
 
 Job::Job(DBconn *conn, const string &jid)
 {
-    serviceConn=conn;
+    threadConn=conn;
     jobid=jid;
     status="";
 
-    int rc=serviceConn->ExecuteVoid(
-        "UPDATE pgagent.pga_job SET jobagentid=pg_backend_pid(), joblastrun=now() "
+    int rc=threadConn->ExecuteVoid(
+        "UPDATE pgagent.pga_job SET jobagentid=" + backendPid + ", joblastrun=now() "
         " WHERE jobagentid IS NULL AND jobid=" + jobid);
 
     if (rc == 1)
     {
-		DBresult *id=serviceConn->Execute(
+		DBresult *id=threadConn->Execute(
 			"SELECT nextval('pgagent.pga_joblog_jlgid_seq') AS id");
 		if (id)
 		{
 			logid=id->GetString("id");
 
-			DBresult *res=serviceConn->Execute(
+			DBresult *res=threadConn->Execute(
 				"INSERT INTO pgagent.pga_joblog(jlgid, jlgjobid, jlgstatus) "
 	            "VALUES (" + logid + ", " + jobid + ", 'r')");
 			if (res)
@@ -49,7 +49,7 @@
 {
     if (status != "")
     {
-        serviceConn->ExecuteVoid(
+        threadConn->ExecuteVoid(
             "UPDATE pgagent.pga_joblog "
             "   SET jlgstatus='" + status + "', jlgduration=now() - jlgstart "
             " WHERE jlgid=" + logid + ";\n"
@@ -59,13 +59,14 @@
             " WHERE jobid=" + jobid
             );
     }
+	threadConn->Return();
 }
 
 
 int Job::Execute()
 {
     int rc=0;
-    DBresult *steps=serviceConn->Execute(
+    DBresult *steps=threadConn->Execute(
         "SELECT jstid, jstkind, jstdbname, jstcode, jstonerror "
         "  FROM pgagent.pga_jobstep "
         " WHERE jstenabled "
@@ -80,17 +81,17 @@
 
     while (steps->HasData())
     {
-        DBconn *conn;
+        DBconn *stepConn;
         string jslid, stepid, jpecode;
 
         stepid = steps->GetString("jstid");
         
-		DBresult *id=serviceConn->Execute(
+		DBresult *id=threadConn->Execute(
 			"SELECT nextval('pgagent.pga_jobsteplog_jslid_seq') AS id");
 		if (id)
 		{
 			jslid=id->GetString("id");
-			DBresult *res=serviceConn->Execute(
+			DBresult *res=threadConn->Execute(
 				"INSERT INTO pgagent.pga_jobsteplog(jslid, jsljlgid, jsljstid, jslstatus) "
 				"SELECT " + jslid + ", " + logid + ", " + stepid + ", 'r'"
 				"  FROM pgagent.pga_jobstep WHERE jstid=" + stepid);
@@ -115,11 +116,12 @@
         {
             case 's':
             {
-                conn=DBconn::Get(steps->GetString("jstdbname"));
-                if (conn)
+                stepConn=DBconn::Get(steps->GetString("jstdbname"));
+                if (stepConn)
                 {
                     LogMessage("Executing step " + stepid + " on database " + steps->GetString("jstdbname"), LOG_DEBUG);
-                    rc=conn->ExecuteVoid(steps->GetString("jstcode"));
+                    rc=stepConn->ExecuteVoid(steps->GetString("jstcode"));
+					stepConn->Return();
                 }
                 else
                     rc=-1;
@@ -144,7 +146,7 @@
         else
             stepstatus = steps->GetString("jstonerror");
 
-        rc=serviceConn->ExecuteVoid(
+        rc=threadConn->ExecuteVoid(
             "UPDATE pgagent.pga_jobsteplog "
             "   SET jslduration = now() - jslstart, "
             "       jslresult = " + NumToStr(rc) + ", jslstatus = '" + stepstatus + "' "

Modified: trunk/pgadmin3/xtra/pgagent/misc.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/misc.cpp	2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/misc.cpp	2005-05-19 14:53:32 UTC (rev 4219)
@@ -64,13 +64,6 @@
                         longWait = val;
                     break;
                 }
-                case 'c':
-                {
-                    int val = atoi(getArg(argc, argv).c_str());
-                    if (val >= 5)
-                        connPoolCount = val;
-                    break;
-                }
                 case 'l':
                 {
                     int val = atoi(getArg(argc, argv).c_str());

Modified: trunk/pgadmin3/xtra/pgagent/pgAgent.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/pgAgent.cpp	2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/pgAgent.cpp	2005-05-19 14:53:32 UTC (rev 4219)
@@ -11,6 +11,8 @@
 
 #include "pgAgent.h"
 
+#include <wx/thread.h>
+
 #ifdef WIN32
 #include <winsock2.h>
 #else
@@ -19,12 +21,12 @@
 
 string connectString;
 string serviceDBname;
+string backendPid;
 long longWait=30;
 long shortWait=10;
 long minLogLevel=LOG_ERROR;
 
 
-
 int MainRestartLoop(DBconn *serviceConn)
 {
     // clean up old jobs
@@ -65,8 +67,6 @@
             );
     }
 
-
-
     char hostname[255];
     gethostname(hostname, 255);
 
@@ -96,18 +96,20 @@
 
             if (jobid != "")
             {
-                Job job(serviceConn, jobid);
+				DBconn *threadConn=DBconn::Get(serviceDBname);
+                Job job(threadConn, jobid);
 
                 if (job.Runnable())
                 {
                     foundJobToExecute=true;
                     LogMessage("Running job: " + jobid, LOG_DEBUG);
                     job.Execute();
+					LogMessage("Completed job: " + jobid, LOG_DEBUG);
                 }
             }
             else
             {
-				LogMessage("No jobs to run - time for a pint :-)", LOG_DEBUG);
+				LogMessage("No jobs to run - sleeping...", LOG_DEBUG);
                 WaitAWhile();
             }
         }
@@ -124,28 +126,34 @@
 
 void MainLoop()
 {
-    // Basic sanity check
-	LogMessage("Database sanity check", LOG_DEBUG);
-    DBconn *sanityConn=DBconn::Get(serviceDBname, true);
-    DBresult *res=sanityConn->Execute("SELECT count(*) As count FROM pg_class cl JOIN pg_namespace ns ON ns.oid=relnamespace WHERE relname='pga_job' AND nspname='pgagent'");
-    if (res)
-    {
-        string val=res->GetString("count");
-        
-        if (val == "0")
-            LogMessage("Could not find the table 'pgagent.pga_job'. Have you run pgagent.sql on this database?", LOG_ERROR);
-    }
     
     // OK, let's get down to business 
     do
     {
-        DBconn *serviceConn=DBconn::Get(serviceDBname, true);
+	    LogMessage("Creating primary connection", LOG_DEBUG);
+        DBconn *serviceConn=DBconn::InitConnection(connectString);
+    
+		if (serviceConn && serviceConn->IsValid())
+        {
+            serviceDBname = serviceConn->GetDBname();
 
-        if (serviceConn)
-        {
+			// Basic sanity check, and a chance to get the serviceConn's PID
+	        LogMessage("Database sanity check", LOG_DEBUG);
+            DBresult *res=serviceConn->Execute("SELECT count(*) As count, pg_backend_pid() AS pid FROM pg_class cl JOIN pg_namespace ns ON ns.oid=relnamespace WHERE relname='pga_job' AND nspname='pgagent'");
+            if (res)
+			{
+                string val=res->GetString("count");
+            
+                if (val == "0")
+                    LogMessage("Could not find the table 'pgagent.pga_job'. Have you run pgagent.sql on this database?", LOG_ERROR);
+    
+	    	    backendPid=res->GetString("pid");
+			}
+        
             MainRestartLoop(serviceConn);
         }
 
+		LogMessage("Couldn't create connection: " + serviceConn->GetLastError(), LOG_WARNING);
         DBconn::ClearConnections(true);
         WaitAWhile(true);
     }

Modified: trunk/pgadmin3/xtra/pgagent/pgAgent.dsp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/pgAgent.dsp	2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/pgAgent.dsp	2005-05-19 14:53:32 UTC (rev 4219)
@@ -70,7 +70,7 @@
 # PROP Ignore_Export_Lib 0
 # PROP Target_Dir ""
 # ADD BASE CPP /nologo /W3 /Gm /GX /ZI /Od /I "include" /I "c:/program files/postgresql/8.0/include" /D "WIN32" /D "_DEBUG" /D "_WINDOWS" /D "_MBCS" /D "_MT" /Yu"pgAgent.h" /FD /GZ /c
-# ADD CPP /nologo /MT /W3 /GX /O2 /I "include" /I "c:/program files/postgresql/8.0/include" /I "c:/wxWidgets-2.6/include" /I "c:/wxWidgets-2.6/contrib/include" /D "WIN32" /D "_WINDOWS" /D "_MBCS" /D "_MT" /FR /Yu"pgamsgevent.h" /FD /c
+# ADD CPP /nologo /MT /W3 /GX /O2 /I "include" /I "c:/program files/postgresql/8.0/include" /I "c:/wxWidgets-2.6/include" /I "c:/wxWidgets-2.6/contrib/include" /D "WIN32" /D "_WINDOWS" /D "_MBCS" /D "_MT" /FR /Yu"pgAgent.h" /FD /c
 # ADD BASE RSC /l 0x407 /d "_DEBUG"
 # ADD RSC /l 0x809 /i "c:/wxWidgets-2.6/include"
 BSC32=bscmake.exe

Modified: trunk/pgadmin3/xtra/pgagent/unix.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/unix.cpp	2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/unix.cpp	2005-05-19 14:53:32 UTC (rev 4219)
@@ -25,7 +25,6 @@
         "options:\n"
         "-t <poll time interval in seconds (default 10)>\n"
         "-r <retry period after connection abort in seconds (>=10, default 30)>\n"
-        "-c <connection pool size (>=5, default 5)>\n"
         "-l <logging verbosity (ERROR=0, WARNING=1, DEBUG=2, default 0)>\n"
         );
 }
@@ -87,12 +86,6 @@
 
     setOptions(argc, argv);
 
-    DBconn *conn=DBconn::InitConnection(connectString);
-    if (!conn->IsValid())
-        LogMessage("Invalid connection: " + conn->GetLastError(), LOG_ERROR);
-
-    serviceDBname = conn->GetDBname();
-
     daemonize();
 
     MainLoop();

Modified: trunk/pgadmin3/xtra/pgagent/win32.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/win32.cpp	2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/win32.cpp	2005-05-19 14:53:32 UTC (rev 4219)
@@ -328,7 +328,6 @@
         "-d <displayname>\n"
         "-t <poll time interval in seconds (default 10)>\n"
         "-r <retry period after connection abort in seconds (>=10, default 30)>\n"
-        "-c <connection pool size (>=5, default 5)>\n"
         "-l <logging verbosity (ERROR=0, WARNING=1, DEBUG=2, default 0)>\n"
         );
 }
@@ -337,19 +336,16 @@
 
 ////////////////////////////////////////////////////////////
 
-void setupForRun(int argc, char **argv)
+void setupForRun(int argc, char **argv, bool debug=false)
 {
-    eventHandle = RegisterEventSource(0, serviceName.c_str());
-    if (!eventHandle)
-        LogMessage("Couldn't register event handle.", LOG_ERROR);
+	if (!debug)
+	{
+		eventHandle = RegisterEventSource(0, serviceName.c_str());
+		if (!eventHandle)
+			LogMessage("Couldn't register event handle.", LOG_ERROR);
+	}
 
     setOptions(argc, argv);
-
-    DBconn *conn=DBconn::InitConnection(connectString);
-    if (!conn->IsValid())
-        LogMessage("Invalid connection: " + conn->GetLastError(), LOG_ERROR);
-
-    serviceDBname = conn->GetDBname();
 }
 
         
@@ -368,7 +364,7 @@
 
     if (command == "INSTALL")
     {
-        string displayname = "pgAgent " + serviceName;
+        string displayname = "PostgreSQL scheduling agent - " + serviceName;
         string arg = executable + " RUN " + serviceName;
 
         while (argc-- > 0)
@@ -416,7 +412,7 @@
     }
     else if (command == "DEBUG")
     {
-        setupForRun(argc, argv);
+        setupForRun(argc, argv, true);
 
         initService();
 #if START_SUSPENDED


pgadmin-hackers by date

Next:From: Raphaƫl EnriciDate: 2005-05-19 17:07:55
Subject: Re: New acinclude.m4
Previous:From: Adam H.PendletonDate: 2005-05-19 11:57:10
Subject: Re: New acinclude.m4

Privacy Policy | About PostgreSQL
Copyright © 1996-2017 The PostgreSQL Global Development Group