Index: cluster.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/commands/cluster.c,v retrieving revision 1.83 diff -c -u -r1.83 cluster.c --- cluster.c 2002/07/12 18:43:15 1.83 +++ cluster.c 2002/07/14 19:27:15 @@ -27,45 +27,74 @@ #include "catalog/dependency.h" #include "catalog/heap.h" #include "catalog/index.h" +#include "catalog/indexing.h" +#include "catalog/catname.h" #include "catalog/pg_index.h" #include "catalog/pg_proc.h" #include "commands/cluster.h" #include "commands/tablecmds.h" #include "miscadmin.h" #include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/syscache.h" +#include "utils/relcache.h" +/* + * We need one of these structs for each index in the relation to be + * clustered. It's basically the data needed by index_create() so + * we can recreate the indexes after destroying the old heap. + */ +typedef struct +{ + char *indexName; + IndexInfo *indexInfo; + Oid accessMethodOID; + Oid *classOID; + Oid indexOID; + bool isPrimary; +} IndexAttrs; static Oid copy_heap(Oid OIDOldHeap, const char *NewName); -static Oid copy_index(Oid OIDOldIndex, Oid OIDNewHeap, - const char *NewIndexName); static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex); +static List *get_indexattr_list (Oid OIDOldHeap); +static void recreate_indexattr(Oid OIDOldHeap, List *indexes); +static void swap_relfilenodes(Oid r1, Oid r2); +Relation RelationIdGetRelation(Oid relationId); + /* * cluster + * + * This clusters the table by creating a new, clustered table and + * swapping the relfilenodes of the new table and the old table, so + * the OID of the original table is preserved. Thus we do not lose + * GRANT, inheritance nor references to this table (this was a bug + * till version 7.2). + * + * Also create new indexes and swap the filenodes with the old indexes + * the same way we do for the relation. + * + * TODO: + * maybe we can get away with AccessShareLock for the table. + * Concurrency would be much improved. Only acquire + * AccessExclusiveLock right before swapping the filenodes. + * This would allow users to CLUSTER on a regular basis, + * practically eliminating the need for auto-clustered indexes. * - * STILL TO DO: - * Create a list of all the other indexes on this relation. Because - * the cluster will wreck all the tids, I'll need to destroy bogus - * indexes. The user will have to re-create them. Not nice, but - * I'm not a nice guy. The alternative is to try some kind of post - * destroy re-build. This may be possible. I'll check out what the - * index create functiond want in the way of paramaters. On the other - * hand, re-creating n indexes may blow out the space. + * Preserve constraint bit for the indexes. */ void cluster(RangeVar *oldrelation, char *oldindexname) { Oid OIDOldHeap, OIDOldIndex, - OIDNewHeap, - OIDNewIndex; + OIDNewHeap; Relation OldHeap, OldIndex; char NewHeapName[NAMEDATALEN]; - char NewIndexName[NAMEDATALEN]; ObjectAddress object; + List *indexes; /* * We grab exclusive access to the target rel and index for the @@ -96,9 +125,10 @@ heap_close(OldHeap, NoLock); index_close(OldIndex); - /* - * Create the new heap with a temporary name. - */ + /* Save the information of all indexes on the relation. */ + indexes = get_indexattr_list(OIDOldHeap); + + /* Create the new heap with a temporary name. */ snprintf(NewHeapName, NAMEDATALEN, "temp_%u", OIDOldHeap); OIDNewHeap = copy_heap(OIDOldHeap, NewHeapName); @@ -112,30 +142,28 @@ /* To make the new heap's data visible. */ CommandCounterIncrement(); - - /* Create new index over the tuples of the new heap. */ - snprintf(NewIndexName, NAMEDATALEN, "temp_%u", OIDOldIndex); - OIDNewIndex = copy_index(OIDOldIndex, OIDNewHeap, NewIndexName); + /* Swap the relfilenodes of the old and new heaps. */ + swap_relfilenodes(OIDNewHeap, OIDOldHeap); CommandCounterIncrement(); - /* Destroy old heap (along with its index) and rename new. */ + /* Destroy new heap with old filenode */ object.classId = RelOid_pg_class; - object.objectId = OIDOldHeap; + object.objectId = OIDNewHeap; object.objectSubId = 0; - /* XXX better to use DROP_CASCADE here? */ + /* The relation is private to our transaction, so + * DROP_RESTRICT should be OK. + */ performDeletion(&object, DROP_RESTRICT); /* performDeletion does CommandCounterIncrement at end */ - - renamerel(OIDNewHeap, oldrelation->relname); - - /* This one might be unnecessary, but let's be safe. */ - CommandCounterIncrement(); - renamerel(OIDNewIndex, oldindexname); + /* Recreate the indexes on the relation. We do not need + * CommandCounterIncrement() because recreate_indexattr does it. + */ + recreate_indexattr(OIDOldHeap, indexes); } static Oid @@ -181,43 +209,6 @@ return OIDNewHeap; } -static Oid -copy_index(Oid OIDOldIndex, Oid OIDNewHeap, const char *NewIndexName) -{ - Oid OIDNewIndex; - Relation OldIndex, - NewHeap; - IndexInfo *indexInfo; - - NewHeap = heap_open(OIDNewHeap, AccessExclusiveLock); - OldIndex = index_open(OIDOldIndex); - - /* - * Create a new index like the old one. To do this I get the info - * from pg_index, and add a new index with a temporary name (that will - * be changed later). - */ - indexInfo = BuildIndexInfo(OldIndex->rd_index); - - OIDNewIndex = index_create(OIDNewHeap, - NewIndexName, - indexInfo, - OldIndex->rd_rel->relam, - OldIndex->rd_index->indclass, - OldIndex->rd_index->indisprimary, - false, /* XXX losing constraint status */ - allowSystemTableMods); - - setRelhasindex(OIDNewHeap, true, - OldIndex->rd_index->indisprimary, InvalidOid); - - index_close(OldIndex); - heap_close(NewHeap, NoLock); - - return OIDNewIndex; -} - - static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex) { @@ -260,4 +251,175 @@ index_close(LocalOldIndex); heap_close(LocalOldHeap, NoLock); heap_close(LocalNewHeap, NoLock); +} + +/* Get the necessary info about the indexes in the relation and + * return a List of IndexAttrs. + */ +List * +get_indexattr_list (Oid OIDOldHeap) +{ + ScanKeyData entry; + HeapScanDesc scan; + Relation indexRelation; + HeapTuple indexTuple; + List *indexes = NIL; + IndexAttrs *attrs; + HeapTuple tuple; + Form_pg_index index; + + /* Grab the index tuples by looking into RelationRelationName + * by the OID of the old heap. + */ + indexRelation = heap_openr(IndexRelationName, AccessShareLock); + ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid, + F_OIDEQ, ObjectIdGetDatum(OIDOldHeap)); + scan = heap_beginscan(indexRelation, SnapshotNow, 1, &entry); + while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + index = (Form_pg_index) GETSTRUCT(indexTuple); + + attrs = (IndexAttrs *) palloc(sizeof(IndexAttrs)); + attrs->indexInfo = BuildIndexInfo(index); + attrs->isPrimary = index->indisprimary; + attrs->indexOID = index->indexrelid; + + /* The opclasses are copied verbatim from the original indexes. + */ + attrs->classOID = (Oid *)palloc(sizeof(Oid) * + attrs->indexInfo->ii_NumIndexAttrs); + memcpy(attrs->classOID, index->indclass, + sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs); + + /* Name and access method of each index come from + * RelationRelationName. + */ + tuple = SearchSysCache(RELOID, + ObjectIdGetDatum(attrs->indexOID), + 0, 0, 0); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "CLUSTER: cannot find index %u", attrs->indexOID); + attrs->indexName = pstrdup(NameStr(((Form_pg_class) GETSTRUCT(tuple))->relname)); + attrs->accessMethodOID = ((Form_pg_class) GETSTRUCT(tuple))->relam; + ReleaseSysCache(tuple); + + /* Cons the gathered data into the list. We do not care about + * ordering, and this is more efficient than append. + */ + indexes=lcons((void *)attrs, indexes); + } + heap_endscan(scan); + heap_close(indexRelation, AccessShareLock); + return indexes; +} + +/* Create new indexes and swap the filenodes with old indexes. Then drop + * the new index (carrying the old heap along). + */ +void +recreate_indexattr(Oid OIDOldHeap, List *indexes) +{ + IndexAttrs *attrs; + List *elem; + Oid newIndexOID; + char newIndexName[NAMEDATALEN]; + + foreach (elem, indexes) + { + attrs=(IndexAttrs *) lfirst(elem); + + /* Create the new index under a temporary name */ + snprintf(newIndexName, NAMEDATALEN, "temp_%u", attrs->indexOID); + newIndexOID = index_create(OIDOldHeap, newIndexName, + attrs->indexInfo, attrs->accessMethodOID, + attrs->classOID, attrs->isPrimary, + /* XXX need to get constraint status */ + false, allowSystemTableMods); + CommandCounterIncrement(); + + /* Swap the filenodes. */ + swap_relfilenodes(newIndexOID, attrs->indexOID); + setRelhasindex(OIDOldHeap, true, attrs->isPrimary, InvalidOid); + + /* I'm not sure this one is needed, but let's be safe. */ + CommandCounterIncrement(); + + /* Drop the new index, carrying the old filenode along. */ + index_drop(newIndexOID); + CommandCounterIncrement(); + + pfree(attrs->classOID); + pfree(attrs); + } + freeList(indexes); +} + +/* Swap the relfilenodes for two given relations. + */ +void +swap_relfilenodes(Oid r1, Oid r2) +{ + /* I can probably keep RelationRelationName open in the main + * function and pass the Relation around so I don't have to open + * it every time. + */ + Relation relRelation, + irels[Num_pg_class_indices], + rel; + HeapTuple reltup[2]; + Oid tempRFNode; + int i; + + /* We need both RelationRelationName tuples. */ + relRelation = heap_openr(RelationRelationName, RowExclusiveLock); + + reltup[0] = SearchSysCacheCopy(RELOID, + ObjectIdGetDatum(r1), + 0, 0, 0); + if (!HeapTupleIsValid(reltup[0])) + elog(ERROR, "CLUSTER: Cannot find tuple for relation %u", r1); + reltup[1] = SearchSysCacheCopy(RELOID, + ObjectIdGetDatum(r2), + 0, 0, 0); + if (!HeapTupleIsValid(reltup[1])) + elog(ERROR, "CLUSTER: Cannot find tuple for relation %u", r2); + + /* The buffer manager gets confused if we swap relfilenodes for + * relations that are not both local or non-local to this transaction. + * Flush the buffers on both relations so the buffer manager can + * forget about'em. + */ + + rel = RelationIdGetRelation(r1); + i = FlushRelationBuffers(rel, RelationGetNumberOfBlocks(rel)); + if (i < 0) + elog(ERROR, "CLUSTER: FlushRelationBuffers returned %d", i); + RelationClose(rel); + rel = RelationIdGetRelation(r1); + i = FlushRelationBuffers(rel, RelationGetNumberOfBlocks(rel)); + if (i < 0) + elog(ERROR, "CLUSTER: FlushRelationBuffers returned %d", i); + RelationClose(rel); + + /* Actually swap the filenodes */ + + tempRFNode = ((Form_pg_class) GETSTRUCT(reltup[0]))->relfilenode; + ((Form_pg_class) GETSTRUCT(reltup[0]))->relfilenode = + ((Form_pg_class) GETSTRUCT(reltup[1]))->relfilenode; + ((Form_pg_class) GETSTRUCT(reltup[1]))->relfilenode = tempRFNode; + + /* Update the RelationRelationName tuples */ + simple_heap_update(relRelation, &reltup[1]->t_self, reltup[1]); + simple_heap_update(relRelation, &reltup[0]->t_self, reltup[0]); + + /* To keep system catalogs current. */ + CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices, irels); + CatalogIndexInsert(irels, Num_pg_class_indices, relRelation, reltup[1]); + CatalogIndexInsert(irels, Num_pg_class_indices, relRelation, reltup[0]); + CatalogCloseIndices(Num_pg_class_indices, irels); + CommandCounterIncrement(); + + heap_close(relRelation, NoLock); + heap_freetuple(reltup[0]); + heap_freetuple(reltup[1]); }