@@ -60,6 +60,9 @@ static void qs_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction,
6060#endif
6161static void qs_ExecutorFinish (QueryDesc * queryDesc );
6262
63+ static shm_mq_result receive_msg_by_parts (shm_mq_handle * mqh ,Size * total ,
64+ void * * datap ,bool nowait );
65+
6366/* Global variables */
6467List * QueryDescStack = NIL ;
6568static ProcSignalReason UserIdPollReason = INVALID_PROCSIGNAL ;
@@ -777,7 +780,7 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh,
777780{
778781shm_mq_result mq_receive_result ;
779782
780- mq_receive_result = shm_mq_receive (mqh ,nbytesp ,datap , true);
783+ mq_receive_result = receive_msg_by_parts (mqh ,nbytesp ,datap , true);
781784if (mq_receive_result != SHM_MQ_WOULD_BLOCK )
782785return mq_receive_result ;
783786if (rc & WL_TIMEOUT || delay <=0 )
@@ -960,51 +963,36 @@ copy_msg(shm_mq_msg *msg)
960963return result ;
961964}
962965
963- // ----------------- DEBUG -----------------
964- static void
965- print_recv_bytes (int num ,char * src ,int offset )
966- {
967- elog (INFO ,"======= RECV MSG SEGMENT START (%d bytes) =======" ,num );
968- for (int i = offset ;i < offset + num ;i ++ )
969- elog (INFO ,"RECV byte #%d = %02x" ,i , (unsignedchar )* (src + i ));
970- }
971- // ----------------- DEBUG -----------------
972-
973966static shm_mq_result
974- shm_mq_receive_by_bytes (shm_mq_handle * mqh ,Size * total ,void * * datap )
967+ receive_msg_by_parts (shm_mq_handle * mqh ,Size * total ,void * * datap ,
968+ bool nowait )
975969{
976970shm_mq_result mq_receive_result ;
977971shm_mq_msg * buff ;
978- int ii ;
979972int offset ;
980973int * expected ;
981974Size len ;
982975
983976/* Get the expected number of bytes in message */
984- mq_receive_result = shm_mq_receive (mqh ,& len , (void * * )& expected ,false );
977+ mq_receive_result = shm_mq_receive (mqh ,& len , (void * * )& expected ,nowait );
985978if (mq_receive_result != SHM_MQ_SUCCESS )
986979return mq_receive_result ;
987980Assert (len == sizeof (int ));
988- //elog(INFO, "======= RECV MSG (expecting %d bytes) =======", *expected);
989981
990982* datap = palloc0 (* expected );
991983
992984/* Get the message itself */
993- for (offset = 0 , ii = 0 ;offset < * expected ;ii ++ )
985+ for (offset = 0 ;offset < * expected ; )
994986{
995- // Keep receiving new messages until we assemble the full message
996- mq_receive_result = shm_mq_receive (mqh ,& len , ((void * * )& buff ),false );
987+ /* Keep receiving new messages until we assemble the full message */
988+ mq_receive_result = shm_mq_receive (mqh ,& len , ((void * * )& buff ),nowait );
997989memcpy ((char * )* datap + offset ,buff ,len );
998- //print_recv_bytes(len, (char *) *datap, offset);
999990offset += len ;
1000991if (mq_receive_result != SHM_MQ_SUCCESS )
1001992return mq_receive_result ;
1002993}
1003994
1004- //elog(INFO, "RECV: END cycle - %d", ii);
1005995* total = offset ;
1006- //mq_receive_result = shm_mq_receive(mqh, &len, (void **) &msg, false);
1007- //*datap = buff;
1008996
1009997return mq_receive_result ;
1010998}
@@ -1081,7 +1069,8 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10811069/* extract query state from leader process */
10821070mqh = shm_mq_attach (mq ,NULL ,NULL );
10831071elog (DEBUG1 ,"Wait response from leader %d" ,leader -> pid );
1084- mq_receive_result = shm_mq_receive_by_bytes (mqh ,& len , ((void * * )& msg ));
1072+ mq_receive_result = receive_msg_by_parts (mqh ,& len , (void * * )& msg ,
1073+ false);
10851074if (mq_receive_result != SHM_MQ_SUCCESS )
10861075gotomq_error ;
10871076if (msg -> reqid != reqid )