SVN Commit by dpage: r4229 - in trunk/pgadmin3/xtra/pgagent: . include - Mailing list pgadmin-hackers

From svn@pgadmin.org
Subject SVN Commit by dpage: r4229 - in trunk/pgadmin3/xtra/pgagent: . include
Date
Msg-id 200505202149.j4KLnNhW028277@developer.pgadmin.org
Whole thread Raw
List pgadmin-hackers
Author: dpage
Date: 2005-05-20 22:49:23 +0100 (Fri, 20 May 2005)
New Revision: 4229

Modified:
   trunk/pgadmin3/xtra/pgagent/connection.cpp
   trunk/pgadmin3/xtra/pgagent/include/job.h
   trunk/pgadmin3/xtra/pgagent/job.cpp
   trunk/pgadmin3/xtra/pgagent/pgAgent.cpp
Log:
Add threading support to pgAgent to allow multiple jobs to start in the same timeslot. Seems to work OK with no memory
leaksor sync problems, however more testing is required. Also need to add code to keep track of threads that are
createdso they can be killed if the app terminates. 

Modified: trunk/pgadmin3/xtra/pgagent/connection.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/connection.cpp    2005-05-20 15:44:01 UTC (rev 4228)
+++ trunk/pgadmin3/xtra/pgagent/connection.cpp    2005-05-20 21:49:23 UTC (rev 4229)
@@ -13,6 +13,7 @@

 DBconn *DBconn::primaryConn;
 wxString DBconn::basicConnectString;
