*** a/doc/src/sgml/ref/vacuumdb.sgml
--- b/doc/src/sgml/ref/vacuumdb.sgml
***************
*** 204,209 **** PostgreSQL documentation
--- 204,230 ----
+
+
+
+
+ Number of concurrent connections to perform the operation.
+ This option will enable the vacuum operation to run on asynchronous
+ connections, at a time one table will be operated on one connection.
+ So at one time as many tables will be vacuumed parallely as number of
+ jobs. If number of jobs given are more than number of tables then
+ number of jobs will be set to number of tables.
+
+
+ vacuumdb will open
+ njobs connections to the
+ database, so make sure your
+ setting is high enough to accommodate all connections.
+
+
+
+
+
*** a/src/bin/scripts/common.c
--- b/src/bin/scripts/common.c
***************
*** 19,28 ****
#include "common.h"
! static void SetCancelConn(PGconn *conn);
! static void ResetCancelConn(void);
static PGcancel *volatile cancelConn = NULL;
#ifdef WIN32
static CRITICAL_SECTION cancelConnLock;
--- 19,28 ----
#include "common.h"
!
static PGcancel *volatile cancelConn = NULL;
+ static bool inAbort = false;
#ifdef WIN32
static CRITICAL_SECTION cancelConnLock;
***************
*** 291,297 **** yesno_prompt(const char *question)
*
* Set cancelConn to point to the current database connection.
*/
! static void
SetCancelConn(PGconn *conn)
{
PGcancel *oldCancelConn;
--- 291,297 ----
*
* Set cancelConn to point to the current database connection.
*/
! void
SetCancelConn(PGconn *conn)
{
PGcancel *oldCancelConn;
***************
*** 321,327 **** SetCancelConn(PGconn *conn)
*
* Free the current cancel connection, if any, and set to NULL.
*/
! static void
ResetCancelConn(void)
{
PGcancel *oldCancelConn;
--- 321,327 ----
*
* Free the current cancel connection, if any, and set to NULL.
*/
! void
ResetCancelConn(void)
{
PGcancel *oldCancelConn;
***************
*** 358,363 **** handle_sigint(SIGNAL_ARGS)
--- 358,364 ----
/* Send QueryCancel if we are processing a database query */
if (cancelConn != NULL)
{
+ inAbort = true;
if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
fprintf(stderr, _("Cancel request sent\n"));
else
***************
*** 391,396 **** consoleHandler(DWORD dwCtrlType)
--- 392,399 ----
EnterCriticalSection(&cancelConnLock);
if (cancelConn != NULL)
{
+ inAbort = true;
+
if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
fprintf(stderr, _("Cancel request sent\n"));
else
***************
*** 414,416 **** setup_cancel_handler(void)
--- 417,424 ----
}
#endif /* WIN32 */
+
+ bool in_abort()
+ {
+ return inAbort;
+ }
*** a/src/bin/scripts/common.h
--- b/src/bin/scripts/common.h
***************
*** 49,52 **** extern bool yesno_prompt(const char *question);
--- 49,57 ----
extern void setup_cancel_handler(void);
+ extern void SetCancelConn(PGconn *conn);
+ extern void ResetCancelConn(void);
+ extern bool in_abort(void);
+
+
#endif /* COMMON_H */
*** a/src/bin/scripts/vacuumdb.c
--- b/src/bin/scripts/vacuumdb.c
***************
*** 14,19 ****
--- 14,30 ----
#include "common.h"
#include "dumputils.h"
+ #define NO_SLOT (-1)
+
+ /* Arguments needed for a worker process */
+ typedef struct ParallelSlot
+ {
+ PGconn *connection;
+ bool isFree;
+ pgsocket sock;
+ } ParallelSlot;
+
+ #define ERRCODE_UNDEFINED_TABLE "42P01"
static void vacuum_one_database(const char *dbname, bool full, bool verbose,
bool and_analyze, bool analyze_only, bool analyze_in_stages, int stage, bool freeze,
***************
*** 25,34 **** static void vacuum_all_databases(bool full, bool verbose, bool and_analyze,
const char *maintenance_db,
const char *host, const char *port,
const char *username, enum trivalue prompt_password,
! const char *progname, bool echo, bool quiet);
static void help(const char *progname);
int
main(int argc, char *argv[])
--- 36,74 ----
const char *maintenance_db,
const char *host, const char *port,
const char *username, enum trivalue prompt_password,
! const char *progname, bool echo, bool quiet,
! int concurrentCons);
static void help(const char *progname);
+ void vacuum_parallel(const char *dbname, bool full, bool verbose,
+ bool and_analyze, bool analyze_only, bool analyze_in_stages,
+ int stage, bool freeze, const char *host, const char *port,
+ const char *username, enum trivalue prompt_password,
+ const char *progname, bool echo, int concurrentCons,
+ SimpleStringList *tables, bool quiet);
+
+
+ void prepare_command(PGconn *conn, bool full, bool verbose, bool and_analyze,
+ bool analyze_only, bool freeze, PQExpBuffer sql);
+ static void
+ run_parallel_vacuum(bool echo, const char *dbname, SimpleStringList *tables,
+ bool full, bool verbose, bool and_analyze,
+ bool analyze_only, bool freeze, int concurrentCons,
+ const char *progname, int analyze_stage,
+ ParallelSlot *connSlot, bool completedb);
+ static int
+ GetIdleSlot(ParallelSlot *pSlot, int max_slot, const char *dbname,
+ const char *progname, bool completedb);
+
+ static bool GetQueryResult(PGconn *conn, const char *dbname,
+ const char *progname, bool completedb);
+
+ static int
+ select_loop(int maxFd, fd_set *workerset);
+
+ static void DisconnectDatabase(ParallelSlot *slot);
+
int
main(int argc, char *argv[])
***************
*** 49,54 **** main(int argc, char *argv[])
--- 89,95 ----
{"table", required_argument, NULL, 't'},
{"full", no_argument, NULL, 'f'},
{"verbose", no_argument, NULL, 'v'},
+ {"jobs", required_argument, NULL, 'j'},
{"maintenance-db", required_argument, NULL, 2},
{"analyze-in-stages", no_argument, NULL, 3},
{NULL, 0, NULL, 0}
***************
*** 74,86 **** main(int argc, char *argv[])
bool full = false;
bool verbose = false;
SimpleStringList tables = {NULL, NULL};
progname = get_progname(argv[0]);
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
handle_help_version_opts(argc, argv, "vacuumdb", help);
! while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fv", long_options, &optindex)) != -1)
{
switch (c)
{
--- 115,129 ----
bool full = false;
bool verbose = false;
SimpleStringList tables = {NULL, NULL};
+ int concurrentCons = 0;
+ int tbl_count = 0;
progname = get_progname(argv[0]);
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
handle_help_version_opts(argc, argv, "vacuumdb", help);
! while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fv:j:", long_options, &optindex)) != -1)
{
switch (c)
{
***************
*** 121,134 **** main(int argc, char *argv[])
--- 164,190 ----
alldb = true;
break;
case 't':
+ {
simple_string_list_append(&tables, optarg);
+ tbl_count++;
break;
+ }
case 'f':
full = true;
break;
case 'v':
verbose = true;
break;
+ case 'j':
+ concurrentCons = atoi(optarg);
+ if (concurrentCons <= 0)
+ {
+ fprintf(stderr, _("%s: Number of parallel \"jobs\" should be at least 1\n"),
+ progname);
+ exit(1);
+ }
+
+ break;
case 2:
maintenance_db = pg_strdup(optarg);
break;
***************
*** 141,146 **** main(int argc, char *argv[])
--- 197,203 ----
}
}
+ optind++;
/*
* Non-option argument specifies database name as long as it wasn't
***************
*** 179,184 **** main(int argc, char *argv[])
--- 236,248 ----
setup_cancel_handler();
+ /*
+ * When user is giving the table list, and list is smaller then
+ * number of tables
+ */
+ if (tbl_count && (concurrentCons > tbl_count))
+ concurrentCons = tbl_count;
+
if (alldb)
{
if (dbname)
***************
*** 196,202 **** main(int argc, char *argv[])
vacuum_all_databases(full, verbose, and_analyze, analyze_only, analyze_in_stages, freeze,
maintenance_db, host, port, username,
! prompt_password, progname, echo, quiet);
}
else
{
--- 260,266 ----
vacuum_all_databases(full, verbose, and_analyze, analyze_only, analyze_in_stages, freeze,
maintenance_db, host, port, username,
! prompt_password, progname, echo, quiet, concurrentCons);
}
else
{
***************
*** 210,234 **** main(int argc, char *argv[])
dbname = get_user_name_or_exit(progname);
}
! if (tables.head != NULL)
{
! SimpleStringListCell *cell;
- for (cell = tables.head; cell; cell = cell->next)
- {
- vacuum_one_database(dbname, full, verbose, and_analyze,
- analyze_only, analyze_in_stages, -1,
- freeze, cell->val,
- host, port, username, prompt_password,
- progname, echo, quiet);
- }
}
else
! vacuum_one_database(dbname, full, verbose, and_analyze,
analyze_only, analyze_in_stages, -1,
freeze, NULL,
host, port, username, prompt_password,
progname, echo, quiet);
}
exit(0);
--- 274,309 ----
dbname = get_user_name_or_exit(progname);
}
! if (concurrentCons > 1)
{
! vacuum_parallel(dbname, full, verbose, and_analyze,
! analyze_only, analyze_in_stages, -1,
! freeze, host, port, username, prompt_password,
! progname, echo, concurrentCons, &tables, quiet);
}
else
! {
! if (tables.head != NULL)
! {
! SimpleStringListCell *cell;
!
! for (cell = tables.head; cell; cell = cell->next)
! {
! vacuum_one_database(dbname, full, verbose, and_analyze,
! analyze_only, analyze_in_stages, -1,
! freeze, cell->val,
! host, port, username, prompt_password,
! progname, echo, quiet);
! }
! }
! else
! vacuum_one_database(dbname, full, verbose, and_analyze,
analyze_only, analyze_in_stages, -1,
freeze, NULL,
host, port, username, prompt_password,
progname, echo, quiet);
+ }
}
exit(0);
***************
*** 268,323 **** vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyz
conn = connectDatabase(dbname, host, port, username, prompt_password,
progname, false);
! if (analyze_only)
! {
! appendPQExpBufferStr(&sql, "ANALYZE");
! if (verbose)
! appendPQExpBufferStr(&sql, " VERBOSE");
! }
! else
! {
! appendPQExpBufferStr(&sql, "VACUUM");
! if (PQserverVersion(conn) >= 90000)
! {
! const char *paren = " (";
! const char *comma = ", ";
! const char *sep = paren;
- if (full)
- {
- appendPQExpBuffer(&sql, "%sFULL", sep);
- sep = comma;
- }
- if (freeze)
- {
- appendPQExpBuffer(&sql, "%sFREEZE", sep);
- sep = comma;
- }
- if (verbose)
- {
- appendPQExpBuffer(&sql, "%sVERBOSE", sep);
- sep = comma;
- }
- if (and_analyze)
- {
- appendPQExpBuffer(&sql, "%sANALYZE", sep);
- sep = comma;
- }
- if (sep != paren)
- appendPQExpBufferStr(&sql, ")");
- }
- else
- {
- if (full)
- appendPQExpBufferStr(&sql, " FULL");
- if (freeze)
- appendPQExpBufferStr(&sql, " FREEZE");
- if (verbose)
- appendPQExpBufferStr(&sql, " VERBOSE");
- if (and_analyze)
- appendPQExpBufferStr(&sql, " ANALYZE");
- }
- }
if (table)
appendPQExpBuffer(&sql, " %s", table);
appendPQExpBufferStr(&sql, ";");
--- 343,351 ----
conn = connectDatabase(dbname, host, port, username, prompt_password,
progname, false);
! prepare_command(conn, full, verbose,
! and_analyze, analyze_only, freeze, &sql);
if (table)
appendPQExpBuffer(&sql, " %s", table);
appendPQExpBufferStr(&sql, ";");
***************
*** 353,360 **** vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyz
}
else
{
! /* Otherwise, we got a stage from vacuum_all_databases(), so run
! * only that one. */
if (!quiet)
{
puts(gettext(stage_messages[stage]));
--- 381,390 ----
}
else
{
! /*
! * Otherwise, we got a stage from vacuum_all_databases(), so run
! * only that one.
! */
if (!quiet)
{
puts(gettext(stage_messages[stage]));
***************
*** 374,384 **** vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyz
static void
! vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_only,
! bool analyze_in_stages, bool freeze, const char *maintenance_db,
! const char *host, const char *port,
! const char *username, enum trivalue prompt_password,
! const char *progname, bool echo, bool quiet)
{
PGconn *conn;
PGresult *result;
--- 404,415 ----
static void
! vacuum_all_databases(bool full, bool verbose, bool and_analyze,
! bool analyze_only, bool analyze_in_stages, bool freeze,
! const char *maintenance_db, const char *host,
! const char *port, const char *username,
! enum trivalue prompt_password, const char *progname,
! bool echo, bool quiet, int concurrentCons)
{
PGconn *conn;
PGresult *result;
***************
*** 407,412 **** vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_onl
--- 438,452 ----
fflush(stdout);
}
+ if (concurrentCons > 1)
+ {
+ vacuum_parallel(dbname, full, verbose, and_analyze,
+ analyze_only, analyze_in_stages, stage,
+ freeze, host, port, username, prompt_password,
+ progname, echo, concurrentCons, NULL, quiet);
+
+ }
+ else
vacuum_one_database(dbname, full, verbose, and_analyze, analyze_only,
analyze_in_stages, stage,
freeze, NULL, host, port, username, prompt_password,
***************
*** 417,422 **** vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_onl
--- 457,958 ----
PQclear(result);
}
+ /*
+ * run_parallel_vacuum
+ * This function process the table list,
+ * pick the object one by one and get the Free connections slot, once it
+ * get the free slot send the job on the free connection.
+ */
+
+ static void
+ run_parallel_vacuum(bool echo, const char *dbname, SimpleStringList *tables,
+ bool full, bool verbose, bool and_analyze,
+ bool analyze_only, bool freeze, int concurrentCons,
+ const char *progname, int analyze_stage,
+ ParallelSlot *connSlot, bool completedb)
+ {
+ PQExpBufferData sql;
+ SimpleStringListCell *cell;
+ int max_slot = concurrentCons;
+ int i;
+ int free_slot;
+ PGconn *slotconn;
+ bool error = false;
+ const char *stage_commands[] = {
+ "SET default_statistics_target=1; SET vacuum_cost_delay=0;",
+ "SET default_statistics_target=10; RESET vacuum_cost_delay;",
+ "RESET default_statistics_target;"};
+
+ initPQExpBuffer(&sql);
+
+ if (analyze_stage >= 0)
+ {
+ for (i = 0; i < max_slot; i++)
+ {
+ executeCommand(connSlot[i].connection,
+ stage_commands[analyze_stage], progname, echo);
+ }
+ }
+
+ for (cell = tables->head; cell; cell = cell->next)
+ {
+ /*
+ * This will give the free connection slot, if no slot is free it will
+ * wait for atleast one slot to get free.
+ */
+ free_slot = GetIdleSlot(connSlot, max_slot, dbname, progname,
+ completedb);
+ if (free_slot == NO_SLOT)
+ {
+ error = true;
+ goto fail;
+ }
+
+ prepare_command(connSlot[free_slot].connection, full, verbose,
+ and_analyze, analyze_only, freeze, &sql);
+
+ appendPQExpBuffer(&sql, " %s", cell->val);
+ appendPQExpBufferStr(&sql, ";");
+
+ connSlot[free_slot].isFree = false;
+
+ slotconn = connSlot[free_slot].connection;
+ PQsendQuery(slotconn, sql.data);
+
+ resetPQExpBuffer(&sql);
+ }
+
+ for (i = 0; i < max_slot; i++)
+ {
+ /* wait for all connection to return the results*/
+ if (!GetQueryResult(connSlot[i].connection, dbname, progname,
+ completedb))
+ {
+ error = true;
+ goto fail;
+ }
+
+ connSlot[i].isFree = true;
+ }
+
+ fail:
+
+ termPQExpBuffer(&sql);
+
+ if (error)
+ {
+ for (i = 0; i < max_slot; i++)
+ {
+ DisconnectDatabase(&connSlot[i]);
+ }
+
+ pfree(connSlot);
+
+ exit(1);
+ }
+ }
+
+ /*
+ * GetIdleSlot
+ * Process the slot list, if any free slot available return the slotid
+ * If no slot is free, Then perform select on all the socket and wait until
+ * atleast one slot is free.
+ */
+ static int
+ GetIdleSlot(ParallelSlot *pSlot, int max_slot, const char *dbname,
+ const char *progname, bool completedb)
+ {
+ int i;
+ fd_set slotset;
+ int firstFree = -1;
+ pgsocket maxFd;
+
+ for (i = 0; i < max_slot; i++)
+ if (pSlot[i].isFree)
+ return i;
+
+ FD_ZERO(&slotset);
+
+ maxFd = pSlot[0].sock;
+
+ for (i = 0; i < max_slot; i++)
+ {
+ FD_SET(pSlot[i].sock, &slotset);
+ if (pSlot[i].sock > maxFd)
+ maxFd = pSlot[i].sock;
+ }
+
+ /*
+ * Some of the slot are free, Process the results for slots whichever
+ * are free
+ */
+
+ do
+ {
+ SetCancelConn(pSlot[0].connection);
+
+ i = select_loop(maxFd, &slotset);
+
+ ResetCancelConn();
+
+ if (i < 0)
+ {
+ /*
+ * This can only happen if user has sent the cancel request using
+ * Ctrl+C, Cancel is handled by 0th slot, so fetch the error result.
+ */
+
+ GetQueryResult(pSlot[0].connection, dbname, progname,
+ completedb);
+ return NO_SLOT;
+ }
+
+ Assert(i != 0);
+
+ for (i = 0; i < max_slot; i++)
+ {
+ if (!FD_ISSET(pSlot[i].sock, &slotset))
+ continue;
+
+ PQconsumeInput(pSlot[i].connection);
+ if (PQisBusy(pSlot[i].connection))
+ continue;
+
+ pSlot[i].isFree = true;
+
+ if (!GetQueryResult(pSlot[i].connection, dbname, progname,
+ completedb))
+ return NO_SLOT;
+
+ if (firstFree < 0)
+ firstFree = i;
+ }
+ }while(firstFree < 0);
+
+ return firstFree;
+ }
+
+ /*
+ * GetQueryResult
+ * Process the query result.
+ */
+
+ static bool GetQueryResult(PGconn *conn, const char *dbname,
+ const char *progname, bool completedb)
+ {
+ PGresult *result = NULL;
+ PGresult *lastResult = NULL;
+ bool r;
+
+
+ SetCancelConn(conn);
+ while((result = PQgetResult(conn)) != NULL)
+ {
+ PQclear(lastResult);
+ lastResult = result;
+ }
+
+ ResetCancelConn();
+
+ if (!lastResult)
+ return true;
+
+ r = (PQresultStatus(lastResult) == PGRES_COMMAND_OK);
+
+ /*
+ * If user has given the vacuum of complete db, then if
+ * any of the object's vacuum failed it can be ignored and vacuuming
+ * of other objects can be continued, this is the same behavior as
+ * vacuuming of complete db is handled without --jobs option
+ */
+ if (!r)
+ {
+ char *sqlState = PQresultErrorField(lastResult, PG_DIAG_SQLSTATE);
+
+ if (!completedb ||
+ (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0))
+ {
+
+ fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
+ progname, dbname, PQerrorMessage(conn));
+
+ PQclear(lastResult);
+ return false;
+ }
+ }
+
+ PQclear(lastResult);
+ return true;
+ }
+
+ /*
+ * vacuum_parallel
+ * This function will open the multiple concurrent connections as
+ * suggested by user, it will derive the table list using server call
+ * if table list is not given by user and perform the vacuum call
+ */
+
+ void
+ vacuum_parallel(const char *dbname, bool full, bool verbose,
+ bool and_analyze, bool analyze_only, bool analyze_in_stages,
+ int stage, bool freeze, const char *host, const char *port,
+ const char *username, enum trivalue prompt_password,
+ const char *progname, bool echo, int concurrentCons,
+ SimpleStringList *tables, bool quiet)
+ {
+
+ PGconn *conn;
+ int i;
+ ParallelSlot *connSlot;
+ SimpleStringList dbtables = {NULL, NULL};
+ bool completeDb = false;
+
+ conn = connectDatabase(dbname, host, port, username,
+ prompt_password, progname, false);
+
+ /*
+ * if table list is not provided then we need to do vaccum for whole DB
+ * get the list of all tables and prepare the list
+ */
+ if (!tables || !tables->head)
+ {
+ PGresult *res;
+ int ntuple;
+ int i;
+ PQExpBufferData sql;
+
+ initPQExpBuffer(&sql);
+
+ res = executeQuery(conn,
+ "SELECT c.relname, ns.nspname FROM pg_class c, pg_namespace ns"
+ " WHERE (relkind = \'r\' or relkind = \'m\')"
+ " and c.relnamespace = ns.oid ORDER BY c.relpages desc",
+ progname, echo);
+
+ ntuple = PQntuples(res);
+ for (i = 0; i < ntuple; i++)
+ {
+ appendPQExpBuffer(&sql, "%s",
+ fmtQualifiedId(PQserverVersion(conn),
+ PQgetvalue(res, i, 1),
+ PQgetvalue(res, i, 0)));
+
+ simple_string_list_append(&dbtables, sql.data);
+ resetPQExpBuffer(&sql);
+ }
+
+ termPQExpBuffer(&sql);
+ tables = &dbtables;
+
+ /* Vaccuming full database*/
+ completeDb = true;
+
+ if (concurrentCons > ntuple)
+ concurrentCons = ntuple;
+ }
+
+ connSlot = (ParallelSlot*)pg_malloc(concurrentCons * sizeof(ParallelSlot));
+ connSlot[0].connection = conn;
+ connSlot[0].sock = PQsocket(conn);
+
+ PQsetnonblocking(connSlot[0].connection, 1);
+
+ for (i = 1; i < concurrentCons; i++)
+ {
+ connSlot[i].connection = connectDatabase(dbname, host, port, username,
+ prompt_password, progname, false);
+
+ PQsetnonblocking(connSlot[i].connection, 1);
+ connSlot[i].isFree = true;
+ connSlot[i].sock = PQsocket(connSlot[i].connection);
+ }
+
+ if (analyze_in_stages)
+ {
+ const char *stage_messages[] = {
+ gettext_noop("Generating minimal optimizer statistics (1 target)"),
+ gettext_noop("Generating medium optimizer statistics (10 targets)"),
+ gettext_noop("Generating default (full) optimizer statistics")
+ };
+
+ if (stage == -1)
+ {
+ int i;
+ for (i = 0; i < 3; i++)
+ {
+ if (!quiet)
+ {
+ puts(gettext(stage_messages[i]));
+ fflush(stdout);
+ }
+
+ run_parallel_vacuum(echo, dbname, tables, full, verbose,
+ and_analyze, analyze_only, freeze, concurrentCons,
+ progname, i, connSlot, completeDb);
+ }
+ }
+ else
+ {
+ /* Otherwise, we got a stage from vacuum_all_databases(), so run
+ * only that one. */
+ if (!quiet)
+ {
+ puts(gettext(stage_messages[stage]));
+ fflush(stdout);
+ }
+
+ run_parallel_vacuum(echo, dbname, tables, full, verbose,
+ and_analyze, analyze_only, freeze, concurrentCons,
+ progname, stage, connSlot, completeDb);
+ }
+ }
+ else
+ {
+ run_parallel_vacuum(echo, dbname, tables, full, verbose,
+ and_analyze, analyze_only, freeze,
+ concurrentCons, progname, -1, connSlot, completeDb);
+ }
+
+ for (i = 0; i < concurrentCons; i++)
+ {
+ PQfinish(connSlot[i].connection);
+ }
+
+ pfree(connSlot);
+ }
+
+ /*
+ * A select loop that repeats calling select until a descriptor in the read
+ * set becomes readable. On Windows we have to check for the termination event
+ * from time to time, on Unix we can just block forever.
+ */
+ static int
+ select_loop(int maxFd, fd_set *workerset)
+ {
+ int i;
+ fd_set saveSet = *workerset;
+
+ #ifdef WIN32
+ /* should always be the master */
+ for (;;)
+ {
+ /*
+ * sleep a quarter of a second before checking if we should terminate.
+ */
+ struct timeval tv = {0, 250000};
+
+ *workerset = saveSet;
+ i = select(maxFd + 1, workerset, NULL, NULL, &tv);
+
+ if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
+ continue;
+ if (i)
+ break;
+ }
+ #else /* UNIX */
+
+ for (;;)
+ {
+ *workerset = saveSet;
+ i = select(maxFd + 1, workerset, NULL, NULL, NULL);
+
+ if (in_abort())
+ {
+ return -1;
+ }
+
+ if (i < 0 && errno == EINTR)
+ continue;
+ break;
+ }
+ #endif
+
+ return i;
+ }
+
+ /*
+ * DisconnectDatabase
+ * disconnect all the connections.
+ */
+ void
+ DisconnectDatabase(ParallelSlot *slot)
+ {
+ PGcancel *cancel;
+ char errbuf[1];
+
+ if (!slot->connection)
+ return;
+
+ if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE)
+ {
+ if ((cancel = PQgetCancel(slot->connection)))
+ {
+ PQcancel(cancel, errbuf, sizeof(errbuf));
+ PQfreeCancel(cancel);
+ }
+ }
+
+ PQfinish(slot->connection);
+ slot->connection= NULL;
+ }
+
+
+
+ void prepare_command(PGconn *conn, bool full, bool verbose, bool and_analyze,
+ bool analyze_only, bool freeze, PQExpBuffer sql)
+ {
+ if (analyze_only)
+ {
+ appendPQExpBufferStr(sql, "ANALYZE");
+ if (verbose)
+ appendPQExpBufferStr(sql, " VERBOSE");
+ }
+ else
+ {
+ appendPQExpBufferStr(sql, "VACUUM");
+ if (PQserverVersion(conn) >= 90000)
+ {
+ const char *paren = " (";
+ const char *comma = ", ";
+ const char *sep = paren;
+
+ if (full)
+ {
+ appendPQExpBuffer(sql, "%sFULL", sep);
+ sep = comma;
+ }
+ if (freeze)
+ {
+ appendPQExpBuffer(sql, "%sFREEZE", sep);
+ sep = comma;
+ }
+ if (verbose)
+ {
+ appendPQExpBuffer(sql, "%sVERBOSE", sep);
+ sep = comma;
+ }
+ if (and_analyze)
+ {
+ appendPQExpBuffer(sql, "%sANALYZE", sep);
+ sep = comma;
+ }
+ if (sep != paren)
+ appendPQExpBufferStr(sql, ")");
+ }
+ else
+ {
+ if (full)
+ appendPQExpBufferStr(sql, " FULL");
+ if (freeze)
+ appendPQExpBufferStr(sql, " FREEZE");
+ if (verbose)
+ appendPQExpBufferStr(sql, " VERBOSE");
+ if (and_analyze)
+ appendPQExpBufferStr(sql, " ANALYZE");
+ }
+ }
+ }
+
static void
help(const char *progname)
***************
*** 436,441 **** help(const char *progname)
--- 972,978 ----
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -z, --analyze update optimizer statistics\n"));
printf(_(" -Z, --analyze-only only update optimizer statistics\n"));
+ printf(_(" -j, --jobs=NUM use this many concurrent connections to vacuum\n"));
printf(_(" --analyze-in-stages only update optimizer statistics, in multiple\n"
" stages for faster results\n"));
printf(_(" -?, --help show this help, then exit\n"));