Re: Privileges on PUBLICATION - Mailing list pgsql-hackers

From Antonin Houska
Subject Re: Privileges on PUBLICATION
Date
Msg-id 1544.1678866130@antos
Whole thread Raw
In response to Re: Privileges on PUBLICATION  ("Gregory Stark (as CFM)" <stark.cfm@gmail.com>)
List pgsql-hackers
Gregory Stark (as CFM) <stark.cfm@gmail.com> wrote:

> FYI this looks like it needs a rebase due to a conflict in copy.c and
> an offset in pgoutput.c.
> 
> Is there anything specific that still needs review or do you think
> you've handled all Peter's concerns? In particular, is there "a
> comprehensive description of what it is trying to do"? :)

I tried to improve the documentation and commit messages in v05. v06 (just
rebased) is attached.

-- 
Antonin Houska
Web: https://www.cybertec-postgresql.com

From d4490664ec80f52d23c4345eec5771764bcdbb17 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Wed, 15 Mar 2023 04:21:01 +0100
Subject: [PATCH 1/2] Move some code into functions.

This is only a preparation for a patch that introduces the USAGE privilege on
publications. It should make the following diff a little bit easier to read.
---
 src/backend/catalog/pg_publication.c        | 236 +++++++++++++++++---
 src/backend/commands/copy.c                 |  81 +------
 src/backend/commands/copyto.c               |  89 ++++++++
 src/backend/replication/pgoutput/pgoutput.c | 139 +-----------
 src/include/catalog/pg_publication.h        |   6 +
 src/include/commands/copy.h                 |   2 +
 6 files changed, 308 insertions(+), 245 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index a98fcad421..7f6024b7a5 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1025,6 +1025,208 @@ GetPublicationByName(const char *pubname, bool missing_ok)
     return OidIsValid(oid) ? GetPublication(oid) : NULL;
 }
 
+/*
+ * Fetch the column list (attrs) and row filter (qual) for relid in pubid.
+ */
+void
+GetPublicationRelationMapping(Oid pubid, Oid relid,
+                              Datum *attrs, bool *attrs_isnull,
+                              Datum *qual, bool *qual_isnull)
+{
+    Publication *publication;
+    HeapTuple    pubtuple = NULL;
+    Oid            schemaid = get_rel_namespace(relid);
+
+    publication = GetPublication(pubid);
+
+    /*
+     * We don't consider row filters or column lists for FOR ALL TABLES or
+     * FOR TABLES IN SCHEMA publications.
+     */
+    if (!publication->alltables &&
+        !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
+                               ObjectIdGetDatum(schemaid),
+                               ObjectIdGetDatum(publication->oid)))
+        pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
+                                       ObjectIdGetDatum(relid),
+                                       ObjectIdGetDatum(publication->oid));
+
+    if (HeapTupleIsValid(pubtuple))    /* copied tuple; never freed in here */
+    {
+        /* Lookup the column list attribute. */
+        *attrs = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
+                                 Anum_pg_publication_rel_prattrs,
+                                 attrs_isnull);
+
+        /* Lookup the row filter attribute; null indicates no filter. */
+        *qual = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
+                                Anum_pg_publication_rel_prqual,
+                                qual_isnull);
+    }
+    else
+    {
+        *attrs_isnull = true;    /* *attrs itself is left untouched */
+        *qual_isnull = true;    /* *qual itself is left untouched */
+    }
+}
+/*
+ * Pick those publications from a list which should actually be used to
+ * publish given relation and return them (NIL if none; the entries are the
+ * Publication pointers of the input list, not copies).
+ *
+ * If publish_as_relid_p is passed, the relid to publish the data as (relid
+ * itself or, for pubviaroot publications, an ancestor) is stored there.
+ *
+ * If pubactions is passed, matching publications' actions are OR-ed into it.
+ */
+List *
+GetEffectiveRelationPublications(Oid relid, List *publications,
+                                 Oid *publish_as_relid_p,
+                                 PublicationActions *pubactions)
+{
+    Oid            schemaId = get_rel_namespace(relid);
+    List       *pubids = GetRelationPublications(relid);
+    /*
+     * We don't acquire a lock on the namespace system table as we build the
+     * cache entry using a historic snapshot and all the later changes are
+     * absorbed while decoding WAL.
+     */
+    List       *schemaPubids = GetSchemaPublications(schemaId);
+    ListCell   *lc;
+    Oid            publish_as_relid = relid;
+    int            publish_ancestor_level = 0;
+    bool        am_partition = get_rel_relispartition(relid);
+    char        relkind = get_rel_relkind(relid);
+    List       *rel_publications = NIL;
+
+    foreach(lc, publications)
+    {
+        Publication *pub = lfirst(lc);
+        bool        publish = false;
+
+        /*
+         * Under what relid should we publish changes in this publication?
+         * We'll use the top-most relid across all publications. Also track
+         * the ancestor level for this publication.
+         */
+        Oid    pub_relid = relid;
+        int    ancestor_level = 0;
+
+        /*
+         * If this is a FOR ALL TABLES publication, pick the partition root
+         * and set the ancestor level accordingly.
+         */
+        if (pub->alltables)
+        {
+            publish = true;
+            if (pub->pubviaroot && am_partition)
+            {
+                List       *ancestors = get_partition_ancestors(relid);
+
+                pub_relid = llast_oid(ancestors);
+                ancestor_level = list_length(ancestors);
+            }
+        }
+
+        if (!publish)
+        {
+            bool        ancestor_published = false;
+
+            /*
+             * For a partition, check if any of the ancestors are published.
+             * If so, note down the topmost ancestor that is published via
+             * this publication, which will be used as the relation via which
+             * to publish the partition's changes.
+             */
+            if (am_partition)
+            {
+                Oid            ancestor;
+                int            level;
+                List       *ancestors = get_partition_ancestors(relid);
+
+                ancestor = GetTopMostAncestorInPublication(pub->oid,
+                                                           ancestors,
+                                                           &level);
+
+                if (ancestor != InvalidOid)
+                {
+                    ancestor_published = true;
+                    if (pub->pubviaroot)
+                    {
+                        pub_relid = ancestor;
+                        ancestor_level = level;
+                    }
+                }
+            }
+
+            if (list_member_oid(pubids, pub->oid) ||
+                list_member_oid(schemaPubids, pub->oid) ||
+                ancestor_published)
+                publish = true;
+        }
+
+        /*
+         * If the relation is to be published, determine actions to publish,
+         * and list of columns, if appropriate.
+         *
+         * Don't publish changes for partitioned tables, because publishing
+         * those of its partitions suffices, unless partition changes won't be
+         * published due to pubviaroot being set.
+         */
+        if (publish &&
+            (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
+        {
+            if (pubactions)
+            {
+                pubactions->pubinsert |= pub->pubactions.pubinsert;
+                pubactions->pubupdate |= pub->pubactions.pubupdate;
+                pubactions->pubdelete |= pub->pubactions.pubdelete;
+                pubactions->pubtruncate |= pub->pubactions.pubtruncate;
+            }
+
+            /*
+             * We want to publish the changes as the top-most ancestor across
+             * all publications. So we need to check if the already calculated
+             * level is higher than the new one. If yes, we can ignore the new
+             * value (as it's a child). Otherwise the new value is an
+             * ancestor, so we keep it.
+             */
+            if (publish_ancestor_level > ancestor_level)
+                continue;    /* pubactions were already merged above */
+
+            /*
+             * If we found an ancestor higher up in the tree, discard the list
+             * of publications through which we replicate it, and use the new
+             * ancestor.
+             */
+            if (publish_ancestor_level < ancestor_level)
+            {
+                publish_as_relid = pub_relid;
+                publish_ancestor_level = ancestor_level;
+
+                /* reset the list for this relation (old list is not freed) */
+                rel_publications = NIL;
+            }
+            else
+            {
+                /* Same ancestor level, has to be the same OID. */
+                Assert(publish_as_relid == pub_relid);
+            }
+
+            /* Track publications for this ancestor. */
+            rel_publications = lappend(rel_publications, pub);
+        }
+    }
+
+    list_free(pubids);
+    list_free(schemaPubids);
+
+    if (publish_as_relid_p)
+        *publish_as_relid_p = publish_as_relid;
+
+    return rel_publications;
+}
+
 /*
  * Returns information of tables in a publication.
  */
@@ -1108,10 +1310,8 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 
     if (funcctx->call_cntr < list_length(tables))
     {
-        HeapTuple    pubtuple = NULL;
         HeapTuple    rettuple;
         Oid            relid = list_nth_oid(tables, funcctx->call_cntr);
-        Oid            schemaid = get_rel_namespace(relid);
         Datum        values[NUM_PUBLICATION_TABLES_ELEM] = {0};
         bool        nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
 
@@ -1123,35 +1323,9 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 
         values[0] = ObjectIdGetDatum(relid);
 
-        /*
-         * We don't consider row filters or column lists for FOR ALL TABLES or
-         * FOR TABLES IN SCHEMA publications.
-         */
-        if (!publication->alltables &&
-            !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
-                                   ObjectIdGetDatum(schemaid),
-                                   ObjectIdGetDatum(publication->oid)))
-            pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
-                                           ObjectIdGetDatum(relid),
-                                           ObjectIdGetDatum(publication->oid));
-
-        if (HeapTupleIsValid(pubtuple))
-        {
-            /* Lookup the column list attribute. */
-            values[1] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
-                                        Anum_pg_publication_rel_prattrs,
-                                        &(nulls[1]));
-
-            /* Null indicates no filter. */
-            values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
-                                        Anum_pg_publication_rel_prqual,
-                                        &(nulls[2]));
-        }
-        else
-        {
-            nulls[1] = true;
-            nulls[2] = true;
-        }
+        GetPublicationRelationMapping(publication->oid, relid,
+                                      &values[1], &nulls[1],
+                                      &values[2], &nulls[2]);
 
         /* Show all columns when the column list is not specified. */
         if (nulls[1] == true)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 167d31a2d9..8edc2c19f6 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -177,92 +177,13 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
          */
         if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
         {
-            SelectStmt *select;
-            ColumnRef  *cr;
-            ResTarget  *target;
-            RangeVar   *from;
-            List       *targetList = NIL;
-
             if (is_from)
                 ereport(ERROR,
                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                          errmsg("COPY FROM not supported with row-level security"),
                          errhint("Use INSERT statements instead.")));
 
