Expand Up @@ -2,37 +2,65 @@ * collector.c *Collector of wait event history and profile. * * Copyright (c) 2015-2016 , Postgres Professional * Copyright (c) 2015-2025 , Postgres Professional * * IDENTIFICATION * contrib/pg_wait_sampling/pg_wait_sampling.c */ #include "postgres.h" #include "catalog/pg_type.h" #if PG_VERSION_NUM >= 130000 #include "common/hashfn.h" #endif #include "funcapi.h" #include "miscadmin.h" #include "pg_wait_sampling.h" #include "postmaster/bgworker.h" #include "postmaster/interrupt.h" #include "storage/ipc.h" #include "storage/procarray.h" #include "storage/latch.h" #include "storage/lock.h" #include "storage/lwlock.h" #include "storage/proc.h" #include "storage/procsignal.h" #include "storage/shm_mq.h" #include "storage/shm_toc .h" #include "storage/spin .h" #include "utils/memutils .h" #include "utils/resowner.h" #include "utils/guc .h" #include "utils/hsearch .h" #include "utils/timestamp .h" #if PG_VERSION_NUM < 140000 #include "pgstat.h" #else #include "utils/wait_event.h" #endif #include "compat.h" #include "pg_wait_sampling.h" static inline shm_mq_result shm_mq_send_compat(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush) { #if PG_VERSION_NUM >= 150000 return shm_mq_send(mqh, nbytes, data, nowait, force_flush); #else return shm_mq_send(mqh, nbytes, data, nowait); #endif } static volatile sig_atomic_t shutdown_requested = false; #if PG_VERSION_NUM < 170000 #define INIT_PG_LOAD_SESSION_LIBS0x0001 #define INIT_PG_OVERRIDE_ALLOW_CONNS0x0002 #endif static void handle_sigterm(SIGNAL_ARGS); static inline void InitPostgresCompat(const char *in_dbname, Oid dboid, const char *username, Oid useroid, bits32 flags, char *out_dbname) { #if PG_VERSION_NUM >= 170000 InitPostgres(in_dbname, dboid, username, useroid, flags, out_dbname); #elif PG_VERSION_NUM >= 150000 InitPostgres(in_dbname, dboid, username, useroid, flags & INIT_PG_LOAD_SESSION_LIBS, flags & INIT_PG_OVERRIDE_ALLOW_CONNS, out_dbname); #else InitPostgres(in_dbname, dboid, username, useroid, out_dbname, flags & INIT_PG_OVERRIDE_ALLOW_CONNS); #endif } /* * Register background worker for collecting waits history. Expand Down Expand Up @@ -111,16 +139,6 @@ realloc_history(History *observations, int count) observations->wraparound = false; } static void handle_sigterm(SIGNAL_ARGS) { int save_errno = errno; shutdown_requested = true; if (MyProc) SetLatch(&MyProc->procLatch); errno = save_errno; } /* * Get next item of history with rotation. */ Expand All @@ -129,6 +147,7 @@ get_next_observation(History *observations) { HistoryItem *result; /* Check for wraparound */ if (observations->index >= observations->count) { observations->index = 0; Expand Down Expand Up @@ -215,6 +234,7 @@ send_history(History *observations, shm_mq_handle *mqh) else count = observations->index; /* Send array size first since receive_array expects this */ mq_result = shm_mq_send_compat(mqh, sizeof(count), &count, false, true); if (mq_result == SHM_MQ_DETACHED) { Expand Down Expand Up @@ -251,6 +271,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh) Sizecount = hash_get_num_entries(profile_hash); shm_mq_resultmq_result; /* Send array size first since receive_array expects this */ mq_result = shm_mq_send_compat(mqh, sizeof(count), &count, false, true); if (mq_result == SHM_MQ_DETACHED) { Expand Down Expand Up @@ -283,32 +304,11 @@ make_profile_hash() { HASHCTL hash_ctl; hash_ctl.hash = tag_hash; hash_ctl.hcxt = TopMemoryContext; if (pgws_profileQueries) hash_ctl.keysize = offsetof(ProfileItem, count); else hash_ctl.keysize = offsetof(ProfileItem, queryId); /* We always include queryId in hash key */ hash_ctl.keysize = offsetof(ProfileItem, count); hash_ctl.entrysize = sizeof(ProfileItem); return hash_create("Waits profile hash", 1024, &hash_ctl, HASH_FUNCTION | HASH_ELEM); } /* * Delta between two timestamps in milliseconds. */ static int64 millisecs_diff(TimestampTz tz1, TimestampTz tz2) { longsecs; intmicrosecs; TimestampDifference(tz1, tz2, &secs, µsecs); return secs * 1000 + microsecs / 1000; HASH_ELEM | HASH_BLOBS); } /* Expand All @@ -319,77 +319,49 @@ pgws_collector_main(Datum main_arg) { HTAB *profile_hash = NULL; Historyobservations; MemoryContextold_context, collector_context; TimestampTzcurrent_ts, history_ts, profile_ts; /* * Establish signal handlers. * * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as * it would a normal user backend. To make that happen, we establish a * signal handler that is a stripped-down version of die(). We don't have * any equivalent of the backend's command-read loop, where interrupts can * be processed immediately, so make sure ImmediateInterruptOK is turned * off. * * We also want to respond to the ProcSignal notifications. This is done * in the upstream provided procsignal_sigusr1_handler, which is * automatically used if a bgworker connects to a database. But since our * worker doesn't connect to any database even though it calls * InitPostgres, which will still initializze a new backend and thus * partitipate to the ProcSignal infrastructure. */ pqsignal(SIGTERM, handle_sigterm); /* Establish signal handlers */ pqsignal(SIGHUP, SignalHandlerForConfigReload); pqsignal(SIGUSR1, procsignal_sigusr1_handler); BackgroundWorkerUnblockSignals(); InitPostgresCompat(NULL, InvalidOid, NULL, InvalidOid, 0, NULL); SetProcessingMode(NormalProcessing); /* Make pg_wait_sampling recognisable in pg_stat_activity */ pgstat_report_appname("pg_wait_sampling collector"); profile_hash = make_profile_hash(); pgws_collector_hdr->latch = &MyProc->procLatch; CurrentResourceOwner = ResourceOwnerCreate(NULL, "pg_wait_sampling collector"); collector_context = AllocSetContextCreate(TopMemoryContext, "pg_wait_sampling context", ALLOCSET_DEFAULT_SIZES); old_context = MemoryContextSwitchTo(collector_context); alloc_history(&observations, pgws_historySize); MemoryContextSwitchTo(old_context );profile_hash = make_profile_hash( );ereport(LOG,( errmsg("pg_wait_sampling collector started") )); ereport(LOG, errmsg("pg_wait_sampling collector started")); /* Start counting time for history and profile samples */ profile_ts = history_ts = GetCurrentTimestamp(); while (1) { intrc; shm_mq_handle *mqh; int64history_diff, profile_diff; boolwrite_history, write_profile; /* We need an explicit call for at least ProcSignal notifications. */ CHECK_FOR_INTERRUPTS(); HandleMainLoopInterrupts(); if (ConfigReloadPending) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); } /*Wait calculate time to next sample for history or profile */ /*Calculate time to next sample for history or profile */ current_ts = GetCurrentTimestamp(); history_diff =millisecs_diff (history_ts, current_ts); profile_diff =millisecs_diff (profile_ts, current_ts); history_diff =TimestampDifferenceMilliseconds (history_ts, current_ts); profile_diff =TimestampDifferenceMilliseconds (profile_ts, current_ts); write_history = (history_diff >= (int64)pgws_historyPeriod); write_profile = (profile_diff >= (int64)pgws_profilePeriod); Expand All @@ -412,20 +384,15 @@ pgws_collector_main(Datum main_arg) } } /* Shutdown if requested */ if (shutdown_requested) break; /* * Waituntil next sample time or request to do something through * Waitfor sample time or until request to do something through * shared memory. */ rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, Min(pgws_historyPeriod - (int)history_diff, pgws_historyPeriod - (int)profile_diff), PG_WAIT_EXTENSION); if (rc & WL_POSTMASTER_DEATH) proc_exit(1); WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, Min(pgws_historyPeriod - (int)history_diff, pgws_profilePeriod - (int)profile_diff), PG_WAIT_EXTENSION); ResetLatch(&MyProc->procLatch); Expand Down Expand Up @@ -484,15 +451,4 @@ pgws_collector_main(Datum main_arg) LockRelease(&tag, ExclusiveLock, false); } } MemoryContextReset(collector_context); /* * We're done. Explicitly detach the shared memory segment so that we * don't get a resource leak warning at commit time. This will fire any * on_dsm_detach callbacks we've registered, as well. Once that's done, * we can go ahead and exit. */ ereport(LOG, (errmsg("pg_wait_sampling collector shutting down"))); proc_exit(0); }