*** a/src/backend/access/transam/twophase.c --- b/src/backend/access/transam/twophase.c *************** *** 93,98 **** --- 93,99 ---- #include "miscadmin.h" #include "pg_trace.h" #include "pgstat.h" + #include "replication/logicallauncher.h" #include "replication/origin.h" #include "replication/syncrep.h" #include "replication/walsender.h" *************** *** 914,919 **** typedef struct TwoPhaseFileHeader --- 915,921 ---- int32 nabortrels; /* number of delete-on-abort rels */ int32 ninvalmsgs; /* number of cache invalidation messages */ bool initfileinval; /* does relcache init file need invalidation? */ + bool wakeuplauncher; /* need to wake up logical rep launcher? */ uint16 gidlen; /* length of the GID - GID follows the header */ } TwoPhaseFileHeader; *************** *** 1025,1030 **** StartPrepare(GlobalTransaction gxact) --- 1027,1034 ---- hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels); hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs, &hdr.initfileinval); + hdr.wakeuplauncher = on_commit_launcher_wakeup; + on_commit_launcher_wakeup = false; hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */ save_state_data(&hdr, sizeof(TwoPhaseFileHeader)); *************** *** 1501,1506 **** FinishPreparedTransaction(const char *gid, bool isCommit) --- 1505,1517 ---- /* Count the prepared xact as committed or aborted */ AtEOXact_PgStat(isCommit); + /* Wake up the logical replication launcher if necessary */ + if (hdr->wakeuplauncher) + { + ApplyLauncherWakeupAtCommit(); + AtEOXact_ApplyLauncher(isCommit); + } + /* * And now we can clean up any files we may have left. */ *** a/src/backend/access/transam/xact.c --- b/src/backend/access/transam/xact.c *************** *** 2138,2144 **** CommitTransaction(void) AtEOXact_HashTables(true); AtEOXact_PgStat(true); AtEOXact_Snapshot(true, false); ! AtCommit_ApplyLauncher(); pgstat_report_xact_timestamp(0); CurrentResourceOwner = NULL; --- 2138,2144 ---- AtEOXact_HashTables(true); AtEOXact_PgStat(true); AtEOXact_Snapshot(true, false); ! AtEOXact_ApplyLauncher(true); pgstat_report_xact_timestamp(0); CurrentResourceOwner = NULL; *************** *** 2612,2617 **** AbortTransaction(void) --- 2612,2618 ---- AtEOXact_ComboCid(); AtEOXact_HashTables(false); AtEOXact_PgStat(false); + AtEOXact_ApplyLauncher(false); pgstat_report_xact_timestamp(0); } *** a/src/backend/replication/logical/launcher.c --- b/src/backend/replication/logical/launcher.c *************** *** 81,87 **** static void logicalrep_worker_detach(void); volatile sig_atomic_t got_SIGHUP = false; volatile sig_atomic_t got_SIGTERM = false; ! static bool on_commit_launcher_wakeup = false; Datum pg_stat_get_subscription(PG_FUNCTION_ARGS); --- 81,87 ---- volatile sig_atomic_t got_SIGHUP = false; volatile sig_atomic_t got_SIGTERM = false; ! bool on_commit_launcher_wakeup = false; Datum pg_stat_get_subscription(PG_FUNCTION_ARGS); *************** *** 633,645 **** ApplyLauncherShmemInit(void) } /* ! * Wakeup the launcher on commit if requested. */ void ! AtCommit_ApplyLauncher(void) { ! if (on_commit_launcher_wakeup) ApplyLauncherWakeup(); } /* --- 633,648 ---- } /* ! * AtEOXact_ApplyLauncher ! * Wakeup the launcher on commit if requested. */ void ! AtEOXact_ApplyLauncher(bool isCommit) { ! if (isCommit && on_commit_launcher_wakeup) ApplyLauncherWakeup(); + + on_commit_launcher_wakeup = false; } /* *** a/src/include/replication/logicallauncher.h --- b/src/include/replication/logicallauncher.h *************** *** 22,27 **** extern Size ApplyLauncherShmemSize(void); extern void ApplyLauncherShmemInit(void); extern void ApplyLauncherWakeupAtCommit(void); ! extern void AtCommit_ApplyLauncher(void); #endif /* LOGICALLAUNCHER_H */ --- 22,29 ---- extern void ApplyLauncherShmemInit(void); extern void ApplyLauncherWakeupAtCommit(void); ! extern void AtEOXact_ApplyLauncher(bool isCommit); ! ! extern bool on_commit_launcher_wakeup; #endif /* LOGICALLAUNCHER_H */