-            /*
-             * Build target list
-             *
-             * If no columns are specified in the attribute list of the COPY
-             * command, then the target list is 'all' columns. Therefore, '*'
-             * should be used as the target list for the resulting SELECT
-             * statement.
-             *
-             * In the case that columns are specified in the attribute list,
-             * create a ColumnRef and ResTarget for each column and add them
-             * to the target list for the resulting SELECT statement.
-             */
-            if (!stmt->attlist)
-            {
-                cr = makeNode(ColumnRef);
-                cr->fields = list_make1(makeNode(A_Star));
-                cr->location = -1;
-
-                target = makeNode(ResTarget);
-                target->name = NULL;
-                target->indirection = NIL;
-                target->val = (Node *) cr;
-                target->location = -1;
-
-                targetList = list_make1(target);
-            }
-            else
-            {
-                ListCell   *lc;
-
-                foreach(lc, stmt->attlist)
-                {
-                    /*
-                     * Build the ColumnRef for each column.  The ColumnRef
-                     * 'fields' property is a String node that corresponds to
-                     * the column name respectively.
-                     */
-                    cr = makeNode(ColumnRef);
-                    cr->fields = list_make1(lfirst(lc));
-                    cr->location = -1;
-
-                    /* Build the ResTarget and add the ColumnRef to it. */
-                    target = makeNode(ResTarget);
-                    target->name = NULL;
-                    target->indirection = NIL;
-                    target->val = (Node *) cr;
-                    target->location = -1;
-
-                    /* Add each column to the SELECT statement's target list */
-                    targetList = lappend(targetList, target);
-                }
-            }
-
-            /*
-             * Build RangeVar for from clause, fully qualified based on the
-             * relation which we have opened and locked.  Use "ONLY" so that
-             * COPY retrieves rows from only the target table not any
-             * inheritance children, the same as when RLS doesn't apply.
-             */
-            from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
-                                pstrdup(RelationGetRelationName(rel)),
-                                -1);
-            from->inh = false;    /* apply ONLY */
-
-            /* Build query */
-            select = makeNode(SelectStmt);
-            select->targetList = targetList;
-            select->fromClause = list_make1(from);
-
-            query = makeNode(RawStmt);
-            query->stmt = (Node *) select;
-            query->stmt_location = stmt_location;
-            query->stmt_len = stmt_len;
-
+            query = CreateCopyToQuery(stmt, rel, stmt_location, stmt_len);
             /*
              * Close the relation for now, but keep the lock on it to prevent
              * changes between now and when we start the query-based COPY.
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index beea1ac687..af0cdef158 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -32,6 +32,7 @@
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "nodes/makefuncs.h"
 #include "optimizer/optimizer.h"
 #include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
@@ -339,6 +340,94 @@ EndCopy(CopyToState cstate)
     pfree(cstate);
 }
 
+/*
+ * Build a SELECT over rel that turns "COPY table TO" into "COPY (query) TO".
+ */
+RawStmt *
+CreateCopyToQuery(const CopyStmt *stmt, Relation rel, int stmt_location,
+                  int stmt_len)
+{
+    SelectStmt *select;
+    ColumnRef  *cr;
+    ResTarget  *target;
+    RangeVar   *from;
+    List       *targetList = NIL;
+    RawStmt    *query = NULL;
+
+    /*
+     * Build target list
+     *
+     * If no columns are specified in the attribute list of the COPY command,
+     * then the target list is 'all' columns. Therefore, '*' should be used as
+     * the target list for the resulting SELECT statement.
+     *
+     * In the case that columns are specified in the attribute list, create a
+     * ColumnRef and ResTarget for each column and add them to the target list
+     * for the resulting SELECT statement.
+     */
+    if (!stmt->attlist)
+    {
+        cr = makeNode(ColumnRef);
+        cr->fields = list_make1(makeNode(A_Star));
+        cr->location = -1;
+
+        target = makeNode(ResTarget);
+        target->name = NULL;
+        target->indirection = NIL;
+        target->val = (Node *) cr;
+        target->location = -1;
+
+        targetList = list_make1(target);
+    }
+    else
+    {
+        ListCell   *lc;
+
+        foreach(lc, stmt->attlist)
+        {
+            /*
+             * Build the ColumnRef for each column.  The ColumnRef 'fields'
+             * property is a String node that corresponds to the column name
+             * respectively.
+             */
+            cr = makeNode(ColumnRef);
+            cr->fields = list_make1(lfirst(lc));
+            cr->location = -1;
+
+            /* Build the ResTarget and add the ColumnRef to it. */
+            target = makeNode(ResTarget);
+            target->name = NULL;
+            target->indirection = NIL;
+            target->val = (Node *) cr;
+            target->location = -1;
+
+            /* Add each column to the SELECT statement's target list */
+            targetList = lappend(targetList, target);
+        }
+    }
+
+    /*
+     * Build RangeVar for from clause, fully qualified based on the relation
+     * passed in.  Use "ONLY" so that no inheritance children are read.
+     */
+    from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
+                        pstrdup(RelationGetRelationName(rel)),
+                        -1);
+    from->inh = false;    /* apply ONLY */
+
+    /* Build query */
+    select = makeNode(SelectStmt);
+    select->targetList = targetList;
+    select->fromClause = list_make1(from);
+
+    query = makeNode(RawStmt);
+    query->stmt = (Node *) select;
+    query->stmt_location = stmt_location;
+    query->stmt_len = stmt_len;
+
+    return query;
+}
+
 /*
  * Setup CopyToState to read tuples from a table or a query for COPY TO.
  *
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 00a2d73dab..21b8b2944e 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -2063,21 +2063,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
     /* Validate the entry */
     if (!entry->replicate_valid)
     {
-        Oid            schemaId = get_rel_namespace(relid);
-        List       *pubids = GetRelationPublications(relid);
-
-        /*
-         * We don't acquire a lock on the namespace system table as we build
-         * the cache entry using a historic snapshot and all the later changes
-         * are absorbed while decoding WAL.
-         */
-        List       *schemaPubids = GetSchemaPublications(schemaId);
-        ListCell   *lc;
-        Oid            publish_as_relid = relid;
-        int            publish_ancestor_level = 0;
-        bool        am_partition = get_rel_relispartition(relid);
-        char        relkind = get_rel_relkind(relid);
-        List       *rel_publications = NIL;
+        List    *rel_publications;
 
         /* Reload publications if needed before use. */
         if (!publications_valid)
@@ -2140,123 +2126,10 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
          * but here we only need to consider ones that the subscriber
          * requested.
          */
-        foreach(lc, data->publications)
-        {
-            Publication *pub = lfirst(lc);
-            bool        publish = false;
-
-            /*
-             * Under what relid should we publish changes in this publication?
-             * We'll use the top-most relid across all publications. Also
-             * track the ancestor level for this publication.
-             */
-            Oid            pub_relid = relid;
-            int            ancestor_level = 0;
-
-            /*
-             * If this is a FOR ALL TABLES publication, pick the partition
-             * root and set the ancestor level accordingly.
-             */
-            if (pub->alltables)
-            {
-                publish = true;
-                if (pub->pubviaroot && am_partition)
-                {
-                    List       *ancestors = get_partition_ancestors(relid);
-
-                    pub_relid = llast_oid(ancestors);
-                    ancestor_level = list_length(ancestors);
-                }
-            }
-
-            if (!publish)
-            {
-                bool        ancestor_published = false;
-
-                /*
-                 * For a partition, check if any of the ancestors are
-                 * published.  If so, note down the topmost ancestor that is
-                 * published via this publication, which will be used as the
-                 * relation via which to publish the partition's changes.
-                 */
-                if (am_partition)
-                {
-                    Oid            ancestor;
-                    int            level;
-                    List       *ancestors = get_partition_ancestors(relid);
-
-                    ancestor = GetTopMostAncestorInPublication(pub->oid,
-                                                               ancestors,
-                                                               &level);
-
-                    if (ancestor != InvalidOid)
-                    {
-                        ancestor_published = true;
-                        if (pub->pubviaroot)
-                        {
-                            pub_relid = ancestor;
-                            ancestor_level = level;
-                        }
-                    }
-                }
-
-                if (list_member_oid(pubids, pub->oid) ||
-                    list_member_oid(schemaPubids, pub->oid) ||
-                    ancestor_published)
-                    publish = true;
-            }
-
-            /*
-             * If the relation is to be published, determine actions to
-             * publish, and list of columns, if appropriate.
-             *
-             * Don't publish changes for partitioned tables, because
-             * publishing those of its partitions suffices, unless partition
-             * changes won't be published due to pubviaroot being set.
-             */
-            if (publish &&
-                (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
-            {
-                entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
-                entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
-                entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
-                entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
-
-                /*
-                 * We want to publish the changes as the top-most ancestor
-                 * across all publications. So we need to check if the already
-                 * calculated level is higher than the new one. If yes, we can
-                 * ignore the new value (as it's a child). Otherwise the new
-                 * value is an ancestor, so we keep it.
-                 */
-                if (publish_ancestor_level > ancestor_level)
-                    continue;
-
-                /*
-                 * If we found an ancestor higher up in the tree, discard the
-                 * list of publications through which we replicate it, and use
-                 * the new ancestor.
-                 */
-                if (publish_ancestor_level < ancestor_level)
-                {
-                    publish_as_relid = pub_relid;
-                    publish_ancestor_level = ancestor_level;
-
-                    /* reset the publication list for this relation */
-                    rel_publications = NIL;
-                }
-                else
-                {
-                    /* Same ancestor level, has to be the same OID. */
-                    Assert(publish_as_relid == pub_relid);
-                }
-
-                /* Track publications for this ancestor. */
-                rel_publications = lappend(rel_publications, pub);
-            }
-        }
-
-        entry->publish_as_relid = publish_as_relid;
+        rel_publications = GetEffectiveRelationPublications(relid,
+                                                            data->publications,
+                                                            &entry->publish_as_relid,
+                                                            &entry->pubactions);
 
         /*
          * Initialize the tuple slot, map, and row filter. These are only used
@@ -2275,8 +2148,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
             pgoutput_column_list_init(data, rel_publications, entry);
         }
 
-        list_free(pubids);
-        list_free(schemaPubids);
         list_free(rel_publications);
 
         entry->replicate_valid = true;
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 6ecaa2a01e..dab5bc8444 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -113,6 +113,12 @@ typedef struct PublicationRelInfo
 extern Publication *GetPublication(Oid pubid);
 extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
 extern List *GetRelationPublications(Oid relid);
+extern void GetPublicationRelationMapping(Oid pubid, Oid relid,
+                                          Datum *attrs, bool *attrs_isnull,
+                                          Datum *qual, bool *qual_isnull);
+extern List *GetEffectiveRelationPublications(Oid relid, List *publications,
+                                              Oid *publish_as_relid_p,
+                                              PublicationActions *pubactions);
 
 /*---------
  * Expected values for pub_partopt parameter of GetRelationPublications(),
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 33175868f6..774b835251 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -92,6 +92,8 @@ extern DestReceiver *CreateCopyDestReceiver(void);
 /*
  * internal prototypes
  */
+extern RawStmt *CreateCopyToQuery(const CopyStmt *stmt, Relation rel,
+                                  int stmt_location, int stmt_len);
 extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *raw_query,
                                Oid queryRelId, const char *filename, bool is_program,
                                copy_data_dest_cb data_dest_cb, List *attnamelist, List *options);
-- 
2.31.1

From 281272f99527ab53547d4d6a6ce71d2d5bfe7b14 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Wed, 15 Mar 2023 04:21:01 +0100
Subject: [PATCH 2/2] Implement the USAGE privilege on PUBLICATION.

Publication row filters and column lists can be used to prevent subsets of
data from being replicated via the logical replication system. These features
can address performance issues, but currently should not be used for security
purposes, such as hiding sensitive data from the subscribers. The problem is
that any subscriber can get the data from any publication, so if the sensitive
data is deliberately not published via one publication, it can still be
available via another one (supposedly created for another subscriber).

This patch adds an ACL column to the pg_publication catalog, implements the
corresponding checks and enhances the GRANT and REVOKE commands to grant and
revoke the USAGE privilege on publication to / from roles. The USAGE privilege
is initially granted to the PUBLIC group (so that existing configurations
don't get broken) but the user can revoke it and grant it only to individual
subscription users (i.e. users mentioned in the subscription connection
configuration). Thus the publisher instance can refuse to send data that a
given subscriber is not supposed to receive.

Obviously, the publication privileges are checked on the publisher side,
otherwise the implementation wouldn't be secure. The output plugin
(pgoutput.c) is the easy part because it already does receive the list of
publications whose data it should send to the subscriber. The initial table
synchronization is a little bit tricky because so far the "tablesync worker"
(running on the subscriber side) was responsible for constructing the SQL
query for the COPY TO command, which is executed on the publisher side.

This patch adds a new option PUBLICATION_NAMES to the COPY TO command. The
subscriber uses it to pass a list of publications to the publisher. The
publisher checks if the subscription user has the USAGE privilege on each
publication, retrieves the corresponding data (i.e. rows matching the row
filters of the publications) and sends it to the subscriber.

Since the publisher and subscriber instances can be on different major
versions of postgres, and since old subscribers cannot send the publication
names during the initial table synchronization, a new configuration variable
"publication_security" was added. The default value is "off", meaning that the
publisher does not require the COPY TO command to contain the
PUBLICATION_NAMES option. If the option is passed anyway, the publisher does
not check the privileges on the listed publications, but it does perform row
filtering according to the publication filters. Thus an upgrade of the
publisher instance does not break anything.

Once all the subscribers have migrated to the postgres version that supports
this feature, this variable should be set to "on". At that moment the
publisher starts to require the presence of the PUBLICATION_NAMES option in
the COPY TO command, as long as the COPY TO is executed by a role which has
the REPLICATION privilege. (Roles without the REPLICATION privilege are
currently not allowed to use the PUBLICATION_NAMES option.)
---
 doc/src/sgml/catalogs.sgml                    |   9 +
 doc/src/sgml/config.sgml                      |  28 ++
 doc/src/sgml/ddl.sgml                         |  14 +
 doc/src/sgml/logical-replication.sgml         |  72 +--
 doc/src/sgml/ref/copy.sgml                    |  36 ++
 doc/src/sgml/ref/grant.sgml                   |   9 +-
 src/backend/catalog/aclchk.c                  |  22 +
 src/backend/catalog/namespace.c               |  34 +-
 src/backend/catalog/objectaddress.c           |   2 +-
 src/backend/catalog/pg_publication.c          |  20 +-
 src/backend/commands/copy.c                   | 168 ++++++-
 src/backend/commands/copyto.c                 | 218 ++++++++-
 src/backend/commands/publicationcmds.c        |   2 +
 src/backend/executor/execMain.c               |   4 +-
 src/backend/parser/gram.y                     |   8 +
 src/backend/replication/logical/tablesync.c   |  91 ++--
 src/backend/replication/pgoutput/pgoutput.c   |   3 +
 src/backend/replication/walsender.c           |   6 +
 src/backend/utils/adt/acl.c                   |  51 +++
 src/backend/utils/misc/guc_tables.c           |  12 +
 src/backend/utils/misc/postgresql.conf.sample |   6 +
 src/bin/pg_dump/dumputils.c                   |   2 +
 src/bin/pg_dump/pg_dump.c                     |  47 +-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/describe.c                       |  11 +
 src/bin/psql/tab-complete.c                   |   3 +
 src/include/catalog/pg_proc.dat               |   3 +
 src/include/catalog/pg_publication.h          |  10 +
 src/include/commands/copy.h                   |   5 +-
 src/include/replication/logicalproto.h        |   1 +
 src/include/utils/acl.h                       |   1 +
 src/include/utils/guc_tables.h                |   1 +
 .../test_copy_callbacks/test_copy_callbacks.c |   2 +-
 src/test/regress/expected/copy.out            |  52 +++
 src/test/regress/expected/publication.out     | 424 ++++++++++--------
 src/test/regress/sql/copy.sql                 |  36 ++
 src/test/regress/sql/publication.sql          |  28 ++
 src/test/subscription/t/027_nosuperuser.pl    |  58 ++-
 38 files changed, 1197 insertions(+), 303 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 746baf5053..c5baafceef 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -6361,6 +6361,15 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l
        publication instead of its own.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>pubacl</structfield> <type>aclitem[]</type>
+      </para>
+      <para>
+       Access privileges; see <xref linkend="ddl-priv"/> for details
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index e5c41cc6c6..b9caae4423 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4958,6 +4958,34 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
      </variablelist>
     </sect2>
 
+    <sect2 id="runtime-config-replication-publisher">
+     <title>Publishers</title>
+
+     <para>
+      These settings control the behavior of a logical replication publisher.
+      Their values on the subscriber are irrelevant.
+     </para>
+
+     <variablelist>
+
+     <varlistentry id="guc-publication-security" xreflabel="publication_security">
+      <term><varname>publication_security</varname> (<type>boolean</type>)
+       <indexterm>
+        <primary><varname>publication_security</varname> configuration parameter</primary>
+        <secondary>in a publisher</secondary>
+       </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies whether the publisher should check the publication
+        privileges before it sends data to the subscriber.  See
+        <xref linkend="logical-replication-security"/> for more details.
+       </para>
+      </listitem>
+     </varlistentry>
+     </variablelist>
+    </sect2>
+
     <sect2 id="runtime-config-replication-subscriber">
      <title>Subscribers</title>
 
diff --git a/doc/src/sgml/ddl.sgml b/doc/src/sgml/ddl.sgml
index 5179125510..9a71790678 100644
--- a/doc/src/sgml/ddl.sgml
+++ b/doc/src/sgml/ddl.sgml
@@ -1963,6 +1963,13 @@ REVOKE ALL ON accounts FROM PUBLIC;
        statements that have previously performed this lookup, so this is not
        a completely secure way to prevent object access.
       </para>
+      <para>
+       For publications, allows logical replication via a particular
+       publication. The user specified in
+       the <link linkend="sql-createsubscription"><command>CREATE
+       SUBSCRIPTION</command></link> command must have this privilege on all
+       publications listed in that command.
+      </para>
       <para>
        For sequences, allows use of the
        <function>currval</function> and <function>nextval</function> functions.
@@ -2156,6 +2163,7 @@ REVOKE ALL ON accounts FROM PUBLIC;
        <literal>FOREIGN DATA WRAPPER</literal>,
        <literal>FOREIGN SERVER</literal>,
        <literal>LANGUAGE</literal>,
+       <literal>PUBLICATION</literal>,
        <literal>SCHEMA</literal>,
        <literal>SEQUENCE</literal>,
        <literal>TYPE</literal>
@@ -2252,6 +2260,12 @@ REVOKE ALL ON accounts FROM PUBLIC;
       <entry>none</entry>
       <entry><literal>\dconfig+</literal></entry>
      </row>
+     <row>
+      <entry><literal>PUBLICATION</literal></entry>
+      <entry><literal>U</literal></entry>
+      <entry>U</entry>
+      <entry><literal>\dRp+</literal></entry>
+     </row>
      <row>
       <entry><literal>SCHEMA</literal></entry>
       <entry><literal>UC</literal></entry>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 1bd5660c87..64774e68cd 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -898,26 +898,26 @@ CREATE PUBLICATION
     <command>psql</command> can be used to show the row filter expressions (if
     defined) for each publication.
 <programlisting>
-test_pub=# \dRp+
+  test_pub=# \dRp+
                                Publication p1
-  Owner   | All tables | Inserts | Updates | Deletes | Truncates | Via root
-----------+------------+---------+---------+---------+-----------+----------
- postgres | f          | t       | t       | t       | t         | f
+  Owner   | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges
+----------+------------+---------+---------+---------+-----------+----------+-------------------
+ postgres | f          | t       | t       | t       | t         | f        |
 Tables:
     "public.t1" WHERE ((a > 5) AND (c = 'NSW'::text))
 
                                Publication p2
-  Owner   | All tables | Inserts | Updates | Deletes | Truncates | Via root
-----------+------------+---------+---------+---------+-----------+----------
- postgres | f          | t       | t       | t       | t         | f
+  Owner   | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges
+----------+------------+---------+---------+---------+-----------+----------+-------------------
+ postgres | f          | t       | t       | t       | t         | f        |
 Tables:
     "public.t1"
     "public.t2" WHERE (e = 99)
 
                                Publication p3
-  Owner   | All tables | Inserts | Updates | Deletes | Truncates | Via root
-----------+------------+---------+---------+---------+-----------+----------
- postgres | f          | t       | t       | t       | t         | f
+  Owner   | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges
+----------+------------+---------+---------+---------+-----------+----------+-------------------
+ postgres | f          | t       | t       | t       | t         | f        |
 Tables:
     "public.t2" WHERE (d = 10)
     "public.t3" WHERE (g = 10)
@@ -1259,10 +1259,11 @@ test_sub=# SELECT * FROM child ORDER BY a;
 
   <para>
    The choice of columns can be based on behavioral or performance reasons.
-   However, do not rely on this feature for security: a malicious subscriber
-   is able to obtain data from columns that are not specifically
-   published.  If security is a consideration, protections can be applied
-   at the publisher side.
+   However, if you want to use this feature for security, please consider
+   using the privileges on publication, as explained in
+   <xref linkend="logical-replication-security"/>. Otherwise a malicious
+   subscriber may be able to use other publications to obtain data from
+   columns that are not specifically published via your publication.
   </para>
 
   <para>
@@ -1360,9 +1361,9 @@ CREATE PUBLICATION
 <programlisting>
 test_pub=# \dRp+
                                Publication p1
-  Owner   | All tables | Inserts | Updates | Deletes | Truncates | Via root
-----------+------------+---------+---------+---------+-----------+----------
- postgres | f          | t       | t       | t       | t         | f
+  Owner   | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges
+----------+------------+---------+---------+---------+-----------+----------+-------------------
+ postgres | f          | t       | t       | t       | t         | f        |
 Tables:
     "public.t1" (id, a, b, d)
 </programlisting></para>
@@ -1724,12 +1725,6 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
    and it must have the <literal>LOGIN</literal> attribute.
   </para>
 
-  <para>
-   In order to be able to copy the initial table data, the role used for the
-   replication connection must have the <literal>SELECT</literal> privilege on
-   a published table (or be a superuser).
-  </para>
-
   <para>
    To create a publication, the user must have the <literal>CREATE</literal>
    privilege in the database.
@@ -1743,16 +1738,25 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
   </para>
 
   <para>
-   There are currently no privileges on publications.  Any subscription (that
-   is able to connect) can access any publication.  Thus, if you intend to
-   hide some information from particular subscribers, such as by using row
-   filters or column lists, or by not adding the whole table to the
-   publication, be aware that other publications in the same database could
-   expose the same information.  Publication privileges might be added to
-   <productname>PostgreSQL</productname> in the future to allow for
-   finer-grained access control.
+   To replicate data, the role used for the replication connection must have
+   the <literal>USAGE</literal> privilege on the publication. In such a case,
+   the subscription role needs neither the <literal>SELECT</literal>
+   privileges on the replicated tables nor the <literal>USAGE</literal>
+   privilege on the containing schemas.
   </para>
 
+  <note>
+   <para>
+    The <literal>USAGE</literal> privilege on publication is only checked if
+    the <link linkend="guc-publication-security"><varname>publication_security</varname></link>
+    configuration parameter is set. The default is <literal>off</literal>. It
+    should only be set to <literal>on</literal> if all the subscribers are
+    on <productname>PostgreSQL</productname> server version 16 or later. The
+    older versions do not send the publication names for the initial table
+    synchronization, so they would fail to receive the data.
+   </para>
+  </note>
+
   <para>
    To create a subscription, the user must be a superuser.
   </para>
@@ -1812,6 +1816,12 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
     <link linkend="guc-wal-sender-timeout"><varname>wal_sender_timeout</varname></link>.
    </para>
 
+   <para>
+    <link linkend="guc-publication-security"><varname>publication_security</varname></link>
+    must be set to <literal>on</literal> if the publisher is supposed to check
+    the publication privileges.
+   </para>
+
   </sect2>
 
   <sect2 id="logical-replication-config-subscriber">
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 5e591ed2e6..3bc199e701 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     FORCE_NULL ( <replaceable class="parameter">column_name</replaceable> [, ...] )
     ENCODING '<replaceable class="parameter">encoding_name</replaceable>'
     DEFAULT '<replaceable class="parameter">default_string</replaceable>'
+    PUBLICATION_NAMES ( <replaceable class="parameter">publication_name</replaceable> [, ...] )
 </synopsis>
  </refsynopsisdiv>
 
@@ -382,6 +383,41 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><replaceable class="parameter">publication_name</replaceable></term>
+    <listitem>
+     <para>
+      The name of an
+      existing <link linkend="logical-replication-publication">publication</link>.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>PUBLICATION_NAMES</literal></term>
+    <listitem>
+     <para>
+      Specifies a list of publications. Only rows that match the
+      <link linkend="logical-replication-row-filter">row filter</link> of at
+      least one of the publications are copied. If at least one publication in
+      the list has no row filter, the whole table contents will be copied.
+     </para>
+     <para>
+      If
+      the <link linkend="guc-publication-security">publication_security</link>
+      configuration parameter is <literal>on</literal>, the list is required,
+      and the user needs to have the <literal>USAGE</literal> privilege on all
+      the publications in the list which are actually used to retrieve the
+      data from given table.
+     </para>
+     <para>
+      This option is allowed only in <command>COPY TO</command>. Currently,
+      only the users with the <literal>REPLICATION</literal> privilege can use
+      this option.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>WHERE</literal></term>
     <listitem>
diff --git a/doc/src/sgml/ref/grant.sgml b/doc/src/sgml/ref/grant.sgml
index 35bf0332c8..329a4f9023 100644
--- a/doc/src/sgml/ref/grant.sgml
+++ b/doc/src/sgml/ref/grant.sgml
@@ -82,6 +82,11 @@ GRANT { { SET | ALTER SYSTEM } [, ... ] | ALL [ PRIVILEGES ] }
     TO <replaceable class="parameter">role_specification</replaceable> [, ...] [ WITH GRANT OPTION ]
     [ GRANTED BY <replaceable class="parameter">role_specification</replaceable> ]
 
+GRANT { USAGE [, ... ] | ALL [ PRIVILEGES ] }
+    ON PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...]
+    TO <replaceable class="parameter">role_specification</replaceable> [, ...] [ WITH GRANT OPTION ]
+    [ GRANTED BY <replaceable class="parameter">role_specification</replaceable> ]
+
 GRANT { { CREATE | USAGE } [, ...] | ALL [ PRIVILEGES ] }
     ON SCHEMA <replaceable>schema_name</replaceable> [, ...]
     TO <replaceable class="parameter">role_specification</replaceable> [, ...] [ WITH GRANT OPTION ]
@@ -513,8 +518,8 @@ GRANT admins TO joe;
    </para>
 
    <para>
-    Privileges on databases, tablespaces, schemas, languages, and
-    configuration parameters are
+    Privileges on databases, tablespaces, schemas, languages, configuration
+    parameters and publications are
     <productname>PostgreSQL</productname> extensions.
    </para>
  </refsect1>
diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c
index c4232344aa..b7dc203859 100644
--- a/src/backend/catalog/aclchk.c
+++ b/src/backend/catalog/aclchk.c
@@ -253,6 +253,9 @@ restrict_and_check_grant(bool is_grant, AclMode avail_goptions, bool all_privs,
         case OBJECT_FUNCTION:
             whole_mask = ACL_ALL_RIGHTS_FUNCTION;
             break;
+        case OBJECT_PUBLICATION:
+            whole_mask = ACL_ALL_RIGHTS_PUBLICATION;
+            break;
         case OBJECT_LANGUAGE:
             whole_mask = ACL_ALL_RIGHTS_LANGUAGE;
             break;
@@ -485,6 +488,10 @@ ExecuteGrantStmt(GrantStmt *stmt)
             all_privileges = ACL_ALL_RIGHTS_FUNCTION;
             errormsg = gettext_noop("invalid privilege type %s for function");
             break;
+        case OBJECT_PUBLICATION:
+            all_privileges = ACL_ALL_RIGHTS_PUBLICATION;
+            errormsg = gettext_noop("invalid privilege type %s for publication");
+            break;
         case OBJECT_LANGUAGE:
             all_privileges = ACL_ALL_RIGHTS_LANGUAGE;
             errormsg = gettext_noop("invalid privilege type %s for language");
@@ -621,6 +628,9 @@ ExecGrantStmt_oids(InternalGrant *istmt)
         case OBJECT_LARGEOBJECT:
             ExecGrant_Largeobject(istmt);
             break;
+        case OBJECT_PUBLICATION:
+            ExecGrant_common(istmt, PublicationRelationId, ACL_ALL_RIGHTS_PUBLICATION, NULL);
+            break;
         case OBJECT_SCHEMA:
             ExecGrant_common(istmt, NamespaceRelationId, ACL_ALL_RIGHTS_SCHEMA, NULL);
             break;
@@ -731,6 +741,16 @@ objectNamesToOids(ObjectType objtype, List *objnames, bool is_grant)
                 objects = lappend_oid(objects, lobjOid);
             }
             break;
+        case OBJECT_PUBLICATION:
+            foreach(cell, objnames)
+            {
+                char       *nspname = strVal(lfirst(cell));
+                Oid            oid;
+
+                oid = get_publication_oid(nspname, false);
+                objects = lappend_oid(objects, oid);
+            }
+            break;
         case OBJECT_SCHEMA:
             foreach(cell, objnames)
             {
@@ -3023,6 +3043,8 @@ pg_aclmask(ObjectType objtype, Oid object_oid, AttrNumber attnum, Oid roleid,
             return object_aclmask(DatabaseRelationId, object_oid, roleid, mask, how);
         case OBJECT_FUNCTION:
             return object_aclmask(ProcedureRelationId, object_oid, roleid, mask, how);
+        case OBJECT_PUBLICATION:
+            return object_aclmask(PublicationRelationId, object_oid, roleid, mask, how);
         case OBJECT_LANGUAGE:
             return object_aclmask(LanguageRelationId, object_oid, roleid, mask, how);
         case OBJECT_LARGEOBJECT:
diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index 14e57adee2..d76b052059 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -2936,7 +2936,6 @@ Oid
 LookupExplicitNamespace(const char *nspname, bool missing_ok)
 {
     Oid            namespaceId;
-    AclResult    aclresult;
 
     /* check for pg_temp alias */
     if (strcmp(nspname, "pg_temp") == 0)
@@ -2955,10 +2954,20 @@ LookupExplicitNamespace(const char *nspname, bool missing_ok)
     if (missing_ok && !OidIsValid(namespaceId))
         return InvalidOid;
 
-    aclresult = object_aclcheck(NamespaceRelationId, namespaceId, GetUserId(), ACL_USAGE);
-    if (aclresult != ACLCHECK_OK)
-        aclcheck_error(aclresult, OBJECT_SCHEMA,
-                       nspname);
+    /*
+     * If the publication security is active, bypass the standard security
+     * checks.
+     */
+    if (!publication_security)
+    {
+        AclResult    aclresult;
+
+        aclresult = object_aclcheck(NamespaceRelationId, namespaceId, GetUserId(),
+                                    ACL_USAGE);
+        if (aclresult != ACLCHECK_OK)
+            aclcheck_error(aclresult, OBJECT_SCHEMA,
+                           nspname);
+    }
     /* Schema search hook for this lookup */
     InvokeNamespaceSearchHook(namespaceId, true);
 
@@ -3835,10 +3844,16 @@ recomputeNamespacePath(void)
                 rname = NameStr(((Form_pg_authid) GETSTRUCT(tuple))->rolname);
                 namespaceId = get_namespace_oid(rname, true);
                 ReleaseSysCache(tuple);
+
+                /*
+                 * If the publication security is active, bypass the standard
+                 * security checks.
+                 */
                 if (OidIsValid(namespaceId) &&
                     !list_member_oid(oidlist, namespaceId) &&
-                    object_aclcheck(NamespaceRelationId, namespaceId, roleid,
-                                          ACL_USAGE) == ACLCHECK_OK &&
+                    (publication_security ||
+                     object_aclcheck(NamespaceRelationId, namespaceId, roleid,
+                                     ACL_USAGE) == ACLCHECK_OK) &&
                     InvokeNamespaceSearchHook(namespaceId, false))
                     oidlist = lappend_oid(oidlist, namespaceId);
             }
@@ -3865,8 +3880,9 @@ recomputeNamespacePath(void)
             namespaceId = get_namespace_oid(curname, true);
             if (OidIsValid(namespaceId) &&
                 !list_member_oid(oidlist, namespaceId) &&
-                object_aclcheck(NamespaceRelationId, namespaceId, roleid,
-                                      ACL_USAGE) == ACLCHECK_OK &&
+                (publication_security ||
+                 object_aclcheck(NamespaceRelationId, namespaceId, roleid,
+                                 ACL_USAGE) == ACLCHECK_OK) &&
                 InvokeNamespaceSearchHook(namespaceId, false))
                 oidlist = lappend_oid(oidlist, namespaceId);
         }
diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c
index 2f688166e1..31e7599111 100644
--- a/src/backend/catalog/objectaddress.c
+++ b/src/backend/catalog/objectaddress.c
@@ -587,7 +587,7 @@ static const ObjectPropertyType ObjectProperty[] =
         Anum_pg_publication_pubname,
         InvalidAttrNumber,
         Anum_pg_publication_pubowner,
-        InvalidAttrNumber,
+        Anum_pg_publication_pubacl,
         OBJECT_PUBLICATION,
         true
     },
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 7f6024b7a5..93793b1fa4 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1069,9 +1069,11 @@ GetPublicationRelationMapping(Oid pubid, Oid relid,
         *qual_isnull = true;
     }
 }
+
 /*
  * Pick those publications from a list which should actually be used to
- * publish given relation and return them.
+ * publish given relation, check their USAGE privilege if needed and return
+ * them.
  *
  * If publish_as_relid_p is passed, the relation whose tuple descriptor should
  * be used to publish the data is stored in *publish_as_relid_p.
@@ -1165,6 +1167,22 @@ GetEffectiveRelationPublications(Oid relid, List *publications,
                 publish = true;
         }
 
+        /*
+         * Check privileges before we use any information of the
+         * publication.
+         */
+        if (publication_security && publish)
+        {
+            Oid            roleid = GetUserId();
+            AclResult    aclresult;
+
+            aclresult = object_aclcheck(PublicationRelationId, pub->oid,
+                                        roleid, ACL_USAGE);
+            if (aclresult != ACLCHECK_OK)
+                aclcheck_error(aclresult, OBJECT_PUBLICATION,
+                               get_publication_name(pub->oid, false));
+        }
+
         /*
          * If the relation is to be published, determine actions to publish,
          * and list of columns, if appropriate.
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 8edc2c19f6..6504a27771 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -41,6 +41,8 @@
 #include "utils/rel.h"
 #include "utils/rls.h"
 
+static bool isReplicationUser(void);
+
 /*
  *     DoCopy executes the SQL COPY statement
  *
@@ -71,6 +73,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
     Oid            relid;
     RawStmt    *query = NULL;
     Node       *whereClause = NULL;
+    List        *publication_names = NIL;
 
     /*
      * Disallow COPY to/from file or program except to users with the
@@ -105,14 +108,23 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
         }
     }
 
+    /*
+     * It seems more useful to tell the user immediately that something is
+     * wrong about the use of the PUBLICATION_NAMES option than to complain
+     * about missing SELECT privilege below: whoever is authorized to use this
+     * option shouldn't need the SELECT privilege at all. Therefore check the
+     * PUBLICATION_NAMES option earlier than the other options.  XXX Shouldn't
+     * we check all the options here anyway?
+     */
+    publication_names = ProcessCopyToPublicationOptions(pstate,
+                                                        stmt->options,
+                                                        stmt->is_from);
+
     if (stmt->relation)
     {
         LOCKMODE    lockmode = is_from ? RowExclusiveLock : AccessShareLock;
         ParseNamespaceItem *nsitem;
         RTEPermissionInfo *perminfo;
-        TupleDesc    tupDesc;
-        List       *attnums;
-        ListCell   *cur;
 
         Assert(!stmt->query);
 
@@ -127,6 +139,14 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
         perminfo = nsitem->p_perminfo;
         perminfo->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
 
+        /*
+         * Replication users are authorized via the publication privileges,
+         * which are checked later, so ACL_SELECT is not required.  XXX The
+         * test below ignores the role, so it affects all users -- confirm.
+         */
+        if (!is_from && publication_security)
+            perminfo->requiredPerms &= ~ACL_SELECT;
+
         if (stmt->whereClause)
         {
             /* add nsitem to query namespace */
@@ -147,19 +167,31 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
             whereClause = (Node *) make_ands_implicit((Expr *) whereClause);
         }
 
-        tupDesc = RelationGetDescr(rel);
-        attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
-        foreach(cur, attnums)
+        /*
+         * If publication row filters need to be applied, the query form of
+         * COPY TO is used, so the permissions will be checked by the
+         * executor. Otherwise check the permissions now.
+         */
+        if (publication_names == NIL)
         {
-            int            attno;
-            Bitmapset **bms;
+            TupleDesc    tupDesc;
+            List       *attnums;
+            ListCell   *cur;
 
-            attno = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber;
-            bms = is_from ? &perminfo->insertedCols : &perminfo->selectedCols;
+            tupDesc = RelationGetDescr(rel);
+            attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
+            foreach(cur, attnums)
+            {
+                int            attno;
+                Bitmapset **bms;
 
-            *bms = bms_add_member(*bms, attno);
+                attno = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber;
+                bms = is_from ? &perminfo->insertedCols : &perminfo->selectedCols;
+
+                *bms = bms_add_member(*bms, attno);
+            }
+            ExecCheckPermissions(pstate->p_rtable, list_make1(perminfo), true);
         }
-        ExecCheckPermissions(pstate->p_rtable, list_make1(perminfo), true);
 
         /*
          * Permission check for row security policies.
@@ -184,6 +216,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
                          errhint("Use INSERT statements instead.")));
 
             query = CreateCopyToQuery(stmt, rel, stmt_location, stmt_len);
+
             /*
              * Close the relation for now, but keep the lock on it to prevent
              * changes between now and when we start the query-based COPY.
@@ -232,10 +265,24 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
     else
     {
         CopyToState cstate;
+        Relation    rel_loc = rel;
 
-        cstate = BeginCopyTo(pstate, rel, query, relid,
+        /*
+         * If publication row filters need to be applied, use the "COPY query
+         * TO ..."  form of the command.
+         */
+        if (rel && publication_names)
+        {
+            query = CreateCopyToQuery(stmt, rel, stmt_location, stmt_len);
+
+            /* BeginCopyTo() should only receive the query.  */
+            rel_loc = NULL;
+        }
+
+        cstate = BeginCopyTo(pstate, rel_loc, query, relid,
                              stmt->filename, stmt->is_program,
-                             NULL, stmt->attlist, stmt->options);
+                             NULL, stmt->attlist, stmt->options,
+                             publication_names);
         *processed = DoCopyTo(cstate);    /* copy from database to file */
         EndCopyTo(cstate);
     }
