From eb0d4d952f6e80284eb8a84aefcc9ee9108bd011 Mon Sep 17 00:00:00 2001 From: "houzj.fnst" Date: Thu, 6 Oct 2022 14:42:24 +0800 Subject: [PATCH v37 5/5] Add a main_worker_pid to pg_stat_subscription main_worker_pid is Process ID of the leader apply worker, if this process is a apply parallel worker. NULL if this process is a leader apply worker or a synchronization worker. The new column can make it easier to distinguish leader apply worker and apply parallel worker which is also similar to the 'leader_pid' column in pg_stat_activity. --- doc/src/sgml/monitoring.sgml | 26 ++++++++++++++---- src/backend/catalog/system_views.sql | 1 + src/backend/replication/logical/launcher.c | 32 ++++++++++++---------- src/include/catalog/pg_proc.dat | 6 ++-- src/test/regress/expected/rules.out | 3 +- 5 files changed, 43 insertions(+), 25 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 342b20ebeb..1774b0c907 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -3178,13 +3178,24 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i + + + apply_leader_pid integer + + + Process ID of the leader apply worker, if this process is a apply + parallel worker. NULL if this process is a leader apply worker or a + synchronization worker. + + + relid oid OID of the relation that the worker is synchronizing; null for the - main apply worker + main apply worker and the parallel apply worker @@ -3194,7 +3205,7 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i Last write-ahead log location received, the initial value of - this field being 0 + this field being 0; null for the parallel apply worker @@ -3203,7 +3214,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i last_msg_send_time timestamp with time zone - Send time of last message received from origin WAL sender + Send time of last message received from origin WAL sender; null for the + parallel apply worker @@ -3212,7 +3224,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i last_msg_receipt_time timestamp with time zone - Receipt time of last message received from origin WAL sender + Receipt time of last message received from origin WAL sender; null for + the parallel apply worker @@ -3221,7 +3234,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i latest_end_lsn pg_lsn - Last write-ahead log location reported to origin WAL sender + Last write-ahead log location reported to origin WAL sender; null for + the parallel apply worker @@ -3231,7 +3245,7 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i Time of last write-ahead log location reported to origin WAL - sender + sender; null for the parallel apply worker diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index f4a00496ee..3f15cf24e9 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -946,6 +946,7 @@ CREATE VIEW pg_stat_subscription AS su.oid AS subid, su.subname, st.pid, + st.apply_leader_pid, st.relid, st.received_lsn, st.last_msg_send_time, diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 8fd2d6f122..23df33b3ca 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -1057,7 +1057,7 @@ IsLogicalLauncher(void) Datum pg_stat_get_subscription(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_COLS 8 +#define PG_STAT_GET_SUBSCRIPTION_COLS 9 Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); int i; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; @@ -1083,10 +1083,6 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) if (OidIsValid(subid) && worker.subid != subid) continue; - /* Skip if this is a parallel apply worker */ - if (isParallelApplyWorker(&worker)) - continue; - worker_pid = worker.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); @@ -1095,26 +1091,32 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) else nulls[1] = true; values[2] = Int32GetDatum(worker_pid); - if (XLogRecPtrIsInvalid(worker.last_lsn)) + + if (worker.apply_leader_pid == InvalidPid) nulls[3] = true; else - values[3] = LSNGetDatum(worker.last_lsn); - if (worker.last_send_time == 0) + values[3] = Int32GetDatum(worker.apply_leader_pid); + + if (XLogRecPtrIsInvalid(worker.last_lsn)) nulls[4] = true; else - values[4] = TimestampTzGetDatum(worker.last_send_time); - if (worker.last_recv_time == 0) + values[4] = LSNGetDatum(worker.last_lsn); + if (worker.last_send_time == 0) nulls[5] = true; else - values[5] = TimestampTzGetDatum(worker.last_recv_time); - if (XLogRecPtrIsInvalid(worker.reply_lsn)) + values[5] = TimestampTzGetDatum(worker.last_send_time); + if (worker.last_recv_time == 0) nulls[6] = true; else - values[6] = LSNGetDatum(worker.reply_lsn); - if (worker.reply_time == 0) + values[6] = TimestampTzGetDatum(worker.last_recv_time); + if (XLogRecPtrIsInvalid(worker.reply_lsn)) nulls[7] = true; else - values[7] = TimestampTzGetDatum(worker.reply_time); + values[7] = LSNGetDatum(worker.reply_lsn); + if (worker.reply_time == 0) + nulls[8] = true; + else + values[8] = TimestampTzGetDatum(worker.reply_time); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 68bb032d3e..1e583a15d6 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5396,9 +5396,9 @@ proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,oid,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,relid,pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}', + proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,relid,pid,apply_leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}', prosrc => 'pg_stat_get_subscription' }, { oid => '2026', descr => 'statistics: current backend PID', proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r', diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 9dd137415e..c69f6e471b 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2086,6 +2086,7 @@ pg_stat_ssl| SELECT s.pid, pg_stat_subscription| SELECT su.oid AS subid, su.subname, st.pid, + st.apply_leader_pid, st.relid, st.received_lsn, st.last_msg_send_time, @@ -2093,7 +2094,7 @@ pg_stat_subscription| SELECT su.oid AS subid, st.latest_end_lsn, st.latest_end_time FROM (pg_subscription su - LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid))); + LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, apply_leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid))); pg_stat_subscription_stats| SELECT ss.subid, s.subname, ss.apply_error_count, -- 2.23.0.windows.1