From 9e8b4ddd829f73a5520a8796c7e74f2589a8d969 Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Thu, 6 May 2021 18:57:38 +1000 Subject: [PATCH v3] Fix wrconn. Use stack variable. This patch replaces the global "wrconn" in AlterSubscription_refresh with a local variable of the same name, making it consistent with other functions in subscriptioncmds.c (e.g. DropSubscription). The global wrconn is only meant to be used for logical apply/tablesync workers. To reduce future confusion it has been renamed from "wrconn" to "lrep_worker_wrconn". Using the global/incorrect wrconn in AlterSubscription_refresh doesn't normally cause any problems, but harm is still possible if the apply worker ever manages to do a subscription refresh. e.g. see [1]. [1] https://www.postgresql.org/message-id/20201111215820.qihhrz7fayu6myfi%40alap3.anarazel.de --- src/backend/commands/subscriptioncmds.c | 16 ++++++++-------- src/backend/replication/logical/launcher.c | 4 ++-- src/backend/replication/logical/tablesync.c | 29 +++++++++++++++-------------- src/backend/replication/logical/worker.c | 20 ++++++++++---------- src/include/replication/worker_internal.h | 2 +- 5 files changed, 36 insertions(+), 35 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 517c8ed..1096aa8 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -556,18 +556,19 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) char state; } SubRemoveRels; SubRemoveRels *sub_remove_rels; + WalReceiverConn *wrconn; /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); + /* Try to connect to the publisher. */ + wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + PG_TRY(); { - /* Try to connect to the publisher. 
*/ - wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); - if (!wrconn) - ereport(ERROR, - (errmsg("could not connect to the publisher: %s", err))); - /* Get the table list from publisher. */ pubrel_names = fetch_table_list(wrconn, sub->publications); @@ -737,8 +738,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) } PG_FINALLY(); { - if (wrconn) - walrcv_disconnect(wrconn); + walrcv_disconnect(wrconn); } PG_END_TRY(); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index cb462a0..a39ae17 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -649,8 +649,8 @@ static void logicalrep_worker_onexit(int code, Datum arg) { /* Disconnect gracefully from the remote side. */ - if (wrconn) - walrcv_disconnect(wrconn); + if (lrep_worker_wrconn) + walrcv_disconnect(lrep_worker_wrconn); logicalrep_worker_detach(); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 0638f5c..eda9f23 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -302,8 +302,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); - /* End wal streaming so wrconn can be re-used to drop the slot. */ - walrcv_endstreaming(wrconn, &tli); + /* End wal streaming so lrep_worker_wrconn can be re-used to drop the slot. */ + walrcv_endstreaming(lrep_worker_wrconn, &tli); /* * Cleanup the tablesync slot. @@ -322,7 +322,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) * otherwise, it won't be dropped till the corresponding subscription * is dropped. So passing missing_ok = false. 
*/ - ReplicationSlotDropAtPubNode(wrconn, syncslotname, false); + ReplicationSlotDropAtPubNode(lrep_worker_wrconn, syncslotname, false); finish_sync_worker(); } @@ -642,7 +642,7 @@ copy_read_data(void *outbuf, int minread, int maxread) for (;;) { /* Try read the data. */ - len = walrcv_receive(wrconn, &buf, &fd); + len = walrcv_receive(lrep_worker_wrconn, &buf, &fd); CHECK_FOR_INTERRUPTS(); @@ -715,7 +715,7 @@ fetch_remote_table_info(char *nspname, char *relname, " AND c.relname = %s", quote_literal_cstr(nspname), quote_literal_cstr(relname)); - res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow); + res = walrcv_exec(lrep_worker_wrconn, cmd.data, lengthof(tableRow), tableRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, @@ -752,9 +752,10 @@ fetch_remote_table_info(char *nspname, char *relname, " AND a.attrelid = %u" " ORDER BY a.attnum", lrel->remoteid, - (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""), + (walrcv_server_version(lrep_worker_wrconn) >= 120000 ? + "AND a.attgenerated = ''" : ""), lrel->remoteid); - res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow); + res = walrcv_exec(lrep_worker_wrconn, cmd.data, lengthof(attrRow), attrRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, @@ -841,7 +842,7 @@ copy_table(Relation rel) appendStringInfo(&cmd, " FROM %s) TO STDOUT", quote_qualified_identifier(lrel.nspname, lrel.relname)); } - res = walrcv_exec(wrconn, cmd.data, 0, NULL); + res = walrcv_exec(lrep_worker_wrconn, cmd.data, 0, NULL); pfree(cmd.data); if (res->status != WALRCV_OK_COPY_OUT) ereport(ERROR, @@ -957,8 +958,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * application_name, so that it is different from the main apply worker, * so that synchronous replication can distinguish them. 
*/ - wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err); - if (wrconn == NULL) + lrep_worker_wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err); + if (lrep_worker_wrconn == NULL) ereport(ERROR, (errmsg("could not connect to the publisher: %s", err))); @@ -985,7 +986,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * breakdown then it wouldn't have succeeded so trying it next time * seems like a better bet. */ - ReplicationSlotDropAtPubNode(wrconn, slotname, true); + ReplicationSlotDropAtPubNode(lrep_worker_wrconn, slotname, true); } else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY) { @@ -1038,7 +1039,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * ensures that both the replication slot we create (see below) and the * COPY are consistent with each other. */ - res = walrcv_exec(wrconn, + res = walrcv_exec(lrep_worker_wrconn, "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ", 0, NULL); if (res->status != WALRCV_OK_COMMAND) @@ -1058,7 +1059,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * slot leading to a dangling slot on the server. 
*/ HOLD_INTERRUPTS(); - walrcv_create_slot(wrconn, slotname, false /* permanent */ , + walrcv_create_slot(lrep_worker_wrconn, slotname, false /* permanent */ , CRS_USE_SNAPSHOT, origin_startpos); RESUME_INTERRUPTS(); @@ -1100,7 +1101,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) copy_table(rel); PopActiveSnapshot(); - res = walrcv_exec(wrconn, "COMMIT", 0, NULL); + res = walrcv_exec(lrep_worker_wrconn, "COMMIT", 0, NULL); if (res->status != WALRCV_OK_COMMAND) ereport(ERROR, (errmsg("table copy could not finish transaction on publisher: %s", diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d9f1571..181b716 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -156,7 +156,7 @@ MemoryContext ApplyContext = NULL; /* per stream context for streaming transactions */ static MemoryContext LogicalStreamingContext = NULL; -WalReceiverConn *wrconn = NULL; +WalReceiverConn *lrep_worker_wrconn = NULL; Subscription *MySubscription = NULL; bool MySubscriptionValid = false; @@ -2126,7 +2126,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) MemoryContextSwitchTo(ApplyMessageContext); - len = walrcv_receive(wrconn, &buf, &fd); + len = walrcv_receive(lrep_worker_wrconn, &buf, &fd); if (len != 0) { @@ -2206,7 +2206,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) MemoryContextReset(ApplyMessageContext); } - len = walrcv_receive(wrconn, &buf, &fd); + len = walrcv_receive(lrep_worker_wrconn, &buf, &fd); } } @@ -2312,7 +2312,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) } /* All done */ - walrcv_endstreaming(wrconn, &tli); + walrcv_endstreaming(lrep_worker_wrconn, &tli); } /* @@ -2396,7 +2396,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) LSN_FORMAT_ARGS(writepos), LSN_FORMAT_ARGS(flushpos)); - walrcv_send(wrconn, reply_message->data, reply_message->len); + walrcv_send(lrep_worker_wrconn, reply_message->data, reply_message->len); if (recvpos 
> last_recvpos) last_recvpos = recvpos; @@ -3090,9 +3090,9 @@ ApplyWorkerMain(Datum main_arg) origin_startpos = replorigin_session_get_progress(false); CommitTransactionCommand(); - wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name, + lrep_worker_wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name, &err); - if (wrconn == NULL) + if (lrep_worker_wrconn == NULL) ereport(ERROR, (errmsg("could not connect to the publisher: %s", err))); @@ -3100,7 +3100,7 @@ ApplyWorkerMain(Datum main_arg) * We don't really use the output identify_system for anything but it * does some initializations on the upstream so let's still call it. */ - (void) walrcv_identify_system(wrconn, &startpointTLI); + (void) walrcv_identify_system(lrep_worker_wrconn, &startpointTLI); } /* @@ -3116,14 +3116,14 @@ ApplyWorkerMain(Datum main_arg) options.startpoint = origin_startpos; options.slotname = myslotname; options.proto.logical.proto_version = - walrcv_server_version(wrconn) >= 140000 ? + walrcv_server_version(lrep_worker_wrconn) >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; options.proto.logical.binary = MySubscription->binary; options.proto.logical.streaming = MySubscription->stream; /* Start normal logical streaming replication. */ - walrcv_startstreaming(wrconn, &options); + walrcv_startstreaming(lrep_worker_wrconn, &options); /* Run the main loop. 
*/ LogicalRepApplyLoop(origin_startpos); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 1cac75e..9209991 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -62,7 +62,7 @@ typedef struct LogicalRepWorker extern MemoryContext ApplyContext; /* libpqreceiver connection */ -extern struct WalReceiverConn *wrconn; +extern struct WalReceiverConn *lrep_worker_wrconn; /* Worker and subscription objects. */ extern Subscription *MySubscription; -- 1.8.3.1