Skip site navigation (1) Skip section navigation (2)

SVN Commit by dpage: r4229 - in trunk/pgadmin3/xtra/pgagent: . include

From: svn(at)pgadmin(dot)org
To: pgadmin-hackers(at)postgresql(dot)org
Subject: SVN Commit by dpage: r4229 - in trunk/pgadmin3/xtra/pgagent: . include
Date: 2005-05-20 21:49:23
Message-ID: 200505202149.j4KLnNhW028277@developer.pgadmin.org (view raw or flat)
Thread:
Lists: pgadmin-hackers
Author: dpage
Date: 2005-05-20 22:49:23 +0100 (Fri, 20 May 2005)
New Revision: 4229

Modified:
   trunk/pgadmin3/xtra/pgagent/connection.cpp
   trunk/pgadmin3/xtra/pgagent/include/job.h
   trunk/pgadmin3/xtra/pgagent/job.cpp
   trunk/pgadmin3/xtra/pgagent/pgAgent.cpp
Log:
Add threading support to pgAgent to allow multiple jobs to start in the same timeslot. This seems to work OK with no memory leaks or sync problems; however, more testing is required. We also need to add code to keep track of the threads that are created so they can be killed if the app terminates.

Modified: trunk/pgadmin3/xtra/pgagent/connection.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/connection.cpp	2005-05-20 15:44:01 UTC (rev 4228)
+++ trunk/pgadmin3/xtra/pgagent/connection.cpp	2005-05-20 21:49:23 UTC (rev 4229)
@@ -13,6 +13,7 @@
 
 DBconn *DBconn::primaryConn;
 wxString DBconn::basicConnectString;
