Index: cluster.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/commands/cluster.c,v retrieving revision 1.82 diff -c -u -r1.82 cluster.c --- cluster.c 2002/06/20 20:29:26 1.82 +++ cluster.c 2002/07/11 02:39:30 @@ -26,47 +26,69 @@ #include "access/heapam.h" #include "catalog/heap.h" #include "catalog/index.h" +#include "catalog/indexing.h" +#include "catalog/catname.h" #include "catalog/pg_index.h" #include "catalog/pg_proc.h" #include "commands/cluster.h" #include "commands/tablecmds.h" #include "miscadmin.h" #include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/syscache.h" +/* + * We need one of these structs for each index in the relation to be + * clustered. It's basically the data needed by index_create() so + * we can recreate the indexes after the old heap storage is swapped out. + */ +typedef struct +{ + char *indexName; /* copy of the index relname */ + IndexInfo *indexInfo; /* result of BuildIndexInfo() */ + Oid accessMethodOID; /* relam of the index */ + Oid *classOID; /* copy of indclass, one per index attribute */ + Oid indexOID; /* indexrelid of the original index */ + bool isPrimary; /* indisprimary of the original index */ +} IndexAttrs; static Oid copy_heap(Oid OIDOldHeap, const char *NewName); -static Oid copy_index(Oid OIDOldIndex, Oid OIDNewHeap, - const char *NewIndexName); static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex); +List *get_indexattr_list (Oid OIDOldHeap); +void recreate_indexattr(Oid OIDOldHeap, List *indexes); +void swap_relfilenodes(Oid r1, Oid r2); /* * cluster * * STILL TO DO: - * Create a list of all the other indexes on this relation. Because - * the cluster will wreck all the tids, I'll need to destroy bogus - * indexes. The user will have to re-create them. Not nice, but - * I'm not a nice guy. The alternative is to try some kind of post - * destroy re-build. This may be possible. I'll check out what the - * index create functiond want in the way of paramaters. On the other - * hand, re-creating n indexes may blow out the space. + * Keep foreign keys, permissions and inheritance of the clustered table. 
+ * + * We need to look at making use of the ability to write a new version of a + * table (or index) under a new relfilenode value, without changing the + * table's OID. */ void cluster(RangeVar *oldrelation, char *oldindexname) { Oid OIDOldHeap, OIDOldIndex, - OIDNewHeap, - OIDNewIndex; + OIDNewHeap; Relation OldHeap, OldIndex; char NewHeapName[NAMEDATALEN]; - char NewIndexName[NAMEDATALEN]; + List *indexes; + /* The games with filenodes may not be rollbackable, so + * disallow running this inside a transaction block. + * This may be a false assumption though. + */ + if (IsTransactionBlock()) + elog (ERROR, "CLUSTER: may not be called inside a transaction block"); + /* - * We grab exclusive access to the target rel and index for the + * We grab exclusive access to the target relation for the * duration of the transaction. */ OldHeap = heap_openrv(oldrelation, AccessExclusiveLock); @@ -94,43 +116,39 @@ heap_close(OldHeap, NoLock); index_close(OldIndex); - /* - * Create the new heap with a temporary name. - */ + /* Save the information of all indexes on the relation. */ + indexes = get_indexattr_list(OIDOldHeap); + + /* Create the new heap with a temporary name. */ snprintf(NewHeapName, NAMEDATALEN, "temp_%u", OIDOldHeap); OIDNewHeap = copy_heap(OIDOldHeap, NewHeapName); /* We do not need CommandCounterIncrement() because copy_heap did it. */ - /* - * Copy the heap data into the new table in the desired order. - */ + /* Copy the heap data into the new table in the desired order. */ rebuildheap(OIDNewHeap, OIDOldHeap, OIDOldIndex); /* To make the new heap's data visible. */ CommandCounterIncrement(); - - /* Create new index over the tuples of the new heap. */ - snprintf(NewIndexName, NAMEDATALEN, "temp_%u", OIDOldIndex); - - OIDNewIndex = copy_index(OIDOldIndex, OIDNewHeap, NewIndexName); - - CommandCounterIncrement(); - /* Destroy old heap (along with its index) and rename new. 
*/ - heap_drop_with_catalog(OIDOldHeap, allowSystemTableMods); + /* Swap the relfilenodes of the old and new heaps. */ + swap_relfilenodes(OIDNewHeap, OIDOldHeap); CommandCounterIncrement(); - - renamerel(OIDNewHeap, oldrelation->relname); - /* This one might be unnecessary, but let's be safe. */ + /* Destroy the new heap, carrying the old filenode along. */ + heap_drop_with_catalog(OIDNewHeap, allowSystemTableMods); CommandCounterIncrement(); - renamerel(OIDNewIndex, oldindexname); + /* Recreate the indexes on the relation. We do not need + * CommandCounterIncrement() because recreate_indexattr does it. + */ + recreate_indexattr(OIDOldHeap, indexes); } +/* Create and initialize the new heap + */ static Oid copy_heap(Oid OIDOldHeap, const char *NewName) { @@ -173,43 +191,9 @@ return OIDNewHeap; } - -static Oid -copy_index(Oid OIDOldIndex, Oid OIDNewHeap, const char *NewIndexName) -{ - Oid OIDNewIndex; - Relation OldIndex, - NewHeap; - IndexInfo *indexInfo; - - NewHeap = heap_open(OIDNewHeap, AccessExclusiveLock); - OldIndex = index_open(OIDOldIndex); - - /* - * Create a new index like the old one. To do this I get the info - * from pg_index, and add a new index with a temporary name (that will - * be changed later). - */ - indexInfo = BuildIndexInfo(OldIndex->rd_index); - - OIDNewIndex = index_create(OIDNewHeap, - NewIndexName, - indexInfo, - OldIndex->rd_rel->relam, - OldIndex->rd_index->indclass, - OldIndex->rd_index->indisprimary, - allowSystemTableMods); - - setRelhasindex(OIDNewHeap, true, - OldIndex->rd_index->indisprimary, InvalidOid); - - index_close(OldIndex); - heap_close(NewHeap, NoLock); - - return OIDNewIndex; -} - +/* Load the data into the new heap, clustered. 
+ */ static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex) { @@ -252,4 +236,157 @@ index_close(LocalOldIndex); heap_close(LocalOldHeap, NoLock); heap_close(LocalNewHeap, NoLock); +} + +/* Get the necessary info about the indexes in the relation and + * return a List of IndexAttrs. + */ +List * +get_indexattr_list (Oid OIDOldHeap) +{ + ScanKeyData entry; + HeapScanDesc scan; + Relation indexRelation; + HeapTuple indexTuple; + List *indexes = NIL; + IndexAttrs *attrs; + HeapTuple tuple; + Form_pg_index index; + + /* Grab the index tuples by scanning IndexRelationName for + * entries whose indrelid is the OID of the old heap. + */ + indexRelation = heap_openr(IndexRelationName, AccessShareLock); + ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid, + F_OIDEQ, ObjectIdGetDatum(OIDOldHeap)); + scan = heap_beginscan(indexRelation, SnapshotNow, 1, &entry); + while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + index = (Form_pg_index) GETSTRUCT(indexTuple); + + attrs = (IndexAttrs *) palloc(sizeof(IndexAttrs)); + attrs->indexInfo = BuildIndexInfo(index); + attrs->isPrimary = index->indisprimary; + attrs->indexOID = index->indexrelid; + + /* The opclasses are copied verbatim from the original indexes. + */ + attrs->classOID = (Oid *)palloc(sizeof(Oid) * + attrs->indexInfo->ii_NumIndexAttrs); + memcpy(attrs->classOID, index->indclass, + sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs); + + /* Name and access method of each index come from its + * RelationRelationName tuple, fetched through the RELOID syscache. + */ + tuple = SearchSysCache(RELOID, + ObjectIdGetDatum(attrs->indexOID), + 0, 0, 0); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "CLUSTER: cannot find index %u", attrs->indexOID); + attrs->indexName = pstrdup(NameStr(((Form_pg_class) GETSTRUCT(tuple))->relname)); + attrs->accessMethodOID = ((Form_pg_class) GETSTRUCT(tuple))->relam; + ReleaseSysCache(tuple); + + /* Cons the gathered data into the list. We do not care about + * ordering, and this is more efficient than append. 
+ */ + indexes=lcons((void *)attrs, indexes); + } + heap_endscan(scan); + heap_close(indexRelation, AccessShareLock); + return indexes; +} + +/* Create new indexes and swap the filenodes with old indexes. Then drop + * the new index (carrying the old filenode along). Each IndexAttrs entry and its classOID array are pfreed after use, and the list itself at the end. + */ +void +recreate_indexattr(Oid OIDOldHeap, List *indexes) +{ + IndexAttrs *attrs; + List *elem; + Oid newIndexOID; + char newIndexName[NAMEDATALEN]; + + foreach (elem, indexes) + { + attrs=(IndexAttrs *) lfirst(elem); + + /* Create the new index under a temporary name */ + snprintf(newIndexName, NAMEDATALEN, "temp_%u", attrs->indexOID); + newIndexOID = index_create(OIDOldHeap, newIndexName, + attrs->indexInfo, attrs->accessMethodOID, + attrs->classOID, attrs->isPrimary, + allowSystemTableMods); + CommandCounterIncrement(); + + /* Swap the filenodes. */ + swap_relfilenodes(newIndexOID, attrs->indexOID); + setRelhasindex(OIDOldHeap, true, attrs->isPrimary, InvalidOid); + + /* I'm not sure this one is needed, but let's be safe. */ + CommandCounterIncrement(); + + /* Drop the new index, carrying the old filenode along. */ + index_drop(newIndexOID); + CommandCounterIncrement(); + + pfree(attrs->classOID); + pfree(attrs); + } + freeList(indexes); +} + +/* Swap the relfilenodes for two given relations, identified by OID. Does its own CommandCounterIncrement() before returning. + */ +void +swap_relfilenodes(Oid r1, Oid r2) +{ + /* I can probably keep RelationRelationName open in the main + * function and pass the Relation around so I don't have to open + * it every time. + */ + Relation relRelation, + irels[Num_pg_class_indices]; + HeapTuple reltup[2]; + Oid tempRFNode; + + /* We need both RelationRelationName tuples. 
*/ + relRelation = heap_openr(RelationRelationName, RowExclusiveLock); + + reltup[0] = SearchSysCacheCopy(RELOID, + ObjectIdGetDatum(r1), + 0, 0, 0); + if (!HeapTupleIsValid(reltup[0])) + elog(ERROR, "CLUSTER: Cannot find tuple for relation %u", r1); + reltup[1] = SearchSysCacheCopy(RELOID, + ObjectIdGetDatum(r2), + 0, 0, 0); + if (!HeapTupleIsValid(reltup[1])) + elog(ERROR, "CLUSTER: Cannot find tuple for relation %u", r2); + + /* Actually swap the filenodes */ + + tempRFNode = ((Form_pg_class) GETSTRUCT(reltup[0]))->relfilenode; + ((Form_pg_class) GETSTRUCT(reltup[0]))->relfilenode = + ((Form_pg_class) GETSTRUCT(reltup[1]))->relfilenode; + ((Form_pg_class) GETSTRUCT(reltup[1]))->relfilenode = tempRFNode; + + /* Update the RelationRelationName tuples */ + simple_heap_update(relRelation, &reltup[1]->t_self, reltup[1]); + simple_heap_update(relRelation, &reltup[0]->t_self, reltup[0]); + + /* Keep the indexes on RelationRelationName current. */ + CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices, irels); + CatalogIndexInsert(irels, Num_pg_class_indices, relRelation, reltup[0]); + CatalogIndexInsert(irels, Num_pg_class_indices, relRelation, reltup[1]); + CatalogCloseIndices(Num_pg_class_indices, irels); + + CommandCounterIncrement(); + + heap_close(relRelation, NoLock); + heap_freetuple(reltup[0]); + heap_freetuple(reltup[1]); + }