diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index c9b1d5f..a345e39 100644 *** a/src/backend/access/heap/heapam.c --- b/src/backend/access/heap/heapam.c *************** *** 975,1000 **** relation_openrv(const RangeVar *relation, LOCKMODE lockmode) { Oid relOid; ! /* ! * Check for shared-cache-inval messages before trying to open the ! * relation. This is needed to cover the case where the name identifies a ! * rel that has been dropped and recreated since the start of our ! * transaction: if we don't flush the old syscache entry then we'll latch ! * onto that entry and suffer an error when we do RelationIdGetRelation. ! * Note that relation_open does not need to do this, since a relation's ! * OID never changes. ! * ! * We skip this if asked for NoLock, on the assumption that the caller has ! * already ensured some appropriate lock is held. ! */ ! if (lockmode != NoLock) ! AcceptInvalidationMessages(); ! ! /* Look up the appropriate relation using namespace search */ ! relOid = RangeVarGetRelid(relation, false); /* Let relation_open do the rest */ ! return relation_open(relOid, lockmode); } /* ---------------- --- 975,985 ---- { Oid relOid; ! /* Look up and lock the appropriate relation using namespace search */ ! relOid = RangeVarLockRelid(relation, lockmode, false); /* Let relation_open do the rest */ ! return relation_open(relOid, NoLock); } /* ---------------- *************** *** 1012,1041 **** relation_openrv_extended(const RangeVar *relation, LOCKMODE lockmode, { Oid relOid; ! /* ! * Check for shared-cache-inval messages before trying to open the ! * relation. This is needed to cover the case where the name identifies a ! * rel that has been dropped and recreated since the start of our ! * transaction: if we don't flush the old syscache entry then we'll latch ! * onto that entry and suffer an error when we do RelationIdGetRelation. ! * Note that relation_open does not need to do this, since a relation's ! * OID never changes. ! * ! * We skip this if asked for NoLock, on the assumption that the caller has ! * already ensured some appropriate lock is held. ! */ ! if (lockmode != NoLock) ! AcceptInvalidationMessages(); ! ! /* Look up the appropriate relation using namespace search */ ! relOid = RangeVarGetRelid(relation, missing_ok); /* Return NULL on not-found */ if (!OidIsValid(relOid)) return NULL; /* Let relation_open do the rest */ ! return relation_open(relOid, lockmode); } /* ---------------- --- 997,1011 ---- { Oid relOid; ! /* Look up and lock the appropriate relation using namespace search */ ! relOid = RangeVarLockRelid(relation, lockmode, missing_ok); /* Return NULL on not-found */ if (!OidIsValid(relOid)) return NULL; /* Let relation_open do the rest */ ! return relation_open(relOid, NoLock); } /* ---------------- diff --git a/src/backend/catalog/namespindex ce795a6..f75fcef 100644 *** a/src/backend/catalog/namespace.c --- b/src/backend/catalog/namespace.c *************** *** 44,49 **** --- 44,51 ---- #include "parser/parse_func.h" #include "storage/backendid.h" #include "storage/ipc.h" + #include "storage/lmgr.h" + #include "storage/sinval.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/guc.h" *************** *** 285,290 **** RangeVarGetRelid(const RangeVar *relation, bool failOK) --- 287,372 ---- } /* + * RangeVarLockRelid + * Like RangeVarGetRelid, but simultaneously acquire the specified lock + * on the relation. This works properly in the face of concurrent DDL + * that may drop, create or rename relations. + * + * If the relation is not found and failOK = true, take no lock and return + * InvalidOid. Otherwise, raise an error. + */ + Oid + RangeVarLockRelid(const RangeVar *relation, LOCKMODE lockmode, + bool failOK) + { + int lastCounter; + Oid relOid1, + relOid2; + + /* + * If the caller requested NoLock, it already acquired an appropriate lock + * with LockRelationOid(). In this case, our first search is always + * correct, and we degenerate to RangeVarGetRelid(). + */ + if (lockmode == NoLock) + return RangeVarGetRelid(relation, failOK); + + /* + * By the time we acquire the lock, our RangeVar may denote a different + * relation or no relation at all. In particular, this can happen when + * the lock acquisition blocks on a transaction performing DROP or ALTER + * TABLE RENAME. However, once and while we do hold a lock of any level, + * we can count on the name of the found relation remaining stable. + * + * Even so, DDL activity could cause an object in a schema earlier in the + * search path to mask our original selection undetected. No current lock + * would prevent that. We let the user worry about it, such as by taking + * additional explicit locks. + */ + + /* + * First attempt. Always allow it to fail; RangeVarGetRelid() may hit a + * negative catcache entry for a recently-created relation. Find out by + * making recent DDL effects visible and checking again. Attempting to + * open nonexistent relations is not a hot path in applications, so the + * extra cost is of little concern. + */ + lastCounter = SharedInvalidMessageCounter; + relOid1 = RangeVarGetRelid(relation, true); + if (!OidIsValid(relOid1)) + { + AcceptInvalidationMessages(); + lastCounter = SharedInvalidMessageCounter; + relOid1 = RangeVarGetRelid(relation, failOK); + } + + do + { + /* After the initial precautions above, not-found is final. */ + if (!OidIsValid(relOid1)) + return relOid1; + + /* + * LockRelationOid() also calls AcceptInvalidationMessages() to make + * recent DDL effects visible, if needed. Finding none, we're done. + */ + LockRelationOid(relOid1, lockmode); + if (lastCounter == SharedInvalidMessageCounter) + break; + else + lastCounter = SharedInvalidMessageCounter; + + /* Some invalidation messages arrived; search again. */ + relOid2 = relOid1; + relOid1 = RangeVarGetRelid(relation, failOK); + + /* Done when our RangeVar denotes the same relation we locked. */ + } while (relOid1 != relOid2); + + return relOid1; + } + + /* * RangeVarGetCreationNamespace * Given a RangeVar describing a to-be-created relation, * choose which namespace to create it in. diff --git a/src/backend/storage/ipc/sindex 9ab16b1..8499615 100644 *** a/src/backend/storage/ipc/sinval.c --- b/src/backend/storage/ipc/sinval.c *************** *** 22,27 **** --- 22,30 ---- #include "utils/inval.h" + uint64 SharedInvalidMessageCounter; + + /* * Because backends sitting idle will not be reading sinval events, we * need a way to give an idle backend a swift kick in the rear and make *************** *** 90,95 **** ReceiveSharedInvalidMessages( --- 93,99 ---- { SharedInvalidationMessage *msg = &messages[nextmsg++]; + SharedInvalidMessageCounter++; invalFunction(msg); } *************** *** 106,111 **** ReceiveSharedInvalidMessages( --- 110,116 ---- { /* got a reset message */ elog(DEBUG4, "cache state reset"); + SharedInvalidMessageCounter++; resetFunction(); break; /* nothing more to do */ } *************** *** 118,123 **** ReceiveSharedInvalidMessages( --- 123,129 ---- { SharedInvalidationMessage *msg = &messages[nextmsg++]; + SharedInvalidMessageCounter++; invalFunction(msg); } diff --git a/src/backend/storage/lmgr/lindex 859b385..90b2ecc 100644 *** a/src/backend/storage/lmgr/lmgr.c --- b/src/backend/storage/lmgr/lmgr.c *************** *** 81,90 **** LockRelationOid(Oid relid, LOCKMODE lockmode) /* * Now that we have the lock, check for invalidation messages, so that we * will update or flush any stale relcache entry before we try to use it. ! * We can skip this in the not-uncommon case that we already had the same ! * type of lock being requested, since then no one else could have ! * modified the relcache entry in an undesirable way. (In the case where ! * our own xact modifies the rel, the relcache update happens via * CommandCounterIncrement, not here.) */ if (res != LOCKACQUIRE_ALREADY_HELD) --- 81,91 ---- /* * Now that we have the lock, check for invalidation messages, so that we * will update or flush any stale relcache entry before we try to use it. ! * RangeVarLockRelid() specifically relies on us for this. We can skip ! * this in the not-uncommon case that we already had the same type of lock ! * being requested, since then no one else could have modified the ! * relcache entry in an undesirable way. (In the case where our own xact ! * modifies the rel, the relcache update happens via * CommandCounterIncrement, not here.) */ if (res != LOCKACQUIRE_ALREADY_HELD) diff --git a/src/include/catalog/namesindex 7e1e194..da1ddfd 100644 *** a/src/include/catalog/namespace.h --- b/src/include/catalog/namespace.h *************** *** 15,20 **** --- 15,21 ---- #define NAMESPACE_H #include "nodes/primnodes.h" + #include "storage/lock.h" /* *************** *** 48,53 **** typedef struct OverrideSearchPath --- 49,56 ---- extern Oid RangeVarGetRelid(const RangeVar *relation, bool failOK); + extern Oid RangeVarLockRelid(const RangeVar *relation, LOCKMODE lockmode, + bool failOK); extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation); extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation); extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid); diff --git a/src/include/storage/sinvaindex e9ce025..aba474d 100644 *** a/src/include/storage/sinval.h --- b/src/include/storage/sinval.h *************** *** 116,121 **** typedef union --- 116,125 ---- } SharedInvalidationMessage; + /* Counter of messages processed; don't worry about overflow. */ + extern uint64 SharedInvalidMessageCounter; + + extern void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n); extern void ReceiveSharedInvalidMessages(