From 6fa2654b1f3c3a1ba963db4e385afcc8c9fa47d6 Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Thu, 14 Jan 2021 15:55:23 +1100 Subject: [PATCH v15] Tablesync extra logging. This patch only adds some extra logging which may be helpful for testing, but is not for committing. --- src/backend/commands/subscriptioncmds.c | 29 ++++++++++++++++++---- src/backend/replication/logical/tablesync.c | 37 +++++++++++++++++++++++++---- 2 files changed, 56 insertions(+), 10 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f94243b..b5f9d56 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -665,11 +665,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) XLogRecPtr statelsn; /* Immediately stop the worker. */ + elog(LOG, + "!!>> AlterSubscription_refresh: before logicalrep_worker_stop"); logicalrep_worker_stop_at_commit(subid, relid); /* prevent re-launching */ logicalrep_worker_stop(subid, relid); /* stop immediately */ + elog(LOG, + "!!>> AlterSubscription_refresh: after logicalrep_worker_stop"); /* Last known rel state. */ state = GetSubscriptionRelState(subid, relid, &statelsn); + elog(LOG, + "!!>> AlterSubscription_refresh: relid %u had state %c", + relid, state); RemoveSubscriptionRel(sub->oid, relid); @@ -692,10 +699,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) ReplicationSlotNameForTablesync(subid, relid, syncslotname); - elog(DEBUG1, + elog(LOG, "AlterSubscription_refresh: dropping the tablesync slot \"%s\".", syncslotname); ReplicationSlotDropAtPubNode(wrconn, syncslotname, missing_ok); + elog(LOG, + "!!>> AlterSubscription_refresh: dropped the tablesync slot \"%s\".", + syncslotname); } /* Remove the tablesync's origin tracking if exists. */ @@ -703,13 +713,16 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) originid = replorigin_by_name(originname, true); if (OidIsValid(originid)) { - elog(DEBUG1, + elog(LOG, "AlterSubscription_refresh: dropping origin tracking for \"%s\"", originname); replorigin_drop(originid, false); + elog(LOG, + "!!>> AlterSubscription_refresh: dropped origin tracking for \"%s\"", + originname); } - ereport(DEBUG1, + ereport(LOG, (errmsg("table \"%s.%s\" removed from subscription \"%s\"", get_namespace_name(get_rel_namespace(relid)), get_rel_name(relid), @@ -1191,10 +1204,13 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) } else { - elog(DEBUG1, + elog(LOG, "DropSubscription: dropping the tablesync slot \"%s\".", syncslotname); ReplicationSlotDropAtPubNode(wrconn, syncslotname, missing_ok); + elog(LOG, + "!!>> DropSubscription: dropped the tablesync slot \"%s\".", + syncslotname); } } @@ -1203,10 +1219,13 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) originid = replorigin_by_name(originname, true); if (originid != InvalidRepOriginId) { - elog(DEBUG1, + elog(LOG, "DropSubscription: dropping origin tracking for \"%s\"", originname); replorigin_drop(originid, false); + elog(LOG, + "!!>> DropSubscription: dropped origin tracking for \"%s\"", + originname); } } list_free(rstates); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 33e11a1..80750ad 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -300,10 +300,13 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) MyLogicalRepWorker->relid, syncslotname); - elog(DEBUG1, + elog(LOG, "process_syncing_tables_for_sync: dropping the tablesync slot \"%s\".", syncslotname); ReplicationSlotDropAtPubNode(wrconn, syncslotname, false); + elog(LOG, + "!!>> process_syncing_tables_for_sync: dropped the tablesync slot \"%s\".", + syncslotname); /* * Change state to SYNCDONE. @@ -469,10 +472,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) originid = replorigin_by_name(originname, true); if (OidIsValid(originid)) { - elog(DEBUG1, + elog(LOG, "process_syncing_tables_for_apply: dropping tablesync origin tracking for \"%s\".", originname); replorigin_drop(originid, false); + elog(LOG, + "!!>> process_syncing_tables_for_apply: dropped tablesync origin tracking for \"%s\".", + originname); } } @@ -971,7 +977,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * The COPY phase was previously done, but tablesync then crashed/etc * before it was able to finish normally. */ - elog(DEBUG1, + elog(LOG, "LogicalRepSyncTableStart: tablesync relstate was SUBREL_STATE_FINISHEDCOPY."); StartTransactionCommand(); @@ -979,8 +985,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * The origin tracking name must already exist (missing_ok=false). */ originid = replorigin_by_name(originname, false); + elog(LOG, + "!!>> LogicalRepSyncTableStart: 2 replorigin_session_setup \"%s\".", + originname); replorigin_session_setup(originid); replorigin_session_origin = originid; + elog(LOG, + "!!>> LogicalRepSyncTableStart: 2 replorigin_session_get_progress \"%s\".", + originname); *origin_startpos = replorigin_session_get_progress(false); goto copy_table_done; @@ -1029,6 +1041,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * for the catchup phase after COPY is done, so tell it to use the * snapshot to make the final data consistent. */ + elog(LOG, + "!!>> LogicalRepSyncTableStart: walrcv_create_slot for \"%s\".", + slotname); walrcv_create_slot(wrconn, slotname, false, CRS_USE_SNAPSHOT, origin_startpos); @@ -1060,10 +1075,13 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * If something failed during copy table then cleanup the created * slot. */ - elog(DEBUG1, + elog(LOG, "LogicalRepSyncTableStart: tablesync copy failed. Dropping the tablesync slot \"%s\".", slotname); ReplicationSlotDropAtPubNode(wrconn, slotname, false); + elog(LOG, + "!!>> LogicalRepSyncTableStart: tablesync copy failed. Dropped the tablesync slot \"%s\".", + slotname); pfree(slotname); slotname = NULL; @@ -1084,11 +1102,20 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * replication origin from vanishing while advancing. */ LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + elog(LOG, + "!!>> LogicalRepSyncTableStart: 1 replorigin_create \"%s\".", + originname); originid = replorigin_create(originname); + elog(LOG, + "!!>> LogicalRepSyncTableStart: 1 replorigin_advance \"%s\".", + originname); replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr, true /* go backward */ , true /* WAL log */ ); UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + elog(LOG, + "!!>> LogicalRepSyncTableStart: 1 replorigin_session_setup \"%s\".", + originname); replorigin_session_setup(originid); replorigin_session_origin = originid; } @@ -1111,7 +1138,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) copy_table_done: - elog(DEBUG1, + elog(LOG, "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X", originname, (uint32) (*origin_startpos >> 32), -- 1.8.3.1