@@ -482,6 +529,13 @@ ProcessCopyOptions(ParseState *pstate,
                                 defel->defname),
                          parser_errposition(pstate, defel->location)));
         }
+        else if (strcmp(defel->defname, "publication_names") == 0)
+        {
+            /*
+             * ProcessCopyToPublicationOptions() should have checked this
+             * already.
+             */
+        }
         else
             ereport(ERROR,
                     (errcode(ERRCODE_SYNTAX_ERROR),
@@ -679,6 +733,78 @@ ProcessCopyOptions(ParseState *pstate,
     }
 }
 
+/*
+ * Extract and validate the PUBLICATION_NAMES option of the "COPY TO" command.
+ *
+ * Returns the list of publication names, or NIL if the option is not present.
+ * Raises an ERROR if the option is used with COPY FROM, is given more than
+ * once, is not a list, is empty, or is used by a role that is neither a
+ * superuser nor has the REPLICATION privilege; also if it is missing although
+ * required (see below).  This option is checked separately from the others.
+ */
+List *
+ProcessCopyToPublicationOptions(ParseState *pstate, List *options,
+                                bool is_from)
+{
+    ListCell   *option;
+    bool    found = false;
+    List    *result = NIL;
+
+    /* Extract options from the statement node tree */
+    foreach(option, options)
+    {
+        DefElem    *defel = lfirst_node(DefElem, option);
+
+        if (strcmp(defel->defname, "publication_names") == 0)
+        {
+            if (is_from)
+                ereport(ERROR,
+                        errmsg("PUBLICATION_NAMES option only available using COPY TO"));
+
+            /* test "found": "result" is NIL if the first one had no value */
+            if (found)
+                errorConflictingDefElem(defel, pstate);
+            found = true;
+            if (defel->arg == NULL || IsA(defel->arg, List))
+                result = castNode(List, defel->arg);
+            else
+                ereport(ERROR,
+                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                         errmsg("argument to option \"%s\" must be a list of publication names",
+                                defel->defname),
+                         parser_errposition(pstate, defel->location)));
+        }
+    }
+
+    /*
+     * If the publication security is enabled, the subscriber must send the
+     * list of publications in order to tell which subset of the data it is
+     * authorized to receive.  A missing list probably means that an old
+     * version of subscriber tries to get data from a secured publisher.
+     *
+     * publication_security does not affect sessions of non-replication users.
+     */
+    if (!found && publication_security && isReplicationUser())
+        ereport(ERROR,
+                (errmsg("publication security requires the PUBLICATION_NAMES option")));
+
+    /*
+     * The option only makes sense in the context of (logical) replication.
+     * We could allow it for non-replication users too, but then we'd have to
+     * require it if publication_security is on, like above, and thus break
+     * existing client code.
+     */
+    if (found && !isReplicationUser())
+        ereport(ERROR,
+                (errmsg("PUBLICATION_NAMES may only be used by roles with the REPLICATION privilege")));
+
+    if (found && result == NIL)
+        ereport(ERROR,
+                (errmsg("the value of the PUBLICATION_NAMES option must not be empty")));
+
+    return result;
+}
+
 /*
  * CopyGetAttnums - build an integer list of attnums to be copied
  *
@@ -769,3 +895,17 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
 
     return attnums;
 }
+
+/*
+ * Return true if the current user has the REPLICATION privilege or is a
+ * superuser. Such sessions can use the USAGE privilege on publications
+ * instead of the SELECT privileges on tables.
+ *
+ * Superusers pass so that subscriptions connecting as superuser work fine.
+ */
+static bool
+isReplicationUser(void)
+{
+    return has_rolreplication(GetUserId()) || superuser();
+
+}
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index af0cdef158..fd508b592f 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -34,13 +34,18 @@
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "optimizer/optimizer.h"
+#include "parser/parsetree.h"
+#include "parser/parse_relation.h"
 #include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
