From e9b582054acfc45176faaab1f65d481a2d05335c Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 23 Dec 2020 01:39:44 -0500 Subject: [PATCH v35 2/8] Refactor spool-file logic in worker.c. This patch only refactors to isolate the streaming spool-file processing to a separate function. A later patch to support prepared transaction apply will require this common processing logic to be called from another place. Author: Peter Smith Reviewed-by: Amit Kapila Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com --- src/backend/replication/logical/worker.c | 48 ++++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 3874939..4f75e85 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -246,6 +246,8 @@ static void apply_handle_tuple_routing(ResultRelInfo *relinfo, LogicalRepRelMapEntry *relmapentry, CmdType operation); +static int apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); + /* * Should this worker apply changes for given relation. * @@ -924,30 +926,21 @@ apply_handle_stream_abort(StringInfo s) } /* - * Handle STREAM COMMIT message. + * Common spoolfile processing. + * Returns how many changes were applied. */ -static void -apply_handle_stream_commit(StringInfo s) +static int +apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) { - TransactionId xid; StringInfoData s2; int nchanges; char path[MAXPGPATH]; char *buffer = NULL; bool found; - LogicalRepCommitData commit_data; StreamXidHash *ent; MemoryContext oldcxt; BufFile *fd; - Assert(!in_streamed_transaction); - - xid = logicalrep_read_stream_commit(s, &commit_data); - - elog(DEBUG1, "received commit for streamed transaction %u", xid); - - ensure_transaction(); - /* * Allocate file handle and memory required to process all the messages in * TopTransactionContext to avoid them getting reset after each message is @@ -955,7 +948,7 @@ apply_handle_stream_commit(StringInfo s) */ oldcxt = MemoryContextSwitchTo(TopTransactionContext); - /* open the spool file for the committed transaction */ + /* open the spool file for the committed/prepared transaction */ changes_filename(path, MyLogicalRepWorker->subid, xid); elog(DEBUG1, "replaying changes from file \"%s\"", path); ent = (StreamXidHash *) hash_search(xidhash, @@ -970,7 +963,7 @@ apply_handle_stream_commit(StringInfo s) MemoryContextSwitchTo(oldcxt); - remote_final_lsn = commit_data.commit_lsn; + remote_final_lsn = lsn; /* * Make sure the handle apply_dispatch methods are aware we're in a remote @@ -1045,6 +1038,31 @@ apply_handle_stream_commit(StringInfo s) elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", nchanges, path); + return nchanges; +} + +/* + * Handle STREAM COMMIT message. + */ +static void +apply_handle_stream_commit(StringInfo s) +{ + TransactionId xid; + LogicalRepCommitData commit_data; + int nchanges = 0; + + Assert(!in_streamed_transaction); + + xid = logicalrep_read_stream_commit(s, &commit_data); + + elog(DEBUG1, "received commit for streamed transaction %u", xid); + + ensure_transaction(); + + nchanges = apply_spooled_messages(xid, commit_data.commit_lsn); + + elog(DEBUG1, "apply_handle_stream_commit: replayed %d (all) changes.", nchanges); + apply_handle_commit_internal(s, &commit_data); /* unlink the files with serialized changes and subxact info */ -- 1.8.3.1