Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit530bc38

Browse files
nathan-bossartpull[bot]
authored andcommitted
Convert pg_restore's ready_list to a priority queue.
Presently, parallel restores spend a lot of time sorting this listso that we pick the largest items first. With many tables, thissorting can become a significant bottleneck. There are a couple ofreports from the field about this, and it is easy to reproduce.This commit improves the performance of parallel pg_restore withmany tables by converting its ready_list to a priority queue, i.e.,a binary heap. We will first try to run the highest priority item,but if it cannot be chosen due to the lock heuristic, we'll do asequential scan through the heap nodes until we find one that isrunnable. This means that we might end up picking an item with amuch lower priority. However, we expect that we will typically beable to pick one of the first few items, which should usually havea relatively high priority.Suggested-by: Tom LaneTested-by: Pierre DucroquetReviewed-by: Tom LaneDiscussion:https://postgr.es/m/3612876.1689443232%40sss.pgh.pa.us
1 parentfeba44a commit530bc38

File tree

1 file changed

+58
-140
lines changed

1 file changed

+58
-140
lines changed

‎src/bin/pg_dump/pg_backup_archiver.c

Lines changed: 58 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include"compress_io.h"
3535
#include"dumputils.h"
3636
#include"fe_utils/string_utils.h"
37+
#include"lib/binaryheap.h"
3738
#include"lib/stringinfo.h"
3839
#include"libpq/libpq-fs.h"
3940
#include"parallel.h"
@@ -44,24 +45,6 @@
4445
#defineTEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
4546
#defineTEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
4647

47-
/*
48-
* State for tracking TocEntrys that are ready to process during a parallel
49-
* restore. (This used to be a list, and we still call it that, though now
50-
* it's really an array so that we can apply qsort to it.)
51-
*
52-
* tes[] is sized large enough that we can't overrun it.
53-
* The valid entries are indexed first_te .. last_te inclusive.
54-
* We periodically sort the array to bring larger-by-dataLength entries to
55-
* the front; "sorted" is true if the valid entries are known sorted.
56-
*/
57-
typedefstruct_parallelReadyList
58-
{
59-
TocEntry**tes;/* Ready-to-dump TocEntrys */
60-
intfirst_te;/* index of first valid entry in tes[] */
61-
intlast_te;/* index of last valid entry in tes[] */
62-
boolsorted;/* are valid entries currently sorted? */
63-
}ParallelReadyList;
64-
6548