+#include "rewrite/rewriteManip.h"
 #include "storage/fd.h"
 #include "tcop/tcopprot.h"
+#include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/partcache.h"
+#include "utils/acl.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 
@@ -132,6 +137,10 @@ static void CopySendEndOfRow(CopyToState cstate);
 static void CopySendInt32(CopyToState cstate, int32 val);
 static void CopySendInt16(CopyToState cstate, int16 val);
 
+static void AddPublicationFiltersToQuery(CopyToState cstate, Query *query,
+                                         List *publication_names);
+static Node *GetPublicationFilters(Relation rel, List *publications,
+                                   int varno);
 
 /*
  * Send copy start/stop messages for frontend copies.  These have changed
@@ -439,6 +448,7 @@ CreateCopyToQuery(const CopyStmt *stmt, Relation rel, int stmt_location,
  * 'data_dest_cb': Callback that processes the output data
  * 'attnamelist': List of char *, columns to include. NIL selects all cols.
  * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
+ * 'publication_names': PUBLICATION_NAMES option (also contained in 'options')
  *
  * Returns a CopyToState, to be passed to DoCopyTo() and related functions.
  */
@@ -451,7 +461,8 @@ BeginCopyTo(ParseState *pstate,
             bool is_program,
             copy_data_dest_cb data_dest_cb,
             List *attnamelist,
-            List *options)
+            List *options,
+            List *publication_names)
 {
     CopyToState cstate;
     bool        pipe = (filename == NULL && data_dest_cb == NULL);
@@ -606,6 +617,12 @@ BeginCopyTo(ParseState *pstate,
                      errmsg("COPY query must have a RETURNING clause")));
         }
 
+        /*
+         * If the subscriber passed the publication names, use them.
+         */
+        if (publication_names)
+            AddPublicationFiltersToQuery(cstate, query, publication_names);
+
         /* plan the query */
         plan = pg_plan_query(query, pstate->p_sourcetext,
                              CURSOR_OPT_PARALLEL_OK, NULL);
@@ -1376,3 +1393,202 @@ CreateCopyDestReceiver(void)
 
     return (DestReceiver *) self;
 }
