diff --git a/contrib/pg_stat_statements/expected/wal.out b/contrib/pg_stat_statements/expected/wal.out index 34a2bf5b03..4b2220a96b 100644 --- a/contrib/pg_stat_statements/expected/wal.out +++ b/contrib/pg_stat_statements/expected/wal.out @@ -17,7 +17,7 @@ FROM pg_stat_statements ORDER BY query COLLATE "C"; --------------------------------------------------------------+-------+------+---------------------+-----------------------+--------------------- DELETE FROM pgss_wal_tab WHERE a > $1 | 1 | 1 | t | t | t INSERT INTO pgss_wal_tab VALUES(generate_series($1, $2), $3) | 1 | 10 | t | t | t - SELECT pg_stat_statements_reset() IS NOT NULL AS t | 1 | 1 | f | f | f + SELECT pg_stat_statements_reset() IS NOT NULL AS t | 1 | 1 | t | t | t SET pg_stat_statements.track_utility = FALSE | 1 | 0 | f | f | t UPDATE pgss_wal_tab SET b = $1 WHERE a > $2 | 1 | 3 | t | t | t (5 rows) diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c index 67cec865ba..d0220fd9eb 100644 --- a/contrib/pg_stat_statements/pg_stat_statements.c +++ b/contrib/pg_stat_statements/pg_stat_statements.c @@ -74,6 +74,10 @@ #include "utils/memutils.h" #include "utils/timestamp.h" +#include "access/xlog.h" +#include "access/xlog_internal.h" +#include "access/xloginsert.h" + PG_MODULE_MAGIC; /* Location of permanent stats file (valid when database is shut down) */ @@ -323,7 +327,6 @@ PG_FUNCTION_INFO_V1(pg_stat_statements_info); static void pgss_shmem_request(void); static void pgss_shmem_startup(void); -static void pgss_shmem_shutdown(int code, Datum arg); static void pgss_post_parse_analyze(ParseState *pstate, Query *query, JumbleState *jstate); static PlannedStmt *pgss_planner(Query *parse, @@ -370,6 +373,50 @@ static void fill_in_constant_lengths(JumbleState *jstate, const char *query, int query_loc); static int comp_location(const void *a, const void *b); +/* RMGR API */ +#define CUSTOMRMGR_ID RM_EXPERIMENTAL_ID +#define CUSTOMRMGR_NAME "pgss_rmgr" + +static void rmgr_redo(XLogReaderState *record); +static void rmgr_desc(StringInfo buf, XLogReaderState *record); +static const char *rmgr_identify(uint8 info); +static void rmgr_checkpoint(int flags); + +/* WAL record definitions */ +#define PGSS_XLOG_INSERT 0x00 +#define PGSS_XLOG_RESET 0x10 + +/* The necessary fields from pgssEntry */ +typedef struct pgssXLogInsert +{ + uint32 header; + pgssHashKey key; + Counters counters; + int encoding; + TimestampTz stats_since; + TimestampTz minmax_stats_since; + int query_len; + char qtext[FLEXIBLE_ARRAY_MEMBER]; +} pgssXLogInsert; + +/* The params of entry_reset() function */ +typedef struct pgssXLogReset +{ + uint32 header; + Oid userid; + uint64 queryid; + Oid dbid; + bool minmax_only; +} pgssXLogReset; + +/* RMGR data */ +const RmgrData pgss_rmgr = { + .rm_name = CUSTOMRMGR_NAME, + .rm_redo = rmgr_redo, + .rm_checkpoint = rmgr_checkpoint, + .rm_identify = rmgr_identify, + .rm_desc = rmgr_desc +}; /* * Module load callback @@ -457,6 +504,8 @@ _PG_init(void) MarkGUCPrefixReserved("pg_stat_statements"); + RegisterCustomRmgr(CUSTOMRMGR_ID, &pgss_rmgr); + /* * Install hooks. */ @@ -556,9 +605,12 @@ pgss_shmem_startup(void) /* * If we're in the postmaster (or a standalone backend...), set up a shmem * exit hook to dump the statistics to disk. + * + * Now we do it at CHECKPOINT. + * + *if (!IsUnderPostmaster) + * on_shmem_exit(pgss_shmem_shutdown, (Datum) 0); */ - if (!IsUnderPostmaster) - on_shmem_exit(pgss_shmem_shutdown, (Datum) 0); /* * Done if some other process already completed our initialization. @@ -720,108 +772,6 @@ fail: */ } -/* - * shmem_shutdown hook: Dump statistics into file. - * - * Note: we don't bother with acquiring lock, because there should be no - * other processes running when this is called. - */ -static void -pgss_shmem_shutdown(int code, Datum arg) -{ - FILE *file; - char *qbuffer = NULL; - Size qbuffer_size = 0; - HASH_SEQ_STATUS hash_seq; - int32 num_entries; - pgssEntry *entry; - - /* Don't try to dump during a crash. */ - if (code) - return; - - /* Safety check ... shouldn't get here unless shmem is set up. */ - if (!pgss || !pgss_hash) - return; - - /* Don't dump if told not to. */ - if (!pgss_save) - return; - - file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W); - if (file == NULL) - goto error; - - if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1) - goto error; - if (fwrite(&PGSS_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1) - goto error; - num_entries = hash_get_num_entries(pgss_hash); - if (fwrite(&num_entries, sizeof(int32), 1, file) != 1) - goto error; - - qbuffer = qtext_load_file(&qbuffer_size); - if (qbuffer == NULL) - goto error; - - /* - * When serializing to disk, we store query texts immediately after their - * entry data. Any orphaned query texts are thereby excluded. - */ - hash_seq_init(&hash_seq, pgss_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) - { - int len = entry->query_len; - char *qstr = qtext_fetch(entry->query_offset, len, - qbuffer, qbuffer_size); - - if (qstr == NULL) - continue; /* Ignore any entries with bogus texts */ - - if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1 || - fwrite(qstr, 1, len + 1, file) != len + 1) - { - /* note: we assume hash_seq_term won't change errno */ - hash_seq_term(&hash_seq); - goto error; - } - } - - /* Dump global statistics for pg_stat_statements */ - if (fwrite(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1) - goto error; - - free(qbuffer); - qbuffer = NULL; - - if (FreeFile(file)) - { - file = NULL; - goto error; - } - - /* - * Rename file into place, so we atomically replace any old one. - */ - (void) durable_rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE, LOG); - - /* Unlink query-texts file; it's not needed while shutdown */ - unlink(PGSS_TEXT_FILE); - - return; - -error: - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not write file \"%s\": %m", - PGSS_DUMP_FILE ".tmp"))); - free(qbuffer); - if (file) - FreeFile(file); - unlink(PGSS_DUMP_FILE ".tmp"); - unlink(PGSS_TEXT_FILE); -} - /* * Post-parse-analysis hook: mark query with a queryId */ @@ -1284,6 +1234,7 @@ pgss_store(const char *query, uint64 queryId, pgssEntry *entry; char *norm_query = NULL; int encoding = GetDatabaseEncoding(); + bool qtext_stored = false; Assert(query != NULL); @@ -1325,7 +1276,6 @@ pgss_store(const char *query, uint64 queryId, { Size query_offset; int gc_count; - bool stored; bool do_gc; /* @@ -1345,7 +1295,7 @@ pgss_store(const char *query, uint64 queryId, } /* Append new query text to file with only shared lock held */ - stored = qtext_store(norm_query ? norm_query : query, query_len, + qtext_stored = qtext_store(norm_query ? norm_query : query, query_len, &query_offset, &gc_count); /* @@ -1366,12 +1316,12 @@ pgss_store(const char *query, uint64 queryId, * This should be infrequent enough that doing it while holding * exclusive lock isn't a performance problem. */ - if (!stored || pgss->gc_count != gc_count) - stored = qtext_store(norm_query ? norm_query : query, query_len, + if (!qtext_stored || pgss->gc_count != gc_count) + qtext_stored = qtext_store(norm_query ? norm_query : query, query_len, &query_offset, NULL); /* If we failed to write to the text file, give up */ - if (!stored) + if (!qtext_stored) goto done; /* OK to create a new hashtable entry */ @@ -1486,6 +1436,38 @@ pgss_store(const char *query, uint64 queryId, SpinLockRelease(&e->mutex); } + /* Write entry to XLOG */ + if (pgss_save && !RecoveryInProgress()) + { + pgssXLogInsert *xlog_entry; + XLogRecPtr ptr; + + xlog_entry = palloc(sizeof(pgssXLogInsert)); + xlog_entry->header = PGSS_FILE_HEADER; + xlog_entry->key = entry->key; + xlog_entry->counters = entry->counters; + xlog_entry->encoding = entry->encoding; + xlog_entry->stats_since = entry->stats_since; + xlog_entry->minmax_stats_since = entry->minmax_stats_since; + + XLogBeginInsert(); + XLogRegisterData((char *) xlog_entry, offsetof(pgssXLogInsert, qtext)); + + /* Write the query text if need */ + if (qtext_stored) + { + xlog_entry->query_len = entry->query_len; + XLogRegisterData(norm_query ? norm_query : (char *) query, query_len); + } + else + xlog_entry->query_len = 0; + + XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT); + ptr = XLogInsert(CUSTOMRMGR_ID, PGSS_XLOG_INSERT); + XLogFlush(ptr); + pfree(xlog_entry); + } + done: LWLockRelease(pgss->lock); @@ -2676,6 +2658,27 @@ entry_reset(Oid userid, Oid dbid, uint64 queryid, bool minmax_only) LWLockAcquire(pgss->lock, LW_EXCLUSIVE); num_entries = hash_get_num_entries(pgss_hash); + /* Write entry to XLOG */ + if (pgss_save && !RecoveryInProgress()) + { + pgssXLogReset *xlrec; + XLogRecPtr ptr; + + xlrec = palloc0(sizeof(pgssXLogReset)); + xlrec->header = PGSS_FILE_HEADER; + xlrec->userid = userid; + xlrec->dbid = dbid; + xlrec->queryid = queryid; + xlrec->minmax_only = minmax_only; + + XLogBeginInsert(); + XLogRegisterData((char *) xlrec, sizeof(pgssXLogReset)); + XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT); + ptr = XLogInsert(CUSTOMRMGR_ID, PGSS_XLOG_RESET); + XLogFlush(ptr); + pfree(xlrec); + } + stats_reset = GetCurrentTimestamp(); if (userid != 0 && dbid != 0 && queryid != UINT64CONST(0)) @@ -3010,3 +3013,236 @@ comp_location(const void *a, const void *b) return pg_cmp_s32(l, r); } + +static void +rmgr_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + /* + * Because we did not restore records from storage, + * we also do not restore records from WAL. + */ + if (!pgss_save) + return; + + if (info == PGSS_XLOG_INSERT) + { + pgssXLogInsert *xlrec = (pgssXLogInsert *) XLogRecGetData(record); + pgssEntry *entry; + + if (xlrec->header != PGSS_FILE_HEADER) + { + elog(WARNING, "Skip the inconsistent WAL record"); + return; + } + + /* Safety check... */ + if (!pgss || !pgss_hash) + return; + + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + + entry = (pgssEntry *) hash_search(pgss_hash, &xlrec->key, HASH_FIND, NULL); + + /* Create new entry, if not present */ + if (!entry) + { + Size query_offset; + bool stored; + char *query; + + Assert(xlrec->query_len > 0); + + query = (char *) xlrec->qtext; + + /* Append new query text to file */ + stored = qtext_store(query, xlrec->query_len, &query_offset, NULL); + + /* If we failed to write to the text file, give up */ + if (!stored) + { + LWLockRelease(pgss->lock); + return; + } + + /* OK to create a new hashtable entry */ + entry = entry_alloc(&xlrec->key, query_offset, xlrec->query_len, + xlrec->encoding, false); + + /* If needed, perform garbage collection */ + gc_qtexts(); + } + + /* Copy the necessary data from XLog record */ + entry->counters = xlrec->counters; + + entry->encoding = xlrec->encoding; + entry->stats_since = xlrec->stats_since; + entry->minmax_stats_since = xlrec->minmax_stats_since; + + LWLockRelease(pgss->lock); + } + else if (info == PGSS_XLOG_RESET) + { + pgssXLogReset *xlrec = (pgssXLogReset *) XLogRecGetData(record); + + if (xlrec->header != PGSS_FILE_HEADER) + { + elog(WARNING, "Skip the inconsistent WAL record"); + return; + } + + entry_reset(xlrec->userid, xlrec->dbid, xlrec->queryid, xlrec->minmax_only); + } +} + +static void +rmgr_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == PGSS_XLOG_INSERT) + { + pgssXLogInsert *xlrec = (pgssXLogInsert *) rec; + + if (xlrec->header != PGSS_FILE_HEADER) + { + elog(WARNING, "Skip the inconsistent WAL record"); + return; + } + + appendStringInfo(buf, "userid: %u, dbid: %u, queryid: " UINT64_FORMAT + ", toplevel: %d", + xlrec->key.userid, xlrec->key.dbid, xlrec->key.queryid, + (int) xlrec->key.toplevel); + } + else if (info == PGSS_XLOG_RESET) + { + pgssXLogReset *xlrec = (pgssXLogReset *) rec; + + if (xlrec->header != PGSS_FILE_HEADER) + { + elog(WARNING, "Skip the inconsistent WAL record"); + return; + } + + appendStringInfo(buf, "userid: %u, dbid: %u, queryid: " UINT64_FORMAT + ", minmax_only: %d", + xlrec->userid, xlrec->dbid, xlrec->queryid, + (int) xlrec->minmax_only); + } +} + +static const char * +rmgr_identify(uint8 info) +{ + if ((info & ~XLR_INFO_MASK) == PGSS_XLOG_INSERT) + return "INSERT"; + if ((info & ~XLR_INFO_MASK) == PGSS_XLOG_RESET) + return "RESET"; + + return NULL; +} + +static void +rmgr_checkpoint(int flags) +{ + FILE *file; + char *qbuffer = NULL; + Size qbuffer_size = 0; + HASH_SEQ_STATUS hash_seq; + int32 num_entries; + pgssEntry *entry; + + /* Safety check ... shouldn't get here unless shmem is set up. */ + if (!pgss || !pgss_hash) + return; + + /* Don't dump if told not to. */ + if (!pgss_save) + return; + + /* XXX: Can there be concurrent CHECKPOINTs? */ + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + + file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W); + if (file == NULL) + goto error; + + if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1) + goto error; + if (fwrite(&PGSS_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1) + goto error; + num_entries = hash_get_num_entries(pgss_hash); + if (fwrite(&num_entries, sizeof(int32), 1, file) != 1) + goto error; + + qbuffer = qtext_load_file(&qbuffer_size); + if (qbuffer == NULL) + goto error; + + /* + * When serializing to disk, we store query texts immediately after their + * entry data. Any orphaned query texts are thereby excluded. + */ + hash_seq_init(&hash_seq, pgss_hash); + while ((entry = hash_seq_search(&hash_seq)) != NULL) + { + int len = entry->query_len; + char *qstr = qtext_fetch(entry->query_offset, len, + qbuffer, qbuffer_size); + + if (qstr == NULL) + continue; /* Ignore any entries with bogus texts */ + + if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1 || + fwrite(qstr, 1, len + 1, file) != len + 1) + { + /* note: we assume hash_seq_term won't change errno */ + hash_seq_term(&hash_seq); + goto error; + } + } + + /* Dump global statistics for pg_stat_statements */ + if (fwrite(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1) + goto error; + + free(qbuffer); + qbuffer = NULL; + + if (FreeFile(file)) + { + file = NULL; + goto error; + } + + /* + * Rename file into place, so we atomically replace any old one. + */ + (void) durable_rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE, LOG); + + /* Unlink query-texts file; it's not needed while shutdown */ + if (flags & CHECKPOINT_IS_SHUTDOWN) + unlink(PGSS_TEXT_FILE); + + LWLockRelease(pgss->lock); + return; + +error: + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", + PGSS_DUMP_FILE ".tmp"))); + free(qbuffer); + if (file) + FreeFile(file); + unlink(PGSS_DUMP_FILE ".tmp"); + + if (flags & CHECKPOINT_IS_SHUTDOWN) + unlink(PGSS_TEXT_FILE); + + LWLockRelease(pgss->lock); +} diff --git a/contrib/pg_stat_statements/t/020_crash.pl b/contrib/pg_stat_statements/t/020_crash.pl new file mode 100644 index 0000000000..f33bb43d7d --- /dev/null +++ b/contrib/pg_stat_statements/t/020_crash.pl @@ -0,0 +1,80 @@ +# Copyright (c) 2023-2024, PostgreSQL Global Development Group + +# Tests for checking that pg_stat_statements contents are preserved +# across restarts. + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $node = PostgreSQL::Test::Cluster->new('main'); +$node->init; +$node->append_conf('postgresql.conf', + q[ + shared_preload_libraries = 'pg_stat_statements' + restart_after_crash = 1 + ]); +$node->start; + +$node->safe_psql('postgres', 'CREATE EXTENSION pg_stat_statements'); + +# Without the CHECKPOINT hook, we won't see this query in pg_stat_statements +# after a server crash. +$node->safe_psql('postgres', 'CREATE TABLE t1 (a int)'); + +$node->safe_psql('postgres', 'CHECKPOINT'); +$node->safe_psql('postgres', 'SELECT a FROM t1'); + +is( $node->safe_psql( + 'postgres', + "SELECT query FROM pg_stat_statements WHERE query NOT LIKE '%pg_stat_statements%' ORDER BY query" + ), + "CHECKPOINT\nCREATE TABLE t1 (a int)\nSELECT a FROM t1", + 'pg_stat_statements populated'); + + +# Perform a server shutdown by killing the backend. +my $psql_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default); + +my ($killme_stdin, $killme_stdout, $killme_stderr) = ('', '', ''); +my $killme = IPC::Run::start( + [ + 'psql', '-X', '-qAt', '-v', 'ON_ERROR_STOP=1', '-f', '-', '-d', + $node->connstr('postgres') + ], + '<', + \$killme_stdin, + '>', + \$killme_stdout, + '2>', + \$killme_stderr, + $psql_timeout); + +$killme_stdin .= "SELECT pg_backend_pid();\n"; +ok( pump_until( + $killme, $psql_timeout, \$killme_stdout, qr/[[:digit:]]+[\r\n]$/m), + 'acquired pid for SIGQUIT'); +my $pid = $killme_stdout; +chomp($pid); + +my $ret = PostgreSQL::Test::Utils::system_log('pg_ctl', 'kill', 'QUIT', $pid); +is($ret, 0, "killed process with SIGQUIT"); + +$killme->finish; + +# Wait till server restarts +is($node->poll_query_until('postgres', undef, ''), + "1", "reconnected after SIGQUIT"); + +is( $node->safe_psql( + 'postgres', + "SELECT query FROM pg_stat_statements WHERE query NOT LIKE '%pg_stat_statements%' ORDER BY query" + ), + "CHECKPOINT\nCREATE TABLE t1 (a int)\nSELECT a FROM t1\nSELECT pg_backend_pid()", + 'pg_stat_statements data kept across the server crash'); + +$node->stop; + +done_testing();