6649
staticArchiveHandle*_allocAH(constchar*FileSpec,constArchiveFormatfmt,
6750
constpg_compress_specificationcompression_spec,
@@ -111,16 +94,12 @@ static void restore_toc_entries_postfork(ArchiveHandle *AH,
11194
staticvoidpending_list_header_init(TocEntry*l);
11295
staticvoidpending_list_append(TocEntry*l,TocEntry*te);
11396
staticvoidpending_list_remove(TocEntry*te);
114-
staticvoidready_list_init(ParallelReadyList*ready_list,inttocCount);
115-
staticvoidready_list_free(ParallelReadyList*ready_list);
116-
staticvoidready_list_insert(ParallelReadyList*ready_list,TocEntry*te);
117-
staticvoidready_list_remove(ParallelReadyList*ready_list,inti);
118-
staticvoidready_list_sort(ParallelReadyList*ready_list);
119-
staticintTocEntrySizeCompare(constvoid*p1,constvoid*p2);
120-
staticvoidmove_to_ready_list(TocEntry*pending_list,
121-
ParallelReadyList*ready_list,
97+
staticintTocEntrySizeCompareQsort(constvoid*p1,constvoid*p2);
98+
staticintTocEntrySizeCompareBinaryheap(void*p1,void*p2,void*arg);
99+
staticvoidmove_to_ready_heap(TocEntry*pending_list,
100+
binaryheap*ready_heap,
122101
RestorePasspass);
123-
staticTocEntry*pop_next_work_item(ParallelReadyList*ready_list,
102+
staticTocEntry*pop_next_work_item(binaryheap*ready_heap,
124103
ParallelState*pstate);
125104
staticvoidmark_dump_job_done(ArchiveHandle*AH,
126105
TocEntry*te,
@@ -135,7 +114,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
135114
staticvoidrepoint_table_dependencies(ArchiveHandle*AH);
136115
staticvoididentify_locking_dependencies(ArchiveHandle*AH,TocEntry*te);
137116
staticvoidreduce_dependencies(ArchiveHandle*AH,TocEntry*te,
138-
ParallelReadyList*ready_list);
117+
binaryheap*ready_heap);
139118
staticvoidmark_create_done(ArchiveHandle*AH,TocEntry*te);
140119
staticvoidinhibit_data_for_failed_table(ArchiveHandle*AH,TocEntry*te);
141120

@@ -2384,7 +2363,7 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
23842363
}
23852364

23862365
if (ntes>1)
2387-
qsort(tes,ntes,sizeof(TocEntry*),TocEntrySizeCompare);
2366+
qsort(tes,ntes,sizeof(TocEntry*),TocEntrySizeCompareQsort);
23882367

23892368
for (inti=0;i<ntes;i++)
23902369
DispatchJobForTocEntry(AH,pstate,tes[i],ACT_DUMP,
@@ -3984,7 +3963,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
39843963

39853964
(void)restore_toc_entry(AH,next_work_item, false);
39863965

3987-
/* Reduce dependencies, but don't move anything toready_list */
3966+
/* Reduce dependencies, but don't move anything toready_heap */
39883967
reduce_dependencies(AH,next_work_item,NULL);
39893968
}
39903969
else
@@ -4027,24 +4006,26 @@ static void
40274006
restore_toc_entries_parallel(ArchiveHandle*AH,ParallelState*pstate,
40284007
TocEntry*pending_list)
40294008
{
4030-
ParallelReadyListready_list;
4009+
binaryheap*ready_heap;
40314010
TocEntry*next_work_item;
40324011

40334012
pg_log_debug("entering restore_toc_entries_parallel");
40344013

4035-
/* Set up ready_list with enough room for all known TocEntrys */
4036-
ready_list_init(&ready_list,AH->tocCount);
4014+
/* Set up ready_heap with enough room for all known TocEntrys */
4015+
ready_heap=binaryheap_allocate(AH->tocCount,
4016+
TocEntrySizeCompareBinaryheap,
4017+
NULL);
40374018

40384019
/*
40394020
* The pending_list contains all items that we need to restore. Move all
4040-
* items that are available to process immediately into theready_list.
4021+
* items that are available to process immediately into theready_heap.
40414022
* After this setup, the pending list is everything that needs to be done
4042-
* but is blocked by one or more dependencies, while the readylist
4023+
* but is blocked by one or more dependencies, while the readyheap
40434024
* contains items that have no remaining dependencies and are OK to
40444025
* process in the current restore pass.
40454026
*/
40464027
AH->restorePass=RESTORE_PASS_MAIN;
4047-
move_to_ready_list(pending_list,&ready_list,AH->restorePass);
4028+
move_to_ready_heap(pending_list,ready_heap,AH->restorePass);
40484029

40494030
/*
40504031
* main parent loop
@@ -4058,7 +4039,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
40584039
for (;;)
40594040
{
40604041
/* Look for an item ready to be dispatched to a worker */
4061-
next_work_item=pop_next_work_item(&ready_list,pstate);
4042+
next_work_item=pop_next_work_item(ready_heap,pstate);
40624043
if (next_work_item!=NULL)
40634044
{
40644045
/* If not to be restored, don't waste time launching a worker */
@@ -4068,7 +4049,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
40684049
next_work_item->dumpId,
40694050
next_work_item->desc,next_work_item->tag);
40704051
/* Update its dependencies as though we'd completed it */
4071-
reduce_dependencies(AH,next_work_item,&ready_list);
4052+
reduce_dependencies(AH,next_work_item,ready_heap);
40724053
/* Loop around to see if anything else can be dispatched */
40734054
continue;
40744055
}
@@ -4079,7 +4060,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
40794060

40804061
/* Dispatch to some worker */
40814062
DispatchJobForTocEntry(AH,pstate,next_work_item,ACT_RESTORE,
4082-
mark_restore_job_done,&ready_list);
4063+
mark_restore_job_done,ready_heap);
40834064
}
40844065
elseif (IsEveryWorkerIdle(pstate))
40854066
{
@@ -4093,7 +4074,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
40934074
/* Advance to next restore pass */
40944075
AH->restorePass++;
40954076
/* That probably allows some stuff to be made ready */
4096-
move_to_ready_list(pending_list,&ready_list,AH->restorePass);
4077+
move_to_ready_heap(pending_list,ready_heap,AH->restorePass);
40974078
/* Loop around to see if anything's now ready */
40984079
continue;
40994080
}
@@ -4122,10 +4103,10 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
41224103
next_work_item ?WFW_ONE_IDLE :WFW_GOT_STATUS);
41234104
}
41244105

4125-
/* There should now be nothing inready_list. */
4126-
Assert(ready_list.first_te>ready_list.last_te);
4106+
/* There should now be nothing inready_heap. */
4107+
Assert(binaryheap_empty(ready_heap));
41274108

4128-
ready_list_free(&ready_list);
4109+
binaryheap_free(ready_heap);
41294110

