diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index c9b1d5f..a345e39 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -975,26 +975,11 @@ relation_openrv(const RangeVar *relation, LOCKMODE lockmode) { Oid relOid; - /* - * Check for shared-cache-inval messages before trying to open the - * relation. This is needed to cover the case where the name identifies a - * rel that has been dropped and recreated since the start of our - * transaction: if we don't flush the old syscache entry then we'll latch - * onto that entry and suffer an error when we do RelationIdGetRelation. - * Note that relation_open does not need to do this, since a relation's - * OID never changes. - * - * We skip this if asked for NoLock, on the assumption that the caller has - * already ensured some appropriate lock is held. - */ - if (lockmode != NoLock) - AcceptInvalidationMessages(); - - /* Look up the appropriate relation using namespace search */ - relOid = RangeVarGetRelid(relation, false); + /* Look up and lock the appropriate relation using namespace search */ + relOid = RangeVarLockRelid(relation, lockmode, false); /* Let relation_open do the rest */ - return relation_open(relOid, lockmode); + return relation_open(relOid, NoLock); } /* ---------------- @@ -1012,30 +997,15 @@ relation_openrv_extended(const RangeVar *relation, LOCKMODE lockmode, { Oid relOid; - /* - * Check for shared-cache-inval messages before trying to open the - * relation. This is needed to cover the case where the name identifies a - * rel that has been dropped and recreated since the start of our - * transaction: if we don't flush the old syscache entry then we'll latch - * onto that entry and suffer an error when we do RelationIdGetRelation. - * Note that relation_open does not need to do this, since a relation's - * OID never changes. - * - * We skip this if asked for NoLock, on the assumption that the caller has - * already ensured some appropriate lock is held. - */ - if (lockmode != NoLock) - AcceptInvalidationMessages(); - - /* Look up the appropriate relation using namespace search */ - relOid = RangeVarGetRelid(relation, missing_ok); + /* Look up and lock the appropriate relation using namespace search */ + relOid = RangeVarLockRelid(relation, lockmode, missing_ok); /* Return NULL on not-found */ if (!OidIsValid(relOid)) return NULL; /* Let relation_open do the rest */ - return relation_open(relOid, lockmode); + return relation_open(relOid, NoLock); } /* ---------------- diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index ce795a6..60862a4 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -44,6 +44,8 @@ #include "parser/parse_func.h" #include "storage/backendid.h" #include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/sinval.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/guc.h" @@ -285,6 +287,72 @@ RangeVarGetRelid(const RangeVar *relation, bool failOK) } /* + * RangeVarLockRelid + * Like RangeVarGetRelid, but simulatenously acquire the specified lock on + * the relation. This works properly in the face of concurrent DDL that + * may drop, create or rename relations. + * + * If the relation is not found and failOK = true, take no lock and return + * InvalidOid. Otherwise, raise an error. + */ +Oid +RangeVarLockRelid(const RangeVar *relation, LOCKMODE lockmode, + bool failOK) +{ + int lastCounter; + Oid relOid1, + relOid2; + + /* + * First attempt. If the caller requested NoLock, it already acquired an + * appropriate lock and has called AcceptInvalidationMessages() since + * doing so. In this case, our first search is always correct, and we + * degenerate to behave exactly like RangeVarGetRelid(). + */ + lastCounter = SharedInvalidMessageCounter; + relOid1 = RangeVarGetRelid(relation, failOK); + if (lockmode == NoLock) + return relOid1; + + /* + * By the time we acquire the lock, our RangeVar may denote a different + * relation or no relation at all. In particular, this can happen when + * the lock acquisition blocks on a transaction performing DROP or ALTER + * TABLE RENAME. However, once and while we do hold a lock of any level, + * we can count on the name of the found relation remaining stable. + * + * Even so, DDL activity could cause an object in a schema earlier in the + * search path to mask our original selection undetected. No current lock + * would prevent that. We let the user worry about it, such as by taking + * additional explicit locks. + */ + do + { + /* Not-found is always final. */ + if (!OidIsValid(relOid1)) + return relOid1; + + /* + * LockRelationOid also calls AcceptInvalidationMessages() to make + * recent DDL effects visible, if needed. Finding none, we're done. + */ + LockRelationOid(relOid1, lockmode); + if (lastCounter == SharedInvalidMessageCounter) + break; + else + lastCounter = SharedInvalidMessageCounter; + + /* Some invalidation messages arrived; search again. */ + relOid2 = relOid1; + relOid1 = RangeVarGetRelid(relation, failOK); + + /* Done when our RangeVar denotes the same relation we locked. */ + } while (relOid1 != relOid2); + + return relOid1; +} + +/* * RangeVarGetCreationNamespace * Given a RangeVar describing a to-be-created relation, * choose which namespace to create it in. diff --git a/src/backend/storage/ipc/sinval.c b/src/backend/storage/ipc/sinval.c index 9ab16b1..9b1ec82 100644 --- a/src/backend/storage/ipc/sinval.c +++ b/src/backend/storage/ipc/sinval.c @@ -22,6 +22,9 @@ #include "utils/inval.h" +unsigned SharedInvalidMessageCounter; + + /* * Because backends sitting idle will not be reading sinval events, we * need a way to give an idle backend a swift kick in the rear and make @@ -90,6 +93,7 @@ ReceiveSharedInvalidMessages( { SharedInvalidationMessage *msg = &messages[nextmsg++]; + SharedInvalidMessageCounter++; invalFunction(msg); } @@ -106,6 +110,7 @@ ReceiveSharedInvalidMessages( { /* got a reset message */ elog(DEBUG4, "cache state reset"); + SharedInvalidMessageCounter++; resetFunction(); break; /* nothing more to do */ } @@ -118,6 +123,7 @@ ReceiveSharedInvalidMessages( { SharedInvalidationMessage *msg = &messages[nextmsg++]; + SharedInvalidMessageCounter++; invalFunction(msg); } diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c index 859b385..90b2ecc 100644 --- a/src/backend/storage/lmgr/lmgr.c +++ b/src/backend/storage/lmgr/lmgr.c @@ -81,10 +81,11 @@ LockRelationOid(Oid relid, LOCKMODE lockmode) /* * Now that we have the lock, check for invalidation messages, so that we * will update or flush any stale relcache entry before we try to use it. - * We can skip this in the not-uncommon case that we already had the same - * type of lock being requested, since then no one else could have - * modified the relcache entry in an undesirable way. (In the case where - * our own xact modifies the rel, the relcache update happens via + * RangeVarLockRelid() specifically relies on us for this. We can skip + * this in the not-uncommon case that we already had the same type of lock + * being requested, since then no one else could have modified the + * relcache entry in an undesirable way. (In the case where our own xact + * modifies the rel, the relcache update happens via * CommandCounterIncrement, not here.) */ if (res != LOCKACQUIRE_ALREADY_HELD) diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h index 7e1e194..da1ddfd 100644 --- a/src/include/catalog/namespace.h +++ b/src/include/catalog/namespace.h @@ -15,6 +15,7 @@ #define NAMESPACE_H #include "nodes/primnodes.h" +#include "storage/lock.h" /* @@ -48,6 +49,8 @@ typedef struct OverrideSearchPath extern Oid RangeVarGetRelid(const RangeVar *relation, bool failOK); +extern Oid RangeVarLockRelid(const RangeVar *relation, LOCKMODE lockmode, + bool failOK); extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation); extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation); extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid); diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h index e9ce025..a90aa2d 100644 --- a/src/include/storage/sinval.h +++ b/src/include/storage/sinval.h @@ -116,6 +116,10 @@ typedef union } SharedInvalidationMessage; +/* Counter of messages processed; may overflow. */ +extern unsigned SharedInvalidMessageCounter; + + extern void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n); extern void ReceiveSharedInvalidMessages(