20
20
21
21
#include "pg_stat_wait.h"
22
22
23
- CollectorShmqHeader * hdr = NULL ;
24
-
25
- static void * pgsw ;
26
- shm_toc * toc ;
27
- shm_mq * mq ;
28
23
static volatile sig_atomic_t shutdown_requested = false;
29
24
30
- int historySize ;
31
- int historyPeriod ;
32
- bool historySkipLatch ;
33
-
34
25
static void handle_sigterm (SIGNAL_ARGS );
35
26
static void collector_main (Datum main_arg );
36
27
37
28
/*
38
- *Estimate shared memory space needed .
29
+ *Register background worker for collecting waits history .
39
30
*/
40
- Size
41
- CollectorShmemSize (void )
42
- {
43
- shm_toc_estimator e ;
44
- Size size ;
45
-
46
- shm_toc_initialize_estimator (& e );
47
- shm_toc_estimate_chunk (& e ,sizeof (CollectorShmqHeader ));
48
- shm_toc_estimate_chunk (& e , (Size )COLLECTOR_QUEUE_SIZE );
49
- shm_toc_estimate_keys (& e ,2 );
50
- size = shm_toc_estimate (& e );
51
-
52
- return size ;
53
- }
54
-
55
- CollectorShmqHeader *
56
- GetCollectorMem (bool init )
57
- {
58
- bool found ;
59
- Size segsize = CollectorShmemSize ();
60
-
61
- pgsw = ShmemInitStruct ("pg_stat_wait" ,segsize ,& found );
62
- if (!init && !found )
63
- {
64
- ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
65
- errmsg ("A collector memory wasn't initialized yet" )));
66
- }
67
-
68
- if (!found )
69
- {
70
- void * mq_mem ;
71
-
72
- toc = shm_toc_create (PG_STAT_WAIT_MAGIC ,pgsw ,segsize );
73
- hdr = shm_toc_allocate (toc ,sizeof (CollectorShmqHeader ));
74
- shm_toc_insert (toc ,0 ,hdr );
75
-
76
- mq_mem = shm_toc_allocate (toc ,COLLECTOR_QUEUE_SIZE );
77
- shm_toc_insert (toc ,1 ,mq_mem );
78
- }
79
- else
80
- {
81
- toc = shm_toc_attach (PG_STAT_WAIT_MAGIC ,pgsw );
82
- hdr = shm_toc_lookup (toc ,0 );
83
- }
84
- return hdr ;
85
- }
86
-
87
31
void
88
32
RegisterWaitsCollector (void )
89
33
{
@@ -110,33 +54,26 @@ AllocHistory(History *observations, int count)
110
54
observations -> wraparound = false;
111
55
}
112
56
113
- /* Read current wait information from proc, if readCurrent is true,
57
+ /*
58
+ * Read current wait information from proc, if readCurrent is true,
114
59
* then it reads from currently going wait, and can be inconsistent
115
60
*/
116
- int
117
- GetCurrentWaitsState (PGPROC * proc ,HistoryItem * item , int idx )
61
+ void
62
+ ReadCurrentWait (PGPROC * proc ,HistoryItem * item )
118
63
{
119
- instr_time currentTime ;
120
- #ifdef NOT_USED
121
- ProcWait * wait ;
64
+ CurrentWaitEvent * wait ;
65
+ instr_time currentTime ;
122
66
123
- if (idx == -1 )
124
- return 0 ;
67
+ wait = & cur_wait_events [proc -> pgprocno ];
125
68
126
- INSTR_TIME_SET_CURRENT (currentTime );
127
- wait = & proc -> waits .waitsBuf [idx ];
128
69
item -> backendPid = proc -> pid ;
129
- item -> classId = (int )wait -> classId ;
130
- if (item -> classId == 0 )
131
- return 0 ;
132
-
133
- item -> eventId = (int )wait -> eventId ;
70
+ item -> classid = wait -> classid ;
71
+ item -> eventid = wait -> eventid ;
72
+ memcpy (item -> params ,wait -> params ,sizeof (item -> params ));
134
73
135
- INSTR_TIME_SUBTRACT (currentTime ,wait -> startTime );
74
+ INSTR_TIME_SET_CURRENT (currentTime );
75
+ INSTR_TIME_SUBTRACT (currentTime ,wait -> start_time );
136
76
item -> waitTime = INSTR_TIME_GET_MICROSEC (currentTime );
137
- memcpy (item -> params ,wait -> params ,sizeof (item -> params ));
138
- #endif
139
- return 1 ;
140
77
}
141
78
142
79
static void
@@ -149,7 +86,9 @@ handle_sigterm(SIGNAL_ARGS)
149
86
errno = save_errno ;
150
87
}
151
88
152
- /* Circulation in history */
89
+ /*
90
+ * Get next item of history with rotation.
91
+ */
153
92
static HistoryItem *
154
93
get_next_observation (History * observations )
155
94
{
@@ -165,41 +104,41 @@ get_next_observation(History *observations)
165
104
return result ;
166
105
}
167
106
168
- /* Gets current waits from backends */
107
+ /*
108
+ * Read current waits from backends and write them to history array.
109
+ */
169
110
static void
170
111
write_waits_history (History * observations ,TimestampTz current_ts )
171
112
{
172
113
int i ;
173
114
174
- #ifdef NOT_USED
175
115
LWLockAcquire (ProcArrayLock ,LW_SHARED );
176
- for (i = 0 ;i < ProcGlobal -> allProcCount ;++ i )
116
+ for (i = 0 ;i < ProcGlobal -> allProcCount ;i ++ )
177
117
{
178
- HistoryItem item ,* observation ;
179
- PGPROC * proc = & ProcGlobal -> allProcs [ i ] ;
180
- int stateOk = GetCurrentWaitsState ( proc , & item , proc -> waits . readIdx ) ;
118
+ HistoryItem item ,
119
+ * observation ;
120
+ PGPROC * proc = & ProcGlobal -> allProcs [ i ] ;
181
121
182
- /* mark waits as read */
183
- proc -> waits .readIdx = -1 ;
122
+ ReadCurrentWait (proc ,& item );
184
123
185
- if (stateOk )
186
- {
187
- if (historySkipLatch && item .classId == WAIT_LATCH )
188
- continue ;
124
+ if (historySkipLatch && item .classid == WAIT_LATCH )
125
+ continue ;
189
126
190
- item .ts = current_ts ;
191
- observation = get_next_observation (observations );
192
- * observation = item ;
193
- }
127
+ item .ts = current_ts ;
128
+ observation = get_next_observation (observations );
129
+ * observation = item ;
194
130
}
195
131
LWLockRelease (ProcArrayLock );
196
- #endif
197
132
}
198
133
134
+ /*
135
+ * Send waits history to shared memory queue.
136
+ */
199
137
static void
200
138
send_history (History * observations ,shm_mq_handle * mqh )
201
139
{
202
- int count ,i ;
140
+ int count ,
141
+ i ;
203
142
204
143
if (observations -> wraparound )
205
144
count = observations -> count ;
@@ -211,13 +150,15 @@ send_history(History *observations, shm_mq_handle *mqh)
211
150
shm_mq_send (mqh ,sizeof (HistoryItem ),& observations -> items [i ], false);
212
151
}
213
152
153
+ /*
154
+ * Main routine of wait history collector.
155
+ */
214
156
static void
215
157
collector_main (Datum main_arg )
216
158
{
217
- shm_mq * mq ;
218
- shm_mq_handle * mqh ;
219
- History observations ;
220
- MemoryContext old_context ,collector_context ;
159
+ History observations ;
160
+ MemoryContext old_context ,
161
+ collector_context ;
221
162
222
163
/*
223
164
* Establish signal handlers.
@@ -232,7 +173,7 @@ collector_main(Datum main_arg)
232
173
pqsignal (SIGTERM ,handle_sigterm );
233
174
BackgroundWorkerUnblockSignals ();
234
175
235
- hdr -> latch = & MyProc -> procLatch ;
176
+ collector_hdr -> latch = & MyProc -> procLatch ;
236
177
237
178
CurrentResourceOwner = ResourceOwnerCreate (NULL ,"pg_stat_wait collector" );
238
179
collector_context = AllocSetContextCreate (TopMemoryContext ,
@@ -246,8 +187,9 @@ collector_main(Datum main_arg)
246
187
247
188
while (1 )
248
189
{
249
- int rc ;
250
- TimestampTz current_ts ;
190
+ int rc ;
191
+ TimestampTz current_ts ;
192
+ shm_mq_handle * mqh ;
251
193
252
194
ResetLatch (& MyProc -> procLatch );
253
195
current_ts = GetCurrentTimestamp ();
@@ -263,19 +205,18 @@ collector_main(Datum main_arg)
263
205
if (rc & WL_POSTMASTER_DEATH )
264
206
exit (1 );
265
207
266
- if (hdr -> request == HISTORY_REQUEST )
208
+ if (collector_hdr -> request == HISTORY_REQUEST )
267
209
{
268
- hdr -> request = NO_REQUEST ;
210
+ collector_hdr -> request = NO_REQUEST ;
269
211
270
- mq = (shm_mq * )shm_toc_lookup (toc ,1 );
271
- shm_mq_set_sender (mq ,MyProc );
272
- mqh = shm_mq_attach (mq ,NULL ,NULL );
212
+ shm_mq_set_sender (collector_mq ,MyProc );
213
+ mqh = shm_mq_attach (collector_mq ,NULL ,NULL );
273
214
shm_mq_wait_for_attach (mqh );
274
215
275
- if (shm_mq_get_receiver (mq )!= NULL )
216
+ if (shm_mq_get_receiver (collector_mq )!= NULL )
276
217
send_history (& observations ,mqh );
277
218
278
- shm_mq_detach (mq );
219
+ shm_mq_detach (collector_mq );
279
220
}
280
221
}
281
222