@@ -86,6 +86,7 @@ typedef struct
8686slock_t mutex ;/* protect concurrent access to `userid` */
8787Oid userid ;
8888Latch * caller ;
89+ pg_atomic_uint32 n_peers ;
8990}RemoteUserIdResult ;
9091
9192static void SendCurrentUserId (void );
@@ -150,6 +151,7 @@ pg_qs_shmem_startup(void)
150151counterpart_userid = shm_toc_allocate (toc ,sizeof (RemoteUserIdResult ));
151152shm_toc_insert (toc ,num_toc ++ ,counterpart_userid );
152153SpinLockInit (& counterpart_userid -> mutex );
154+ pg_atomic_init_u32 (& counterpart_userid -> n_peers ,0 );
153155
154156params = shm_toc_allocate (toc ,sizeof (pg_qs_params ));
155157shm_toc_insert (toc ,num_toc ++ ,params );
@@ -481,6 +483,7 @@ pg_query_state(PG_FUNCTION_ARGS)
481483shm_mq_msg * msg ;
482484List * bg_worker_procs = NIL ;
483485List * msgs ;
486+ int i ;
484487
485488if (!module_initialized )
486489ereport (ERROR , (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
@@ -513,6 +516,13 @@ pg_query_state(PG_FUNCTION_ARGS)
513516init_lock_tag (& tag ,PG_QUERY_STATE_KEY );
514517LockAcquire (& tag ,ExclusiveLock , false, false);
515518
519+ for (i = 0 ;pg_atomic_read_u32 (& counterpart_userid -> n_peers )!= 0 && i < MIN_TIMEOUT /1000 ;i ++ )
520+ {
521+ pg_usleep (1000000 );/* wait one second */
522+ CHECK_FOR_INTERRUPTS ();
523+ }
524+ pg_atomic_write_u32 (& counterpart_userid -> n_peers ,1 );
525+
516526counterpart_user_id = GetRemoteBackendUserId (proc );
517527if (!(superuser ()|| GetUserId ()== counterpart_user_id ))
518528ereport (ERROR , (errcode (ERRCODE_INSUFFICIENT_PRIVILEGE ),
@@ -970,6 +980,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
970980continue ;
971981}
972982
983+ pg_atomic_add_fetch_u32 (& counterpart_userid -> n_peers ,1 );
973984alive_procs = lappend (alive_procs ,proc );
974985}
975986
@@ -1023,7 +1034,6 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10231034shm_mq_detach (mqh );
10241035#endif
10251036}
1026-
10271037return result ;
10281038
10291039signal_error :
@@ -1033,3 +1043,12 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10331043ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
10341044errmsg ("error in message queue data transmitting" )));
10351045}
1046+
1047+ void
1048+ DetachPeer (void )
1049+ {
1050+ int n_peers = pg_atomic_fetch_sub_u32 (& counterpart_userid -> n_peers ,1 );
1051+ if (n_peers <=0 )
1052+ ereport (LOG , (errcode (ERRCODE_INTERNAL_ERROR ),
1053+ errmsg ("pg_query_state peer is not responding" )));
1054+ }