From c1940cb37539030efd016d6a409ea39c72302d82 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Thu, 10 Feb 2022 21:18:11 +0900 Subject: [PATCH v11 1/2] Report error transaction's commit LSN instead of XID to pg_stat_subscription_workers. --- doc/src/sgml/monitoring.sgml | 6 +-- src/backend/catalog/system_views.sql | 2 +- src/backend/postmaster/pgstat.c | 10 ++--- src/backend/replication/logical/worker.c | 45 ++++++++++----------- src/backend/utils/adt/pgstatfuncs.c | 11 ++--- src/include/catalog/pg_proc.dat | 4 +- src/include/pgstat.h | 6 +-- src/test/regress/expected/rules.out | 4 +- src/test/subscription/t/026_worker_stats.pl | 14 ++----- 9 files changed, 47 insertions(+), 55 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 62f2a3332b..0820d4a320 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -3143,11 +3143,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i - last_error_xid xid + last_error_lsn pg_lsn - Transaction ID of the publisher node being applied when the error - occurred. This field is null if the error was reported + The commit LSN of the transaction of the publisher node being applied + when the error occurred. This field is null if the error was reported during the initial data copy. 
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 3cb69b1f87..9e9578bad4 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1271,7 +1271,7 @@ CREATE VIEW pg_stat_subscription_workers AS w.subrelid, w.last_error_relid, w.last_error_command, - w.last_error_xid, + w.last_error_lsn, w.last_error_count, w.last_error_message, w.last_error_time diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 0646f53098..9d95bcb0e3 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -1956,7 +1956,7 @@ pgstat_report_replslot_drop(const char *slotname) */ void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, - LogicalRepMsgType command, TransactionId xid, + LogicalRepMsgType command, XLogRecPtr lsn, const char *errmsg) { PgStat_MsgSubWorkerError msg; @@ -1968,7 +1968,7 @@ pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, msg.m_subrelid = subrelid; msg.m_relid = relid; msg.m_command = command; - msg.m_xid = xid; + msg.m_lsn = lsn; msg.m_timestamp = GetCurrentTimestamp(); strlcpy(msg.m_message, errmsg, PGSTAT_SUBWORKERERROR_MSGLEN); @@ -3967,7 +3967,7 @@ pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid, { subwentry->last_error_relid = InvalidOid; subwentry->last_error_command = 0; - subwentry->last_error_xid = InvalidTransactionId; + subwentry->last_error_lsn = InvalidXLogRecPtr; subwentry->last_error_count = 0; subwentry->last_error_time = 0; subwentry->last_error_message[0] = '\0'; @@ -6173,7 +6173,7 @@ pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len) if (subwentry->last_error_relid == msg->m_relid && subwentry->last_error_command == msg->m_command && - subwentry->last_error_xid == msg->m_xid && + subwentry->last_error_lsn == msg->m_lsn && strcmp(subwentry->last_error_message, msg->m_message) == 0) { /* @@ -6188,7 +6188,7 @@ 
pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len) /* Otherwise, update the error information */ subwentry->last_error_relid = msg->m_relid; subwentry->last_error_command = msg->m_command; - subwentry->last_error_xid = msg->m_xid; + subwentry->last_error_lsn = msg->m_lsn; subwentry->last_error_count = 1; subwentry->last_error_time = msg->m_timestamp; strlcpy(subwentry->last_error_message, msg->m_message, diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d77bb32bb9..2d2c83cd53 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -225,7 +225,7 @@ typedef struct ApplyErrorCallbackArg /* Remote node information */ int remote_attnum; /* -1 if invalid */ - TransactionId remote_xid; + XLogRecPtr remote_lsn; TimestampTz ts; /* commit, rollback, or prepare timestamp */ } ApplyErrorCallbackArg; @@ -234,7 +234,7 @@ static ApplyErrorCallbackArg apply_error_callback_arg = .command = 0, .rel = NULL, .remote_attnum = -1, - .remote_xid = InvalidTransactionId, + .remote_lsn = InvalidXLogRecPtr, .ts = 0, }; @@ -334,7 +334,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); /* Functions for apply error callback */ static void apply_error_callback(void *arg); -static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts); +static inline void set_apply_error_context_xact(XLogRecPtr lsn, TimestampTz ts); static inline void reset_apply_error_context_info(void); /* @@ -787,7 +787,7 @@ apply_handle_begin(StringInfo s) LogicalRepBeginData begin_data; logicalrep_read_begin(s, &begin_data); - set_apply_error_context_xact(begin_data.xid, begin_data.committime); + set_apply_error_context_xact(begin_data.final_lsn, begin_data.committime); remote_final_lsn = begin_data.final_lsn; @@ -839,7 +839,7 @@ apply_handle_begin_prepare(StringInfo s) errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); 
logicalrep_read_begin_prepare(s, &begin_data); - set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time); + set_apply_error_context_xact(begin_data.prepare_lsn, begin_data.prepare_time); remote_final_lsn = begin_data.prepare_lsn; @@ -938,7 +938,7 @@ apply_handle_commit_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_commit_prepared(s, &prepare_data); - set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time); + set_apply_error_context_xact(prepare_data.commit_lsn, prepare_data.commit_time); /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, @@ -979,7 +979,8 @@ apply_handle_rollback_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_rollback_prepared(s, &rollback_data); - set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time); + set_apply_error_context_xact(rollback_data.rollback_end_lsn, + rollback_data.rollback_time); /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid, @@ -1044,7 +1045,8 @@ apply_handle_stream_prepare(StringInfo s) errmsg_internal("tablesync worker received a STREAM PREPARE message"))); logicalrep_read_stream_prepare(s, &prepare_data); - set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time); + set_apply_error_context_xact(prepare_data.prepare_lsn, + prepare_data.prepare_time); elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid); @@ -1126,8 +1128,6 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid transaction ID in streamed replication transaction"))); - set_apply_error_context_xact(stream_xid, 0); - /* * Initialize the worker's stream_fileset if we haven't yet. This will be * used for the entire duration of the worker so create it in a permanent @@ -1214,10 +1214,7 @@ apply_handle_stream_abort(StringInfo s) * just delete the files with serialized info. 
*/ if (xid == subxid) - { - set_apply_error_context_xact(xid, 0); stream_cleanup_files(MyLogicalRepWorker->subid, xid); - } else { /* @@ -1241,8 +1238,6 @@ apply_handle_stream_abort(StringInfo s) bool found = false; char path[MAXPGPATH]; - set_apply_error_context_xact(subxid, 0); - subidx = -1; begin_replication_step(); subxact_info_read(MyLogicalRepWorker->subid, xid); @@ -1426,7 +1421,7 @@ apply_handle_stream_commit(StringInfo s) errmsg_internal("STREAM COMMIT message without STREAM STOP"))); xid = logicalrep_read_stream_commit(s, &commit_data); - set_apply_error_context_xact(xid, commit_data.committime); + set_apply_error_context_xact(commit_data.commit_lsn, commit_data.committime); elog(DEBUG1, "received commit for streamed transaction %u", xid); @@ -3499,7 +3494,7 @@ ApplyWorkerMain(Datum main_arg) MyLogicalRepWorker->relid, MyLogicalRepWorker->relid, 0, /* message type */ - InvalidTransactionId, + InvalidXLogRecPtr, errdata->message); MemoryContextSwitchTo(ecxt); PG_RE_THROW(); @@ -3640,7 +3635,7 @@ ApplyWorkerMain(Datum main_arg) ? 
apply_error_callback_arg.rel->localreloid : InvalidOid, apply_error_callback_arg.command, - apply_error_callback_arg.remote_xid, + apply_error_callback_arg.remote_lsn, errdata->message); MemoryContextSwitchTo(ecxt); } @@ -3687,11 +3682,13 @@ apply_error_callback(void *arg) } /* append transaction information */ - if (TransactionIdIsNormal(errarg->remote_xid)) + if (!XLogRecPtrIsInvalid(errarg->remote_lsn)) { - appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid); + appendStringInfo(&buf, _(" in transaction which committed at %X/%X"), + LSN_FORMAT_ARGS(errarg->remote_lsn)); + if (errarg->ts != 0) - appendStringInfo(&buf, _(" at %s"), + appendStringInfo(&buf, _(", at %s"), timestamptz_to_str(errarg->ts)); } @@ -3701,9 +3698,9 @@ apply_error_callback(void *arg) /* Set transaction information of apply error callback */ static inline void -set_apply_error_context_xact(TransactionId xid, TimestampTz ts) +set_apply_error_context_xact(XLogRecPtr lsn, TimestampTz ts) { - apply_error_callback_arg.remote_xid = xid; + apply_error_callback_arg.remote_lsn = lsn; apply_error_callback_arg.ts = ts; } @@ -3714,5 +3711,5 @@ reset_apply_error_context_info(void) apply_error_callback_arg.command = 0; apply_error_callback_arg.rel = NULL; apply_error_callback_arg.remote_attnum = -1; - set_apply_error_context_xact(InvalidTransactionId, 0); + set_apply_error_context_xact(InvalidXLogRecPtr, 0); } diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 15cb17ace4..697f72c276 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -30,6 +30,7 @@ #include "utils/acl.h" #include "utils/builtins.h" #include "utils/inet.h" +#include "utils/pg_lsn.h" #include "utils/timestamp.h" #define UINT32_ACCESS_ONCE(var) ((uint32)(*((volatile uint32 *)&(var)))) @@ -2446,8 +2447,8 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS) OIDOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_error_command", TEXTOID, 
-1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 5, "last_error_xid", - XIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "last_error_lsn", + LSNOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_count", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_message", @@ -2483,9 +2484,9 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS) else nulls[i++] = true; - /* last_error_xid */ - if (TransactionIdIsValid(wentry->last_error_xid)) - values[i++] = TransactionIdGetDatum(wentry->last_error_xid); + /* last_error_lsn */ + if (!XLogRecPtrIsInvalid(wentry->last_error_lsn)) + values[i++] = LSNGetDatum(wentry->last_error_lsn); else nulls[i++] = true; diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 7024dbe10a..1b6b745d11 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5375,9 +5375,9 @@ proname => 'pg_stat_get_subscription_worker', prorows => '1', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid oid', - proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz}', + proallargtypes => '{oid,oid,oid,oid,oid,text,pg_lsn,int8,text,timestamptz}', proargmodes => '{i,i,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subrelid,subid,subrelid,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time}', + proargnames => '{subid,subrelid,subid,subrelid,last_error_relid,last_error_command,last_error_lsn,last_error_count,last_error_message,last_error_time}', prosrc => 'pg_stat_get_subscription_worker' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index e10d20222a..77eb799e81 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -585,7 +585,7 @@ typedef struct 
PgStat_MsgSubWorkerError Oid m_relid; LogicalRepMsgType m_command; - TransactionId m_xid; + XLogRecPtr m_lsn; TimestampTz m_timestamp; char m_message[PGSTAT_SUBWORKERERROR_MSGLEN]; } PgStat_MsgSubWorkerError; @@ -1016,7 +1016,7 @@ typedef struct PgStat_StatSubWorkerEntry */ Oid last_error_relid; LogicalRepMsgType last_error_command; - TransactionId last_error_xid; + XLogRecPtr last_error_lsn; PgStat_Counter last_error_count; TimestampTz last_error_time; char last_error_message[PGSTAT_SUBWORKERERROR_MSGLEN]; @@ -1133,7 +1133,7 @@ extern void pgstat_report_replslot_create(const char *slotname); extern void pgstat_report_replslot_drop(const char *slotname); extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, LogicalRepMsgType command, - TransactionId xid, const char *errmsg); + XLogRecPtr lsn, const char *errmsg); extern void pgstat_report_subscription_drop(Oid subid); extern void pgstat_initialize(void); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index d652f7b5fb..0b2b2f81e9 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2099,7 +2099,7 @@ pg_stat_subscription_workers| SELECT w.subid, w.subrelid, w.last_error_relid, w.last_error_command, - w.last_error_xid, + w.last_error_lsn, w.last_error_count, w.last_error_message, w.last_error_time @@ -2110,7 +2110,7 @@ pg_stat_subscription_workers| SELECT w.subid, SELECT pg_subscription_rel.srsubid AS subid, pg_subscription_rel.srrelid AS relid FROM pg_subscription_rel) sr, - (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time) + (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, last_error_relid, last_error_command, last_error_lsn, last_error_count, last_error_message, last_error_time) JOIN pg_subscription s ON ((w.subid = s.oid))); 
pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, diff --git a/src/test/subscription/t/026_worker_stats.pl b/src/test/subscription/t/026_worker_stats.pl index 6cf21c8fee..d7f6e702df 100644 --- a/src/test/subscription/t/026_worker_stats.pl +++ b/src/test/subscription/t/026_worker_stats.pl @@ -11,7 +11,7 @@ use Test::More tests => 3; # Test if the error reported on pg_stat_subscription_workers view is expected. sub test_subscription_error { - my ($node, $relname, $command, $xid, $by_apply_worker, $errmsg_prefix, $msg) + my ($node, $relname, $command, $by_apply_worker, $errmsg_prefix, $msg) = @_; my $check_sql = qq[ @@ -30,11 +30,6 @@ WHERE last_error_relid = '$relname'::regclass ? qq[ AND last_error_command IS NULL] : qq[ AND last_error_command = '$command']; - # last_error_xid - $check_sql .= $xid eq '' - ? qq[ AND last_error_xid IS NULL] - : qq[ AND last_error_xid = '$xid'::xid]; - # Wait for the particular error statistics to be reported. $node->poll_query_until('postgres', $check_sql, ) or die "Timed out while waiting for " . $msg; @@ -116,21 +111,20 @@ is($result, q(1), 'check initial data are copied to subscriber'); # Insert more data to test_tab1, raising an error on the subscriber due to # violation of the unique constraint on test_tab1. -my $xid = $node_publisher->safe_psql( +$node_publisher->safe_psql( 'postgres', qq[ BEGIN; INSERT INTO test_tab1 VALUES (1); -SELECT pg_current_xact_id()::xid; COMMIT; ]); -test_subscription_error($node_subscriber, 'test_tab1', 'INSERT', $xid, +test_subscription_error($node_subscriber, 'test_tab1', 'INSERT', 1, # check apply worker error qq(duplicate key value violates unique constraint), 'error reported by the apply worker'); # Check the table sync worker's error in the view. 
-test_subscription_error($node_subscriber, 'test_tab2', '', '', +test_subscription_error($node_subscriber, 'test_tab2', '', 0, # check tablesync worker error qq(duplicate key value violates unique constraint), 'the error reported by the table sync worker'); -- 2.24.3 (Apple Git-128)