diff --git a/doc/src/sgml/ref/reindexdb.sgml b/doc/src/sgml/ref/reindexdb.sgml index 25b5a72770..a7031030b9 100644 --- a/doc/src/sgml/ref/reindexdb.sgml +++ b/doc/src/sgml/ref/reindexdb.sgml @@ -166,6 +166,29 @@ PostgreSQL documentation + + + + + + Execute the reindex commands in parallel by running + njobs + commands simultaneously. This option reduces the time of the + processing but it also increases the load on the database server. + + + reindexdb will open + njobs connections to the + database, so make sure your max_connections + setting is high enough to accommodate all connections. + + + Note that this mode is not compatible with the -i/--index + or the -s/--system options. + + + + diff --git a/src/bin/scripts/Makefile b/src/bin/scripts/Makefile index 3cd793b134..ede665090f 100644 --- a/src/bin/scripts/Makefile +++ b/src/bin/scripts/Makefile @@ -29,7 +29,7 @@ dropdb: dropdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake- dropuser: dropuser.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils clusterdb: clusterdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils vacuumdb: vacuumdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils -reindexdb: reindexdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils +reindexdb: reindexdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils pg_isready: pg_isready.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils install: all installdirs diff --git a/src/bin/scripts/reindexdb.c b/src/bin/scripts/reindexdb.c index 219a9a9211..d5ccb54a78 100644 --- a/src/bin/scripts/reindexdb.c +++ b/src/bin/scripts/reindexdb.c @@ -10,10 +10,14 @@ */ #include "postgres_fe.h" + +#include "catalog/pg_class_d.h" #include "common.h" #include "common/logging.h" +#include "fe_utils/connect.h" #include "fe_utils/simple_list.h" #include "fe_utils/string_utils.h" +#include
"scripts_parallel.h" typedef enum ReindexType { @@ -25,16 +29,23 @@ typedef enum ReindexType } ReindexType; -static void reindex_one_database(const char *name, const char *dbname, - ReindexType type, const char *host, +static SimpleStringList *get_parallel_object_list(PGconn *conn, bool echo); +static void reindex_one_database(const char *dbname, ReindexType type, + SimpleStringList *user_list, const char *host, const char *port, const char *username, enum trivalue prompt_password, const char *progname, - bool echo, bool verbose, bool concurrently); + bool echo, bool verbose, bool concurrently, + int concurrentCons); static void reindex_all_databases(const char *maintenance_db, const char *host, const char *port, const char *username, enum trivalue prompt_password, const char *progname, bool echo, - bool quiet, bool verbose, bool concurrently); + bool quiet, bool verbose, bool concurrently, + int concurrentCons); +static void run_reindex_command(PGconn *conn, ReindexType type, + const char *name, bool echo, bool verbose, + bool concurrently, bool async); + static void help(const char *progname); int @@ -54,6 +65,7 @@ main(int argc, char *argv[]) {"system", no_argument, NULL, 's'}, {"table", required_argument, NULL, 't'}, {"index", required_argument, NULL, 'i'}, + {"jobs", required_argument, NULL, 'j'}, {"verbose", no_argument, NULL, 'v'}, {"concurrently", no_argument, NULL, 1}, {"maintenance-db", required_argument, NULL, 2}, @@ -79,6 +91,9 @@ main(int argc, char *argv[]) SimpleStringList indexes = {NULL, NULL}; SimpleStringList tables = {NULL, NULL}; SimpleStringList schemas = {NULL, NULL}; + int concurrentCons = 1; + int tbl_count = 0, + nsp_count = 0; pg_logging_init(argv[0]); progname = get_progname(argv[0]); @@ -87,7 +102,7 @@ main(int argc, char *argv[]) handle_help_version_opts(argc, argv, "reindexdb", help); /* process command-line options */ - while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:v", long_options, &optindex)) != -1) + while ((c = 
getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:j:v", long_options, &optindex)) != -1) { switch (c) { @@ -114,6 +129,7 @@ main(int argc, char *argv[]) break; case 'S': simple_string_list_append(&schemas, optarg); + nsp_count++; break; case 'd': dbname = pg_strdup(optarg); @@ -126,10 +142,25 @@ main(int argc, char *argv[]) break; case 't': simple_string_list_append(&tables, optarg); + tbl_count++; break; case 'i': simple_string_list_append(&indexes, optarg); break; + case 'j': + concurrentCons = atoi(optarg); + if (concurrentCons <= 0) + { + pg_log_error("number of parallel jobs must be at least 1"); + exit(1); + } + if (concurrentCons > FD_SETSIZE - 1) + { + pg_log_error("too many parallel jobs requested (maximum: %d)", + FD_SETSIZE - 1); + exit(1); + } + break; case 'v': verbose = true; break; @@ -194,7 +225,8 @@ main(int argc, char *argv[]) } reindex_all_databases(maintenance_db, host, port, username, - prompt_password, progname, echo, quiet, verbose, concurrently); + prompt_password, progname, echo, quiet, verbose, + concurrently, concurrentCons); } else if (syscatalog) { @@ -214,6 +246,12 @@ main(int argc, char *argv[]) exit(1); } + if (concurrentCons > 1) + { + pg_log_error("cannot use multiple jobs to reindex system catalogs"); + exit(1); + } + if (dbname == NULL) { if (getenv("PGDATABASE")) @@ -224,9 +262,9 @@ main(int argc, char *argv[]) dbname = get_user_name_or_exit(progname); } - reindex_one_database(NULL, dbname, REINDEX_SYSTEM, host, + reindex_one_database(dbname, REINDEX_SYSTEM, NULL, host, port, username, prompt_password, progname, - echo, verbose, concurrently); + echo, verbose, concurrently, 1); } else { @@ -241,61 +279,55 @@ main(int argc, char *argv[]) } if (schemas.head != NULL) - { - SimpleStringListCell *cell; - - for (cell = schemas.head; cell; cell = cell->next) - { - reindex_one_database(cell->val, dbname, REINDEX_SCHEMA, host, - port, username, prompt_password, progname, - echo, verbose, concurrently); - } - } + reindex_one_database(dbname, 
REINDEX_SCHEMA, &schemas, host, + port, username, prompt_password, progname, + echo, verbose, concurrently, + Min(concurrentCons, nsp_count)); if (indexes.head != NULL) - { - SimpleStringListCell *cell; - for (cell = indexes.head; cell; cell = cell->next) - { - reindex_one_database(cell->val, dbname, REINDEX_INDEX, host, - port, username, prompt_password, progname, - echo, verbose, concurrently); - } - } - if (tables.head != NULL) - { - SimpleStringListCell *cell; + /* + * An index list cannot be processed by multiple connections, as + * it would require to distribute the work by owner tables. We + * simply ignore the passed number of jobs if any. + */ + reindex_one_database(dbname, REINDEX_INDEX, &indexes, host, + port, username, prompt_password, progname, + echo, verbose, concurrently, 1); - for (cell = tables.head; cell; cell = cell->next) - { - reindex_one_database(cell->val, dbname, REINDEX_TABLE, host, - port, username, prompt_password, progname, - echo, verbose, concurrently); - } - } + if (tables.head != NULL) + reindex_one_database(dbname, REINDEX_TABLE, &tables, host, + port, username, prompt_password, progname, + echo, verbose, concurrently, + Min(concurrentCons, tbl_count)); /* * reindex database only if neither index nor table nor schema is * specified */ if (indexes.head == NULL && tables.head == NULL && schemas.head == NULL) - reindex_one_database(NULL, dbname, REINDEX_DATABASE, host, + reindex_one_database(dbname, REINDEX_DATABASE, NULL, host, port, username, prompt_password, progname, - echo, verbose, concurrently); + echo, verbose, concurrently, concurrentCons); } exit(0); } static void -reindex_one_database(const char *name, const char *dbname, ReindexType type, - const char *host, const char *port, const char *username, +reindex_one_database(const char *dbname, ReindexType type, + SimpleStringList *user_list, const char *host, + const char *port, const char *username, enum trivalue prompt_password, const char *progname, bool echo, - bool 
verbose, bool concurrently) + bool verbose, bool concurrently, int concurrentCons) { - PQExpBufferData sql; PGconn *conn; + SimpleStringListCell *cell; + bool parallel = concurrentCons > 1; + SimpleStringList *process_list = user_list; + ReindexType process_type = type; + ParallelSlot *slots; + bool failed = false; conn = connectDatabase(dbname, host, port, username, prompt_password, progname, echo, false, false); @@ -308,6 +340,91 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type, exit(1); } + if (!parallel) + { + if (user_list == NULL) + { + /* + * Create a dummy list with an empty string, as user requires an + * element. + */ + process_list = pg_malloc0(sizeof(SimpleStringList)); + simple_string_list_append(process_list, ""); + } + } + else + { + /* + * Database-wide parallel reindex requires special processing. If + * multiple jobs were asked, we have to reindex system catalogs first, + * as they can't be processed in parallel. + */ + if (process_type == REINDEX_DATABASE) + { + Assert(user_list == NULL); + run_reindex_command(conn, REINDEX_SYSTEM, NULL, echo, verbose, + concurrently, false); + + process_type = REINDEX_TABLE; + process_list = get_parallel_object_list(conn, echo); + + /* Bail out if nothing to process */ + if (process_list == NULL) + { + PQfinish(conn); + return; + } + } + else + Assert(user_list != NULL); + } + + slots = ParallelSlotsSetup(dbname, host, port, username, prompt_password, + progname, echo, conn, concurrentCons); + + cell = process_list->head; + do + { + const char *objname = cell->val; + ParallelSlot *free_slot = NULL; + + if (CancelRequested) + { + failed = true; + goto finish; + } + + free_slot = ParallelSlotsGetIdle(slots, concurrentCons); + if (!free_slot) + { + failed = true; + goto finish; + } + + run_reindex_command(free_slot->connection, process_type, objname, + echo, verbose, concurrently, true); + + cell = cell->next; + } while (cell != NULL); + + if (!ParallelSlotsWaitCompletion(slots, 
concurrentCons)) + failed = true; + +finish: + ParallelSlotsTerminate(slots, concurrentCons); + pfree(slots); + + if (failed) + exit(1); +} + +static void +run_reindex_command(PGconn *conn, ReindexType type, const char *name, + bool echo, bool verbose, bool concurrently, bool async) +{ + PQExpBufferData sql; + bool status; + /* build the REINDEX query */ initPQExpBuffer(&sql); @@ -358,7 +475,17 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type, /* finish the query */ appendPQExpBufferChar(&sql, ';'); - if (!executeMaintenanceCommand(conn, sql.data, echo)) + if (async) + { + if (echo) + printf("%s\n", sql.data); + + status = PQsendQuery(conn, sql.data) == 1; + } + else + status = executeMaintenanceCommand(conn, sql.data, echo); + + if (!status) { switch (type) { @@ -383,20 +510,90 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type, name, PQdb(conn), PQerrorMessage(conn)); break; } - PQfinish(conn); - exit(1); + if (!async) + { + PQfinish(conn); + exit(1); + } } - PQfinish(conn); termPQExpBuffer(&sql); } +/* + * Prepare the list of objects to process by querying the catalogs. + * + * This function will return a SimpleStringList object containing the entire + * list of tables in the given database that should be processed by a parallel + * database-wide reindex (excluding system tables), or NULL if there's no such + * table. + */ +static SimpleStringList * +get_parallel_object_list(PGconn *conn, bool echo) +{ + PQExpBufferData catalog_query; + PQExpBufferData buf; + PGresult *res; + SimpleStringList *tables; + int ntups, + i; + + initPQExpBuffer(&catalog_query); + + /* + * This query is run using a safe search_path, so there's no need to fully + * qualify everything. 
+ */ + appendPQExpBuffer(&catalog_query, + "SELECT c.relname, ns.nspname\n" + " FROM pg_catalog.pg_class c\n" + " JOIN pg_catalog.pg_namespace ns" + " ON c.relnamespace = ns.oid\n" + " WHERE ns.nspname != 'pg_catalog'\n" + " AND c.relkind IN (" + CppAsString2(RELKIND_RELATION) ", " + CppAsString2(RELKIND_MATVIEW) ")\n" + " ORDER BY c.relpages DESC;"); + + res = executeQuery(conn, catalog_query.data, echo); + termPQExpBuffer(&catalog_query); + + /* + * If no rows are returned, there are no matching tables, so we are done. + */ + ntups = PQntuples(res); + if (ntups == 0) + { + PQclear(res); + /* conn belongs to the caller, which closes it; a PQfinish() here would free it twice */ + return NULL; + } + + tables = pg_malloc0(sizeof(SimpleStringList)); + + /* Build qualified identifiers for each table */ + initPQExpBuffer(&buf); + for (i = 0; i < ntups; i++) + { + appendPQExpBufferStr(&buf, + fmtQualifiedId(PQgetvalue(res, i, 1), + PQgetvalue(res, i, 0))); + + simple_string_list_append(tables, buf.data); + resetPQExpBuffer(&buf); + } + termPQExpBuffer(&buf); + PQclear(res); + + return tables; +} + static void reindex_all_databases(const char *maintenance_db, const char *host, const char *port, const char *username, enum trivalue prompt_password, const char *progname, bool echo, bool quiet, bool verbose, - bool concurrently) + bool concurrently, int concurrentCons) { PGconn *conn; PGresult *result; @@ -423,9 +620,10 @@ reindex_all_databases(const char *maintenance_db, appendPQExpBufferStr(&connstr, "dbname="); appendConnStrVal(&connstr, dbname); - reindex_one_database(NULL, connstr.data, REINDEX_DATABASE, host, + reindex_one_database(connstr.data, REINDEX_DATABASE, NULL, host, port, username, prompt_password, - progname, echo, verbose, concurrently); + progname, echo, verbose, concurrently, + concurrentCons); } termPQExpBuffer(&connstr); @@ -444,6 +642,7 @@ help(const char *progname) printf(_(" -d, --dbname=DBNAME database to reindex\n")); printf(_(" -e, --echo show the commands being sent to the server\n")); printf(_(" -i, --index=INDEX
recreate specific index(es) only\n")); + printf(_(" -j, --jobs=NUM use this many concurrent connections to reindex\n")); printf(_(" -q, --quiet don't write any messages\n")); printf(_(" -s, --system reindex system catalogs\n")); printf(_(" -S, --schema=SCHEMA reindex specific schema(s) only\n")); diff --git a/src/bin/scripts/t/090_reindexdb.pl b/src/bin/scripts/t/090_reindexdb.pl index 1af8ab70ad..364a771fdc 100644 --- a/src/bin/scripts/t/090_reindexdb.pl +++ b/src/bin/scripts/t/090_reindexdb.pl @@ -3,7 +3,7 @@ use warnings; use PostgresNode; use TestLib; -use Test::More tests => 34; +use Test::More tests => 39; program_help_ok('reindexdb'); program_version_ok('reindexdb'); @@ -77,3 +77,13 @@ $node->command_ok( $node->command_ok( [qw(reindexdb --echo --system dbname=template1)], 'reindexdb system with connection string'); + +# parallel processing +$node->command_fails([qw(reindexdb -j2 -s)], + 'reindexdb cannot process system catalogs in parallel'); +$node->issues_sql_like([qw(reindexdb -j2)], + qr/statement: REINDEX SYSTEM postgres/, + 'Global and parallel reindex will issue a REINDEX SYSTEM'); +$node->issues_sql_like([qw(reindexdb -j2)], + qr/statement: REINDEX TABLE public.test1/, + 'Global and parallel reindex will issue per-table REINDEX');