From f2e8feb8bfcd02dadd00f64551374a2a03bf025a Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Fri, 29 Dec 2023 13:15:15 +0530 Subject: [PATCH v56 4/4] Non replication connection and app_name change. Changes in this patch: 1) Convert replication connection to non-replication one in slotsync worker. 2) Use app_name as {cluster_name}_slotsyncworker in the slotsync worker connection. --- src/backend/commands/subscriptioncmds.c | 8 ++--- .../libpqwalreceiver/libpqwalreceiver.c | 35 ++++++++++++------- src/backend/replication/logical/slotsync.c | 13 +++++-- src/backend/replication/logical/tablesync.c | 2 +- src/backend/replication/logical/worker.c | 2 +- src/backend/replication/walreceiver.c | 2 +- src/include/replication/walreceiver.h | 5 +-- 7 files changed, 44 insertions(+), 23 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 248e287b08..8e63932325 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -755,7 +755,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, /* Try to connect to the publisher. */ must_use_password = !superuser_arg(owner) && opts.passwordrequired; - wrconn = walrcv_connect(conninfo, true, must_use_password, + wrconn = walrcv_connect(conninfo, true, true, must_use_password, stmt->subname, &err); if (!wrconn) ereport(ERROR, @@ -926,7 +926,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Try to connect to the publisher. */ must_use_password = sub->passwordrequired && !sub->ownersuperuser; - wrconn = walrcv_connect(sub->conninfo, true, must_use_password, + wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, sub->name, &err); if (!wrconn) ereport(ERROR, @@ -1607,7 +1607,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Try to connect to the publisher. */ must_use_password = sub->passwordrequired && !sub->ownersuperuser; - wrconn = walrcv_connect(sub->conninfo, true, must_use_password, + wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, sub->name, &err); if (!wrconn) ereport(ERROR, @@ -1858,7 +1858,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) */ load_file("libpqwalreceiver", false); - wrconn = walrcv_connect(conninfo, true, must_use_password, + wrconn = walrcv_connect(conninfo, true, true, must_use_password, subname, &err); if (wrconn == NULL) { diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 5661e4cb83..cfd14deb6a 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -50,7 +50,8 @@ struct WalReceiverConn /* Prototypes for interface functions */ static WalReceiverConn *libpqrcv_connect(const char *conninfo, - bool logical, bool must_use_password, + bool replication, bool logical, + bool must_use_password, const char *appname, char **err); static void libpqrcv_check_conninfo(const char *conninfo, bool must_use_password); @@ -136,8 +137,8 @@ _PG_init(void) * case. */ static WalReceiverConn * -libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password, - const char *appname, char **err) +libpqrcv_connect(const char *conninfo, bool replication, bool logical, + bool must_use_password, const char *appname, char **err) { WalReceiverConn *conn; const char *keys[6]; @@ -150,17 +151,27 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password, */ keys[i] = "dbname"; vals[i] = conninfo; - keys[++i] = "replication"; - vals[i] = logical ? "database" : "true"; - if (!logical) + + /* We can not have logical w/o replication */ + if(!replication) + Assert(!logical); + + if (replication) { - /* - * The database name is ignored by the server in replication mode, but - * specify "replication" for .pgpass lookup. - */ - keys[++i] = "dbname"; - vals[i] = "replication"; + keys[++i] = "replication"; + vals[i] = logical ? "database" : "true"; + + if (!logical) + { + /* + * The database name is ignored by the server in replication mode, + * but specify "replication" for .pgpass lookup. + */ + keys[++i] = "dbname"; + vals[i] = "replication"; + } } + keys[++i] = "fallback_application_name"; vals[i] = appname; if (logical) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 94bd5416b8..22eb06ab3e 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -1103,6 +1103,7 @@ ReplSlotSyncWorkerMain(Datum main_arg) char *dbname; bool am_cascading_standby; char *err; + StringInfoData app_name; ereport(LOG, errmsg("replication slot sync worker started")); @@ -1135,13 +1136,21 @@ ReplSlotSyncWorkerMain(Datum main_arg) */ BackgroundWorkerInitializeConnection(dbname, NULL, 0); + initStringInfo(&app_name); + if (cluster_name[0]) + appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsyncworker"); + else + appendStringInfo(&app_name, "%s", "slotsyncworker"); + /* * Establish the connection to the primary server for slots * synchronization. */ - wrconn = walrcv_connect(PrimaryConnInfo, true, false, - cluster_name[0] ? cluster_name : "slotsyncworker", + wrconn = walrcv_connect(PrimaryConnInfo, false, false, false, + app_name.data, &err); + pfree(app_name.data); + if (wrconn == NULL) ereport(ERROR, errcode(ERRCODE_CONNECTION_FAILURE), diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 7b6170fe55..a6ea416d1e 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1342,7 +1342,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * so that synchronous replication can distinguish them. */ LogRepWorkerWalRcvConn = - walrcv_connect(MySubscription->conninfo, true, + walrcv_connect(MySubscription->conninfo, true, true, must_use_password, slotname, &err); if (LogRepWorkerWalRcvConn == NULL) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7b3784c212..1b85641e81 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4554,7 +4554,7 @@ run_apply_worker() must_use_password = MySubscription->passwordrequired && !MySubscription->ownersuperuser; - LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, + LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, true, must_use_password, MySubscription->name, &err); diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index ca61a99785..4d49e3d5a4 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -296,7 +296,7 @@ WalReceiverMain(void) sigprocmask(SIG_SETMASK, &UnBlockSig, NULL); /* Establish the connection to the primary for XLOG streaming */ - wrconn = walrcv_connect(conninfo, false, false, + wrconn = walrcv_connect(conninfo, true, false, false, cluster_name[0] ? cluster_name : "walreceiver", &err); if (!wrconn) diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 259d0f7065..ca8b6621c7 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -237,6 +237,7 @@ typedef struct WalRcvExecResult * returned with 'err' including the error generated. */ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, + bool replication, bool logical, bool must_use_password, const char *appname, @@ -434,8 +435,8 @@ typedef struct WalReceiverFunctionsType extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; -#define walrcv_connect(conninfo, logical, must_use_password, appname, err) \ - WalReceiverFunctions->walrcv_connect(conninfo, logical, must_use_password, appname, err) +#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) \ + WalReceiverFunctions->walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) #define walrcv_check_conninfo(conninfo, must_use_password) \ WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password) #define walrcv_get_conninfo(conn) \ -- 2.30.0.windows.2