@@ -36,8 +36,6 @@ PG_MODULE_MAGIC;
3636#define PG_QS_MODULE_KEY 0xCA94B108
3737#define PG_QUERY_STATE_KEY 0
3838
39- #define MIN_TIMEOUT 5000
40-
4139#define TEXT_CSTR_CMP (text ,cstr ) \
4240(memcmp(VARDATA(text), (cstr), VARSIZE(text) - VARHDRSZ))
4341
@@ -516,11 +514,14 @@ pg_query_state(PG_FUNCTION_ARGS)
516514init_lock_tag (& tag ,PG_QUERY_STATE_KEY );
517515LockAcquire (& tag ,ExclusiveLock , false, false);
518516
519- for (i = 0 ;pg_atomic_read_u32 (& counterpart_userid -> n_peers )!= 0 && i < MIN_TIMEOUT /1000 ;i ++ )
517+ for (i = 0 ;pg_atomic_read_u32 (& counterpart_userid -> n_peers )!= 0 && i <= MAX_TIMEOUT /1000 ;i ++ )
520518{
521519pg_usleep (1000000 );/* wait one second */
522520CHECK_FOR_INTERRUPTS ();
523521}
522+ if (i > MAX_TIMEOUT /1000 )
523+ elog (WARNING ,"pg_query_state: last request was interrupted" );
524+
524525pg_atomic_write_u32 (& counterpart_userid -> n_peers ,1 );
525526
526527counterpart_user_id = GetRemoteBackendUserId (proc );
@@ -741,15 +742,15 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh,
741742{
742743int rc = 0 ;
743744long delay = timeout ;
745+ instr_time start_time ;
746+ instr_time cur_time ;
747+
748+ INSTR_TIME_SET_CURRENT (start_time );
744749
745750for (;;)
746751{
747- instr_time start_time ;
748- instr_time cur_time ;
749752shm_mq_result mq_receive_result ;
750753
751- INSTR_TIME_SET_CURRENT (start_time );
752-
753754mq_receive_result = shm_mq_receive (mqh ,nbytesp ,datap , true);
754755if (mq_receive_result != SHM_MQ_WOULD_BLOCK )
755756return mq_receive_result ;
@@ -772,6 +773,8 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh,
772773INSTR_TIME_SUBTRACT (cur_time ,start_time );
773774
774775delay = timeout - (long )INSTR_TIME_GET_MILLISEC (cur_time );
776+ if (delay <=0 )
777+ return SHM_MQ_WOULD_BLOCK ;
775778
776779CHECK_FOR_INTERRUPTS ();
777780ResetLatch (MyLatch );
@@ -970,6 +973,9 @@ GetRemoteBackendQueryStates(PGPROC *leader,
970973PGPROC * proc = (PGPROC * )lfirst (iter );
971974if (!proc || !proc -> pid )
972975continue ;
976+
977+ pg_atomic_add_fetch_u32 (& counterpart_userid -> n_peers ,1 );
978+
973979sig_result = SendProcSignal (proc -> pid ,
974980QueryStatePollReason ,
975981proc -> backendId );
@@ -980,7 +986,6 @@ GetRemoteBackendQueryStates(PGPROC *leader,
980986continue ;
981987}
982988
983- pg_atomic_add_fetch_u32 (& counterpart_userid -> n_peers ,1 );
984989alive_procs = lappend (alive_procs ,proc );
985990}
986991
@@ -1018,7 +1023,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10181023mq_receive_result = shm_mq_receive_with_timeout (mqh ,
10191024& len ,
10201025(void * * )& msg ,
1021- MIN_TIMEOUT );
1026+ MAX_TIMEOUT );
10221027if (mq_receive_result != SHM_MQ_SUCCESS )
10231028/* counterpart is died, not consider it */
10241029continue ;