>From 078dcdd696604801c898decbe478e3c99fe257a6 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Mon, 19 Aug 2013 13:24:30 +0200 Subject: [PATCH 1/8] wal_decoding: Allow walsender's to connect to a specific database Extend the existing 'replication' parameter to not only allow a boolean value but also "database". If the latter is specified we connect to the database specified in 'dbname'. This is useful for future walsender commands which need database interaction, e.g. changeset extraction. --- doc/src/sgml/protocol.sgml | 24 +++++++++--- src/backend/postmaster/postmaster.c | 23 ++++++++++-- .../libpqwalreceiver/libpqwalreceiver.c | 4 +- src/backend/replication/walsender.c | 43 +++++++++++++++++++--- src/backend/utils/init/postinit.c | 5 +++ src/bin/pg_basebackup/pg_basebackup.c | 4 +- src/bin/pg_basebackup/pg_receivexlog.c | 4 +- src/bin/pg_basebackup/receivelog.c | 4 +- src/include/replication/walsender.h | 1 + 9 files changed, 89 insertions(+), 23 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 0b2e60e..2ea14e5 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1301,10 +1301,13 @@ To initiate streaming replication, the frontend sends the -replication parameter in the startup message. This tells the -backend to go into walsender mode, wherein a small set of replication commands -can be issued instead of SQL statements. Only the simple query protocol can be -used in walsender mode. +replication parameter in the startup message. A boolean value +of true tells the backend to go into walsender mode, wherein a +small set of replication commands can be issued instead of SQL statements. Only +the simple query protocol can be used in walsender mode. +Passing a database as the value instructs walsender to connect to +the database specified in the dbname paramter which will in future +allow some additional commands to the ones specified below to be run. The commands accepted in walsender mode are: @@ -1314,7 +1317,7 @@ The commands accepted in walsender mode are: Requests the server to identify itself. Server replies with a result - set of a single row, containing three fields: + set of a single row, containing four fields: @@ -1356,6 +1359,17 @@ The commands accepted in walsender mode are: + + + dbname + + + + Database connected to or NULL. + + + + diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 01d2618..a31b01d 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -1894,10 +1894,21 @@ retry1: port->cmdline_options = pstrdup(valptr); else if (strcmp(nameptr, "replication") == 0) { - if (!parse_bool(valptr, &am_walsender)) + /* + * Due to backward compatibility concerns replication is a + * bybrid beast which allows the value to be either a boolean + * or the string 'database'. The latter connects to a specific + * database which is e.g. required for changeset extraction. + */ + if (strcmp(valptr, "database") == 0) + { + am_walsender = true; + am_db_walsender = true; + } + else if (!parse_bool(valptr, &am_walsender)) ereport(FATAL, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid value for boolean option \"replication\""))); + errmsg("invalid value for option \"replication\", legal values are false, 0, true, 1 or database"))); } else { @@ -1983,8 +1994,12 @@ retry1: if (strlen(port->user_name) >= NAMEDATALEN) port->user_name[NAMEDATALEN - 1] = '\0'; - /* Walsender is not related to a particular database */ - if (am_walsender) + /* + * Generic walsender, e.g. for streaming replication, is not connected to a + * particular database. But walsenders used for logical replication need to + * connect to a specific database. + */ + if (am_walsender && !am_db_walsender) port->database_name[0] = '\0'; /* diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 6bc0aa1..ee0f1fe 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -130,7 +130,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli) "the primary server: %s", PQerrorMessage(streamConn)))); } - if (PQnfields(res) != 3 || PQntuples(res) != 1) + if (PQnfields(res) != 4 || PQntuples(res) != 1) { int ntuples = PQntuples(res); int nfields = PQnfields(res); @@ -138,7 +138,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli) PQclear(res); ereport(ERROR, (errmsg("invalid response from primary server"), - errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.", + errdetail("Expected 1 tuple with 4 fields, got %d tuples with %d fields.", ntuples, nfields))); } primary_sysid = PQgetvalue(res, 0, 0); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index afd559d..b00a91a 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -46,7 +46,10 @@ #include "access/timeline.h" #include "access/transam.h" #include "access/xlog_internal.h" +#include "access/xact.h" + #include "catalog/pg_type.h" +#include "commands/dbcommands.h" #include "funcapi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -89,9 +92,10 @@ WalSndCtlData *WalSndCtl = NULL; WalSnd *MyWalSnd = NULL; /* Global state */ -bool am_walsender = false; /* Am I a walsender process ? */ +bool am_walsender = false; /* Am I a walsender process? */ bool am_cascading_walsender = false; /* Am I cascading WAL to - * another standby ? */ + * another standby? */ +bool am_db_walsender = false; /* connect to database? */ /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ @@ -243,10 +247,12 @@ IdentifySystem(void) char tli[11]; char xpos[MAXFNAMELEN]; XLogRecPtr logptr; + char* dbname = NULL; /* - * Reply with a result set with one row, three columns. First col is - * system ID, second is timeline ID, and third is current xlog location. + * Reply with a result set with one row, four columns. First col is system + * ID, second is timeline ID, third is current xlog location and the fourth + * contains the database name if we are connected to one. */ snprintf(sysid, sizeof(sysid), UINT64_FORMAT, @@ -265,9 +271,23 @@ IdentifySystem(void) snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr); + if (MyDatabaseId != InvalidOid) + { + MemoryContext cur = CurrentMemoryContext; + + /* syscache access needs a transaction env. */ + StartTransactionCommand(); + /* make dbname live outside TX context */ + MemoryContextSwitchTo(cur); + dbname = get_database_name(MyDatabaseId); + CommitTransactionCommand(); + /* CommitTransactionCommand switches to TopMemoryContext */ + MemoryContextSwitchTo(cur); + } + /* Send a RowDescription message */ pq_beginmessage(&buf, 'T'); - pq_sendint(&buf, 3, 2); /* 3 fields */ + pq_sendint(&buf, 4, 2); /* 4 fields */ /* first field */ pq_sendstring(&buf, "systemid"); /* col name */ @@ -295,17 +315,28 @@ IdentifySystem(void) pq_sendint(&buf, -1, 2); pq_sendint(&buf, 0, 4); pq_sendint(&buf, 0, 2); + + /* fourth field */ + pq_sendstring(&buf, "dbname"); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); + pq_sendint(&buf, TEXTOID, 4); + pq_sendint(&buf, -1, 2); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); pq_endmessage(&buf); /* Send a DataRow message */ pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 3, 2); /* # of columns */ + pq_sendint(&buf, 4, 2); /* # of columns */ pq_sendint(&buf, strlen(sysid), 4); /* col1 len */ pq_sendbytes(&buf, (char *) &sysid, strlen(sysid)); pq_sendint(&buf, strlen(tli), 4); /* col2 len */ pq_sendbytes(&buf, (char *) tli, strlen(tli)); pq_sendint(&buf, strlen(xpos), 4); /* col3 len */ pq_sendbytes(&buf, (char *) xpos, strlen(xpos)); + pq_sendint(&buf, strlen(dbname), 4); /* col4 len */ + pq_sendbytes(&buf, (char *) dbname, strlen(dbname)); pq_endmessage(&buf); } diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 2c7f0f1..56c352c 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -725,7 +725,12 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, ereport(FATAL, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser or replication role to start walsender"))); + } + if (am_walsender && + (in_dbname == NULL || in_dbname[0] == '\0') && + dboid == InvalidOid) + { /* process any options passed in the startup packet */ if (MyProcPort != NULL) process_startup_options(MyProcPort, am_superuser); diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index a1e12a8..89e2376 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -1361,11 +1361,11 @@ BaseBackup(void) progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); disconnect_and_exit(1); } - if (PQntuples(res) != 1 || PQnfields(res) != 3) + if (PQntuples(res) != 1 || PQnfields(res) != 4) { fprintf(stderr, _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"), - progname, PQntuples(res), PQnfields(res), 1, 3); + progname, PQntuples(res), PQnfields(res), 1, 4); disconnect_and_exit(1); } sysidentifier = pg_strdup(PQgetvalue(res, 0, 0)); diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index 787a395..fe8aef6 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -252,11 +252,11 @@ StreamLog(void) progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); disconnect_and_exit(1); } - if (PQntuples(res) != 1 || PQnfields(res) != 3) + if (PQntuples(res) != 1 || PQnfields(res) != 4) { fprintf(stderr, _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"), - progname, PQntuples(res), PQnfields(res), 1, 3); + progname, PQntuples(res), PQnfields(res), 1, 4); disconnect_and_exit(1); } servertli = atoi(PQgetvalue(res, 0, 1)); diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index d56a4d7..22a5340 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -534,11 +534,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, PQclear(res); return false; } - if (PQnfields(res) != 3 || PQntuples(res) != 1) + if (PQnfields(res) != 4 || PQntuples(res) != 1) { fprintf(stderr, _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"), - progname, PQntuples(res), PQnfields(res), 1, 3); + progname, PQntuples(res), PQnfields(res), 1, 4); PQclear(res); return false; } diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 2cc7ddf..5097235 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -19,6 +19,7 @@ /* global state */ extern bool am_walsender; extern bool am_cascading_walsender; +extern bool am_db_walsender; extern bool wake_wal_senders; /* user-settable parameters */ -- 1.8.4.21.g992c386.dirty