+static wxMutex s_PoolLock;

 DBconn::DBconn(const wxString &name)
 {
@@ -64,6 +65,8 @@

 DBconn *DBconn::InitConnection(const wxString &connectString)
 {
+    wxMutexLocker lock(s_PoolLock);
+
     basicConnectString=connectString;
     wxString dbname;

@@ -96,6 +99,8 @@

 DBconn *DBconn::Get(const wxString &dbname)
 {
+    wxMutexLocker lock(s_PoolLock);
+
     DBconn *thisConn = primaryConn, *testConn;

     // find an existing connection
@@ -132,50 +137,77 @@

 void DBconn::Return()
 {
+    wxMutexLocker lock(s_PoolLock);
+
     LogMessage(_("Returning connection to database ") + this->dbname, LOG_DEBUG);
     inUse = false;
 }

 void DBconn::ClearConnections(bool all)
 {
+    wxMutexLocker lock(s_PoolLock);
+
     if (all)
         LogMessage(_("Clearing all connections"), LOG_DEBUG);
     else
         LogMessage(_("Clearing inactive connections"), LOG_DEBUG);

     DBconn *thisConn=primaryConn, *deleteConn;
+    int total=0, free=0, deleted=0;

-    // Find the last connection
-    while (thisConn->next != 0)
-        thisConn = thisConn->next;
+    if (thisConn)
+    {

-    // Delete connections as required
-    // If a connection is not in use, delete it, and reset the next and previous
-    // pointers appropriately. If it is in use, don't touch it.
-    while (thisConn->prev != 0)
-    {
-        if ((!thisConn->inUse) || all)
+        total++;
+
+        // Find the last connection
+        while (thisConn->next != 0)
         {
-            deleteConn = thisConn;
+            total++;
+
+            if (!thisConn->inUse)
+                free++;

-            thisConn = deleteConn->prev;
-
-            thisConn->next = deleteConn->next;
-
-            if (deleteConn->next)
-                deleteConn->next->prev = deleteConn->prev;
-
-            delete deleteConn;
+            thisConn = thisConn->next;
         }
-        else
+        if (!thisConn->inUse)
+            free++;
+
+        // Delete connections as required
+        // If a connection is not in use, delete it, and reset the next and previous
+        // pointers appropriately. If it is in use, don't touch it.
+        while (thisConn->prev != 0)
         {
-            thisConn = thisConn->prev;
+            if ((!thisConn->inUse) || all)
+            {
+                deleteConn = thisConn;
+                thisConn = deleteConn->prev;
+                thisConn->next = deleteConn->next;
+                if (deleteConn->next)
+                    deleteConn->next->prev = deleteConn->prev;
+                delete deleteConn;
+                deleted++;
+            }
+            else
+            {
+                thisConn = thisConn->prev;
+            }
         }
+
+        if (all)
+        {
+            delete thisConn;
+            deleted++;
+        }
+
+        wxString tmp;
+        tmp.Printf(_("Connection stats: total - %d, free - %d, deleted - %d"), total, free, deleted);
+        LogMessage(tmp, LOG_DEBUG);
+
     }
+    else
+        LogMessage(_("No connections found!"), LOG_DEBUG);

-    if (all)
-        delete thisConn;
-
 }



Modified: trunk/pgadmin3/xtra/pgagent/include/job.h
===================================================================
--- trunk/pgadmin3/xtra/pgagent/include/job.h    2005-05-20 15:44:01 UTC (rev 4228)
+++ trunk/pgadmin3/xtra/pgagent/include/job.h    2005-05-20 21:49:23 UTC (rev 4229)
@@ -37,11 +37,14 @@
 public:
     JobThread(const wxString &jid);
     ~JobThread();
+    bool Runnable() { return runnable; }

     virtual void *Entry();

 private:
     wxString jobid;
+    bool runnable;
+    Job *job;
 };

 #endif // JOB_H

Modified: trunk/pgadmin3/xtra/pgagent/job.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/job.cpp    2005-05-20 15:44:01 UTC (rev 4228)
+++ trunk/pgadmin3/xtra/pgagent/job.cpp    2005-05-20 21:49:23 UTC (rev 4229)
@@ -11,12 +11,16 @@

 #include "pgAgent.h"

+wxSemaphore *getDb;
+
 Job::Job(DBconn *conn, const wxString &jid)
 {
     threadConn=conn;
     jobid=jid;
     status=wxT("");

+    LogMessage(_("Starting job: ") + jobid, LOG_DEBUG);
+
     int rc=threadConn->ExecuteVoid(
         wxT("UPDATE pgagent.pga_job SET jobagentid=") + backendPid + wxT(", joblastrun=now() ")
         wxT(" WHERE jobagentid IS NULL AND jobid=") + jobid);
@@ -58,6 +62,8 @@
             );
     }
     threadConn->Return();
+
+    LogMessage(_("Completed job: ") + jobid, LOG_DEBUG);
 }


@@ -117,7 +123,7 @@
                 stepConn=DBconn::Get(steps->GetString(wxT("jstdbname")));
                 if (stepConn)
                 {
-                    LogMessage(_("Executing step ") + stepid + _(" on database ") +
steps->GetString(wxT("jstdbname")),LOG_DEBUG); 
+                    LogMessage(_("Executing step ") + stepid + _(" (part of job ") + jobid + wxT(")"), LOG_DEBUG);
                     rc=stepConn->ExecuteVoid(steps->GetString(wxT("jstcode")));
                     stepConn->Return();
                 }
@@ -168,7 +174,16 @@
 : wxThread(wxTHREAD_DETACHED)
 {
     LogMessage(_("Creating job thread for job ") + jid, LOG_DEBUG);
+
+    runnable = false;
     jobid = jid;
+
+    DBconn *threadConn=DBconn::Get(serviceDBname);
+    job = new Job(threadConn, jobid);
+
+    if (job->Runnable())
+        runnable = true;
+
 }


@@ -180,7 +195,11 @@

 void *JobThread::Entry()
 {
-    LogMessage(_("Running job thread for job ") + jobid, LOG_DEBUG);
+    if (runnable)
+    {
+        job->Execute();
+        delete job;
+    }

     return(NULL);
 }
\ No newline at end of file

Modified: trunk/pgadmin3/xtra/pgagent/pgAgent.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/pgAgent.cpp    2005-05-20 15:44:01 UTC (rev 4228)
+++ trunk/pgadmin3/xtra/pgagent/pgAgent.cpp    2005-05-20 21:49:23 UTC (rev 4229)
@@ -89,32 +89,25 @@

         if (res)
         {
-            wxString jobid=res->GetString(wxT("jobid"));
-            delete res;
+            while(res->HasData())
+            {
+                wxString jobid=res->GetString(wxT("jobid"));

-            if (jobid != wxT(""))
-            {
-                DBconn *threadConn=DBconn::Get(serviceDBname);
-                Job job(threadConn, jobid);
+                JobThread *jt = new JobThread(jobid);
+
+                if (jt->Runnable())
+                {
+                    jt->Create();
+                    jt->Run();
+                    foundJobToExecute = true;
+                }
+                res->MoveNext();

-                if (job.Runnable())
-                {
-                    foundJobToExecute=true;
-                    LogMessage(_("Running job: ") + jobid, LOG_DEBUG);
-
-                    // JobThread *jt = new JobThread(jobid);
-                    // jt->Run();
-                    // jt->Wait();
-
-                    job.Execute();
-                    LogMessage(_("Completed job: ") + jobid, LOG_DEBUG);
-                }
-            }
-            else
-            {
-                LogMessage(_("No jobs to run - sleeping..."), LOG_DEBUG);
-                WaitAWhile();
-            }
+            }
+
+            delete res;
+            LogMessage(_("Sleeping..."), LOG_DEBUG);
+            WaitAWhile();
         }
         else
         {


pgadmin-hackers by date:

Previous
From: "Florian G. Pflug"
Date:
Subject: Re: pgAdmin3.app library problem
Next
From: OpenMacNews
Date:
Subject: PgAdmin3 on OSX 10.4.1 w/ gcc4