From 42d0bfd009014d65b21ab5638ab3eb0de1261813 Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Wed, 18 Nov 2020 13:22:45 +1100 Subject: [PATCH v2] Degugging tablesync worker. This patch adds some LOGs and sleeps to help debugging the tablesync/apply worker initialization behaviour. --- src/backend/replication/logical/tablesync.c | 5 +++++ src/backend/replication/logical/worker.c | 35 +++++++++++++++++++++++++++-- src/backend/replication/pgoutput/pgoutput.c | 6 +++++ 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index a91b00e..8a1fa86 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -529,6 +529,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) void process_syncing_tables(XLogRecPtr current_lsn) { + elog(LOG, "!!>> %s worker: called process_syncing_tables", am_tablesync_worker() ? "tablesync" : "apply"); + if (am_tablesync_worker()) process_syncing_tables_for_sync(current_lsn); else @@ -955,6 +957,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * Finally, wait until the main apply worker tells us to catch up and then * return to let LogicalRepApplyLoop do it. */ + elog(LOG, "!!>> tablesync worker: wait for CATCHUP state notification"); wait_for_worker_state_change(SUBREL_STATE_CATCHUP); + elog(LOG, "!!>> tablesync worker: received CATCHUP state notification"); + return slotname; } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 0468491..8937056 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -260,12 +260,20 @@ static void apply_handle_tuple_routing(ResultRelInfo *relinfo, static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) { + bool b = false; + if (am_tablesync_worker()) - return MyLogicalRepWorker->relid == rel->localreloid; + b = MyLogicalRepWorker->relid == rel->localreloid; else - return (rel->state == SUBREL_STATE_READY || + b =(rel->state == SUBREL_STATE_READY || (rel->state == SUBREL_STATE_SYNCDONE && rel->statelsn <= remote_final_lsn)); + + elog(LOG, "!!>> %s worker: should_apply_changes_for_rel: %s", + am_tablesync_worker() ? "tablesync" : "apply", + b ? "true" : "false"); + + return b; } /* @@ -1898,6 +1906,12 @@ apply_dispatch(StringInfo s) { LogicalRepMsgType action = pq_getmsgbyte(s); + { + elog(LOG, "!!>> %s worker: apply_dispatch for message kind '%c'", + am_tablesync_worker() ? "tablesync" : "apply", + action); + } + switch (action) { case LOGICAL_REP_MSG_BEGIN: @@ -2086,6 +2100,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) bool endofstream = false; long wait_time; + elog(LOG, "!!>> %s worker: LogicalRepApplyLoop", am_tablesync_worker() ? "tablesync" : "apply"); + CHECK_FOR_INTERRUPTS(); MemoryContextSwitchTo(ApplyMessageContext); @@ -3005,8 +3021,21 @@ ApplyWorkerMain(Datum main_arg) (errmsg("logical replication apply worker for subscription \"%s\" has started", MySubscription->name))); + CommitTransactionCommand(); + elog(LOG, "!!>> The %s worker process has PID = %d", + am_tablesync_worker() ? "tablesync" : "apply", + getpid()); + + if (am_tablesync_worker()) + { + elog(LOG, "!!>>\n\n\nSleeping 30 secs. For debugging, attach to process %d now!\n\n\n", getpid()); + sleep(30); + } + + + /* Connect to the origin and start the replication. */ elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); @@ -3016,7 +3045,9 @@ ApplyWorkerMain(Datum main_arg) char *syncslotname; /* This is table synchronization worker, call initial sync. */ + elog(LOG, "!!>> tablesync worker: About to call LogicalRepSyncTableStart to do initial syncing"); syncslotname = LogicalRepSyncTableStart(&origin_startpos); + elog(LOG, "!!>> tablesync worker: Returned from LogicalRepSyncTableStart"); /* allocate slot name in long-lived context */ myslotname = MemoryContextStrdup(ApplyContext, syncslotname); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 9c997ae..7ebe204 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -361,6 +361,8 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) } OutputPluginWrite(ctx, true); + + elog(LOG, "!!>>\n\npgoutput_begin_txn\n\n"); } /* @@ -375,6 +377,8 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit(ctx->out, txn, commit_lsn); OutputPluginWrite(ctx, true); + + elog(LOG, "!!>>\n\npgoutput_commit_txn\n\n"); } /* @@ -616,6 +620,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + //elog(LOG, "!!>>pgoutput_change: action enum = %d", change->action); + /* Cleanup */ MemoryContextSwitchTo(old); MemoryContextReset(data->context); -- 1.8.3.1