SVN Commit by dpage: r4219 - in trunk/pgadmin3/xtra/pgagent: . include - Mailing list pgadmin-hackers
From | svn@pgadmin.org |
---|---|
Subject | SVN Commit by dpage: r4219 - in trunk/pgadmin3/xtra/pgagent: . include |
Date | 2005-05-19 |
Msg-id | 200505191453.j4JErWtx014110@developer.pgadmin.org |
List | pgadmin-hackers |
Author: dpage Date: 2005-05-19 15:53:32 +0100 (Thu, 19 May 2005) New Revision: 4219 Modified: trunk/pgadmin3/xtra/pgagent/connection.cpp trunk/pgadmin3/xtra/pgagent/include/connection.h trunk/pgadmin3/xtra/pgagent/include/job.h trunk/pgadmin3/xtra/pgagent/include/pgAgent.h trunk/pgadmin3/xtra/pgagent/job.cpp trunk/pgadmin3/xtra/pgagent/misc.cpp trunk/pgadmin3/xtra/pgagent/pgAgent.cpp trunk/pgadmin3/xtra/pgagent/pgAgent.dsp trunk/pgadmin3/xtra/pgagent/unix.cpp trunk/pgadmin3/xtra/pgagent/win32.cpp Log: Dynamic connection pooling in preparation for threaded operation. Modified: trunk/pgadmin3/xtra/pgagent/connection.cpp =================================================================== --- trunk/pgadmin3/xtra/pgagent/connection.cpp 2005-05-19 10:30:28 UTC (rev 4218) +++ trunk/pgadmin3/xtra/pgagent/connection.cpp 2005-05-19 14:53:32 UTC (rev 4219) @@ -13,18 +13,15 @@ #include <libpq-fe.h> #include <time.h> - -// entries in the connection pool -int connPoolCount=5; - - -DBconn **DBconn::pool=0; +DBconn *DBconn::primaryConn; string DBconn::basicConnectString; - DBconn::DBconn(const string &name) { dbname = name; + inUse = false; + next=0; + prev=0; Connect(basicConnectString + " dbname=" + dbname); } @@ -32,6 +29,9 @@ DBconn::DBconn(const string &connectString, const string &name) { dbname = name; + inUse = false; + next=0; + prev=0; Connect(connectString); } @@ -65,21 +65,12 @@ DBconn *DBconn::InitConnection(const string &connectString) { - if (!pool) - { - pool = new DBconn*[connPoolCount]; - if (pool) - memset(pool, 0, sizeof(DBconn*) * connPoolCount); - } - if (!pool) - LogMessage("Out of memory for connection pool", LOG_ERROR); - basicConnectString=connectString; string dbname; int pos=basicConnectString.find("dbname="); if (pos == -1) - dbname = "dba"; + dbname = "pgadmin"; else { dbname = basicConnectString.substr(pos+7); @@ -93,89 +84,84 @@ dbname = dbname.substr(0, pos); } } - pool[0] = new DBconn(connectString, dbname); - pool[0]->primary = true; + 
primaryConn = new DBconn(connectString, dbname); - return pool[0]; + if (!primaryConn) + LogMessage("Failed to create primary connection!", LOG_ERROR); + + primaryConn->inUse = true; + + return primaryConn; } -DBconn *DBconn::Get(const string &dbname, bool asPrimary) +DBconn *DBconn::Get(const string &dbname) { - if (!pool) - { - pool = new DBconn*[connPoolCount]; - if (pool) - memset(pool, 0, sizeof(DBconn*) * connPoolCount); - } - if (!pool) - LogMessage("Out of memory for connection pool", LOG_ERROR); + DBconn *thisConn = primaryConn, *testConn; - int i; - DBconn **emptyConn=0, **oldestConn=0; - // find an existing connection - for (i=0 ; i < connPoolCount ; i++) + do { - if (pool[i]) - { - if (dbname == pool[i]->dbname) - return pool[i]; + if (dbname == thisConn->dbname && !thisConn->inUse) + { + LogMessage("Allocating existing connection to database " + thisConn->dbname, LOG_DEBUG); + thisConn->inUse = true; + return thisConn; + } - // while searching, also mark the oldest non-primary connection - if (!pool[i]->primary) - { - if (!oldestConn || pool[i]->timestamp < (*oldestConn)->timestamp) - oldestConn=pool+i; - } - } - else - { - // while searching, mark the first empty slot - if (!emptyConn) - emptyConn=pool+i; - } - } - if (!emptyConn) - { - delete *oldestConn; - emptyConn=oldestConn; - *emptyConn=0; - } + testConn = thisConn; + if (thisConn->next != 0) + thisConn = thisConn->next; - DBconn *conn=new DBconn(dbname); - if (conn->conn) + } while (testConn->next != 0); + + + // No suitable connection was found, so create a new one. 
+ DBconn *newConn=new DBconn(dbname); + if (newConn->conn) { - *emptyConn=conn; + LogMessage("Allocating new connection to database " + newConn->dbname, LOG_DEBUG); + newConn->inUse = true; + newConn->prev = thisConn; + thisConn->next = newConn; } - return *emptyConn; + else + LogMessage("Failed to create new connection to database: " + dbname, LOG_ERROR); + + return newConn; } +void DBconn::Return() +{ + LogMessage("Returning connection to database " + this->dbname, LOG_DEBUG); + inUse = false; +} void DBconn::ClearConnections(bool all) { - // clears all connections, except for the primary one. - // if all is true, even the primary connection will be killed. - if (pool) - { - int i; - for (i=0 ; i < connPoolCount ; i++) - { - if (pool[i]) - { - if (all || !pool[i]->primary) - { - delete pool[i]; - pool[i]=0; - } - } - } - if (all) - { - delete[] pool; - pool=0; - } - } + if (all) + LogMessage("Clearing all connections", LOG_DEBUG); + else + LogMessage("Clearing all connections except the primary", LOG_DEBUG); + + DBconn *thisConn=primaryConn, *deleteConn; + + // Find the last connection + while (thisConn->next != 0) + thisConn = thisConn->next; + + // Delete connections as required + while (thisConn->prev != 0) + { + deleteConn = thisConn; + thisConn = deleteConn->prev; + delete deleteConn; + thisConn->next = 0; + } + + if (all) + delete thisConn; + } @@ -205,6 +191,18 @@ return rows; } +string DBconn::GetLastError() +{ + // Return the last error message, minus any trailing line ends + if (lastError.substr(lastError.length()-2, 2) == "\r\n") // DOS + return lastError.substr(0, lastError.length()-2); + else if (lastError.substr(lastError.length()-1, 1) == "\n") // Unix + return lastError.substr(0, lastError.length()-1); + else if (lastError.substr(lastError.length()-1, 1) == "\r") // Mac + return lastError.substr(0, lastError.length()-1); + else + return lastError; +} ///////////////////////////////////////////////////////7 Modified: 
trunk/pgadmin3/xtra/pgagent/include/connection.h =================================================================== --- trunk/pgadmin3/xtra/pgagent/include/connection.h 2005-05-19 10:30:28 UTC (rev 4218) +++ trunk/pgadmin3/xtra/pgagent/include/connection.h 2005-05-19 14:53:32 UTC (rev 4219) @@ -16,7 +16,6 @@ #include <libpq-fe.h> class DBresult; -extern int connPoolCount; class DBconn @@ -27,29 +26,31 @@ ~DBconn(); public: - static DBconn *Get(const string &dbname, bool asPrimary=false); + static DBconn *Get(const string &dbname); static DBconn *InitConnection(const string &connectString); static void ClearConnections(bool allIncludingPrimary=false); static void SetBasicConnectString(const string &bcs) { basicConnectString = bcs; } - string GetLastError() { return lastError; } + + string GetLastError(); string GetDBname() {return dbname; } bool IsValid() { return conn != 0; } DBresult *Execute(const string &query); int ExecuteVoid(const string &query); + void Return(); private: bool DBconn::Connect(const string &connectString); protected: static string basicConnectString; - static DBconn **pool; + static DBconn *primaryConn; string dbname, lastError; PGconn *conn; - long timestamp; - bool primary; + DBconn *next, *prev; + bool inUse; friend class DBresult; Modified: trunk/pgadmin3/xtra/pgagent/include/job.h =================================================================== --- trunk/pgadmin3/xtra/pgagent/include/job.h 2005-05-19 10:30:28 UTC (rev 4218) +++ trunk/pgadmin3/xtra/pgagent/include/job.h 2005-05-19 14:53:32 UTC (rev 4219) @@ -23,7 +23,7 @@ bool Runnable() { return status == "r"; } protected: - DBconn *serviceConn; + DBconn *threadConn; string jobid, logid; string status; }; Modified: trunk/pgadmin3/xtra/pgagent/include/pgAgent.h =================================================================== --- trunk/pgadmin3/xtra/pgagent/include/pgAgent.h 2005-05-19 10:30:28 UTC (rev 4218) +++ trunk/pgadmin3/xtra/pgagent/include/pgAgent.h 2005-05-19 14:53:32 UTC 
(rev 4219) @@ -29,6 +29,7 @@ extern long minLogLevel; extern string connectString; extern string serviceDBname; +extern string backendPid; // Log levels enum Modified: trunk/pgadmin3/xtra/pgagent/job.cpp =================================================================== --- trunk/pgadmin3/xtra/pgagent/job.cpp 2005-05-19 10:30:28 UTC (rev 4218) +++ trunk/pgadmin3/xtra/pgagent/job.cpp 2005-05-19 14:53:32 UTC (rev 4219) @@ -15,23 +15,23 @@ Job::Job(DBconn *conn, const string &jid) { - serviceConn=conn; + threadConn=conn; jobid=jid; status=""; - int rc=serviceConn->ExecuteVoid( - "UPDATE pgagent.pga_job SET jobagentid=pg_backend_pid(), joblastrun=now() " + int rc=threadConn->ExecuteVoid( + "UPDATE pgagent.pga_job SET jobagentid=" + backendPid + ", joblastrun=now() " " WHERE jobagentid IS NULL AND jobid=" + jobid); if (rc == 1) { - DBresult *id=serviceConn->Execute( + DBresult *id=threadConn->Execute( "SELECT nextval('pgagent.pga_joblog_jlgid_seq') AS id"); if (id) { logid=id->GetString("id"); - DBresult *res=serviceConn->Execute( + DBresult *res=threadConn->Execute( "INSERT INTO pgagent.pga_joblog(jlgid, jlgjobid, jlgstatus) " "VALUES (" + logid + ", " + jobid + ", 'r')"); if (res) @@ -49,7 +49,7 @@ { if (status != "") { - serviceConn->ExecuteVoid( + threadConn->ExecuteVoid( "UPDATE pgagent.pga_joblog " " SET jlgstatus='" + status + "', jlgduration=now() - jlgstart " " WHERE jlgid=" + logid + ";\n" @@ -59,13 +59,14 @@ " WHERE jobid=" + jobid ); } + threadConn->Return(); } int Job::Execute() { int rc=0; - DBresult *steps=serviceConn->Execute( + DBresult *steps=threadConn->Execute( "SELECT jstid, jstkind, jstdbname, jstcode, jstonerror " " FROM pgagent.pga_jobstep " " WHERE jstenabled " @@ -80,17 +81,17 @@ while (steps->HasData()) { - DBconn *conn; + DBconn *stepConn; string jslid, stepid, jpecode; stepid = steps->GetString("jstid"); - DBresult *id=serviceConn->Execute( + DBresult *id=threadConn->Execute( "SELECT nextval('pgagent.pga_jobsteplog_jslid_seq') AS id"); if 
(id) { jslid=id->GetString("id"); - DBresult *res=serviceConn->Execute( + DBresult *res=threadConn->Execute( "INSERT INTO pgagent.pga_jobsteplog(jslid, jsljlgid, jsljstid, jslstatus) " "SELECT " + jslid + ", " + logid + ", " + stepid + ", 'r'" " FROM pgagent.pga_jobstep WHERE jstid=" + stepid); @@ -115,11 +116,12 @@ { case 's': { - conn=DBconn::Get(steps->GetString("jstdbname")); - if (conn) + stepConn=DBconn::Get(steps->GetString("jstdbname")); + if (stepConn) { LogMessage("Executing step " + stepid + " on database " + steps->GetString("jstdbname"), LOG_DEBUG); - rc=conn->ExecuteVoid(steps->GetString("jstcode")); + rc=stepConn->ExecuteVoid(steps->GetString("jstcode")); + stepConn->Return(); } else rc=-1; @@ -144,7 +146,7 @@ else stepstatus = steps->GetString("jstonerror"); - rc=serviceConn->ExecuteVoid( + rc=threadConn->ExecuteVoid( "UPDATE pgagent.pga_jobsteplog " " SET jslduration = now() - jslstart, " " jslresult = " + NumToStr(rc) + ", jslstatus = '" + stepstatus + "' " Modified: trunk/pgadmin3/xtra/pgagent/misc.cpp =================================================================== --- trunk/pgadmin3/xtra/pgagent/misc.cpp 2005-05-19 10:30:28 UTC (rev 4218) +++ trunk/pgadmin3/xtra/pgagent/misc.cpp 2005-05-19 14:53:32 UTC (rev 4219) @@ -64,13 +64,6 @@ longWait = val; break; } - case 'c': - { - int val = atoi(getArg(argc, argv).c_str()); - if (val >= 5) - connPoolCount = val; - break; - } case 'l': { int val = atoi(getArg(argc, argv).c_str()); Modified: trunk/pgadmin3/xtra/pgagent/pgAgent.cpp =================================================================== --- trunk/pgadmin3/xtra/pgagent/pgAgent.cpp 2005-05-19 10:30:28 UTC (rev 4218) +++ trunk/pgadmin3/xtra/pgagent/pgAgent.cpp 2005-05-19 14:53:32 UTC (rev 4219) @@ -11,6 +11,8 @@ #include "pgAgent.h" +#include <wx/thread.h> + #ifdef WIN32 #include <winsock2.h> #else @@ -19,12 +21,12 @@ string connectString; string serviceDBname; +string backendPid; long longWait=30; long shortWait=10; long 
minLogLevel=LOG_ERROR; - int MainRestartLoop(DBconn *serviceConn) { // clean up old jobs @@ -65,8 +67,6 @@ ); } - - char hostname[255]; gethostname(hostname, 255); @@ -96,18 +96,20 @@ if (jobid != "") { - Job job(serviceConn, jobid); + DBconn *threadConn=DBconn::Get(serviceDBname); + Job job(threadConn, jobid); if (job.Runnable()) { foundJobToExecute=true; LogMessage("Running job: " + jobid, LOG_DEBUG); job.Execute(); + LogMessage("Completed job: " + jobid, LOG_DEBUG); } } else { - LogMessage("No jobs to run - time for a pint :-)", LOG_DEBUG); + LogMessage("No jobs to run - sleeping...", LOG_DEBUG); WaitAWhile(); } } @@ -124,28 +126,34 @@ void MainLoop() { - // Basic sanity check - LogMessage("Database sanity check", LOG_DEBUG); - DBconn *sanityConn=DBconn::Get(serviceDBname, true); - DBresult *res=sanityConn->Execute("SELECT count(*) As count FROM pg_class cl JOIN pg_namespace ns ON ns.oid=relnamespace WHERE relname='pga_job' AND nspname='pgagent'"); - if (res) - { - string val=res->GetString("count"); - - if (val == "0") - LogMessage("Could not find the table 'pgagent.pga_job'. Have you run pgagent.sql on this database?", LOG_ERROR); - } // OK, let's get down to business do { - DBconn *serviceConn=DBconn::Get(serviceDBname, true); + LogMessage("Creating primary connection", LOG_DEBUG); + DBconn *serviceConn=DBconn::InitConnection(connectString); + + if (serviceConn && serviceConn->IsValid()) + { + serviceDBname = serviceConn->GetDBname(); - if (serviceConn) - { + // Basic sanity check, and a chance to get the serviceConn's PID + LogMessage("Database sanity check", LOG_DEBUG); + DBresult *res=serviceConn->Execute("SELECT count(*) As count, pg_backend_pid() AS pid FROM pg_class cl JOIN pg_namespace ns ON ns.oid=relnamespace WHERE relname='pga_job' AND nspname='pgagent'"); + if (res) + { + string val=res->GetString("count"); + + if (val == "0") + LogMessage("Could not find the table 'pgagent.pga_job'.
Have you run pgagent.sql on this database?",LOG_ERROR); + + backendPid=res->GetString("pid"); + } + MainRestartLoop(serviceConn); } + LogMessage("Couldn't create connection: " + serviceConn->GetLastError(), LOG_WARNING); DBconn::ClearConnections(true); WaitAWhile(true); } Modified: trunk/pgadmin3/xtra/pgagent/pgAgent.dsp =================================================================== --- trunk/pgadmin3/xtra/pgagent/pgAgent.dsp 2005-05-19 10:30:28 UTC (rev 4218) +++ trunk/pgadmin3/xtra/pgagent/pgAgent.dsp 2005-05-19 14:53:32 UTC (rev 4219) @@ -70,7 +70,7 @@ # PROP Ignore_Export_Lib 0 # PROP Target_Dir "" # ADD BASE CPP /nologo /W3 /Gm /GX /ZI /Od /I "include" /I "c:/program files/postgresql/8.0/include" /D "WIN32" /D "_DEBUG"/D "_WINDOWS" /D "_MBCS" /D "_MT" /Yu"pgAgent.h" /FD /GZ /c -# ADD CPP /nologo /MT /W3 /GX /O2 /I "include" /I "c:/program files/postgresql/8.0/include" /I "c:/wxWidgets-2.6/include"/I "c:/wxWidgets-2.6/contrib/include" /D "WIN32" /D "_WINDOWS" /D "_MBCS" /D "_MT" /FR /Yu"pgamsgevent.h"/FD /c +# ADD CPP /nologo /MT /W3 /GX /O2 /I "include" /I "c:/program files/postgresql/8.0/include" /I "c:/wxWidgets-2.6/include"/I "c:/wxWidgets-2.6/contrib/include" /D "WIN32" /D "_WINDOWS" /D "_MBCS" /D "_MT" /FR /Yu"pgAgent.h"/FD /c # ADD BASE RSC /l 0x407 /d "_DEBUG" # ADD RSC /l 0x809 /i "c:/wxWidgets-2.6/include" BSC32=bscmake.exe Modified: trunk/pgadmin3/xtra/pgagent/unix.cpp =================================================================== --- trunk/pgadmin3/xtra/pgagent/unix.cpp 2005-05-19 10:30:28 UTC (rev 4218) +++ trunk/pgadmin3/xtra/pgagent/unix.cpp 2005-05-19 14:53:32 UTC (rev 4219) @@ -25,7 +25,6 @@ "options:\n" "-t <poll time interval in seconds (default 10)>\n" "-r <retry period after connection abort in seconds (>=10, default 30)>\n" - "-c <connection pool size (>=5, default 5)>\n" "-l <logging verbosity (ERROR=0, WARNING=1, DEBUG=2, default 0)>\n" ); } @@ -87,12 +86,6 @@ setOptions(argc, argv); - DBconn 
*conn=DBconn::InitConnection(connectString); - if (!conn->IsValid()) - LogMessage("Invalid connection: " + conn->GetLastError(), LOG_ERROR); - - serviceDBname = conn->GetDBname(); - daemonize(); MainLoop(); Modified: trunk/pgadmin3/xtra/pgagent/win32.cpp =================================================================== --- trunk/pgadmin3/xtra/pgagent/win32.cpp 2005-05-19 10:30:28 UTC (rev 4218) +++ trunk/pgadmin3/xtra/pgagent/win32.cpp 2005-05-19 14:53:32 UTC (rev 4219) @@ -328,7 +328,6 @@ "-d <displayname>\n" "-t <poll time interval in seconds (default 10)>\n" "-r <retry period after connection abort in seconds (>=10, default 30)>\n" - "-c <connection pool size (>=5, default 5)>\n" "-l <logging verbosity (ERROR=0, WARNING=1, DEBUG=2, default 0)>\n" ); } @@ -337,19 +336,16 @@ //////////////////////////////////////////////////////////// -void setupForRun(int argc, char **argv) +void setupForRun(int argc, char **argv, bool debug=false) { - eventHandle = RegisterEventSource(0, serviceName.c_str()); - if (!eventHandle) - LogMessage("Couldn't register event handle.", LOG_ERROR); + if (!debug) + { + eventHandle = RegisterEventSource(0, serviceName.c_str()); + if (!eventHandle) + LogMessage("Couldn't register event handle.", LOG_ERROR); + } setOptions(argc, argv); - - DBconn *conn=DBconn::InitConnection(connectString); - if (!conn->IsValid()) - LogMessage("Invalid connection: " + conn->GetLastError(), LOG_ERROR); - - serviceDBname = conn->GetDBname(); } @@ -368,7 +364,7 @@ if (command == "INSTALL") { - string displayname = "pgAgent " + serviceName; + string displayname = "PostgreSQL scheduling agent - " + serviceName; string arg = executable + " RUN " + serviceName; while (argc-- > 0) @@ -416,7 +412,7 @@ } else if (command == "DEBUG") { - setupForRun(argc, argv); + setupForRun(argc, argv, true); initService(); #if START_SUSPENDED
pgadmin-hackers by date: