3434#include "compress_io.h"
3535#include "dumputils.h"
3636#include "fe_utils/string_utils.h"
37+ #include "lib/binaryheap.h"
3738#include "lib/stringinfo.h"
3839#include "libpq/libpq-fs.h"
3940#include "parallel.h"
4445#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
4546#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
4647
47- /*
48- * State for tracking TocEntrys that are ready to process during a parallel
49- * restore. (This used to be a list, and we still call it that, though now
50- * it's really an array so that we can apply qsort to it.)
51- *
52- * tes[] is sized large enough that we can't overrun it.
53- * The valid entries are indexed first_te .. last_te inclusive.
54- * We periodically sort the array to bring larger-by-dataLength entries to
55- * the front; "sorted" is true if the valid entries are known sorted.
56- */
57- typedef struct _parallelReadyList
58- {
59- TocEntry * * tes ;/* Ready-to-dump TocEntrys */
60- int first_te ;/* index of first valid entry in tes[] */
61- int last_te ;/* index of last valid entry in tes[] */
62- bool sorted ;/* are valid entries currently sorted? */
63- }ParallelReadyList ;
64-
6548
6649static ArchiveHandle * _allocAH (const char * FileSpec ,const ArchiveFormat fmt ,
6750const pg_compress_specification compression_spec ,
@@ -111,16 +94,12 @@ static void restore_toc_entries_postfork(ArchiveHandle *AH,
11194static void pending_list_header_init (TocEntry * l );
11295static void pending_list_append (TocEntry * l ,TocEntry * te );
11396static void pending_list_remove (TocEntry * te );
114- static void ready_list_init (ParallelReadyList * ready_list ,int tocCount );
115- static void ready_list_free (ParallelReadyList * ready_list );
116- static void ready_list_insert (ParallelReadyList * ready_list ,TocEntry * te );
117- static void ready_list_remove (ParallelReadyList * ready_list ,int i );
118- static void ready_list_sort (ParallelReadyList * ready_list );
119- static int TocEntrySizeCompare (const void * p1 ,const void * p2 );
120- static void move_to_ready_list (TocEntry * pending_list ,
121- ParallelReadyList * ready_list ,
97+ static int TocEntrySizeCompareQsort (const void * p1 ,const void * p2 );
98+ static int TocEntrySizeCompareBinaryheap (void * p1 ,void * p2 ,void * arg );
99+ static void move_to_ready_heap (TocEntry * pending_list ,
100+ binaryheap * ready_heap ,
122101RestorePass pass );
123- static TocEntry * pop_next_work_item (ParallelReadyList * ready_list ,
102+ static TocEntry * pop_next_work_item (binaryheap * ready_heap ,
124103ParallelState * pstate );
125104static void mark_dump_job_done (ArchiveHandle * AH ,
126105TocEntry * te ,
@@ -135,7 +114,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
135114static void repoint_table_dependencies (ArchiveHandle * AH );
136115static void identify_locking_dependencies (ArchiveHandle * AH ,TocEntry * te );
137116static void reduce_dependencies (ArchiveHandle * AH ,TocEntry * te ,
138- ParallelReadyList * ready_list );
117+ binaryheap * ready_heap );
139118static void mark_create_done (ArchiveHandle * AH ,TocEntry * te );
140119static void inhibit_data_for_failed_table (ArchiveHandle * AH ,TocEntry * te );
141120
@@ -2384,7 +2363,7 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
23842363}
23852364
23862365if (ntes > 1 )
2387- qsort (tes ,ntes ,sizeof (TocEntry * ),TocEntrySizeCompare );
2366+ qsort (tes ,ntes ,sizeof (TocEntry * ),TocEntrySizeCompareQsort );
23882367
23892368for (int i = 0 ;i < ntes ;i ++ )
23902369DispatchJobForTocEntry (AH ,pstate ,tes [i ],ACT_DUMP ,
@@ -3984,7 +3963,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
39843963
39853964(void )restore_toc_entry (AH ,next_work_item , false);
39863965
3987- /* Reduce dependencies, but don't move anything to ready_list */
3966+ /* Reduce dependencies, but don't move anything to ready_heap */
39883967reduce_dependencies (AH ,next_work_item ,NULL );
39893968}
39903969else
@@ -4027,24 +4006,26 @@ static void
40274006restore_toc_entries_parallel (ArchiveHandle * AH ,ParallelState * pstate ,
40284007TocEntry * pending_list )
40294008{
4030- ParallelReadyList ready_list ;
4009+ binaryheap * ready_heap ;
40314010TocEntry * next_work_item ;
40324011
40334012pg_log_debug ("entering restore_toc_entries_parallel" );
40344013
4035- /* Set up ready_list with enough room for all known TocEntrys */
4036- ready_list_init (& ready_list ,AH -> tocCount );
4014+ /* Set up ready_heap with enough room for all known TocEntrys */
4015+ ready_heap = binaryheap_allocate (AH -> tocCount ,
4016+ TocEntrySizeCompareBinaryheap ,
4017+ NULL );
40374018
40384019/*
40394020 * The pending_list contains all items that we need to restore. Move all
4040- * items that are available to process immediately into the ready_list.
4021+ * items that are available to process immediately into the ready_heap.
40414022 * After this setup, the pending list is everything that needs to be done
4042- * but is blocked by one or more dependencies, while the ready list
4023+ * but is blocked by one or more dependencies, while the ready heap
40434024 * contains items that have no remaining dependencies and are OK to
40444025 * process in the current restore pass.
40454026 */
40464027AH -> restorePass = RESTORE_PASS_MAIN ;
4047- move_to_ready_list (pending_list ,& ready_list ,AH -> restorePass );
4028+ move_to_ready_heap (pending_list ,ready_heap ,AH -> restorePass );
40484029
40494030/*
40504031 * main parent loop
@@ -4058,7 +4039,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
40584039for (;;)
40594040{
40604041/* Look for an item ready to be dispatched to a worker */
4061- next_work_item = pop_next_work_item (& ready_list ,pstate );
4042+ next_work_item = pop_next_work_item (ready_heap ,pstate );
40624043if (next_work_item != NULL )
40634044{
40644045/* If not to be restored, don't waste time launching a worker */
@@ -4068,7 +4049,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
40684049next_work_item -> dumpId ,
40694050next_work_item -> desc ,next_work_item -> tag );
40704051/* Update its dependencies as though we'd completed it */
4071- reduce_dependencies (AH ,next_work_item ,& ready_list );
4052+ reduce_dependencies (AH ,next_work_item ,ready_heap );
40724053/* Loop around to see if anything else can be dispatched */
40734054continue ;
40744055}
@@ -4079,7 +4060,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
40794060
40804061/* Dispatch to some worker */
40814062DispatchJobForTocEntry (AH ,pstate ,next_work_item ,ACT_RESTORE ,
4082- mark_restore_job_done ,& ready_list );
4063+ mark_restore_job_done ,ready_heap );
40834064}
40844065else if (IsEveryWorkerIdle (pstate ))
40854066{
@@ -4093,7 +4074,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
40934074/* Advance to next restore pass */
40944075AH -> restorePass ++ ;
40954076/* That probably allows some stuff to be made ready */
4096- move_to_ready_list (pending_list ,& ready_list ,AH -> restorePass );
4077+ move_to_ready_heap (pending_list ,ready_heap ,AH -> restorePass );
40974078/* Loop around to see if anything's now ready */
40984079continue ;
40994080}
@@ -4122,10 +4103,10 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
41224103next_work_item ?WFW_ONE_IDLE :WFW_GOT_STATUS );
41234104}
41244105
4125- /* There should now be nothing in ready_list. */
4126- Assert (ready_list . first_te > ready_list . last_te );
4106+ /* There should now be nothing in ready_heap. */
4107+ Assert (binaryheap_empty ( ready_heap ) );
41274108
4128- ready_list_free ( & ready_list );
4109+ binaryheap_free ( ready_heap );
41294110
41304111pg_log_info ("finished main parallel loop" );
41314112}
@@ -4225,80 +4206,9 @@ pending_list_remove(TocEntry *te)
42254206}
42264207
42274208
4228- /*
4229- * Initialize the ready_list with enough room for up to tocCount entries.
4230- */
4231- static void
4232- ready_list_init (ParallelReadyList * ready_list ,int tocCount )
4233- {
4234- ready_list -> tes = (TocEntry * * )
4235- pg_malloc (tocCount * sizeof (TocEntry * ));
4236- ready_list -> first_te = 0 ;
4237- ready_list -> last_te = -1 ;
4238- ready_list -> sorted = false;
4239- }
4240-
4241- /*
4242- * Free storage for a ready_list.
4243- */
4244- static void
4245- ready_list_free (ParallelReadyList * ready_list )
4246- {
4247- pg_free (ready_list -> tes );
4248- }
4249-
4250- /* Add te to the ready_list */
4251- static void
4252- ready_list_insert (ParallelReadyList * ready_list ,TocEntry * te )
4253- {
4254- ready_list -> tes [++ ready_list -> last_te ]= te ;
4255- /* List is (probably) not sorted anymore. */
4256- ready_list -> sorted = false;
4257- }
4258-
4259- /* Remove the i'th entry in the ready_list */
4260- static void
4261- ready_list_remove (ParallelReadyList * ready_list ,int i )
4262- {
4263- int f = ready_list -> first_te ;
4264-
4265- Assert (i >=f && i <=ready_list -> last_te );
4266-
4267- /*
4268- * In the typical case where the item to be removed is the first ready
4269- * entry, we need only increment first_te to remove it. Otherwise, move
4270- * the entries before it to compact the list. (This preserves sortedness,
4271- * if any.) We could alternatively move the entries after i, but there
4272- * are typically many more of those.
4273- */
4274- if (i > f )
4275- {
4276- TocEntry * * first_te_ptr = & ready_list -> tes [f ];
4277-
4278- memmove (first_te_ptr + 1 ,first_te_ptr , (i - f )* sizeof (TocEntry * ));
4279- }
4280- ready_list -> first_te ++ ;
4281- }
4282-
4283- /* Sort the ready_list into the desired order */
4284- static void
4285- ready_list_sort (ParallelReadyList * ready_list )
4286- {
4287- if (!ready_list -> sorted )
4288- {
4289- int n = ready_list -> last_te - ready_list -> first_te + 1 ;
4290-
4291- if (n > 1 )
4292- qsort (ready_list -> tes + ready_list -> first_te ,n ,
4293- sizeof (TocEntry * ),
4294- TocEntrySizeCompare );
4295- ready_list -> sorted = true;
4296- }
4297- }
4298-
42994209/* qsort comparator for sorting TocEntries by dataLength */
43004210static int
4301- TocEntrySizeCompare (const void * p1 ,const void * p2 )
4211+ TocEntrySizeCompareQsort (const void * p1 ,const void * p2 )
43024212{
43034213const TocEntry * te1 = * (const TocEntry * const * )p1 ;
43044214const TocEntry * te2 = * (const TocEntry * const * )p2 ;
@@ -4318,17 +4228,25 @@ TocEntrySizeCompare(const void *p1, const void *p2)
43184228return 0 ;
43194229}
43204230
4231+ /* binaryheap comparator for ordering TocEntry pointers by dataLength; "arg" is unused */
4232+ static int
4233+ TocEntrySizeCompareBinaryheap (void * p1 ,void * p2 ,void * arg )
4234+ {
4235+ /* nodes are the TocEntry pointers themselves, so pass their addresses (the qsort comparator expects pointers to elements) and return the opposite of its result for max-heap */
4236+ return - TocEntrySizeCompareQsort (& p1 ,& p2 );
4237+ }
4238+
43214239
43224240/*
4323- * Move all immediately-ready items from pending_list to ready_list.
4241+ * Move all immediately-ready items from pending_list to ready_heap.
43244242 *
43254243 * Items are considered ready if they have no remaining dependencies and
43264244 * they belong in the current restore pass. (See also reduce_dependencies,
43274245 * which applies the same logic one-at-a-time.)
43284246 */
43294247static void
4330- move_to_ready_list (TocEntry * pending_list ,
4331- ParallelReadyList * ready_list ,
4248+ move_to_ready_heap (TocEntry * pending_list ,
4249+ binaryheap * ready_heap ,
43324250RestorePass pass )
43334251{
43344252TocEntry * te ;
@@ -4344,38 +4262,38 @@ move_to_ready_list(TocEntry *pending_list,
43444262{
43454263/* Remove it from pending_list ... */
43464264pending_list_remove (te );
4347- /* ... and add to ready_list */
4348- ready_list_insert ( ready_list ,te );
4265+ /* ... and add to ready_heap */
4266+ binaryheap_add ( ready_heap ,te );
43494267}
43504268}
43514269}
43524270
43534271/*
43544272 * Find the next work item (if any) that is capable of being run now,
4355- * and remove it from the ready_list.
4273+ * and remove it from the ready_heap.
43564274 *
43574275 * Returns the item, or NULL if nothing is runnable.
43584276 *
43594277 * To qualify, the item must have no remaining dependencies
43604278 * and no requirements for locks that are incompatible with
4361- * items currently running. Items in the ready_list are known to have
4279+ * items currently running. Items in the ready_heap are known to have
43624280 * no remaining dependencies, but we have to check for lock conflicts.
43634281 */
43644282static TocEntry *
4365- pop_next_work_item (ParallelReadyList * ready_list ,
4283+ pop_next_work_item (binaryheap * ready_heap ,
43664284ParallelState * pstate )
43674285{
43684286/*
4369- * Sort the ready_list so that we'll tackle larger jobs first.
4370- */
4371- ready_list_sort ( ready_list );
4372-
4373- /*
4374- * Search the ready_list until we find a suitable item.
4287+ * Search the ready_heap until we find a suitable item. Note that we do a
4288+ * sequential scan through the heap nodes, so even though we will first
4289+ * try to choose the highest-priority item, we might end up picking
4290+ * something with a much lower priority. However, we expect that we will
4291+ * typically be able to pick one of the first few items, which should
4292+ * usually have a relatively high priority.
43754293 */
4376- for (int i = ready_list -> first_te ;i <= ready_list -> last_te ;i ++ )
4294+ for (int i = 0 ;i < binaryheap_size ( ready_heap ) ;i ++ )
43774295{
4378- TocEntry * te = ready_list -> tes [ i ] ;
4296+ TocEntry * te = ( TocEntry * ) binaryheap_get_node ( ready_heap , i ) ;
43794297bool conflicts = false;
43804298
43814299/*
@@ -4401,7 +4319,7 @@ pop_next_work_item(ParallelReadyList *ready_list,
44014319continue ;
44024320
44034321/* passed all tests, so this item can run */
4404- ready_list_remove ( ready_list ,i );
4322+ binaryheap_remove_node ( ready_heap ,i );
44054323return te ;
44064324}
44074325
@@ -4447,7 +4365,7 @@ mark_restore_job_done(ArchiveHandle *AH,
44474365int status ,
44484366void * callback_data )
44494367{
4450- ParallelReadyList * ready_list = (ParallelReadyList * )callback_data ;
4368+ binaryheap * ready_heap = (binaryheap * )callback_data ;
44514369
44524370pg_log_info ("finished item %d %s %s" ,
44534371te -> dumpId ,te -> desc ,te -> tag );
@@ -4465,7 +4383,7 @@ mark_restore_job_done(ArchiveHandle *AH,
44654383pg_fatal ("worker process failed: exit code %d" ,
44664384status );
44674385
4468- reduce_dependencies (AH ,te ,ready_list );
4386+ reduce_dependencies (AH ,te ,ready_heap );
44694387}
44704388
44714389
@@ -4708,11 +4626,11 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
47084626/*
47094627 * Remove the specified TOC entry from the depCounts of items that depend on
47104628 * it, thereby possibly making them ready-to-run. Any pending item that
4711- * becomes ready should be moved to the ready_list, if that's provided.
4629+ * becomes ready should be moved to the ready_heap, if that's provided.
47124630 */
47134631static void
47144632reduce_dependencies (ArchiveHandle * AH ,TocEntry * te ,
4715- ParallelReadyList * ready_list )
4633+ binaryheap * ready_heap )
47164634{
47174635int i ;
47184636
@@ -4730,18 +4648,18 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
47304648 * the current restore pass, and it is currently a member of the
47314649 * pending list (that check is needed to prevent double restore in
47324650 * some cases where a list-file forces out-of-order restoring).
4733- * However, if ready_list == NULL then caller doesn't want any list
4651+ * However, if ready_heap == NULL then caller doesn't want any list
47344652 * memberships changed.
47354653 */
47364654if (otherte -> depCount == 0 &&
47374655_tocEntryRestorePass (otherte )== AH -> restorePass &&
47384656otherte -> pending_prev != NULL &&
4739- ready_list != NULL )
4657+ ready_heap != NULL )
47404658{
47414659/* Remove it from pending list ... */
47424660pending_list_remove (otherte );
4743- /* ... and add to ready_list */
4744- ready_list_insert ( ready_list ,otherte );
4661+ /* ... and add to ready_heap */
4662+ binaryheap_add ( ready_heap ,otherte );
47454663}
47464664}
47474665}