Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9ad9d93

Browse files
committed
[refer #PGPRO-4197] Add request ID
1 parent81a915a commit9ad9d93

File tree

3 files changed

+25
-5
lines changed

3 files changed

+25
-5
lines changed

‎pg_query_state.c

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -952,6 +952,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
952952
shm_mq_resultmq_receive_result;
953953
shm_mq_msg*msg;
954954
Sizelen;
955+
staticintreqid=0;
955956

956957
Assert(QueryStatePollReason!=INVALID_PROCSIGNAL);
957958
Assert(mq);
@@ -963,6 +964,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
963964
params->buffers=buffers;
964965
params->triggers=triggers;
965966
params->format=format;
967+
params->reqid=++reqid;
966968
pg_write_barrier();
967969

968970
/* initialize message queue that will transfer query states */
@@ -1002,9 +1004,13 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10021004

10031005
/* extract query state from leader process */
10041006
mqh=shm_mq_attach(mq,NULL,NULL);
1007+
elog(DEBUG1,"Wait response from leader %d",leader->pid);
10051008
mq_receive_result=shm_mq_receive(mqh,&len, (void**)&msg, false);
10061009
if (mq_receive_result!=SHM_MQ_SUCCESS)
10071010
gotomq_error;
1011+
if (msg->reqid!=reqid)
1012+
gotomq_error;
1013+
10081014
Assert(len==msg->length);
10091015
result=lappend(result,copy_msg(msg));
10101016
#ifPG_VERSION_NUM<100000
@@ -1021,6 +1027,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10211027
PGPROC*proc= (PGPROC*)lfirst(iter);
10221028

10231029
/* prepare message queue to transfer data */
1030+
elog(DEBUG1,"Wait response from worker %d",proc->pid);
10241031
mq=shm_mq_create(mq,QUEUE_SIZE);
10251032
shm_mq_set_sender(mq,proc);
10261033
shm_mq_set_receiver(mq,MyProc);/* this function notifies the
@@ -1034,9 +1041,12 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10341041
(void**)&msg,
10351042
MAX_RCV_TIMEOUT);
10361043
if (mq_receive_result!=SHM_MQ_SUCCESS)
1044+
{
10371045
/* counterpart is died, not consider it */
1038-
continue;
1039-
1046+
gotomq_error;
1047+
}
1048+
if (msg->reqid!=reqid)
1049+
gotomq_error;
10401050
Assert(len==msg->length);
10411051

10421052
/* aggregate result data */
@@ -1054,6 +1064,11 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10541064
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
10551065
errmsg("invalid send signal")));
10561066
mq_error:
1067+
#ifPG_VERSION_NUM<100000
1068+
shm_mq_detach(mq);
1069+
#else
1070+
shm_mq_detach(mqh);
1071+
#endif
10571072
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
10581073
errmsg("error in message queue data transmitting")));
10591074
}

‎pg_query_state.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ typedef enum
4242
*/
4343
typedefstruct
4444
{
45+
intreqid;
4546
intlength;/* size of message record, for sanity check */
4647
PGPROC*proc;
4748
PG_QS_RequestResultresult_code;
@@ -56,6 +57,7 @@ typedef struct
5657
/* pg_query_state arguments */
5758
typedefstruct
5859
{
60+
intreqid;
5961
boolverbose;
6062
boolcosts;
6163
booltiming;

‎signal_handler.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ SendQueryState(void)
160160
instr_timestart_time;
161161
instr_timecur_time;
162162
int64delay=MAX_SND_TIMEOUT;
163+
intreqid=params->reqid;
163164

164165
INSTR_TIME_SET_CURRENT(start_time);
165166

@@ -189,21 +190,21 @@ SendQueryState(void)
189190
CHECK_FOR_INTERRUPTS();
190191
ResetLatch(MyLatch);
191192
}
192-
193+
elog(DEBUG1,"Worker %d receives pg_query_state request from %d",shm_mq_get_sender(mq)->pid,shm_mq_get_receiver(mq)->pid);
193194
mqh=shm_mq_attach(mq,NULL,NULL);
194195

195196
/* check if module is enabled */
196197
if (!pg_qs_enable)
197198
{
198-
shm_mq_msgmsg= {BASE_SIZEOF_SHM_MQ_MSG,MyProc,STAT_DISABLED };
199+
shm_mq_msgmsg= {reqid,BASE_SIZEOF_SHM_MQ_MSG,MyProc,STAT_DISABLED };
199200

200201
shm_mq_send(mqh,msg.length,&msg, false);
201202
}
202203

203204
/* check if backend doesn't execute any query */
204205
elseif (list_length(QueryDescStack)==0)
205206
{
206-
shm_mq_msgmsg= {BASE_SIZEOF_SHM_MQ_MSG,MyProc,QUERY_NOT_RUNNING };
207+
shm_mq_msgmsg= {reqid,BASE_SIZEOF_SHM_MQ_MSG,MyProc,QUERY_NOT_RUNNING };
207208

208209
shm_mq_send(mqh,msg.length,&msg, false);
209210
}
@@ -215,6 +216,7 @@ SendQueryState(void)
215216
intmsglen=sizeof(shm_mq_msg)+serialized_stack_length(qs_stack);
216217
shm_mq_msg*msg=palloc(msglen);
217218

219+
msg->reqid=reqid;
218220
msg->length=msglen;
219221
msg->proc=MyProc;
220222
msg->result_code=QS_RETURNED;
@@ -229,5 +231,6 @@ SendQueryState(void)
229231
serialize_stack(msg->stack,qs_stack);
230232
shm_mq_send(mqh,msglen,msg, false);
231233
}
234+
elog(DEBUG1,"Worker %d sends response for pg_query_state to %d",shm_mq_get_sender(mq)->pid,shm_mq_get_receiver(mq)->pid);
232235
DetachPeer();
233236
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp