From 4511eb68cf8471ed31f0a690b507b7fdd9ccc9e7 Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Thu, 14 Mar 2024 11:20:00 +0530 Subject: [PATCH v1] Use user as dbname for slot sync --- .../libpqwalreceiver/libpqwalreceiver.c | 45 ++----------- src/backend/replication/logical/slotsync.c | 63 ++++++------------- src/backend/replication/slotfuncs.c | 2 - src/include/replication/slotsync.h | 1 - src/include/replication/walreceiver.h | 12 ++-- src/interfaces/libpq/exports.txt | 1 + src/interfaces/libpq/fe-exec.c | 9 +++ src/interfaces/libpq/libpq-fe.h | 1 + 8 files changed, 42 insertions(+), 92 deletions(-) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 761bf0f677..9c28f956c1 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -59,7 +59,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); static char *libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli); -static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo); +static char *libpqrcv_get_dbname_from_conn(WalReceiverConn *conn); static int libpqrcv_server_version(WalReceiverConn *conn); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, @@ -102,7 +102,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, .walrcv_alter_slot = libpqrcv_alter_slot, - .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo, + .walrcv_get_dbname_from_conn = libpqrcv_get_dbname_from_conn, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -494,47 +494,12 @@ libpqrcv_server_version(WalReceiverConn *conn) } /* - * Get database name from the primary server's conninfo. - * - * If dbname is not found in connInfo, return NULL value. + * Get database name from the connection. */ static char * -libpqrcv_get_dbname_from_conninfo(const char *connInfo) +libpqrcv_get_dbname_from_conn(WalReceiverConn *conn) { - PQconninfoOption *opts; - char *dbname = NULL; - char *err = NULL; - - opts = PQconninfoParse(connInfo, &err); - if (opts == NULL) - { - /* The error string is malloc'd, so we must free it explicitly */ - char *errcopy = err ? pstrdup(err) : "out of memory"; - - PQfreemem(err); - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("invalid connection string syntax: %s", errcopy))); - } - - for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt) - { - /* - * If multiple dbnames are specified, then the last one will be - * returned - */ - if (strcmp(opt->keyword, "dbname") == 0 && opt->val && - *opt->val) - { - if (dbname) - pfree(dbname); - - dbname = pstrdup(opt->val); - } - } - - PQconninfoFree(opts); - return dbname; + return PQgetDbname(conn->streamConn); } /* diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 5074c8409f..57c59457c8 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -891,33 +891,6 @@ validate_remote_info(WalReceiverConn *wrconn) CommitTransactionCommand(); } -/* - * Checks if dbname is specified in 'primary_conninfo'. - * - * Error out if not specified otherwise return it. - */ -char * -CheckAndGetDbnameFromConninfo(void) -{ - char *dbname; - - /* - * The slot synchronization needs a database connection for walrcv_exec to - * work. - */ - dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); - if (dbname == NULL) - ereport(ERROR, - - /* - * translator: dbname is a specific option; %s is a GUC variable name - */ - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("slot synchronization requires dbname to be specified in %s", - "primary_conninfo")); - return dbname; -} - /* * Return true if all necessary GUCs for slot synchronization are set * appropriately, otherwise, return false. @@ -1228,22 +1201,6 @@ ReplSlotSyncWorkerMain(int argc, char *argv[]) */ SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE); - dbname = CheckAndGetDbnameFromConninfo(); - - /* - * Connect to the database specified by the user in primary_conninfo. We - * need a database connection for walrcv_exec to work which we use to - * fetch slot information from the remote node. See comments atop - * libpqrcv_exec. - * - * We do not specify a specific user here since the slot sync worker will - * operate as a superuser. This is safe because the slot sync worker does - * not interact with user tables, eliminating the risk of executing - * arbitrary code within triggers. - */ - InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL); - - SetProcessingMode(NormalProcessing); initStringInfo(&app_name); if (cluster_name[0]) @@ -1264,6 +1221,26 @@ ReplSlotSyncWorkerMain(int argc, char *argv[]) errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not connect to the primary server: %s", err)); + /* There must be dbName initialized in wrconn */ + dbname = walrcv_get_dbname_from_conn(wrconn); + Assert(dbname); + + /* + * Connect to the database which is used by libpq to make connection(wrconn). + * We need a database connection for walrcv_exec to work which we use to + * fetch slot information from the remote node. See comments atop + * libpqrcv_exec. + * + * We do not specify a specific user here since the slot sync worker will + * operate as a superuser. This is safe because the slot sync worker does + * not interact with user tables, eliminating the risk of executing + * arbitrary code within triggers. + */ + InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL); + + SetProcessingMode(NormalProcessing); + + /* * Register the failure callback once we have the connection. * diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index ad79e1fccd..205e790309 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -977,8 +977,6 @@ pg_sync_replication_slots(PG_FUNCTION_ARGS) /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); - (void) CheckAndGetDbnameFromConninfo(); - initStringInfo(&app_name); if (cluster_name[0]) appendStringInfo(&app_name, "%s_slotsync", cluster_name); diff --git a/src/include/replication/slotsync.h b/src/include/replication/slotsync.h index dca57c5020..3f556a9623 100644 --- a/src/include/replication/slotsync.h +++ b/src/include/replication/slotsync.h @@ -23,7 +23,6 @@ extern PGDLLIMPORT bool sync_replication_slots; extern PGDLLIMPORT char *PrimaryConnInfo; extern PGDLLIMPORT char *PrimarySlotName; -extern char *CheckAndGetDbnameFromConninfo(void); extern bool ValidateSlotSyncParams(int elevel); #ifdef EXEC_BACKEND diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index b906bb5ce8..6ede80c250 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -283,11 +283,11 @@ typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, TimeLineID *primary_tli); /* - * walrcv_get_dbname_from_conninfo_fn + * walrcv_get_dbname_from_conn_fn * - * Returns the database name from the primary_conninfo + * Returns the database name from the connection. */ -typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo); +typedef char *(*walrcv_get_dbname_from_conn_fn) (WalReceiverConn *conn); /* * walrcv_server_version_fn @@ -413,7 +413,7 @@ typedef struct WalReceiverFunctionsType walrcv_get_conninfo_fn walrcv_get_conninfo; walrcv_get_senderinfo_fn walrcv_get_senderinfo; walrcv_identify_system_fn walrcv_identify_system; - walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo; + walrcv_get_dbname_from_conn_fn walrcv_get_dbname_from_conn; walrcv_server_version_fn walrcv_server_version; walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile; walrcv_startstreaming_fn walrcv_startstreaming; @@ -439,8 +439,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port) #define walrcv_identify_system(conn, primary_tli) \ WalReceiverFunctions->walrcv_identify_system(conn, primary_tli) -#define walrcv_get_dbname_from_conninfo(conninfo) \ - WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo) +#define walrcv_get_dbname_from_conn(conn) \ + WalReceiverFunctions->walrcv_get_dbname_from_conn(conn) #define walrcv_server_version(conn) \ WalReceiverFunctions->walrcv_server_version(conn) #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 9fbd3d3407..e879116a0d 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -202,3 +202,4 @@ PQcancelSocket 199 PQcancelErrorMessage 200 PQcancelReset 201 PQcancelFinish 202 +PQgetDbname 203 diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index c02a9180b2..4b61cebdf7 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -2917,6 +2917,15 @@ PQendcopy(PGconn *conn) } +char * +PQgetDbname(PGconn *conn) +{ + if (!conn) + return 0; + + return conn->dbName; +} + /* ---------------- * PQfn - Send a function call to the POSTGRES backend. * diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 09b485bd2b..eac2984d1c 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -516,6 +516,7 @@ extern int PQputline(PGconn *conn, const char *string); extern int PQgetlineAsync(PGconn *conn, char *buffer, int bufsize); extern int PQputnbytes(PGconn *conn, const char *buffer, int nbytes); extern int PQendcopy(PGconn *conn); +extern char *PQgetDbname(PGconn *conn); /* Set blocking/nonblocking connection to the backend */ extern int PQsetnonblocking(PGconn *conn, int arg); -- 2.34.1