Index: src/bin/pg_dump/pg_backup_archiver.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.c,v retrieving revision 1.172 diff -c -d -r1.172 pg_backup_archiver.c *** src/bin/pg_dump/pg_backup_archiver.c 11 Jun 2009 14:49:07 -0000 1.172 --- src/bin/pg_dump/pg_backup_archiver.c 17 Jul 2009 02:20:28 -0000 *************** *** 59,70 **** --- 59,80 ---- #define thandle HANDLE #endif + /* List header for pending-activity lists */ + typedef struct + { + TocEntry *head; + TocEntry *tail; + /* The list link fields in each TocEntry are par_prev and par_next */ + } TocEntryList; + + /* Arguments needed for a worker child */ typedef struct _restore_args { ArchiveHandle *AH; TocEntry *te; } RestoreArgs; + /* State for each parallel activity slot */ typedef struct _parallel_slot { thandle child_id; *************** *** 117,124 **** static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status); static bool work_in_progress(ParallelSlot *slots, int n_slots); static int get_next_slot(ParallelSlot *slots, int n_slots); static TocEntry *get_next_work_item(ArchiveHandle *AH, ! TocEntry **first_unprocessed, ParallelSlot *slots, int n_slots); static parallel_restore_result parallel_restore(RestoreArgs *args); static void mark_work_done(ArchiveHandle *AH, thandle worker, int status, --- 127,138 ---- static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status); static bool work_in_progress(ParallelSlot *slots, int n_slots); static int get_next_slot(ParallelSlot *slots, int n_slots); + static void toc_list_append(TocEntryList *l, TocEntry *te); + static void toc_list_remove(TocEntryList *l, TocEntry *te); + static void find_ready_items(TocEntryList *pending_list, + TocEntryList *ready_list); static TocEntry *get_next_work_item(ArchiveHandle *AH, ! TocEntryList *ready_list, ParallelSlot *slots, int n_slots); static parallel_restore_result parallel_restore(RestoreArgs *args); static void mark_work_done(ArchiveHandle *AH, thandle worker, int status, *************** *** 3065,3071 **** ParallelSlot *slots; int work_status; int next_slot; ! TocEntry *first_unprocessed = AH->toc->next; TocEntry *next_work_item; thandle ret_child; TocEntry *te; --- 3079,3086 ---- ParallelSlot *slots; int work_status; int next_slot; ! TocEntryList pending_list; ! TocEntryList ready_list; TocEntry *next_work_item; thandle ret_child; TocEntry *te; *************** *** 3087,3094 **** * faster in a single connection because we avoid all the connection and * setup overhead. */ ! while ((next_work_item = get_next_work_item(AH, &first_unprocessed, ! NULL, 0)) != NULL) { if (next_work_item->section == SECTION_DATA || next_work_item->section == SECTION_POST_DATA) --- 3102,3108 ---- * faster in a single connection because we avoid all the connection and * setup overhead. */ ! for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next) { if (next_work_item->section == SECTION_DATA || next_work_item->section == SECTION_POST_DATA) *************** *** 3100,3106 **** (void) restore_toc_entry(AH, next_work_item, ropt, false); - next_work_item->restored = true; reduce_dependencies(AH, next_work_item); } --- 3114,3119 ---- *************** *** 3125,3130 **** --- 3138,3162 ---- AH->currWithOids = -1; /* + * Initialize the lists of pending and ready items. After this setup, + * the pending list is everything that needs to be done but is blocked + * by one or more dependencies, while the ready list contains items that + * have no remaining dependencies. Note: we don't yet filter out entries + * that aren't going to be restored. They might participate in + * dependency chains connecting entries that should be restored, so we + * treat them as live until we actually process them. + */ + pending_list.head = pending_list.tail = NULL; + ready_list.head = ready_list.tail = NULL; + for (; next_work_item != AH->toc; next_work_item = next_work_item->next) + { + if (next_work_item->depCount > 0) + toc_list_append(&pending_list, next_work_item); + else + toc_list_append(&ready_list, next_work_item); + } + + /* * main parent loop * * Keep going until there is no worker still running AND there is no work *************** *** 3133,3139 **** ahlog(AH, 1, "entering main parallel loop\n"); ! while ((next_work_item = get_next_work_item(AH, &first_unprocessed, slots, n_slots)) != NULL || work_in_progress(slots, n_slots)) { --- 3165,3171 ---- ahlog(AH, 1, "entering main parallel loop\n"); ! while ((next_work_item = get_next_work_item(AH, &ready_list, slots, n_slots)) != NULL || work_in_progress(slots, n_slots)) { *************** *** 3149,3156 **** next_work_item->dumpId, next_work_item->desc, next_work_item->tag); ! next_work_item->restored = true; reduce_dependencies(AH, next_work_item); continue; } --- 3181,3189 ---- next_work_item->dumpId, next_work_item->desc, next_work_item->tag); ! toc_list_remove(&ready_list, next_work_item); reduce_dependencies(AH, next_work_item); + find_ready_items(&pending_list, &ready_list); continue; } *************** *** 3165,3171 **** next_work_item->dumpId, next_work_item->desc, next_work_item->tag); ! next_work_item->restored = true; /* this memory is dealloced in mark_work_done() */ args = malloc(sizeof(RestoreArgs)); --- 3198,3204 ---- next_work_item->dumpId, next_work_item->desc, next_work_item->tag); ! toc_list_remove(&ready_list, next_work_item); /* this memory is dealloced in mark_work_done() */ args = malloc(sizeof(RestoreArgs)); *************** *** 3194,3199 **** --- 3227,3233 ---- { mark_work_done(AH, ret_child, WEXITSTATUS(work_status), slots, n_slots); + find_ready_items(&pending_list, &ready_list); } else { *************** *** 3218,3231 **** * dependencies, or some other pathological condition. If so, do it in the * single parent connection. */ ! for (te = AH->toc->next; te != AH->toc; te = te->next) { ! if (!te->restored) ! { ! ahlog(AH, 1, "processing missed item %d %s %s\n", ! te->dumpId, te->desc, te->tag); ! (void) restore_toc_entry(AH, te, ropt, false); ! } } /* The ACLs will be handled back in RestoreArchive. */ --- 3252,3262 ---- * dependencies, or some other pathological condition. If so, do it in the * single parent connection. */ ! for (te = pending_list.head; te; te = te->par_next) { ! ahlog(AH, 1, "processing missed item %d %s %s\n", ! te->dumpId, te->desc, te->tag); ! (void) restore_toc_entry(AH, te, ropt, false); } /* The ACLs will be handled back in RestoreArchive. */ *************** *** 3366,3392 **** } return false; } /* * Find the next work item (if any) that is capable of being run now. * * To qualify, the item must have no remaining dependencies ! * and no requirement for locks that is incompatible with ! * items currently running. * ! * first_unprocessed is state data that tracks the location of the first ! * TocEntry that's not marked 'restored'. This avoids O(N^2) search time ! * with long TOC lists. (Even though the constant is pretty small, it'd ! * get us eventually.) * * pref_non_data is for an alternative selection algorithm that gives * preference to non-data items if there is already a data load running. * It is currently disabled. */ static TocEntry * ! get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed, ParallelSlot *slots, int n_slots) { bool pref_non_data = false; /* or get from AH->ropt */ --- 3397,3474 ---- } return false; } + + + /* Append te to the end of the TocEntryList */ + static void + toc_list_append(TocEntryList *l, TocEntry *te) + { + if (l->tail) + l->tail->par_next = te; + te->par_prev = l->tail; + te->par_next = NULL; + l->tail = te; + if (l->head == NULL) + l->head = te; + } + + /* Remove te from the TocEntryList */ + static void + toc_list_remove(TocEntryList *l, TocEntry *te) + { + if (te->par_prev) + te->par_prev->par_next = te->par_next; + else + l->head = te->par_next; + if (te->par_next) + te->par_next->par_prev = te->par_prev; + else + l->tail = te->par_prev; + te->par_next = NULL; + te->par_prev = NULL; + } + + /* + * Find pending items that have no remaining dependencies, and move them + * to the ready list. + */ + static void + find_ready_items(TocEntryList *pending_list, TocEntryList *ready_list) + { + TocEntry *te; + TocEntry *next; + for (te = pending_list->head; te; te = next) + { + /* Save list link in case we move this item to the other list */ + next = te->par_next; + if (te->depCount > 0) + continue; /* not ready yet */ + /* OK, move it */ + toc_list_remove(pending_list, te); + toc_list_append(ready_list, te); + } + } + /* * Find the next work item (if any) that is capable of being run now. * * To qualify, the item must have no remaining dependencies ! * and no requirements for locks that are incompatible with ! * items currently running. Items in the ready_list are known to have ! * no remaining dependencies, but we have to check for lock conflicts. * ! * Note that the returned item has *not* been removed from ready_list. ! * The caller must do that after successfully dispatching the item. * * pref_non_data is for an alternative selection algorithm that gives * preference to non-data items if there is already a data load running. * It is currently disabled. */ static TocEntry * ! get_next_work_item(ArchiveHandle *AH, TocEntryList *ready_list, ParallelSlot *slots, int n_slots) { bool pref_non_data = false; /* or get from AH->ropt */ *************** *** 3411,3436 **** } /* ! * Advance first_unprocessed if possible. ! */ ! for (te = *first_unprocessed; te != AH->toc; te = te->next) ! { ! if (!te->restored) ! break; ! } ! *first_unprocessed = te; ! ! /* ! * Search from first_unprocessed until we find an available item. */ ! for (; te != AH->toc; te = te->next) { bool conflicts = false; - /* Ignore if already done or still waiting on dependencies */ - if (te->restored || te->depCount > 0) - continue; - /* * Check to see if the item would need exclusive lock on something * that a currently running item also needs lock on, or vice versa. If --- 3493,3504 ---- } /* ! * Search the ready_list until we find a suitable item. */ ! for (te = ready_list->head; te; te = te->par_next) { bool conflicts = false; /* * Check to see if the item would need exclusive lock on something * that a currently running item also needs lock on, or vice versa. If Index: src/bin/pg_dump/pg_backup_archiver.h =================================================================== RCS file: /projects/cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.h,v retrieving revision 1.79 diff -c -d -r1.79 pg_backup_archiver.h *** src/bin/pg_dump/pg_backup_archiver.h 11 Jun 2009 14:49:07 -0000 1.79 --- src/bin/pg_dump/pg_backup_archiver.h 17 Jul 2009 02:20:28 -0000 *************** *** 314,320 **** void *formatData; /* TOC Entry data specific to file format */ /* working state (needed only for parallel restore) */ ! bool restored; /* item is in progress or done */ bool created; /* set for DATA member if TABLE was created */ int depCount; /* number of dependencies not yet restored */ DumpId *lockDeps; /* dumpIds of objects this one needs lock on */ --- 314,321 ---- void *formatData; /* TOC Entry data specific to file format */ /* working state (needed only for parallel restore) */ ! struct _tocEntry *par_prev; /* list links for pending items */ ! struct _tocEntry *par_next; bool created; /* set for DATA member if TABLE was created */ int depCount; /* number of dependencies not yet restored */ DumpId *lockDeps; /* dumpIds of objects this one needs lock on */