1515 *
1616 *
1717 * IDENTIFICATION
18- *$PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.174 2009/08/04 21:56:08 tgl Exp $
18+ *$PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.175 2009/08/07 22:48:34 tgl Exp $
1919 *
2020 *-------------------------------------------------------------------------
2121 */
5959#define thandle HANDLE
6060#endif
6161
62+ /* Arguments needed for a worker child */
6263typedef struct _restore_args
6364{
6465ArchiveHandle * AH ;
6566TocEntry * te ;
6667}RestoreArgs ;
6768
69+ /* State for each parallel activity slot */
6870typedef struct _parallel_slot
6971{
7072thandle child_id ;
@@ -117,19 +119,24 @@ static thandle spawn_restore(RestoreArgs *args);
117119static thandle reap_child (ParallelSlot * slots ,int n_slots ,int * work_status );
118120static bool work_in_progress (ParallelSlot * slots ,int n_slots );
119121static int get_next_slot (ParallelSlot * slots ,int n_slots );
122+ static void par_list_header_init (TocEntry * l );
123+ static void par_list_append (TocEntry * l ,TocEntry * te );
124+ static void par_list_remove (TocEntry * te );
120125static TocEntry * get_next_work_item (ArchiveHandle * AH ,
121- TocEntry * * first_unprocessed ,
126+ TocEntry * ready_list ,
122127ParallelSlot * slots ,int n_slots );
123128static parallel_restore_result parallel_restore (RestoreArgs * args );
124- static void mark_work_done (ArchiveHandle * AH ,thandle worker ,int status ,
129+ static void mark_work_done (ArchiveHandle * AH ,TocEntry * ready_list ,
130+ thandle worker ,int status ,
125131ParallelSlot * slots ,int n_slots );
126132static void fix_dependencies (ArchiveHandle * AH );
127133static bool has_lock_conflicts (TocEntry * te1 ,TocEntry * te2 );
128134static void repoint_table_dependencies (ArchiveHandle * AH ,
129135DumpId tableId ,DumpId tableDataId );
130136static void identify_locking_dependencies (TocEntry * te ,
131137TocEntry * * tocsByDumpId );
132- static void reduce_dependencies (ArchiveHandle * AH ,TocEntry * te );
138+ static void reduce_dependencies (ArchiveHandle * AH ,TocEntry * te ,
139+ TocEntry * ready_list );
133140static void mark_create_done (ArchiveHandle * AH ,TocEntry * te );
134141static void inhibit_data_for_failed_table (ArchiveHandle * AH ,TocEntry * te );
135142static ArchiveHandle * CloneArchive (ArchiveHandle * AH );
@@ -3069,7 +3076,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
30693076ParallelSlot * slots ;
30703077int work_status ;
30713078int next_slot ;
3072- TocEntry * first_unprocessed = AH -> toc -> next ;
3079+ TocEntry pending_list ;
3080+ TocEntry ready_list ;
30733081TocEntry * next_work_item ;
30743082thandle ret_child ;
30753083TocEntry * te ;
@@ -3091,8 +3099,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
30913099 * faster in a single connection because we avoid all the connection and
30923100 * setup overhead.
30933101 */
3094- while ((next_work_item = get_next_work_item (AH ,& first_unprocessed ,
3095- NULL ,0 ))!= NULL )
3102+ for (next_work_item = AH -> toc -> next ;next_work_item != AH -> toc ;next_work_item = next_work_item -> next )
30963103{
30973104if (next_work_item -> section == SECTION_DATA ||
30983105next_work_item -> section == SECTION_POST_DATA )
@@ -3104,8 +3111,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
31043111
31053112(void )restore_toc_entry (AH ,next_work_item ,ropt , false);
31063113
3107- next_work_item -> restored = true;
3108- reduce_dependencies (AH ,next_work_item );
3114+ /* there should be no touch of ready_list here, so pass NULL */
3115+ reduce_dependencies (AH ,next_work_item , NULL );
31093116}
31103117
31113118/*
@@ -3128,6 +3135,25 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
31283135AH -> currTablespace = NULL ;
31293136AH -> currWithOids = -1 ;
31303137
3138+ /*
3139+ * Initialize the lists of pending and ready items. After this setup,
3140+ * the pending list is everything that needs to be done but is blocked
3141+ * by one or more dependencies, while the ready list contains items that
3142+ * have no remaining dependencies. Note: we don't yet filter out entries
3143+ * that aren't going to be restored. They might participate in
3144+ * dependency chains connecting entries that should be restored, so we
3145+ * treat them as live until we actually process them.
3146+ */
3147+ par_list_header_init (& pending_list );
3148+ par_list_header_init (& ready_list );
3149+ for (;next_work_item != AH -> toc ;next_work_item = next_work_item -> next )
3150+ {
3151+ if (next_work_item -> depCount > 0 )
3152+ par_list_append (& pending_list ,next_work_item );
3153+ else
3154+ par_list_append (& ready_list ,next_work_item );
3155+ }
3156+
31313157/*
31323158 * main parent loop
31333159 *
@@ -3137,7 +3163,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
31373163
31383164ahlog (AH ,1 ,"entering main parallel loop\n" );
31393165
3140- while ((next_work_item = get_next_work_item (AH ,& first_unprocessed ,
3166+ while ((next_work_item = get_next_work_item (AH ,& ready_list ,
31413167slots ,n_slots ))!= NULL ||
31423168work_in_progress (slots ,n_slots ))
31433169{
@@ -3153,8 +3179,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
31533179next_work_item -> dumpId ,
31543180next_work_item -> desc ,next_work_item -> tag );
31553181
3156- next_work_item -> restored = true ;
3157- reduce_dependencies (AH ,next_work_item );
3182+ par_list_remove ( next_work_item ) ;
3183+ reduce_dependencies (AH ,next_work_item , & ready_list );
31583184
31593185continue ;
31603186}
@@ -3169,7 +3195,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
31693195next_work_item -> dumpId ,
31703196next_work_item -> desc ,next_work_item -> tag );
31713197
3172- next_work_item -> restored = true ;
3198+ par_list_remove ( next_work_item ) ;
31733199
31743200/* this memory is dealloced in mark_work_done() */
31753201args = malloc (sizeof (RestoreArgs ));
@@ -3196,7 +3222,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
31963222
31973223if (WIFEXITED (work_status ))
31983224{
3199- mark_work_done (AH ,ret_child ,WEXITSTATUS (work_status ),
3225+ mark_work_done (AH ,& ready_list ,
3226+ ret_child ,WEXITSTATUS (work_status ),
32003227slots ,n_slots );
32013228}
32023229else
@@ -3222,14 +3249,11 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
32223249 * dependencies, or some other pathological condition. If so, do it in the
32233250 * single parent connection.
32243251 */
3225- for (te = AH -> toc -> next ;te != AH -> toc ;te = te -> next )
3252+ for (te = pending_list . par_next ;te != & pending_list ;te = te -> par_next )
32263253{
3227- if (!te -> restored )
3228- {
3229- ahlog (AH ,1 ,"processing missed item %d %s %s\n" ,
3230- te -> dumpId ,te -> desc ,te -> tag );
3231- (void )restore_toc_entry (AH ,te ,ropt , false);
3232- }
3254+ ahlog (AH ,1 ,"processing missed item %d %s %s\n" ,
3255+ te -> dumpId ,te -> desc ,te -> tag );
3256+ (void )restore_toc_entry (AH ,te ,ropt , false);
32333257}
32343258
32353259/* The ACLs will be handled back in RestoreArchive. */
@@ -3372,25 +3396,57 @@ has_lock_conflicts(TocEntry *te1, TocEntry *te2)
33723396}
33733397
33743398
3399+ /*
3400+ * Initialize the header of a parallel-processing list.
3401+ *
3402+ * These are circular lists with a dummy TocEntry as header, just like the
3403+ * main TOC list; but we use separate list links so that an entry can be in
3404+ * the main TOC list as well as in a parallel-processing list.
3405+ */
3406+ static void
3407+ par_list_header_init (TocEntry * l )
3408+ {
3409+ l -> par_prev = l -> par_next = l ;
3410+ }
3411+
3412+ /* Append te to the end of the parallel-processing list headed by l */
3413+ static void
3414+ par_list_append (TocEntry * l ,TocEntry * te )
3415+ {
3416+ te -> par_prev = l -> par_prev ;
3417+ l -> par_prev -> par_next = te ;
3418+ l -> par_prev = te ;
3419+ te -> par_next = l ;
3420+ }
3421+
3422+ /* Remove te from whatever parallel-processing list it's in */
3423+ static void
3424+ par_list_remove (TocEntry * te )
3425+ {
3426+ te -> par_prev -> par_next = te -> par_next ;
3427+ te -> par_next -> par_prev = te -> par_prev ;
3428+ te -> par_prev = NULL ;
3429+ te -> par_next = NULL ;
3430+ }
3431+
33753432
33763433/*
33773434 * Find the next work item (if any) that is capable of being run now.
33783435 *
33793436 * To qualify, the item must have no remaining dependencies
3380- * and no requirement for locks that is incompatible with
3381- * items currently running.
3437+ * and no requirements for locks that are incompatible with
3438+ * items currently running. Items in the ready_list are known to have
3439+ * no remaining dependencies, but we have to check for lock conflicts.
33823440 *
3383- * first_unprocessed is state data that tracks the location of the first
3384- * TocEntry that's not marked 'restored'. This avoids O(N^2) search time
3385- * with long TOC lists. (Even though the constant is pretty small, it'd
3386- * get us eventually.)
3441+ * Note that the returned item has *not* been removed from ready_list.
3442+ * The caller must do that after successfully dispatching the item.
33873443 *
33883444 * pref_non_data is for an alternative selection algorithm that gives
33893445 * preference to non-data items if there is already a data load running.
33903446 * It is currently disabled.
33913447 */
33923448static TocEntry *
3393- get_next_work_item (ArchiveHandle * AH ,TocEntry * * first_unprocessed ,
3449+ get_next_work_item (ArchiveHandle * AH ,TocEntry * ready_list ,
33943450ParallelSlot * slots ,int n_slots )
33953451{
33963452bool pref_non_data = false;/* or get from AH->ropt */
@@ -3415,26 +3471,12 @@ get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed,
34153471}
34163472
34173473/*
3418- * Advance first_unprocessed if possible.
3419- */
3420- for (te = * first_unprocessed ;te != AH -> toc ;te = te -> next )
3421- {
3422- if (!te -> restored )
3423- break ;
3424- }
3425- * first_unprocessed = te ;
3426-
3427- /*
3428- * Search from first_unprocessed until we find an available item.
3474+ * Search the ready_list until we find a suitable item.
34293475 */
3430- for (;te != AH -> toc ;te = te -> next )
3476+ for (te = ready_list -> par_next ;te != ready_list ;te = te -> par_next )
34313477{
34323478bool conflicts = false;
34333479
3434- /* Ignore if already done or still waiting on dependencies */
3435- if (te -> restored || te -> depCount > 0 )
3436- continue ;
3437-
34383480/*
34393481 * Check to see if the item would need exclusive lock on something
34403482 * that a currently running item also needs lock on, or vice versa. If
@@ -3546,7 +3588,8 @@ parallel_restore(RestoreArgs *args)
35463588 * update status, and reduce the dependency count of any dependent items.
35473589 */
35483590static void
3549- mark_work_done (ArchiveHandle * AH ,thandle worker ,int status ,
3591+ mark_work_done (ArchiveHandle * AH ,TocEntry * ready_list ,
3592+ thandle worker ,int status ,
35503593ParallelSlot * slots ,int n_slots )
35513594{
35523595TocEntry * te = NULL ;
@@ -3585,7 +3628,7 @@ mark_work_done(ArchiveHandle *AH, thandle worker, int status,
35853628die_horribly (AH ,modulename ,"worker process failed: exit code %d\n" ,
35863629status );
35873630
3588- reduce_dependencies (AH ,te );
3631+ reduce_dependencies (AH ,te , ready_list );
35893632}
35903633
35913634
@@ -3610,13 +3653,16 @@ fix_dependencies(ArchiveHandle *AH)
36103653 * indexes the TOC entries by dump ID, rather than searching the TOC list
36113654 * repeatedly.Entries for dump IDs not present in the TOC will be NULL.
36123655 *
3613- * Also, initialize the depCount fields.
3656+ * Also, initialize the depCount fields, and make sure all the TOC items
3657+ * are marked as not being in any parallel-processing list.
36143658 */
36153659tocsByDumpId = (TocEntry * * )calloc (AH -> maxDumpId ,sizeof (TocEntry * ));
36163660for (te = AH -> toc -> next ;te != AH -> toc ;te = te -> next )
36173661{
36183662tocsByDumpId [te -> dumpId - 1 ]= te ;
36193663te -> depCount = te -> nDeps ;
3664+ te -> par_prev = NULL ;
3665+ te -> par_next = NULL ;
36203666}
36213667
36223668/*
@@ -3785,10 +3831,11 @@ identify_locking_dependencies(TocEntry *te, TocEntry **tocsByDumpId)
37853831
37863832/*
37873833 * Remove the specified TOC entry from the depCounts of items that depend on
3788- * it, thereby possibly making them ready-to-run.
3834+ * it, thereby possibly making them ready-to-run. Any pending item that
3835+ * becomes ready should be moved to the ready list.
37893836 */
37903837static void
3791- reduce_dependencies (ArchiveHandle * AH ,TocEntry * te )
3838+ reduce_dependencies (ArchiveHandle * AH ,TocEntry * te , TocEntry * ready_list )
37923839{
37933840DumpId target = te -> dumpId ;
37943841int i ;
@@ -3805,7 +3852,16 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te)
38053852for (i = 0 ;i < te -> nDeps ;i ++ )
38063853{
38073854if (te -> dependencies [i ]== target )
3855+ {
38083856te -> depCount -- ;
3857+ if (te -> depCount == 0 && te -> par_prev != NULL )
3858+ {
3859+ /* It must be in the pending list, so remove it ... */
3860+ par_list_remove (te );
3861+ /* ... and add to ready_list */
3862+ par_list_append (ready_list ,te );
3863+ }
3864+ }
38093865}
38103866}
38113867}