Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commita379e36

Browse files
author
Alexander Korotkov
committed
pg_stat_wait contrib.
1 parentfb3d5da commita379e36

File tree

13 files changed

+1648
-2
lines changed

13 files changed

+1648
-2
lines changed

‎contrib/pg_stat_wait/Makefile‎

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# contrib/pg_stat_wait/Makefile
2+
3+
MODULE_big = pg_stat_wait
4+
OBJS = pg_stat_wait.o collector.o
5+
6+
EXTENSION = pg_stat_wait
7+
DATA = pg_stat_wait--1.0.sql
8+
PG_CPPFLAGS = -DPG_STAT_WAIT_TESTS
9+
EXTRA_CLEAN =$(pg_regress_clean_files) ./regression_output ./isolation_output
10+
11+
ifdefUSE_PGXS
12+
PG_CONFIG = pg_config
13+
PGXS :=$(shell$(PG_CONFIG) --pgxs)
14+
include$(PGXS)
15+
else
16+
subdir = contrib/pg_stat_wait
17+
top_builddir = ../..
18+
include$(top_builddir)/src/Makefile.global
19+
include$(top_srcdir)/contrib/contrib-global.mk
20+
endif
21+
22+
check: regresscheck
23+
24+
submake-regress:
25+
$(MAKE) -C$(top_builddir)/src/test/regress all
26+
27+
submake-isolation:
28+
$(MAKE) -C$(top_builddir)/src/test/isolation all
29+
30+
submake-pg_stat_wait:
31+
$(MAKE) -C$(top_builddir)/contrib/pg_stat_wait
32+
33+
REGRESSCHECKS=descriptions file_trace
34+
35+
regresscheck: all | submake-regress submake-pg_stat_wait
36+
$(MKDIR_P) regression_output
37+
$(pg_regress_check)\
38+
--temp-install=./tmp_check\
39+
--extra-install=contrib/pg_stat_wait\
40+
--outputdir=./regression_output\
41+
$(REGRESSCHECKS)
42+
43+
ISOLATIONCHECKS=history
44+
45+
isolationcheck: all | submake-isolation submake-pg_stat_wait
46+
$(MKDIR_P) isolation_output
47+
$(pg_isolation_regress_check)\
48+
--extra-install=contrib/pg_stat_wait\
49+
--temp-config=./waits.conf\
50+
--outputdir=./isolation_output\
51+
$(ISOLATIONCHECKS)
52+

‎contrib/pg_stat_wait/collector.c‎

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
#include"postgres.h"
2+
3+
#include"access/htup_details.h"
4+
#include"catalog/pg_type.h"
5+
#include"funcapi.h"
6+
#include"miscadmin.h"
7+
#include"postmaster/bgworker.h"
8+
#include"storage/ipc.h"
9+
#include"storage/procarray.h"
10+
#include"storage/procsignal.h"
11+
#include"storage/s_lock.h"
12+
#include"storage/shm_mq.h"
13+
#include"storage/shm_toc.h"
14+
#include"storage/spin.h"
15+
#include"utils/guc.h"
16+
#include"utils/memutils.h"
17+
#include"utils/resowner.h"
18+
#include"utils/wait.h"
19+
#include"portability/instr_time.h"
20+
21+
#include"pg_stat_wait.h"
22+
23+
CollectorShmqHeader*hdr=NULL;
24+
25+
staticvoid*pgsw;
26+
shm_toc*toc;
27+
shm_mq*mq;
28+
staticvolatilesig_atomic_tshutdown_requested= false;
29+
30+
inthistorySize;
31+
inthistoryPeriod;
32+
boolhistorySkipLatch;
33+
34+
staticvoidhandle_sigterm(SIGNAL_ARGS);
35+
staticvoidcollector_main(Datummain_arg);
36+
37+
/*
38+
* Estimate shared memory space needed.
39+
*/
40+
Size
41+
CollectorShmemSize(void)
42+
{
43+
shm_toc_estimatore;
44+
Sizesize;
45+
46+
shm_toc_initialize_estimator(&e);
47+
shm_toc_estimate_chunk(&e,sizeof(CollectorShmqHeader));
48+
shm_toc_estimate_chunk(&e, (Size)COLLECTOR_QUEUE_SIZE);
49+
shm_toc_estimate_keys(&e,2);
50+
size=shm_toc_estimate(&e);
51+
52+
returnsize;
53+
}
54+
55+
CollectorShmqHeader*
56+
GetCollectorMem(boolinit)
57+
{
58+
boolfound;
59+
Sizesegsize=CollectorShmemSize();
60+
61+
pgsw=ShmemInitStruct("pg_stat_wait",segsize,&found);
62+
if (!init&& !found)
63+
{
64+
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
65+
errmsg("A collector memory wasn't initialized yet")));
66+
}
67+
68+
if (!found)
69+
{
70+
void*mq_mem;
71+
72+
toc=shm_toc_create(PG_STAT_WAIT_MAGIC,pgsw,segsize);
73+
hdr=shm_toc_allocate(toc,sizeof(CollectorShmqHeader));
74+
shm_toc_insert(toc,0,hdr);
75+
76+
mq_mem=shm_toc_allocate(toc,COLLECTOR_QUEUE_SIZE);
77+
shm_toc_insert(toc,1,mq_mem);
78+
}
79+
else
80+
{
81+
toc=shm_toc_attach(PG_STAT_WAIT_MAGIC,pgsw);
82+
hdr=shm_toc_lookup(toc,0);
83+
}
84+
returnhdr;
85+
}
86+
87+
void
88+
RegisterWaitsCollector(void)
89+
{
90+
BackgroundWorkerworker;
91+
92+
/* set up common data for all our workers */
93+
worker.bgw_flags=BGWORKER_SHMEM_ACCESS |
94+
BGWORKER_BACKEND_DATABASE_CONNECTION;
95+
worker.bgw_start_time=BgWorkerStart_ConsistentState;
96+
worker.bgw_restart_time=BGW_NEVER_RESTART;
97+
worker.bgw_main=collector_main;
98+
worker.bgw_notify_pid=0;
99+
snprintf(worker.bgw_name,BGW_MAXLEN,"pg_stat_wait collector");
100+
worker.bgw_main_arg= (Datum)0;
101+
RegisterBackgroundWorker(&worker);
102+
}
103+
104+
void
105+
AllocHistory(History*observations,intcount)
106+
{
107+
observations->items= (HistoryItem*)palloc0(sizeof(HistoryItem)*count);
108+
observations->index=0;
109+
observations->count=count;
110+
observations->wraparound= false;
111+
}
112+
113+
/* Read current wait information from proc, if readCurrent is true,
114+
* then it reads from currently going wait, and can be inconsistent
115+
*/
116+
int
117+
GetCurrentWaitsState(PGPROC*proc,HistoryItem*item,intidx)
118+
{
119+
instr_timecurrentTime;
120+
#ifdefNOT_USED
121+
ProcWait*wait;
122+
123+
if (idx==-1)
124+
return0;
125+
126+
INSTR_TIME_SET_CURRENT(currentTime);
127+
wait=&proc->waits.waitsBuf[idx];
128+
item->backendPid=proc->pid;
129+
item->classId= (int)wait->classId;
130+
if (item->classId==0)
131+
return0;
132+
133+
item->eventId= (int)wait->eventId;
134+
135+
INSTR_TIME_SUBTRACT(currentTime,wait->startTime);
136+
item->waitTime=INSTR_TIME_GET_MICROSEC(currentTime);
137+
memcpy(item->params,wait->params,sizeof(item->params));
138+
#endif
139+
return1;
140+
}
141+
142+
staticvoid
143+
handle_sigterm(SIGNAL_ARGS)
144+
{
145+
intsave_errno=errno;
146+
shutdown_requested= true;
147+
if (MyProc)
148+
SetLatch(&MyProc->procLatch);
149+
errno=save_errno;
150+
}
151+
152+
/* Circulation in history */
153+
staticHistoryItem*
154+
get_next_observation(History*observations)
155+
{
156+
HistoryItem*result;
157+
158+
result=&observations->items[observations->index];
159+
observations->index++;
160+
if (observations->index >=observations->count)
161+
{
162+
observations->index=0;
163+
observations->wraparound= true;
164+
}
165+
returnresult;
166+
}
167+
168+
/* Gets current waits from backends */
169+
staticvoid
170+
write_waits_history(History*observations,TimestampTzcurrent_ts)
171+
{
172+
inti;
173+
174+
#ifdefNOT_USED
175+
LWLockAcquire(ProcArrayLock,LW_SHARED);
176+
for (i=0;i<ProcGlobal->allProcCount;++i)
177+
{
178+
HistoryItemitem,*observation;
179+
PGPROC*proc=&ProcGlobal->allProcs[i];
180+
intstateOk=GetCurrentWaitsState(proc,&item,proc->waits.readIdx);
181+
182+
/* mark waits as read */
183+
proc->waits.readIdx=-1;
184+
185+
if (stateOk)
186+
{
187+
if (historySkipLatch&&item.classId==WAIT_LATCH)
188+
continue;
189+
190+
item.ts=current_ts;
191+
observation=get_next_observation(observations);
192+
*observation=item;
193+
}
194+
}
195+
LWLockRelease(ProcArrayLock);
196+
#endif
197+
}
198+
199+
staticvoid
200+
send_history(History*observations,shm_mq_handle*mqh)
201+
{
202+
intcount,i;
203+
204+
if (observations->wraparound)
205+
count=observations->count;
206+
else
207+
count=observations->index;
208+
209+
shm_mq_send(mqh,sizeof(count),&count, false);
210+
for (i=0;i<count;i++)
211+
shm_mq_send(mqh,sizeof(HistoryItem),&observations->items[i], false);
212+
}
213+
214+
staticvoid
215+
collector_main(Datummain_arg)
216+
{
217+
shm_mq*mq;
218+
shm_mq_handle*mqh;
219+
Historyobservations;
220+
MemoryContextold_context,collector_context;
221+
222+
/*
223+
* Establish signal handlers.
224+
*
225+
* We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as
226+
* it would a normal user backend. To make that happen, we establish a
227+
* signal handler that is a stripped-down version of die(). We don't have
228+
* any equivalent of the backend's command-read loop, where interrupts can
229+
* be processed immediately, so make sure ImmediateInterruptOK is turned
230+
* off.
231+
*/
232+
pqsignal(SIGTERM,handle_sigterm);
233+
BackgroundWorkerUnblockSignals();
234+
235+
hdr->latch=&MyProc->procLatch;
236+
237+
CurrentResourceOwner=ResourceOwnerCreate(NULL,"pg_stat_wait collector");
238+
collector_context=AllocSetContextCreate(TopMemoryContext,
239+
"pg_stat_wait context",
240+
ALLOCSET_DEFAULT_MINSIZE,
241+
ALLOCSET_DEFAULT_INITSIZE,
242+
ALLOCSET_DEFAULT_MAXSIZE);
243+
old_context=MemoryContextSwitchTo(collector_context);
244+
AllocHistory(&observations,historySize);
245+
MemoryContextSwitchTo(old_context);
246+
247+
while (1)
248+
{
249+
intrc;
250+
TimestampTzcurrent_ts;
251+
252+
ResetLatch(&MyProc->procLatch);
253+
current_ts=GetCurrentTimestamp();
254+
write_waits_history(&observations,current_ts);
255+
256+
if (shutdown_requested)
257+
break;
258+
259+
rc=WaitLatch(&MyProc->procLatch,
260+
WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH,
261+
historyPeriod);
262+
263+
if (rc&WL_POSTMASTER_DEATH)
264+
exit(1);
265+
266+
if (hdr->request==HISTORY_REQUEST)
267+
{
268+
hdr->request=NO_REQUEST;
269+
270+
mq= (shm_mq*)shm_toc_lookup(toc,1);
271+
shm_mq_set_sender(mq,MyProc);
272+
mqh=shm_mq_attach(mq,NULL,NULL);
273+
shm_mq_wait_for_attach(mqh);
274+
275+
if (shm_mq_get_receiver(mq)!=NULL)
276+
send_history(&observations,mqh);
277+
278+
shm_mq_detach(mq);
279+
}
280+
}
281+
282+
MemoryContextReset(collector_context);
283+
284+
/*
285+
* We're done. Explicitly detach the shared memory segment so that we
286+
* don't get a resource leak warning at commit time. This will fire any
287+
* on_dsm_detach callbacks we've registered, as well. Once that's done,
288+
* we can go ahead and exit.
289+
*/
290+
proc_exit(0);
291+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp