SVN Commit by dpage: r4229 - in trunk/pgadmin3/xtra/pgagent: . include

From: svn(at)pgadmin(dot)org
To: pgadmin-hackers(at)postgresql(dot)org
Subject: SVN Commit by dpage: r4229 - in trunk/pgadmin3/xtra/pgagent: . include
Date: 2005-05-20 21:49:23
Message-ID: 200505202149.j4KLnNhW028277@developer.pgadmin.org
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgadmin-hackers

Author: dpage
Date: 2005-05-20 22:49:23 +0100 (Fri, 20 May 2005)
New Revision: 4229

Modified:
trunk/pgadmin3/xtra/pgagent/connection.cpp
trunk/pgadmin3/xtra/pgagent/include/job.h
trunk/pgadmin3/xtra/pgagent/job.cpp
trunk/pgadmin3/xtra/pgagent/pgAgent.cpp
Log:
Add threading support to pgAgent to allow multiple jobs to start in the same timeslot. Seems to work OK with no memory leaks or sync problems, however more testing is required. Also need to add code to keep track of threads that are created so they can be killed if the app terminates.

Modified: trunk/pgadmin3/xtra/pgagent/connection.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/connection.cpp 2005-05-20 15:44:01 UTC (rev 4228)
+++ trunk/pgadmin3/xtra/pgagent/connection.cpp 2005-05-20 21:49:23 UTC (rev 4229)
@@ -13,6 +13,7 @@

DBconn *DBconn::primaryConn;
wxString DBconn::basicConnectString;
+static wxMutex s_PoolLock;

DBconn::DBconn(const wxString &name)
{
@@ -64,6 +65,8 @@

DBconn *DBconn::InitConnection(const wxString &connectString)
{
+ wxMutexLocker lock(s_PoolLock);
+
basicConnectString=connectString;
wxString dbname;

@@ -96,6 +99,8 @@

DBconn *DBconn::Get(const wxString &dbname)
{
+ wxMutexLocker lock(s_PoolLock);
+
DBconn *thisConn = primaryConn, *testConn;

// find an existing connection
@@ -132,50 +137,77 @@

void DBconn::Return()
{
+ wxMutexLocker lock(s_PoolLock);
+
LogMessage(_("Returning connection to database ") + this->dbname, LOG_DEBUG);
inUse = false;
}

void DBconn::ClearConnections(bool all)
{
+ wxMutexLocker lock(s_PoolLock);
+
if (all)
LogMessage(_("Clearing all connections"), LOG_DEBUG);
else
LogMessage(_("Clearing inactive connections"), LOG_DEBUG);

DBconn *thisConn=primaryConn, *deleteConn;
+ int total=0, free=0, deleted=0;

- // Find the last connection
- while (thisConn->next != 0)
- thisConn = thisConn->next;
+ if (thisConn)
+ {

- // Delete connections as required
- // If a connection is not in use, delete it, and reset the next and previous
- // pointers appropriately. If it is in use, don't touch it.
- while (thisConn->prev != 0)
- {
- if ((!thisConn->inUse) || all)
+ total++;
+
+ // Find the last connection
+ while (thisConn->next != 0)
{
- deleteConn = thisConn;
+ total++;
+
+ if (!thisConn->inUse)
+ free++;

- thisConn = deleteConn->prev;
-
- thisConn->next = deleteConn->next;
-
- if (deleteConn->next)
- deleteConn->next->prev = deleteConn->prev;
-
- delete deleteConn;
+ thisConn = thisConn->next;
}
- else
+ if (!thisConn->inUse)
+ free++;
+
+ // Delete connections as required
+ // If a connection is not in use, delete it, and reset the next and previous
+ // pointers appropriately. If it is in use, don't touch it.
+ while (thisConn->prev != 0)
{
- thisConn = thisConn->prev;
+ if ((!thisConn->inUse) || all)
+ {
+ deleteConn = thisConn;
+ thisConn = deleteConn->prev;
+ thisConn->next = deleteConn->next;
+ if (deleteConn->next)
+ deleteConn->next->prev = deleteConn->prev;
+ delete deleteConn;
+ deleted++;
+ }
+ else
+ {
+ thisConn = thisConn->prev;
+ }
}
+
+ if (all)
+ {
+ delete thisConn;
+ deleted++;
+ }
+
+ wxString tmp;
+ tmp.Printf(_("Connection stats: total - %d, free - %d, deleted - %d"), total, free, deleted);
+ LogMessage(tmp, LOG_DEBUG);
+
}
+ else
+ LogMessage(_("No connections found!"), LOG_DEBUG);

- if (all)
- delete thisConn;
-
}

Modified: trunk/pgadmin3/xtra/pgagent/include/job.h
===================================================================
--- trunk/pgadmin3/xtra/pgagent/include/job.h 2005-05-20 15:44:01 UTC (rev 4228)
+++ trunk/pgadmin3/xtra/pgagent/include/job.h 2005-05-20 21:49:23 UTC (rev 4229)
@@ -37,11 +37,14 @@
public:
JobThread(const wxString &jid);
~JobThread();
+ bool Runnable() { return runnable; }

virtual void *Entry();

private:
wxString jobid;
+ bool runnable;
+ Job *job;
};

#endif // JOB_H

Modified: trunk/pgadmin3/xtra/pgagent/job.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/job.cpp 2005-05-20 15:44:01 UTC (rev 4228)
+++ trunk/pgadmin3/xtra/pgagent/job.cpp 2005-05-20 21:49:23 UTC (rev 4229)
@@ -11,12 +11,16 @@

#include "pgAgent.h"

+wxSemaphore *getDb;
+
Job::Job(DBconn *conn, const wxString &jid)
{
threadConn=conn;
jobid=jid;
status=wxT("");

+ LogMessage(_("Starting job: ") + jobid, LOG_DEBUG);
+
int rc=threadConn->ExecuteVoid(
wxT("UPDATE pgagent.pga_job SET jobagentid=") + backendPid + wxT(", joblastrun=now() ")
wxT(" WHERE jobagentid IS NULL AND jobid=") + jobid);
@@ -58,6 +62,8 @@
);
}
threadConn->Return();
+
+ LogMessage(_("Completed job: ") + jobid, LOG_DEBUG);
}


@@ -117,7 +123,7 @@
stepConn=DBconn::Get(steps->GetString(wxT("jstdbname")));
if (stepConn)
{
- LogMessage(_("Executing step ") + stepid + _(" on database ") + steps->GetString(wxT("jstdbname")), LOG_DEBUG);
+ LogMessage(_("Executing step ") + stepid + _(" (part of job ") + jobid + wxT(")"), LOG_DEBUG);
rc=stepConn->ExecuteVoid(steps->GetString(wxT("jstcode")));
stepConn->Return();
}
@@ -168,7 +174,16 @@
: wxThread(wxTHREAD_DETACHED)
{
LogMessage(_("Creating job thread for job ") + jid, LOG_DEBUG);
+
+ runnable = false;
jobid = jid;
+
+ DBconn *threadConn=DBconn::Get(serviceDBname);
+ job = new Job(threadConn, jobid);
+
+ if (job->Runnable())
+ runnable = true;
+
}


@@ -180,7 +195,11 @@

void *JobThread::Entry()
{
- LogMessage(_("Running job thread for job ") + jobid, LOG_DEBUG);
+ if (runnable)
+ {
+ job->Execute();
+ delete job;
+ }

return(NULL);
}
\ No newline at end of file

Modified: trunk/pgadmin3/xtra/pgagent/pgAgent.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/pgAgent.cpp 2005-05-20 15:44:01 UTC (rev 4228)
+++ trunk/pgadmin3/xtra/pgagent/pgAgent.cpp 2005-05-20 21:49:23 UTC (rev 4229)
@@ -89,32 +89,25 @@

if (res)
{
- wxString jobid=res->GetString(wxT("jobid"));
- delete res;
+ while(res->HasData())
+ {
+ wxString jobid=res->GetString(wxT("jobid"));

- if (jobid != wxT(""))
- {
- DBconn *threadConn=DBconn::Get(serviceDBname);
- Job job(threadConn, jobid);
+ JobThread *jt = new JobThread(jobid);
+
+ if (jt->Runnable())
+ {
+ jt->Create();
+ jt->Run();
+ foundJobToExecute = true;
+ }
+ res->MoveNext();

- if (job.Runnable())
- {
- foundJobToExecute=true;
- LogMessage(_("Running job: ") + jobid, LOG_DEBUG);
-
- // JobThread *jt = new JobThread(jobid);
- // jt->Run();
- // jt->Wait();
-
- job.Execute();
- LogMessage(_("Completed job: ") + jobid, LOG_DEBUG);
- }
- }
- else
- {
- LogMessage(_("No jobs to run - sleeping..."), LOG_DEBUG);
- WaitAWhile();
- }
+ }
+
+ delete res;
+ LogMessage(_("Sleeping..."), LOG_DEBUG);
+ WaitAWhile();
}
else
{

Browse pgadmin-hackers by date

  From Date Subject
Next Message OpenMacNews 2005-05-21 18:29:16 PgAdmin3 on OSX 10.4.1 w/ gcc4
Previous Message Florian G. Pflug 2005-05-20 16:42:30 Re: pgAdmin3.app library problem