+
+/*
+ * For each table in the query add the row filters of the related publication
+ * to the WHERE clause. While doing so, check if the current user has the
+ * USAGE privilege on the publications.
+ *
+ * Range table entries other than plain relations (subqueries, functions,
+ * etc.) are skipped: neither filters nor permissions are changed for them.
+ */
+static void
+AddPublicationFiltersToQuery(CopyToState cstate, Query *query,
+                             List *publication_names)
+{
+    List       *publications = NIL;
+    Index       rtindex = 0;
+    FromExpr   *from_expr;
+    ListCell   *lc;
+
+    Assert(publication_names);
+
+    /* Look up the named publications; unknown names only raise a WARNING. */
+    foreach(lc, publication_names)
+    {
+        char       *pubname = strVal(lfirst(lc));
+        Oid         pubid;
+
+        pubid = get_publication_oid(pubname, true);
+        if (pubid == InvalidOid)
+        {
+            ereport(WARNING,
+                (errcode(ERRCODE_UNDEFINED_OBJECT),
+                 errmsg("publication \"%s\" does not exist", pubname)));
+            continue;
+        }
+
+        publications = lappend(publications, GetPublication(pubid));
+    }
+
+    if (publications == NIL)
+        ereport(ERROR, errmsg("no valid publication received"));
+
+    /*
+     * If the query references at least one table, construct or adjust the
+     * WHERE clause according to the publications.
+     */
+    from_expr = query->jointree;
+
+    foreach(lc, query->rtable)
+    {
+        RangeTblEntry *rte = lfirst_node(RangeTblEntry, lc);
+        Relation    qrel;
+        List       *pubs_matched;
+        Node       *quals;
+
+        /* Range table indexes start at 1 and count skipped entries too. */
+        rtindex++;
+
+        /*
+         * Only plain relations can be looked up in publications. Other
+         * kinds of entries (subquery, function, join, ...) do not represent
+         * a table that can be opened, so leave them alone.
+         */
+        if (rte->rtekind != RTE_RELATION)
+            continue;
+
+        /*
+         * NoLock because the relation should already be locked due to the
+         * prior rewriting.
+         */
+        qrel = relation_open(rte->relid, NoLock);
+
+        /*
+         * Clear ACL_SELECT for this relation if the ACL_USAGE permission on
+         * publications should control the access, see below.
+         */
+        if (publication_security)
+        {
+            RTEPermissionInfo *perminfo;
+
+            perminfo = getRTEPermissionInfo(query->rteperminfos, rte);
+            perminfo->requiredPerms &= ~ACL_SELECT;
+        }
+
+        /*
+         * Retrieve the publications relevant to this relation, and if needed,
+         * check if the current user has the USAGE privilege on them.
+         */
+        pubs_matched = GetEffectiveRelationPublications(RelationGetRelid(qrel),
+                                                        publications, NULL, NULL);
+        if (pubs_matched == NIL)
+            ereport(ERROR,
+                    (errmsg("no publication for relation \"%s\"",
+                            get_rel_name(RelationGetRelid(qrel)))));
+
+        /* Range table implies there should be a FROM list. */
+        Assert(from_expr && from_expr->fromlist);
+
+        /*
+         * Use the publication filters to construct the (additional) filter
+         * expression for this relation.
+         */
+        quals = GetPublicationFilters(qrel, pubs_matched, rtindex);
+        if (quals)
+        {
+            /*
+             * AND the filter for this relation to the existing WHERE clause,
+             * or make it the WHERE clause if the query has none yet.
+             */
+            if (from_expr->quals == NULL)
+                from_expr->quals = quals;
+            else
+                from_expr->quals = (Node *)
+                    make_andclause(list_make2(quals, from_expr->quals));
+        }
+
+        list_free(pubs_matched);
+        relation_close(qrel, NoLock);
+    }
+}
+
+/*
+ * Construct WHERE clause for relation 'rel' by ORing the row filters of the
+ * given publications, with Vars adjusted to range table index 'varno'.
+ *
+ * Return NULL if at least one of the publications has no filter.
+ */
+static Node *
+GetPublicationFilters(Relation rel, List *publications, int varno)
+{
+    Oid        relid = RelationGetRelid(rel);
+    List       *filters = NIL;
+    Node       *result = NULL;
+    ListCell   *lc;
+    bool        isvarlena;
+    FmgrInfo    fmgrinfo;
+    Oid            outfunc;
+
+    Assert(publications);
+
+    /* Make sure we're ready to call the output function for node values. */
+    getTypeOutputInfo(PG_NODE_TREEOID, &outfunc, &isvarlena);
+    Assert(isvarlena);
+    fmgr_info(outfunc, &fmgrinfo);
+
+    /* Retrieve the publication filters. */
+    foreach(lc, publications)
+    {
+        Publication        *pub = (Publication *) lfirst(lc);
+        Datum    attrs, qual;
+        bool    attrs_isnull, qual_isnull;
+        char       *nodeStr;
+        Node       *node;
+
+        /* Fetch the mapping (column list and row filter) for this pair. */
+        GetPublicationRelationMapping(pub->oid, relid, &attrs, &attrs_isnull,
+                                      &qual, &qual_isnull);
+
+        /*
+         * A single publication w/o expression means that the whole table
+         * should be published, so discard any filters collected so far.
+         */
+        if (qual_isnull)
+        {
+            if (filters)
+            {
+                list_free_deep(filters);
+                filters = NIL;
+            }
+
+            break;
+        }
+
+        /* Turn the stored expression into a node tree via its text form. */
+        nodeStr = OutputFunctionCall(&fmgrinfo, qual);
+        node = stringToNode(nodeStr);
+        pfree(nodeStr);
+
+        /*
+         * Adjust varno so that the expression references the correct
+         * range table entry.
+         */
+        ChangeVarNodes(node, 1, varno, 0);
+
+        /*
+         * XXX Is it worth checking for duplicate expressions in the list?
+         */
+        filters = lappend(filters, node);
+    }
+
+    if (filters)
+    {
+        if (list_length(filters) > 1)
+            result = (Node *) make_orclause(filters);
+        else
+            result = (Node *) linitial(filters);
+    }
+
+    return result;
+}
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index f4ba572697..d9652604c7 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -800,6 +800,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
         BoolGetDatum(pubactions.pubtruncate);
     values[Anum_pg_publication_pubviaroot - 1] =
         BoolGetDatum(publish_via_partition_root);
+    values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
+    nulls[Anum_pg_publication_pubacl - 1] = true;
 
     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index b32f419176..663bfc034c 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -613,7 +613,9 @@ ExecCheckOneRelPerms(RTEPermissionInfo *perminfo)
     Oid            relOid = perminfo->relid;
 
     requiredPerms = perminfo->requiredPerms;
-    Assert(requiredPerms != 0);
+
+    if (requiredPerms == 0)
+        return true;
 
     /*
      * userid to check as: current user unless we have a setuid indication.
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index a0138382a1..fc70aa2057 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -7655,6 +7655,14 @@ privilege_target:
                     n->objs = $2;
                     $$ = n;
                 }
+            | PUBLICATION name_list
+                {
+                    PrivTarget *n = (PrivTarget *) palloc(sizeof(PrivTarget));
+                    n->targtype = ACL_TARGET_OBJECT;
+                    n->objtype = OBJECT_PUBLICATION;
+                    n->objs = $2;
+                    $$ = n;
+                }
             | SCHEMA name_list
                 {
                     PrivTarget *n = (PrivTarget *) palloc(sizeof(PrivTarget));
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 07eea504ba..2a8cfc5be0 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -753,6 +753,27 @@ copy_read_data(void *outbuf, int minread, int maxread)
 }
 
 
+/*
+ * Build the list of publication names of the current subscription, each
+ * quoted as an SQL literal and separated by ", ".
+ */
+static char *
+get_publication_names(void)
+{
+    StringInfoData names;
+    const char *separator = "";
+    ListCell   *cell;
+
+    initStringInfo(&names);
+    foreach(cell, MySubscription->publications)
+    {
+        appendStringInfoString(&names, separator);
+        appendStringInfoString(&names, quote_literal_cstr(strVal(lfirst(cell))));
+        separator = ", ";
+    }
+    return names.data;
+}
+
 /*
  * Get information about remote relation in similar fashion the RELATION
  * message provides during replication. This function also returns the relation
@@ -770,7 +791,6 @@ fetch_remote_table_info(char *nspname, char *relname,
     Oid            qualRow[] = {TEXTOID};
     bool        isnull;
     int            natt;
-    ListCell   *lc;
     Bitmapset  *included_cols = NULL;
 
     lrel->nspname = nspname;
@@ -812,7 +832,6 @@ fetch_remote_table_info(char *nspname, char *relname,
     ExecDropSingleTupleTableSlot(slot);
     walrcv_clear_result(res);
 
-
     /*
      * Get column lists for each relation.
      *
@@ -824,15 +843,7 @@ fetch_remote_table_info(char *nspname, char *relname,
         WalRcvExecResult *pubres;
         TupleTableSlot *tslot;
         Oid            attrsRow[] = {INT2VECTOROID};
-        StringInfoData pub_names;
-
-        initStringInfo(&pub_names);
-        foreach(lc, MySubscription->publications)
-        {
-            if (foreach_current_index(lc) > 0)
-                appendStringInfoString(&pub_names, ", ");
-            appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc))));
-        }
+        char       *pub_names = get_publication_names();
 
         /*
          * Fetch info about column lists for the relation (from all the
@@ -849,7 +860,7 @@ fetch_remote_table_info(char *nspname, char *relname,
                          " WHERE gpt.relid = %u AND c.oid = gpt.relid"
                          "   AND p.pubname IN ( %s )",
                          lrel->remoteid,
-                         pub_names.data);
+                         pub_names);
 
         pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
                              lengthof(attrsRow), attrsRow);
@@ -904,8 +915,7 @@ fetch_remote_table_info(char *nspname, char *relname,
         ExecDropSingleTupleTableSlot(tslot);
 
         walrcv_clear_result(pubres);
-
-        pfree(pub_names.data);
+        pfree(pub_names);
     }
 
     /*
@@ -986,6 +996,18 @@ fetch_remote_table_info(char *nspname, char *relname,
 
     walrcv_clear_result(res);
 
+    lrel->pubnames = NULL;
+    if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000)
+    {
+        /*
+         * If the publication ACL is implemented, the publisher is responsible
+         * for checking. All we need to do is to pass the publication names.
+         * The publisher should only return the data matching these
+         * publications and only check the ACLs of these.
+         */
+        lrel->pubnames = get_publication_names();
+    }
+
     /*
      * Get relation's row filter expressions. DISTINCT avoids the same
      * expression of a table in multiple publications from being included
@@ -1005,21 +1027,9 @@ fetch_remote_table_info(char *nspname, char *relname,
      * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
      * that includes this relation
      */
-    if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+    else if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
     {
-        StringInfoData pub_names;
-
-        /* Build the pubname list. */
-        initStringInfo(&pub_names);
-        foreach(lc, MySubscription->publications)
-        {
-            char       *pubname = strVal(lfirst(lc));
-
-            if (foreach_current_index(lc) > 0)
-                appendStringInfoString(&pub_names, ", ");
-
-            appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
-        }
+        char       *pub_names = get_publication_names();
 
         /* Check for row filters. */
         resetStringInfo(&cmd);
@@ -1030,7 +1040,7 @@ fetch_remote_table_info(char *nspname, char *relname,
                          " WHERE gpt.relid = %u"
                          "   AND p.pubname IN ( %s )",
                          lrel->remoteid,
-                         pub_names.data);
+                         pub_names);
 
         res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
 
@@ -1069,6 +1079,7 @@ fetch_remote_table_info(char *nspname, char *relname,
         ExecDropSingleTupleTableSlot(slot);
 
         walrcv_clear_result(res);
+        pfree(pub_names);
     }
 
     pfree(cmd.data);
@@ -1105,7 +1116,12 @@ copy_table(Relation rel)
     /* Start copy on the publisher. */
     initStringInfo(&cmd);
 
-    /* Regular table with no row filter */
+    /*
+     * Regular table with no row filter.
+     *
+     * Note that "qual" is also NIL when the publisher is expected to apply the
+     * row filters itself (see fetch_remote_table_info), so none were fetched.
+     */
     if (lrel.relkind == RELKIND_RELATION && qual == NIL)
     {
         appendStringInfo(&cmd, "COPY %s (",
@@ -1122,8 +1138,6 @@ copy_table(Relation rel)
 
             appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
         }
-
-        appendStringInfoString(&cmd, ") TO STDOUT");
     }
     else
     {
@@ -1165,9 +1179,20 @@ copy_table(Relation rel)
             }
             list_free_deep(qual);
         }
+    }
+
+    appendStringInfoString(&cmd, ") TO STDOUT");
 
-        appendStringInfoString(&cmd, ") TO STDOUT");
+    if (lrel.pubnames)
+    {
+        /*
+         * Tell the publisher which publications we are interested in.
+         * Publishers of recent versions do need this information to construct
+         * the query filter and to check publication privileges.
+         */
+        appendStringInfo(&cmd, " (PUBLICATION_NAMES (%s)) ", lrel.pubnames);
     }
+
     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
     pfree(cmd.data);
     if (res->status != WALRCV_OK_COPY_OUT)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 21b8b2944e..6c2c5add1c 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -14,6 +14,7 @@
 
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
+#include "catalog/pg_authid.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
 #include "catalog/pg_subscription.h"
@@ -21,6 +22,7 @@
 #include "commands/subscriptioncmds.h"
 #include "executor/executor.h"
 #include "fmgr.h"
+#include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "optimizer/optimizer.h"
 #include "parser/parse_relation.h"
@@ -28,6 +30,7 @@
 #include "replication/logicalproto.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 75e8363e24..694a9828bb 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -125,6 +125,12 @@ int            wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
                                              * data message */
 bool        log_replication_commands = false;
 
+/*
+ * Should USAGE privilege on publications be checked? Defaults to false so
+ * that server upgrade does not break existing logical replication.
+ */
+bool        publication_security = false;
+
 /*
  * State for WalSndWakeupRequest
  */
