Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc6dbf1f

Browse files
committed
Stop the executor if no more tuples can be sent from worker to leader.
If a Gather node has read as many tuples as it needs (for example, dueto Limit) it may detach the queue connecting it to the worker beforereading all of the worker's tuples. Rather than let the workercontinue to generate and send all of the results, have it stop aftersending the next tuple.More could be done here to stop the worker even quicker, but this isabout as well as we can hope to do for 9.6.This is in response to a problem report from Andreas Seltenreich.Commit44339b8 should be actually besufficient to fix that example even without this change, but it seemsbetter to do this, too, since we might otherwise waste quite a largeamount of effort in one or more workers.Discussion: CAA4eK1KOKGqmz9bGu+Z42qhRwMbm4R5rfnqsLCNqFs9j14jzEA@mail.gmail.comAmit Kapila
1 parent44339b8 commitc6dbf1f

File tree

14 files changed

+78
-29
lines changed

14 files changed

+78
-29
lines changed

‎src/backend/access/common/printtup.c

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@
2626

2727
staticvoidprinttup_startup(DestReceiver*self,intoperation,
2828
TupleDesctypeinfo);
29-
staticvoidprinttup(TupleTableSlot*slot,DestReceiver*self);
30-
staticvoidprinttup_20(TupleTableSlot*slot,DestReceiver*self);
31-
staticvoidprinttup_internal_20(TupleTableSlot*slot,DestReceiver*self);
29+
staticboolprinttup(TupleTableSlot*slot,DestReceiver*self);
30+
staticboolprinttup_20(TupleTableSlot*slot,DestReceiver*self);
31+
staticboolprinttup_internal_20(TupleTableSlot*slot,DestReceiver*self);
3232
staticvoidprinttup_shutdown(DestReceiver*self);
3333
staticvoidprinttup_destroy(DestReceiver*self);
3434

@@ -299,7 +299,7 @@ printtup_prepare_info(DR_printtup *myState, TupleDesc typeinfo, int numAttrs)
299299
*printtup --- print a tuple in protocol 3.0
300300
* ----------------
301301
*/
302-
staticvoid
302+
staticbool
303303
printtup(TupleTableSlot*slot,DestReceiver*self)
304304
{
305305
TupleDesctypeinfo=slot->tts_tupleDescriptor;
@@ -376,13 +376,15 @@ printtup(TupleTableSlot *slot, DestReceiver *self)
376376
/* Return to caller's context, and flush row's temporary memory */
377377
MemoryContextSwitchTo(oldcontext);
378378
MemoryContextReset(myState->tmpcontext);
379+
380+
return true;
379381
}
380382

381383
/* ----------------
382384
*printtup_20 --- print a tuple in protocol 2.0
383385
* ----------------
384386
*/
385-
staticvoid
387+
staticbool
386388
printtup_20(TupleTableSlot*slot,DestReceiver*self)
387389
{
388390
TupleDesctypeinfo=slot->tts_tupleDescriptor;
@@ -452,6 +454,8 @@ printtup_20(TupleTableSlot *slot, DestReceiver *self)
452454
/* Return to caller's context, and flush row's temporary memory */
453455
MemoryContextSwitchTo(oldcontext);
454456
MemoryContextReset(myState->tmpcontext);
457+
458+
return true;
455459
}
456460

457461
/* ----------------
@@ -528,7 +532,7 @@ debugStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
528532
*debugtup - print one tuple for an interactive backend
529533
* ----------------
530534
*/
531-
void
535+
bool
532536
debugtup(TupleTableSlot*slot,DestReceiver*self)
533537
{
534538
TupleDesctypeinfo=slot->tts_tupleDescriptor;
@@ -553,6 +557,8 @@ debugtup(TupleTableSlot *slot, DestReceiver *self)
553557
printatt((unsigned)i+1,typeinfo->attrs[i],value);
554558
}
555559
printf("\t----\n");
560+
561+
return true;
556562
}
557563

558564
/* ----------------
@@ -564,7 +570,7 @@ debugtup(TupleTableSlot *slot, DestReceiver *self)
564570
* This is largely same as printtup_20, except we use binary formatting.
565571
* ----------------
566572
*/
567-
staticvoid
573+
staticbool
568574
printtup_internal_20(TupleTableSlot*slot,DestReceiver*self)
569575
{
570576
TupleDesctypeinfo=slot->tts_tupleDescriptor;
@@ -636,4 +642,6 @@ printtup_internal_20(TupleTableSlot *slot, DestReceiver *self)
636642
/* Return to caller's context, and flush row's temporary memory */
637643
MemoryContextSwitchTo(oldcontext);
638644
MemoryContextReset(myState->tmpcontext);
645+
646+
return true;
639647
}

‎src/backend/commands/copy.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4454,7 +4454,7 @@ copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
44544454
/*
44554455
* copy_dest_receive --- receive one tuple
44564456
*/
4457-
staticvoid
4457+
staticbool
44584458
copy_dest_receive(TupleTableSlot*slot,DestReceiver*self)
44594459
{
44604460
DR_copy*myState= (DR_copy*)self;
@@ -4466,6 +4466,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
44664466
/* And send the data */
44674467
CopyOneRowTo(cstate,InvalidOid,slot->tts_values,slot->tts_isnull);
44684468
myState->processed++;
4469+
4470+
return true;
44694471
}
44704472

44714473
/*

‎src/backend/commands/createas.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ typedef struct
6262
staticObjectAddressCreateAsReladdr= {InvalidOid,InvalidOid,0};
6363

6464
staticvoidintorel_startup(DestReceiver*self,intoperation,TupleDesctypeinfo);
65-
staticvoidintorel_receive(TupleTableSlot*slot,DestReceiver*self);
65+
staticboolintorel_receive(TupleTableSlot*slot,DestReceiver*self);
6666
staticvoidintorel_shutdown(DestReceiver*self);
6767
staticvoidintorel_destroy(DestReceiver*self);
6868

@@ -482,7 +482,7 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
482482
/*
483483
* intorel_receive --- receive one tuple
484484
*/
485-
staticvoid
485+
staticbool
486486
intorel_receive(TupleTableSlot*slot,DestReceiver*self)
487487
{
488488
DR_intorel*myState= (DR_intorel*)self;
@@ -507,6 +507,8 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
507507
myState->bistate);
508508

509509
/* We know this is a newly created relation, so there are no indexes */
510+
511+
return true;
510512
}
511513

512514
/*

‎src/backend/commands/matview.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ typedef struct
5656
staticintmatview_maintenance_depth=0;
5757

5858
staticvoidtransientrel_startup(DestReceiver*self,intoperation,TupleDesctypeinfo);
59-
staticvoidtransientrel_receive(TupleTableSlot*slot,DestReceiver*self);
59+
staticbooltransientrel_receive(TupleTableSlot*slot,DestReceiver*self);
6060
staticvoidtransientrel_shutdown(DestReceiver*self);
6161
staticvoidtransientrel_destroy(DestReceiver*self);
6262
staticvoidrefresh_matview_datafill(DestReceiver*dest,Query*query,
@@ -467,7 +467,7 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
467467
/*
468468
* transientrel_receive --- receive one tuple
469469
*/
470-
staticvoid
470+
staticbool
471471
transientrel_receive(TupleTableSlot*slot,DestReceiver*self)
472472
{
473473
DR_transientrel*myState= (DR_transientrel*)self;
@@ -486,6 +486,8 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
486486
myState->bistate);
487487

488488
/* We know this is a newly created relation, so there are no indexes */
489+
490+
return true;
489491
}
490492

491493
/*

‎src/backend/executor/execMain.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1593,7 +1593,15 @@ ExecutePlan(EState *estate,
15931593
* practice, this is probably always the case at this point.)
15941594
*/
15951595
if (sendTuples)
1596-
(*dest->receiveSlot) (slot,dest);
1596+
{
1597+
/*
1598+
* If we are not able to send the tuple, we assume the destination
1599+
* has closed and no more tuples can be sent. If that's the case,
1600+
* end the loop.
1601+
*/
1602+
if (!((*dest->receiveSlot) (slot,dest)))
1603+
break;
1604+
}
15971605

15981606
/*
15991607
* Count tuples processed, if this is a SELECT. (For other operation

‎src/backend/executor/execTuples.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1266,7 +1266,7 @@ do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
12661266
ExecStoreVirtualTuple(slot);
12671267

12681268
/* send the tuple to the receiver */
1269-
(*tstate->dest->receiveSlot) (slot,tstate->dest);
1269+
(void) (*tstate->dest->receiveSlot) (slot,tstate->dest);
12701270

12711271
/* clean up */
12721272
ExecClearTuple(slot);

‎src/backend/executor/functions.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ static Datum postquel_get_single_result(TupleTableSlot *slot,
167167
staticvoidsql_exec_error_callback(void*arg);
168168
staticvoidShutdownSQLFunction(Datumarg);
169169
staticvoidsqlfunction_startup(DestReceiver*self,intoperation,TupleDesctypeinfo);
170-
staticvoidsqlfunction_receive(TupleTableSlot*slot,DestReceiver*self);
170+
staticboolsqlfunction_receive(TupleTableSlot*slot,DestReceiver*self);
171171
staticvoidsqlfunction_shutdown(DestReceiver*self);
172172
staticvoidsqlfunction_destroy(DestReceiver*self);
173173

@@ -1904,7 +1904,7 @@ sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
19041904
/*
19051905
* sqlfunction_receive --- receive one tuple
19061906
*/
1907-
staticvoid
1907+
staticbool
19081908
sqlfunction_receive(TupleTableSlot*slot,DestReceiver*self)
19091909
{
19101910
DR_sqlfunction*myState= (DR_sqlfunction*)self;
@@ -1914,6 +1914,8 @@ sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self)
19141914

19151915
/* Store the filtered tuple into the tuplestore */
19161916
tuplestore_puttupleslot(myState->tstore,slot);
1917+
1918+
return true;
19171919
}
19181920

19191921
/*

‎src/backend/executor/spi.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1774,7 +1774,7 @@ spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
17741774
*store tuple retrieved by Executor into SPITupleTable
17751775
*of current SPI procedure
17761776
*/
1777-
void
1777+
bool
17781778
spi_printtup(TupleTableSlot*slot,DestReceiver*self)
17791779
{
17801780
SPITupleTable*tuptable;
@@ -1809,6 +1809,8 @@ spi_printtup(TupleTableSlot *slot, DestReceiver *self)
18091809
(tuptable->free)--;
18101810

18111811
MemoryContextSwitchTo(oldcxt);
1812+
1813+
return true;
18121814
}
18131815

18141816
/*

‎src/backend/executor/tqueue.c

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,13 @@ static RemapInfo *BuildRemapInfo(TupleDesc tupledesc);
115115
* type over a range type over a range type over an array type over a record,
116116
* or something like that.
117117
*/
118-
staticvoid
118+
staticbool
119119
tqueueReceiveSlot(TupleTableSlot*slot,DestReceiver*self)
120120
{
121121
TQueueDestReceiver*tqueue= (TQueueDestReceiver*)self;
122122
TupleDesctupledesc=slot->tts_tupleDescriptor;
123123
HeapTupletuple;
124+
shm_mq_resultresult;
124125

125126
/*
126127
* Test to see whether the tupledesc has changed; if so, set up for the
@@ -195,7 +196,16 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
195196
}
196197

197198
/* Send the tuple itself. */
198-
shm_mq_send(tqueue->handle,tuple->t_len,tuple->t_data, false);
199+
result=shm_mq_send(tqueue->handle,tuple->t_len,tuple->t_data, false);
200+
201+
if (result==SHM_MQ_DETACHED)
202+
return false;
203+
elseif (result!=SHM_MQ_SUCCESS)
204+
ereport(ERROR,
205+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
206+
errmsg("unable to send tuples")));
207+
208+
return true;
199209
}
200210

201211
/*

‎src/backend/executor/tstoreReceiver.c

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ typedef struct
3737
}TStoreState;
3838

3939

40-
staticvoidtstoreReceiveSlot_notoast(TupleTableSlot*slot,DestReceiver*self);
41-
staticvoidtstoreReceiveSlot_detoast(TupleTableSlot*slot,DestReceiver*self);
40+
staticbooltstoreReceiveSlot_notoast(TupleTableSlot*slot,DestReceiver*self);
41+
staticbooltstoreReceiveSlot_detoast(TupleTableSlot*slot,DestReceiver*self);
4242

4343

4444
/*
@@ -90,19 +90,21 @@ tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
9090
* Receive a tuple from the executor and store it in the tuplestore.
9191
* This is for the easy case where we don't have to detoast.
9292
*/
93-
staticvoid
93+
staticbool
9494
tstoreReceiveSlot_notoast(TupleTableSlot*slot,DestReceiver*self)
9595
{
9696
TStoreState*myState= (TStoreState*)self;
9797

9898
tuplestore_puttupleslot(myState->tstore,slot);
99+
100+
return true;
99101
}
100102

101103
/*
102104
* Receive a tuple from the executor and store it in the tuplestore.
103105
* This is for the case where we have to detoast any toasted values.
104106
*/
105-
staticvoid
107+
staticbool
106108
tstoreReceiveSlot_detoast(TupleTableSlot*slot,DestReceiver*self)
107109
{
108110
TStoreState*myState= (TStoreState*)self;
@@ -152,6 +154,8 @@ tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
152154
/* And release any temporary detoasted values */
153155
for (i=0;i<nfree;i++)
154156
pfree(DatumGetPointer(myState->tofree[i]));
157+
158+
return true;
155159
}
156160

157161
/*

‎src/backend/tcop/dest.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,10 @@
4545
*dummy DestReceiver functions
4646
* ----------------
4747
*/
48-
staticvoid
48+
staticbool
4949
donothingReceive(TupleTableSlot*slot,DestReceiver*self)
5050
{
51+
return true;
5152
}
5253

5354
staticvoid

‎src/backend/tcop/pquery.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1109,7 +1109,13 @@ RunFromStore(Portal portal, ScanDirection direction, uint64 count,
11091109
if (!ok)
11101110
break;
11111111

1112-
(*dest->receiveSlot) (slot,dest);
1112+
/*
1113+
* If we are not able to send the tuple, we assume the destination
1114+
* has closed and no more tuples can be sent. If that's the case,
1115+
* end the loop.
1116+
*/
1117+
if (!((*dest->receiveSlot) (slot,dest)))
1118+
break;
11131119

11141120
ExecClearTuple(slot);
11151121

‎src/include/access/printtup.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ extern void SendRowDescriptionMessage(TupleDesc typeinfo, List *targetlist,
2525

2626
externvoiddebugStartup(DestReceiver*self,intoperation,
2727
TupleDesctypeinfo);
28-
externvoiddebugtup(TupleTableSlot*slot,DestReceiver*self);
28+
externbooldebugtup(TupleTableSlot*slot,DestReceiver*self);
2929

3030
/* XXX these are really in executor/spi.c */
3131
externvoidspi_dest_startup(DestReceiver*self,intoperation,
3232
TupleDesctypeinfo);
33-
externvoidspi_printtup(TupleTableSlot*slot,DestReceiver*self);
33+
externboolspi_printtup(TupleTableSlot*slot,DestReceiver*self);
3434

3535
#endif/* PRINTTUP_H */

‎src/include/tcop/dest.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,17 @@ typedef enum
104104
*pointers that the executor must call.
105105
*
106106
* Note: the receiveSlot routine must be passed a slot containing a TupleDesc
107-
* identical to the one given to the rStartup routine.
107+
* identical to the one given to the rStartup routine. It returns bool where
108+
* a "true" value means "continue processing" and a "false" value means
109+
* "stop early, just as if we'd reached the end of the scan".
108110
* ----------------
109111
*/
110112
typedefstruct_DestReceiverDestReceiver;
111113

112114
struct_DestReceiver
113115
{
114116
/* Called for each tuple to be output: */
115-
void(*receiveSlot) (TupleTableSlot*slot,
117+
bool(*receiveSlot) (TupleTableSlot*slot,
116118
DestReceiver*self);
117119
/* Per-executor-run initialization and shutdown: */
118120
void(*rStartup) (DestReceiver*self,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp