Commit 7f7f25f

Revert "Fix race in Parallel Hash Join batch cleanup."
This reverts commit378802e.This reverts commit3b8981b.Discussion:https://postgr.es/m/CA%2BhUKGJmcqAE3MZeDCLLXa62cWM0AJbKmp2JrJYaJ86bz36LFA%40mail.gmail.com
1 parent9fd2952 commit7f7f25f
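For readers unfamiliar with the phase machinery the hunks below keep referring to: the build, grow_batches, and grow_buckets barriers step through named phases (ELECTING, ALLOCATING, HASHING_INNER, and so on), and BarrierArriveAndWait() returns true in exactly one participant, which is thereby "elected" to perform the serial work for that phase while the others continue once the phase advances. The toy sketch below, written with POSIX threads, illustrates only that elect-one pattern; the ToyBarrier type and its functions are illustrative assumptions and are not PostgreSQL's actual Barrier API.

/*
 * Toy model (not PostgreSQL code) of the elect-one-participant pattern that
 * the barrier phases in this commit rely on: every thread arrives at the
 * barrier, the phase advances once all of them have arrived, and exactly one
 * thread gets back "true" and performs the serial step for that phase.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

#define NPARTICIPANTS 4

typedef struct ToyBarrier
{
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    int             participants;   /* number of attached threads */
    int             arrived;        /* arrivals in the current phase */
    int             phase;          /* current phase number */
} ToyBarrier;

static void
toy_barrier_init(ToyBarrier *b, int participants)
{
    pthread_mutex_init(&b->mutex, NULL);
    pthread_cond_init(&b->cond, NULL);
    b->participants = participants;
    b->arrived = 0;
    b->phase = 0;
}

/* Returns true in exactly one thread per phase: the "elected" one. */
static bool
toy_barrier_arrive_and_wait(ToyBarrier *b)
{
    bool    elected = false;
    int     my_phase;

    pthread_mutex_lock(&b->mutex);
    my_phase = b->phase;
    if (++b->arrived == b->participants)
    {
        /* Last to arrive: advance the phase and wake the others. */
        elected = true;
        b->arrived = 0;
        b->phase++;
        pthread_cond_broadcast(&b->cond);
    }
    else
    {
        while (b->phase == my_phase)
            pthread_cond_wait(&b->cond, &b->mutex);
    }
    pthread_mutex_unlock(&b->mutex);
    return elected;
}

static ToyBarrier build_barrier;

static void *
worker(void *arg)
{
    long    id = (long) arg;

    /* "Electing" phase: one thread is chosen to do the serial setup. */
    if (toy_barrier_arrive_and_wait(&build_barrier))
        printf("thread %ld elected to allocate shared state\n", id);

    /* "Hashing" phase: everyone works, then waits for the stragglers. */
    printf("thread %ld hashing\n", id);
    toy_barrier_arrive_and_wait(&build_barrier);
    return NULL;
}

int
main(void)
{
    pthread_t   threads[NPARTICIPANTS];

    toy_barrier_init(&build_barrier, NPARTICIPANTS);
    for (long i = 0; i < NPARTICIPANTS; i++)
        pthread_create(&threads[i], NULL, worker, (void *) i);
    for (int i = 0; i < NPARTICIPANTS; i++)
        pthread_join(threads[i], NULL);
    return 0;
}

In the diff itself, each case label in the switch statements over BarrierPhase() corresponds to one such phase.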

File tree

5 files changed: +113 -142 lines changed


src/backend/executor/nodeHash.c

Lines changed: 41 additions & 58 deletions
@@ -246,10 +246,10 @@ MultiExecParallelHash(HashState *node)
     */
    pstate = hashtable->parallel_state;
    build_barrier = &pstate->build_barrier;
-   Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATE);
+   Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATING);
    switch (BarrierPhase(build_barrier))
    {
-       case PHJ_BUILD_ALLOCATE:
+       case PHJ_BUILD_ALLOCATING:

            /*
             * Either I just allocated the initial hash table in
@@ -259,7 +259,7 @@ MultiExecParallelHash(HashState *node)
            BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATE);
            /* Fall through. */

-       case PHJ_BUILD_HASH_INNER:
+       case PHJ_BUILD_HASHING_INNER:

            /*
             * It's time to begin hashing, or if we just arrived here then
@@ -271,10 +271,10 @@ MultiExecParallelHash(HashState *node)
             * below.
             */
            if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
-               PHJ_GROW_BATCHES_ELECT)
+               PHJ_GROW_BATCHES_ELECTING)
                ExecParallelHashIncreaseNumBatches(hashtable);
            if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
-               PHJ_GROW_BUCKETS_ELECT)
+               PHJ_GROW_BUCKETS_ELECTING)
                ExecParallelHashIncreaseNumBuckets(hashtable);
            ExecParallelHashEnsureBatchAccessors(hashtable);
            ExecParallelHashTableSetCurrentBatch(hashtable, 0);
@@ -333,22 +333,15 @@ MultiExecParallelHash(HashState *node)
    hashtable->nbuckets = pstate->nbuckets;
    hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
    hashtable->totalTuples = pstate->total_tuples;
-
-   /*
-    * Unless we're completely done and the batch state has been freed, make
-    * sure we have accessors.
-    */
-   if (BarrierPhase(build_barrier) < PHJ_BUILD_FREE)
-       ExecParallelHashEnsureBatchAccessors(hashtable);
+   ExecParallelHashEnsureBatchAccessors(hashtable);

    /*
     * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
-    * case, which will bring the build phase to PHJ_BUILD_RUN (if it isn't
+    * case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't
     * there already).
     */
-   Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
-          BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
-          BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
+   Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
+          BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
}

/* ----------------------------------------------------------------
@@ -596,8 +589,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
     * Attach to the build barrier.  The corresponding detach operation is
     * in ExecHashTableDetach.  Note that we won't attach to the
     * batch_barrier for batch 0 yet.  We'll attach later and start it out
-    * in PHJ_BATCH_PROBE phase, because batch 0 is allocated up front and
-    * then loaded while hashing (the standard hybrid hash join
+    * in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front
+    * and then loaded while hashing (the standard hybrid hash join
     * algorithm), and we'll coordinate that using build_barrier.
     */
    build_barrier = &pstate->build_barrier;
@@ -610,7 +603,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
     * SharedHashJoinBatch objects and the hash table for batch 0.  One
     * backend will be elected to do that now if necessary.
     */
-   if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECT &&
+   if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING &&
        BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECT))
    {
        pstate->nbatch = nbatch;
@@ -631,7 +624,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
    /*
     * The next Parallel Hash synchronization point is in
     * MultiExecParallelHash(), which will progress it all the way to
-    * PHJ_BUILD_RUN.  The caller must not return control from this
+    * PHJ_BUILD_DONE.  The caller must not return control from this
     * executor node between now and then.
     */
    }
@@ -1067,7 +1060,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    int         i;

-   Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
+   Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);

    /*
     * It's unlikely, but we need to be prepared for new participants to show
@@ -1076,7 +1069,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
     */
    switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier)))
    {
-       case PHJ_GROW_BATCHES_ELECT:
+       case PHJ_GROW_BATCHES_ELECTING:

            /*
             * Elect one participant to prepare to grow the number of batches.
@@ -1194,13 +1187,13 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
            }
            /* Fall through. */

-       case PHJ_GROW_BATCHES_REALLOCATE:
+       case PHJ_GROW_BATCHES_ALLOCATING:
            /* Wait for the above to be finished. */
            BarrierArriveAndWait(&pstate->grow_batches_barrier,
-                                WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE);
+                                WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATE);
            /* Fall through. */

-       case PHJ_GROW_BATCHES_REPARTITION:
+       case PHJ_GROW_BATCHES_REPARTITIONING:
            /* Make sure that we have the current dimensions and buckets. */
            ExecParallelHashEnsureBatchAccessors(hashtable);
            ExecParallelHashTableSetCurrentBatch(hashtable, 0);
@@ -1213,7 +1206,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
                                 WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION);
            /* Fall through. */

-       case PHJ_GROW_BATCHES_DECIDE:
+       case PHJ_GROW_BATCHES_DECIDING:

            /*
             * Elect one participant to clean up and decide whether further
@@ -1268,7 +1261,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
            }
            /* Fall through. */

-       case PHJ_GROW_BATCHES_FINISH:
+       case PHJ_GROW_BATCHES_FINISHING:
            /* Wait for the above to complete. */
            BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                 WAIT_EVENT_HASH_GROW_BATCHES_FINISH);
@@ -1508,7 +1501,7 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
    HashMemoryChunk chunk;
    dsa_pointer chunk_s;

-   Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
+   Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);

    /*
     * It's unlikely, but we need to be prepared for new participants to show
@@ -1517,7 +1510,7 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
     */
    switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier)))
    {
-       case PHJ_GROW_BUCKETS_ELECT:
+       case PHJ_GROW_BUCKETS_ELECTING:
            /* Elect one participant to prepare to increase nbuckets. */
            if (BarrierArriveAndWait(&pstate->grow_buckets_barrier,
                                     WAIT_EVENT_HASH_GROW_BUCKETS_ELECT))
@@ -1546,13 +1539,13 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
            }
            /* Fall through. */

-       case PHJ_GROW_BUCKETS_REALLOCATE:
+       case PHJ_GROW_BUCKETS_ALLOCATING:
            /* Wait for the above to complete. */
            BarrierArriveAndWait(&pstate->grow_buckets_barrier,
-                                WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE);
+                                WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATE);
            /* Fall through. */

-       case PHJ_GROW_BUCKETS_REINSERT:
+       case PHJ_GROW_BUCKETS_REINSERTING:
            /* Reinsert all tuples into the hash table. */
            ExecParallelHashEnsureBatchAccessors(hashtable);
            ExecParallelHashTableSetCurrentBatch(hashtable, 0);
@@ -1708,7 +1701,7 @@ ExecParallelHashTableInsert(HashJoinTable hashtable,

    /* Try to load it into memory. */
    Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
-          PHJ_BUILD_HASH_INNER);
+          PHJ_BUILD_HASHING_INNER);
    hashTuple = ExecParallelHashTupleAlloc(hashtable,
                                           HJTUPLE_OVERHEAD + tuple->t_len,
                                           &shared);
@@ -2862,7 +2855,7 @@ ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size,
    if (pstate->growth != PHJ_GROWTH_DISABLED)
    {
        Assert(curbatch == 0);
-       Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
+       Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);

        /*
         * Check if our space limit would be exceeded.  To avoid choking on
@@ -2982,7 +2975,7 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
        {
            /* Batch 0 doesn't need to be loaded. */
            BarrierAttach(&shared->batch_barrier);
-           while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBE)
+           while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBING)
                BarrierArriveAndWait(&shared->batch_barrier, 0);
            BarrierDetach(&shared->batch_barrier);
        }
@@ -3055,11 +3048,14 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
    }

    /*
-    * We should never see a state where the batch-tracking array is freed,
-    * because we should have given up sooner if we join when the build
-    * barrier has reached the PHJ_BUILD_FREE phase.
+    * It's possible for a backend to start up very late so that the whole
+    * join is finished and the shm state for tracking batches has already
+    * been freed by ExecHashTableDetach().  In that case we'll just leave
+    * hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives
+    * up early.
     */
-   Assert(DsaPointerIsValid(pstate->batches));
+   if (!DsaPointerIsValid(pstate->batches))
+       return;

    /* Use hash join memory context. */
    oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
@@ -3140,7 +3136,7 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
         * longer attached, but since there is no way it's moving after
         * this point it seems safe to make the following assertion.
         */
-       Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
+       Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);

        /* Free shared chunks and buckets. */
        while (DsaPointerIsValid(batch->chunks))
@@ -3179,17 +3175,9 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
void
ExecHashTableDetach(HashJoinTable hashtable)
{
-   ParallelHashJoinState *pstate = hashtable->parallel_state;
-
-   /*
-    * If we're involved in a parallel query, we must either have got all the
-    * way to PHJ_BUILD_RUN, or joined too late and be in PHJ_BUILD_FREE.
-    */
-   Assert(!pstate ||
-          BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUN);
-
-   if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUN)
+   if (hashtable->parallel_state)
    {
+       ParallelHashJoinState *pstate = hashtable->parallel_state;
        int         i;

        /* Make sure any temporary files are closed. */
@@ -3205,22 +3193,17 @@ ExecHashTableDetach(HashJoinTable hashtable)
        }

        /* If we're last to detach, clean up shared memory. */
-       if (BarrierArriveAndDetach(&pstate->build_barrier))
+       if (BarrierDetach(&pstate->build_barrier))
        {
-           /*
-            * Late joining processes will see this state and give up
-            * immediately.
-            */
-           Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_FREE);
-
            if (DsaPointerIsValid(pstate->batches))
            {
                dsa_free(hashtable->area, pstate->batches);
                pstate->batches = InvalidDsaPointer;
            }
        }
+
+       hashtable->parallel_state = NULL;
    }
-   hashtable->parallel_state = NULL;
}

/*

0 commit comments

