Commit 4a4e689

Glue layer to connect the executor to the shm_mq mechanism.
The shm_mq mechanism was built to send error (and notice) messages and tuples between backends. However, shm_mq itself only deals in raw bytes. Since commit 2bd9e41, we have had infrastructure for one backend to redirect protocol messages to a queue and for another backend to parse them and do useful things with them. This commit introduces a somewhat analogous facility for tuples by adding a new type of DestReceiver, DestTupleQueue, which writes each tuple generated by a query into a shm_mq, and a new TupleQueueFunnel facility which reads raw tuples out of the queue and reconstructs the HeapTuple format expected by the executor.

The TupleQueueFunnel abstraction supports reading from multiple tuple streams at the same time, but only in round-robin fashion. Someone could imaginably want other policies, but this should be good enough to meet our short-term needs related to parallel query, and we can always extend it later.

This also makes one minor addition to the shm_mq API that didn't seem worth breaking out as a separate patch.

Extracted from Amit Kapila's parallel sequential scan patch. This code was originally written by me, and then it was revised by Amit, and then it was revised some more by me.
1 parent c00c324 · commit 4a4e689
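To make the new facility concrete before diving into the diff, here is a minimal worker-side sketch of how a backend might route a query's output into a tuple queue with the DestReceiver added here. It is an illustration only, not code from this commit: the queue setup and the QueryDesc construction are assumed to happen elsewhere, and run_query_into_queue is a hypothetical helper.

#include "postgres.h"

#include "access/sdir.h"
#include "executor/executor.h"
#include "executor/tqueue.h"
#include "storage/proc.h"
#include "storage/shm_mq.h"

/*
 * Hypothetical worker-side helper: run an already-built QueryDesc and send
 * every result tuple through "mq", which the leader reads from.  Assumes the
 * queue was created elsewhere (e.g. in a DSM segment) and that this process
 * is meant to be its sender.
 */
static void
run_query_into_queue(QueryDesc *queryDesc, shm_mq *mq)
{
    shm_mq_handle *mqh;
    DestReceiver *dest;

    /* Become the sender and attach; the leader attaches as the receiver. */
    shm_mq_set_sender(mq, MyProc);
    mqh = shm_mq_attach(mq, NULL, NULL);

    /* Every tuple the executor produces now goes into the queue. */
    dest = CreateTupleQueueDestReceiver(mqh);
    queryDesc->dest = dest;

    ExecutorStart(queryDesc, 0);
    ExecutorRun(queryDesc, ForwardScanDirection, 0L);
    ExecutorFinish(queryDesc);
    ExecutorEnd(queryDesc);

    /* Detaching tells the reader that no more tuples are coming. */
    shm_mq_detach(mq);
}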

File tree

8 files changed: +316, -2 lines changed


src/backend/executor/Makefile

Lines changed: 1 addition & 1 deletion
@@ -24,6 +24,6 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \
     nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \
     nodeValuesscan.o nodeCtescan.o nodeWorktablescan.o \
     nodeGroup.o nodeSubplan.o nodeSubqueryscan.o nodeTidscan.o \
-    nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o spi.o
+    nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o tqueue.o spi.o
 
 include $(top_srcdir)/src/backend/common.mk

src/backend/executor/tqueue.c

Lines changed: 262 additions & 0 deletions
@@ -0,0 +1,262 @@
/*-------------------------------------------------------------------------
 *
 * tqueue.c
 *    Use shm_mq to send & receive tuples between parallel backends
 *
 * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
 * under the hood, writes tuples from the executor to a shm_mq.
 *
 * A TupleQueueFunnel helps manage the process of reading tuples from
 * one or more shm_mq objects being used as tuple queues.
 *
 * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/executor/tqueue.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/htup_details.h"
#include "executor/tqueue.h"
#include "miscadmin.h"

typedef struct
{
    DestReceiver pub;
    shm_mq_handle *handle;
} TQueueDestReceiver;

struct TupleQueueFunnel
{
    int         nqueues;
    int         maxqueues;
    int         nextqueue;
    shm_mq_handle **queue;
};

/*
 * Receive a tuple.
 */
static void
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
{
    TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
    HeapTuple   tuple;

    tuple = ExecMaterializeSlot(slot);
    shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
}

/*
 * Prepare to receive tuples from executor.
 */
static void
tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
{
    /* do nothing */
}

/*
 * Clean up at end of an executor run
 */
static void
tqueueShutdownReceiver(DestReceiver *self)
{
    /* do nothing */
}

/*
 * Destroy receiver when done with it
 */
static void
tqueueDestroyReceiver(DestReceiver *self)
{
    pfree(self);
}

/*
 * Create a DestReceiver that writes tuples to a tuple queue.
 */
DestReceiver *
CreateTupleQueueDestReceiver(shm_mq_handle *handle)
{
    TQueueDestReceiver *self;

    self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));

    self->pub.receiveSlot = tqueueReceiveSlot;
    self->pub.rStartup = tqueueStartupReceiver;
    self->pub.rShutdown = tqueueShutdownReceiver;
    self->pub.rDestroy = tqueueDestroyReceiver;
    self->pub.mydest = DestTupleQueue;
    self->handle = handle;

    return (DestReceiver *) self;
}

/*
 * Create a tuple queue funnel.
 */
TupleQueueFunnel *
CreateTupleQueueFunnel(void)
{
    TupleQueueFunnel *funnel = palloc0(sizeof(TupleQueueFunnel));

    funnel->maxqueues = 8;
    funnel->queue = palloc(funnel->maxqueues * sizeof(shm_mq_handle *));

    return funnel;
}

/*
 * Destroy a tuple queue funnel.
 */
void
DestroyTupleQueueFunnel(TupleQueueFunnel *funnel)
{
    int         i;

    for (i = 0; i < funnel->nqueues; i++)
        shm_mq_detach(shm_mq_get_queue(funnel->queue[i]));
    pfree(funnel->queue);
    pfree(funnel);
}

/*
 * Remember the shared memory queue handle in funnel.
 */
void
RegisterTupleQueueOnFunnel(TupleQueueFunnel *funnel, shm_mq_handle *handle)
{
    if (funnel->nqueues < funnel->maxqueues)
    {
        funnel->queue[funnel->nqueues++] = handle;
        return;
    }

    if (funnel->nqueues >= funnel->maxqueues)
    {
        int         newsize = funnel->nqueues * 2;

        Assert(funnel->nqueues == funnel->maxqueues);

        funnel->queue = repalloc(funnel->queue,
                                 newsize * sizeof(shm_mq_handle *));
        funnel->maxqueues = newsize;
    }

    funnel->queue[funnel->nqueues++] = handle;
}

/*
 * Fetch a tuple from a tuple queue funnel.
 *
 * We try to read from the queues in round-robin fashion so as to avoid
 * the situation where some workers get their tuples read expediently while
 * others are barely ever serviced.
 *
 * Even when nowait = false, we read from the individual queues in
 * non-blocking mode.  Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK,
 * it can still accumulate bytes from a partially-read message, so doing it
 * this way should outperform doing a blocking read on each queue in turn.
 *
 * The return value is NULL if there are no remaining queues or if
 * nowait = true and no queue returned a tuple without blocking.  *done, if
 * not NULL, is set to true when there are no remaining queues and false in
 * any other case.
 */
HeapTuple
TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
{
    int         waitpos = funnel->nextqueue;

    /* Corner case: called before adding any queues, or after all are gone. */
    if (funnel->nqueues == 0)
    {
        if (done != NULL)
            *done = true;
        return NULL;
    }

    if (done != NULL)
        *done = false;

    for (;;)
    {
        shm_mq_handle *mqh = funnel->queue[funnel->nextqueue];
        shm_mq_result result;
        Size        nbytes;
        void       *data;

        /* Attempt to read a message. */
        result = shm_mq_receive(mqh, &nbytes, &data, true);

        /*
         * Normally, we advance funnel->nextqueue to the next queue at this
         * point, but if we're pointing to a queue that we've just discovered
         * is detached, then forget that queue and leave the pointer where it
         * is until the number of remaining queues fall below that pointer and
         * at that point make the pointer point to the first queue.
         */
        if (result != SHM_MQ_DETACHED)
            funnel->nextqueue = (funnel->nextqueue + 1) % funnel->nqueues;
        else
        {
            --funnel->nqueues;
            if (funnel->nqueues == 0)
            {
                if (done != NULL)
                    *done = true;
                return NULL;
            }

            memmove(&funnel->queue[funnel->nextqueue],
                    &funnel->queue[funnel->nextqueue + 1],
                    sizeof(shm_mq_handle *)
                    * (funnel->nqueues - funnel->nextqueue));

            if (funnel->nextqueue >= funnel->nqueues)
                funnel->nextqueue = 0;

            if (funnel->nextqueue < waitpos)
                --waitpos;

            continue;
        }

        /* If we got a message, return it. */
        if (result == SHM_MQ_SUCCESS)
        {
            HeapTupleData htup;

            /*
             * The tuple data we just read from the queue is only valid until
             * we again attempt to read from it.  Copy the tuple into a single
             * palloc'd chunk as callers will expect.
             */
            ItemPointerSetInvalid(&htup.t_self);
            htup.t_tableOid = InvalidOid;
            htup.t_len = nbytes;
            htup.t_data = data;
            return heap_copytuple(&htup);
        }

        /*
         * If we've visited all of the queues, then we should either give up
         * and return NULL (if we're in non-blocking mode) or wait for the
         * process latch to be set (otherwise).
         */
        if (funnel->nextqueue == waitpos)
        {
            if (nowait)
                return NULL;
            WaitLatch(MyLatch, WL_LATCH_SET, 0);
            CHECK_FOR_INTERRUPTS();
            ResetLatch(MyLatch);
        }
    }
}
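The subtlest part of TupleQueueFunnelNext() above is the bookkeeping when a queue reports SHM_MQ_DETACHED: the dead handle is squeezed out of the array in place, and both the scan pointer (nextqueue) and the remembered sweep start (waitpos) are adjusted so the round-robin pass still terminates. The standalone snippet below is only an illustration, not PostgreSQL code; it replays that compaction with integers standing in for shm_mq_handle pointers.

#include <stdio.h>
#include <string.h>

int
main(void)
{
    int     queue[] = {10, 11, 12, 13}; /* stand-ins for shm_mq_handle pointers */
    int     nqueues = 4;
    int     nextqueue = 2;              /* queue[2] just reported SHM_MQ_DETACHED */
    int     waitpos = 3;                /* where this sweep started */
    int     i;

    /* Same steps as the SHM_MQ_DETACHED branch in TupleQueueFunnelNext(). */
    --nqueues;
    memmove(&queue[nextqueue], &queue[nextqueue + 1],
            sizeof(int) * (nqueues - nextqueue));
    if (nextqueue >= nqueues)
        nextqueue = 0;
    if (nextqueue < waitpos)
        --waitpos;

    printf("queues now:");
    for (i = 0; i < nqueues; i++)
        printf(" %d", queue[i]);
    printf("\nnextqueue = %d, waitpos = %d\n", nextqueue, waitpos);

    /* Prints: "queues now: 10 11 13" and "nextqueue = 2, waitpos = 2". */
    return 0;
}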

src/backend/storage/ipc/shm_mq.c

Lines changed: 9 additions & 0 deletions
@@ -745,6 +745,15 @@ shm_mq_detach(shm_mq *mq)
         SetLatch(&victim->procLatch);
 }
 
+/*
+ * Get the shm_mq from handle.
+ */
+shm_mq *
+shm_mq_get_queue(shm_mq_handle *mqh)
+{
+    return mqh->mqh_queue;
+}
+
 /*
  * Write bytes into a shared message queue.
  */
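The new accessor exists because, as the headers in this commit show, shm_mq_detach() takes the shm_mq itself rather than a handle, while the tuple queue funnel stores only shm_mq_handle pointers. A minimal sketch of the intended pattern follows; detach_via_handle is a made-up name, not part of the patch.

/* Illustration only: detach from a queue when only the handle is at hand. */
static void
detach_via_handle(shm_mq_handle *mqh)
{
    shm_mq_detach(shm_mq_get_queue(mqh));
}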

src/backend/tcop/dest.c

Lines changed: 7 additions & 0 deletions
@@ -34,6 +34,7 @@
 #include "commands/createas.h"
 #include "commands/matview.h"
 #include "executor/functions.h"
+#include "executor/tqueue.h"
 #include "executor/tstoreReceiver.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -129,6 +130,9 @@ CreateDestReceiver(CommandDest dest)
 
         case DestTransientRel:
             return CreateTransientRelDestReceiver(InvalidOid);
+
+        case DestTupleQueue:
+            return CreateTupleQueueDestReceiver(NULL);
     }
 
     /* should never get here */
@@ -162,6 +166,7 @@ EndCommand(const char *commandTag, CommandDest dest)
         case DestCopyOut:
         case DestSQLFunction:
         case DestTransientRel:
+        case DestTupleQueue:
             break;
     }
 }
@@ -204,6 +209,7 @@ NullCommand(CommandDest dest)
         case DestCopyOut:
         case DestSQLFunction:
         case DestTransientRel:
+        case DestTupleQueue:
             break;
     }
 }
@@ -248,6 +254,7 @@ ReadyForQuery(CommandDest dest)
         case DestCopyOut:
         case DestSQLFunction:
         case DestTransientRel:
+        case DestTupleQueue:
             break;
     }
 }

src/include/executor/tqueue.h

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
/*-------------------------------------------------------------------------
 *
 * tqueue.h
 *    Use shm_mq to send & receive tuples between parallel backends
 *
 * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/include/executor/tqueue.h
 *
 *-------------------------------------------------------------------------
 */

#ifndef TQUEUE_H
#define TQUEUE_H

#include "storage/shm_mq.h"
#include "tcop/dest.h"

/* Use this to send tuples to a shm_mq. */
extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);

/* Use these to receive tuples from a shm_mq. */
typedef struct TupleQueueFunnel TupleQueueFunnel;
extern TupleQueueFunnel *CreateTupleQueueFunnel(void);
extern void DestroyTupleQueueFunnel(TupleQueueFunnel *funnel);
extern void RegisterTupleQueueOnFunnel(TupleQueueFunnel *, shm_mq_handle *);
extern HeapTuple TupleQueueFunnelNext(TupleQueueFunnel *, bool nowait,
                     bool *done);

#endif   /* TQUEUE_H */
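For the receiving side, here is a minimal leader-side sketch of the call sequence this header suggests. It is an assumption-laden illustration rather than code from this commit: drain_worker_queues and process_tuple are hypothetical, and the worker queues are assumed to have been created and attached elsewhere.

#include "postgres.h"

#include "access/htup_details.h"
#include "executor/tqueue.h"

/*
 * Hypothetical leader-side helper: gather tuples from several worker queues
 * in round-robin order and hand each one to process_tuple().
 */
static void
drain_worker_queues(shm_mq_handle **handles, int nworkers,
                    void (*process_tuple) (HeapTuple tup))
{
    TupleQueueFunnel *funnel = CreateTupleQueueFunnel();
    int         i;

    for (i = 0; i < nworkers; i++)
        RegisterTupleQueueOnFunnel(funnel, handles[i]);

    for (;;)
    {
        HeapTuple   tup;
        bool        done;

        /* nowait = false: block on the process latch until a tuple arrives. */
        tup = TupleQueueFunnelNext(funnel, false, &done);
        if (done)
            break;              /* every worker has detached */
        if (tup != NULL)
        {
            process_tuple(tup);
            heap_freetuple(tup);    /* the funnel returned a palloc'd copy */
        }
    }

    DestroyTupleQueueFunnel(funnel);
}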

src/include/storage/shm_mq.h

Lines changed: 3 additions & 0 deletions
@@ -65,6 +65,9 @@ extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *);
 /* Break connection. */
 extern void shm_mq_detach(shm_mq *);
 
+/* Get the shm_mq from handle. */
+extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);
+
 /* Send or receive messages. */
 extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
              Size nbytes, const void *data, bool nowait);

src/include/tcop/dest.h

Lines changed: 2 additions & 1 deletion
@@ -94,7 +94,8 @@ typedef enum
     DestIntoRel,                /* results sent to relation (SELECT INTO) */
     DestCopyOut,                /* results sent to COPY TO code */
     DestSQLFunction,            /* results sent to SQL-language func mgr */
-    DestTransientRel            /* results sent to transient relation */
+    DestTransientRel,           /* results sent to transient relation */
+    DestTupleQueue              /* results sent to tuple queue */
 } CommandDest;
 
 /* ----------------

src/tools/pgindent/typedefs.list

Lines changed: 1 addition & 0 deletions
@@ -2014,6 +2014,7 @@ TupleHashEntry
 TupleHashEntryData
 TupleHashIterator
 TupleHashTable
+TupleQueueFunnel
 TupleTableSlot
 Tuplesortstate
 Tuplestorestate
