Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf79ebd9

Browse files
author
Daniel Shelepanov
committed
[PGPRO-4561] Non-blocking writing to the queue
To avoid deadlocking while canceling the pg_query_state() callwriting to the queue is implemented non-blocking way.
1 parentfbf7706 commitf79ebd9

File tree

3 files changed

+72
-8
lines changed

3 files changed

+72
-8
lines changed

‎pg_query_state.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1141,7 +1141,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
11411141
MAX_RCV_TIMEOUT);
11421142
if (mq_receive_result!=SHM_MQ_SUCCESS)
11431143
{
1144-
/* counterpart isdied, notconsider it */
1144+
/* counterpart isdead, notconsidering it */
11451145
gotomq_error;
11461146
}
11471147
if (msg->reqid!=reqid)

‎pg_query_state.h‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
#defineQUEUE_SIZE(16 * 1024)
2121
#defineMSG_MAX_SIZE1024
22+
#defineWRITING_DELAY(100 * 1000) // 100ms
23+
#defineNUM_OF_ATTEMPTS6
2224

2325
#defineTIMINIG_OFF_WARNING 1
2426
#defineBUFFERS_OFF_WARNING 2

‎signal_handler.c‎

Lines changed: 69 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,16 @@ typedef struct
2727
char*plan;
2828
}stack_frame;
2929

30-
staticvoidsend_msg_by_parts(shm_mq_handle*mqh,Sizenbytes,constvoid*data);
30+
/*
31+
* An self-explanarory enum describing the send_msg_by_parts results
32+
*/
33+
typedefenum
34+
{
35+
MSG_BY_PARTS_SUCCEEDED,
36+
MSG_BY_PARTS_FAILED
37+
}msg_by_parts_result;
38+
39+
staticmsg_by_parts_resultsend_msg_by_parts(shm_mq_handle*mqh,Sizenbytes,constvoid*data);
3140

3241
/*
3342
*Get List of stack_frames as a stack of function calls starting from outermost call.
@@ -151,22 +160,57 @@ serialize_stack(char *dest, List *qs_stack)
151160
}
152161
}
153162

154-
staticvoid
163+
staticmsg_by_parts_result
164+
shm_mq_send_nonblocking(shm_mq_handle*mqh,Sizenbytes,constvoid*data,Sizeattempts)
165+
{
166+
inti;
167+
shm_mq_resultres;
168+
169+
for(i=0;i<attempts;i++)
170+
{
171+
res=shm_mq_send(mqh,nbytes,data, true);
172+
173+
if(res==SHM_MQ_SUCCESS)
174+
break;
175+
elseif (res==SHM_MQ_DETACHED)
176+
returnMSG_BY_PARTS_FAILED;
177+
178+
/* SHM_MQ_WOULD_BLOCK - sleeping for some delay */
179+
pg_usleep(WRITING_DELAY);
180+
}
181+
182+
if(i==attempts)
183+
returnMSG_BY_PARTS_FAILED;
184+
185+
returnMSG_BY_PARTS_SUCCEEDED;
186+
}
187+
188+
/*
189+
* send_msg_by_parts sends data throurh the queue as a bunch of messages
190+
* of smaller size
191+
*/
192+
staticmsg_by_parts_result
155193
send_msg_by_parts(shm_mq_handle*mqh,Sizenbytes,constvoid*data)
156194
{
157195
intbytes_left;
158196
intbytes_send;
159197
intoffset;
160198

161199
/* Send the expected message length */
162-
shm_mq_send(mqh,sizeof(Size),&nbytes, false);
200+
if(shm_mq_send_nonblocking(mqh,sizeof(Size),&nbytes,NUM_OF_ATTEMPTS)==MSG_BY_PARTS_FAILED)
201+
returnMSG_BY_PARTS_FAILED;
163202

203+
/* Send the message itself */
164204
for (offset=0;offset<nbytes;offset+=bytes_send)
165205
{
166206
bytes_left=nbytes-offset;
167207
bytes_send= (bytes_left<MSG_MAX_SIZE) ?bytes_left :MSG_MAX_SIZE;
168-
shm_mq_send(mqh,bytes_send,&(((unsignedchar*)data)[offset]), false);
208+
if(shm_mq_send_nonblocking(mqh,bytes_send,&(((unsignedchar*)data)[offset]),NUM_OF_ATTEMPTS)
209+
==MSG_BY_PARTS_FAILED)
210+
returnMSG_BY_PARTS_FAILED;
169211
}
212+
213+
returnMSG_BY_PARTS_SUCCEEDED;
170214
}
171215

172216
/*
@@ -227,15 +271,17 @@ SendQueryState(void)
227271
{
228272
shm_mq_msgmsg= {reqid,BASE_SIZEOF_SHM_MQ_MSG,MyProc,STAT_DISABLED };
229273

230-
send_msg_by_parts(mqh,msg.length,&msg);
274+
if(send_msg_by_parts(mqh,msg.length,&msg)!=MSG_BY_PARTS_SUCCEEDED)
275+
gotoconnection_cleanup;
231276
}
232277

233278
/* check if backend doesn't execute any query */
234279
elseif (list_length(QueryDescStack)==0)
235280
{
236281
shm_mq_msgmsg= {reqid,BASE_SIZEOF_SHM_MQ_MSG,MyProc,QUERY_NOT_RUNNING };
237282

238-
send_msg_by_parts(mqh,msg.length,&msg);
283+
if(send_msg_by_parts(mqh,msg.length,&msg)!=MSG_BY_PARTS_SUCCEEDED)
284+
gotoconnection_cleanup;
239285
}
240286

241287
/* happy path */
@@ -258,9 +304,25 @@ SendQueryState(void)
258304

259305
msg->stack_depth=list_length(qs_stack);
260306
serialize_stack(msg->stack,qs_stack);
261-
send_msg_by_parts(mqh,msglen,msg);
307+
308+
if(send_msg_by_parts(mqh,msglen,msg)!=MSG_BY_PARTS_SUCCEEDED)
309+
{
310+
elog(WARNING,"pg_query_state: peer seems to have detached");
311+
gotoconnection_cleanup;
312+
}
262313
}
263314
elog(DEBUG1,"Worker %d sends response for pg_query_state to %d",shm_mq_get_sender(mq)->pid,shm_mq_get_receiver(mq)->pid);
264315
DetachPeer();
265316
UnlockShmem(&tag);
317+
318+
return;
319+
320+
connection_cleanup:
321+
#ifPG_VERSION_NUM<100000
322+
shm_mq_detach(mq);
323+
#else
324+
shm_mq_detach(mqh);
325+
#endif
326+
DetachPeer();
327+
UnlockShmem(&tag);
266328
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp