@@ -47,6 +47,11 @@ shm_mq *collector_mq = NULL;
4747uint64 * proc_queryids = NULL ;
4848CollectorShmqHeader * collector_hdr = NULL ;
4949
50+ /* Receiver (backend) local shm_mq pointers and lock */
51+ shm_mq * recv_mq = NULL ;
52+ shm_mq_handle * recv_mqh = NULL ;
53+ LOCKTAG queueTag ;
54+
5055static shmem_startup_hook_type prev_shmem_startup_hook = NULL ;
5156static PGPROC * search_proc (int backendPid );
5257static PlannedStmt * pgws_planner_hook (Query * parse ,
@@ -290,6 +295,14 @@ check_shmem(void)
290295}
291296}
292297
298+ static void
299+ pgws_cleanup_callback (int code ,Datum arg )
300+ {
301+ elog (DEBUG3 ,"pg_wait_sampling cleanup: detaching shm_mq and releasing queue lock" );
302+ shm_mq_detach_compat (recv_mqh ,recv_mq );
303+ LockRelease (& queueTag ,ExclusiveLock , false);
304+ }
305+
293306/*
294307 * Module load callback
295308 */
@@ -499,16 +512,14 @@ init_lock_tag(LOCKTAG *tag, uint32 lock)
499512static void *
500513receive_array (SHMRequest request ,Size item_size ,Size * count )
501514{
502- LOCKTAG queueTag ;
503515LOCKTAG collectorTag ;
504- shm_mq * mq ;
505- shm_mq_handle * mqh ;
506516shm_mq_result res ;
507517Size len ,
508518i ;
509519void * data ;
510520Pointer result ,
511521ptr ;
522+ MemoryContext oldctx ;
512523
513524/* Ensure nobody else trying to send request to queue */
514525init_lock_tag (& queueTag ,PGWS_QUEUE_LOCK );
@@ -519,7 +530,7 @@ receive_array(SHMRequest request, Size item_size, Size *count)
519530LockAcquire (& collectorTag ,ExclusiveLock , false, false);
520531LockRelease (& collectorTag ,ExclusiveLock , false);
521532
522- mq = shm_mq_create (collector_mq ,COLLECTOR_QUEUE_SIZE );
533+ recv_mq = shm_mq_create (collector_mq ,COLLECTOR_QUEUE_SIZE );
523534collector_hdr -> request = request ;
524535
525536if (!collector_hdr -> latch )
@@ -528,33 +539,55 @@ receive_array(SHMRequest request, Size item_size, Size *count)
528539
529540SetLatch (collector_hdr -> latch );
530541
531- shm_mq_set_receiver (mq ,MyProc );
532- mqh = shm_mq_attach (mq ,NULL ,NULL );
542+ shm_mq_set_receiver (recv_mq ,MyProc );
533543
534- res = shm_mq_receive (mqh ,& len ,& data , false);
535- if (res != SHM_MQ_SUCCESS || len != sizeof (* count ))
536- {
537- shm_mq_detach_compat (mqh ,mq );
538- elog (ERROR ,"Error reading mq." );
539- }
540- memcpy (count ,data ,sizeof (* count ));
541-
542- result = palloc (item_size * (* count ));
543- ptr = result ;
544+ /*
545+ * We switch to TopMemoryContext, so that recv_mqh is allocated there
546+ * and is guaranteed to survive until before_shmem_exit callbacks are
547+ * fired. Anyway, shm_mq_detach() will free handler on its own.
548+ */
549+ oldctx = MemoryContextSwitchTo (TopMemoryContext );
550+ recv_mqh = shm_mq_attach (recv_mq ,NULL ,NULL );
551+ MemoryContextSwitchTo (oldctx );
544552
545- for (i = 0 ;i < * count ;i ++ )
553+ /*
554+ * Now we surely attached to the shm_mq and got collector's attention.
555+ * If anything went wrong (e.g. Ctrl+C received from the client) we have
556+ * to cleanup some things, i.e. detach from the shm_mq, so collector was
557+ * able to continue responding to other requests.
558+ *
559+ * PG_ENSURE_ERROR_CLEANUP() guaranties that cleanup callback will be
560+ * fired for both ERROR and FATAL.
561+ */
562+ PG_ENSURE_ERROR_CLEANUP (pgws_cleanup_callback ,0 );
546563{
547- res = shm_mq_receive (mqh ,& len ,& data , false);
548- if (res != SHM_MQ_SUCCESS || len != item_size )
564+ res = shm_mq_receive (recv_mqh ,& len ,& data , false);
565+ if (res != SHM_MQ_SUCCESS || len != sizeof ( * count ) )
549566{
550- shm_mq_detach_compat (mqh , mq );
567+ shm_mq_detach_compat (recv_mqh , recv_mq );
551568elog (ERROR ,"Error reading mq." );
552569}
553- memcpy (ptr ,data ,item_size );
554- ptr += item_size ;
570+ memcpy (count ,data ,sizeof (* count ));
571+
572+ result = palloc (item_size * (* count ));
573+ ptr = result ;
574+
575+ for (i = 0 ;i < * count ;i ++ )
576+ {
577+ res = shm_mq_receive (recv_mqh ,& len ,& data , false);
578+ if (res != SHM_MQ_SUCCESS || len != item_size )
579+ {
580+ shm_mq_detach_compat (recv_mqh ,recv_mq );
581+ elog (ERROR ,"Error reading mq." );
582+ }
583+ memcpy (ptr ,data ,item_size );
584+ ptr += item_size ;
585+ }
555586}
587+ PG_END_ENSURE_ERROR_CLEANUP (pgws_cleanup_callback ,0 );
556588
557- shm_mq_detach_compat (mqh ,mq );
589+ /* We still have to detach and release lock during normal operation. */
590+ shm_mq_detach_compat (recv_mqh ,recv_mq );
558591
559592LockRelease (& queueTag ,ExclusiveLock , false);
560593