diff --git a/doc/src/sgml/ref/lock.sgml b/doc/src/sgml/ref/lock.sgml
index b946eab..5a431b7 100644
--- a/doc/src/sgml/ref/lock.sgml
+++ b/doc/src/sgml/ref/lock.sgml
@@ -46,6 +46,13 @@ LOCK [ TABLE ] [ ONLY ] name [ * ]
+ Automatically updatable views (see )
+ that do not have INSTEAD OF> triggers or INSTEAD>
+ rules are also lockable. When a view is locked, its base relations are
+ also locked recursively with the same lock mode.
+
+
+
When acquiring locks automatically for commands that reference
tables, PostgreSQL always uses the least
restrictive lock mode possible. LOCK TABLE
diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c
index 9fe9e02..37fd028 100644
--- a/src/backend/commands/lockcmds.c
+++ b/src/backend/commands/lockcmds.c
@@ -19,15 +19,20 @@
#include "commands/lockcmds.h"
#include "miscadmin.h"
#include "parser/parse_clause.h"
+#include "parser/parsetree.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
+#include "rewrite/rewriteHandler.h"
+#include "access/heapam.h"
-static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait);
-static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode);
+static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid userid);
+static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode, Oid userid);
static void RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid,
Oid oldrelid, void *arg);
+static void LockViewRecurse(Oid reloid, Oid root_reloid, LOCKMODE lockmode, bool nowait);
+static void LockViewCheck(Relation view, Query *viewquery);
/*
* LOCK TABLE
@@ -62,8 +67,10 @@ LockTableCommand(LockStmt *lockstmt)
RangeVarCallbackForLockTable,
(void *) &lockstmt->mode);
- if (recurse)
- LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait);
+ if (get_rel_relkind(reloid) == RELKIND_VIEW)
+ LockViewRecurse(reloid, reloid, lockstmt->mode, lockstmt->nowait);
+ else if (recurse)
+ LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait, GetUserId());
}
}
@@ -86,15 +93,17 @@ RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid,
return; /* woops, concurrently dropped; no permissions
* check */
- /* Currently, we only allow plain tables to be locked */
- if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
+
+ /* Currently, we only allow plain tables or auto-updatable views to be locked */
+ if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE &&
+ relkind != RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table",
+ errmsg("\"%s\" is not a table or view",
rv->relname)));
/* Check permissions. */
- aclresult = LockTableAclCheck(relid, lockmode);
+ aclresult = LockTableAclCheck(relid, lockmode, GetUserId());
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_CLASS, rv->relname);
}
@@ -107,7 +116,7 @@ RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid,
* multiply-inheriting children more than once, but that's no problem.
*/
static void
-LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait)
+LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid userid)
{
List *children;
ListCell *lc;
@@ -120,7 +129,7 @@ LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait)
AclResult aclresult;
/* Check permissions before acquiring the lock. */
- aclresult = LockTableAclCheck(childreloid, lockmode);
+ aclresult = LockTableAclCheck(childreloid, lockmode, userid);
if (aclresult != ACLCHECK_OK)
{
char *relname = get_rel_name(childreloid);
@@ -157,15 +166,82 @@ LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait)
continue;
}
- LockTableRecurse(childreloid, lockmode, nowait);
+ LockTableRecurse(childreloid, lockmode, nowait, userid);
}
}
/*
+ * Apply LOCK TABLE recursively over a view
+ */
+static void
+LockViewRecurse(Oid reloid, Oid root_reloid, LOCKMODE lockmode, bool nowait)
+{
+ Relation view;
+ Query *viewquery;
+ RangeTblRef *rtr;
+ RangeTblEntry *base_rte;
+ Oid baseoid;
+ AclResult aclresult;
+ char *relname;
+ char relkind;
+
+ view = heap_open(reloid, NoLock);
+ viewquery = get_view_query(view);
+
+ /* Check whether the view is lockable */
+ LockViewCheck(view, viewquery);
+
+ heap_close(view, NoLock);
+
+ Assert(list_length(viewquery->jointree->fromlist) == 1);
+ rtr = linitial_node(RangeTblRef, viewquery->jointree->fromlist);
+ base_rte = rt_fetch(rtr->rtindex, viewquery->rtable);
+ Assert(base_rte->rtekind == RTE_RELATION);
+
+ baseoid = base_rte->relid;
+ relname = get_rel_name(baseoid);
+ relkind = get_rel_relkind(baseoid);
+
+ /* Currently, we only allow plain tables or auto-updatable views to be locked */
+ if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE &&
+ relkind != RELKIND_VIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a table or view",
+ relname)));
+
+ /* Check infinite recursion in the view definition */
+ if (baseoid == root_reloid)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("infinite recursion detected in rules for relation \"%s\"",
+ get_rel_name(root_reloid))));
+
+ /* Check permissions with the view owner's priviledge. */
+ aclresult = LockTableAclCheck(baseoid, lockmode, view->rd_rel->relowner);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_CLASS, relname);
+
+ /* We have enough rights to lock the relation; do so. */
+ if (!nowait)
+ LockRelationOid(baseoid, lockmode);
+ else if (!ConditionalLockRelationOid(baseoid, lockmode))
+ ereport(ERROR,
+ (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+ errmsg("could not obtain lock on relation \"%s\"",
+ relname)));
+
+ if (relkind == RELKIND_VIEW)
+ LockViewRecurse(baseoid, root_reloid, lockmode, nowait);
+ else if (base_rte->inh)
+ LockTableRecurse(baseoid, lockmode, nowait, view->rd_rel->relowner);
+}
+
+/*
* Check whether the current user is permitted to lock this relation.
*/
static AclResult
-LockTableAclCheck(Oid reloid, LOCKMODE lockmode)
+LockTableAclCheck(Oid reloid, LOCKMODE lockmode, Oid userid)
{
AclResult aclresult;
AclMode aclmask;
@@ -178,7 +254,50 @@ LockTableAclCheck(Oid reloid, LOCKMODE lockmode)
else
aclmask = ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE;
- aclresult = pg_class_aclcheck(reloid, GetUserId(), aclmask);
+ aclresult = pg_class_aclcheck(reloid, userid, aclmask);
return aclresult;
}
+
+/*
+ * Check whether the view is lockable.
+ *
+ * Currently, only auto-updatable views can be locked, that is,
+ * views whose definition are simple and that doesn't have
+ * instead of rules or triggers are lockable.
+ */
+static void
+LockViewCheck(Relation view, Query *viewquery)
+{
+ const char *relname = RelationGetRelationName(view);
+ const char *auto_update_detail;
+ int i;
+
+ auto_update_detail = view_query_is_auto_updatable(viewquery, false);
+ if (auto_update_detail)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot lock view \"%s\"", relname),
+ errdetail_internal("%s", _(auto_update_detail))));
+
+
+ /* Confirm the view doesn't have instead of rules */
+ for (i=0; ird_rules->numLocks; i++)
+ {
+ if (view->rd_rules->rules[i]->isInstead && view->rd_rules->rules[i]->event != CMD_SELECT)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot lock view \"%s\"", relname),
+ errdetail_internal("views that have an INSTEAD OF rule are not lockable")));
+ }
+
+ /* Confirm the view doesn't have instead of triggers */
+ if (view->trigdesc &&
+ (view->trigdesc->trig_insert_instead_row ||
+ view->trigdesc->trig_update_instead_row ||
+ view->trigdesc->trig_delete_instead_row))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot lock view \"%s\"", relname),
+ errdetail_internal("views that have an INSTEAD OF trigger are not lockable")));
+}
diff --git a/src/test/regress/expected/lock.out b/src/test/regress/expected/lock.out
index fd27344..b6becb8 100644
--- a/src/test/regress/expected/lock.out
+++ b/src/test/regress/expected/lock.out
@@ -5,7 +5,9 @@
CREATE SCHEMA lock_schema1;
SET search_path = lock_schema1;
CREATE TABLE lock_tbl1 (a BIGINT);
-CREATE VIEW lock_view1 AS SELECT 1;
+CREATE VIEW lock_view1 AS SELECT * FROM lock_tbl1;
+CREATE VIEW lock_view2 AS SELECT count(*) FROM lock_tbl1;
+CREATE VIEW lock_view3 AS SELECT 1;
CREATE ROLE regress_rol_lock1;
ALTER ROLE regress_rol_lock1 SET search_path = lock_schema1;
GRANT USAGE ON SCHEMA lock_schema1 TO regress_rol_lock1;
@@ -30,8 +32,17 @@ LOCK TABLE lock_tbl1 IN SHARE MODE NOWAIT;
LOCK TABLE lock_tbl1 IN SHARE ROW EXCLUSIVE MODE NOWAIT;
LOCK TABLE lock_tbl1 IN EXCLUSIVE MODE NOWAIT;
LOCK TABLE lock_tbl1 IN ACCESS EXCLUSIVE MODE NOWAIT;
-LOCK TABLE lock_view1 IN EXCLUSIVE MODE; -- Will fail; can't lock a non-table
-ERROR: "lock_view1" is not a table
+LOCK TABLE lock_view1 IN EXCLUSIVE MODE; -- OK; can lock an autoupdatable view
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view2 IN EXCLUSIVE MODE; -- Will fail; can't lock a non-autoupdatable view
+ERROR: cannot lock view "lock_view2"
+DETAIL: Views that return aggregate functions are not automatically updatable.
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view3 IN EXCLUSIVE MODE; -- Will fail; can't lock a non-autoupdatable view
+ERROR: cannot lock view "lock_view3"
+DETAIL: Views that do not select from a single table or view are not automatically updatable.
ROLLBACK;
-- Verify that we can lock a table with inheritance children.
CREATE TABLE lock_tbl2 (b BIGINT) INHERITS (lock_tbl1);
@@ -55,6 +66,8 @@ RESET ROLE;
-- Clean up
--
DROP VIEW lock_view1;
+DROP VIEW lock_view2;
+DROP VIEW lock_view3;
DROP TABLE lock_tbl3;
DROP TABLE lock_tbl2;
DROP TABLE lock_tbl1;
diff --git a/src/test/regress/sql/lock.sql b/src/test/regress/sql/lock.sql
index 567e8bc..d8e97d7 100644
--- a/src/test/regress/sql/lock.sql
+++ b/src/test/regress/sql/lock.sql
@@ -6,7 +6,9 @@
CREATE SCHEMA lock_schema1;
SET search_path = lock_schema1;
CREATE TABLE lock_tbl1 (a BIGINT);
-CREATE VIEW lock_view1 AS SELECT 1;
+CREATE VIEW lock_view1 AS SELECT * FROM lock_tbl1;
+CREATE VIEW lock_view2 AS SELECT count(*) FROM lock_tbl1;
+CREATE VIEW lock_view3 AS SELECT 1;
CREATE ROLE regress_rol_lock1;
ALTER ROLE regress_rol_lock1 SET search_path = lock_schema1;
GRANT USAGE ON SCHEMA lock_schema1 TO regress_rol_lock1;
@@ -33,7 +35,13 @@ LOCK TABLE lock_tbl1 IN SHARE MODE NOWAIT;
LOCK TABLE lock_tbl1 IN SHARE ROW EXCLUSIVE MODE NOWAIT;
LOCK TABLE lock_tbl1 IN EXCLUSIVE MODE NOWAIT;
LOCK TABLE lock_tbl1 IN ACCESS EXCLUSIVE MODE NOWAIT;
-LOCK TABLE lock_view1 IN EXCLUSIVE MODE; -- Will fail; can't lock a non-table
+LOCK TABLE lock_view1 IN EXCLUSIVE MODE; -- OK; can lock an autoupdatable view
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view2 IN EXCLUSIVE MODE; -- Will fail; can't lock a non-autoupdatable view
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view3 IN EXCLUSIVE MODE; -- Will fail; can't lock a non-autoupdatable view
ROLLBACK;
-- Verify that we can lock a table with inheritance children.
@@ -59,6 +67,8 @@ RESET ROLE;
-- Clean up
--
DROP VIEW lock_view1;
+DROP VIEW lock_view2;
+DROP VIEW lock_view3;
DROP TABLE lock_tbl3;
DROP TABLE lock_tbl2;
DROP TABLE lock_tbl1;