SVN Commit by dpage: r4219 - in trunk/pgadmin3/xtra/pgagent: . include - Mailing list pgadmin-hackers

From svn@pgadmin.org
Subject SVN Commit by dpage: r4219 - in trunk/pgadmin3/xtra/pgagent: . include
Date
Msg-id 200505191453.j4JErWtx014110@developer.pgadmin.org
Whole thread Raw
List pgadmin-hackers
Author: dpage
Date: 2005-05-19 15:53:32 +0100 (Thu, 19 May 2005)
New Revision: 4219

Modified:
   trunk/pgadmin3/xtra/pgagent/connection.cpp
   trunk/pgadmin3/xtra/pgagent/include/connection.h
   trunk/pgadmin3/xtra/pgagent/include/job.h
   trunk/pgadmin3/xtra/pgagent/include/pgAgent.h
   trunk/pgadmin3/xtra/pgagent/job.cpp
   trunk/pgadmin3/xtra/pgagent/misc.cpp
   trunk/pgadmin3/xtra/pgagent/pgAgent.cpp
   trunk/pgadmin3/xtra/pgagent/pgAgent.dsp
   trunk/pgadmin3/xtra/pgagent/unix.cpp
   trunk/pgadmin3/xtra/pgagent/win32.cpp
Log:
Dynamic connection pooling in preparation for threaded operation.

Modified: trunk/pgadmin3/xtra/pgagent/connection.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/connection.cpp    2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/connection.cpp    2005-05-19 14:53:32 UTC (rev 4219)
@@ -13,18 +13,15 @@
 #include <libpq-fe.h>
 #include <time.h>

-
-// entries in the connection pool
-int connPoolCount=5;
-
-
-DBconn **DBconn::pool=0;
+DBconn *DBconn::primaryConn;
 string DBconn::basicConnectString;

-
 DBconn::DBconn(const string &name)
 {
     dbname = name;
+    inUse = false;
+    next=0;
+    prev=0;
     Connect(basicConnectString  + " dbname=" + dbname);
 }

@@ -32,6 +29,9 @@
 DBconn::DBconn(const string &connectString, const string &name)
 {
     dbname = name;
+    inUse = false;
+    next=0;
+    prev=0;
     Connect(connectString);
 }

@@ -65,21 +65,12 @@

 DBconn *DBconn::InitConnection(const string &connectString)
 {
-    if (!pool)
-    {
-        pool = new DBconn*[connPoolCount];
-        if (pool)
-            memset(pool, 0, sizeof(DBconn*) * connPoolCount);
-    }
-    if (!pool)
-        LogMessage("Out of memory for connection pool", LOG_ERROR);
-
     basicConnectString=connectString;
     string dbname;

     int pos=basicConnectString.find("dbname=");
     if (pos == -1)
-        dbname = "dba";
+        dbname = "pgadmin";
     else
     {
         dbname = basicConnectString.substr(pos+7);
@@ -93,89 +84,84 @@
             dbname = dbname.substr(0, pos);
         }
     }
-    pool[0] = new DBconn(connectString, dbname);
-    pool[0]->primary = true;
+    primaryConn = new DBconn(connectString, dbname);

-    return pool[0];
+    if (!primaryConn)
+        LogMessage("Failed to create primary connection!", LOG_ERROR);
+
+    primaryConn->inUse = true;
+
+    return primaryConn;
 }


-DBconn *DBconn::Get(const string &dbname, bool asPrimary)
+DBconn *DBconn::Get(const string &dbname)
 {
-    if (!pool)
-    {
-        pool = new DBconn*[connPoolCount];
-        if (pool)
-            memset(pool, 0, sizeof(DBconn*) * connPoolCount);
-    }
-    if (!pool)
-        LogMessage("Out of memory for connection pool", LOG_ERROR);
+    DBconn *thisConn = primaryConn, *testConn;

-    int i;
-    DBconn **emptyConn=0, **oldestConn=0;
-
     // find an existing connection
-    for (i=0 ; i < connPoolCount ; i++)
+    do
     {
-        if (pool[i])
-        {
-            if (dbname == pool[i]->dbname)
-                return pool[i];
+        if (dbname == thisConn->dbname && !thisConn->inUse)
+        {
+            LogMessage("Allocating existing connection to database " + thisConn->dbname, LOG_DEBUG);
+            thisConn->inUse = true;
+            return thisConn;
+        }

-            // while searching, also mark the oldest non-primary connection
-            if (!pool[i]->primary)
-            {
-                if (!oldestConn || pool[i]->timestamp < (*oldestConn)->timestamp)
-                    oldestConn=pool+i;
-            }
-        }
-        else
-        {
-            // while searching, mark the first empty slot
-            if (!emptyConn)
-                emptyConn=pool+i;
-        }
-    }
-    if (!emptyConn)
-    {
-        delete *oldestConn;
-        emptyConn=oldestConn;
-        *emptyConn=0;
-    }
+        testConn = thisConn;
+        if (thisConn->next != 0)
+            thisConn = thisConn->next;

-    DBconn *conn=new DBconn(dbname);
-    if (conn->conn)
+    } while (testConn->next != 0);
+
+
+    // No suitable connection was found, so create a new one.
+    DBconn *newConn=new DBconn(dbname);
+    if (newConn->conn)
     {
-        *emptyConn=conn;
+        LogMessage("Allocating new connection to database " + newConn->dbname, LOG_DEBUG);
+        newConn->inUse = true;
+        newConn->prev = thisConn;
+        thisConn->next = newConn;
     }
-    return *emptyConn;
+    else
+        LogMessage("Failed to create new connection to database: " + dbname, LOG_ERROR);
+
+    return newConn;
 }

+void DBconn::Return()
+{
+    LogMessage("Returning connection to database " + this->dbname, LOG_DEBUG);
+    inUse = false;
+}

 void DBconn::ClearConnections(bool all)
 {
-    // clears all connections, except for the primary one.
-    // if all is true, even the primary connection will be killed.
-    if (pool)
-    {
-        int i;
-        for (i=0 ; i < connPoolCount ; i++)
-        {
-            if (pool[i])
-            {
-                if (all || !pool[i]->primary)
-                {
-                    delete pool[i];
-                    pool[i]=0;
-                }
-            }
-        }
-        if (all)
-        {
-            delete[] pool;
-            pool=0;
-        }
-    }
+    if (all)
+        LogMessage("Clearing all connections", LOG_DEBUG);
+    else
+        LogMessage("Clearing all connections except the primary", LOG_DEBUG);
+
+    DBconn *thisConn=primaryConn, *deleteConn;
+
+    // Find the last connection
+    while (thisConn->next != 0)
+        thisConn = thisConn->next;
+
+    // Delete connections as required
+    while (thisConn->prev != 0)
+    {
+        deleteConn = thisConn;
+        thisConn = deleteConn->prev;
+        delete deleteConn;
+        thisConn->next = 0;
+    }
+
+    if (all)
+        delete thisConn;
+
 }


@@ -205,6 +191,18 @@
     return rows;
 }

+string DBconn::GetLastError()
+{
+    // Return the last error message, minus any trailing line ends
+    if (lastError.substr(lastError.length()-2, 2) == "\r\n") // DOS
+        return lastError.substr(0, lastError.length()-2);
+    else if (lastError.substr(lastError.length()-1, 1) == "\n") // Unix
+        return lastError.substr(0, lastError.length()-1);
+    else if (lastError.substr(lastError.length()-1, 1) == "\r") // Mac
+        return lastError.substr(0, lastError.length()-1);
+    else
+        return lastError;
+}

 ///////////////////////////////////////////////////////7


Modified: trunk/pgadmin3/xtra/pgagent/include/connection.h
===================================================================
--- trunk/pgadmin3/xtra/pgagent/include/connection.h    2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/include/connection.h    2005-05-19 14:53:32 UTC (rev 4219)
@@ -16,7 +16,6 @@
 #include <libpq-fe.h>

 class DBresult;
-extern int connPoolCount;


 class DBconn
@@ -27,29 +26,31 @@
     ~DBconn();

 public:
-    static DBconn *Get(const string &dbname, bool asPrimary=false);
+    static DBconn *Get(const string &dbname);
     static DBconn *InitConnection(const string &connectString);

     static void ClearConnections(bool allIncludingPrimary=false);
     static void SetBasicConnectString(const string &bcs) { basicConnectString = bcs; }
-    string GetLastError() { return lastError; }
+
+    string GetLastError();
     string GetDBname() {return dbname; }
     bool IsValid() { return conn != 0; }

     DBresult *Execute(const string &query);
     int ExecuteVoid(const string &query);
+    void Return();

 private:
     bool DBconn::Connect(const string &connectString);

 protected:
     static string basicConnectString;
-    static DBconn **pool;
+    static DBconn *primaryConn;

     string dbname, lastError;
     PGconn *conn;
-    long timestamp;
-    bool primary;
+    DBconn *next, *prev;
+    bool inUse;

     friend class DBresult;


Modified: trunk/pgadmin3/xtra/pgagent/include/job.h
===================================================================
--- trunk/pgadmin3/xtra/pgagent/include/job.h    2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/include/job.h    2005-05-19 14:53:32 UTC (rev 4219)
@@ -23,7 +23,7 @@
     bool Runnable() { return status == "r"; }

 protected:
-    DBconn *serviceConn;
+    DBconn *threadConn;
     string jobid, logid;
     string status;
 };

Modified: trunk/pgadmin3/xtra/pgagent/include/pgAgent.h
===================================================================
--- trunk/pgadmin3/xtra/pgagent/include/pgAgent.h    2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/include/pgAgent.h    2005-05-19 14:53:32 UTC (rev 4219)
@@ -29,6 +29,7 @@
 extern long minLogLevel;
 extern string connectString;
 extern string serviceDBname;
+extern string backendPid;

 // Log levels
 enum

Modified: trunk/pgadmin3/xtra/pgagent/job.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/job.cpp    2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/job.cpp    2005-05-19 14:53:32 UTC (rev 4219)
@@ -15,23 +15,23 @@

 Job::Job(DBconn *conn, const string &jid)
 {
-    serviceConn=conn;
+    threadConn=conn;
     jobid=jid;
     status="";

-    int rc=serviceConn->ExecuteVoid(
-        "UPDATE pgagent.pga_job SET jobagentid=pg_backend_pid(), joblastrun=now() "
+    int rc=threadConn->ExecuteVoid(
+        "UPDATE pgagent.pga_job SET jobagentid=" + backendPid + ", joblastrun=now() "
         " WHERE jobagentid IS NULL AND jobid=" + jobid);

     if (rc == 1)
     {
-        DBresult *id=serviceConn->Execute(
+        DBresult *id=threadConn->Execute(
             "SELECT nextval('pgagent.pga_joblog_jlgid_seq') AS id");
         if (id)
         {
             logid=id->GetString("id");

-            DBresult *res=serviceConn->Execute(
+            DBresult *res=threadConn->Execute(
                 "INSERT INTO pgagent.pga_joblog(jlgid, jlgjobid, jlgstatus) "
                 "VALUES (" + logid + ", " + jobid + ", 'r')");
             if (res)
@@ -49,7 +49,7 @@
 {
     if (status != "")
     {
-        serviceConn->ExecuteVoid(
+        threadConn->ExecuteVoid(
             "UPDATE pgagent.pga_joblog "
             "   SET jlgstatus='" + status + "', jlgduration=now() - jlgstart "
             " WHERE jlgid=" + logid + ";\n"
@@ -59,13 +59,14 @@
             " WHERE jobid=" + jobid
             );
     }
+    threadConn->Return();
 }


 int Job::Execute()
 {
     int rc=0;
-    DBresult *steps=serviceConn->Execute(
+    DBresult *steps=threadConn->Execute(
         "SELECT jstid, jstkind, jstdbname, jstcode, jstonerror "
         "  FROM pgagent.pga_jobstep "
         " WHERE jstenabled "
@@ -80,17 +81,17 @@

     while (steps->HasData())
     {
-        DBconn *conn;
+        DBconn *stepConn;
         string jslid, stepid, jpecode;

         stepid = steps->GetString("jstid");

-        DBresult *id=serviceConn->Execute(
+        DBresult *id=threadConn->Execute(
             "SELECT nextval('pgagent.pga_jobsteplog_jslid_seq') AS id");
         if (id)
         {
             jslid=id->GetString("id");
-            DBresult *res=serviceConn->Execute(
+            DBresult *res=threadConn->Execute(
                 "INSERT INTO pgagent.pga_jobsteplog(jslid, jsljlgid, jsljstid, jslstatus) "
                 "SELECT " + jslid + ", " + logid + ", " + stepid + ", 'r'"
                 "  FROM pgagent.pga_jobstep WHERE jstid=" + stepid);
@@ -115,11 +116,12 @@
         {
             case 's':
             {
-                conn=DBconn::Get(steps->GetString("jstdbname"));
-                if (conn)
+                stepConn=DBconn::Get(steps->GetString("jstdbname"));
+                if (stepConn)
                 {
                     LogMessage("Executing step " + stepid + " on database " + steps->GetString("jstdbname"), LOG_DEBUG);
-                    rc=conn->ExecuteVoid(steps->GetString("jstcode"));
+                    rc=stepConn->ExecuteVoid(steps->GetString("jstcode"));
+                    stepConn->Return();
                 }
                 else
                     rc=-1;
@@ -144,7 +146,7 @@
         else
             stepstatus = steps->GetString("jstonerror");

-        rc=serviceConn->ExecuteVoid(
+        rc=threadConn->ExecuteVoid(
             "UPDATE pgagent.pga_jobsteplog "
             "   SET jslduration = now() - jslstart, "
             "       jslresult = " + NumToStr(rc) + ", jslstatus = '" + stepstatus + "' "

Modified: trunk/pgadmin3/xtra/pgagent/misc.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/misc.cpp    2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/misc.cpp    2005-05-19 14:53:32 UTC (rev 4219)
@@ -64,13 +64,6 @@
                         longWait = val;
                     break;
                 }
-                case 'c':
-                {
-                    int val = atoi(getArg(argc, argv).c_str());
-                    if (val >= 5)
-                        connPoolCount = val;
-                    break;
-                }
                 case 'l':
                 {
                     int val = atoi(getArg(argc, argv).c_str());

Modified: trunk/pgadmin3/xtra/pgagent/pgAgent.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/pgAgent.cpp    2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/pgAgent.cpp    2005-05-19 14:53:32 UTC (rev 4219)
@@ -11,6 +11,8 @@

 #include "pgAgent.h"

+#include <wx/thread.h>
+
 #ifdef WIN32
 #include <winsock2.h>
 #else
@@ -19,12 +21,12 @@

 string connectString;
 string serviceDBname;
+string backendPid;
 long longWait=30;
 long shortWait=10;
 long minLogLevel=LOG_ERROR;


-
 int MainRestartLoop(DBconn *serviceConn)
 {
     // clean up old jobs
@@ -65,8 +67,6 @@
             );
     }

-
-
     char hostname[255];
     gethostname(hostname, 255);

@@ -96,18 +96,20 @@

             if (jobid != "")
             {
-                Job job(serviceConn, jobid);
+                DBconn *threadConn=DBconn::Get(serviceDBname);
+                Job job(threadConn, jobid);

                 if (job.Runnable())
                 {
                     foundJobToExecute=true;
                     LogMessage("Running job: " + jobid, LOG_DEBUG);
                     job.Execute();
+                    LogMessage("Completed job: " + jobid, LOG_DEBUG);
                 }
             }
             else
             {
-                LogMessage("No jobs to run - time for a pint :-)", LOG_DEBUG);
+                LogMessage("No jobs to run - sleeping...", LOG_DEBUG);
                 WaitAWhile();
             }
         }
@@ -124,28 +126,34 @@

 void MainLoop()
 {
-    // Basic sanity check
-    LogMessage("Database sanity check", LOG_DEBUG);
-    DBconn *sanityConn=DBconn::Get(serviceDBname, true);
-    DBresult *res=sanityConn->Execute("SELECT count(*) As count FROM pg_class cl JOIN pg_namespace ns ON ns.oid=relnamespace WHERE relname='pga_job' AND nspname='pgagent'");
-    if (res)
-    {
-        string val=res->GetString("count");
-
-        if (val == "0")
-            LogMessage("Could not find the table 'pgagent.pga_job'. Have you run pgagent.sql on this database?", LOG_ERROR);
-    }

     // OK, let's get down to business
     do
     {
-        DBconn *serviceConn=DBconn::Get(serviceDBname, true);
+        LogMessage("Creating primary connection", LOG_DEBUG);
+        DBconn *serviceConn=DBconn::InitConnection(connectString);
+
+        if (serviceConn && serviceConn->IsValid())
+        {
+            serviceDBname = serviceConn->GetDBname();

-        if (serviceConn)
-        {
+            // Basic sanity check, and a chance to get the serviceConn's PID
+            LogMessage("Database sanity check", LOG_DEBUG);
+            DBresult *res=serviceConn->Execute("SELECT count(*) As count, pg_backend_pid() AS pid FROM pg_class cl JOIN pg_namespace ns ON ns.oid=relnamespace WHERE relname='pga_job' AND nspname='pgagent'");
+            if (res)
+            {
+                string val=res->GetString("count");
+
+                if (val == "0")
+                    LogMessage("Could not find the table 'pgagent.pga_job'. Have you run pgagent.sql on this database?", LOG_ERROR);
+
+                backendPid=res->GetString("pid");
+            }
+
             MainRestartLoop(serviceConn);
         }

+        LogMessage("Couldn't create connection: " + serviceConn->GetLastError(), LOG_WARNING);
         DBconn::ClearConnections(true);
         WaitAWhile(true);
     }

Modified: trunk/pgadmin3/xtra/pgagent/pgAgent.dsp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/pgAgent.dsp    2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/pgAgent.dsp    2005-05-19 14:53:32 UTC (rev 4219)
@@ -70,7 +70,7 @@
 # PROP Ignore_Export_Lib 0
 # PROP Target_Dir ""
 # ADD BASE CPP /nologo /W3 /Gm /GX /ZI /Od /I "include" /I "c:/program files/postgresql/8.0/include" /D "WIN32" /D "_DEBUG" /D "_WINDOWS" /D "_MBCS" /D "_MT" /Yu"pgAgent.h" /FD /GZ /c
-# ADD CPP /nologo /MT /W3 /GX /O2 /I "include" /I "c:/program files/postgresql/8.0/include" /I "c:/wxWidgets-2.6/include" /I "c:/wxWidgets-2.6/contrib/include" /D "WIN32" /D "_WINDOWS" /D "_MBCS" /D "_MT" /FR /Yu"pgamsgevent.h" /FD /c
+# ADD CPP /nologo /MT /W3 /GX /O2 /I "include" /I "c:/program files/postgresql/8.0/include" /I "c:/wxWidgets-2.6/include" /I "c:/wxWidgets-2.6/contrib/include" /D "WIN32" /D "_WINDOWS" /D "_MBCS" /D "_MT" /FR /Yu"pgAgent.h" /FD /c
 # ADD BASE RSC /l 0x407 /d "_DEBUG"
 # ADD RSC /l 0x809 /i "c:/wxWidgets-2.6/include"
 BSC32=bscmake.exe

Modified: trunk/pgadmin3/xtra/pgagent/unix.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/unix.cpp    2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/unix.cpp    2005-05-19 14:53:32 UTC (rev 4219)
@@ -25,7 +25,6 @@
         "options:\n"
         "-t <poll time interval in seconds (default 10)>\n"
         "-r <retry period after connection abort in seconds (>=10, default 30)>\n"
-        "-c <connection pool size (>=5, default 5)>\n"
         "-l <logging verbosity (ERROR=0, WARNING=1, DEBUG=2, default 0)>\n"
         );
 }
@@ -87,12 +86,6 @@

     setOptions(argc, argv);

-    DBconn *conn=DBconn::InitConnection(connectString);
-    if (!conn->IsValid())
-        LogMessage("Invalid connection: " + conn->GetLastError(), LOG_ERROR);
-
-    serviceDBname = conn->GetDBname();
-
     daemonize();

     MainLoop();

Modified: trunk/pgadmin3/xtra/pgagent/win32.cpp
===================================================================
--- trunk/pgadmin3/xtra/pgagent/win32.cpp    2005-05-19 10:30:28 UTC (rev 4218)
+++ trunk/pgadmin3/xtra/pgagent/win32.cpp    2005-05-19 14:53:32 UTC (rev 4219)
@@ -328,7 +328,6 @@
         "-d <displayname>\n"
         "-t <poll time interval in seconds (default 10)>\n"
         "-r <retry period after connection abort in seconds (>=10, default 30)>\n"
-        "-c <connection pool size (>=5, default 5)>\n"
         "-l <logging verbosity (ERROR=0, WARNING=1, DEBUG=2, default 0)>\n"
         );
 }
@@ -337,19 +336,16 @@

 ////////////////////////////////////////////////////////////

-void setupForRun(int argc, char **argv)
+void setupForRun(int argc, char **argv, bool debug=false)
 {
-    eventHandle = RegisterEventSource(0, serviceName.c_str());
-    if (!eventHandle)
-        LogMessage("Couldn't register event handle.", LOG_ERROR);
+    if (!debug)
+    {
+        eventHandle = RegisterEventSource(0, serviceName.c_str());
+        if (!eventHandle)
+            LogMessage("Couldn't register event handle.", LOG_ERROR);
+    }

     setOptions(argc, argv);
-
-    DBconn *conn=DBconn::InitConnection(connectString);
-    if (!conn->IsValid())
-        LogMessage("Invalid connection: " + conn->GetLastError(), LOG_ERROR);
-
-    serviceDBname = conn->GetDBname();
 }


@@ -368,7 +364,7 @@

     if (command == "INSTALL")
     {
-        string displayname = "pgAgent " + serviceName;
+        string displayname = "PostgreSQL scheduling agent - " + serviceName;
         string arg = executable + " RUN " + serviceName;

         while (argc-- > 0)
@@ -416,7 +412,7 @@
     }
     else if (command == "DEBUG")
     {
-        setupForRun(argc, argv);
+        setupForRun(argc, argv, true);

         initService();
 #if START_SUSPENDED


pgadmin-hackers by date:

Previous
From: Adam H.Pendleton
Date:
Subject: Re: New acinclude.m4
Next
From: Raphaël Enrici
Date:
Subject: Re: New acinclude.m4