41304111
pg_log_info("finished main parallel loop");
41314112
}
@@ -4225,80 +4206,9 @@ pending_list_remove(TocEntry *te)
42254206
}
42264207

42274208

4228-
/*
4229-
* Initialize the ready_list with enough room for up to tocCount entries.
4230-
*/
4231-
staticvoid
4232-
ready_list_init(ParallelReadyList*ready_list,inttocCount)
4233-
{
4234-
ready_list->tes= (TocEntry**)
4235-
pg_malloc(tocCount*sizeof(TocEntry*));
4236-
ready_list->first_te=0;
4237-
ready_list->last_te=-1;
4238-
ready_list->sorted= false;
4239-
}
4240-
4241-
/*
4242-
* Free storage for a ready_list.
4243-
*/
4244-
staticvoid
4245-
ready_list_free(ParallelReadyList*ready_list)
4246-
{
4247-
pg_free(ready_list->tes);
4248-
}
4249-
4250-
/* Add te to the ready_list */
4251-
staticvoid
4252-
ready_list_insert(ParallelReadyList*ready_list,TocEntry*te)
4253-
{
4254-
ready_list->tes[++ready_list->last_te]=te;
4255-
/* List is (probably) not sorted anymore. */
4256-
ready_list->sorted= false;
4257-
}
4258-
4259-
/* Remove the i'th entry in the ready_list */
4260-
staticvoid
4261-
ready_list_remove(ParallelReadyList*ready_list,inti)
4262-
{
4263-
intf=ready_list->first_te;
4264-
4265-
Assert(i >=f&&i <=ready_list->last_te);
4266-
4267-
/*
4268-
* In the typical case where the item to be removed is the first ready
4269-
* entry, we need only increment first_te to remove it. Otherwise, move
4270-
* the entries before it to compact the list. (This preserves sortedness,
4271-
* if any.) We could alternatively move the entries after i, but there
4272-
* are typically many more of those.
4273-
*/
4274-
if (i>f)
4275-
{
4276-
TocEntry**first_te_ptr=&ready_list->tes[f];
4277-
4278-
memmove(first_te_ptr+1,first_te_ptr, (i-f)*sizeof(TocEntry*));
4279-
}
4280-
ready_list->first_te++;
4281-
}
4282-
4283-
/* Sort the ready_list into the desired order */
4284-
staticvoid
4285-
ready_list_sort(ParallelReadyList*ready_list)
4286-
{
4287-
if (!ready_list->sorted)
4288-
{
4289-
intn=ready_list->last_te-ready_list->first_te+1;
4290-
4291-
if (n>1)
4292-
qsort(ready_list->tes+ready_list->first_te,n,
4293-
sizeof(TocEntry*),
4294-
TocEntrySizeCompare);
4295-
ready_list->sorted= true;
4296-
}
4297-
}
4298-
42994209
/* qsort comparator for sorting TocEntries by dataLength */
43004210
staticint
4301-
TocEntrySizeCompare(constvoid*p1,constvoid*p2)
4211+
TocEntrySizeCompareQsort(constvoid*p1,constvoid*p2)
43024212
{
43034213
constTocEntry*te1=*(constTocEntry*const*)p1;
43044214
constTocEntry*te2=*(constTocEntry*const*)p2;
@@ -4318,17 +4228,25 @@ TocEntrySizeCompare(const void *p1, const void *p2)
43184228
return0;
43194229
}
43204230

4231+
/* binaryheap comparator for sorting TocEntries by dataLength */
4232+
staticint
4233+
TocEntrySizeCompareBinaryheap(void*p1,void*p2,void*arg)
4234+
{
4235+
/* return opposite of qsort comparator for max-heap */
4236+
return-TocEntrySizeCompareQsort(&p1,&p2);
4237+
}
4238+
43214239

