Index: backend/commands/cluster.c =================================================================== RCS file: /projects/cvsroot/pgsql-server/src/backend/commands/cluster.c,v retrieving revision 1.87 diff -c -r1.87 cluster.c *** backend/commands/cluster.c 2002/08/27 03:38:27 1.87 --- backend/commands/cluster.c 2002/08/30 22:17:37 *************** *** 25,33 **** --- 25,35 ---- #include "catalog/index.h" #include "catalog/indexing.h" #include "catalog/catname.h" + #include "catalog/namespace.h" #include "commands/cluster.h" #include "commands/tablecmds.h" #include "miscadmin.h" + #include "utils/acl.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/syscache.h" *************** *** 48,60 **** bool isclustered; } IndexAttrs; static Oid make_new_heap(Oid OIDOldHeap, const char *NewName); static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex); static List *get_indexattr_list(Relation OldHeap, Oid OldIndex); static void recreate_indexattr(Oid OIDOldHeap, List *indexes); static void swap_relfilenodes(Oid r1, Oid r2); - /* * cluster * --- 50,74 ---- bool isclustered; } IndexAttrs; + /* This struct is used to pass the information on tables to be clustered + * around. We need this so we can make a list of them when invoked without + * a specific table/index pair. + */ + typedef struct + { + Oid tableOid; + Oid indexOid; + } relToCluster; + static Oid make_new_heap(Oid OIDOldHeap, const char *NewName); static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex); static List *get_indexattr_list(Relation OldHeap, Oid OldIndex); static void recreate_indexattr(Oid OIDOldHeap, List *indexes); static void swap_relfilenodes(Oid r1, Oid r2); + static void clusterRel(relToCluster *rv); + static bool checkClusterOwnership(Oid relOid); + static List *getTablesByOwner(Oid owner); /* * cluster * *************** *** 72,82 **** * Permissions checks were done already. */ void ! cluster(RangeVar *oldrelation, char *oldindexname) { ! Oid OIDOldHeap, ! OIDOldIndex, ! OIDNewHeap; Relation OldHeap, OldIndex; char NewHeapName[NAMEDATALEN]; --- 86,94 ---- * Permissions checks were done already. */ void ! clusterRel(relToCluster *rvtc) { ! Oid OIDNewHeap; Relation OldHeap, OldIndex; char NewHeapName[NAMEDATALEN]; *************** *** 87,111 **** * We grab exclusive access to the target rel and index for the * duration of the transaction. */ ! OldHeap = heap_openrv(oldrelation, AccessExclusiveLock); ! OIDOldHeap = RelationGetRelid(OldHeap); ! /* ! * The index is expected to be in the same namespace as the relation. ! */ ! OIDOldIndex = get_relname_relid(oldindexname, ! RelationGetNamespace(OldHeap)); ! if (!OidIsValid(OIDOldIndex)) ! elog(ERROR, "CLUSTER: cannot find index \"%s\" for table \"%s\"", ! oldindexname, RelationGetRelationName(OldHeap)); ! OldIndex = index_open(OIDOldIndex); LockRelation(OldIndex, AccessExclusiveLock); /* * Check that index is in fact an index on the given relation */ if (OldIndex->rd_index == NULL || ! OldIndex->rd_index->indrelid != OIDOldHeap) elog(ERROR, "CLUSTER: \"%s\" is not an index for table \"%s\"", RelationGetRelationName(OldIndex), RelationGetRelationName(OldHeap)); --- 99,114 ---- * We grab exclusive access to the target rel and index for the * duration of the transaction. */ ! OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock); ! OldIndex = index_open(rvtc->indexOid); LockRelation(OldIndex, AccessExclusiveLock); /* * Check that index is in fact an index on the given relation */ if (OldIndex->rd_index == NULL || ! OldIndex->rd_index->indrelid != rvtc->tableOid) elog(ERROR, "CLUSTER: \"%s\" is not an index for table \"%s\"", RelationGetRelationName(OldIndex), RelationGetRelationName(OldHeap)); *************** *** 122,128 **** RelationGetRelationName(OldHeap)); /* Save the information of all indexes on the relation. */ ! indexes = get_indexattr_list(OldHeap, OIDOldIndex); /* Drop relcache refcnts, but do NOT give up the locks */ index_close(OldIndex); --- 125,131 ---- RelationGetRelationName(OldHeap)); /* Save the information of all indexes on the relation. */ ! indexes = get_indexattr_list(OldHeap, rvtc->indexOid); /* Drop relcache refcnts, but do NOT give up the locks */ index_close(OldIndex); *************** *** 135,156 **** * particular, we can't create the new heap in a different namespace from * the old, or we will have problems with the TEMP status of temp tables. */ ! snprintf(NewHeapName, NAMEDATALEN, "pg_temp_%u", OIDOldHeap); ! OIDNewHeap = make_new_heap(OIDOldHeap, NewHeapName); /* We don't need CommandCounterIncrement() because make_new_heap did it. */ /* * Copy the heap data into the new table in the desired order. */ ! copy_heap_data(OIDNewHeap, OIDOldHeap, OIDOldIndex); /* To make the new heap's data visible (probably not needed?). */ CommandCounterIncrement(); /* Swap the relfilenodes of the old and new heaps. */ ! swap_relfilenodes(OIDOldHeap, OIDNewHeap); CommandCounterIncrement(); --- 138,159 ---- * particular, we can't create the new heap in a different namespace from * the old, or we will have problems with the TEMP status of temp tables. */ ! snprintf(NewHeapName, NAMEDATALEN, "pg_temp_%u", rvtc->tableOid); ! OIDNewHeap = make_new_heap(rvtc->tableOid, NewHeapName); /* We don't need CommandCounterIncrement() because make_new_heap did it. */ /* * Copy the heap data into the new table in the desired order. */ ! copy_heap_data(OIDNewHeap, rvtc->tableOid, rvtc->indexOid); /* To make the new heap's data visible (probably not needed?). */ CommandCounterIncrement(); /* Swap the relfilenodes of the old and new heaps. */ ! swap_relfilenodes(rvtc->tableOid, OIDNewHeap); CommandCounterIncrement(); *************** *** 171,177 **** * Recreate each index on the relation. We do not need * CommandCounterIncrement() because recreate_indexattr does it. */ ! recreate_indexattr(OIDOldHeap, indexes); } /* --- 174,180 ---- * Recreate each index on the relation. We do not need * CommandCounterIncrement() because recreate_indexattr does it. */ ! recreate_indexattr(rvtc->tableOid, indexes); } /* *************** *** 549,552 **** --- 552,658 ---- heap_freetuple(reltup2); heap_close(relRelation, RowExclusiveLock); + } + + void + cluster(ClusterStmt *stmt) + { + if (stmt->relation != NULL) + { + Oid indexOid, + tableOid; + relToCluster rvtc; + HeapTuple tuple; + + tableOid = RangeVarGetRelid(stmt->relation, false); + if (checkClusterOwnership(tableOid) == false) + elog(ERROR, "CLUSTER: You do not own relation %s", + stmt->relation->relname); + + /* The index is expected to be in the same namespace as the + * relation. */ + tuple = SearchSysCache(RELOID, + ObjectIdGetDatum(tableOid), + 0, 0, 0); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "Cache lookup failed for relation %u", tableOid); + indexOid = get_relname_relid(stmt->indexname, + ((Form_pg_class) GETSTRUCT(tuple))->relnamespace); + ReleaseSysCache(tuple); + + /* XXX The namespace should be reported as well */ + if (!OidIsValid(indexOid)) + elog(ERROR, "CLUSTER: cannot find index \"%s\" for table \"%s\"", + stmt->indexname, stmt->relation->relname); + rvtc.tableOid = tableOid; + rvtc.indexOid = indexOid; + clusterRel(&rvtc); + } + else + { + relToCluster *rvtc; + List *rv; + + /* Ok, now that we've got them all, cluster them one by one */ + foreach (rv, getTablesByOwner(GetUserId()) + { + rvtc = (relToCluster *)lfirst(rv); + elog(NOTICE, "Clustering index %u on table %u", + rvtc->indexOid, rvtc->tableOid); + clusterRel(rvtc); + /* We don't need this entry anymore, so free it. */ + pfree(rvtc); + } + } + } + + /* Checks if the user owns the relation. Superusers + * are allowed to cluster any table. + */ + bool + checkClusterOwnership(Oid relOid) + { + bool permOk = false; + + /* Superusers bypass this check */ + if (pg_class_ownercheck(relOid, GetUserId())) + permOk = true; + + return permOk; + } + + /* Get a list of tables that the current user owns and + * have indisclustered set. Return the list in a List * of rvsToCluster + * with the tableOid and the indexOid on which the table is already + * clustered. + */ + List * + getTablesByOwner(Oid owner) + { + Relation indRelation; + HeapScanDesc scan; + ScanKeyData entry; + HeapTuple indexTuple; + Form_pg_index index; + relToCluster *rvtc; + List *rvs = NIL; + + /* Get all indexes that have indisclustered set */ + indRelation = relation_openr(IndexRelationName, AccessShareLock); + ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indisclustered, + F_BOOLEQ, true); + scan = heap_beginscan(indRelation, SnapshotNow, 1, &entry); + while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + index = (Form_pg_index) GETSTRUCT(indexTuple); + if (!checkClusterOwnership(index->indrelid)) + continue; + rvtc = (relToCluster *)palloc(sizeof(relToCluster)); + rvtc->indexOid = index->indexrelid; + rvtc->tableOid = index->indrelid; + lcons((void *)rvtc, rvs); + } + heap_endscan(scan); + relation_close(indRelation, AccessShareLock); + return rvs; } Index: backend/parser/gram.y =================================================================== RCS file: /projects/cvsroot/pgsql-server/src/backend/parser/gram.y,v retrieving revision 2.364 diff -c -r2.364 gram.y *** backend/parser/gram.y 2002/08/29 00:17:04 2.364 --- backend/parser/gram.y 2002/08/30 22:17:54 *************** *** 3740,3746 **** /***************************************************************************** * * QUERY: ! * cluster on * *****************************************************************************/ --- 3740,3746 ---- /***************************************************************************** * * QUERY: ! * cluster [ on ] * *****************************************************************************/ *************** *** 3750,3755 **** --- 3750,3762 ---- ClusterStmt *n = makeNode(ClusterStmt); n->relation = $4; n->indexname = $2; + $$ = (Node*)n; + } + | CLUSTER + { + ClusterStmt *n = makeNode(ClusterStmt); + n->relation = NULL; + n->indexname = NULL; $$ = (Node*)n; } ; Index: backend/tcop/utility.c =================================================================== RCS file: /projects/cvsroot/pgsql-server/src/backend/tcop/utility.c,v retrieving revision 1.175 diff -c -r1.175 utility.c *** backend/tcop/utility.c 2002/08/30 19:23:20 1.175 --- backend/tcop/utility.c 2002/08/30 22:17:56 *************** *** 695,703 **** { ClusterStmt *stmt = (ClusterStmt *) parsetree; ! CheckOwnership(stmt->relation, true); ! ! cluster(stmt->relation, stmt->indexname); } break; --- 695,701 ---- { ClusterStmt *stmt = (ClusterStmt *) parsetree; ! cluster(stmt); } break; Index: include/commands/cluster.h =================================================================== RCS file: /projects/cvsroot/pgsql-server/src/include/commands/cluster.h,v retrieving revision 1.15 diff -c -r1.15 cluster.h *** include/commands/cluster.h 2002/08/10 21:00:34 1.15 --- include/commands/cluster.h 2002/08/30 22:17:58 *************** *** 13,21 **** #ifndef CLUSTER_H #define CLUSTER_H /* * functions */ ! extern void cluster(RangeVar *oldrelation, char *oldindexname); #endif /* CLUSTER_H */ --- 13,22 ---- #ifndef CLUSTER_H #define CLUSTER_H + #include /* * functions */ ! extern void cluster(ClusterStmt *stmt); #endif /* CLUSTER_H */ Index: include/nodes/parsenodes.h =================================================================== RCS file: /projects/cvsroot/pgsql-server/src/include/nodes/parsenodes.h,v retrieving revision 1.203 diff -c -r1.203 parsenodes.h *** include/nodes/parsenodes.h 2002/08/30 19:23:20 1.203 --- include/nodes/parsenodes.h 2002/08/30 22:53:18 *************** *** 1466,1472 **** typedef struct ClusterStmt { NodeTag type; ! RangeVar *relation; /* relation being indexed */ char *indexname; /* original index defined */ } ClusterStmt; --- 1466,1472 ---- typedef struct ClusterStmt { NodeTag type; ! RangeVar *relation; /* relation being indexed, or NULL if all */ char *indexname; /* original index defined */ } ClusterStmt;