Re: Privileges on PUBLICATION - Mailing list pgsql-hackers

From Antonin Houska
Subject Re: Privileges on PUBLICATION
Date
Msg-id 1537.1667454192@antos
In response to Re: Privileges on PUBLICATION  (Peter Eisentraut <peter.eisentraut@enterprisedb.com>)
List pgsql-hackers
Peter Eisentraut <peter.eisentraut@enterprisedb.com> wrote:

> The CF entry is about privileges on publications.  Please rebase that patch
> and repost it so that the CF app and the CF bot are up to date.

The rebased patch (with regression tests added) is attached here.

There's still one design issue that I haven't mentioned yet: if the USAGE
privilege on a publication is revoked after the synchronization phase has
completed, the missing privilege causes an ERROR in the output plugin. If the
privilege is then granted again, the error does not go away: the (historical)
snapshot used to decode the failed data change again is also used to check
the privileges in the catalog, so the output plugin does not see that the
privilege has already been granted.
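
To make the visibility problem concrete, here is a toy model in C. It is only
a sketch and not PostgreSQL code: AclVersion, usage_visible_at() and
example_history are hypothetical names, and positions are plain integers
standing in for WAL locations. It assumes that a privilege check sees only
the ACL versions committed at or before the snapshot's position.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model: one committed version of a publication's ACL. */
typedef struct AclVersion
{
    unsigned long commit_lsn;   /* position at which this version committed */
    bool          has_usage;    /* does the subscriber's role hold USAGE? */
} AclVersion;

/*
 * Return the USAGE flag as seen by a snapshot taken at snapshot_lsn.
 * "history" must be sorted by commit_lsn in ascending order; versions
 * committed after the snapshot are invisible to it.
 */
static bool
usage_visible_at(const AclVersion *history, size_t nversions,
                 unsigned long snapshot_lsn)
{
    bool        visible = false;    /* no visible version: treat as denied */

    assert(history != NULL);
    for (size_t i = 0; i < nversions; i++)
    {
        if (history[i].commit_lsn > snapshot_lsn)
            break;              /* committed after the snapshot was taken */
        visible = history[i].has_usage;
    }
    return visible;
}

/* USAGE held at first, revoked at position 100, granted again at 300. */
static const AclVersion example_history[] = {
    {0, true},
    {100, false},
    {300, true},
};
```

A data change at position 200 is always decoded with the snapshot for
position 200, so usage_visible_at(example_history, 3, 200) stays false no
matter how often the decoding is retried, while a check against the current
catalog, e.g. position 400, would already succeed.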

The only solution seems to be to drop the publication from the subscription
and add it again, or to drop and re-create the whole subscription. I haven't
added a note about this problem to the documentation yet, in case someone has
a better idea of how to approach it.

-- 
Antonin Houska
Web: https://www.cybertec-postgresql.com

diff --git a/doc/src/sgml/ddl.sgml b/doc/src/sgml/ddl.sgml
index 03c0193709..d510220a07 100644
--- a/doc/src/sgml/ddl.sgml
+++ b/doc/src/sgml/ddl.sgml
@@ -1935,6 +1935,13 @@ REVOKE ALL ON accounts FROM PUBLIC;
        statements that have previously performed this lookup, so this is not
        a completely secure way to prevent object access.
       </para>
+      <para>
+       For publications, allows logical replication via a particular
+       publication. The user specified in
+       the <link linkend="sql-createsubscription"><command>CREATE
+       SUBSCRIPTION</command></link> command must have this privilege on all
+       publications listed in that command.
+      </para>
       <para>
        For sequences, allows use of the
        <function>currval</function> and <function>nextval</function> functions.
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index f8756389a3..4286d709d9 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1712,7 +1712,9 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
   <para>
    In order to be able to copy the initial table data, the role used for the
    replication connection must have the <literal>SELECT</literal> privilege on
-   a published table (or be a superuser).
+   a published table (or be a superuser). In addition, the role must have
+   the <literal>USAGE</literal> privilege on all the publications referenced
+   by a particular subscription.
   </para>
 
   <para>
@@ -1728,14 +1730,12 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
   </para>
 
   <para>
-   There are currently no privileges on publications.  Any subscription (that
-   is able to connect) can access any publication.  Thus, if you intend to
-   hide some information from particular subscribers, such as by using row
-   filters or column lists, or by not adding the whole table to the
-   publication, be aware that other publications in the same database could
-   expose the same information.  Publication privileges might be added to
-   <productname>PostgreSQL</productname> in the future to allow for
-   finer-grained access control.
+   If you intend to hide some information from particular subscribers, such as
+   by using row filters or column lists, or by not adding the whole table to
+   the publication, be aware that other publications in the same database
+   could expose the same
+   information. <link linkend="ddl-priv">Privileges</link> on publications can
+   be used to implement finer-grained access control.
   </para>
 
   <para>
diff --git a/doc/src/sgml/ref/grant.sgml b/doc/src/sgml/ref/grant.sgml
index dea19cd348..e62b7f643c 100644
--- a/doc/src/sgml/ref/grant.sgml
+++ b/doc/src/sgml/ref/grant.sgml
@@ -82,6 +82,11 @@ GRANT { { SET | ALTER SYSTEM } [, ... ] | ALL [ PRIVILEGES ] }
     TO <replaceable class="parameter">role_specification</replaceable> [, ...] [ WITH GRANT OPTION ]
     [ GRANTED BY <replaceable class="parameter">role_specification</replaceable> ]
 
+GRANT { USAGE | ALL [ PRIVILEGES ] }
+    ON PUBLICATION <replaceable>pub_name</replaceable> [, ...]
+    TO <replaceable class="parameter">role_specification</replaceable> [, ...] [ WITH GRANT OPTION ]
+    [ GRANTED BY <replaceable class="parameter">role_specification</replaceable> ]
+
 GRANT { { CREATE | USAGE } [, ...] | ALL [ PRIVILEGES ] }
     ON SCHEMA <replaceable>schema_name</replaceable> [, ...]
     TO <replaceable class="parameter">role_specification</replaceable> [, ...] [ WITH GRANT OPTION ]
@@ -488,7 +493,7 @@ GRANT admins TO joe;
    </para>
 
    <para>
-    Privileges on databases, tablespaces, schemas, languages, and
+    Privileges on databases, tablespaces, schemas, languages, publications, and
     configuration parameters are
     <productname>PostgreSQL</productname> extensions.
    </para>
diff --git a/doc/src/sgml/ref/revoke.sgml b/doc/src/sgml/ref/revoke.sgml
index 4fd4bfb3d7..7e8d018743 100644
--- a/doc/src/sgml/ref/revoke.sgml
+++ b/doc/src/sgml/ref/revoke.sgml
@@ -104,6 +104,13 @@ REVOKE [ GRANT OPTION FOR ]
     [ GRANTED BY <replaceable class="parameter">role_specification</replaceable> ]
     [ CASCADE | RESTRICT ]
 
+REVOKE [ GRANT OPTION FOR ]
+    { USAGE | ALL [ PRIVILEGES ] }
+    ON PUBLICATION <replaceable>pub_name</replaceable> [, ...]
+    FROM <replaceable class="parameter">role_specification</replaceable> [, ...]
+    [ GRANTED BY <replaceable class="parameter">role_specification</replaceable> ]
+    [ CASCADE | RESTRICT ]
+
 REVOKE [ GRANT OPTION FOR ]
     { { CREATE | USAGE } [, ...] | ALL [ PRIVILEGES ] }
     ON SCHEMA <replaceable>schema_name</replaceable> [, ...]
diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c
index aa5a2ed948..42eff8d8a4 100644
--- a/src/backend/catalog/aclchk.c
+++ b/src/backend/catalog/aclchk.c
@@ -111,6 +111,7 @@ static void ExecGrant_ForeignServer(InternalGrant *istmt);
 static void ExecGrant_Function(InternalGrant *istmt);
 static void ExecGrant_Language(InternalGrant *istmt);
 static void ExecGrant_Largeobject(InternalGrant *istmt);
+static void ExecGrant_Publication(InternalGrant *istmt);
 static void ExecGrant_Namespace(InternalGrant *istmt);
 static void ExecGrant_Tablespace(InternalGrant *istmt);
 static void ExecGrant_Type(InternalGrant *istmt);
@@ -478,6 +479,10 @@ ExecuteGrantStmt(GrantStmt *stmt)
             all_privileges = ACL_ALL_RIGHTS_LARGEOBJECT;
             errormsg = gettext_noop("invalid privilege type %s for large object");
             break;
+        case OBJECT_PUBLICATION:
+            all_privileges = ACL_ALL_RIGHTS_PUBLICATION;
+            errormsg = gettext_noop("invalid privilege type %s for publication");
+            break;
         case OBJECT_SCHEMA:
             all_privileges = ACL_ALL_RIGHTS_SCHEMA;
             errormsg = gettext_noop("invalid privilege type %s for schema");
@@ -606,6 +611,9 @@ ExecGrantStmt_oids(InternalGrant *istmt)
         case OBJECT_LARGEOBJECT:
             ExecGrant_Largeobject(istmt);
             break;
+        case OBJECT_PUBLICATION:
+            ExecGrant_Publication(istmt);
+            break;
         case OBJECT_SCHEMA:
             ExecGrant_Namespace(istmt);
             break;
@@ -716,6 +724,16 @@ objectNamesToOids(ObjectType objtype, List *objnames, bool is_grant)
                 objects = lappend_oid(objects, lobjOid);
             }
             break;
+        case OBJECT_PUBLICATION:
+            foreach(cell, objnames)
+            {
+                char       *pubname = strVal(lfirst(cell));
+                Oid            oid;
+
+                oid = get_publication_oid(pubname, false);
+                objects = lappend_oid(objects, oid);
+            }
+            break;
         case OBJECT_SCHEMA:
             foreach(cell, objnames)
             {
@@ -2854,6 +2872,126 @@ ExecGrant_Largeobject(InternalGrant *istmt)
     table_close(relation, RowExclusiveLock);
 }
 
+static void
+ExecGrant_Publication(InternalGrant *istmt)
+{
+    Relation    relation;
+    ListCell   *cell;
+
+    if (istmt->all_privs && istmt->privileges == ACL_NO_RIGHTS)
+        istmt->privileges = ACL_ALL_RIGHTS_PUBLICATION;
+
+    relation = table_open(PublicationRelationId, RowExclusiveLock);
+
+    foreach(cell, istmt->objects)
+    {
+        Oid            pubid = lfirst_oid(cell);
+        HeapTuple    tuple;
+        HeapTuple    newtuple;
+        Form_pg_publication pg_pub_tuple;
+        Oid            ownerId;
+        Datum        aclDatum;
+        bool        isnull;
+        AclMode        avail_goptions;
+        AclMode        this_privileges;
+        Acl           *old_acl;
+        Acl           *new_acl;
+        Oid            grantorId;
+        Datum        values[Natts_pg_publication];
+        bool        nulls[Natts_pg_publication];
+        bool        replaces[Natts_pg_publication];
+        int            noldmembers;
+        int            nnewmembers;
+        Oid           *oldmembers;
+        Oid           *newmembers;
+
+        tuple = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
+        if (!HeapTupleIsValid(tuple))
+            elog(ERROR, "cache lookup failed for publication %u", pubid);
+
+        pg_pub_tuple = (Form_pg_publication) GETSTRUCT(tuple);
+        ownerId = pg_pub_tuple->pubowner;
+        aclDatum = SysCacheGetAttr(PUBLICATIONOID, tuple,
+                                   Anum_pg_publication_pubacl,
+                                   &isnull);
+
+        if (isnull)
+        {
+            old_acl = acldefault(OBJECT_PUBLICATION, ownerId);
+            /* There are no old member roles according to the catalogs */
+            noldmembers = 0;
+            oldmembers = NULL;
+        }
+        else
+        {
+            old_acl = DatumGetAclPCopy(aclDatum);
+            /* Get the roles mentioned in the existing ACL */
+            noldmembers = aclmembers(old_acl, &oldmembers);
+        }
+
+        /* Determine ID to do the grant as, and available grant options */
+        select_best_grantor(GetUserId(), istmt->privileges,
+                            old_acl, ownerId,
+                            &grantorId, &avail_goptions);
+
+        /*
+         * Restrict the privileges to what we can actually grant, and emit the
+         * standards-mandated warning and error messages.
+         */
+        this_privileges =
+            restrict_and_check_grant(istmt->is_grant, avail_goptions,
+                                     istmt->all_privs, istmt->privileges,
+                                     pubid, grantorId, OBJECT_PUBLICATION,
+                                     NameStr(pg_pub_tuple->pubname),
+                                     0, NULL);
+
+        /*
+         * Generate new ACL.
+         */
+        new_acl = merge_acl_with_grant(old_acl, istmt->is_grant,
+                                       istmt->grant_option, istmt->behavior,
+                                       istmt->grantees, this_privileges,
+                                       grantorId, ownerId);
+
+        /*
+         * We need the members of both old and new ACLs so we can correct the
+         * shared dependency information.
+         */
+        nnewmembers = aclmembers(new_acl, &newmembers);
+
+        /* finished building new ACL value, now insert it */
+        MemSet(values, 0, sizeof(values));
+        MemSet(nulls, false, sizeof(nulls));
+        MemSet(replaces, false, sizeof(replaces));
+
+        replaces[Anum_pg_publication_pubacl - 1] = true;
+        values[Anum_pg_publication_pubacl - 1] = PointerGetDatum(new_acl);
+
+        newtuple = heap_modify_tuple(tuple, RelationGetDescr(relation), values,
+                                     nulls, replaces);
+
+        CatalogTupleUpdate(relation, &newtuple->t_self, newtuple);
+
+        /* Update initial privileges for extensions */
+        recordExtensionInitPriv(pubid, PublicationRelationId, 0, new_acl);
+
+        /* Update the shared dependency ACL info */
+        updateAclDependencies(PublicationRelationId, pubid, 0,
+                              ownerId,
+                              noldmembers, oldmembers,
+                              nnewmembers, newmembers);
+
+        ReleaseSysCache(tuple);
+
+        pfree(new_acl);
+
+        /* prevent error when processing duplicate objects */
+        CommandCounterIncrement();
+    }
+
+    table_close(relation, RowExclusiveLock);
+}
+
 static void
 ExecGrant_Namespace(InternalGrant *istmt)
 {
@@ -3802,6 +3940,8 @@ pg_aclmask(ObjectType objtype, Oid table_oid, AttrNumber attnum, Oid roleid,
             return pg_database_aclmask(table_oid, roleid, mask, how);
         case OBJECT_FUNCTION:
             return pg_proc_aclmask(table_oid, roleid, mask, how);
+        case OBJECT_PUBLICATION:
+            return pg_publication_aclmask(table_oid, roleid, mask, how);
         case OBJECT_LANGUAGE:
             return pg_language_aclmask(table_oid, roleid, mask, how);
         case OBJECT_LARGEOBJECT:
@@ -4325,6 +4465,60 @@ pg_proc_aclmask(Oid proc_oid, Oid roleid,
     return result;
 }
 
+/*
+ * Exported routine for examining a user's privileges for a publication.
+ */
+AclMode
+pg_publication_aclmask(Oid pubid, Oid roleid, AclMode mask,
+                       AclMaskHow how)
+{
+    AclMode        result;
+    HeapTuple    tuple;
+    Datum        aclDatum;
+    bool        isNull;
+    Acl           *acl;
+    Oid            ownerId;
+
+    /* Superusers bypass all permission checking. */
+    if (superuser_arg(roleid))
+        return mask;
+
+    /*
+     * Get the publication's ACL from pg_publication
+     */
+    tuple = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
+    if (!HeapTupleIsValid(tuple))
+        ereport(ERROR,
+                (errcode(ERRCODE_UNDEFINED_OBJECT),
+                 errmsg("publication with OID %u does not exist", pubid)));
+
+    ownerId = ((Form_pg_publication) GETSTRUCT(tuple))->pubowner;
+
+    aclDatum = SysCacheGetAttr(PUBLICATIONOID, tuple, Anum_pg_publication_pubacl,
+                               &isNull);
+    if (isNull)
+    {
+        /* No ACL, so build default ACL */
+        acl = acldefault(OBJECT_PUBLICATION, ownerId);
+        aclDatum = (Datum) 0;
+    }
+    else
+    {
+        /* detoast ACL if necessary */
+        acl = DatumGetAclP(aclDatum);
+    }
+
+    result = aclmask(acl, roleid, ownerId, mask, how);
+
+    /* if we have a detoasted copy, free it */
+    if (acl && (Pointer) acl != DatumGetPointer(aclDatum))
+        pfree(acl);
+
+    ReleaseSysCache(tuple);
+
+    return result;
+}
+
 /*
  * Exported routine for examining a user's privileges for a language
  */
@@ -5022,6 +5216,18 @@ pg_proc_aclcheck(Oid proc_oid, Oid roleid, AclMode mode)
         return ACLCHECK_NO_PRIV;
 }
 
+/*
+ * Exported routine for checking a user's access privileges to a publication
+ */
+AclResult
+pg_publication_aclcheck(Oid pub_oid, Oid roleid, AclMode mode)
+{
+    if (pg_publication_aclmask(pub_oid, roleid, mode, ACLMASK_ANY) != 0)
+        return ACLCHECK_OK;
+    else
+        return ACLCHECK_NO_PRIV;
+}
+
 /*
  * Exported routine for checking a user's access privileges to a language
  */
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index f26cc0d162..e69ee2c82a 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -27,6 +27,7 @@
 #include "commands/progress.h"
 #include "executor/execdesc.h"
 #include "executor/executor.h"
+#include "executor/spi.h"
 #include "executor/tuptable.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -34,9 +35,11 @@
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
 #include "pgstat.h"
+#include "replication/walsender.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
 #include "tcop/tcopprot.h"
+#include "utils/acl.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/partcache.h"
@@ -131,6 +134,7 @@ static void CopySendEndOfRow(CopyToState cstate);
 static void CopySendInt32(CopyToState cstate, int32 val);
 static void CopySendInt16(CopyToState cstate, int16 val);
 
+static void check_publication_privileges(List *relids);
 
 /*
  * Send copy start/stop messages for frontend copies.  These have changed
@@ -377,6 +381,7 @@ BeginCopyTo(ParseState *pstate,
         PROGRESS_COPY_COMMAND_TO,
         0
     };
+    List    *pubrelids = NIL;
 
     if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
     {
@@ -441,6 +446,13 @@ BeginCopyTo(ParseState *pstate,
         cstate->rel = rel;
 
         tupDesc = RelationGetDescr(cstate->rel);
+
+        /*
+         * In walsender, we need to check the privileges of the related
+         * publications.
+         */
+        if (am_walsender)
+            pubrelids = lappend_oid(pubrelids, RelationGetRelid(rel));
     }
     else
     {
@@ -570,9 +582,32 @@ BeginCopyTo(ParseState *pstate,
          */
         ExecutorStart(cstate->queryDesc, 0);
 
+        /*
+         * In walsender, we need to check the privileges of the related
+         * publications.
+         */
+        if (am_walsender)
+        {
+            ListCell    *lc;
+
+            foreach(lc, plan->rtable)
+            {
+                RangeTblEntry    *rte = lfirst_node(RangeTblEntry, lc);
+
+                if (rte->rtekind != RTE_RELATION)
+                    continue;
+
+                pubrelids = lappend_oid(pubrelids, rte->relid);
+            }
+        }
+
         tupDesc = cstate->queryDesc->tupDesc;
     }
 
+    /* Check if publication privileges allow reading from the relations. */
+    if (pubrelids)
+        check_publication_privileges(pubrelids);
+
     /* Generate or convert list of attributes to process */
     cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
 
@@ -1287,3 +1322,82 @@ CreateCopyDestReceiver(void)
 
     return (DestReceiver *) self;
 }
+
+static void
+check_publication_privileges(List *relids)
+{
+    ListCell    *lc;
+    StringInfoData    bufOids;
+    StringInfoData    bufQuery;
+    int        i;
+    List    *pubids = NIL;
+
+    if (list_length(relids) == 0)
+        return;
+
+    /* Construct a list of the OIDs */
+    initStringInfo(&bufOids);
+    foreach(lc, relids)
+    {
+        Oid        oid = lfirst_oid(lc);
+
+        if (bufOids.len > 0)
+            appendStringInfoString(&bufOids, ", ");
+        appendStringInfo(&bufOids, "%u", oid);
+    }
+
+    /*
+     * Construct the query to retrieve the publication OIDs. (Similar query
+     * is in fetch_remote_table_info()).
+     */
+    initStringInfo(&bufQuery);
+    appendStringInfo(&bufQuery,
+                     "SELECT DISTINCT p.oid"
+                     "  FROM pg_publication p"
+                     "  LEFT OUTER JOIN pg_publication_rel pr"
+                     "       ON p.oid = pr.prpubid, "
+                     "  LATERAL pg_get_publication_tables(p.pubname) gpt"
+                     " WHERE pr.prrelid IN (%s) AND gpt.relid IN (%s)",
+                     bufOids.data, bufOids.data);
+    list_free(relids);
+
+    /* Run the query to get the publication OIDs. */
+    if (SPI_connect() != SPI_OK_CONNECT)
+        elog(ERROR, "SPI_connect failed");
+
+    if (SPI_exec(bufQuery.data, 0) != SPI_OK_SELECT)
+        elog(ERROR, "SPI_exec failed: %s", bufQuery.data);
+
+    for (i = 0; i < SPI_processed; i++)
+    {
+        Datum    oidD;
+        bool    isnull;
+
+        oidD = SPI_getbinval(SPI_tuptable->vals[i],
+                             SPI_tuptable->tupdesc,
+                             1,
+                             &isnull);
+        Assert(!isnull);
+        pubids = lappend_oid(pubids, DatumGetObjectId(oidD));
+    }
+
+    /* Finally, check the privileges. */
+    foreach(lc, pubids)
+    {
+        Oid        oid = lfirst_oid(lc);
+
+        if (pg_publication_aclcheck(oid, GetUserId(), ACL_USAGE) != ACLCHECK_OK)
+        {
+            char    *pubname = get_publication_name(oid, false);
+
+            ereport(ERROR,
+                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                     errmsg("permission denied for publication %s",
+                            pubname)));
+        }
+    }
+    list_free(pubids);
+
+    if (SPI_finish() != SPI_OK_FINISH)
+        elog(ERROR, "SPI_finish failed");
+}
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index a8b75eb1be..2777874d09 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -799,6 +799,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
         BoolGetDatum(pubactions.pubtruncate);
     values[Anum_pg_publication_pubviaroot - 1] =
         BoolGetDatum(publish_via_partition_root);
+    values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
+    nulls[Anum_pg_publication_pubacl - 1] = true;
 
     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 737bd2d06d..7eddd1073a 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -7635,6 +7635,14 @@ privilege_target:
                     n->objs = $2;
                     $$ = n;
                 }
+            | PUBLICATION name_list
+                {
+                    PrivTarget *n = (PrivTarget *) palloc(sizeof(PrivTarget));
+                    n->targtype = ACL_TARGET_OBJECT;
+                    n->objtype = OBJECT_PUBLICATION;
+                    n->objs = $2;
+                    $$ = n;
+                }
             | SCHEMA name_list
                 {
                     PrivTarget *n = (PrivTarget *) palloc(sizeof(PrivTarget));
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 2ecaa5b907..ecfa0aa857 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -20,12 +20,14 @@
 #include "commands/defrem.h"
 #include "executor/executor.h"
 #include "fmgr.h"
+#include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "optimizer/optimizer.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
@@ -2093,8 +2095,15 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
              * We'll use the top-most relid across all publications. Also
              * track the ancestor level for this publication.
              */
-            Oid            pub_relid = relid;
-            int            ancestor_level = 0;
+            Oid    pub_relid = relid;
+            int    ancestor_level = 0;
+
+            /* Check if we have the usage privilege on the publication. */
+            if (pg_publication_aclcheck(pub->oid, GetUserId(), ACL_USAGE) != ACLCHECK_OK)
+                ereport(ERROR,
+                        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                         errmsg("permission denied for publication %s",
+                                pub->name)));
 
             /*
              * If this is a FOR ALL TABLES publication, pick the partition
diff --git a/src/backend/utils/adt/acl.c b/src/backend/utils/adt/acl.c
index 4fac402e5b..3e8877936e 100644
--- a/src/backend/utils/adt/acl.c
+++ b/src/backend/utils/adt/acl.c
@@ -806,6 +806,10 @@ acldefault(ObjectType objtype, Oid ownerId)
             world_default = ACL_NO_RIGHTS;
             owner_default = ACL_ALL_RIGHTS_PARAMETER_ACL;
             break;
+        case OBJECT_PUBLICATION:
+            world_default = ACL_USAGE;
+            owner_default = ACL_ALL_RIGHTS_PUBLICATION;
+            break;
         default:
             elog(ERROR, "unrecognized objtype: %d", (int) objtype);
             world_default = ACL_NO_RIGHTS;    /* keep compiler quiet */
@@ -903,6 +907,9 @@ acldefault_sql(PG_FUNCTION_ARGS)
         case 'T':
             objtype = OBJECT_TYPE;
             break;
+        case 'P':
+            objtype = OBJECT_PUBLICATION;
+            break;
         default:
             elog(ERROR, "unrecognized objtype abbreviation: %c", objtypec);
     }
diff --git a/src/bin/pg_dump/dumputils.c b/src/bin/pg_dump/dumputils.c
index 6e501a5413..91f97a1f93 100644
--- a/src/bin/pg_dump/dumputils.c
+++ b/src/bin/pg_dump/dumputils.c
@@ -504,6 +504,8 @@ do { \
         CONVERT_PRIV('r', "SELECT");
         CONVERT_PRIV('w', "UPDATE");
     }
+    else if (strcmp(type, "PUBLICATION") == 0)
+        CONVERT_PRIV('U', "USAGE");
     else
         abort();
 
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index bd9b066e4e..37e8995ac3 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3898,6 +3898,8 @@ getPublications(Archive *fout, int *numPublications)
     int            i_pubdelete;
     int            i_pubtruncate;
     int            i_pubviaroot;
+    int            i_pubacl;
+    int            i_acldefault;
     int            i,
                 ntups;
 
@@ -3912,27 +3914,32 @@ getPublications(Archive *fout, int *numPublications)
     resetPQExpBuffer(query);
 
     /* Get the publications. */
-    if (fout->remoteVersion >= 130000)
+    if (fout->remoteVersion >= 150000)
         appendPQExpBufferStr(query,
-                             "SELECT p.tableoid, p.oid, p.pubname, "
-                             "p.pubowner, "
-                             "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
-                             "FROM pg_publication p");
+                          "SELECT p.tableoid, p.oid, p.pubname, "
+                          "p.pubowner, "
+                          "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot, p.pubacl,acldefault('P', p.pubowner) AS acldefault "
+                          "FROM pg_publication p");
+    else if (fout->remoteVersion >= 130000)
+        appendPQExpBuffer(query,
+                          "SELECT p.tableoid, p.oid, p.pubname, "
+                          "p.pubowner, "
+                          "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot, '{}' AS pubacl,'{}' AS acldefault "
+                          "FROM pg_publication p");
     else if (fout->remoteVersion >= 110000)
         appendPQExpBufferStr(query,
-                             "SELECT p.tableoid, p.oid, p.pubname, "
-                             "p.pubowner, "
-                             "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot"
-                             "FROM pg_publication p");
+                          "SELECT p.tableoid, p.oid, p.pubname, "
+                          "p.pubowner, "
+                          "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot, '{}'AS pubacl, '{}' AS acldefault "
+                          "FROM pg_publication p");
     else
         appendPQExpBufferStr(query,
-                             "SELECT p.tableoid, p.oid, p.pubname, "
-                             "p.pubowner, "
-                             "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot"
-                             "FROM pg_publication p");
+                          "SELECT p.tableoid, p.oid, p.pubname, "
+                          "p.pubowner, "
+                          "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot,'{}' AS pubacl, '{}' AS acldefault "
+                          "FROM pg_publication p");
 
     res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
-
     ntups = PQntuples(res);
 
     i_tableoid = PQfnumber(res, "tableoid");
@@ -3945,6 +3952,8 @@ getPublications(Archive *fout, int *numPublications)
     i_pubdelete = PQfnumber(res, "pubdelete");
     i_pubtruncate = PQfnumber(res, "pubtruncate");
     i_pubviaroot = PQfnumber(res, "pubviaroot");
+    i_pubacl = PQfnumber(res, "pubacl");
+    i_acldefault = PQfnumber(res, "acldefault");
 
     pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
 
@@ -3969,6 +3978,11 @@ getPublications(Archive *fout, int *numPublications)
             (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
         pubinfo[i].pubviaroot =
             (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
+        pubinfo[i].dacl.acl = pg_strdup(PQgetvalue(res, i, i_pubacl));
+        pubinfo[i].dacl.acldefault = pg_strdup(PQgetvalue(res, i, i_acldefault));
+        pubinfo[i].dacl.privtype = 0;
+        pubinfo[i].dacl.initprivs = NULL;
+        pubinfo[i].dobj.components |= DUMP_COMPONENT_ACL;
 
         /* Decide whether we want to dump it */
         selectDumpableObject(&(pubinfo[i].dobj), fout);
@@ -4072,6 +4086,11 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo)
                      NULL, pubinfo->rolname,
                      pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
 
+    if (pubinfo->dobj.dump & DUMP_COMPONENT_ACL)
+        dumpACL(fout, pubinfo->dobj.dumpId, InvalidDumpId, "PUBLICATION",
+                pg_strdup(fmtId(pubinfo->dobj.name)), NULL, NULL,
+                pubinfo->rolname, &pubinfo->dacl);
+
     destroyPQExpBuffer(delq);
     destroyPQExpBuffer(query);
     free(qpubname);
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 427f5d45f6..ddf1b1d11a 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -613,6 +613,7 @@ typedef struct _policyInfo
 typedef struct _PublicationInfo
 {
     DumpableObject dobj;
+    DumpableAcl dacl;
     const char *rolname;
     bool        puballtables;
     bool        pubinsert;
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 4c45e4747a..33e03f1d64 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -3874,6 +3874,7 @@ psql_completion(const char *text, int start, int end)
                                             "LARGE OBJECT",
                                             "PARAMETER",
                                             "PROCEDURE",
+                                            "PUBLICATION",
                                             "ROUTINE",
                                             "SCHEMA",
                                             "SEQUENCE",
@@ -3911,6 +3912,8 @@ psql_completion(const char *text, int start, int end)
             COMPLETE_WITH_QUERY(Query_for_list_of_languages);
         else if (TailMatches("PROCEDURE"))
             COMPLETE_WITH_VERSIONED_SCHEMA_QUERY(Query_for_list_of_procedures);
+        else if (TailMatches("PUBLICATION"))
+            COMPLETE_WITH_VERSIONED_QUERY(Query_for_list_of_publications);
         else if (TailMatches("ROUTINE"))
             COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_routines);
         else if (TailMatches("SCHEMA"))
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index ecf5a28e00..89c7ac120f 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -54,6 +54,12 @@ CATALOG(pg_publication,6104,PublicationRelationId)
 
     /* true if partition changes are published using root schema */
     bool        pubviaroot;
+
+#ifdef CATALOG_VARLEN            /* variable-length fields start here */
+    /* NOTE: These fields are not present in a relcache entry's rd_rel field. */
+    /* access permissions */
+    aclitem        pubacl[1] BKI_DEFAULT(_null_);
+#endif
 } FormData_pg_publication;
 
 /* ----------------
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 7e7ad3f7e4..ea344abdc7 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -88,8 +88,8 @@ typedef uint32 AclMode;            /* a bitmask of privilege bits */
 #define ACL_REFERENCES    (1<<5)
 #define ACL_TRIGGER        (1<<6)
 #define ACL_EXECUTE        (1<<7)    /* for functions */
-#define ACL_USAGE        (1<<8)    /* for languages, namespaces, FDWs, and
-                                 * servers */
+#define ACL_USAGE        (1<<8)    /* for languages, namespaces, FDWs, servers
+                                 * and publications */
 #define ACL_CREATE        (1<<9)    /* for namespaces and databases */
 #define ACL_CREATE_TEMP (1<<10) /* for databases */
 #define ACL_CONNECT        (1<<11) /* for databases */
diff --git a/src/include/utils/acl.h b/src/include/utils/acl.h
index 9a4df3a5da..54b74fa56f 100644
--- a/src/include/utils/acl.h
+++ b/src/include/utils/acl.h
@@ -168,6 +168,7 @@ typedef struct ArrayType Acl;
 #define ACL_ALL_RIGHTS_SCHEMA        (ACL_USAGE|ACL_CREATE)
 #define ACL_ALL_RIGHTS_TABLESPACE    (ACL_CREATE)
 #define ACL_ALL_RIGHTS_TYPE            (ACL_USAGE)
+#define ACL_ALL_RIGHTS_PUBLICATION    (ACL_USAGE)
 
 /* operation codes for pg_*_aclmask */
 typedef enum
@@ -253,6 +254,8 @@ extern AclMode pg_parameter_acl_aclmask(Oid acl_oid, Oid roleid,
                                         AclMode mask, AclMaskHow how);
 extern AclMode pg_proc_aclmask(Oid proc_oid, Oid roleid,
                                AclMode mask, AclMaskHow how);
+extern AclMode pg_publication_aclmask(Oid pubid, Oid roleid,
+                                      AclMode mask, AclMaskHow how);
 extern AclMode pg_language_aclmask(Oid lang_oid, Oid roleid,
                                    AclMode mask, AclMaskHow how);
 extern AclMode pg_largeobject_aclmask_snapshot(Oid lobj_oid, Oid roleid,
@@ -284,6 +287,7 @@ extern AclResult pg_parameter_aclcheck(const char *name, Oid roleid,
 extern AclResult pg_parameter_acl_aclcheck(Oid acl_oid, Oid roleid,
                                            AclMode mode);
 extern AclResult pg_proc_aclcheck(Oid proc_oid, Oid roleid, AclMode mode);
+extern AclResult pg_publication_aclcheck(Oid pub_oid, Oid roleid, AclMode mode);
 extern AclResult pg_language_aclcheck(Oid lang_oid, Oid roleid, AclMode mode);
 extern AclResult pg_largeobject_aclcheck_snapshot(Oid lobj_oid, Oid roleid,
                                                   AclMode mode, Snapshot snapshot);
diff --git a/src/test/regress/expected/misc_sanity.out b/src/test/regress/expected/misc_sanity.out
index a57fd142a9..14a551a871 100644
--- a/src/test/regress/expected/misc_sanity.out
+++ b/src/test/regress/expected/misc_sanity.out
@@ -60,7 +60,8 @@ ORDER BY 1, 2;
  pg_index                | indpred       | pg_node_tree
  pg_largeobject          | data          | bytea
  pg_largeobject_metadata | lomacl        | aclitem[]
-(11 rows)
+ pg_publication          | pubacl        | aclitem[]
+(12 rows)
 
 -- system catalogs without primary keys
 --
diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl
index 8614bf0458..43f5ffd20c 100644
--- a/src/test/subscription/t/027_nosuperuser.pl
+++ b/src/test/subscription/t/027_nosuperuser.pl
@@ -7,8 +7,10 @@ use warnings;
 use PostgreSQL::Test::Cluster;
 use Test::More;
 
-my ($node_publisher, $node_subscriber, $publisher_connstr, $result, $offset);
+my ($node_publisher, $node_subscriber, $publisher_connstr, $result, $offset,
+    $offset_pub);
 $offset = 0;
+$offset_pub = 0;
 
 sub publish_insert
 {
@@ -103,7 +105,8 @@ $node_publisher->init(allows_streaming => 'logical');
 $node_subscriber->init;
 $node_publisher->start;
 $node_subscriber->start;
-$publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+# Non-superuser, so that we can test publication privileges.
+$publisher_connstr = $node_publisher->connstr . ' dbname=postgres user=regress_alice';
 my %remainder_a = (
     publisher  => 0,
     subscriber => 1);
@@ -141,6 +144,8 @@ for my $node ($node_publisher, $node_subscriber)
 }
 $node_publisher->safe_psql(
     'postgres', qq(
+ALTER ROLE regress_alice REPLICATION;
+
 SET SESSION AUTHORIZATION regress_alice;
 
 CREATE PUBLICATION alice
@@ -316,4 +321,46 @@ expect_replication("alice.unpartitioned", 2, 23, 25,
     "nosuperuser nobypassrls table owner can replicate delete into unpartitioned despite rls"
 );
 
+# Test publication permissions.
+#
+# First, make sure that the user specified in the subscription is not able to
+# access the data, then make some changes. (By deleting everything we make the
+# following checks simpler.)
+$node_publisher->safe_psql(
+    'postgres', qq(
+REVOKE USAGE ON PUBLICATION alice FROM PUBLIC;
+REVOKE USAGE ON PUBLICATION alice FROM regress_alice;
+
+DELETE FROM alice.unpartitioned;
+));
+# Missing permission should cause error.
+expect_failure("alice.unpartitioned", 2, 23, 25,
+               qr/ERROR: ( [A-Z0-9]+:)? permission denied for publication alice/msi, 0);
+# Check that the missing privilege makes table synchronization fail too.
+$node_subscriber->safe_psql(
+    'postgres', qq(
+SET SESSION AUTHORIZATION regress_admin;
+DROP SUBSCRIPTION admin_sub;
+TRUNCATE TABLE alice.unpartitioned;
+CREATE SUBSCRIPTION admin_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
+));
+# Note that expect_failure() does not wait for the end of the synchronization,
+# so if there was any data on publisher side and if it found its way to the
+# subscriber, the function might still see an empty table. So we only rely on
+# the function to check the error message.
+expect_failure("alice.unpartitioned", 0, '', '',
+               qr/ERROR: ( [A-Z0-9]+:)? permission denied for publication alice/msi, 0);
+# Restore the privilege on the publication.
+$node_publisher->safe_psql(
+    'postgres', qq(
+GRANT USAGE ON PUBLICATION alice TO regress_alice;
+));
+# Wait for synchronization to complete.
+$node_subscriber->wait_for_subscription_sync;
+# The replication should work again now.
+publish_insert("alice.unpartitioned", 1);
+expect_replication("alice.unpartitioned", 1, 1, 1,
+   "unpartitioned is replicated as soon as regress_alice has permissions on alice publication"
+);
+
 done_testing();
