From 1bc5b945190c83312f0fc60f06e311fabc403ed0 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Fri, 21 Jun 2024 10:41:49 +0800 Subject: [PATCH v3 2/2] Collect statistics about conflicts in logical replication This commit adds columns in view pg_stat_subscription_stats to show information about the conflicts that occur during the application of logical replication changes. Currently, the following columns are added. insert_exists_count: Number of times inserting a row that violates a NOT DEFERRABLE unique constraint. update_differ_count: Number of times updating a row that was previously modified by another origin. update_missing_count: Number of times that the tuple to be updated is missing. delete_missing_count: Number of times that the tuple to be deleted is missing. The conflicts will be tracked only when the detect_conflict option of the subscription is enabled. Additionally, update_differ can be detected only when track_commit_timestamp is enabled. --- doc/src/sgml/monitoring.sgml | 52 ++++++++++++- doc/src/sgml/ref/create_subscription.sgml | 4 +- src/backend/catalog/system_views.sql | 4 + src/backend/replication/logical/conflict.c | 4 + .../utils/activity/pgstat_subscription.c | 17 ++++ src/backend/utils/adt/pgstatfuncs.c | 20 ++++- src/include/catalog/pg_proc.dat | 6 +- src/include/pgstat.h | 4 + src/include/replication/conflict.h | 2 + src/test/regress/expected/rules.out | 6 +- src/test/subscription/t/026_stats.pl | 77 +++++++++++++++++-- 11 files changed, 178 insertions(+), 18 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index b2ad9b446f..0ceb71f214 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -507,7 +507,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser pg_stat_subscription_statspg_stat_subscription_stats - One row per subscription, showing statistics about errors. + One row per subscription, showing statistics about errors and conflicts. 
See pg_stat_subscription_stats for details. @@ -2163,6 +2163,56 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + + insert_exists_count bigint + + + Number of times inserting a row that violates a + NOT DEFERRABLE unique constraint while applying + changes. This conflict is counted only if + detect_conflict + is enabled. + + + + + + update_differ_count bigint + + + Number of times updating a row that was previously modified by another + source while applying changes. This conflict is counted only when the + detect_conflict + option of the subscription and track_commit_timestamp + are enabled. + + + + + + update_missing_count bigint + + + Number of times that the tuple to be updated is not found while applying + changes. This conflict is counted only if + detect_conflict + is enabled. + + + + + + delete_missing_count bigint + + + Number of times that the tuple to be deleted is not found while applying + changes. This conflict is counted only if + detect_conflict + is enabled. + + + stats_reset timestamp with time zone diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index ce37fa6490..06bea458a6 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -437,8 +437,8 @@ CREATE SUBSCRIPTION subscription_namefalse. 
- When conflict detection is enabled, additional logging is triggered - in the following scenarios: + When conflict detection is enabled, additional logging is triggered and + the conflict statistics are collected in the following scenarios: insert_exists diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 30393e6d67..182836ac82 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1370,6 +1370,10 @@ CREATE VIEW pg_stat_subscription_stats AS s.subname, ss.apply_error_count, ss.sync_error_count, + ss.insert_exists_count, + ss.update_differ_count, + ss.update_missing_count, + ss.delete_missing_count, ss.stats_reset FROM pg_subscription as s, pg_stat_get_subscription_stats(s.oid) as ss; diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index f24f048054..ca58b708ef 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -15,8 +15,10 @@ #include "postgres.h" #include "access/commit_ts.h" +#include "pgstat.h" #include "replication/conflict.h" #include "replication/origin.h" +#include "replication/worker_internal.h" #include "utils/lsyscache.h" #include "utils/rel.h" @@ -75,6 +77,8 @@ ReportApplyConflict(int elevel, ConflictType type, Relation localrel, RepOriginId localorigin, TimestampTz localts, TupleTableSlot *conflictslot) { + pgstat_report_subscription_conflict(MySubscription->oid, type); + ereport(elevel, errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION), errmsg("conflict %s detected on relation \"%s.%s\"", diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c index d9af8de658..e06c92727e 100644 --- a/src/backend/utils/activity/pgstat_subscription.c +++ b/src/backend/utils/activity/pgstat_subscription.c @@ -39,6 +39,21 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error) pending->sync_error_count++; } +/* + * 
Report a subscription conflict. + */ +void +pgstat_report_subscription_conflict(Oid subid, ConflictType type) +{ + PgStat_EntryRef *entry_ref; + PgStat_BackendSubEntry *pending; + + entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION, + InvalidOid, subid, NULL); + pending = entry_ref->pending; + pending->conflict_count[type]++; +} + /* * Report creating the subscription. */ @@ -101,6 +116,8 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) #define SUB_ACC(fld) shsubent->stats.fld += localent->fld SUB_ACC(apply_error_count); SUB_ACC(sync_error_count); + for (int i = 0; i < CONFLICT_NUM_TYPES; i++) + SUB_ACC(conflict_count[i]); #undef SUB_ACC pgstat_unlock_entry(entry_ref); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 3876339ee1..e8ddb749a7 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -1966,7 +1966,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) Datum pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 4 +#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 8 Oid subid = PG_GETARG_OID(0); TupleDesc tupdesc; Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; @@ -1985,7 +1985,15 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "insert_exists_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "update_differ_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "update_missing_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "delete_missing_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2005,11 +2013,15 @@ 
pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) /* sync_error_count */ values[2] = Int64GetDatum(subentry->sync_error_count); + /* conflict count */ + for (int i = 0; i < CONFLICT_NUM_TYPES; i++) + values[3 + i] = Int64GetDatum(subentry->conflict_count[i]); + /* stats_reset */ if (subentry->stat_reset_timestamp == 0) - nulls[3] = true; + nulls[7] = true; else - values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp); + values[7] = TimestampTzGetDatum(subentry->stat_reset_timestamp); /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 6a5476d3c4..08bc966a2f 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5505,9 +5505,9 @@ { oid => '6231', descr => 'statistics: information about subscription stats', proname => 'pg_stat_get_subscription_stats', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o}', - proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}', + proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,apply_error_count,sync_error_count,insert_exists_count,update_differ_count,update_missing_count,delete_missing_count,stats_reset}', prosrc => 'pg_stat_get_subscription_stats' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 2136239710..b957e7ad36 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -14,6 +14,7 @@ #include "datatype/timestamp.h" #include "portability/instr_time.h" #include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */ +#include "replication/conflict.h" 
#include "utils/backend_progress.h" /* for backward compatibility */ #include "utils/backend_status.h" /* for backward compatibility */ #include "utils/relcache.h" @@ -135,6 +136,7 @@ typedef struct PgStat_BackendSubEntry { PgStat_Counter apply_error_count; PgStat_Counter sync_error_count; + PgStat_Counter conflict_count[CONFLICT_NUM_TYPES]; } PgStat_BackendSubEntry; /* ---------- @@ -393,6 +395,7 @@ typedef struct PgStat_StatSubEntry { PgStat_Counter apply_error_count; PgStat_Counter sync_error_count; + PgStat_Counter conflict_count[CONFLICT_NUM_TYPES]; TimestampTz stat_reset_timestamp; } PgStat_StatSubEntry; @@ -695,6 +698,7 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void); */ extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error); +extern void pgstat_report_subscription_conflict(Oid subid, ConflictType conflict); extern void pgstat_create_subscription(Oid subid); extern void pgstat_drop_subscription(Oid subid); extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid); diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index 0bc9db991e..40dcb1d9ad 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -33,6 +33,8 @@ typedef enum CT_DELETE_MISSING, } ConflictType; +#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1) + extern bool GetTupleCommitTs(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts); extern void ReportApplyConflict(int elevel, ConflictType type, diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 13178e2b3d..80a6857b00 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2141,9 +2141,13 @@ pg_stat_subscription_stats| SELECT ss.subid, s.subname, ss.apply_error_count, ss.sync_error_count, + ss.insert_exists_count, + ss.update_differ_count, + ss.update_missing_count, + ss.delete_missing_count, ss.stats_reset FROM 
pg_subscription s, - LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset); + LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, insert_exists_count, update_differ_count, update_missing_count, delete_missing_count, stats_reset); pg_stat_sys_indexes| SELECT relid, indexrelid, schemaname, diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl index fb3e5629b3..f81ce66540 100644 --- a/src/test/subscription/t/026_stats.pl +++ b/src/test/subscription/t/026_stats.pl @@ -16,6 +16,7 @@ $node_publisher->start; # Create subscriber node. my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', 'track_commit_timestamp = on'); $node_subscriber->start; @@ -30,6 +31,7 @@ sub create_sub_pub_w_errors qq[ BEGIN; CREATE TABLE $table_name(a int); + ALTER TABLE $table_name REPLICA IDENTITY FULL; INSERT INTO $table_name VALUES (1); COMMIT; ]); @@ -53,7 +55,7 @@ sub create_sub_pub_w_errors # infinite error loop due to violating the unique constraint. my $sub_name = $table_name . '_sub'; $node_subscriber->safe_psql($db, - qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name) + qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name WITH (detect_conflict = on)) ); $node_publisher->wait_for_catchup($sub_name); @@ -95,7 +97,7 @@ sub create_sub_pub_w_errors $node_subscriber->poll_query_until( $db, qq[ - SELECT apply_error_count > 0 + SELECT apply_error_count > 0 AND insert_exists_count > 0 FROM pg_stat_subscription_stats WHERE subname = '$sub_name' ]) @@ -105,6 +107,47 @@ sub create_sub_pub_w_errors # Truncate test table so that apply worker can continue. 
$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name)); + # Update and delete data to test table on the publisher, skipping the + # update and delete on the subscriber as there are no data in the test + # table. + $node_publisher->safe_psql($db, qq( + UPDATE $table_name SET a = 2; + DELETE FROM $table_name; + )); + + # Wait for the tuple missing to be reported. + $node_subscriber->poll_query_until( + $db, + qq[ + SELECT update_missing_count > 0 AND delete_missing_count > 0 + FROM pg_stat_subscription_stats + WHERE subname = '$sub_name' + ]) + or die + qq(Timed out while waiting for tuple missing conflict for subscription '$sub_name'); + + # Prepare data for further tests. + $node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1))); + $node_publisher->wait_for_catchup($sub_name); + $node_subscriber->safe_psql($db, qq( + TRUNCATE $table_name; + INSERT INTO $table_name VALUES (1); + )); + + # Update data to test table on the publisher, updating a row on the + # subscriber that was modified by a different origin. + $node_publisher->safe_psql($db, qq(UPDATE $table_name SET a = 2;)); + + $node_subscriber->poll_query_until( + $db, + qq[ + SELECT update_differ_count > 0 + FROM pg_stat_subscription_stats + WHERE subname = '$sub_name' + ]) + or die + qq(Timed out while waiting for update_differ conflict for subscription '$sub_name'); + return ($pub_name, $sub_name); } @@ -128,11 +171,15 @@ is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count > 0, sync_error_count > 0, + insert_exists_count > 0, + update_differ_count > 0, + update_missing_count > 0, + delete_missing_count > 0, stats_reset IS NULL FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t), + qq(t|t|t|t|t|t|t), qq(Check that apply errors and sync errors are both > 0 and stats_reset is NULL for subscription '$sub1_name'.) 
); @@ -146,11 +193,15 @@ is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, sync_error_count = 0, + insert_exists_count = 0, + update_differ_count = 0, + update_missing_count = 0, + delete_missing_count = 0, stats_reset IS NOT NULL FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t), + qq(t|t|t|t|t|t|t), qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.) ); @@ -186,11 +237,15 @@ is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count > 0, sync_error_count > 0, + insert_exists_count > 0, + update_differ_count > 0, + update_missing_count > 0, + delete_missing_count > 0, stats_reset IS NULL FROM pg_stat_subscription_stats WHERE subname = '$sub2_name') ), - qq(t|t|t), + qq(t|t|t|t|t|t|t), qq(Confirm that apply errors and sync errors are both > 0 and stats_reset is NULL for sub '$sub2_name'.) ); @@ -203,11 +258,15 @@ is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, sync_error_count = 0, + insert_exists_count = 0, + update_differ_count = 0, + update_missing_count = 0, + delete_missing_count = 0, stats_reset IS NOT NULL FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t), + qq(t|t|t|t|t|t|t), qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.) ); @@ -215,11 +274,15 @@ is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, sync_error_count = 0, + insert_exists_count = 0, + update_differ_count = 0, + update_missing_count = 0, + delete_missing_count = 0, stats_reset IS NOT NULL FROM pg_stat_subscription_stats WHERE subname = '$sub2_name') ), - qq(t|t|t), + qq(t|t|t|t|t|t|t), qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.) ); -- 2.30.0.windows.2