diff --git a/doc/src/sgml/ref/initdb.sgml b/doc/src/sgml/ref/initdb.sgml
index 4e339ec..ac460e9 100644
--- a/doc/src/sgml/ref/initdb.sgml
+++ b/doc/src/sgml/ref/initdb.sgml
@@ -152,6 +152,19 @@ PostgreSQL documentation
+
+
+
+
+ This option enables the shared catalog tables security, by adding
+ row level security policies on all eligible shared catalog tables.
+ With this option, multi-tenancy in PostgreSQL is supported at
+ database level.
+
+
+
+
+
diff --git a/src/backend/commands/policy.c b/src/backend/commands/policy.c
index bb735ac..820261e 100644
--- a/src/backend/commands/policy.c
+++ b/src/backend/commands/policy.c
@@ -23,7 +23,16 @@
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_authid.h"
+#include "catalog/pg_auth_members.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_db_role_setting.h"
+#include "catalog/pg_pltemplate.h"
#include "catalog/pg_policy.h"
+#include "catalog/pg_replication_origin.h"
+#include "catalog/pg_shdepend.h"
+#include "catalog/pg_shdescription.h"
+#include "catalog/pg_shseclabel.h"
+#include "catalog/pg_tablespace.h"
#include "catalog/pg_type.h"
#include "commands/policy.h"
#include "miscadmin.h"
@@ -36,6 +45,7 @@
#include "rewrite/rewriteManip.h"
#include "rewrite/rowsecurity.h"
#include "storage/lock.h"
+#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
@@ -46,11 +56,18 @@
#include "utils/rel.h"
#include "utils/syscache.h"
+#define CATALOG_POLICY_STRING_SIZE 65536
+
static void RangeVarCallbackForPolicy(const RangeVar *rv,
Oid relid, Oid oldrelid, void *arg);
static char parse_policy_command(const char *cmd_name);
static Datum *policy_role_list_to_array(List *roles, int *num_roles);
+static bool get_catalog_policy_string(Oid relationid, Form_pg_class pg_class_tuple, char *buf);
+
+/* variable to identify whether catalog row security is built is in progress or not? */
+bool CatalogRowSecurityBuiltInProgress = false;
+
/*
* Callback to RangeVarGetRelidExtended().
*
@@ -195,6 +212,16 @@ RelationBuildRowSecurity(Relation relation)
RowSecurityDesc *volatile rsdesc = NULL;
/*
+ * Build the row security descriptor of a relation, once all the critical
+ * relations are built.
+ */
+ if (!criticalRelcachesBuilt || !criticalSharedRelcachesBuilt)
+ return;
+
+ if (IsSystemRelation(relation) && CatalogRowSecurityBuiltInProgress)
+ return;
+
+ /*
* Create a memory context to hold everything associated with this
* relation's row security policy. This makes it easy to clean up during
* a relcache flush.
@@ -211,124 +238,194 @@ RelationBuildRowSecurity(Relation relation)
*/
PG_TRY();
{
- Relation catalog;
- ScanKeyData skey;
- SysScanDesc sscan;
- HeapTuple tuple;
-
rsdesc = MemoryContextAllocZero(rscxt, sizeof(RowSecurityDesc));
rsdesc->rscxt = rscxt;
- catalog = heap_open(PolicyRelationId, AccessShareLock);
-
- ScanKeyInit(&skey,
- Anum_pg_policy_polrelid,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(RelationGetRelid(relation)));
-
- sscan = systable_beginscan(catalog, PolicyPolrelidPolnameIndexId, true,
- NULL, 1, &skey);
-
- /*
- * Loop through the row level security policies for this relation, if
- * any.
- */
- while (HeapTupleIsValid(tuple = systable_getnext(sscan)))
+ if (IsSystemRelation(relation))
{
- Datum value_datum;
- char cmd_value;
- Datum roles_datum;
- char *qual_value;
- Expr *qual_expr;
- char *with_check_value;
- Expr *with_check_qual;
- char *policy_name_value;
- bool isnull;
RowSecurityPolicy *policy;
+ Datum *role_oids;
+ int nitems = 0;
+ ArrayType *role_ids;
+ Node *qual_expr;
+ List *raw_parsetree_list;
+ ListCell *list_item;
+ ParseState *qual_pstate;
+ RangeTblEntry *rte;
+ CreatePolicyStmt *stmt;
+ char buf[CATALOG_POLICY_STRING_SIZE];
- /*
- * Note: all the pass-by-reference data we collect here is either
- * still stored in the tuple, or constructed in the caller's
- * short-lived memory context. We must copy it into rscxt
- * explicitly below.
- */
+ CatalogRowSecurityBuiltInProgress = true;
+ get_catalog_policy_string(relation->rd_id, relation->rd_rel, buf);
- /* Get policy command */
- value_datum = heap_getattr(tuple, Anum_pg_policy_polcmd,
- RelationGetDescr(catalog), &isnull);
- Assert(!isnull);
- cmd_value = DatumGetChar(value_datum);
-
- /* Get policy name */
- value_datum = heap_getattr(tuple, Anum_pg_policy_polname,
- RelationGetDescr(catalog), &isnull);
- Assert(!isnull);
- policy_name_value = NameStr(*(DatumGetName(value_datum)));
-
- /* Get policy roles */
- roles_datum = heap_getattr(tuple, Anum_pg_policy_polroles,
- RelationGetDescr(catalog), &isnull);
- /* shouldn't be null, but initdb doesn't mark it so, so check */
- if (isnull)
- elog(ERROR, "unexpected null value in pg_policy.polroles");
-
- /* Get policy qual */
- value_datum = heap_getattr(tuple, Anum_pg_policy_polqual,
- RelationGetDescr(catalog), &isnull);
- if (!isnull)
- {
- qual_value = TextDatumGetCString(value_datum);
- qual_expr = (Expr *) stringToNode(qual_value);
- }
- else
- qual_expr = NULL;
+ role_oids = policy_role_list_to_array(NULL, &nitems);
+ role_ids = construct_array(role_oids, nitems, OIDOID,
+ sizeof(Oid), true, 'i');
- /* Get WITH CHECK qual */
- value_datum = heap_getattr(tuple, Anum_pg_policy_polwithcheck,
- RelationGetDescr(catalog), &isnull);
- if (!isnull)
+ raw_parsetree_list = pg_parse_query(buf);
+ Assert(list_length(raw_parsetree_list) == 1);
+
+ foreach(list_item, raw_parsetree_list)
{
- with_check_value = TextDatumGetCString(value_datum);
- with_check_qual = (Expr *) stringToNode(with_check_value);
+ Node *parsetree;
+
+ parsetree = (Node *) lfirst(list_item);
+ stmt = (CreatePolicyStmt *) parsetree;
}
- else
- with_check_qual = NULL;
+
+ qual_pstate = make_parsestate(NULL);
+ rte = addRangeTableEntryForRelation(qual_pstate, relation,
+ NULL, false, false);
+
+ addRTEtoQuery(qual_pstate, rte, false, true, true);
+ qual_expr = transformWhereClause(qual_pstate,
+ stmt->qual,
+ EXPR_KIND_POLICY,
+ "POLICY");
+
+ /* Fix up collation information */
+ assign_expr_collations(qual_pstate, qual_expr);
/* Now copy everything into the cache context */
MemoryContextSwitchTo(rscxt);
policy = palloc0(sizeof(RowSecurityPolicy));
- policy->policy_name = pstrdup(policy_name_value);
- policy->polcmd = cmd_value;
- policy->roles = DatumGetArrayTypePCopy(roles_datum);
+ policy->policy_name = pstrdup(stmt->policy_name);
+ policy->polcmd = ACL_SELECT_CHR;
+ policy->roles = DatumGetArrayTypePCopy(role_ids);
policy->qual = copyObject(qual_expr);
- policy->with_check_qual = copyObject(with_check_qual);
- policy->hassublinks = checkExprHasSubLink((Node *) qual_expr) ||
- checkExprHasSubLink((Node *) with_check_qual);
+ policy->with_check_qual = NULL;
+ policy->hassublinks = checkExprHasSubLink((Node *) qual_expr);
rsdesc->policies = lcons(policy, rsdesc->policies);
MemoryContextSwitchTo(oldcxt);
-
- /* clean up some (not all) of the junk ... */
- if (qual_expr != NULL)
- pfree(qual_expr);
- if (with_check_qual != NULL)
- pfree(with_check_qual);
}
+ else
+ {
+ Relation catalog;
+ ScanKeyData skey;
+ SysScanDesc sscan;
+ HeapTuple tuple;
+
+ catalog = heap_open(PolicyRelationId, AccessShareLock);
- systable_endscan(sscan);
- heap_close(catalog, AccessShareLock);
+ ScanKeyInit(&skey,
+ Anum_pg_policy_polrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(relation)));
+
+ sscan = systable_beginscan(catalog, PolicyPolrelidPolnameIndexId, true,
+ NULL, 1, &skey);
+
+ /*
+ * Loop through the row level security policies for this relation,
+ * if any.
+ */
+ while (HeapTupleIsValid(tuple = systable_getnext(sscan)))
+ {
+ Datum value_datum;
+ char cmd_value;
+ Datum roles_datum;
+ char *qual_value;
+ Expr *qual_expr;
+ char *with_check_value;
+ Expr *with_check_qual;
+ char *policy_name_value;
+ bool isnull;
+ RowSecurityPolicy *policy;
+
+ /*
+ * Note: all the pass-by-reference data we collect here is
+ * either still stored in the tuple, or constructed in the
+ * caller's short-lived memory context. We must copy it into
+ * rscxt explicitly below.
+ */
+
+ /* Get policy command */
+ value_datum = heap_getattr(tuple, Anum_pg_policy_polcmd,
+ RelationGetDescr(catalog), &isnull);
+ Assert(!isnull);
+ cmd_value = DatumGetChar(value_datum);
+
+ /* Get policy name */
+ value_datum = heap_getattr(tuple, Anum_pg_policy_polname,
+ RelationGetDescr(catalog), &isnull);
+ Assert(!isnull);
+ policy_name_value = NameStr(*(DatumGetName(value_datum)));
+
+ /* Get policy roles */
+ roles_datum = heap_getattr(tuple, Anum_pg_policy_polroles,
+ RelationGetDescr(catalog), &isnull);
+ /* shouldn't be null, but initdb doesn't mark it so, so check */
+ if (isnull)
+ elog(ERROR, "unexpected null value in pg_policy.polroles");
+
+ /* Get policy qual */
+ value_datum = heap_getattr(tuple, Anum_pg_policy_polqual,
+ RelationGetDescr(catalog), &isnull);
+ if (!isnull)
+ {
+ qual_value = TextDatumGetCString(value_datum);
+ qual_expr = (Expr *) stringToNode(qual_value);
+ }
+ else
+ qual_expr = NULL;
+
+ /* Get WITH CHECK qual */
+ value_datum = heap_getattr(tuple, Anum_pg_policy_polwithcheck,
+ RelationGetDescr(catalog), &isnull);
+ if (!isnull)
+ {
+ with_check_value = TextDatumGetCString(value_datum);
+ with_check_qual = (Expr *) stringToNode(with_check_value);
+ }
+ else
+ with_check_qual = NULL;
+
+ /* Now copy everything into the cache context */
+ MemoryContextSwitchTo(rscxt);
+
+ policy = palloc0(sizeof(RowSecurityPolicy));
+ policy->policy_name = pstrdup(policy_name_value);
+ policy->polcmd = cmd_value;
+ policy->roles = DatumGetArrayTypePCopy(roles_datum);
+ policy->qual = copyObject(qual_expr);
+ policy->with_check_qual = copyObject(with_check_qual);
+ policy->hassublinks = checkExprHasSubLink((Node *) qual_expr) ||
+ checkExprHasSubLink((Node *) with_check_qual);
+
+ rsdesc->policies = lcons(policy, rsdesc->policies);
+
+ MemoryContextSwitchTo(oldcxt);
+
+ /* clean up some (not all) of the junk ... */
+ if (qual_expr != NULL)
+ pfree(qual_expr);
+ if (with_check_qual != NULL)
+ pfree(with_check_qual);
+ }
+
+ systable_endscan(sscan);
+ heap_close(catalog, AccessShareLock);
+ }
}
PG_CATCH();
{
/* Delete rscxt, first making sure it isn't active */
MemoryContextSwitchTo(oldcxt);
MemoryContextDelete(rscxt);
+
+ if (IsSystemRelation(relation))
+ CatalogRowSecurityBuiltInProgress = false;
+
PG_RE_THROW();
}
PG_END_TRY();
+ if (IsSystemRelation(relation))
+ CatalogRowSecurityBuiltInProgress = false;
+
/* Success --- attach the policy descriptor to the relcache entry */
relation->rd_rsdesc = rsdesc;
}
@@ -410,6 +507,73 @@ RemovePolicyById(Oid policy_id)
heap_close(pg_policy_rel, RowExclusiveLock);
}
+static bool
+get_catalog_policy_string(Oid relationid, Form_pg_class pg_class_tuple, char *buf)
+{
+ bool policy_exist = true;
+
+ switch (relationid)
+ {
+ case AuthIdRelationId:
+ sprintf(buf, "create policy %s_read_own_data on %s for select using"
+ " (pg_has_role(oid, 'any'))",
+ pg_class_tuple->relname.data, pg_class_tuple->relname.data);
+ break;
+ case AuthMemRelationId:
+ sprintf(buf, "create policy %s_read_own_data on %s for select using"
+ " (pg_has_role(roleid, 'any') AND pg_has_role(member, 'any'))",
+ pg_class_tuple->relname.data, pg_class_tuple->relname.data);
+ break;
+ case DatabaseRelationId:
+ sprintf(buf, "create policy %s_read_own_data on %s for select using"
+ " ((oid < 16384) OR has_database_privilege(oid,'any'))",
+ pg_class_tuple->relname.data, pg_class_tuple->relname.data);
+ break;
+ case DbRoleSettingRelationId:
+ sprintf(buf, "create policy %s_read_own_data on %s for select using"
+ " (pg_has_role(setrole, 'any') AND has_database_privilege(setdatabase,'any'))",
+ pg_class_tuple->relname.data, pg_class_tuple->relname.data);
+ break;
+ case PLTemplateRelationId:
+ policy_exist = false;
+ break;
+ case ReplicationOriginRelationId:
+ policy_exist = false;
+ break;
+ case SharedDependRelationId:
+ sprintf(buf, "create policy %s_read_own_data on %s for select using"
+ " ((classid = (select oid from pg_class where relname = 'pg_database') AND has_database_privilege(objid, 'any'))"
+ " OR (classid = (select oid from pg_class where relname = 'pg_authid') AND pg_has_role(objid, 'any'))"
+ " OR (classid = (select oid from pg_class where relname = 'pg_tablespace') AND has_tablespace_privilege(objid, 'any')))",
+ pg_class_tuple->relname.data, pg_class_tuple->relname.data);
+ break;
+ case SharedDescriptionRelationId:
+ sprintf(buf, "create policy %s_read_own_data on %s for select using"
+ " ((classoid = (select oid from pg_class where relname = 'pg_database') AND has_database_privilege(objoid, 'any'))"
+ " OR (classoid = (select oid from pg_class where relname = 'pg_authid') AND pg_has_role(objoid, 'any'))"
+ " OR (classoid = (select oid from pg_class where relname = 'pg_tablespace') AND has_tablespace_privilege(objoid, 'any')))",
+ pg_class_tuple->relname.data, pg_class_tuple->relname.data);
+ break;
+ case SharedSecLabelRelationId:
+ sprintf(buf, "create policy %s_read_own_data on %s for select using"
+ " ((classoid = (select oid from pg_class where relname = 'pg_database') AND has_database_privilege(objoid, 'any'))"
+ " OR (classoid = (select oid from pg_class where relname = 'pg_authid') AND pg_has_role(objoid, 'any'))"
+ " OR (classoid = (select oid from pg_class where relname = 'pg_tablespace') AND has_tablespace_privilege(objoid, 'any')))",
+ pg_class_tuple->relname.data, pg_class_tuple->relname.data);
+ break;
+ case TableSpaceRelationId:
+ sprintf(buf, "create policy %s_read_own_data on %s for select using"
+ " ((oid < 16384) OR has_tablespace_privilege(oid, 'any'))",
+ pg_class_tuple->relname.data, pg_class_tuple->relname.data);
+ break;
+ default:
+ policy_exist = false;
+ break;
+ }
+
+ return policy_exist;
+}
+
/*
* RemoveRoleFromObjectPolicy -
* remove a role from a policy by its OID. If the role is not a member of
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 0b4a334..5b00a8d 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -3442,6 +3442,16 @@ ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode)
{
AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
+ /*
+ * ALTER table on sytem catalog tables is possible only when user
+ * specifies CATALOG SECURITY on system catalog tables. To avoid an
+ * error in the AlterTableCreateToastTable function for system catalog
+ * tables, the system catalog tables are ignored for the toast table
+ * creation.
+ */
+ if (!IsUnderPostmaster && IsSharedRelation(tab->relid))
+ continue;
+
if (tab->relkind == RELKIND_RELATION ||
tab->relkind == RELKIND_MATVIEW)
AlterTableCreateToastTable(tab->relid, (Datum) 0, lockmode);
diff --git a/src/backend/utils/misc/rls.c b/src/backend/utils/misc/rls.c
index c33f29e..4d6c88d 100644
--- a/src/backend/utils/misc/rls.c
+++ b/src/backend/utils/misc/rls.c
@@ -58,10 +58,6 @@ check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
bool relforcerowsecurity;
bool amowner;
- /* Nothing to do for built-in relations */
- if (relid < (Oid) FirstNormalObjectId)
- return RLS_NONE;
-
/* Fetch relation's relrowsecurity and relforcerowsecurity flags */
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tuple))
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index 35e39ce..13a22ee 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -136,6 +136,7 @@ static bool do_sync = true;
static bool sync_only = false;
static bool show_setting = false;
static bool data_checksums = false;
+static bool shared_catalog_security = false;
static char *xlog_dir = "";
@@ -190,6 +191,7 @@ static char *authwarning = NULL;
*/
static const char *boot_options = "-F";
static const char *backend_options = "--single -F -O -j -c search_path=pg_catalog -c exit_on_error=true";
+static const char *catalog_security_options = "--single -F -O -j -c search_path=pg_catalog -c exit_on_error=true -c allow_system_table_mods=true";
static const char *const subdirs[] = {
"global",
@@ -259,6 +261,7 @@ static void setup_dictionary(FILE *cmdfd);
static void setup_privileges(FILE *cmdfd);
static void set_info_version(void);
static void setup_schema(FILE *cmdfd);
+static void setup_shared_catalog_security(FILE *cmdfd);
static void load_plpgsql(FILE *cmdfd);
static void vacuum_db(FILE *cmdfd);
static void make_template0(FILE *cmdfd);
@@ -2082,6 +2085,59 @@ setup_schema(FILE *cmdfd)
}
/*
+ * setup shared catalog security by defining policies
+ */
+static void
+setup_shared_catalog_security(FILE *cmdfd)
+{
+ const char **line;
+ static const char *pg_shared_catalog_security_setup[] = {
+ /* AuthMemRelationId */
+ "alter table pg_auth_members enable row level security;\n",
+
+ /* AuthIdRelationId */
+ "alter table pg_authid enable row level security;\n",
+
+ /* DatabaseRelationId */
+ "alter table pg_database enable row level security;\n",
+
+ /* DbRoleSettingRelationId */
+ "alter table pg_database enable row level security;\n",
+
+ /* PLTemplateRelationId */
+
+ /*
+ * Currently there is no policy needed for this table, so leave it as
+ * it is.
+ */
+
+ /* ReplicationOriginRelationId */
+
+ /*
+ * Currently there is no policy needed for this table, so leave it as
+ * it is.
+ */
+
+ /* SharedDependRelationId */
+ "alter table pg_shdepend enable row level security;\n",
+
+ /* SharedDescriptionRelationId */
+ "alter table pg_shdescription enable row level security;\n",
+
+ /* SharedSecLabelRelationId */
+ "alter table pg_shseclabel enable row level security;\n",
+
+ /* TableSpaceRelationId */
+ "alter table pg_tablespace enable row level security;\n",
+
+ NULL
+ };
+
+ for (line = pg_shared_catalog_security_setup; *line != NULL; line++)
+ PG_CMD_PUTS(*line);
+}
+
+/*
* load PL/pgsql server-side language
*/
static void
@@ -2536,6 +2592,8 @@ usage(const char *progname)
printf(_("\nLess commonly used options:\n"));
printf(_(" -d, --debug generate lots of debugging output\n"));
printf(_(" -k, --data-checksums use data page checksums\n"));
+ printf(_(" -C, --shared-catalog-security\n"
+ " use shared catalog security\n"));
printf(_(" -L DIRECTORY where to find the input files\n"));
printf(_(" -n, --noclean do not clean up after errors\n"));
printf(_(" -N, --nosync do not wait for changes to be written safely to disk\n"));
@@ -3073,6 +3131,10 @@ initialize_data_directory(void)
{
PG_CMD_DECL;
int i;
+ const char *options = backend_options;
+
+ if (shared_catalog_security)
+ options = catalog_security_options;
setup_signals();
@@ -3121,7 +3183,7 @@ initialize_data_directory(void)
snprintf(cmd, sizeof(cmd),
"\"%s\" %s template1 >%s",
- backend_exec, backend_options,
+ backend_exec, options,
DEVNULL);
PG_CMD_OPEN;
@@ -3146,6 +3208,9 @@ initialize_data_directory(void)
setup_schema(cmdfd);
+ if (shared_catalog_security)
+ setup_shared_catalog_security(cmdfd);
+
load_plpgsql(cmdfd);
vacuum_db(cmdfd);
@@ -3190,6 +3255,7 @@ main(int argc, char *argv[])
{"sync-only", no_argument, NULL, 'S'},
{"xlogdir", required_argument, NULL, 'X'},
{"data-checksums", no_argument, NULL, 'k'},
+ {"shared-catalog-security", no_argument, NULL, 'C'},
{NULL, 0, NULL, 0}
};
@@ -3230,7 +3296,7 @@ main(int argc, char *argv[])
/* process command-line options */
- while ((c = getopt_long(argc, argv, "dD:E:kL:nNU:WA:sST:X:", long_options, &option_index)) != -1)
+ while ((c = getopt_long(argc, argv, "dD:E:kCL:nNU:WA:sST:X:", long_options, &option_index)) != -1)
{
switch (c)
{
@@ -3282,6 +3348,9 @@ main(int argc, char *argv[])
case 'k':
data_checksums = true;
break;
+ case 'C':
+ shared_catalog_security = true;
+ break;
case 'L':
share_path = pg_strdup(optarg);
break;