+static wxMutex s_PoolLock;
 
 DBconn::DBconn(const wxString &name)
 {
@@ -64,6 +65,8 @@
 
 DBconn *DBconn::InitConnection(const wxString &connectString)
 {
+	wxMutexLocker lock(s_PoolLock);
+
     basicConnectString=connectString;
     wxString dbname;
 
@@ -96,6 +99,8 @@
 
 DBconn *DBconn::Get(const wxString &dbname)
 {
+	wxMutexLocker lock(s_PoolLock);
+
 	DBconn *thisConn = primaryConn, *testConn;
 
     // find an existing connection
@@ -132,50 +137,77 @@
 
 void DBconn::Return()
 {
+	wxMutexLocker lock(s_PoolLock);
+
 	LogMessage(_("Returning connection to database ") + this->dbname, LOG_DEBUG);
 	inUse = false;
 }
 
 void DBconn::ClearConnections(bool all)
 {
+	wxMutexLocker lock(s_PoolLock);
+
 	if (all)
 		LogMessage(_("Clearing all connections"), LOG_DEBUG);
 	else
 		LogMessage(_("Clearing inactive connections"), LOG_DEBUG);
 
 	DBconn *thisConn=primaryConn, *deleteConn;
+	int total=0, free=0, deleted=0;
 
-	// Find the last connection
-	while (thisConn->next != 0)
-		thisConn = thisConn->next;
+	if (thisConn)
+	{
 
-	// Delete connections as required
-	// If a connection is not in use, delete it, and reset the next and previous
-	// pointers appropriately. If it is in use, don't touch it.
-	while (thisConn->prev != 0)
-	{
-		if ((!thisConn->inUse) || all)
+		total++;
+
+		// Find the last connection
+		while (thisConn->next != 0)
 		{
-			deleteConn = thisConn;
+			total++;
+			
+			if (!thisConn->inUse)
+				free++;
 
-			thisConn = deleteConn->prev;
-			
-			thisConn->next = deleteConn->next;
-			
-			if (deleteConn->next)
-				deleteConn->next->prev = deleteConn->prev;
-			
-			delete deleteConn;
+			thisConn = thisConn->next;
 		}
-		else
+		if (!thisConn->inUse)
+			free++;
+
+		// Delete connections as required
+		// If a connection is not in use, delete it, and reset the next and previous
+		// pointers appropriately. If it is in use, don't touch it.
+		while (thisConn->prev != 0)
 		{
-			thisConn = thisConn->prev;
+			if ((!thisConn->inUse) || all)
+			{
+				deleteConn = thisConn;
+				thisConn = deleteConn->prev;
+				thisConn->next = deleteConn->next;
+				if (deleteConn->next)
+					deleteConn->next->prev = deleteConn->prev;
+				delete deleteConn;
+				deleted++;
+			}
+			else
+			{
+				thisConn = thisConn->prev;
+			}
 		}
+
+		if (all)
+		{
+			delete thisConn;
+			deleted++;
+		}
+
+		wxString tmp;
+		tmp.Printf(_("Connection stats: total - %d, free - %d, deleted - %d"), total, free, deleted);
+		LogMessage(tmp, LOG_DEBUG);
+
 	}
+	else
+		LogMessage(_("No connections found!"), LOG_DEBUG);
 
-	if (all)
-		delete thisConn;
-
 }
 
 

Modified: trunk/pgadmin3/xtra/pgagent/include/job.h
===================================================================
--- trunk/pgadmin3/xtra/pgagent/include/job.h	2005-05-20 15:44:01 UTC (rev 4228)
+++ trunk/pgadmin3/xtra/pgagent/include/job.h	2005-05-20 21:49:23 UTC (rev 4229)
@@ -37,11 +37,14 @@
 public:
     JobThread(const wxString &jid);
     ~JobThread();
+	bool Runnable() { return runnable; }
 
 	virtual void *Entry();
 
 private:
 	wxString jobid;
+	bool runnable;
+	Job *job;
 };
 
 #endif // JOB_H

Modified: trunk/pgadmin3/xtra/pgagent/job.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/job.cpp	2005-05-20 15:44:01 UTC (rev 4228)
+++ trunk/pgadmin3/xtra/pgagent/job.cpp	2005-05-20 21:49:23 UTC (rev 4229)
@@ -11,12 +11,16 @@
 
 #include "pgAgent.h"
 
+wxSemaphore *getDb;
+
 Job::Job(DBconn *conn, const wxString &jid)
 {
     threadConn=conn;
     jobid=jid;
     status=wxT("");
 
+	LogMessage(_("Starting job: ") + jobid, LOG_DEBUG);
+
     int rc=threadConn->ExecuteVoid(
         wxT("UPDATE pgagent.pga_job SET jobagentid=") + backendPid + wxT(", joblastrun=now() ")
         wxT(" WHERE jobagentid IS NULL AND jobid=") + jobid);
@@ -58,6 +62,8 @@
             );
     }
 	threadConn->Return();
+
+	LogMessage(_("Completed job: ") + jobid, LOG_DEBUG);
 }
 
 
@@ -117,7 +123,7 @@
                 stepConn=DBconn::Get(steps->GetString(wxT("jstdbname")));
                 if (stepConn)
                 {
-                    LogMessage(_("Executing step ") + stepid + _(" on database ") + steps->GetString(wxT("jstdbname")), LOG_DEBUG);
+                    LogMessage(_("Executing step ") + stepid + _(" (part of job ") + jobid + wxT(")"), LOG_DEBUG);
                     rc=stepConn->ExecuteVoid(steps->GetString(wxT("jstcode")));
 					stepConn->Return();
                 }
@@ -168,7 +174,16 @@
 : wxThread(wxTHREAD_DETACHED)
 { 
 	LogMessage(_("Creating job thread for job ") + jid, LOG_DEBUG); 
+	
+	runnable = false;
 	jobid = jid; 
+
+	DBconn *threadConn=DBconn::Get(serviceDBname);
+    job = new Job(threadConn, jobid);
+
+    if (job->Runnable())
+        runnable = true;
+
 }
     
 
@@ -180,7 +195,11 @@
 
 void *JobThread::Entry()
 {
-	LogMessage(_("Running job thread for job ") + jobid, LOG_DEBUG);
+	if (runnable)
+	{
+		job->Execute();
+		delete job;
+	}
 
 	return(NULL);
 }
\ No newline at end of file

Modified: trunk/pgadmin3/xtra/pgagent/pgAgent.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/pgAgent.cpp	2005-05-20 15:44:01 UTC (rev 4228)
+++ trunk/pgadmin3/xtra/pgagent/pgAgent.cpp	2005-05-20 21:49:23 UTC (rev 4229)
@@ -89,32 +89,25 @@
 
         if (res)
         {
-            wxString jobid=res->GetString(wxT("jobid"));
-            delete res;
+			while(res->HasData())
+			{
+				wxString jobid=res->GetString(wxT("jobid"));
 
-            if (jobid != wxT(""))
-            {
-				DBconn *threadConn=DBconn::Get(serviceDBname);
-                Job job(threadConn, jobid);
+				JobThread *jt = new JobThread(jobid);
+	
+				if (jt->Runnable())
+				{
+					jt->Create();
+					jt->Run();
+					foundJobToExecute = true;
+				}
+				res->MoveNext();
 
-                if (job.Runnable())
-                {
-                    foundJobToExecute=true;
-                    LogMessage(_("Running job: ") + jobid, LOG_DEBUG);
-					
-					// JobThread *jt = new JobThread(jobid);
-					// jt->Run();
-					// jt->Wait();
-                    
-					job.Execute();
-					LogMessage(_("Completed job: ") + jobid, LOG_DEBUG);
-                }
-            }
-            else
-            {
-				LogMessage(_("No jobs to run - sleeping..."), LOG_DEBUG);
-                WaitAWhile();
-            }
+			}
+
+			delete res;
+			LogMessage(_("Sleeping..."), LOG_DEBUG);
+            WaitAWhile();
         }
         else
         {


pgadmin-hackers by date

Next: From: OpenMacNews Date: 2005-05-21 18:29:16
Subject: PgAdmin3 on OSX 10.4.1 w/ gcc4
Previous: From: Florian G. Pflug Date: 2005-05-20 16:42:30
Subject: Re: pgAdmin3.app library problem

Privacy Policy | About PostgreSQL
Copyright © 1996-2014 The PostgreSQL Global Development Group