From 3cbbe29b3126a241ae8781fa6d0e2579caccf04a Mon Sep 17 00:00:00 2001 From: Dave Cramer Date: Fri, 14 Jun 2019 15:39:47 -0400 Subject: [PATCH 2/7] add binary column to pg_subscription bump catversion support create and alter subcription with binary option --- src/backend/catalog/system_views.sql | 2 +- src/backend/commands/subscriptioncmds.c | 39 +++++++++++++++---- .../libpqwalreceiver/libpqwalreceiver.c | 3 ++ src/backend/replication/logical/worker.c | 1 + src/include/catalog/catversion.h | 2 +- src/include/catalog/pg_subscription.h | 4 ++ src/include/replication/walreceiver.h | 1 + 7 files changed, 43 insertions(+), 9 deletions(-) diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 9fe4a4794a..a744f06f1d 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1058,7 +1058,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are readable. REVOKE ALL ON pg_subscription FROM public; -GRANT SELECT (subdbid, subname, subowner, subenabled, subslotname, subpublications) +GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, subslotname, subpublications) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 1419195766..cf645256dc 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -66,7 +66,7 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, - bool *refresh) + bool *refresh, bool *binary_given, bool *binary) { ListCell *lc; bool connect_given = false; @@ -97,6 +97,12 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *synchronous_commit = NULL; if (refresh) *refresh = true; + if (binary) + { + *binary_given = false; + /* not all versions of pgoutput will understand this option default to false */ + *binary = false; + } /* Parse options */ foreach(lc, options) @@ -182,6 +188,11 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, refresh_given = true; *refresh = defGetBoolean(defel); } + else if (strcmp(defel->defname, "binary") == 0 && binary) + { + *binary_given = true; + *binary = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -331,8 +342,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool slotname_given; char originname[NAMEDATALEN]; bool create_slot; - List *publications; + bool binary; + bool binary_given; + List *publications; /* * Parse and check options. * @@ -341,7 +354,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) parse_subscription_options(stmt->options, &connect, &enabled_given, &enabled, &create_slot, &slotname_given, &slotname, ©_data, &synchronous_commit, - NULL); + NULL, &binary_given, &binary); /* * Since creating a replication slot is not transactional, rolling back @@ -407,6 +420,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); + values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (slotname) @@ -677,10 +691,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) char *slotname; bool slotname_given; char *synchronous_commit; + bool binary_given; + bool binary; parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, &slotname_given, &slotname, - NULL, &synchronous_commit, NULL); + NULL, &synchronous_commit, NULL, + &binary_given, &binary); if (slotname_given) { @@ -705,6 +722,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) replaces[Anum_pg_subscription_subsynccommit - 1] = true; } + if (binary_given) + { + values[Anum_pg_subscription_subbinary - 1] = + BoolGetDatum(binary); + replaces[Anum_pg_subscription_subbinary - 1] = true; + } + update_tuple = true; break; } @@ -716,7 +740,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, - NULL, NULL, NULL, NULL, NULL); + NULL, NULL, NULL, NULL, NULL, + NULL, NULL); Assert(enabled_given); if (!sub->slotname && enabled) @@ -754,7 +779,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, ©_data, - NULL, &refresh); + NULL, &refresh, NULL, NULL); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -791,7 +816,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, ©_data, - NULL, NULL); + NULL, NULL, NULL, NULL); AlterSubscription_refresh(sub, copy_data); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 6eba08a920..9aec824a18 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -400,6 +400,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn, char *pubnames_str; List *pubnames; char *pubnames_literal; + bool binary; appendStringInfoString(&cmd, " ("); @@ -421,6 +422,8 @@ libpqrcv_startstreaming(WalReceiverConn *conn, appendStringInfo(&cmd, ", publication_names %s", pubnames_literal); PQfreemem(pubnames_literal); pfree(pubnames_str); + if (options->proto.logical.binary) + appendStringInfo(&cmd, ", binary 'true'"); appendStringInfoChar(&cmd, ')'); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index cc505f8c06..6d5c0c1ce1 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1779,6 +1779,7 @@ ApplyWorkerMain(Datum main_arg) options.slotname = myslotname; options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; + options.proto.logical.binary = MySubscription->binary; /* Start normal logical streaming replication. */ walrcv_startstreaming(wrconn, &options); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 1f6de76e9c..f0b17091e5 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201910251 +#define CATALOG_VERSION_NO 201910301 #endif diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 3cb13d897e..bb44e5e45c 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -48,6 +48,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subenabled; /* True if the subscription is enabled (the * worker should be running) */ + bool subbinary; /* True if the subscription wants the + * output plugin to send data in binary */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -73,6 +76,7 @@ typedef struct Subscription char *name; /* Name of the subscription */ Oid owner; /* Oid of the subscription owner */ bool enabled; /* Indicates if the subscription is enabled */ + bool binary; /* Indicates if the subscription wants data in binary format */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index e12a934966..641dd1f416 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -162,6 +162,7 @@ typedef struct { uint32 proto_version; /* Logical protocol version */ List *publication_names; /* String list of publications */ + bool binary; /* Ask publisher output plugin to use binary */ } logical; } proto; } WalRcvStreamOptions; -- 2.20.1 (Apple Git-117)