Commit 4db3744

Test code for shared memory message queue facility.

This code is intended as a demonstration of how the dynamic shared memory
and dynamic background worker facilities can be used to establish a group
of cooperating processes which can coordinate their activities using the
shared memory message queue facility. By itself, the code does nothing
particularly interesting: it simply allows messages to be passed through
a loop of workers and back to the original process. But it's a useful
unit test, in addition to its demonstration value.

1 parent ec9037d · commit 4db3744
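
For orientation, the module is exercised entirely from SQL. The sketch below shows a minimal hand-run session; it assumes, based on the regression test further down, that the arguments are (queue size in bytes, message to circulate, repeat count, number of background workers), and it is illustrative rather than part of the commit.

CREATE EXTENSION test_shm_mq;

-- Pass a short message around a loop of two background workers 100 times,
-- through 16 kB shared memory queues.
SELECT test_shm_mq(16384, 'Hello, world', 100, 2);

-- The pipelined variant presumably overlaps sending and receiving, so a
-- message larger than an individual queue can still make the round trip.
SELECT test_shm_mq_pipelined(16384, repeat('x', 100000), 10, 3);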

File tree

11 files changed (+932, −0 lines)


contrib/Makefile

Lines changed: 1 addition & 0 deletions

@@ -51,6 +51,7 @@ SUBDIRS = \
 		tablefunc	\
 		tcn		\
 		test_parser	\
+		test_shm_mq	\
 		tsearch2	\
 		unaccent	\
 		vacuumlo	\

contrib/test_shm_mq/.gitignore

Lines changed: 4 additions & 0 deletions

@@ -0,0 +1,4 @@
# Generated subdirectories
/log/
/results/
/tmp_check/

contrib/test_shm_mq/Makefile

Lines changed: 20 additions & 0 deletions

@@ -0,0 +1,20 @@
# contrib/test_shm_mq/Makefile

MODULE_big = test_shm_mq
OBJS = test.o setup.o worker.o

EXTENSION = test_shm_mq
DATA = test_shm_mq--1.0.sql

REGRESS = test_shm_mq

ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = contrib/test_shm_mq
top_builddir = ../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif
contrib/test_shm_mq/expected/test_shm_mq.out

Lines changed: 18 additions & 0 deletions

@@ -0,0 +1,18 @@
CREATE EXTENSION test_shm_mq;
--
-- These tests don't produce any interesting output.  We're checking that
-- the operations complete without crashing or hanging and that none of their
-- internal sanity tests fail.
--
SELECT test_shm_mq(32768, (select string_agg(chr(32+(random()*96)::int), '') from generate_series(1,400)), 10000, 1);
 test_shm_mq 
-------------
 
(1 row)

SELECT test_shm_mq_pipelined(16384, (select string_agg(chr(32+(random()*96)::int), '') from generate_series(1,270000)), 200, 3);
 test_shm_mq_pipelined 
-----------------------
 
(1 row)


contrib/test_shm_mq/setup.c

Lines changed: 323 additions & 0 deletions
@@ -0,0 +1,323 @@
/*--------------------------------------------------------------------------
 *
 * setup.c
 *		Code to set up a dynamic shared memory segment and a specified
 *		number of background workers for shared memory message queue
 *		testing.
 *
 * Copyright (C) 2013, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		contrib/test_shm_mq/setup.c
 *
 * -------------------------------------------------------------------------
 */

#include "postgres.h"

#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "storage/procsignal.h"
#include "storage/shm_toc.h"
#include "utils/memutils.h"

#include "test_shm_mq.h"

typedef struct
{
	int			nworkers;
	BackgroundWorkerHandle *handle[FLEXIBLE_ARRAY_MEMBER];
} worker_state;

static void setup_dynamic_shared_memory(uint64 queue_size, int nworkers,
							dsm_segment **segp,
							test_shm_mq_header **hdrp,
							shm_mq **outp, shm_mq **inp);
static worker_state *setup_background_workers(int nworkers,
							dsm_segment *seg);
static void cleanup_background_workers(dsm_segment *seg, Datum arg);
static void wait_for_workers_to_become_ready(worker_state *wstate,
							volatile test_shm_mq_header *hdr);
static bool check_worker_status(worker_state *wstate);

/*
 * Set up a dynamic shared memory segment and zero or more background workers
 * for a test run.
 */
void
test_shm_mq_setup(uint64 queue_size, int32 nworkers, dsm_segment **segp,
				  shm_mq_handle **output, shm_mq_handle **input)
{
	dsm_segment *seg;
	test_shm_mq_header *hdr;
	shm_mq	   *outq;
	shm_mq	   *inq;
	worker_state *wstate;

	/* Set up a dynamic shared memory segment. */
	setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
	*segp = seg;

	/* Register background workers. */
	wstate = setup_background_workers(nworkers, seg);

	/* Attach the queues. */
	*output = shm_mq_attach(outq, seg, wstate->handle[0]);
	*input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);

	/* Wait for workers to become ready. */
	wait_for_workers_to_become_ready(wstate, hdr);

	/*
	 * Once we reach this point, all workers are ready.  We no longer need to
	 * kill them if we die; they'll die on their own as the message queues
	 * shut down.
	 */
	cancel_on_dsm_detach(seg, cleanup_background_workers,
						 PointerGetDatum(wstate));
	pfree(wstate);
}

/*
 * Set up a dynamic shared memory segment.
 *
 * We set up a small control region that contains only a test_shm_mq_header,
 * plus one region per message queue.  There are as many message queues as
 * the number of workers, plus one.
 */
static void
setup_dynamic_shared_memory(uint64 queue_size, int nworkers,
							dsm_segment **segp, test_shm_mq_header **hdrp,
							shm_mq **outp, shm_mq **inp)
{
	shm_toc_estimator e;
	int			i;
	uint64		segsize;
	dsm_segment *seg;
	shm_toc    *toc;
	test_shm_mq_header *hdr;

	/* Ensure a valid queue size. */
	if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("queue size must be at least " UINT64_FORMAT " bytes",
						shm_mq_minimum_size)));

	/*
	 * Estimate how much shared memory we need.
	 *
	 * Because the TOC machinery may choose to insert padding of oddly-sized
	 * requests, we must estimate each chunk separately.
	 *
	 * We need one key to register the location of the header, and we need
	 * nworkers + 1 keys to track the locations of the message queues.
	 */
	shm_toc_initialize_estimator(&e);
	shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));
	for (i = 0; i <= nworkers; ++i)
		shm_toc_estimate_chunk(&e, queue_size);
	shm_toc_estimate_keys(&e, 2 + nworkers);
	segsize = shm_toc_estimate(&e);

	/* Create the shared memory segment and establish a table of contents. */
	seg = dsm_create(shm_toc_estimate(&e));
	toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),
						 segsize);

	/* Set up the header region. */
	hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));
	SpinLockInit(&hdr->mutex);
	hdr->workers_total = nworkers;
	hdr->workers_attached = 0;
	hdr->workers_ready = 0;
	shm_toc_insert(toc, 0, hdr);

	/* Set up one message queue per worker, plus one. */
	for (i = 0; i <= nworkers; ++i)
	{
		shm_mq	   *mq;

		mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
		shm_toc_insert(toc, i + 1, mq);

		if (i == 0)
		{
			/* We send messages to the first queue. */
			shm_mq_set_sender(mq, MyProc);
			*outp = mq;
		}
		if (i == nworkers)
		{
			/* We receive messages from the last queue. */
			shm_mq_set_receiver(mq, MyProc);
			*inp = mq;
		}
	}

	/* Return results to caller. */
	*segp = seg;
	*hdrp = hdr;
}

/*
 * Register background workers.
 */
static worker_state *
setup_background_workers(int nworkers, dsm_segment *seg)
{
	MemoryContext oldcontext;
	BackgroundWorker worker;
	worker_state *wstate;
	int			i;

	/*
	 * We need the worker_state object and the background worker handles to
	 * which it points to be allocated in CurTransactionContext rather than
	 * ExprContext; otherwise, they'll be destroyed before the on_dsm_detach
	 * hooks run.
	 */
	oldcontext = MemoryContextSwitchTo(CurTransactionContext);

	/* Create worker state object. */
	wstate = MemoryContextAlloc(TopTransactionContext,
								offsetof(worker_state, handle) +
								sizeof(BackgroundWorkerHandle *) * nworkers);
	wstate->nworkers = 0;

	/*
	 * Arrange to kill all the workers if we abort before all workers are
	 * finished hooking themselves up to the dynamic shared memory segment.
	 *
	 * If we die after all the workers have finished hooking themselves up to
	 * the dynamic shared memory segment, we'll mark the two queues to which
	 * we're directly connected as detached, and the worker(s) connected to
	 * those queues will exit, marking any other queues to which they are
	 * connected as detached.  This will cause any as-yet-unaware workers
	 * connected to those queues to exit in their turn, and so on, until
	 * everybody exits.
	 *
	 * But suppose the workers which are supposed to connect to the queues to
	 * which we're directly attached exit due to some error before they
	 * actually attach the queues.  The remaining workers will have no way of
	 * knowing this.  From their perspective, they're still waiting for those
	 * workers to start, when in fact they've already died.
	 */
	on_dsm_detach(seg, cleanup_background_workers,
				  PointerGetDatum(wstate));

	/* Configure a worker. */
	worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
	worker.bgw_start_time = BgWorkerStart_ConsistentState;
	worker.bgw_restart_time = BGW_NEVER_RESTART;
	worker.bgw_main = NULL;		/* new worker might not have library loaded */
	sprintf(worker.bgw_library_name, "test_shm_mq");
	sprintf(worker.bgw_function_name, "test_shm_mq_main");
	snprintf(worker.bgw_name, BGW_MAXLEN, "test_shm_mq");
	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
	/* set bgw_notify_pid, so we can detect if the worker stops */
	worker.bgw_notify_pid = MyProcPid;

	/* Register the workers. */
	for (i = 0; i < nworkers; ++i)
	{
		if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))
			ereport(ERROR,
					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
					 errmsg("could not register background process"),
					 errhint("You may need to increase max_worker_processes.")));
		++wstate->nworkers;
	}

	/* All done. */
	MemoryContextSwitchTo(oldcontext);
	return wstate;
}

static void
cleanup_background_workers(dsm_segment *seg, Datum arg)
{
	worker_state *wstate = (worker_state *) DatumGetPointer(arg);

	while (wstate->nworkers > 0)
	{
		--wstate->nworkers;
		TerminateBackgroundWorker(wstate->handle[wstate->nworkers]);
	}
}

static void
wait_for_workers_to_become_ready(worker_state *wstate,
								 volatile test_shm_mq_header *hdr)
{
	bool		save_set_latch_on_sigusr1;
	bool		result = false;

	save_set_latch_on_sigusr1 = set_latch_on_sigusr1;
	set_latch_on_sigusr1 = true;

	PG_TRY();
	{
		for (;;)
		{
			int			workers_ready;

			/* If all the workers are ready, we have succeeded. */
			SpinLockAcquire(&hdr->mutex);
			workers_ready = hdr->workers_ready;
			SpinLockRelease(&hdr->mutex);
			if (workers_ready >= wstate->nworkers)
			{
				result = true;
				break;
			}

			/* If any workers (or the postmaster) have died, we have failed. */
			if (!check_worker_status(wstate))
			{
				result = false;
				break;
			}

			/* Wait to be signalled. */
			WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);

			/* An interrupt may have occurred while we were waiting. */
			CHECK_FOR_INTERRUPTS();

			/* Reset the latch so we don't spin. */
			ResetLatch(&MyProc->procLatch);
		}
	}
	PG_CATCH();
	{
		set_latch_on_sigusr1 = save_set_latch_on_sigusr1;
		PG_RE_THROW();
	}
	PG_END_TRY();

	if (!result)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
				 errmsg("one or more background workers failed to start")));
}

static bool
check_worker_status(worker_state *wstate)
{
	int			n;

	/* If any workers (or the postmaster) have died, we have failed. */
	for (n = 0; n < wstate->nworkers; ++n)
	{
		BgwHandleStatus status;
		pid_t		pid;

		status = GetBackgroundWorkerPid(wstate->handle[n], &pid);
		if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED)
			return false;
	}

	/* Otherwise, things still look OK. */
	return true;
}
contrib/test_shm_mq/sql/test_shm_mq.sql

Lines changed: 9 additions & 0 deletions

@@ -0,0 +1,9 @@
CREATE EXTENSION test_shm_mq;

--
-- These tests don't produce any interesting output.  We're checking that
-- the operations complete without crashing or hanging and that none of their
-- internal sanity tests fail.
--
SELECT test_shm_mq(32768, (select string_agg(chr(32+(random()*96)::int), '') from generate_series(1,400)), 10000, 1);
SELECT test_shm_mq_pipelined(16384, (select string_agg(chr(32+(random()*96)::int), '') from generate_series(1,270000)), 200, 3);