43224240
/*
4323-
* Move all immediately-ready items from pending_list toready_list.
4241+
* Move all immediately-ready items from pending_list toready_heap.
43244242
*
43254243
* Items are considered ready if they have no remaining dependencies and
43264244
* they belong in the current restore pass. (See also reduce_dependencies,
43274245
* which applies the same logic one-at-a-time.)
43284246
*/
43294247
staticvoid
4330-
move_to_ready_list(TocEntry*pending_list,
4331-
ParallelReadyList*ready_list,
4248+
move_to_ready_heap(TocEntry*pending_list,
4249+
binaryheap*ready_heap,
43324250
RestorePasspass)
43334251
{
43344252
TocEntry*te;
@@ -4344,38 +4262,38 @@ move_to_ready_list(TocEntry *pending_list,
43444262
{
43454263
/* Remove it from pending_list ... */
43464264
pending_list_remove(te);
4347-
/* ... and add toready_list */
4348-
ready_list_insert(ready_list,te);
4265+
/* ... and add toready_heap */
4266+
binaryheap_add(ready_heap,te);
43494267
}
43504268
}
43514269
}
43524270

43534271
/*
43544272
* Find the next work item (if any) that is capable of being run now,
4355-
* and remove it from theready_list.
4273+
* and remove it from theready_heap.
43564274
*
43574275
* Returns the item, or NULL if nothing is runnable.
43584276
*
43594277
* To qualify, the item must have no remaining dependencies
43604278
* and no requirements for locks that are incompatible with
4361-
* items currently running. Items in theready_list are known to have
4279+
* items currently running. Items in theready_heap are known to have
43624280
* no remaining dependencies, but we have to check for lock conflicts.
43634281
*/
43644282
staticTocEntry*
4365-
pop_next_work_item(ParallelReadyList*ready_list,
4283+
pop_next_work_item(binaryheap*ready_heap,
43664284
ParallelState*pstate)
43674285
{
43684286
/*
4369-
*Sort theready_list so that we'll tackle larger jobs first.
4370-
*/
4371-
ready_list_sort(ready_list);
4372-
4373-
/*
4374-
*Search the ready_list until we find a suitable item.
4287+
*Search theready_heap until we find a suitable item. Note that we do a
4288+
* sequential scan through the heap nodes, so even though we will first
4289+
* try to choose the highest-priority item, we might end up picking
4290+
* something with a much lower priority. However, we expect that we will
4291+
* typically be able to pick one of the first few items, which should
4292+
*usually have a relatively high priority.
43754293
*/
4376-
for (inti=ready_list->first_te;i <=ready_list->last_te;i++)
4294+
for (inti=0;i<binaryheap_size(ready_heap);i++)
43774295
{
4378-
TocEntry*te=ready_list->tes[i];
4296+
TocEntry*te=(TocEntry*)binaryheap_get_node(ready_heap,i);
43794297
boolconflicts= false;
43804298

43814299
/*
@@ -4401,7 +4319,7 @@ pop_next_work_item(ParallelReadyList *ready_list,
44014319
continue;
44024320

44034321
/* passed all tests, so this item can run */
4404-
ready_list_remove(ready_list,i);
4322+
binaryheap_remove_node(ready_heap,i);
44054323
returnte;
44064324
}
44074325

@@ -4447,7 +4365,7 @@ mark_restore_job_done(ArchiveHandle *AH,
44474365
intstatus,
44484366
void*callback_data)
44494367
{
4450-
ParallelReadyList*ready_list= (ParallelReadyList*)callback_data;
4368+
binaryheap*ready_heap= (binaryheap*)callback_data;
44514369

44524370
pg_log_info("finished item %d %s %s",
44534371
te->dumpId,te->desc,te->tag);
@@ -4465,7 +4383,7 @@ mark_restore_job_done(ArchiveHandle *AH,
44654383
pg_fatal("worker process failed: exit code %d",
44664384
status);
44674385

4468-
reduce_dependencies(AH,te,ready_list);
4386+
reduce_dependencies(AH,te,ready_heap);
44694387
}
44704388

44714389

@@ -4708,11 +4626,11 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
47084626
/*
47094627
* Remove the specified TOC entry from the depCounts of items that depend on
47104628
* it, thereby possibly making them ready-to-run. Any pending item that
4711-
* becomes ready should be moved to theready_list, if that's provided.
4629+
* becomes ready should be moved to theready_heap, if that's provided.
47124630
*/
47134631
staticvoid
47144632
reduce_dependencies(ArchiveHandle*AH,TocEntry*te,
4715-
ParallelReadyList*ready_list)
4633+
binaryheap*ready_heap)
47164634
{
47174635
inti;
47184636

@@ -4730,18 +4648,18 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
47304648
* the current restore pass, and it is currently a member of the
47314649
* pending list (that check is needed to prevent double restore in
47324650
* some cases where a list-file forces out-of-order restoring).
4733-
* However, ifready_list == NULL then caller doesn't want any list
4651+
* However, ifready_heap == NULL then caller doesn't want any list
47344652
* memberships changed.
47354653
*/
47364654
if (otherte->depCount==0&&
47374655
_tocEntryRestorePass(otherte)==AH->restorePass&&
47384656
otherte->pending_prev!=NULL&&
4739-
ready_list!=NULL)
4657+
ready_heap!=NULL)
47404658
{
47414659
/* Remove it from pending list ... */
47424660
pending_list_remove(otherte);
4743-
/* ... and add toready_list */
4744-
ready_list_insert(ready_list,otherte);
4661+
/* ... and add toready_heap */
4662+
binaryheap_add(ready_heap,otherte);
47454663
}
47464664
}
47474665
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp