Commit cdc7169

Use MinimalTuple for tuple queues.

This representation saves 8 bytes per tuple compared to HeapTuple, and avoids the need to allocate, copy and free on the receiving side. Gather can emit the returned MinimalTuple directly, but Gather Merge now needs to make an explicit copy because it buffers multiple tuples at a time. That should be no worse than before.

Reviewed-by: Soumyadeep Chakraborty <soumyadeep2007@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKG%2B8T_ggoUTAE-U%3DA%2BOcPc4%3DB0nPPHcSfffuQhvXXjML6w%40mail.gmail.com
1 parent d2bddc2 · commit cdc7169
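The receiving-side contract the commit message describes can be sketched as follows (a minimal illustration assuming the APIs shown in the diffs below; the wrapper function and its name are hypothetical, not part of the commit):

    /* Sketch: how Gather vs. Gather Merge treat a tuple read from the queue. */
    static TupleTableSlot *
    receive_one(TupleQueueReader *reader, TupleTableSlot *slot, bool buffered)
    {
        bool         readerdone;
        MinimalTuple tup = TupleQueueReaderNext(reader, true, &readerdone);

        if (tup == NULL)
            return NULL;

        if (!buffered)
        {
            /* Gather: the tuple lives in queue memory until the next read,
             * so it can be stored directly; the slot must not pfree it. */
            ExecStoreMinimalTuple(tup, slot, false);
        }
        else
        {
            /* Gather Merge: the tuple is kept across further queue reads, so
             * it must be copied out first; the slot frees the copy later. */
            ExecStoreMinimalTuple(heap_copy_minimal_tuple(tup), slot, true);
        }
        return slot;
    }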

File tree

5 files changed: +51 −47 lines changed


src/backend/executor/nodeGather.c

Lines changed: 8 additions & 8 deletions

@@ -46,7 +46,7 @@
 
 static TupleTableSlot *ExecGather(PlanState *pstate);
 static TupleTableSlot *gather_getnext(GatherState *gatherstate);
-static HeapTuple gather_readnext(GatherState *gatherstate);
+static MinimalTuple gather_readnext(GatherState *gatherstate);
 static void ExecShutdownGatherWorkers(GatherState *node);
 
 
@@ -120,7 +120,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
     * Initialize funnel slot to same tuple descriptor as outer plan.
     */
    gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
-                                                     &TTSOpsHeapTuple);
+                                                     &TTSOpsMinimalTuple);
 
    /*
     * Gather doesn't support checking a qual (it's always more efficient to
@@ -266,7 +266,7 @@ gather_getnext(GatherState *gatherstate)
    PlanState  *outerPlan = outerPlanState(gatherstate);
    TupleTableSlot *outerTupleSlot;
    TupleTableSlot *fslot = gatherstate->funnel_slot;
-   HeapTuple   tup;
+   MinimalTuple tup;
 
    while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
    {
@@ -278,9 +278,9 @@ gather_getnext(GatherState *gatherstate)
 
            if (HeapTupleIsValid(tup))
            {
-               ExecStoreHeapTuple(tup,     /* tuple to store */
-                                  fslot,   /* slot to store the tuple */
-                                  true);   /* pfree tuple when done with it */
+               ExecStoreMinimalTuple(tup,      /* tuple to store */
+                                     fslot,    /* slot to store the tuple */
+                                     false);   /* don't pfree tuple */
                return fslot;
            }
        }
@@ -308,15 +308,15 @@ gather_getnext(GatherState *gatherstate)
 /*
  * Attempt to read a tuple from one of our parallel workers.
  */
-static HeapTuple
+static MinimalTuple
 gather_readnext(GatherState *gatherstate)
 {
    int         nvisited = 0;
 
    for (;;)
    {
        TupleQueueReader *reader;
-       HeapTuple   tup;
+       MinimalTuple tup;
        bool        readerdone;
 
        /* Check for async events, particularly messages from workers. */

src/backend/executor/nodeGatherMerge.c

Lines changed: 22 additions & 18 deletions

@@ -45,7 +45,7 @@
  */
 typedef struct GMReaderTupleBuffer
 {
-   HeapTuple  *tuple;          /* array of length MAX_TUPLE_STORE */
+   MinimalTuple *tuple;        /* array of length MAX_TUPLE_STORE */
    int         nTuples;        /* number of tuples currently stored */
    int         readCounter;    /* index of next tuple to extract */
    bool        done;           /* true if reader is known exhausted */
@@ -54,8 +54,8 @@ typedef struct GMReaderTupleBuffer
 static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
 static int32 heap_compare_slots(Datum a, Datum b, void *arg);
 static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
-static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
-                                   bool nowait, bool *done);
+static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
+                                      bool nowait, bool *done);
 static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
 static void gather_merge_setup(GatherMergeState *gm_state);
 static void gather_merge_init(GatherMergeState *gm_state);
@@ -419,12 +419,12 @@ gather_merge_setup(GatherMergeState *gm_state)
    {
        /* Allocate the tuple array with length MAX_TUPLE_STORE */
        gm_state->gm_tuple_buffers[i].tuple =
-           (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
+           (MinimalTuple *) palloc0(sizeof(MinimalTuple) * MAX_TUPLE_STORE);
 
        /* Initialize tuple slot for worker */
        gm_state->gm_slots[i + 1] =
            ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc,
-                                  &TTSOpsHeapTuple);
+                                  &TTSOpsMinimalTuple);
    }
 
    /* Allocate the resources for the merge */
@@ -533,7 +533,7 @@ gather_merge_clear_tuples(GatherMergeState *gm_state)
        GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];
 
        while (tuple_buffer->readCounter < tuple_buffer->nTuples)
-           heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]);
+           pfree(tuple_buffer->tuple[tuple_buffer->readCounter++]);
 
        ExecClearTuple(gm_state->gm_slots[i + 1]);
    }
@@ -613,13 +613,13 @@ load_tuple_array(GatherMergeState *gm_state, int reader)
    /* Try to fill additional slots in the array. */
    for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
    {
-       HeapTuple   tuple;
+       MinimalTuple tuple;
 
        tuple = gm_readnext_tuple(gm_state,
                                  reader,
                                  true,
                                  &tuple_buffer->done);
-       if (!HeapTupleIsValid(tuple))
+       if (!tuple)
            break;
        tuple_buffer->tuple[i] = tuple;
        tuple_buffer->nTuples++;
@@ -637,7 +637,7 @@ static bool
 gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 {
    GMReaderTupleBuffer *tuple_buffer;
-   HeapTuple   tup;
+   MinimalTuple tup;
 
    /*
     * If we're being asked to generate a tuple from the leader, then we just
@@ -687,7 +687,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
                                 reader,
                                 nowait,
                                 &tuple_buffer->done);
-       if (!HeapTupleIsValid(tup))
+       if (!tup)
            return false;
 
        /*
@@ -697,26 +697,26 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
            load_tuple_array(gm_state, reader);
    }
 
-   Assert(HeapTupleIsValid(tup));
+   Assert(tup);
 
    /* Build the TupleTableSlot for the given tuple */
-   ExecStoreHeapTuple(tup,     /* tuple to store */
-                      gm_state->gm_slots[reader],  /* slot in which to store
-                                                    * the tuple */
-                      true);   /* pfree tuple when done with it */
+   ExecStoreMinimalTuple(tup,      /* tuple to store */
+                         gm_state->gm_slots[reader],   /* slot in which to store
+                                                        * the tuple */
+                         true);    /* pfree tuple when done with it */
 
    return true;
 }
 
 /*
  * Attempt to read a tuple from given worker.
  */
-static HeapTuple
+static MinimalTuple
 gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
                   bool *done)
 {
    TupleQueueReader *reader;
-   HeapTuple   tup;
+   MinimalTuple tup;
 
    /* Check for async events, particularly messages from workers. */
    CHECK_FOR_INTERRUPTS();
@@ -732,7 +732,11 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
    reader = gm_state->reader[nreader - 1];
    tup = TupleQueueReaderNext(reader, nowait, done);
 
-   return tup;
+   /*
+    * Since we'll be buffering these across multiple calls, we need to make a
+    * copy.
+    */
+   return tup ? heap_copy_minimal_tuple(tup) : NULL;
 }
 
 /*
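To make the ownership rule in the hunks above explicit: every entry in GMReaderTupleBuffer is a private copy produced by gm_readnext_tuple(), so unconsumed entries are released with plain pfree(). A compact sketch (the helper function is hypothetical; field names are from the diff):

    /* Drop buffered-but-unread copies, mirroring gather_merge_clear_tuples(). */
    static void
    discard_buffered(GMReaderTupleBuffer *tuple_buffer)
    {
        while (tuple_buffer->readCounter < tuple_buffer->nTuples)
            pfree(tuple_buffer->tuple[tuple_buffer->readCounter++]);
        tuple_buffer->nTuples = 0;
        tuple_buffer->readCounter = 0;
    }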

src/backend/executor/tqueue.c

Lines changed: 14 additions & 16 deletions

@@ -54,16 +54,16 @@ static bool
 tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 {
    TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
-   HeapTuple   tuple;
+   MinimalTuple tuple;
    shm_mq_result result;
    bool        should_free;
 
    /* Send the tuple itself. */
-   tuple = ExecFetchSlotHeapTuple(slot, true, &should_free);
-   result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
+   tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
+   result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false);
 
    if (should_free)
-       heap_freetuple(tuple);
+       pfree(tuple);
 
    /* Check for failure. */
    if (result == SHM_MQ_DETACHED)
@@ -164,18 +164,18 @@ DestroyTupleQueueReader(TupleQueueReader *reader)
  * nowait = true and no tuple is ready to return.  *done, if not NULL,
  * is set to true when there are no remaining tuples and otherwise to false.
  *
- * The returned tuple, if any, is allocated in CurrentMemoryContext.
- * Note that this routine must not leak memory!  (We used to allow that,
- * but not any more.)
+ * The returned tuple, if any, is either in shared memory or a private buffer
+ * and should not be freed.  The pointer is invalid after the next call to
+ * TupleQueueReaderNext().
  *
  * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
  * accumulate bytes from a partially-read message, so it's useful to call
  * this with nowait = true even if nothing is returned.
  */
-HeapTuple
+MinimalTuple
 TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
 {
-   HeapTupleData htup;
+   MinimalTuple tuple;
    shm_mq_result result;
    Size        nbytes;
    void       *data;
@@ -200,13 +200,11 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
    Assert(result == SHM_MQ_SUCCESS);
 
    /*
-    * Set up a dummy HeapTupleData pointing to the data from the shm_mq
-    * (which had better be sufficiently aligned).
+    * Return a pointer to the queue memory directly (which had better be
+    * sufficiently aligned).
     */
-   ItemPointerSetInvalid(&htup.t_self);
-   htup.t_tableOid = InvalidOid;
-   htup.t_len = nbytes;
-   htup.t_data = data;
+   tuple = (MinimalTuple) data;
+   Assert(tuple->t_len == nbytes);
 
-   return heap_copytuple(&htup);
+   return tuple;
 }
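The new lifetime rule documented above is easy to get wrong, so here is a sketch of the hazard (illustrative only; the function is hypothetical, and only heap_copy_minimal_tuple() produces memory the caller owns):

    static void
    read_two(TupleQueueReader *reader)
    {
        bool         done;
        MinimalTuple a = TupleQueueReaderNext(reader, false, &done);
        MinimalTuple keep = a ? heap_copy_minimal_tuple(a) : NULL;
        MinimalTuple b = TupleQueueReaderNext(reader, false, &done);

        /* 'a' may now point into recycled queue memory; only 'keep' (a
         * private copy we own) and 'b' (valid until the next call) are safe
         * to use.  Never pfree 'a' or 'b' themselves. */
        (void) b;
        if (keep)
            pfree(keep);
    }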

src/backend/optimizer/plan/createplan.c

Lines changed: 5 additions & 3 deletions

@@ -1730,8 +1730,10 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path)
    List       *tlist;
 
    /*
-    * Although the Gather node can project, we prefer to push down such work
-    * to its child node, so demand an exact tlist from the child.
+    * Push projection down to the child node.  That way, the projection work
+    * is parallelized, and there can be no system columns in the result (they
+    * can't travel through a tuple queue because it uses MinimalTuple
+    * representation).
     */
    subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);
 
@@ -1766,7 +1768,7 @@ create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path)
    List       *pathkeys = best_path->path.pathkeys;
    List       *tlist = build_path_tlist(root, &best_path->path);
 
-   /* As with Gather, it's best to project away columns in the workers. */
+   /* As with Gather, project away columns in the workers. */
    subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);
 
    /* Create a shell for a GatherMerge plan. */
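The system-column remark follows from the tuple representations: the out-of-line HeapTupleData wrapper carries the fields that back ctid and tableoid, while a MinimalTuple is essentially a length word plus the data portion. Abridged from access/htup.h for illustration (the comments are ours, not from the commit):

    typedef struct HeapTupleData
    {
        uint32          t_len;      /* length of *t_data */
        ItemPointerData t_self;     /* tuple TID: backs the ctid column */
        Oid             t_tableOid; /* source table: backs tableoid */
        HeapTupleHeader t_data;     /* -> header, where xmin/xmax live */
    } HeapTupleData;

    /* MinimalTupleData (access/htup_details.h) keeps only t_len and the
     * header fields from t_infomask2 onward, so none of the above survives
     * a trip through the tuple queue. */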

src/include/executor/tqueue.h

Lines changed: 2 additions & 2 deletions

@@ -26,7 +26,7 @@ extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);
 /* Use these to receive tuples from a shm_mq. */
 extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle);
 extern void DestroyTupleQueueReader(TupleQueueReader *reader);
-extern HeapTuple TupleQueueReaderNext(TupleQueueReader *reader,
-                                      bool nowait, bool *done);
+extern MinimalTuple TupleQueueReaderNext(TupleQueueReader *reader,
+                                         bool nowait, bool *done);
 
 #endif                          /* TQUEUE_H */

