diff --git a/doc/src/sgml/ref/reindexdb.sgml b/doc/src/sgml/ref/reindexdb.sgml
index 25b5a72770..a7031030b9 100644
--- a/doc/src/sgml/ref/reindexdb.sgml
+++ b/doc/src/sgml/ref/reindexdb.sgml
@@ -166,6 +166,29 @@ PostgreSQL documentation
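+    <varlistentry>
+     <term><option>-j <replaceable class="parameter">njobs</replaceable></option></term>
+     <term><option>--jobs=<replaceable class="parameter">njobs</replaceable></option></term>
+     <listitem>
+      <para>
+       Execute the reindex commands in parallel by running
+       <replaceable class="parameter">njobs</replaceable>
+       commands simultaneously.  This option reduces the time of the
+       processing but it also increases the load on the database server.
+      </para>
+      <para>
+       <application>reindexdb</application> will open
+       <replaceable class="parameter">njobs</replaceable> connections to the
+       database, so make sure your <xref linkend="guc-max-connections"/>
+       setting is high enough to accommodate all connections.
+      </para>
+      <para>
+       Note that this mode is not compatible with the <option>--index</option>
+       or the <option>--system</option> options.
+      </para>
+     </listitem>
+    </varlistentry>
+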
diff --git a/src/bin/scripts/Makefile b/src/bin/scripts/Makefile
index 3cd793b134..ede665090f 100644
--- a/src/bin/scripts/Makefile
+++ b/src/bin/scripts/Makefile
@@ -29,7 +29,7 @@ dropdb: dropdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-
dropuser: dropuser.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
clusterdb: clusterdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
vacuumdb: vacuumdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
-reindexdb: reindexdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
+reindexdb: reindexdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
pg_isready: pg_isready.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
install: all installdirs
diff --git a/src/bin/scripts/reindexdb.c b/src/bin/scripts/reindexdb.c
index 219a9a9211..d5ccb54a78 100644
--- a/src/bin/scripts/reindexdb.c
+++ b/src/bin/scripts/reindexdb.c
@@ -10,10 +10,14 @@
*/
#include "postgres_fe.h"
+
+#include "catalog/pg_class_d.h"
#include "common.h"
#include "common/logging.h"
+#include "fe_utils/connect.h"
#include "fe_utils/simple_list.h"
#include "fe_utils/string_utils.h"
+#include "scripts_parallel.h"
typedef enum ReindexType
{
@@ -25,16 +29,23 @@ typedef enum ReindexType
} ReindexType;
-static void reindex_one_database(const char *name, const char *dbname,
- ReindexType type, const char *host,
+static SimpleStringList *get_parallel_object_list(PGconn *conn, bool echo);
+static void reindex_one_database(const char *dbname, ReindexType type,
+ SimpleStringList *user_list, const char *host,
const char *port, const char *username,
enum trivalue prompt_password, const char *progname,
- bool echo, bool verbose, bool concurrently);
+ bool echo, bool verbose, bool concurrently,
+ int concurrentCons);
static void reindex_all_databases(const char *maintenance_db,
const char *host, const char *port,
const char *username, enum trivalue prompt_password,
const char *progname, bool echo,
- bool quiet, bool verbose, bool concurrently);
+ bool quiet, bool verbose, bool concurrently,
+ int concurrentCons);
+static void run_reindex_command(PGconn *conn, ReindexType type,
+ const char *name, bool echo, bool verbose,
+ bool concurrently, bool async);
+
static void help(const char *progname);
int
@@ -54,6 +65,7 @@ main(int argc, char *argv[])
{"system", no_argument, NULL, 's'},
{"table", required_argument, NULL, 't'},
{"index", required_argument, NULL, 'i'},
+ {"jobs", required_argument, NULL, 'j'},
{"verbose", no_argument, NULL, 'v'},
{"concurrently", no_argument, NULL, 1},
{"maintenance-db", required_argument, NULL, 2},
@@ -79,6 +91,9 @@ main(int argc, char *argv[])
SimpleStringList indexes = {NULL, NULL};
SimpleStringList tables = {NULL, NULL};
SimpleStringList schemas = {NULL, NULL};
+ int concurrentCons = 1;
+ int tbl_count = 0,
+ nsp_count = 0;
pg_logging_init(argv[0]);
progname = get_progname(argv[0]);
@@ -87,7 +102,7 @@ main(int argc, char *argv[])
handle_help_version_opts(argc, argv, "reindexdb", help);
/* process command-line options */
- while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:v", long_options, &optindex)) != -1)
+ while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:j:v", long_options, &optindex)) != -1)
{
switch (c)
{
@@ -114,6 +129,7 @@ main(int argc, char *argv[])
break;
case 'S':
simple_string_list_append(&schemas, optarg);
+ nsp_count++;
break;
case 'd':
dbname = pg_strdup(optarg);
@@ -126,10 +142,25 @@ main(int argc, char *argv[])
break;
case 't':
simple_string_list_append(&tables, optarg);
+ tbl_count++;
break;
case 'i':
simple_string_list_append(&indexes, optarg);
break;
+ case 'j':
+ concurrentCons = atoi(optarg);
+ if (concurrentCons <= 0)
+ {
+ pg_log_error("number of parallel jobs must be at least 1");
+ exit(1);
+ }
+ if (concurrentCons > FD_SETSIZE - 1)
+ {
+ pg_log_error("too many parallel jobs requested (maximum: %d)",
+ FD_SETSIZE - 1);
+ exit(1);
+ }
+ break;
case 'v':
verbose = true;
break;
@@ -194,7 +225,8 @@ main(int argc, char *argv[])
}
reindex_all_databases(maintenance_db, host, port, username,
- prompt_password, progname, echo, quiet, verbose, concurrently);
+ prompt_password, progname, echo, quiet, verbose,
+ concurrently, concurrentCons);
}
else if (syscatalog)
{
@@ -214,6 +246,12 @@ main(int argc, char *argv[])
exit(1);
}
+ if (concurrentCons > 1)
+ {
+ pg_log_error("cannot use multiple jobs to reindex system catalogs");
+ exit(1);
+ }
+
if (dbname == NULL)
{
if (getenv("PGDATABASE"))
@@ -224,9 +262,9 @@ main(int argc, char *argv[])
dbname = get_user_name_or_exit(progname);
}
- reindex_one_database(NULL, dbname, REINDEX_SYSTEM, host,
+ reindex_one_database(dbname, REINDEX_SYSTEM, NULL, host,
port, username, prompt_password, progname,
- echo, verbose, concurrently);
+ echo, verbose, concurrently, 1);
}
else
{
@@ -241,61 +279,55 @@ main(int argc, char *argv[])
}
if (schemas.head != NULL)
- {
- SimpleStringListCell *cell;
-
- for (cell = schemas.head; cell; cell = cell->next)
- {
- reindex_one_database(cell->val, dbname, REINDEX_SCHEMA, host,
- port, username, prompt_password, progname,
- echo, verbose, concurrently);
- }
- }
+ reindex_one_database(dbname, REINDEX_SCHEMA, &schemas, host,
+ port, username, prompt_password, progname,
+ echo, verbose, concurrently,
+ Min(concurrentCons, nsp_count));
if (indexes.head != NULL)
- {
- SimpleStringListCell *cell;
- for (cell = indexes.head; cell; cell = cell->next)
- {
- reindex_one_database(cell->val, dbname, REINDEX_INDEX, host,
- port, username, prompt_password, progname,
- echo, verbose, concurrently);
- }
- }
- if (tables.head != NULL)
- {
- SimpleStringListCell *cell;
+			/*
+			 * An index list cannot be processed by multiple connections, as
+			 * that would require distributing the work by owner table.  Any
+			 * requested number of jobs is simply ignored here.
+			 */
+ reindex_one_database(dbname, REINDEX_INDEX, &indexes, host,
+ port, username, prompt_password, progname,
+ echo, verbose, concurrently, 1);
- for (cell = tables.head; cell; cell = cell->next)
- {
- reindex_one_database(cell->val, dbname, REINDEX_TABLE, host,
- port, username, prompt_password, progname,
- echo, verbose, concurrently);
- }
- }
+ if (tables.head != NULL)
+ reindex_one_database(dbname, REINDEX_TABLE, &tables, host,
+ port, username, prompt_password, progname,
+ echo, verbose, concurrently,
+ Min(concurrentCons, tbl_count));
/*
* reindex database only if neither index nor table nor schema is
* specified
*/
if (indexes.head == NULL && tables.head == NULL && schemas.head == NULL)
- reindex_one_database(NULL, dbname, REINDEX_DATABASE, host,
+ reindex_one_database(dbname, REINDEX_DATABASE, NULL, host,
port, username, prompt_password, progname,
- echo, verbose, concurrently);
+ echo, verbose, concurrently, concurrentCons);
}
exit(0);
}
static void
-reindex_one_database(const char *name, const char *dbname, ReindexType type,
- const char *host, const char *port, const char *username,
+reindex_one_database(const char *dbname, ReindexType type,
+ SimpleStringList *user_list, const char *host,
+ const char *port, const char *username,
enum trivalue prompt_password, const char *progname, bool echo,
- bool verbose, bool concurrently)
+ bool verbose, bool concurrently, int concurrentCons)
{
- PQExpBufferData sql;
PGconn *conn;
+ SimpleStringListCell *cell;
+ bool parallel = concurrentCons > 1;
+ SimpleStringList *process_list = user_list;
+ ReindexType process_type = type;
+ ParallelSlot *slots;
+ bool failed = false;
conn = connectDatabase(dbname, host, port, username, prompt_password,
progname, echo, false, false);
@@ -308,6 +340,91 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
exit(1);
}
+ if (!parallel)
+ {
+ if (user_list == NULL)
+ {
+			/*
+			 * Create a dummy list holding one empty string, as the loop
+			 * below requires at least one element.
+			 */
+ process_list = pg_malloc0(sizeof(SimpleStringList));
+ simple_string_list_append(process_list, "");
+ }
+ }
+ else
+ {
+ /*
+ * Database-wide parallel reindex requires special processing. If
+ * multiple jobs were asked, we have to reindex system catalogs first,
+ * as they can't be processed in parallel.
+ */
+ if (process_type == REINDEX_DATABASE)
+ {
+ Assert(user_list == NULL);
+ run_reindex_command(conn, REINDEX_SYSTEM, NULL, echo, verbose,
+ concurrently, false);
+
+ process_type = REINDEX_TABLE;
+ process_list = get_parallel_object_list(conn, echo);
+
+ /* Bail out if nothing to process */
+ if (process_list == NULL)
+ {
+ PQfinish(conn);
+ return;
+ }
+ }
+ else
+ Assert(user_list != NULL);
+ }
+
+ slots = ParallelSlotsSetup(dbname, host, port, username, prompt_password,
+ progname, echo, conn, concurrentCons);
+
+ cell = process_list->head;
+ do
+ {
+ const char *objname = cell->val;
+ ParallelSlot *free_slot = NULL;
+
+ if (CancelRequested)
+ {
+ failed = true;
+ goto finish;
+ }
+
+ free_slot = ParallelSlotsGetIdle(slots, concurrentCons);
+ if (!free_slot)
+ {
+ failed = true;
+ goto finish;
+ }
+
+ run_reindex_command(free_slot->connection, process_type, objname,
+ echo, verbose, concurrently, true);
+
+ cell = cell->next;
+ } while (cell != NULL);
+
+ if (!ParallelSlotsWaitCompletion(slots, concurrentCons))
+ failed = true;
+
+finish:
+ ParallelSlotsTerminate(slots, concurrentCons);
+ pfree(slots);
+
+ if (failed)
+ exit(1);
+}
+
+static void
+run_reindex_command(PGconn *conn, ReindexType type, const char *name,
+ bool echo, bool verbose, bool concurrently, bool async)
+{
+ PQExpBufferData sql;
+ bool status;
+
/* build the REINDEX query */
initPQExpBuffer(&sql);
@@ -358,7 +475,17 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
/* finish the query */
appendPQExpBufferChar(&sql, ';');
- if (!executeMaintenanceCommand(conn, sql.data, echo))
+ if (async)
+ {
+ if (echo)
+ printf("%s\n", sql.data);
+
+ status = PQsendQuery(conn, sql.data) == 1;
+ }
+ else
+ status = executeMaintenanceCommand(conn, sql.data, echo);
+
+ if (!status)
{
switch (type)
{
@@ -383,20 +510,90 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
name, PQdb(conn), PQerrorMessage(conn));
break;
}
- PQfinish(conn);
- exit(1);
+ if (!async)
+ {
+ PQfinish(conn);
+ exit(1);
+ }
}
- PQfinish(conn);
termPQExpBuffer(&sql);
}
+/*
+ * Prepare the list of objects to process by querying the catalogs.
+ *
+ * Returns a newly allocated SimpleStringList holding the schema-qualified
+ * names of all tables and materialized views outside pg_catalog, ordered by
+ * decreasing relpages, or NULL if there is no such relation.  The connection
+ * is left open in both cases; closing it is the caller's job.
+ */
+static SimpleStringList *
+get_parallel_object_list(PGconn *conn, bool echo)
+{
+	PQExpBufferData catalog_query;
+	PQExpBufferData buf;
+	PGresult   *res;
+	SimpleStringList *tables;
+	int			ntups,
+				i;
+
+	initPQExpBuffer(&catalog_query);
+
+	/*
+	 * This query is run using a safe search_path, so there's no need to fully
+	 * qualify everything.
+	 */
+	appendPQExpBufferStr(&catalog_query,
+						 "SELECT c.relname, ns.nspname\n"
+						 " FROM pg_catalog.pg_class c\n"
+						 " JOIN pg_catalog.pg_namespace ns"
+						 " ON c.relnamespace = ns.oid\n"
+						 " WHERE ns.nspname != 'pg_catalog'\n"
+						 "   AND c.relkind IN ("
+						 CppAsString2(RELKIND_RELATION) ", "
+						 CppAsString2(RELKIND_MATVIEW) ")\n"
+						 " ORDER BY c.relpages DESC;");
+
+	res = executeQuery(conn, catalog_query.data, echo);
+	termPQExpBuffer(&catalog_query);
+
+	/*
+	 * If no rows are returned, there are no matching tables, so we are done.
+	 */
+	ntups = PQntuples(res);
+	if (ntups == 0)
+	{
+		/* Do not close conn here: the caller still owns it and closes it. */
+		PQclear(res);
+		return NULL;
+	}
+
+	tables = pg_malloc0(sizeof(SimpleStringList));
+
+	/* Build qualified identifiers for each table */
+	initPQExpBuffer(&buf);
+	for (i = 0; i < ntups; i++)
+	{
+		appendPQExpBufferStr(&buf,
+							 fmtQualifiedId(PQgetvalue(res, i, 1),
+											PQgetvalue(res, i, 0)));
+
+		simple_string_list_append(tables, buf.data);
+		resetPQExpBuffer(&buf);
+	}
+	termPQExpBuffer(&buf);
+	PQclear(res);
+
+	return tables;
+}
+
static void
reindex_all_databases(const char *maintenance_db,
const char *host, const char *port,
const char *username, enum trivalue prompt_password,
const char *progname, bool echo, bool quiet, bool verbose,
- bool concurrently)
+ bool concurrently, int concurrentCons)
{
PGconn *conn;
PGresult *result;
@@ -423,9 +620,10 @@ reindex_all_databases(const char *maintenance_db,
appendPQExpBufferStr(&connstr, "dbname=");
appendConnStrVal(&connstr, dbname);
- reindex_one_database(NULL, connstr.data, REINDEX_DATABASE, host,
+ reindex_one_database(connstr.data, REINDEX_DATABASE, NULL, host,
port, username, prompt_password,
- progname, echo, verbose, concurrently);
+ progname, echo, verbose, concurrently,
+ concurrentCons);
}
termPQExpBuffer(&connstr);
@@ -444,6 +642,7 @@ help(const char *progname)
printf(_(" -d, --dbname=DBNAME database to reindex\n"));
printf(_(" -e, --echo show the commands being sent to the server\n"));
printf(_(" -i, --index=INDEX recreate specific index(es) only\n"));
+ printf(_(" -j, --jobs=NUM use this many concurrent connections to reindex\n"));
printf(_(" -q, --quiet don't write any messages\n"));
printf(_(" -s, --system reindex system catalogs\n"));
printf(_(" -S, --schema=SCHEMA reindex specific schema(s) only\n"));
diff --git a/src/bin/scripts/t/090_reindexdb.pl b/src/bin/scripts/t/090_reindexdb.pl
index 1af8ab70ad..364a771fdc 100644
--- a/src/bin/scripts/t/090_reindexdb.pl
+++ b/src/bin/scripts/t/090_reindexdb.pl
@@ -3,7 +3,7 @@ use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 34;
+use Test::More tests => 39;
program_help_ok('reindexdb');
program_version_ok('reindexdb');
@@ -77,3 +77,13 @@ $node->command_ok(
$node->command_ok(
[qw(reindexdb --echo --system dbname=template1)],
'reindexdb system with connection string');
+
+# parallel processing
+$node->command_fails([qw(reindexdb -j2 -s)],
+ 'reindexdb cannot process system catalogs in parallel');
+$node->issues_sql_like([qw(reindexdb -j2)],
+ qr/statement: REINDEX SYSTEM postgres/,
+ 'Global and parallel reindex will issue a REINDEX SYSTEM');
+$node->issues_sql_like([qw(reindexdb -j2)],
+ qr/statement: REINDEX TABLE public.test1/,
+ 'Global and parallel reindex will issue per-table REINDEX');