diff --git a/src/backend/utils/adt/acl.c b/src/backend/utils/adt/acl.c
index 8f7522d103..8c318676e1 100644
--- a/src/backend/utils/adt/acl.c
+++ b/src/backend/utils/adt/acl.c
@@ -29,6 +29,7 @@
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_parameter_acl.h"
 #include "catalog/pg_proc.h"
+#include "catalog/pg_publication.h"
 #include "catalog/pg_tablespace.h"
 #include "catalog/pg_type.h"
 #include "commands/dbcommands.h"
@@ -118,6 +119,7 @@ static AclMode convert_tablespace_priv_string(text *priv_type_text);
 static Oid    convert_type_name(text *typename);
 static AclMode convert_type_priv_string(text *priv_type_text);
 static AclMode convert_parameter_priv_string(text *priv_text);
+static AclMode convert_publication_priv_string(text *priv_type_text);
 static AclMode convert_role_priv_string(text *priv_type_text);
 static AclResult pg_role_aclcheck(Oid role_oid, Oid roleid, AclMode mode);
 
@@ -844,6 +846,10 @@ acldefault(ObjectType objtype, Oid ownerId)
             world_default = ACL_NO_RIGHTS;
             owner_default = ACL_ALL_RIGHTS_PARAMETER_ACL;
             break;
+        case OBJECT_PUBLICATION:
+            world_default = ACL_USAGE;
+            owner_default = ACL_ALL_RIGHTS_PUBLICATION;
+            break;
         default:
             elog(ERROR, "unrecognized object type: %d", (int) objtype);
             world_default = ACL_NO_RIGHTS;    /* keep compiler quiet */
@@ -929,6 +935,9 @@ acldefault_sql(PG_FUNCTION_ARGS)
         case 'p':
             objtype = OBJECT_PARAMETER_ACL;
             break;
+        case 'P':
+            objtype = OBJECT_PUBLICATION;
+            break;
         case 't':
             objtype = OBJECT_TABLESPACE;
             break;
@@ -4558,6 +4567,48 @@ convert_parameter_priv_string(text *priv_text)
     return convert_any_priv_string(priv_text, parameter_priv_map);
 }
 
+/*
+ * has_publication_privilege_id
+ *        SQL-callable: does the current user hold the privilege named by
+ *        the text argument on the publication with the given OID?
+ *        Returns NULL if no such publication exists.
+ */
+Datum
+has_publication_privilege_id(PG_FUNCTION_ARGS)
+{
+    Oid         publication_oid = PG_GETARG_OID(0);
+    text       *privilege_name = PG_GETARG_TEXT_PP(1);
+    Oid         userid = GetUserId();
+    AclMode     wanted;
+    bool        granted;
+
+    wanted = convert_publication_priv_string(privilege_name);
+
+    /* A publication that does not exist yields NULL rather than an error. */
+    if (!SearchSysCacheExists1(PUBLICATIONOID, ObjectIdGetDatum(publication_oid)))
+        PG_RETURN_NULL();
+
+    granted = (object_aclcheck(PublicationRelationId, publication_oid,
+                               userid, wanted) == ACLCHECK_OK);
+    PG_RETURN_BOOL(granted);
+}
+
+/*
+ * convert_publication_priv_string
+ *        Map a publication privilege name given as text to its AclMode bits.
+ */
+static AclMode
+convert_publication_priv_string(text *priv_type_text)
+{
+    static const priv_map publication_priv_map[] = {
+        {"USAGE", ACL_USAGE},
+        {"USAGE WITH GRANT OPTION", ACL_GRANT_OPTION_FOR(ACL_USAGE)},
+        {NULL, 0}
+    };
+
+    return convert_any_priv_string(priv_type_text, publication_priv_map);
+}
+
 /*
  * pg_has_role variants
  *        These are all named "pg_has_role" at the SQL level.
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 1c0583fe26..90ef11eef9 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -686,6 +686,8 @@ const char *const config_group_names[] =
     gettext_noop("Replication / Primary Server"),
     /* REPLICATION_STANDBY */
     gettext_noop("Replication / Standby Servers"),
+    /* REPLICATION_PUBLISHERS */
+    gettext_noop("Replication / Publishers"),
     /* REPLICATION_SUBSCRIBERS */
     gettext_noop("Replication / Subscribers"),
     /* QUERY_TUNING_METHOD */
@@ -1973,6 +1975,16 @@ struct config_bool ConfigureNamesBool[] =
         NULL, NULL, NULL
     },
 
+    {
+        {"publication_security", PGC_SUSET, REPLICATION_PUBLISHERS,
+            gettext_noop("Enable publication security."),
+            gettext_noop("When enabled, the USAGE privilege is needed to access publications.")
+        },
+        &publication_security,
+        false,
+        NULL, NULL, NULL
+    },
+
     /* End-of-list marker */
     {
         {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index d06074b86f..720b7157c2 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -353,6 +353,12 @@
                     # retrieve WAL after a failed attempt
 #recovery_min_apply_delay = 0        # minimum delay for applying changes during recovery
 
+# - Publishers -
+
+# These settings are ignored on a subscriber.
+
+#publication_security = off        # should publication privileges be checked?
+
 # - Subscribers -
 
 # These settings are ignored on a publisher.
diff --git a/src/bin/pg_dump/dumputils.c b/src/bin/pg_dump/dumputils.c
index 079693585c..4b0d1b5d27 100644
--- a/src/bin/pg_dump/dumputils.c
+++ b/src/bin/pg_dump/dumputils.c
@@ -511,6 +511,8 @@ do { \
         CONVERT_PRIV('r', "SELECT");
         CONVERT_PRIV('w', "UPDATE");
     }
+    else if (strcmp(type, "PUBLICATION") == 0)
+        CONVERT_PRIV('U', "USAGE");
     else
         abort();
 
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 2e068c6620..a14c8738ef 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4025,6 +4025,8 @@ getPublications(Archive *fout, int *numPublications)
     int            i_pubdelete;
     int            i_pubtruncate;
     int            i_pubviaroot;
+    int            i_pubacl;
+    int            i_acldefault;
     int            i,
                 ntups;
 
@@ -4039,27 +4041,32 @@ getPublications(Archive *fout, int *numPublications)
     resetPQExpBuffer(query);
 
     /* Get the publications. */
-    if (fout->remoteVersion >= 130000)
+    if (fout->remoteVersion >= 160000)    /* pubacl column exists since v16 */
         appendPQExpBufferStr(query,
-                             "SELECT p.tableoid, p.oid, p.pubname, "
-                             "p.pubowner, "
-                             "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
-                             "FROM pg_publication p");
+                          "SELECT p.tableoid, p.oid, p.pubname, "
+                          "p.pubowner, "
+                          "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot, p.pubacl,acldefault('P', p.pubowner) AS acldefault "
+                          "FROM pg_publication p");
+    else if (fout->remoteVersion >= 130000)
+        appendPQExpBufferStr(query,
+                          "SELECT p.tableoid, p.oid, p.pubname, "
+                          "p.pubowner, "
+                          "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot, '{}' AS pubacl,'{}' AS acldefault "
+                          "FROM pg_publication p");
     else if (fout->remoteVersion >= 110000)
         appendPQExpBufferStr(query,
-                             "SELECT p.tableoid, p.oid, p.pubname, "
-                             "p.pubowner, "
-                             "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
-                             "FROM pg_publication p");
+                          "SELECT p.tableoid, p.oid, p.pubname, "
+                          "p.pubowner, "
+                          "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot, '{}'AS pubacl, '{}' AS acldefault "
+                          "FROM pg_publication p");
     else
         appendPQExpBufferStr(query,
-                             "SELECT p.tableoid, p.oid, p.pubname, "
-                             "p.pubowner, "
-                             "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
-                             "FROM pg_publication p");
+                          "SELECT p.tableoid, p.oid, p.pubname, "
+                          "p.pubowner, "
+                          "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot,'{}' AS pubacl, '{}' AS acldefault "
+                          "FROM pg_publication p");
 
     res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
-
     ntups = PQntuples(res);
 
     i_tableoid = PQfnumber(res, "tableoid");
@@ -4072,6 +4079,8 @@ getPublications(Archive *fout, int *numPublications)
     i_pubdelete = PQfnumber(res, "pubdelete");
     i_pubtruncate = PQfnumber(res, "pubtruncate");
     i_pubviaroot = PQfnumber(res, "pubviaroot");
+    i_pubacl = PQfnumber(res, "pubacl");
+    i_acldefault = PQfnumber(res, "acldefault");
 
     pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
 
@@ -4096,6 +4105,11 @@ getPublications(Archive *fout, int *numPublications)
             (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
         pubinfo[i].pubviaroot =
             (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
+        pubinfo[i].dacl.acl = pg_strdup(PQgetvalue(res, i, i_pubacl));
+        pubinfo[i].dacl.acldefault = pg_strdup(PQgetvalue(res, i, i_acldefault));
+        pubinfo[i].dacl.privtype = 0;
+        pubinfo[i].dacl.initprivs = NULL;
+        pubinfo[i].dobj.components |= DUMP_COMPONENT_ACL;
 
         /* Decide whether we want to dump it */
         selectDumpableObject(&(pubinfo[i].dobj), fout);
@@ -4199,6 +4213,11 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo)
                      NULL, pubinfo->rolname,
                      pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
 
+    if (pubinfo->dobj.dump & DUMP_COMPONENT_ACL)
+        dumpACL(fout, pubinfo->dobj.dumpId, InvalidDumpId, "PUBLICATION",
+                pg_strdup(fmtId(pubinfo->dobj.name)), NULL, NULL,
+                pubinfo->rolname, &pubinfo->dacl);
+
     destroyPQExpBuffer(delq);
     destroyPQExpBuffer(query);
     free(qpubname);
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index cdca0b993d..36e9a00cbf 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -612,6 +612,7 @@ typedef struct _policyInfo
 typedef struct _PublicationInfo
 {
     DumpableObject dobj;
+    DumpableAcl dacl;
     const char *rolname;
     bool        puballtables;
     bool        pubinsert;
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 99e28f607e..8746a6ed73 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6308,6 +6308,7 @@ describePublications(const char *pattern)
     PGresult   *res;
     bool        has_pubtruncate;
     bool        has_pubviaroot;
+    bool        has_pubacl;
 
     PQExpBufferData title;
     printTableContent cont;
@@ -6324,6 +6325,7 @@ describePublications(const char *pattern)
 
     has_pubtruncate = (pset.sversion >= 110000);
     has_pubviaroot = (pset.sversion >= 130000);
+    has_pubacl = (pset.sversion >= 160000);
 
     initPQExpBuffer(&buf);
 
@@ -6337,6 +6339,9 @@ describePublications(const char *pattern)
     if (has_pubviaroot)
         appendPQExpBufferStr(&buf,
                              ", pubviaroot");
+    if (has_pubacl)
+        appendPQExpBufferStr(&buf,
+                            ", pubacl");
     appendPQExpBufferStr(&buf,
                          "\nFROM pg_catalog.pg_publication\n");
 
@@ -6388,6 +6393,8 @@ describePublications(const char *pattern)
             ncols++;
         if (has_pubviaroot)
             ncols++;
+        if (has_pubacl)
+            ncols++;
 
         initPQExpBuffer(&title);
         printfPQExpBuffer(&title, _("Publication %s"), pubname);
@@ -6402,6 +6409,8 @@ describePublications(const char *pattern)
             printTableAddHeader(&cont, gettext_noop("Truncates"), true, align);
         if (has_pubviaroot)
             printTableAddHeader(&cont, gettext_noop("Via root"), true, align);
+        if (has_pubacl)
+            printTableAddHeader(&cont, gettext_noop("Access privileges"), true, align);
 
         printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false);
         printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false);
@@ -6412,6 +6421,8 @@ describePublications(const char *pattern)
             printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false);
         if (has_pubviaroot)
             printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false);
+        if (has_pubacl)
+            printTableAddCell(&cont, PQgetvalue(res, i, 9), false, false);
 
         if (!puballtables)
         {
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 42e87b9e49..768db694d8 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -3940,6 +3940,7 @@ psql_completion(const char *text, int start, int end)
                                             "LARGE OBJECT",
                                             "PARAMETER",
                                             "PROCEDURE",
+                                            "PUBLICATION",
                                             "ROUTINE",
                                             "SCHEMA",
                                             "SEQUENCE",
@@ -3977,6 +3978,8 @@ psql_completion(const char *text, int start, int end)
             COMPLETE_WITH_QUERY(Query_for_list_of_languages);
         else if (TailMatches("PROCEDURE"))
             COMPLETE_WITH_VERSIONED_SCHEMA_QUERY(Query_for_list_of_procedures);
+        else if (TailMatches("PUBLICATION"))
+            COMPLETE_WITH_VERSIONED_QUERY(Query_for_list_of_publications);
         else if (TailMatches("ROUTINE"))
             COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_routines);
         else if (TailMatches("SCHEMA"))
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index fbc4aade49..17f358d419 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -7245,6 +7245,9 @@
 { oid => '2273', descr => 'current user privilege on schema by schema oid',
   proname => 'has_schema_privilege', provolatile => 's', prorettype => 'bool',
   proargtypes => 'oid text', prosrc => 'has_schema_privilege_id' },
+{ oid => '9800', descr => 'current user privilege on publication by publication oid',
+  proname => 'has_publication_privilege', provolatile => 's', prorettype => 'bool',
+  proargtypes => 'oid text', prosrc => 'has_publication_privilege_id' },
 
 { oid => '2390',
   descr => 'user privilege on tablespace by username, tablespace name',
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index dab5bc8444..87da458bdb 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -54,6 +54,12 @@ CATALOG(pg_publication,6104,PublicationRelationId)
 
     /* true if partition changes are published using root schema */
     bool        pubviaroot;
+
+#ifdef CATALOG_VARLEN            /* variable-length fields start here */
+    /* NOTE: These fields are not present in a relcache entry's rd_rel field. */
+    /* access permissions */
+    aclitem        pubacl[1] BKI_DEFAULT(_null_);
+#endif
 } FormData_pg_publication;
 
 /* ----------------
@@ -63,6 +69,8 @@ CATALOG(pg_publication,6104,PublicationRelationId)
  */
 typedef FormData_pg_publication *Form_pg_publication;
 
+DECLARE_TOAST(pg_publication, 9801, 9802);
+
 DECLARE_UNIQUE_INDEX_PKEY(pg_publication_oid_index, 6110, PublicationObjectIndexId, on pg_publication using btree(oid oid_ops));
 DECLARE_UNIQUE_INDEX(pg_publication_pubname_index, 6111, PublicationNameIndexId, on pg_publication using btree(pubname name_ops));
 
@@ -136,6 +144,8 @@ typedef enum PublicationPartOpt
     PUBLICATION_PART_ALL,
 } PublicationPartOpt;
 
+extern PGDLLIMPORT bool publication_security;
+
 extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
 extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(bool pubviaroot);
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 774b835251..9953091370 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -75,6 +75,8 @@ extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
                    uint64 *processed);
 
 extern void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, List *options);
+extern List *ProcessCopyToPublicationOptions(ParseState *pstate,
+                                             List *options, bool is_from);
 extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause,
                                    const char *filename,
                                    bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options);
@@ -96,7 +98,8 @@ extern RawStmt *CreateCopyToQuery(const CopyStmt *stmt, Relation rel,
                                   int stmt_location, int stmt_len);
 extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *raw_query,
                                Oid queryRelId, const char *filename, bool is_program,
-                               copy_data_dest_cb data_dest_cb, List *attnamelist, List *options);
+                               copy_data_dest_cb data_dest_cb, List *attnamelist, List *options,
+                               List *publication_names);
 extern void EndCopyTo(CopyToState cstate);
 extern uint64 DoCopyTo(CopyToState cstate);
 extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 0ea2df5088..6d9b6fa250 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -113,6 +113,7 @@ typedef struct LogicalRepRelation
     char        replident;        /* replica identity */
     char        relkind;        /* remote relation kind */
     Bitmapset  *attkeys;        /* Bitmap of key columns */
+    char       *pubnames;        /* publication names (comma-separated list) */
 } LogicalRepRelation;
 
 /* Type mapping info */
diff --git a/src/include/utils/acl.h b/src/include/utils/acl.h
index f8e1238fa2..eb4e5044e8 100644
--- a/src/include/utils/acl.h
+++ b/src/include/utils/acl.h
@@ -169,6 +169,7 @@ typedef struct ArrayType Acl;
 #define ACL_ALL_RIGHTS_SCHEMA        (ACL_USAGE|ACL_CREATE)
 #define ACL_ALL_RIGHTS_TABLESPACE    (ACL_CREATE)
 #define ACL_ALL_RIGHTS_TYPE            (ACL_USAGE)
+#define ACL_ALL_RIGHTS_PUBLICATION    (ACL_USAGE)
 
 /* operation codes for pg_*_aclmask */
 typedef enum
diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h
index d5a0880678..87ddeecc6e 100644
--- a/src/include/utils/guc_tables.h
+++ b/src/include/utils/guc_tables.h
@@ -75,6 +75,7 @@ enum config_group
     REPLICATION_SENDING,
     REPLICATION_PRIMARY,
     REPLICATION_STANDBY,
+    REPLICATION_PUBLISHERS,
     REPLICATION_SUBSCRIBERS,
     QUERY_TUNING_METHOD,
     QUERY_TUNING_COST,
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks.c b/src/test/modules/test_copy_callbacks/test_copy_callbacks.c
index e65771067e..9178e102bb 100644
--- a/src/test/modules/test_copy_callbacks/test_copy_callbacks.c
+++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks.c
@@ -38,7 +38,7 @@ test_copy_to_callback(PG_FUNCTION_ARGS)
     int64        processed;
 
     cstate = BeginCopyTo(NULL, rel, NULL, RelationGetRelid(rel), NULL, false,
-                         to_cb, NIL, NIL);
+                         to_cb, NIL, NIL, NIL);
     processed = DoCopyTo(cstate);
     EndCopyTo(cstate);
 
diff --git a/src/test/regress/expected/copy.out b/src/test/regress/expected/copy.out
index 8a8bf43fde..b6011bea0f 100644
--- a/src/test/regress/expected/copy.out
+++ b/src/test/regress/expected/copy.out
@@ -240,3 +240,55 @@ SELECT * FROM header_copytest ORDER BY a;
 (5 rows)
 
 drop table header_copytest;
+-- Filtering by publication
+-- Suppress the warning about insufficient wal_level when creating
+-- publications.
+set client_min_messages to error;
+create role regress_copy_repl_user login replication;
+create table published_copytest (i int);
+insert into published_copytest(i) select x from generate_series(1, 10) g(x);
+create publication pub1 for table published_copytest where (i >= 7);
+set publication_security to on;
+-- Test both table name and query forms of the COPY command.
+set role regress_copy_repl_user;
+copy published_copytest to stdout (publication_names (pub1));
+7
+8
+9
+10
+copy (select i from published_copytest) to stdout (publication_names (pub1));
+7
+8
+9
+10
+reset role;
+-- Publish some more data.
+create publication pub2 for table published_copytest where (i <= 2);
+set role regress_copy_repl_user;
+copy published_copytest to stdout (publication_names (pub1, pub2));
+1
+2
+7
+8
+9
+10
+reset role;
+-- If any publication has no filter, the other filters are ignored.
+create publication pub3 for table published_copytest;
+set role regress_copy_repl_user;
+copy published_copytest to stdout (publication_names (pub1, pub2, pub3));
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+reset role;
+reset publication_security;
+reset client_min_messages;
+drop role regress_copy_repl_user;
+drop publication pub1, pub2, pub3;
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 427f87ea07..76a70c80d4 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -87,10 +87,10 @@ RESET client_min_messages;
 -- should be able to add schema to 'FOR TABLE' publication
 ALTER PUBLICATION testpub_fortable ADD TABLES IN SCHEMA pub_test;
 \dRp+ testpub_fortable
-                                Publication testpub_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                          Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables:
     "public.testpub_tbl1"
 Tables from schemas:
@@ -99,20 +99,20 @@ Tables from schemas:
 -- should be able to drop schema from 'FOR TABLE' publication
 ALTER PUBLICATION testpub_fortable DROP TABLES IN SCHEMA pub_test;
 \dRp+ testpub_fortable
-                                Publication testpub_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                          Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables:
     "public.testpub_tbl1"
 
 -- should be able to set schema to 'FOR TABLE' publication
 ALTER PUBLICATION testpub_fortable SET TABLES IN SCHEMA pub_test;
 \dRp+ testpub_fortable
-                                Publication testpub_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                          Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "pub_test"
 
@@ -123,10 +123,10 @@ CREATE PUBLICATION testpub_forschema FOR TABLES IN SCHEMA pub_test;
 CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA pub_test, TABLE pub_test.testpub_nopk;
 RESET client_min_messages;
 \dRp+ testpub_for_tbl_schema
-                             Publication testpub_for_tbl_schema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                       Publication testpub_for_tbl_schema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables:
     "pub_test.testpub_nopk"
 Tables from schemas:
@@ -135,10 +135,10 @@ Tables from schemas:
 -- should be able to add a table of the same schema to the schema publication
 ALTER PUBLICATION testpub_forschema ADD TABLE pub_test.testpub_nopk;
 \dRp+ testpub_forschema
-                               Publication testpub_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables:
     "pub_test.testpub_nopk"
 Tables from schemas:
@@ -147,10 +147,10 @@ Tables from schemas:
 -- should be able to drop the table
 ALTER PUBLICATION testpub_forschema DROP TABLE pub_test.testpub_nopk;
 \dRp+ testpub_forschema
-                               Publication testpub_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "pub_test"
 
@@ -161,10 +161,10 @@ ERROR:  relation "testpub_nopk" is not part of the publication
 -- should be able to set table to schema publication
 ALTER PUBLICATION testpub_forschema SET TABLE pub_test.testpub_nopk;
 \dRp+ testpub_forschema
-                               Publication testpub_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables:
     "pub_test.testpub_nopk"
 
@@ -186,10 +186,10 @@ Publications:
     "testpub_foralltables"
 
 \dRp+ testpub_foralltables
-                              Publication testpub_foralltables
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | t          | t       | t       | f       | f         | f
+                                        Publication testpub_foralltables
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | t          | t       | t       | f       | f         | f        | 
 (1 row)
 
 DROP TABLE testpub_tbl2;
@@ -201,19 +201,19 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3;
 CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3;
 RESET client_min_messages;
 \dRp+ testpub3
-                                    Publication testpub3
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                              Publication testpub3
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables:
     "public.testpub_tbl3"
     "public.testpub_tbl3a"
 
 \dRp+ testpub4
-                                    Publication testpub4
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                              Publication testpub4
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables:
     "public.testpub_tbl3"
 
@@ -234,10 +234,10 @@ UPDATE testpub_parted1 SET a = 1;
 -- only parent is listed as being in publication, not the partition
 ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
 \dRp+ testpub_forparted
-                               Publication testpub_forparted
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables:
     "public.testpub_parted"
 
@@ -252,10 +252,10 @@ ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
 UPDATE testpub_parted1 SET a = 1;
 ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
 \dRp+ testpub_forparted
-                               Publication testpub_forparted
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | t
+                                         Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | t        | 
 Tables:
     "public.testpub_parted"
 
@@ -284,10 +284,10 @@ SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5) WITH (publish = 'insert');
 RESET client_min_messages;
 \dRp+ testpub5
-                                    Publication testpub5
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                              Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | 
 Tables:
     "public.testpub_rf_tbl1"
     "public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5))
@@ -300,10 +300,10 @@ Tables:
 
 ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
 \dRp+ testpub5
-                                    Publication testpub5
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                              Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | 
 Tables:
     "public.testpub_rf_tbl1"
     "public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5))
@@ -319,10 +319,10 @@ Publications:
 
 ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
 \dRp+ testpub5
-                                    Publication testpub5
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                              Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | 
 Tables:
     "public.testpub_rf_tbl1"
     "public.testpub_rf_tbl3" WHERE ((e > 1000) AND (e < 2000))
@@ -330,10 +330,10 @@ Tables:
 -- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
 ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
 \dRp+ testpub5
-                                    Publication testpub5
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                              Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | 
 Tables:
     "public.testpub_rf_tbl3" WHERE ((e > 300) AND (e < 500))
 
@@ -366,10 +366,10 @@ SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub_syntax1 FOR TABLE testpub_rf_tbl1, ONLY testpub_rf_tbl3 WHERE (e < 999) WITH (publish = 'insert');
 RESET client_min_messages;
 \dRp+ testpub_syntax1
-                                Publication testpub_syntax1
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                          Publication testpub_syntax1
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | 
 Tables:
     "public.testpub_rf_tbl1"
     "public.testpub_rf_tbl3" WHERE (e < 999)
@@ -379,10 +379,10 @@ SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub_syntax2 FOR TABLE testpub_rf_tbl1, testpub_rf_schema1.testpub_rf_tbl5 WHERE (h < 999) WITH (publish = 'insert');
 RESET client_min_messages;
 \dRp+ testpub_syntax2
-                                Publication testpub_syntax2
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                          Publication testpub_syntax2
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | 
 Tables:
     "public.testpub_rf_tbl1"
     "testpub_rf_schema1.testpub_rf_tbl5" WHERE (h < 999)
@@ -497,10 +497,10 @@ CREATE PUBLICATION testpub6 FOR TABLES IN SCHEMA testpub_rf_schema2;
 ALTER PUBLICATION testpub6 SET TABLES IN SCHEMA testpub_rf_schema2, TABLE testpub_rf_schema2.testpub_rf_tbl6 WHERE (i < 99);
 RESET client_min_messages;
 \dRp+ testpub6
-                                    Publication testpub6
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                              Publication testpub6
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables:
     "testpub_rf_schema2.testpub_rf_tbl6" WHERE (i < 99)
 Tables from schemas:
@@ -714,10 +714,10 @@ CREATE PUBLICATION testpub_table_ins WITH (publish = 'insert, truncate');
 RESET client_min_messages;
 ALTER PUBLICATION testpub_table_ins ADD TABLE testpub_tbl5 (a);        -- ok
 \dRp+ testpub_table_ins
-                               Publication testpub_table_ins
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | t         | f
+                                         Publication testpub_table_ins
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | f       | f       | t         | f        | 
 Tables:
     "public.testpub_tbl5" (a)
 
@@ -891,10 +891,10 @@ CREATE TABLE testpub_tbl_both_filters (a int, b int, c int, PRIMARY KEY (a,c));
 ALTER TABLE testpub_tbl_both_filters REPLICA IDENTITY USING INDEX testpub_tbl_both_filters_pkey;
 ALTER PUBLICATION testpub_both_filters ADD TABLE testpub_tbl_both_filters (a,c) WHERE (c != 1);
 \dRp+ testpub_both_filters
-                              Publication testpub_both_filters
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                        Publication testpub_both_filters
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables:
     "public.testpub_tbl_both_filters" (a, c) WHERE (c <> 1)
 
@@ -1099,10 +1099,10 @@ ERROR:  relation "testpub_tbl1" is already member of publication "testpub_fortbl
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
 ERROR:  publication "testpub_fortbl" already exists
 \dRp+ testpub_fortbl
-                                 Publication testpub_fortbl
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                           Publication testpub_fortbl
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -1140,10 +1140,10 @@ Publications:
     "testpub_fortbl"
 
 \dRp+ testpub_default
-                                Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | f         | f
+                                          Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | f         | f        | 
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -1214,17 +1214,57 @@ ALTER PUBLICATION testpub4 owner to regress_publication_user2; -- fail
 ERROR:  permission denied to change owner of publication "testpub4"
 HINT:  The owner of a FOR TABLES IN SCHEMA publication must be a superuser.
 ALTER PUBLICATION testpub4 owner to regress_publication_user; -- ok
+-- Test the USAGE privilege.
+SET ROLE regress_publication_user;
+CREATE ROLE regress_publication_user4;
+-- First, check that USAGE is granted to PUBLIC by default.
+SET ROLE regress_publication_user4;
+SELECT has_publication_privilege(p.oid, 'usage')
+FROM pg_catalog.pg_publication p
+WHERE p.pubname='testpub4';
+ has_publication_privilege 
+---------------------------
+ t
+(1 row)
+
+-- Revoke the USAGE privilege from PUBLIC.
+SET ROLE regress_publication_user;
+REVOKE USAGE ON PUBLICATION testpub4 FROM public;
+-- regress_publication_user4 does not have the privilege now.
+SET ROLE regress_publication_user4;
+SELECT has_publication_privilege(p.oid, 'usage')
+FROM pg_catalog.pg_publication p
+WHERE p.pubname='testpub4';
+ has_publication_privilege 
+---------------------------
+ f
+(1 row)
+
+-- Grant USAGE to regress_publication_user4 explicitly.
+SET ROLE regress_publication_user;
+GRANT USAGE ON PUBLICATION testpub4 TO regress_publication_user4;
+-- regress_publication_user4 does have the privilege now.
+SET ROLE regress_publication_user4;
+SELECT has_publication_privilege(p.oid, 'usage')
+FROM pg_catalog.pg_publication p
+WHERE p.pubname='testpub4';
+ has_publication_privilege 
+---------------------------
+ t
+(1 row)
+
 SET ROLE regress_publication_user;
 DROP PUBLICATION testpub4;
 DROP ROLE regress_publication_user3;
+DROP ROLE regress_publication_user4;
 REVOKE CREATE ON DATABASE regression FROM regress_publication_user2;
 DROP TABLE testpub_parted;
 DROP TABLE testpub_tbl1;
 \dRp+ testpub_default
-                                Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | f         | f
+                                          Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | f         | f        | 
 (1 row)
 
 -- fail - must be owner of publication
@@ -1263,19 +1303,19 @@ CREATE TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA"(id int);
 SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub1_forschema FOR TABLES IN SCHEMA pub_test1;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "pub_test1"
 
 CREATE PUBLICATION testpub2_forschema FOR TABLES IN SCHEMA pub_test1, pub_test2, pub_test3;
 \dRp+ testpub2_forschema
-                               Publication testpub2_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1289,44 +1329,44 @@ CREATE PUBLICATION testpub6_forschema FOR TABLES IN SCHEMA "CURRENT_SCHEMA", CUR
 CREATE PUBLICATION testpub_fortable FOR TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA";
 RESET client_min_messages;
 \dRp+ testpub3_forschema
-                               Publication testpub3_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub3_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "public"
 
 \dRp+ testpub4_forschema
-                               Publication testpub4_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub4_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "CURRENT_SCHEMA"
 
 \dRp+ testpub5_forschema
-                               Publication testpub5_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub5_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "CURRENT_SCHEMA"
     "public"
 
 \dRp+ testpub6_forschema
-                               Publication testpub6_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub6_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "CURRENT_SCHEMA"
     "public"
 
 \dRp+ testpub_fortable
-                                Publication testpub_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                          Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables:
     "CURRENT_SCHEMA.CURRENT_SCHEMA"
 
@@ -1360,10 +1400,10 @@ ERROR:  schema "testpub_view" does not exist
 -- dropping the schema should reflect the change in publication
 DROP SCHEMA pub_test3;
 \dRp+ testpub2_forschema
-                               Publication testpub2_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1371,20 +1411,20 @@ Tables from schemas:
 -- renaming the schema should reflect the change in publication
 ALTER SCHEMA pub_test1 RENAME to pub_test1_renamed;
 \dRp+ testpub2_forschema
-                               Publication testpub2_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "pub_test1_renamed"
     "pub_test2"
 
 ALTER SCHEMA pub_test1_renamed RENAME to pub_test1;
 \dRp+ testpub2_forschema
-                               Publication testpub2_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1392,10 +1432,10 @@ Tables from schemas:
 -- alter publication add schema
 ALTER PUBLICATION testpub1_forschema ADD TABLES IN SCHEMA pub_test2;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1404,10 +1444,10 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema ADD TABLES IN SCHEMA non_existent_schema;
 ERROR:  schema "non_existent_schema" does not exist
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1416,10 +1456,10 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema ADD TABLES IN SCHEMA pub_test1;
 ERROR:  schema "pub_test1" is already member of publication "testpub1_forschema"
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1427,10 +1467,10 @@ Tables from schemas:
 -- alter publication drop schema
 ALTER PUBLICATION testpub1_forschema DROP TABLES IN SCHEMA pub_test2;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "pub_test1"
 
@@ -1438,10 +1478,10 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema DROP TABLES IN SCHEMA pub_test2;
 ERROR:  tables from schema "pub_test2" are not part of the publication
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "pub_test1"
 
@@ -1449,29 +1489,29 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema DROP TABLES IN SCHEMA non_existent_schema;
 ERROR:  schema "non_existent_schema" does not exist
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "pub_test1"
 
 -- drop all schemas
 ALTER PUBLICATION testpub1_forschema DROP TABLES IN SCHEMA pub_test1;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 (1 row)
 
 -- alter publication set multiple schema
 ALTER PUBLICATION testpub1_forschema SET TABLES IN SCHEMA pub_test1, pub_test2;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1480,10 +1520,10 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema SET TABLES IN SCHEMA non_existent_schema;
 ERROR:  schema "non_existent_schema" does not exist
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1492,10 +1532,10 @@ Tables from schemas:
 -- removing the duplicate schemas
 ALTER PUBLICATION testpub1_forschema SET TABLES IN SCHEMA pub_test1, pub_test1;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "pub_test1"
 
@@ -1574,18 +1614,18 @@ SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub3_forschema;
 RESET client_min_messages;
 \dRp+ testpub3_forschema
-                               Publication testpub3_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub3_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 (1 row)
 
 ALTER PUBLICATION testpub3_forschema SET TABLES IN SCHEMA pub_test1;
 \dRp+ testpub3_forschema
-                               Publication testpub3_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub3_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables from schemas:
     "pub_test1"
 
@@ -1595,20 +1635,20 @@ CREATE PUBLICATION testpub_forschema_fortable FOR TABLES IN SCHEMA pub_test1, TA
 CREATE PUBLICATION testpub_fortable_forschema FOR TABLE pub_test2.tbl1, TABLES IN SCHEMA pub_test1;
 RESET client_min_messages;
 \dRp+ testpub_forschema_fortable
-                           Publication testpub_forschema_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                     Publication testpub_forschema_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables:
     "pub_test2.tbl1"
 Tables from schemas:
     "pub_test1"
 
 \dRp+ testpub_fortable_forschema
-                           Publication testpub_fortable_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                     Publication testpub_fortable_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Access privileges 
+--------------------------+------------+---------+---------+---------+-----------+----------+-------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | 
 Tables:
     "pub_test2.tbl1"
 Tables from schemas:
diff --git a/src/test/regress/sql/copy.sql b/src/test/regress/sql/copy.sql
index f9da7b1508..4174823cff 100644
--- a/src/test/regress/sql/copy.sql
+++ b/src/test/regress/sql/copy.sql
@@ -268,3 +268,39 @@ a    c    b
 
 SELECT * FROM header_copytest ORDER BY a;
 drop table header_copytest;
+
+-- Filtering by publication
+
+-- Suppress the warning about insufficient wal_level when creating
+-- publications.
+set client_min_messages to error;
+
+create role regress_copy_repl_user login replication;
+create table published_copytest (i int);
+insert into published_copytest(i) select x from generate_series(1, 10) g(x);
+create publication pub1 for table published_copytest where (i >= 7);
+
+set publication_security to on;
+
+-- Test both table name and query forms of the COPY command.
+set role regress_copy_repl_user;
+copy published_copytest to stdout (publication_names (pub1));
+copy (select i from published_copytest) to stdout (publication_names (pub1));
+reset role;
+
+-- Publish some more data.
+create publication pub2 for table published_copytest where (i <= 2);
+set role regress_copy_repl_user;
+copy published_copytest to stdout (publication_names (pub1, pub2));
+reset role;
+
+-- If any publication has no filter, the other filters are ignored.
+create publication pub3 for table published_copytest;
+set role regress_copy_repl_user;
+copy published_copytest to stdout (publication_names (pub1, pub2, pub3));
+reset role;
+
+reset publication_security;
+reset client_min_messages;
+drop role regress_copy_repl_user;
+drop publication pub1, pub2, pub3;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index a47c5939d5..303870a1e9 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -808,9 +808,37 @@ SET ROLE regress_publication_user3;
 ALTER PUBLICATION testpub4 owner to regress_publication_user2; -- fail
 ALTER PUBLICATION testpub4 owner to regress_publication_user; -- ok
 
+-- Test the USAGE privilege.
+SET ROLE regress_publication_user;
+CREATE ROLE regress_publication_user4;
+-- First, check that USAGE is granted to PUBLIC by default.
+SET ROLE regress_publication_user4;
+SELECT has_publication_privilege(p.oid, 'usage')
+FROM pg_catalog.pg_publication p
+WHERE p.pubname='testpub4';
+
+-- Revoke the USAGE privilege from PUBLIC.
+SET ROLE regress_publication_user;
+REVOKE USAGE ON PUBLICATION testpub4 FROM public;
+-- regress_publication_user4 does not have the privilege now.
+SET ROLE regress_publication_user4;
+SELECT has_publication_privilege(p.oid, 'usage')
+FROM pg_catalog.pg_publication p
+WHERE p.pubname='testpub4';
+
+-- Grant USAGE to regress_publication_user4 explicitly.
+SET ROLE regress_publication_user;
+GRANT USAGE ON PUBLICATION testpub4 TO regress_publication_user4;
+-- regress_publication_user4 does have the privilege now.
+SET ROLE regress_publication_user4;
+SELECT has_publication_privilege(p.oid, 'usage')
+FROM pg_catalog.pg_publication p
+WHERE p.pubname='testpub4';
+
 SET ROLE regress_publication_user;
 DROP PUBLICATION testpub4;
 DROP ROLE regress_publication_user3;
+DROP ROLE regress_publication_user4;
 
 REVOKE CREATE ON DATABASE regression FROM regress_publication_user2;
 
diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl
index 59192dbe2f..31e94514c1 100644
--- a/src/test/subscription/t/027_nosuperuser.pl
+++ b/src/test/subscription/t/027_nosuperuser.pl
@@ -7,8 +7,10 @@ use warnings;
 use PostgreSQL::Test::Cluster;
 use Test::More;
 
-my ($node_publisher, $node_subscriber, $publisher_connstr, $result, $offset);
+my ($node_publisher, $node_subscriber, $publisher_connstr, $result, $offset,
+    $offset_pub);
 $offset = 0;
+$offset_pub = 0;
 
 sub publish_insert
 {
@@ -103,7 +105,8 @@ $node_publisher->init(allows_streaming => 'logical');
 $node_subscriber->init;
 $node_publisher->start;
 $node_subscriber->start;
-$publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+# Non-super user, so that we can test publication privileges.
+$publisher_connstr = $node_publisher->connstr . ' dbname=postgres user=regress_alice';
 my %remainder_a = (
     publisher  => 0,
     subscriber => 1);
@@ -141,6 +144,8 @@ for my $node ($node_publisher, $node_subscriber)
 }
 $node_publisher->safe_psql(
     'postgres', qq(
+ALTER ROLE regress_alice REPLICATION;
+
 SET SESSION AUTHORIZATION regress_alice;
 
 CREATE PUBLICATION alice
@@ -316,4 +321,53 @@ expect_replication("alice.unpartitioned", 2, 23, 25,
     "nosuperuser nobypassrls table owner can replicate delete into unpartitioned despite rls"
 );
 
+# Test publication permissions.
+$node_publisher->append_conf(
+    'postgresql.conf',
+    qq[
+publication_security = on
+]);
+$node_publisher->restart;
+
+# First, make sure that the user specified in the subscription is not able to
+# access the data, then do some changes. (By deleting everything we make the
+# following checks simpler.)
+$node_publisher->safe_psql(
+    'postgres', qq(
+REVOKE USAGE ON PUBLICATION alice FROM PUBLIC;
+REVOKE USAGE ON PUBLICATION alice FROM regress_alice;
+ALTER DATABASE postgres SET publication_security TO on;
+
+DELETE FROM alice.unpartitioned;
+));
+# Missing permission should cause error.
+expect_failure("alice.unpartitioned", 2, 23, 25,
+               qr/ERROR: ( [A-Z0-9]+:)? permission denied for publication alice/msi, 0);
+# Check that the missing privilege makes table synchronization fail too.
+$node_subscriber->safe_psql(
+    'postgres', qq(
+SET SESSION AUTHORIZATION regress_admin;
+DROP SUBSCRIPTION admin_sub;
+TRUNCATE TABLE alice.unpartitioned;
+CREATE SUBSCRIPTION admin_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
+));
+# Note that expect_failure() does not wait for the end of the synchronization,
+# so if there was any data on publisher side and if it found its way to the
+# subscriber, the function might still see an empty table. So we only rely on
+# the function to check the error message.
+expect_failure("alice.unpartitioned", 0, '', '',
+               qr/ERROR: ( [A-Z0-9]+:)? permission denied for publication alice/msi, 0);
+# Restore the privilege on the publication.
+$node_publisher->safe_psql(
+    'postgres', qq(
+GRANT USAGE ON PUBLICATION alice TO regress_alice;
+));
+# Wait for synchronization to complete.
+$node_subscriber->wait_for_subscription_sync;
+# The replication should work again now.
+publish_insert("alice.unpartitioned", 1);
+expect_replication("alice.unpartitioned", 1, 1, 1,
+   "unpartitioned is replicated as soon as regress_alice has permissions on alice publication"
+);
+
 done_testing();
-- 
2.31.1


pgsql-hackers by date:

Previous
From: Peter Eisentraut
Date:
Subject: meson documentation build open issues
Next
From: Peter Eisentraut
Date:
Subject: pkg-config Requires.private entries should be comma-separated