Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf5201f8

Browse files
author
Maksim Milyutin
committed
Add possibility to send broadcast request on getting query state from leader and its workers
1 parentb132ee1 commitf5201f8

File tree

2 files changed

+61
-21
lines changed

2 files changed

+61
-21
lines changed

‎pg_query_state.c

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,7 +1065,8 @@ GetRemoteBackendQueryStates(List *procs,
10651065
ExplainFormatformat)
10661066
{
10671067
List*result=NIL;
1068-
ListCell*i;
1068+
List*alive_procs=NIL;
1069+
ListCell*iter;
10691070

10701071
Assert(QueryStatePollReason!=INVALID_PROCSIGNAL);
10711072
Assert(mq);
@@ -1079,41 +1080,67 @@ GetRemoteBackendQueryStates(List *procs,
10791080
params->format=format;
10801081
pg_write_barrier();
10811082

1082-
foreach(i,procs)
1083+
/*
1084+
* send signal `QueryStatePollReason` to all processes and define all alive
1085+
* ones
1086+
*/
1087+
foreach(iter,procs)
10831088
{
1084-
PGPROC*proc= (PGPROC*)lfirst(i);
1085-
shm_mq_msg*msg;
1089+
PGPROC*proc= (PGPROC*)lfirst(iter);
1090+
intsig_result;
1091+
1092+
sig_result=SendProcSignal(proc->pid,
1093+
QueryStatePollReason,
1094+
proc->backendId);
1095+
if (sig_result==-1)
1096+
{
1097+
if (errno!=ESRCH)
1098+
gotosignal_error;
1099+
continue;
1100+
}
1101+
1102+
alive_procs=lappend(alive_procs,proc);
1103+
}
1104+
1105+
/*
1106+
* collect results from all alived processes
1107+
*/
1108+
foreach(iter,alive_procs)
1109+
{
1110+
PGPROC*proc= (PGPROC*)lfirst(iter);
10861111
shm_mq_handle*mqh;
10871112
shm_mq_resultmq_receive_result;
1088-
intsig_result;
1113+
shm_mq_msg*msg;
10891114
Sizelen;
10901115

1091-
Assert(proc&&proc->backendId!=InvalidBackendId);
1092-
10931116
/* prepare message queue to transfer data */
10941117
mq=shm_mq_create(mq,QUEUE_SIZE);
10951118
shm_mq_set_sender(mq,proc);
1096-
shm_mq_set_receiver(mq,MyProc);
1097-
1098-
/* send signal to specified backend to extract its state */
1099-
sig_result=SendProcSignal(proc->pid,QueryStatePollReason,proc->backendId);
1100-
if (sig_result==-1)
1101-
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
1102-
errmsg("invalid send signal")));
1119+
shm_mq_set_receiver(mq,MyProc);/* this function notifies the
1120+
counterpart to come into data
1121+
transfer */
11031122

1104-
/* retrieve data from message queue */
1123+
/* retrieveresultdata from message queue */
11051124
mqh=shm_mq_attach(mq,NULL,NULL);
1106-
mq_receive_result=shm_mq_receive_with_timeout(mqh,&len, (void**)&msg,5000);
1125+
mq_receive_result=shm_mq_receive_with_timeout(mqh,
1126+
&len,
1127+
(void**)&msg,
1128+
5000);
11071129
if (mq_receive_result!=SHM_MQ_SUCCESS)
1108-
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
1109-
errmsg("invalid read from message queue")));
1130+
/* counterpart is died, not consider it */
1131+
continue;
11101132

1133+
Assert(len==msg->length);
1134+
1135+
/* aggregate result data */
11111136
result=lappend(result,copy_msg(msg));
11121137

11131138
shm_mq_detach(mq);
1114-
1115-
Assert(len==msg->length);
11161139
}
11171140

11181141
returnresult;
1142+
1143+
signal_error:
1144+
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
1145+
errmsg("invalid send signal")));
11191146
}

‎signal_handler.c

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,20 @@ serialize_stack(char *dest, List *qs_stack)
153153
void
154154
SendQueryState(void)
155155
{
156-
shm_mq_handle*mqh=shm_mq_attach(mq,NULL,NULL);
156+
shm_mq_handle*mqh;
157+
158+
/* wait until caller sets this process as sender to message queue */
159+
for (;;)
160+
{
161+
if (shm_mq_get_sender(mq)==MyProc)
162+
break;
163+
164+
WaitLatch(MyLatch,WL_LATCH_SET,0);
165+
CHECK_FOR_INTERRUPTS();
166+
ResetLatch(MyLatch);
167+
}
168+
169+
mqh=shm_mq_attach(mq,NULL,NULL);
157170

158171
/* check if module is enabled */
159172
if (!pg_qs_enable)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp