From 1c7c49c173fa53a55a6c55a6295d45501ef8e30f Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Wed, 10 Jan 2024 18:27:16 +0800 Subject: [PATCH v59 4/4] Non replication connection and app_name change. Changes in this patch: 1) Convert replication connection to non-replication one in slotsync worker. 2) Use app_name as {cluster_name}_slotsyncworker in the slotsync worker connection. --- src/backend/commands/subscriptioncmds.c | 8 ++-- .../libpqwalreceiver/libpqwalreceiver.c | 44 +++++++++++++------ src/backend/replication/logical/slotsync.c | 13 +++++- src/backend/replication/logical/tablesync.c | 2 +- src/backend/replication/logical/worker.c | 2 +- src/backend/replication/walreceiver.c | 2 +- src/include/replication/walreceiver.h | 5 ++- 7 files changed, 52 insertions(+), 24 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f50be29d99..7b12671ef1 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -753,7 +753,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, /* Try to connect to the publisher. */ must_use_password = !superuser_arg(owner) && opts.passwordrequired; - wrconn = walrcv_connect(conninfo, true, must_use_password, + wrconn = walrcv_connect(conninfo, true, true, must_use_password, stmt->subname, &err); if (!wrconn) ereport(ERROR, @@ -904,7 +904,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Try to connect to the publisher. */ must_use_password = sub->passwordrequired && !sub->ownersuperuser; - wrconn = walrcv_connect(sub->conninfo, true, must_use_password, + wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, sub->name, &err); if (!wrconn) ereport(ERROR, @@ -1530,7 +1530,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Try to connect to the publisher. */ must_use_password = sub->passwordrequired && !sub->ownersuperuser; - wrconn = walrcv_connect(sub->conninfo, true, must_use_password, + wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, sub->name, &err); if (!wrconn) ereport(ERROR, @@ -1781,7 +1781,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) */ load_file("libpqwalreceiver", false); - wrconn = walrcv_connect(conninfo, true, must_use_password, + wrconn = walrcv_connect(conninfo, true, true, must_use_password, subname, &err); if (wrconn == NULL) { diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index f910a3b103..8aa0103d8f 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -6,6 +6,9 @@ * loaded as a dynamic module to avoid linking the main server binary with * libpq. * + * Apart from walreceiver, the libpq-specific routines here are now being used + * by logical replication workers and slotsync worker as well. + * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group * * @@ -50,7 +53,8 @@ struct WalReceiverConn /* Prototypes for interface functions */ static WalReceiverConn *libpqrcv_connect(const char *conninfo, - bool logical, bool must_use_password, + bool replication, bool logical, + bool must_use_password, const char *appname, char **err); static void libpqrcv_check_conninfo(const char *conninfo, bool must_use_password); @@ -125,7 +129,12 @@ _PG_init(void) } /* - * Establish the connection to the primary server for XLOG streaming + * Establish the connection to the primary server. + * + * The connection established could be either a replication one or + * a non-replication one based on input argument 'replication'. And further + * if it is a replication connection, it could be either logical or physical + * based on input argument 'logical'. * * If an error occurs, this function will normally return NULL and set *err * to a palloc'ed error message. However, if must_use_password is true and @@ -136,8 +145,8 @@ _PG_init(void) * case. */ static WalReceiverConn * -libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password, - const char *appname, char **err) +libpqrcv_connect(const char *conninfo, bool replication, bool logical, + bool must_use_password, const char *appname, char **err) { WalReceiverConn *conn; const char *keys[6]; @@ -150,17 +159,26 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password, */ keys[i] = "dbname"; vals[i] = conninfo; - keys[++i] = "replication"; - vals[i] = logical ? "database" : "true"; - if (!logical) + + /* We can not have logical without replication */ + if (!replication) + Assert(!logical); + else { - /* - * The database name is ignored by the server in replication mode, but - * specify "replication" for .pgpass lookup. - */ - keys[++i] = "dbname"; - vals[i] = "replication"; + keys[++i] = "replication"; + vals[i] = logical ? "database" : "true"; + + if (!logical) + { + /* + * The database name is ignored by the server in replication mode, + * but specify "replication" for .pgpass lookup. + */ + keys[++i] = "dbname"; + vals[i] = "replication"; + } } + keys[++i] = "fallback_application_name"; vals[i] = appname; if (logical) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 45c5bb0a37..e252490cf3 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -971,6 +971,7 @@ ReplSlotSyncWorkerMain(Datum main_arg) char *dbname; bool am_cascading_standby; char *err; + StringInfoData app_name; ereport(LOG, errmsg("replication slot sync worker started")); @@ -1003,13 +1004,21 @@ ReplSlotSyncWorkerMain(Datum main_arg) */ BackgroundWorkerInitializeConnection(dbname, NULL, 0); + initStringInfo(&app_name); + if (cluster_name[0]) + appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsyncworker"); + else + appendStringInfo(&app_name, "%s", "slotsyncworker"); + /* * Establish the connection to the primary server for slots * synchronization. */ - wrconn = walrcv_connect(PrimaryConnInfo, true, false, - cluster_name[0] ? cluster_name : "slotsyncworker", + wrconn = walrcv_connect(PrimaryConnInfo, false, false, false, + app_name.data, &err); + pfree(app_name.data); + if (wrconn == NULL) ereport(ERROR, errcode(ERRCODE_CONNECTION_FAILURE), diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 5acab3f3e2..ee06629088 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1329,7 +1329,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * so that synchronous replication can distinguish them. */ LogRepWorkerWalRcvConn = - walrcv_connect(MySubscription->conninfo, true, + walrcv_connect(MySubscription->conninfo, true, true, must_use_password, slotname, &err); if (LogRepWorkerWalRcvConn == NULL) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 3dea10f9b3..4effb62849 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4518,7 +4518,7 @@ run_apply_worker() must_use_password = MySubscription->passwordrequired && !MySubscription->ownersuperuser; - LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, + LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, true, must_use_password, MySubscription->name, &err); diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index ffacd55e5c..f34ab09ac6 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -296,7 +296,7 @@ WalReceiverMain(void) sigprocmask(SIG_SETMASK, &UnBlockSig, NULL); /* Establish the connection to the primary for XLOG streaming */ - wrconn = walrcv_connect(conninfo, false, false, + wrconn = walrcv_connect(conninfo, true, false, false, cluster_name[0] ? cluster_name : "walreceiver", &err); if (!wrconn) diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 5e942cb4fc..68ac074274 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -237,6 +237,7 @@ typedef struct WalRcvExecResult * returned with 'err' including the error generated. */ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, + bool replication, bool logical, bool must_use_password, const char *appname, @@ -434,8 +435,8 @@ typedef struct WalReceiverFunctionsType extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; -#define walrcv_connect(conninfo, logical, must_use_password, appname, err) \ - WalReceiverFunctions->walrcv_connect(conninfo, logical, must_use_password, appname, err) +#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) \ + WalReceiverFunctions->walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) #define walrcv_check_conninfo(conninfo, must_use_password) \ WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password) #define walrcv_get_conninfo(conn) \ -- 2.30.0.windows.2