From bd017528754a625a9070045874e11990599fcc17 Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Tue, 23 Jan 2024 15:48:53 +0530 Subject: [PATCH v67 5/6] Non replication connection and app_name change. This patch has converted replication connection to non-replication one in the slotsync worker. It has also changed the app_name to {cluster_name}_slotsyncworker in the slotsync worker connection. --- src/backend/commands/subscriptioncmds.c | 12 +++-- .../libpqwalreceiver/libpqwalreceiver.c | 44 +++++++++++++------ src/backend/replication/logical/slotsync.c | 14 +++++- src/backend/replication/logical/tablesync.c | 2 +- src/backend/replication/logical/worker.c | 4 +- src/backend/replication/walreceiver.c | 3 +- src/include/replication/walreceiver.h | 5 ++- 7 files changed, 60 insertions(+), 24 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 15bb10da54..f17c666ebc 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -753,7 +753,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, /* Try to connect to the publisher. */ must_use_password = !superuser_arg(owner) && opts.passwordrequired; - wrconn = walrcv_connect(conninfo, true, must_use_password, + wrconn = walrcv_connect(conninfo, true /* replication */ , + true, must_use_password, stmt->subname, &err); if (!wrconn) ereport(ERROR, @@ -904,7 +905,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Try to connect to the publisher. */ must_use_password = sub->passwordrequired && !sub->ownersuperuser; - wrconn = walrcv_connect(sub->conninfo, true, must_use_password, + wrconn = walrcv_connect(sub->conninfo, true /* replication */ , + true, must_use_password, sub->name, &err); if (!wrconn) ereport(ERROR, @@ -1531,7 +1533,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Try to connect to the publisher. */ must_use_password = sub->passwordrequired && !sub->ownersuperuser; - wrconn = walrcv_connect(sub->conninfo, true, must_use_password, + wrconn = walrcv_connect(sub->conninfo, true /* replication */ , + true, must_use_password, sub->name, &err); if (!wrconn) ereport(ERROR, @@ -1782,7 +1785,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) */ load_file("libpqwalreceiver", false); - wrconn = walrcv_connect(conninfo, true, must_use_password, + wrconn = walrcv_connect(conninfo, true /* replication */ , + true, must_use_password, subname, &err); if (wrconn == NULL) { diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index ae76c098b1..13a787b8bf 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -6,6 +6,9 @@ * loaded as a dynamic module to avoid linking the main server binary with * libpq. * + * Apart from walreceiver, the libpq-specific routines here are now being used + * by logical replication workers and slotsync worker as well. + * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group * * @@ -49,7 +52,8 @@ struct WalReceiverConn /* Prototypes for interface functions */ static WalReceiverConn *libpqrcv_connect(const char *conninfo, - bool logical, bool must_use_password, + bool replication, bool logical, + bool must_use_password, const char *appname, char **err); static void libpqrcv_check_conninfo(const char *conninfo, bool must_use_password); @@ -124,7 +128,12 @@ _PG_init(void) } /* - * Establish the connection to the primary server for XLOG streaming + * Establish the connection to the primary server. + * + * The connection established could be either a replication one or + * a non-replication one based on input argument 'replication'. And further + * if it is a replication connection, it could be either logical or physical + * based on input argument 'logical'. * * If an error occurs, this function will normally return NULL and set *err * to a palloc'ed error message. However, if must_use_password is true and @@ -135,8 +144,8 @@ _PG_init(void) * case. */ static WalReceiverConn * -libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password, - const char *appname, char **err) +libpqrcv_connect(const char *conninfo, bool replication, bool logical, + bool must_use_password, const char *appname, char **err) { WalReceiverConn *conn; PostgresPollingStatusType status; @@ -159,17 +168,26 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password, */ keys[i] = "dbname"; vals[i] = conninfo; - keys[++i] = "replication"; - vals[i] = logical ? "database" : "true"; - if (!logical) + + /* We can not have logical without replication */ + if (!replication) + Assert(!logical); + else { - /* - * The database name is ignored by the server in replication mode, but - * specify "replication" for .pgpass lookup. - */ - keys[++i] = "dbname"; - vals[i] = "replication"; + keys[++i] = "replication"; + vals[i] = logical ? "database" : "true"; + + if (!logical) + { + /* + * The database name is ignored by the server in replication mode, + * but specify "replication" for .pgpass lookup. + */ + keys[++i] = "dbname"; + vals[i] = "replication"; + } } + keys[++i] = "fallback_application_name"; vals[i] = appname; if (logical) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 2a033647b3..39d66b3ed0 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -1067,6 +1067,7 @@ ReplSlotSyncWorkerMain(int argc, char *argv[]) bool primary_slot_invalid; char *err; sigjmp_buf local_sigjmp_buf; + StringInfoData app_name; am_slotsync_worker = true; @@ -1176,13 +1177,22 @@ ReplSlotSyncWorkerMain(int argc, char *argv[]) SetProcessingMode(NormalProcessing); + initStringInfo(&app_name); + if (cluster_name[0]) + appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsyncworker"); + else + appendStringInfo(&app_name, "%s", "slotsyncworker"); + /* * Establish the connection to the primary server for slots * synchronization. */ - wrconn = walrcv_connect(PrimaryConnInfo, true, false, - cluster_name[0] ? cluster_name : "slotsyncworker", + wrconn = walrcv_connect(PrimaryConnInfo, false /* replication */, + false, false, + app_name.data, &err); + pfree(app_name.data); + if (!wrconn) ereport(ERROR, errcode(ERRCODE_CONNECTION_FAILURE), diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 5acab3f3e2..c5c6ac4bac 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1329,7 +1329,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * so that synchronous replication can distinguish them. */ LogRepWorkerWalRcvConn = - walrcv_connect(MySubscription->conninfo, true, + walrcv_connect(MySubscription->conninfo, true /* replication */ , true, must_use_password, slotname, &err); if (LogRepWorkerWalRcvConn == NULL) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 32ff4c0336..0d791c188c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4518,7 +4518,9 @@ run_apply_worker() must_use_password = MySubscription->passwordrequired && !MySubscription->ownersuperuser; - LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, + LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, + true /* replication */ , + true, must_use_password, MySubscription->name, &err); diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index e29a6196a3..872cccb54b 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -296,7 +296,8 @@ WalReceiverMain(void) sigprocmask(SIG_SETMASK, &UnBlockSig, NULL); /* Establish the connection to the primary for XLOG streaming */ - wrconn = walrcv_connect(conninfo, false, false, + wrconn = walrcv_connect(conninfo, true /* replication */ , + false, false, cluster_name[0] ? cluster_name : "walreceiver", &err); if (!wrconn) diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 48dc846e19..1b563a16cd 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -237,6 +237,7 @@ typedef struct WalRcvExecResult * returned with 'err' including the error generated. */ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, + bool replication, bool logical, bool must_use_password, const char *appname, @@ -426,8 +427,8 @@ typedef struct WalReceiverFunctionsType extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; -#define walrcv_connect(conninfo, logical, must_use_password, appname, err) \ - WalReceiverFunctions->walrcv_connect(conninfo, logical, must_use_password, appname, err) +#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) \ + WalReceiverFunctions->walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) #define walrcv_check_conninfo(conninfo, must_use_password) \ WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password) #define walrcv_get_conninfo(conn) \ -- 2.34.1