*** a/doc/src/sgml/ref/vacuumdb.sgml
--- b/doc/src/sgml/ref/vacuumdb.sgml
***************
*** 224,229 **** PostgreSQL documentation
--- 224,239 ----
+
+
+
+
+ Number of parallel processes to perform the operation.
+
+
+
+
+
*** a/src/bin/scripts/Makefile
--- b/src/bin/scripts/Makefile
***************
*** 32,38 **** dropdb: dropdb.o common.o dumputils.o kwlookup.o keywords.o | submake-libpq subm
droplang: droplang.o common.o print.o mbprint.o | submake-libpq submake-libpgport
dropuser: dropuser.o common.o dumputils.o kwlookup.o keywords.o | submake-libpq submake-libpgport
clusterdb: clusterdb.o common.o dumputils.o kwlookup.o keywords.o | submake-libpq submake-libpgport
! vacuumdb: vacuumdb.o common.o dumputils.o kwlookup.o keywords.o | submake-libpq submake-libpgport
reindexdb: reindexdb.o common.o dumputils.o kwlookup.o keywords.o | submake-libpq submake-libpgport
pg_isready: pg_isready.o common.o | submake-libpq submake-libpgport
--- 32,38 ----
droplang: droplang.o common.o print.o mbprint.o | submake-libpq submake-libpgport
dropuser: dropuser.o common.o dumputils.o kwlookup.o keywords.o | submake-libpq submake-libpgport
clusterdb: clusterdb.o common.o dumputils.o kwlookup.o keywords.o | submake-libpq submake-libpgport
! vacuumdb: vacuumdb.o vac_parallel.o common.o dumputils.o kwlookup.o keywords.o | submake-libpq submake-libpgport
reindexdb: reindexdb.o common.o dumputils.o kwlookup.o keywords.o | submake-libpq submake-libpgport
pg_isready: pg_isready.o common.o | submake-libpq submake-libpgport
***************
*** 65,71 **** uninstall:
clean distclean maintainer-clean:
rm -f $(addsuffix $(X), $(PROGRAMS)) $(addsuffix .o, $(PROGRAMS))
! rm -f common.o dumputils.o kwlookup.o keywords.o print.o mbprint.o $(WIN32RES)
rm -f dumputils.c print.c mbprint.c kwlookup.c keywords.c
rm -rf tmp_check
--- 65,71 ----
clean distclean maintainer-clean:
rm -f $(addsuffix $(X), $(PROGRAMS)) $(addsuffix .o, $(PROGRAMS))
! rm -f common.o dumputils.o kwlookup.o keywords.o print.o mbprint.o vac_parallel.o $(WIN32RES)
rm -f dumputils.c print.c mbprint.c kwlookup.c keywords.c
rm -rf tmp_check
*** /dev/null
--- b/src/bin/scripts/vac_parallel.c
***************
*** 0 ****
--- 1,1144 ----
+ /*-------------------------------------------------------------------------
+ *
+ * vac_parallel.c
+ *
+ * Parallel support for the vacuumdb
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * The author is not responsible for loss or damages that may
+ * result from its use.
+ *
+ * IDENTIFICATION
+ * src/bin/scripts/vac_parallel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+ #include "vac_parallel.h"
+
+ #ifndef WIN32
+ #include <sys/types.h>
+ #include <sys/wait.h>
+ #include "signal.h"
+ #include <unistd.h>
+ #include <fcntl.h>
+ #endif
+
+ #include "common.h"
+
+
+ #define PIPE_READ 0
+ #define PIPE_WRITE 1
+
+ /* file-scope variables */
+ #ifdef WIN32
+ static unsigned int tMasterThreadId = 0;
+ static HANDLE termEvent = INVALID_HANDLE_VALUE;
+ static int pgpipe(int handles[2]);
+ static int piperead(int s, char *buf, int len);
+ bool parallel_init_done = false;
+ DWORD mainThreadId;
+
+ /*
+ * Structure to hold info passed by _beginthreadex() to the function it calls
+ * via its single allowed argument.
+ */
+ typedef struct
+ {
+ VacOpt *vopt;
+ int worker;
+ int pipeRead;
+ int pipeWrite;
+ } WorkerInfo;
+
+ #define pipewrite(a,b,c) send(a,b,c,0)
+ #else
+ /*
+ * aborting is only ever used in the master, the workers are fine with just
+ * wantAbort.
+ */
+ static bool aborting = false;
+ static volatile sig_atomic_t wantAbort = 0;
+
+ #define pgpipe(a) pipe(a)
+ #define piperead(a,b,c) read(a,b,c)
+ #define pipewrite(a,b,c) write(a,b,c)
+ #endif
+
+ static const char *modulename = gettext_noop("parallel vacuum");
+
+ typedef struct ShutdownInformation
+ {
+ ParallelState *pstate;
+ PGconn *conn;
+ } ShutdownInformation;
+
+ static ShutdownInformation shutdown_info;
+
+ static char *readMessageFromPipe(int fd);
+ static void
+ SetupWorker(PGconn *connection, int pipefd[2], int worker, int vacStage);
+ static void
+ WaitForCommands(PGconn * connection, int pipefd[2]);
+ static char *
+ getMessageFromMaster(int pipefd[2]);
+ #define messageStartsWith(msg, prefix) \
+ (strncmp(msg, prefix, strlen(prefix)) == 0)
+ #define messageEquals(msg, pattern) \
+ (strcmp(msg, pattern) == 0)
+ static char *
+ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker);
+
+ static void
+ sendMessageToMaster(int pipefd[2], const char *str);
+
+ static void
+ parallel_msg_master(ParallelSlot *slot, const char *modulename, const char *fmt,
+ va_list ap)__attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 0)));
+
+
+ #ifndef WIN32
+ static void sigTermHandler(int signum);
+ #endif
+
+ static ParallelSlot *
+ GetMyPSlot(ParallelState *pstate);
+ static int
+ select_loop(int maxFd, fd_set *workerset);
+ void horribly_exit(const char *modulename, const char *fmt,...)
+ __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
+
+ void
+ init_parallel_vacuum_utils(void);
+
+ static void exit_nicely(int code);
+
+ static bool
+ HasEveryWorkerTerminated(ParallelState *pstate);
+
+ static void checkAborting(void);
+
+
+ static ParallelSlot *
+ GetMyPSlot(ParallelState *pstate)
+ {
+ int i;
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ #ifdef WIN32
+ if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
+ #else
+ if (pstate->parallelSlot[i].pid == getpid())
+ #endif
+ return &(pstate->parallelSlot[i]);
+
+ return NULL;
+ }
+
+ /* Sends the error message from the worker to the master process */
+ static void
+ parallel_msg_master(ParallelSlot *slot, const char *modulename,
+ const char *fmt, va_list ap)
+ {
+ char buf[512];
+ int pipefd[2];
+
+ pipefd[PIPE_READ] = slot->pipeRevRead;
+ pipefd[PIPE_WRITE] = slot->pipeRevWrite;
+
+ strcpy(buf, "ERROR ");
+ vsnprintf(buf + strlen("ERROR "),
+ sizeof(buf) - strlen("ERROR "), fmt, ap);
+
+ sendMessageToMaster(pipefd, buf);
+ }
+
+ /*
+ * Wait for the termination of the processes using the OS-specific method.
+ */
+ static void
+ WaitForTerminatingWorkers(ParallelState *pstate)
+ {
+ while (!HasEveryWorkerTerminated(pstate))
+ {
+ ParallelSlot *slot = NULL;
+ int j;
+
+ #ifndef WIN32
+ int status;
+ pid_t pid = wait(&status);
+
+ for (j = 0; j < pstate->numWorkers; j++)
+ if (pstate->parallelSlot[j].pid == pid)
+ slot = &(pstate->parallelSlot[j]);
+ #else
+ uintptr_t hThread;
+ DWORD ret;
+ uintptr_t *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
+ int nrun = 0;
+
+ for (j = 0; j < pstate->numWorkers; j++)
+ if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
+ {
+ lpHandles[nrun] = pstate->parallelSlot[j].hThread;
+ nrun++;
+ }
+ ret = WaitForMultipleObjects(nrun, (HANDLE *) lpHandles, false, INFINITE);
+ Assert(ret != WAIT_FAILED);
+ hThread = lpHandles[ret - WAIT_OBJECT_0];
+
+ for (j = 0; j < pstate->numWorkers; j++)
+ if (pstate->parallelSlot[j].hThread == hThread)
+ slot = &(pstate->parallelSlot[j]);
+
+ free(lpHandles);
+ #endif
+ Assert(slot);
+
+ slot->workerStatus = WRKR_TERMINATED;
+ }
+ Assert(HasEveryWorkerTerminated(pstate));
+ }
+
+
+ #ifdef WIN32
+ static unsigned __stdcall
+ init_spawned_worker_win32(WorkerInfo *wi)
+ {
+ PGconn *conn;
+ int pipefd[2] = {wi->pipeRead, wi->pipeWrite};
+ int worker = wi->worker;
+ VacOpt *vopt = wi->vopt;
+ ParallelSlot *mySlot = &shutdown_info.pstate->parallelSlot[worker];
+
+ conn = connectDatabase(vopt->dbname, vopt->pghost, vopt->pgport,
+ vopt->username, vopt->promptPassword, vopt->progname,
+ false);
+
+ mySlot->args->connection = conn;
+
+ free(wi);
+ SetupWorker(conn, pipefd, worker, vopt->analyze_stage);
+ _endthreadex(0);
+ return 0;
+ }
+ #endif
+
+
+
+ ParallelState * ParallelVacuumStart(VacOpt *vopt, int numWorkers)
+ {
+ ParallelState *pstate;
+ int i;
+ const size_t slotSize = numWorkers * sizeof(ParallelSlot);
+
+ Assert(numWorkers > 0);
+
+ /* Ensure stdio state is quiesced before forking */
+ fflush(NULL);
+
+ pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
+
+ pstate->numWorkers = numWorkers;
+ pstate->parallelSlot = NULL;
+
+ if (numWorkers == 1)
+ return pstate;
+
+ pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
+ memset((void *) pstate->parallelSlot, 0, slotSize);
+
+ shutdown_info.pstate = pstate;
+
+ #ifdef WIN32
+ tMasterThreadId = GetCurrentThreadId();
+ termEvent = CreateEvent(NULL, true, false, "Terminate");
+ #else
+ signal(SIGTERM, sigTermHandler);
+ signal(SIGINT, sigTermHandler);
+ signal(SIGQUIT, sigTermHandler);
+ #endif
+
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ #ifdef WIN32
+ WorkerInfo *wi;
+ uintptr_t handle;
+ #else
+ pid_t pid;
+ #endif
+ int pipeMW[2],
+ pipeWM[2];
+
+ if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
+ horribly_exit(modulename,
+ "could not create communication channels: %s\n",
+ strerror(errno));
+
+ pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
+ pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
+ pstate->parallelSlot[i].args->vopt = vopt;
+ #ifdef WIN32
+ /* Allocate a new structure for every worker */
+ wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
+ wi->worker = i;
+ wi->pipeRead = pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
+ wi->pipeWrite = pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
+ wi->vopt = vopt;
+ handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
+ wi, 0, &(pstate->parallelSlot[i].threadId));
+ pstate->parallelSlot[i].hThread = handle;
+ #else
+ pid = fork();
+ if (pid == 0)
+ {
+ /* we are the worker */
+ int j;
+ int pipefd[2] = {pipeMW[PIPE_READ], pipeWM[PIPE_WRITE]};
+
+ /*
+ * Store the fds for the reverse communication in pstate. Actually
+ * we only use this in case of an error and don't use pstate
+ * otherwise in the worker process. On Windows we write to the
+ * global pstate, in Unix we write to our process-local copy but
+ * that's also where we'd retrieve this information back from.
+ */
+ pstate->parallelSlot[i].pipeRevRead = pipefd[PIPE_READ];
+ pstate->parallelSlot[i].pipeRevWrite = pipefd[PIPE_WRITE];
+ pstate->parallelSlot[i].pid = getpid();
+
+ pstate->parallelSlot[i].args->connection
+ = connectDatabase(vopt->dbname, vopt->pghost, vopt->pgport,
+ vopt->username, vopt->promptPassword,
+ vopt->progname, false);
+
+ /* close read end of Worker -> Master */
+ closesocket(pipeWM[PIPE_READ]);
+
+ /* close write end of Master -> Worker */
+ closesocket(pipeMW[PIPE_WRITE]);
+
+ /*
+ * Close all inherited fds for communication of the master with
+ * the other workers.
+ */
+ for (j = 0; j < i; j++)
+ {
+ closesocket(pstate->parallelSlot[j].pipeRead);
+ closesocket(pstate->parallelSlot[j].pipeWrite);
+ }
+
+ SetupWorker(pstate->parallelSlot[i].args->connection,
+ pipefd, i, vopt->analyze_stage);
+ exit(0);
+ }
+ else if (pid < 0)
+ /* fork failed */
+ horribly_exit(modulename,
+ "could not create worker process: %s\n",
+ strerror(errno));
+
+ /* we are the Master, pid > 0 here */
+ Assert(pid > 0);
+
+ /* close read end of Master -> Worker */
+ closesocket(pipeMW[PIPE_READ]);
+
+ /* close write end of Worker -> Master */
+ closesocket(pipeWM[PIPE_WRITE]);
+
+ pstate->parallelSlot[i].pid = pid;
+ #endif
+
+ pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
+ pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
+ }
+
+ return pstate;
+ }
+
+ /*
+ * Tell all of our workers to terminate.
+ *
+ * Pretty straightforward routine, first we tell everyone to terminate, then
+ * we listen to the workers' replies and finally close the sockets that we
+ * have used for communication.
+ */
+ void
+ ParallelVacuumEnd(ParallelState *pstate)
+ {
+ int i;
+
+ Assert(IsEveryWorkerIdle(pstate));
+
+ /* close the sockets so that the workers know they can exit */
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ closesocket(pstate->parallelSlot[i].pipeRead);
+ closesocket(pstate->parallelSlot[i].pipeWrite);
+ }
+
+ WaitForTerminatingWorkers(pstate);
+
+ /*
+ * Remove the pstate again, so the exit handler in the parent will now
+ * again fall back to closing AH->connection (if connected).
+ */
+ shutdown_info.pstate = NULL;
+
+ free(pstate->parallelSlot);
+ free(pstate);
+ }
+
+
+ /*
+ * Find the first free parallel slot (if any).
+ */
+ int
+ GetIdleWorker(ParallelState *pstate)
+ {
+ int i;
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
+ return i;
+ return NO_SLOT;
+ }
+
+ /*
+ * Return true iff every worker process is in the WRKR_TERMINATED state.
+ */
+ static bool
+ HasEveryWorkerTerminated(ParallelState *pstate)
+ {
+ int i;
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED)
+ return false;
+ return true;
+ }
+
+ /*
+ * If we have one worker that terminates for some reason, we'd like the other
+ * threads to terminate as well (and not finish with their 70 GB table dump
+ * first...). Now in UNIX we can just kill these processes, and let the signal
+ * handler set wantAbort to 1. In Windows we set a termEvent and this serves
+ * as the signal for everyone to terminate.
+ */
+ static void
+ checkAborting(void)
+ {
+ #ifdef WIN32
+ if (WaitForSingleObject(termEvent, 0) == WAIT_OBJECT_0)
+ #else
+ if (wantAbort)
+ #endif
+ horribly_exit(modulename, "worker is terminating\n");
+ }
+
+ /*
+ * Shut down any remaining workers, this has an implicit do_wait == true.
+ *
+ * The fastest way we can make the workers terminate gracefully is when
+ * they are listening for new commands and we just tell them to terminate.
+ */
+ static void
+ ShutdownWorkersHard(ParallelState *pstate)
+ {
+ #ifndef WIN32
+ int i;
+
+ signal(SIGPIPE, SIG_IGN);
+
+ /*
+ * Close our write end of the sockets so that the workers know they can
+ * exit.
+ */
+ for (i = 0; i < pstate->numWorkers; i++)
+ closesocket(pstate->parallelSlot[i].pipeWrite);
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ kill(pstate->parallelSlot[i].pid, SIGTERM);
+ #else
+ /* The workers monitor this event via checkAborting(). */
+ SetEvent(termEvent);
+ #endif
+
+ WaitForTerminatingWorkers(pstate);
+ }
+
+ #ifndef WIN32
+ /* Signal handling (UNIX only) */
+ static void
+ sigTermHandler(int signum)
+ {
+ wantAbort = 1;
+ }
+ #endif
+
+
+
+ /*
+ * This function is called by both UNIX and Windows variants to set up a
+ * worker process.
+ */
+ static void
+ SetupWorker(PGconn *connection, int pipefd[2], int worker, int vacStage)
+ {
+
+ const char *stage_commands[] = {
+ "SET default_statistics_target=1; SET vacuum_cost_delay=0;",
+ "SET default_statistics_target=10; RESET vacuum_cost_delay;",
+ "RESET default_statistics_target;"};
+
+ if (vacStage >= 0 && vacStage < 3) executeMaintenanceCommand(connection, stage_commands[vacStage], false);
+
+ /*
+ * Call the setup worker function that's defined in the ArchiveHandle.
+ *
+ * We get the raw connection only for the reason that we can close it
+ * properly when we shut down. This happens only that way when it is
+ * brought down because of an error.
+ */
+ WaitForCommands(connection, pipefd);
+ closesocket(pipefd[PIPE_READ]);
+ closesocket(pipefd[PIPE_WRITE]);
+ }
+
+
+
+ /*
+ * That's the main routine for the worker.
+ * When it starts up it enters this routine and waits for commands from the
+ * master process. After having processed a command it comes back to here to
+ * wait for the next command. Finally it will receive a TERMINATE command and
+ * exit.
+ */
+ static void
+ WaitForCommands(PGconn * connection, int pipefd[2])
+ {
+ char *command;
+ PQExpBufferData sql;
+
+ for (;;)
+ {
+ if (!(command = getMessageFromMaster(pipefd)))
+ {
+ PQfinish(connection);
+ connection = NULL;
+ return;
+ }
+
+
+ /* check if master has set the terminate event*/
+ checkAborting();
+
+ if (executeMaintenanceCommand(connection, command, false))
+ sendMessageToMaster(pipefd, "OK");
+ else
+ {
+ initPQExpBuffer(&sql);
+ appendPQExpBuffer(&sql, "ERROR : %s",
+ PQerrorMessage(connection));
+ sendMessageToMaster(pipefd, sql.data);
+ termPQExpBuffer(&sql);
+ }
+
+ /* command was pg_malloc'd and we are responsible for free()ing it. */
+ free(command);
+ }
+ }
+
+ /*
+ * ---------------------------------------------------------------------
+ * Note the status change:
+ *
+ * DispatchJobForTocEntry WRKR_IDLE -> WRKR_WORKING
+ * ListenToWorkers WRKR_WORKING -> WRKR_FINISHED / WRKR_TERMINATED
+ * ReapWorkerStatus WRKR_FINISHED -> WRKR_IDLE
+ * ---------------------------------------------------------------------
+ *
+ * Just calling ReapWorkerStatus() when all workers are working might or might
+ * not give you an idle worker because you need to call ListenToWorkers() in
+ * between and only thereafter ReapWorkerStatus(). This is necessary in order
+ * to get and deal with the status (=result) of the worker's execution.
+ */
+ void
+ ListenToWorkers(ParallelState *pstate, bool do_wait)
+ {
+ int worker;
+ char *msg;
+
+ msg = getMessageFromWorker(pstate, do_wait, &worker);
+
+ if (!msg)
+ {
+ if (do_wait)
+ horribly_exit(modulename, "a worker process died unexpectedly\n");
+ return;
+ }
+
+ if (messageStartsWith(msg, "OK"))
+ {
+ pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
+
+ }
+ else if (messageStartsWith(msg, "ERROR "))
+ {
+
+ ParallelSlot *mySlot = &pstate->parallelSlot[worker];
+
+ mySlot->workerStatus = WRKR_TERMINATED;
+
+ horribly_exit(modulename,
+ "%s: vacuuming of database \"%s\" failed %s",
+ mySlot->args->vopt->progname,
+ mySlot->args->vopt->dbname, msg + strlen("ERROR "));
+ }
+ else
+ horribly_exit(modulename, "invalid message received from worker: %s\n", msg);
+
+ /* both Unix and Win32 return pg_malloc()ed space, so we free it */
+ free(msg);
+ }
+
+ /*
+ * This function is executed in the master process.
+ *
+ * This function is used to get the return value of a terminated worker
+ * process. If a process has terminated, its status is stored in *status and
+ * the id of the worker is returned.
+ */
+ int
+ ReapWorkerStatus(ParallelState *pstate, int *status)
+ {
+ int i;
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ if (pstate->parallelSlot[i].workerStatus == WRKR_FINISHED)
+ {
+ *status = pstate->parallelSlot[i].status;
+ pstate->parallelSlot[i].status = 0;
+ pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
+ return i;
+ }
+ }
+
+ return NO_SLOT;
+ }
+
+ /*
+ * This function is executed in the master process.
+ *
+ * It looks for an idle worker process and only returns if there is one.
+ */
+ void
+ EnsureIdleWorker(ParallelState *pstate)
+ {
+ int ret_worker;
+ int work_status;
+
+ for (;;)
+ {
+ int nTerm = 0;
+
+ while ((ret_worker = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
+ {
+ if (work_status != 0)
+ horribly_exit(modulename, "error processing a parallel work item\n");
+
+ nTerm++;
+ }
+
+ /*
+ * We need to make sure that we have an idle worker before dispatching
+ * the next item. If nTerm > 0 we already have that (quick check).
+ */
+ if (nTerm > 0)
+ return;
+
+ /* explicit check for an idle worker */
+ if (GetIdleWorker(pstate) != NO_SLOT)
+ return;
+
+ /*
+ * If we have no idle worker, read the result of one or more workers
+ * and loop the loop to call ReapWorkerStatus() on them
+ */
+ ListenToWorkers(pstate, true);
+ }
+ }
+
+
+ /*
+ * Return true iff every worker is in the WRKR_IDLE state.
+ */
+ bool
+ IsEveryWorkerIdle(ParallelState *pstate)
+ {
+ int i;
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
+ return false;
+ return true;
+ }
+
+
+ /*
+ * This function is executed in the master process.
+ *
+ * It waits for all workers to terminate.
+ */
+ void
+ EnsureWorkersFinished(ParallelState *pstate)
+ {
+ int work_status;
+
+ if (!pstate || pstate->numWorkers == 1)
+ return;
+
+ /* Waiting for the remaining worker processes to finish */
+ while (!IsEveryWorkerIdle(pstate))
+ {
+ if (ReapWorkerStatus(pstate, &work_status) == NO_SLOT)
+ ListenToWorkers(pstate, true);
+ else if (work_status != 0)
+ horribly_exit(modulename,
+ "error processing a parallel work item\n");
+ }
+ }
+
+ /*
+ * This function is executed in the worker process.
+ *
+ * It returns the next message on the communication channel, blocking until it
+ * becomes available.
+ */
+ static char *
+ getMessageFromMaster(int pipefd[2])
+ {
+ return readMessageFromPipe(pipefd[PIPE_READ]);
+ }
+
+ /*
+ * This function is executed in the worker process.
+ *
+ * It sends a message to the master on the communication channel.
+ */
+ static void
+ sendMessageToMaster(int pipefd[2], const char *str)
+ {
+ int len = strlen(str) + 1;
+
+ if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
+ horribly_exit(modulename,
+ "could not write to the communication channel: %s\n",
+ strerror(errno));
+ }
+
+ /*
+ * This function is executed in the master process.
+ *
+ * It returns the next message from the worker on the communication channel,
+ * optionally blocking (do_wait) until it becomes available.
+ *
+ * The id of the worker is returned in *worker.
+ */
+ static char *
+ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
+ {
+ int i;
+ fd_set workerset;
+ int maxFd = -1;
+ struct timeval nowait = {0, 0};
+
+ FD_ZERO(&workerset);
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
+ continue;
+ FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
+ /* actually WIN32 ignores the first parameter to select()... */
+ if (pstate->parallelSlot[i].pipeRead > maxFd)
+ maxFd = pstate->parallelSlot[i].pipeRead;
+ }
+
+ if (do_wait)
+ {
+ i = select_loop(maxFd, &workerset);
+ Assert(i != 0);
+ }
+ else
+ {
+ if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
+ return NULL;
+ }
+
+ if (i < 0)
+ horribly_exit(modulename, "error in ListenToWorkers(): %s\n", strerror(errno));
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ char *msg;
+
+ if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
+ continue;
+
+ msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
+ *worker = i;
+ return msg;
+ }
+
+ Assert(false);
+ return NULL;
+ }
+
+ /*
+ * This function is executed in the master process.
+ *
+ * It sends a message to a certain worker on the communication channel.
+ */
+ static void
+ sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
+ {
+ int len = strlen(str) + 1;
+
+ if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
+ {
+ /*
+ * If we're already aborting anyway, don't care if we succeed or not.
+ * The child might have gone already.
+ */
+ #ifndef WIN32
+ if (!aborting)
+ #endif
+ horribly_exit(modulename,
+ "could not write to the communication channel: %s\n",
+ strerror(errno));
+ }
+ }
+
+ /*
+ * The underlying function to read a message from the communication channel
+ * (fd) with optional blocking (do_wait).
+ */
+ static char *
+ readMessageFromPipe(int fd)
+ {
+ char *msg;
+ int msgsize,
+ bufsize;
+ int ret;
+
+ /*
+ * The problem here is that we need to deal with several possibilites: we
+ * could receive only a partial message or several messages at once. The
+ * caller expects us to return exactly one message however.
+ *
+ * We could either read in as much as we can and keep track of what we
+ * delivered back to the caller or we just read byte by byte. Once we see
+ * (char) 0, we know that it's the message's end. This would be quite
+ * inefficient for more data but since we are reading only on the command
+ * channel, the performance loss does not seem worth the trouble of
+ * keeping internal states for different file descriptors.
+ */
+ bufsize = 64; /* could be any number */
+ msg = (char *) pg_malloc(bufsize);
+
+ msgsize = 0;
+ for (;;)
+ {
+ Assert(msgsize <= bufsize);
+ ret = piperead(fd, msg + msgsize, 1);
+
+ /* worker has closed the connection or another error happened */
+ if (ret <= 0)
+ { free(msg); return NULL; }
+
+ Assert(ret == 1);
+
+ if (msg[msgsize] == '\0')
+ return msg;
+
+ msgsize++;
+ if (msgsize == bufsize)
+ {
+ /* could be any number */
+ bufsize += 16;
+ msg = (char *) pg_realloc(msg, bufsize);
+ }
+ }
+ }
+
+ /*
+ * A select loop that repeats calling select until a descriptor in the read
+ * set becomes readable. On Windows we have to check for the termination event
+ * from time to time, on Unix we can just block forever.
+ */
+ static int
+ select_loop(int maxFd, fd_set *workerset)
+ {
+ int i;
+ fd_set saveSet = *workerset;
+
+ #ifdef WIN32
+ /* should always be the master */
+ Assert(tMasterThreadId == GetCurrentThreadId());
+
+ for (;;)
+ {
+ /*
+ * sleep a quarter of a second before checking if we should terminate.
+ */
+ struct timeval tv = {0, 250000};
+
+ *workerset = saveSet;
+ i = select(maxFd + 1, workerset, NULL, NULL, &tv);
+
+ if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
+ continue;
+ if (i)
+ break;
+ }
+ #else /* UNIX */
+
+ for (;;)
+ {
+ *workerset = saveSet;
+ i = select(maxFd + 1, workerset, NULL, NULL, NULL);
+
+ /*
+ * If we Ctrl-C the master process , it's likely that we interrupt
+ * select() here. The signal handler will set wantAbort == true and
+ * the shutdown journey starts from here. Note that we'll come back
+ * here later when we tell all workers to terminate and read their
+ * responses. But then we have aborting set to true.
+ */
+ if (wantAbort && !aborting)
+ horribly_exit(modulename, "terminated by user\n");
+
+ if (i < 0 && errno == EINTR)
+ continue;
+ break;
+ }
+ #endif
+
+ return i;
+ }
+
+ void
+ DispatchJob(ParallelState *pstate, char * command)
+ {
+ int worker;
+
+ /* our caller makes sure that at least one worker is idle */
+ Assert(GetIdleWorker(pstate) != NO_SLOT);
+ worker = GetIdleWorker(pstate);
+ Assert(worker != NO_SLOT);
+
+ sendMessageToWorker(pstate, worker, command);
+ pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
+ }
+
+ #ifdef WIN32
+ /*
+ * This is a replacement version of pipe for Win32 which allows returned
+ * handles to be used in select(). Note that read/write calls must be replaced
+ * with recv/send.
+ */
+ static int
+ pgpipe(int handles[2])
+ {
+ SOCKET s;
+ struct sockaddr_in serv_addr;
+ int len = sizeof(serv_addr);
+
+ handles[0] = handles[1] = INVALID_SOCKET;
+
+ if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
+ {
+ return -1;
+ }
+
+ memset((void *) &serv_addr, 0, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_port = htons(0);
+ serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
+ {
+ closesocket(s);
+ return -1;
+ }
+ if (listen(s, 1) == SOCKET_ERROR)
+ {
+ closesocket(s);
+ return -1;
+ }
+ if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
+ {
+ closesocket(s);
+ return -1;
+ }
+ if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
+ {
+ closesocket(s);
+ return -1;
+ }
+
+ if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
+ {
+ closesocket(s);
+ return -1;
+ }
+ if ((handles[0] = accept(s, (SOCKADDR *) &serv_addr, &len)) == INVALID_SOCKET)
+ {
+ closesocket(handles[1]);
+ handles[1] = INVALID_SOCKET;
+ closesocket(s);
+ return -1;
+ }
+ closesocket(s);
+ return 0;
+ }
+
+ static int
+ piperead(int s, char *buf, int len)
+ {
+ int ret = recv(s, buf, len, 0);
+
+ if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
+ /* EOF on the pipe! (win32 socket based implementation) */
+ ret = 0;
+ return ret;
+ }
+
+ #endif
+
+ static void
+ shutdown_parallel_vacuum_utils()
+ {
+ #ifdef WIN32
+ /* Call the cleanup function only from the main thread */
+ if (mainThreadId == GetCurrentThreadId())
+ WSACleanup();
+ #endif
+ }
+
+ void
+ init_parallel_vacuum_utils(void)
+ {
+ #ifdef WIN32
+ if (!parallel_init_done)
+ {
+ WSADATA wsaData;
+ int err;
+
+ mainThreadId = GetCurrentThreadId();
+ err = WSAStartup(MAKEWORD(2, 2), &wsaData);
+ if (err != 0)
+ {
+ exit_nicely(1);
+ }
+
+ parallel_init_done = true;
+ }
+ #endif
+ }
+
+ void
+ on_exit_close_connection(PGconn *conn)
+ {
+ shutdown_info.conn = conn;
+ }
+
+ /*
+ * Fail and die, with a message to stderr. Parameters as for write_msg.
+ *
+ * This is defined in parallel.c, because in parallel mode, things are more
+ * complicated. If the worker process does horribly_exit(), we forward its
+ * last words to the master process. The master process then does
+ * horribly_exit() with this error message itself and prints it normally.
+ * After printing the message, horribly_exit() on the master will shut down
+ * the remaining worker processes.
+ */
+ void
+ horribly_exit(const char *modulename, const char *fmt,...)
+ {
+ va_list ap;
+ ParallelState *pstate = shutdown_info.pstate;
+ ParallelSlot *slot;
+ va_start(ap, fmt);
+
+ if (pstate == NULL)
+ {
+ /* We're the parent, just write the message out */
+ vfprintf(stderr, _(fmt), ap);
+ }
+ else
+ {
+ slot = GetMyPSlot(pstate);
+ if (!slot)
+ {
+ /* We're the parent, just write the message out */
+ vfprintf(stderr, _(fmt), ap);
+
+ }
+ else
+ {
+ /* If we're a worker process, send the msg to the master process */
+ parallel_msg_master(slot, modulename, fmt, ap);
+ }
+ }
+
+ va_end(ap);
+
+ exit_nicely(1);
+ }
+
+ static void
+ exit_nicely(int code)
+ {
+ if (shutdown_info.pstate)
+ {
+ ParallelSlot *slot = GetMyPSlot(shutdown_info.pstate);
+
+ if (!slot)
+ {
+ /*
+ * We're the master: We have already printed out the message
+ * passed to horribly_exit() either from the master itself or from
+ * a worker process. Now we need to close our own database
+ * connection (only open during parallel dump but not restore) and
+ * shut down the remaining workers.
+ */
+ PQfinish(shutdown_info.conn);
+ #ifndef WIN32
+
+ /*
+ * Setting aborting to true switches to best-effort-mode
+ * (send/receive but ignore errors) in communicating with our
+ * workers.
+ */
+ aborting = true;
+ #endif
+ ShutdownWorkersHard(shutdown_info.pstate);
+ }
+ else if (slot->args->connection)
+ PQfinish(slot->args->connection);
+
+ }
+ else if (shutdown_info.conn)
+ PQfinish(shutdown_info.conn);
+
+ shutdown_parallel_vacuum_utils();
+ exit(code);
+ }
+
+
*** /dev/null
--- b/src/bin/scripts/vac_parallel.h
***************
*** 0 ****
--- 1,105 ----
+ /*-------------------------------------------------------------------------
+ *
+ * vac_parallel.h
+ *
+ * Parallel support header file for the vacuumdb
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * The author is not responsible for loss or damages that may
+ * result from its use.
+ *
+ * IDENTIFICATION
+ * src/bin/scripts/vac_parallel.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+ #ifndef VAC_PARALLEL_H
+ #define VAC_PARALLEL_H
+
+ #include "postgres_fe.h"
+ #include <sys/types.h>
+ #include "libpq-fe.h"
+ #include "common.h"
+
+ typedef enum
+ {
+ WRKR_TERMINATED = 0,
+ WRKR_IDLE,
+ WRKR_WORKING,
+ WRKR_FINISHED
+ } T_WorkerStatus;
+
+
+ typedef struct VacOpt
+ {
+ char *dbname;
+ char *pgport;
+ char *pghost;
+ char *username;
+ char *progname;
+ enum trivalue promptPassword;
+ int analyze_stage;
+ }VacOpt;
+
+ /* Arguments needed for a worker process */
+ typedef struct ParallelArgs
+ {
+ PGconn *connection;
+ VacOpt *vopt;
+ } ParallelArgs;
+
+ /* State for each parallel activity slot */
+ typedef struct ParallelSlot
+ {
+ ParallelArgs *args; //can pass connection handle here
+ T_WorkerStatus workerStatus;
+ int status;
+ int pipeRead;
+ int pipeWrite;
+ int pipeRevRead;
+ int pipeRevWrite;
+ #ifdef WIN32
+ uintptr_t hThread;
+ unsigned int threadId;
+ #else
+ pid_t pid;
+ #endif
+ } ParallelSlot;
+
+ #define NO_SLOT (-1)
+
+ typedef struct ParallelState
+ {
+ int numWorkers;
+ ParallelSlot *parallelSlot;
+ } ParallelState;
+
+
+
+ #ifdef WIN32
+ extern bool parallel_init_done;
+ extern DWORD mainThreadId;
+ #endif
+
+ extern ParallelState * ParallelVacuumStart(VacOpt *vopt, int numWorkers);
+ extern int GetIdleWorker(ParallelState *pstate);
+ extern bool IsEveryWorkerIdle(ParallelState *pstate);
+ extern void ListenToWorkers(ParallelState *pstate, bool do_wait);
+ extern int ReapWorkerStatus(ParallelState *pstate, int *status);
+ extern void EnsureIdleWorker(ParallelState *pstate);
+ extern void EnsureWorkersFinished(ParallelState *pstate);
+
+ extern void DispatchJob(ParallelState *pstate, char * command);
+ extern void
+ horribly_exit(const char *modulename, const char *fmt,...)
+ __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3), noreturn));
+
+ extern void init_parallel_vacuum_utils(void);
+ extern void on_exit_close_connection(PGconn *conn);
+ extern void ParallelVacuumEnd(ParallelState *pstate);
+
+
+ #endif /* VAC_PARALLEL_H */
*** a/src/bin/scripts/vacuumdb.c
--- b/src/bin/scripts/vacuumdb.c
***************
*** 11,34 ****
*/
#include "postgres_fe.h"
- #include "common.h"
#include "dumputils.h"
static void vacuum_one_database(const char *dbname, bool full, bool verbose,
! bool and_analyze, bool analyze_only, bool analyze_in_stages, bool freeze,
! const char *table, const char *host, const char *port,
! const char *username, enum trivalue prompt_password,
const char *progname, bool echo);
static void vacuum_all_databases(bool full, bool verbose, bool and_analyze,
bool analyze_only, bool analyze_in_stages, bool freeze,
const char *maintenance_db,
const char *host, const char *port,
const char *username, enum trivalue prompt_password,
! const char *progname, bool echo, bool quiet);
static void help(const char *progname);
int
main(int argc, char *argv[])
--- 11,54 ----
*/
#include "postgres_fe.h"
#include "dumputils.h"
+ #include "vac_parallel.h"
static void vacuum_one_database(const char *dbname, bool full, bool verbose,
! bool and_analyze, bool analyze_only, bool analyze_in_stages,
! bool freeze, const char *table, const char *host,
! const char *port, const char *username,
! enum trivalue prompt_password,
const char *progname, bool echo);
static void vacuum_all_databases(bool full, bool verbose, bool and_analyze,
bool analyze_only, bool analyze_in_stages, bool freeze,
const char *maintenance_db,
const char *host, const char *port,
const char *username, enum trivalue prompt_password,
! const char *progname, bool echo, bool quiet, int parallel);
static void help(const char *progname);
+ void vacuum_parallel(const char *dbname, bool full, bool verbose,
+ bool and_analyze, bool analyze_only, bool analyze_in_stages,
+ bool freeze, const char *host,
+ const char *port, const char *username,
+ enum trivalue prompt_password, const char *progname,
+ bool echo, int parallel, SimpleStringList *tables);
+
+ void run_command(ParallelState *pstate, char *command);
+
+ void prepare_command(PGconn *conn, bool full, bool verbose, bool and_analyze,
+ bool analyze_only, bool freeze, PQExpBuffer sql);
+ static void
+ run_parallel_vacuum(PGconn *conn, bool echo,
+ const char *dbname, SimpleStringList *tables,
+ bool full, bool verbose, bool and_analyze,
+ bool analyze_only, bool freeze, int parallel,
+ VacOpt *vopt, const char *progname);
+
+
int
main(int argc, char *argv[])
***************
*** 49,54 **** main(int argc, char *argv[])
--- 69,75 ----
{"table", required_argument, NULL, 't'},
{"full", no_argument, NULL, 'f'},
{"verbose", no_argument, NULL, 'v'},
+ {"jobs", required_argument, NULL, 'j'},
{"maintenance-db", required_argument, NULL, 2},
{"analyze-in-stages", no_argument, NULL, 3},
{NULL, 0, NULL, 0}
***************
*** 74,86 **** main(int argc, char *argv[])
bool full = false;
bool verbose = false;
SimpleStringList tables = {NULL, NULL};
progname = get_progname(argv[0]);
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
handle_help_version_opts(argc, argv, "vacuumdb", help);
! while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fv", long_options, &optindex)) != -1)
{
switch (c)
{
--- 95,108 ----
bool full = false;
bool verbose = false;
SimpleStringList tables = {NULL, NULL};
+ int parallel = 0;
progname = get_progname(argv[0]);
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
handle_help_version_opts(argc, argv, "vacuumdb", help);
! while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fvj:", long_options, &optindex)) != -1)
{
switch (c)
{
***************
*** 129,134 **** main(int argc, char *argv[])
--- 151,166 ----
case 'v':
verbose = true;
break;
+ case 'j':
+ 	parallel = atoi(optarg);
+ 	if (parallel <= 0)
+ 	{
+ 		/* per PG message style: lower case, "must be", no period */
+ 		fprintf(stderr, _("%s: number of parallel jobs must be at least 1\n"),
+ 				progname);
+ 		exit(1);
+ 	}
+ 	break;
case 2:
maintenance_db = pg_strdup(optarg);
break;
***************
*** 141,146 **** main(int argc, char *argv[])
--- 173,179 ----
}
}
/*
* Non-option argument specifies database name as long as it wasn't
***************
*** 196,202 **** main(int argc, char *argv[])
vacuum_all_databases(full, verbose, and_analyze, analyze_only, analyze_in_stages, freeze,
maintenance_db, host, port, username,
! prompt_password, progname, echo, quiet);
}
else
{
--- 229,235 ----
vacuum_all_databases(full, verbose, and_analyze, analyze_only, analyze_in_stages, freeze,
maintenance_db, host, port, username,
! prompt_password, progname, echo, quiet, parallel);
}
else
{
***************
*** 210,234 **** main(int argc, char *argv[])
dbname = get_user_name_or_exit(progname);
}
! if (tables.head != NULL)
{
! SimpleStringListCell *cell;
! for (cell = tables.head; cell; cell = cell->next)
{
! vacuum_one_database(dbname, full, verbose, and_analyze,
! analyze_only, analyze_in_stages,
! freeze, cell->val,
! host, port, username, prompt_password,
! progname, echo);
}
}
- else
- vacuum_one_database(dbname, full, verbose, and_analyze,
- analyze_only, analyze_in_stages,
- freeze, NULL,
- host, port, username, prompt_password,
- progname, echo);
}
exit(0);
--- 243,281 ----
dbname = get_user_name_or_exit(progname);
}
! if (parallel > 1)
{
! vacuum_parallel(dbname, full, verbose, and_analyze,
! analyze_only, analyze_in_stages,
! freeze, host, port, username, prompt_password,
! progname, echo, parallel, &tables);
! }
! else
! {
! if (tables.head != NULL)
{
! SimpleStringListCell *cell;
!
! for (cell = tables.head; cell; cell = cell->next)
! {
! vacuum_one_database(dbname, full, verbose, and_analyze,
! analyze_only, analyze_in_stages,
! freeze, cell->val,
! host, port, username, prompt_password,
! progname, echo);
! }
! }
! else
! {
! vacuum_one_database(dbname, full, verbose, and_analyze,
! analyze_only, analyze_in_stages,
! freeze, NULL,
! host, port, username, prompt_password,
! progname, echo);
!
}
}
}
exit(0);
***************
*** 253,263 **** run_vacuum_command(PGconn *conn, const char *sql, bool echo, const char *dbname,
static void
! vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyze,
! bool analyze_only, bool analyze_in_stages, bool freeze, const char *table,
! const char *host, const char *port,
! const char *username, enum trivalue prompt_password,
! const char *progname, bool echo)
{
PQExpBufferData sql;
--- 300,311 ----
static void
! vacuum_one_database(const char *dbname, bool full, bool verbose,
! bool and_analyze, bool analyze_only, bool analyze_in_stages,
! bool freeze, const char *table, const char *host,
! const char *port, const char *username,
! enum trivalue prompt_password, const char *progname,
! bool echo)
{
PQExpBufferData sql;
***************
*** 352,362 **** vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyz
static void
! vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_only,
! bool analyze_in_stages, bool freeze, const char *maintenance_db,
! const char *host, const char *port,
! const char *username, enum trivalue prompt_password,
! const char *progname, bool echo, bool quiet)
{
PGconn *conn;
PGresult *result;
--- 400,411 ----
static void
! vacuum_all_databases(bool full, bool verbose, bool and_analyze,
! bool analyze_only, bool analyze_in_stages, bool freeze,
! const char *maintenance_db, const char *host,
! const char *port, const char *username,
! enum trivalue prompt_password, const char *progname,
! bool echo, bool quiet, int parallel)
{
PGconn *conn;
PGresult *result;
***************
*** 377,391 **** vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_onl
fflush(stdout);
}
! vacuum_one_database(dbname, full, verbose, and_analyze, analyze_only,
! analyze_in_stages,
! freeze, NULL, host, port, username, prompt_password,
progname, echo);
}
PQclear(result);
}
static void
help(const char *progname)
--- 426,673 ----
fflush(stdout);
}
!
! if (parallel > 1)
! {
! vacuum_parallel(dbname, full, verbose, and_analyze,
! analyze_only, analyze_in_stages,
! freeze, host, port, username, prompt_password,
! progname, echo, parallel, NULL);
!
! }
! else
! {
! vacuum_one_database(dbname, full, verbose, and_analyze,
! analyze_only, analyze_in_stages,
! freeze, NULL, host, port, username, prompt_password,
progname, echo);
+ }
}
PQclear(result);
}
+ /*
+  * run_parallel_vacuum
+  *
+  * Dispatch one VACUUM/ANALYZE command per entry in "tables" to a pool of
+  * "parallel" worker processes and wait for all of them to finish.
+  *
+  * "conn" is used by prepare_command() (server-version check); "echo",
+  * "dbname" and "progname" are currently unused but kept for symmetry
+  * with the serial code path.
+  */
+ static void
+ run_parallel_vacuum(PGconn *conn, bool echo,
+ 		const char *dbname, SimpleStringList *tables,
+ 		bool full, bool verbose, bool and_analyze,
+ 		bool analyze_only, bool freeze, int parallel,
+ 		VacOpt *vopt, const char *progname)
+ {
+ 	ParallelState *pstate;
+ 	PQExpBufferData sql;
+ 	SimpleStringListCell *cell;
+ 
+ 	pstate = ParallelVacuumStart(vopt, parallel);
+ 
+ 	for (cell = tables->head; cell; cell = cell->next)
+ 	{
+ 		/*
+ 		 * prepare_command() (re)initializes the buffer, so no separate
+ 		 * initPQExpBuffer() here -- doing one up front leaked its first
+ 		 * allocation.  Each iteration frees the buffer after dispatch.
+ 		 */
+ 		prepare_command(conn, full, verbose, and_analyze,
+ 						analyze_only, freeze, &sql);
+ 		appendPQExpBuffer(&sql, " %s", cell->val);
+ 		run_command(pstate, sql.data);
+ 		termPQExpBuffer(&sql);
+ 	}
+ 
+ 	EnsureWorkersFinished(pstate);
+ 	ParallelVacuumEnd(pstate);
+ }
+
+
+ /*
+  * vacuum_parallel
+  *
+  * Parallel variant of vacuum_one_database: vacuum/analyze the tables of
+  * one database using "parallel" concurrent worker connections.  If
+  * "tables" is NULL or empty, all regular tables in the database are
+  * processed, biggest (by relpages) first so a large table does not
+  * serialize the tail end of the run.
+  */
+ void
+ vacuum_parallel(const char *dbname, bool full, bool verbose,
+ 		bool and_analyze, bool analyze_only, bool analyze_in_stages,
+ 		bool freeze, const char *host, const char *port,
+ 		const char *username, enum trivalue prompt_password,
+ 		const char *progname, bool echo, int parallel,
+ 		SimpleStringList *tables)
+ {
+ 	PGconn *conn;
+ 	VacOpt vopt = {0};
+ 
+ 	/*
+ 	 * Function scope, not block scope: "tables" may be pointed at this
+ 	 * list below, and it must stay alive until the last
+ 	 * run_parallel_vacuum() call (the old block-local declaration left a
+ 	 * dangling pointer -- undefined behavior).
+ 	 */
+ 	SimpleStringList dbtables = {NULL, NULL};
+ 
+ 	init_parallel_vacuum_utils();
+ 
+ 	conn = connectDatabase(dbname, host, port, username, prompt_password,
+ 						   progname, false);
+ 
+ 	if (dbname)
+ 		vopt.dbname = pg_strdup(dbname);
+ 	if (host)
+ 		vopt.pghost = pg_strdup(host);
+ 	if (port)
+ 		vopt.pgport = pg_strdup(port);
+ 	if (username)
+ 		vopt.username = pg_strdup(username);
+ 	if (progname)
+ 		vopt.progname = pg_strdup(progname);
+ 	vopt.promptPassword = prompt_password;
+ 
+ 	on_exit_close_connection(conn);
+ 
+ 	/*
+ 	 * If no table list was provided we must vacuum the whole database:
+ 	 * fetch the list of all regular tables and build it ourselves.
+ 	 */
+ 	if (!tables || !tables->head)
+ 	{
+ 		PGresult   *res;
+ 		int			ntuple;
+ 		int			i;
+ 		PQExpBufferData sql;
+ 
+ 		initPQExpBuffer(&sql);
+ 
+ 		res = executeQuery(conn,
+ 						   "select relname, nspname from pg_class c, pg_namespace ns"
+ 						   " where relkind= 'r' and c.relnamespace = ns.oid"
+ 						   " order by relpages desc",
+ 						   progname, echo);
+ 
+ 		ntuple = PQntuples(res);
+ 		for (i = 0; i < ntuple; i++)
+ 		{
+ 			/* quote schema as well as relname to survive mixed-case names */
+ 			appendPQExpBuffer(&sql, "\"%s\".\"%s\"",
+ 							  PQgetvalue(res, i, 1),
+ 							  PQgetvalue(res, i, 0));
+ 			simple_string_list_append(&dbtables, sql.data);
+ 			resetPQExpBuffer(&sql);
+ 		}
+ 
+ 		PQclear(res);			/* was leaked before */
+ 		termPQExpBuffer(&sql);
+ 
+ 		tables = &dbtables;
+ 	}
+ 
+ 	if (analyze_in_stages)
+ 	{
+ 		const char *stage_messages[] = {
+ 			gettext_noop("Generating minimal optimizer statistics (1 target)"),
+ 			gettext_noop("Generating medium optimizer statistics (10 targets)"),
+ 			gettext_noop("Generating default (full) optimizer statistics")
+ 		};
+ 		int			i;
+ 
+ 		for (i = 0; i < 3; i++)
+ 		{
+ 			puts(gettext(stage_messages[i]));
+ 			vopt.analyze_stage = i;
+ 			run_parallel_vacuum(conn, echo, dbname, tables, full, verbose,
+ 								and_analyze, analyze_only, freeze, parallel,
+ 								&vopt, progname);
+ 		}
+ 	}
+ 	else
+ 	{
+ 		vopt.analyze_stage = -1;
+ 		run_parallel_vacuum(conn, echo, dbname, tables, full, verbose,
+ 							and_analyze, analyze_only, freeze, parallel,
+ 							&vopt, progname);
+ 	}
+ 
+ 	PQfinish(conn);
+ }
+
+ /*
+  * run_command
+  *
+  * Hand "command" to the worker pool, then do not return until at least
+  * one worker slot is free again, so the caller may dispatch the next
+  * command immediately.
+  *
+  * NOTE(review): ListenToWorkers() is called with do_wait = false, so if
+  * every worker stays busy this loop spins without blocking, presumably
+  * burning CPU until some worker reports in.  Consider waiting
+  * (do_wait = true) when GetIdleWorker() finds no idle slot -- confirm
+  * the blocking semantics in vac_parallel.c before changing.
+  */
+ void run_command(ParallelState *pstate, char *command)
+ {
+ int work_status;
+ int ret_child;
+ 
+ DispatchJob(pstate, command);
+ 
+ /* Poll the workers and drain any finished-job messages. */
+ for (;;)
+ {
+ int nTerm = 0;
+ 
+ ListenToWorkers(pstate, false);
+ while ((ret_child = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
+ {
+ nTerm++;
+ }
+ 
+ /*
+ * We need to make sure that we have an idle worker before
+ * re-running the loop. If nTerm > 0 we already have that (quick
+ * check).
+ */
+ if (nTerm > 0)
+ break;
+ 
+ /* if nobody terminated, explicitly check for an idle worker */
+ if (GetIdleWorker(pstate) != NO_SLOT)
+ break;
+ }
+ }
+
+ /*
+  * prepare_command
+  *
+  * (Re)initialize *sql and fill it with the bare VACUUM or ANALYZE
+  * command text (no table name) reflecting the given option flags.
+  * Servers >= 9.0 get the parenthesized option-list syntax.
+  */
+ void prepare_command(PGconn *conn, bool full, bool verbose, bool and_analyze,
+ 		bool analyze_only, bool freeze, PQExpBuffer sql)
+ {
+ 	initPQExpBuffer(sql);
+ 
+ 	if (analyze_only)
+ 	{
+ 		appendPQExpBuffer(sql, "ANALYZE");
+ 		if (verbose)
+ 			appendPQExpBuffer(sql, " VERBOSE");
+ 		return;
+ 	}
+ 
+ 	appendPQExpBuffer(sql, "VACUUM");
+ 
+ 	if (PQserverVersion(conn) >= 90000)
+ 	{
+ 		/* modern syntax: VACUUM (FULL, FREEZE, VERBOSE, ANALYZE) */
+ 		bool		first = true;
+ 
+ 		if (full)
+ 		{
+ 			appendPQExpBuffer(sql, "%sFULL", first ? " (" : ", ");
+ 			first = false;
+ 		}
+ 		if (freeze)
+ 		{
+ 			appendPQExpBuffer(sql, "%sFREEZE", first ? " (" : ", ");
+ 			first = false;
+ 		}
+ 		if (verbose)
+ 		{
+ 			appendPQExpBuffer(sql, "%sVERBOSE", first ? " (" : ", ");
+ 			first = false;
+ 		}
+ 		if (and_analyze)
+ 		{
+ 			appendPQExpBuffer(sql, "%sANALYZE", first ? " (" : ", ");
+ 			first = false;
+ 		}
+ 		if (!first)
+ 			appendPQExpBuffer(sql, ")");
+ 	}
+ 	else
+ 	{
+ 		/* pre-9.0 syntax: options as bare trailing keywords */
+ 		if (full)
+ 			appendPQExpBuffer(sql, " FULL");
+ 		if (freeze)
+ 			appendPQExpBuffer(sql, " FREEZE");
+ 		if (verbose)
+ 			appendPQExpBuffer(sql, " VERBOSE");
+ 		if (and_analyze)
+ 			appendPQExpBuffer(sql, " ANALYZE");
+ 	}
+ }
+
static void
help(const char *progname)
***************
*** 405,410 **** help(const char *progname)
--- 687,693 ----
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -z, --analyze update optimizer statistics\n"));
printf(_(" -Z, --analyze-only only update optimizer statistics\n"));
+ printf(_(" -j, --jobs=NUM use this many parallel jobs to vacuum\n"));
printf(_(" --analyze-in-stages only update optimizer statistics, in multiple\n"
" stages for faster results\n"));
printf(_(" -?, --help show this help, then exit\n"));