@@ -61,7 +61,7 @@ static void qs_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction,
6161static void qs_ExecutorFinish (QueryDesc * queryDesc );
6262
6363static shm_mq_result receive_msg_by_parts (shm_mq_handle * mqh ,Size * total ,
64- void * * datap ,bool nowait );
64+ void * * datap ,int64 timeout , int * rc , bool nowait );
6565
6666/* Global variables */
6767List * QueryDescStack = NIL ;
@@ -780,7 +780,7 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh,
780780{
781781shm_mq_result mq_receive_result ;
782782
783- mq_receive_result = receive_msg_by_parts (mqh ,nbytesp ,datap , true);
783+ mq_receive_result = receive_msg_by_parts (mqh ,nbytesp ,datap ,timeout , & rc , true);
784784if (mq_receive_result != SHM_MQ_WOULD_BLOCK )
785785return mq_receive_result ;
786786if (rc & WL_TIMEOUT || delay <=0 )
@@ -967,33 +967,61 @@ copy_msg(shm_mq_msg *msg)
967967
968968static shm_mq_result
969969receive_msg_by_parts (shm_mq_handle * mqh ,Size * total ,void * * datap ,
970- bool nowait )
970+ int64 timeout , int * rc , bool nowait )
971971{
972972shm_mq_result mq_receive_result ;
973973shm_mq_msg * buff ;
974974int offset ;
975- Size * expected ;
976- Size expected_data ;
975+ Size * expected ;
976+ Size expected_data ;
977977Size len ;
978978
979979/* Get the expected number of bytes in message */
980980mq_receive_result = shm_mq_receive (mqh ,& len , (void * * )& expected ,nowait );
981- expected_data = * expected ;
982981if (mq_receive_result != SHM_MQ_SUCCESS )
983982return mq_receive_result ;
984983Assert (len == sizeof (Size ));
985984
985+ expected_data = * expected ;
986986* datap = palloc0 (expected_data );
987987
988988/* Get the message itself */
989989for (offset = 0 ;offset < expected_data ; )
990990{
991+ int64 delay = timeout ;
991992/* Keep receiving new messages until we assemble the full message */
992- mq_receive_result = shm_mq_receive (mqh ,& len , ((void * * )& buff ),nowait );
993+ for (;;)
994+ {
995+ mq_receive_result = shm_mq_receive (mqh ,& len , ((void * * )& buff ),nowait );
996+ if (mq_receive_result != SHM_MQ_SUCCESS )
997+ {
998+ if (nowait && mq_receive_result == SHM_MQ_WOULD_BLOCK )
999+ {
1000+ /*
1001+ * We can't leave this function during reading parts with
1002+ * error code SHM_MQ_WOULD_BLOCK because can be be error
1003+ * at next call receive_msg_by_parts() with continuing
1004+ * reading non-readed parts.
1005+ * So we should wait whole MAX_RCV_TIMEOUT timeout and
1006+ * return error after that only.
1007+ */
1008+ if (delay > 0 )
1009+ {
1010+ pg_usleep (PART_RCV_DELAY * 1000 );
1011+ delay -= PART_RCV_DELAY ;
1012+ continue ;
1013+ }
1014+ if (rc )
1015+ {/* Mark that the timeout has expired: */
1016+ * rc |=WL_TIMEOUT ;
1017+ }
1018+ }
1019+ return mq_receive_result ;
1020+ }
1021+ break ;
1022+ }
9931023memcpy ((char * )* datap + offset ,buff ,len );
9941024offset += len ;
995- if (mq_receive_result != SHM_MQ_SUCCESS )
996- return mq_receive_result ;
9971025}
9981026
9991027* total = offset ;
@@ -1074,7 +1102,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10741102mqh = shm_mq_attach (mq ,NULL ,NULL );
10751103elog (DEBUG1 ,"Wait response from leader %d" ,leader -> pid );
10761104mq_receive_result = receive_msg_by_parts (mqh ,& len , (void * * )& msg ,
1077- false);
1105+ 0 , NULL , false);
10781106if (mq_receive_result != SHM_MQ_SUCCESS )
10791107gotomq_error ;
10801108if (msg -> reqid != reqid )