2626#include "utils/resowner.h"
2727#include "pgstat.h"
2828
29+ #include "compat.h"
2930#include "pg_wait_sampling.h"
3031
3132static volatile sig_atomic_t shutdown_requested = false;
@@ -36,18 +37,18 @@ static void handle_sigterm(SIGNAL_ARGS);
3637 * Register background worker for collecting waits history.
3738 */
3839void
39- register_wait_collector (void )
40+ pgws_register_wait_collector (void )
4041{
4142BackgroundWorker worker ;
4243
4344/* Set up background worker parameters */
4445memset (& worker ,0 ,sizeof (worker ));
4546worker .bgw_flags = BGWORKER_SHMEM_ACCESS ;
4647worker .bgw_start_time = BgWorkerStart_ConsistentState ;
47- worker .bgw_restart_time = 0 ;
48+ worker .bgw_restart_time = 1 ;
4849worker .bgw_notify_pid = 0 ;
4950snprintf (worker .bgw_library_name ,BGW_MAXLEN ,"pg_wait_sampling" );
50- snprintf (worker .bgw_function_name ,BGW_MAXLEN ,CppAsString (collector_main ));
51+ snprintf (worker .bgw_function_name ,BGW_MAXLEN ,CppAsString (pgws_collector_main ));
5152snprintf (worker .bgw_name ,BGW_MAXLEN ,"pg_wait_sampling collector" );
5253worker .bgw_main_arg = (Datum )0 ;
5354RegisterBackgroundWorker (& worker );
@@ -56,7 +57,7 @@ register_wait_collector(void)
5657/*
5758 * Allocate memory for waits history.
5859 */
59- void
60+ static void
6061alloc_history (History * observations ,int count )
6162{
6263observations -> items = (HistoryItem * )palloc0 (sizeof (HistoryItem )* count );
@@ -150,7 +151,7 @@ probe_waits(History *observations, HTAB *profile_hash,
150151TimestampTz ts = GetCurrentTimestamp ();
151152
152153/* Realloc waits history if needed */
153- newSize = collector_hdr -> historySize ;
154+ newSize = pgws_collector_hdr -> historySize ;
154155if (observations -> count != newSize )
155156realloc_history (observations ,newSize );
156157
@@ -172,8 +173,8 @@ probe_waits(History *observations, HTAB *profile_hash,
172173item .pid = proc -> pid ;
173174item .wait_event_info = proc -> wait_event_info ;
174175
175- if (collector_hdr -> profileQueries )
176- item .queryId = proc_queryids [i ];
176+ if (pgws_collector_hdr -> profileQueries )
177+ item .queryId = pgws_proc_queryids [i ];
177178else
178179item .queryId = 0 ;
179180
@@ -220,7 +221,7 @@ send_history(History *observations, shm_mq_handle *mqh)
220221else
221222count = observations -> index ;
222223
223- mq_result = shm_mq_send (mqh ,sizeof (count ),& count , false);
224+ mq_result = shm_mq_send_compat (mqh ,sizeof (count ),& count , false, true );
224225if (mq_result == SHM_MQ_DETACHED )
225226{
226227ereport (WARNING ,
@@ -230,10 +231,11 @@ send_history(History *observations, shm_mq_handle *mqh)
230231}
231232for (i = 0 ;i < count ;i ++ )
232233{
233- mq_result = shm_mq_send (mqh ,
234+ mq_result = shm_mq_send_compat (mqh ,
234235sizeof (HistoryItem ),
235236& observations -> items [i ],
236- false);
237+ false,
238+ true);
237239if (mq_result == SHM_MQ_DETACHED )
238240{
239241ereport (WARNING ,
@@ -255,7 +257,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
255257Size count = hash_get_num_entries (profile_hash );
256258shm_mq_result mq_result ;
257259
258- mq_result = shm_mq_send (mqh ,sizeof (count ),& count , false);
260+ mq_result = shm_mq_send_compat (mqh ,sizeof (count ),& count , false, true );
259261if (mq_result == SHM_MQ_DETACHED )
260262{
261263ereport (WARNING ,
@@ -266,7 +268,8 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
266268hash_seq_init (& scan_status ,profile_hash );
267269while ((item = (ProfileItem * )hash_seq_search (& scan_status ))!= NULL )
268270{
269- mq_result = shm_mq_send (mqh ,sizeof (ProfileItem ),item , false);
271+ mq_result = shm_mq_send_compat (mqh ,sizeof (ProfileItem ),item , false,
272+ true);
270273if (mq_result == SHM_MQ_DETACHED )
271274{
272275hash_seq_term (& scan_status );
@@ -289,7 +292,7 @@ make_profile_hash()
289292hash_ctl .hash = tag_hash ;
290293hash_ctl .hcxt = TopMemoryContext ;
291294
292- if (collector_hdr -> profileQueries )
295+ if (pgws_collector_hdr -> profileQueries )
293296hash_ctl .keysize = offsetof(ProfileItem ,count );
294297else
295298hash_ctl .keysize = offsetof(ProfileItem ,queryId );
@@ -318,7 +321,7 @@ millisecs_diff(TimestampTz tz1, TimestampTz tz2)
318321 * Main routine of wait history collector.
319322 */
320323void
321- collector_main (Datum main_arg )
324+ pgws_collector_main (Datum main_arg )
322325{
323326HTAB * profile_hash = NULL ;
324327History observations ;
@@ -337,28 +340,31 @@ collector_main(Datum main_arg)
337340 * any equivalent of the backend's command-read loop, where interrupts can
338341 * be processed immediately, so make sure ImmediateInterruptOK is turned
339342 * off.
343+ *
344+ * We also want to respond to the ProcSignal notifications. This is done
345+ * in the upstream provided procsignal_sigusr1_handler, which is
346+ * automatically used if a bgworker connects to a database. But since our
347+ * worker doesn't connect to any database even though it calls
348+ * InitPostgres, which will still initializze a new backend and thus
349+ * partitipate to the ProcSignal infrastructure.
340350 */
341351pqsignal (SIGTERM ,handle_sigterm );
352+ pqsignal (SIGUSR1 ,procsignal_sigusr1_handler );
342353BackgroundWorkerUnblockSignals ();
343-
344- #if PG_VERSION_NUM >=110000
345- InitPostgres (NULL ,InvalidOid ,NULL ,InvalidOid ,NULL , false);
346- #else
347- InitPostgres (NULL ,InvalidOid ,NULL ,InvalidOid ,NULL );
348- #endif
354+ InitPostgresCompat (NULL ,InvalidOid ,NULL ,InvalidOid , false, false,NULL );
349355SetProcessingMode (NormalProcessing );
350356
351357/* Make pg_wait_sampling recognisable in pg_stat_activity */
352358pgstat_report_appname ("pg_wait_sampling collector" );
353359
354360profile_hash = make_profile_hash ();
355- collector_hdr -> latch = & MyProc -> procLatch ;
361+ pgws_collector_hdr -> latch = & MyProc -> procLatch ;
356362
357363CurrentResourceOwner = ResourceOwnerCreate (NULL ,"pg_wait_sampling collector" );
358364collector_context = AllocSetContextCreate (TopMemoryContext ,
359365"pg_wait_sampling context" ,ALLOCSET_DEFAULT_SIZES );
360366old_context = MemoryContextSwitchTo (collector_context );
361- alloc_history (& observations ,collector_hdr -> historySize );
367+ alloc_history (& observations ,pgws_collector_hdr -> historySize );
362368MemoryContextSwitchTo (old_context );
363369
364370ereport (LOG , (errmsg ("pg_wait_sampling collector started" )));
@@ -377,21 +383,24 @@ collector_main(Datum main_arg)
377383bool write_history ,
378384write_profile ;
379385
386+ /* We need an explicit call for at least ProcSignal notifications. */
387+ CHECK_FOR_INTERRUPTS ();
388+
380389/* Wait calculate time to next sample for history or profile */
381390current_ts = GetCurrentTimestamp ();
382391
383392history_diff = millisecs_diff (history_ts ,current_ts );
384393profile_diff = millisecs_diff (profile_ts ,current_ts );
385- history_period = collector_hdr -> historyPeriod ;
386- profile_period = collector_hdr -> profilePeriod ;
394+ history_period = pgws_collector_hdr -> historyPeriod ;
395+ profile_period = pgws_collector_hdr -> profilePeriod ;
387396
388397write_history = (history_diff >= (int64 )history_period );
389398write_profile = (profile_diff >= (int64 )profile_period );
390399
391400if (write_history || write_profile )
392401{
393402probe_waits (& observations ,profile_hash ,
394- write_history ,write_profile ,collector_hdr -> profilePid );
403+ write_history ,write_profile ,pgws_collector_hdr -> profilePid );
395404
396405if (write_history )
397406{
@@ -430,67 +439,58 @@ collector_main(Datum main_arg)
430439ResetLatch (& MyProc -> procLatch );
431440
432441/* Handle request if any */
433- if (collector_hdr -> request != NO_REQUEST )
442+ if (pgws_collector_hdr -> request != NO_REQUEST )
434443{
435444LOCKTAG tag ;
436- SHMRequest request = collector_hdr -> request ;
445+ SHMRequest request ;
437446
438- init_lock_tag (& tag ,PGWS_COLLECTOR_LOCK );
447+ pgws_init_lock_tag (& tag ,PGWS_COLLECTOR_LOCK );
439448
440449LockAcquire (& tag ,ExclusiveLock , false, false);
441- collector_hdr -> request = NO_REQUEST ;
450+ request = pgws_collector_hdr -> request ;
451+ pgws_collector_hdr -> request = NO_REQUEST ;
442452
443- PG_TRY ();
453+ if ( request == HISTORY_REQUEST || request == PROFILE_REQUEST )
444454{
445- if (request == HISTORY_REQUEST || request == PROFILE_REQUEST )
446- {
447- shm_mq_result mq_result ;
448-
449- /* Send history or profile */
450- shm_mq_set_sender (collector_mq ,MyProc );
451- mqh = shm_mq_attach (collector_mq ,NULL ,NULL );
452- mq_result = shm_mq_wait_for_attach (mqh );
453- switch (mq_result )
454- {
455- case SHM_MQ_SUCCESS :
456- switch (request )
457- {
458- case HISTORY_REQUEST :
459- send_history (& observations ,mqh );
460- break ;
461- case PROFILE_REQUEST :
462- send_profile (profile_hash ,mqh );
463- break ;
464- default :
465- AssertState (false);
466- }
467- break ;
468- case SHM_MQ_DETACHED :
469- ereport (WARNING ,
470- (errmsg ("pg_wait_sampling collector: "
471- "receiver of message queue has been "
472- "detached" )));
473- break ;
474- default :
475- AssertState (false);
476- }
477- shm_mq_detach_compat (mqh ,collector_mq );
478- }
479- else if (request == PROFILE_RESET )
455+ shm_mq_result mq_result ;
456+
457+ /* Send history or profile */
458+ shm_mq_set_sender (pgws_collector_mq ,MyProc );
459+ mqh = shm_mq_attach (pgws_collector_mq ,NULL ,NULL );
460+ mq_result = shm_mq_wait_for_attach (mqh );
461+ switch (mq_result )
480462{
481- /* Reset profile hash */
482- hash_destroy (profile_hash );
483- profile_hash = make_profile_hash ();
463+ case SHM_MQ_SUCCESS :
464+ switch (request )
465+ {
466+ case HISTORY_REQUEST :
467+ send_history (& observations ,mqh );
468+ break ;
469+ case PROFILE_REQUEST :
470+ send_profile (profile_hash ,mqh );
471+ break ;
472+ default :
473+ AssertState (false);
474+ }
475+ break ;
476+ case SHM_MQ_DETACHED :
477+ ereport (WARNING ,
478+ (errmsg ("pg_wait_sampling collector: "
479+ "receiver of message queue have been "
480+ "detached" )));
481+ break ;
482+ default :
483+ AssertState (false);
484484}
485-
486- LockRelease (& tag ,ExclusiveLock , false);
485+ shm_mq_detach_compat (mqh ,pgws_collector_mq );
487486}
488- PG_CATCH ();
487+ else if ( request == PROFILE_RESET )
489488{
490- LockRelease (& tag ,ExclusiveLock , false);
491- PG_RE_THROW ();
489+ /* Reset profile hash */
490+ hash_destroy (profile_hash );
491+ profile_hash = make_profile_hash ();
492492}
493- PG_END_TRY ( );
493+ LockRelease ( & tag , ExclusiveLock , false );
494494}
495495}
496496