@@ -27,7 +27,16 @@ typedef struct
2727char * plan ;
2828}stack_frame ;
2929
30- static void send_msg_by_parts (shm_mq_handle * mqh ,Size nbytes ,const void * data );
30+ /*
31+ * A self-explanatory enum describing the send_msg_by_parts results
32+ */
33+ typedef enum
34+ {
35+ MSG_BY_PARTS_SUCCEEDED ,
36+ MSG_BY_PARTS_FAILED
37+ }msg_by_parts_result ;
38+
39+ static msg_by_parts_result send_msg_by_parts (shm_mq_handle * mqh ,Size nbytes ,const void * data );
3140
3241/*
3342 *Get List of stack_frames as a stack of function calls starting from outermost call.
@@ -151,22 +160,57 @@ serialize_stack(char *dest, List *qs_stack)
151160}
152161}
153162
154- static void
163+ /*
164+ * Try a no-wait shm_mq_send up to 'attempts' times, sleeping WRITING_DELAY
165+ * after each attempt that neither succeeded nor reported a detached peer.
166+ */
167+ static msg_by_parts_result
168+ shm_mq_send_nonblocking (shm_mq_handle * mqh ,Size nbytes ,const void * data ,Size attempts )
169+ {
170+ Size i ; /* same type as 'attempts', avoids a signed/unsigned comparison */
171+
172+ for (i = 0 ;i < attempts ;i ++ )
173+ {
174+ shm_mq_result res = shm_mq_send (mqh ,nbytes ,data , true);
175+
176+ if (res == SHM_MQ_SUCCESS )
177+ return MSG_BY_PARTS_SUCCEEDED ;
178+ if (res == SHM_MQ_DETACHED )
179+ return MSG_BY_PARTS_FAILED ;
180+
181+ /* SHM_MQ_WOULD_BLOCK - sleep, then retry with the same arguments */
182+ pg_usleep (WRITING_DELAY );
183+ }
184+
185+ return MSG_BY_PARTS_FAILED ; /* no attempt succeeded */
186+ }
187+
188+ /*
189+ * send_msg_by_parts sends data through the queue as a bunch of messages
190+ * of smaller size
191+ */
192+ static msg_by_parts_result
155193send_msg_by_parts (shm_mq_handle * mqh ,Size nbytes ,const void * data )
156194{
157195int bytes_left ;
158196int bytes_send ;
159197int offset ;
160198
161199/* Send the expected message length */
162- shm_mq_send (mqh ,sizeof (Size ),& nbytes , false);
200+ if (shm_mq_send_nonblocking (mqh ,sizeof (Size ),& nbytes ,NUM_OF_ATTEMPTS )== MSG_BY_PARTS_FAILED )
201+ return MSG_BY_PARTS_FAILED ;
163202
203+ /* Send the message itself */
164204for (offset = 0 ;offset < nbytes ;offset += bytes_send )
165205{
166206bytes_left = nbytes - offset ;
167207bytes_send = (bytes_left < MSG_MAX_SIZE ) ?bytes_left :MSG_MAX_SIZE ;
168- shm_mq_send (mqh ,bytes_send ,& (((unsigned char * )data )[offset ]), false);
208+ if (shm_mq_send_nonblocking (mqh ,bytes_send ,& (((unsigned char * )data )[offset ]),NUM_OF_ATTEMPTS )
209+ == MSG_BY_PARTS_FAILED )
210+ return MSG_BY_PARTS_FAILED ;
169211}
212+
213+ return MSG_BY_PARTS_SUCCEEDED ;
170214}
171215
172216/*
@@ -227,15 +271,17 @@ SendQueryState(void)
227271{
228272shm_mq_msg msg = {reqid ,BASE_SIZEOF_SHM_MQ_MSG ,MyProc ,STAT_DISABLED };
229273
230- send_msg_by_parts (mqh ,msg .length ,& msg );
274+ if (send_msg_by_parts (mqh ,msg .length ,& msg )!= MSG_BY_PARTS_SUCCEEDED )
275+ goto connection_cleanup ;
231276}
232277
233278/* check if backend doesn't execute any query */
234279else if (list_length (QueryDescStack )== 0 )
235280{
236281shm_mq_msg msg = {reqid ,BASE_SIZEOF_SHM_MQ_MSG ,MyProc ,QUERY_NOT_RUNNING };
237282
238- send_msg_by_parts (mqh ,msg .length ,& msg );
283+ if (send_msg_by_parts (mqh ,msg .length ,& msg )!= MSG_BY_PARTS_SUCCEEDED )
284+ goto connection_cleanup ;
239285}
240286
241287/* happy path */
@@ -258,9 +304,25 @@ SendQueryState(void)
258304
259305msg -> stack_depth = list_length (qs_stack );
260306serialize_stack (msg -> stack ,qs_stack );
261- send_msg_by_parts (mqh ,msglen ,msg );
307+
308+ if (send_msg_by_parts (mqh ,msglen ,msg )!= MSG_BY_PARTS_SUCCEEDED )
309+ {
310+ elog (WARNING ,"pg_query_state: peer seems to have detached" );
311+ goto connection_cleanup ;
312+ }
262313}
263314elog (DEBUG1 ,"Worker %d sends response for pg_query_state to %d" ,shm_mq_get_sender (mq )-> pid ,shm_mq_get_receiver (mq )-> pid );
264315DetachPeer ();
265316UnlockShmem (& tag );
317+
318+ return ;
319+
320+ connection_cleanup :
321+ #if PG_VERSION_NUM < 100000
322+ shm_mq_detach (mq );
323+ #else
324+ shm_mq_detach (mqh );
325+ #endif
326+ DetachPeer ();
327+ UnlockShmem (& tag );
266328}