Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit2a8f9cd

Browse files
author
Maksim Milyutin
committed
Add function for extracting running workers launching from another backend
1 parentf3bb0b7 commit2a8f9cd

File tree

1 file changed

+140
-25
lines changed

1 file changed

+140
-25
lines changed

‎pg_query_state.c

Lines changed: 140 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,22 @@
44
*
55
* Copyright (c) 2016-2016, Postgres Professional
66
*
7-
* IDENTIFICATION
87
* contrib/pg_query_state/pg_query_state.c
8+
* IDENTIFICATION
99
*/
1010

1111
#include"pg_query_state.h"
1212

1313
#include"access/htup_details.h"
1414
#include"catalog/pg_type.h"
1515
#include"funcapi.h"
16+
#include"executor/execParallel.h"
1617
#include"executor/executor.h"
1718
#include"miscadmin.h"
19+
#include"nodes/nodeFuncs.h"
20+
#include"nodes/print.h"
1821
#include"pgstat.h"
22+
#include"postmaster/bgworker.h"
1923
#include"storage/ipc.h"
2024
#include"storage/procarray.h"
2125
#include"storage/procsignal.h"
@@ -63,6 +67,7 @@ static void qs_postExecProcNode(PlanState *planstate, TupleTableSlot *result);
6367
List*QueryDescStack=NIL;
6468
staticProcSignalReasonQueryStatePollReason;
6569
staticProcSignalReasonRolePollReason;
70+
staticProcSignalReasonWorkerPollReason;
6671
staticboolmodule_initialized= false;
6772
staticconstchar*be_state_str[]= {/* BackendState -> string repr */
6873
"undefined",/* STATE_UNDEFINED */
@@ -93,8 +98,10 @@ typedef struct
9398
pid_ttraceable;
9499
}trace_request;
95100

96-
staticvoidSendCurrentRoleOid(void);
97-
OidGetRemoteBackendUser(pid_tpid,int*error_code);
101+
staticvoidSendCurrentUserId(void);
102+
OidGetRemoteBackendUserId(PGPROC*proc,int*error_code);
103+
staticvoidSendWorkerPids(void);
104+
List*GetRemoteBackendWorkers(PGPROC*proc,int*error_code);
98105

99106
/* Shared memory variables */
100107
shm_toc*toc=NULL;
@@ -187,8 +194,10 @@ _PG_init(void)
187194

188195
/* Register interrupt on custom signal of polling query state */
189196
QueryStatePollReason=RegisterCustomProcSignalHandler(SendQueryState);
190-
RolePollReason=RegisterCustomProcSignalHandler(SendCurrentRoleOid);
191-
if (QueryStatePollReason==INVALID_PROCSIGNAL||RolePollReason==INVALID_PROCSIGNAL)
197+
RolePollReason=RegisterCustomProcSignalHandler(SendCurrentUserId);
198+
WorkerPollReason=RegisterCustomProcSignalHandler(SendWorkerPids);
199+
if (QueryStatePollReason==INVALID_PROCSIGNAL||RolePollReason==INVALID_PROCSIGNAL
200+
||WorkerPollReason==INVALID_PROCSIGNAL)
192201
{
193202
ereport(WARNING, (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
194203
errmsg("pg_query_state isn't loaded: insufficient custom ProcSignal slots")));
@@ -615,9 +624,6 @@ pg_query_state(PG_FUNCTION_ARGS)
615624
init_lock_tag(&tag,PG_QUERY_STATE_KEY);
616625
LockAcquire(&tag,ExclusiveLock, false, false);
617626

618-
interror_code;
619-
Oiduser_id=GetRemoteBackendUser(pid,&error_code);
620-
621627
/* fill in caller's user data */
622628
caller->user_id=GetUserId();
623629
caller->superuser=superuser();
@@ -827,56 +833,55 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh, Size *nbytesp, void **datap, lon
827833
{
828834

829835
#ifdefHAVE_INT64_TIMESTAMP
830-
#defineGetNowLong()((long) GetCurrentTimestamp() / 1000)
836+
#defineGetNowFloat()((float8) GetCurrentTimestamp() / 1000.0)
831837
#else
832-
#defineGetNowLong()1000 * GetCurrentTimestamp()
838+
#defineGetNowFloat()1000.0 * GetCurrentTimestamp()
833839
#endif
834840

835-
longendtime=GetNowLong()+timeout;
841+
float8endtime=GetNowFloat()+timeout;
842+
intrc=0;
836843

837844
for (;;)
838845
{
839-
intrc;
840846
longdelay;
841847
shm_mq_resultmq_receive_result=shm_mq_receive(mqh,nbytesp,datap, true);
842848

843849
if (mq_receive_result!=SHM_MQ_WOULD_BLOCK)
844850
returnmq_receive_result;
845851

846-
delay=endtime-GetNowLong();
852+
if (rc&WL_TIMEOUT)
853+
returnSHM_MQ_WOULD_BLOCK;
854+
855+
delay= (long) (endtime-GetNowFloat());
847856
rc=WaitLatch(MyLatch,WL_LATCH_SET |WL_TIMEOUT,delay);
848857
CHECK_FOR_INTERRUPTS();
849858
ResetLatch(MyLatch);
850-
851-
if (rc&WL_TIMEOUT)
852-
returnSHM_MQ_WOULD_BLOCK;
853859
}
854860
}
855861

856862
staticvoid
857-
SendCurrentRoleOid(void)
863+
SendCurrentUserId(void)
858864
{
859865
shm_mq_handle*mqh=shm_mq_attach(mq,NULL,NULL);
860-
Oidrole_oid=GetUserId();
866+
Oiduser_oid=GetUserId();
861867

862-
shm_mq_send(mqh,sizeof(Oid),&role_oid, false);
868+
shm_mq_send(mqh,sizeof(Oid),&user_oid, false);
863869
}
864870

865871
#defineNOT_BACKEND_PROCESS1
866872
#defineCOULD_NOT_SEND_SIGNAL 2
867873
#defineINVALID_MQ_READ3
868874

869875
Oid
870-
GetRemoteBackendUser(pid_tpid,int*error_code)
876+
GetRemoteBackendUserId(PGPROC*proc,int*error_code)
871877
{
872-
PGPROC*proc=BackendPidGetProc(pid);
873878
intsig_result;
874-
shm_mq_handle*mqh;
875-
shm_mq_resultmq_receive_result;
879+
shm_mq_handle*mqh;
880+
shm_mq_resultmq_receive_result;
876881
Oid*result;
877882
Sizeres_len;
878883

879-
if (proc==NULL||proc->backendId==InvalidBackendId)
884+
if (proc->backendId==InvalidBackendId)
880885
{
881886
*error_code=NOT_BACKEND_PROCESS;
882887
returnInvalidOid;
@@ -886,7 +891,7 @@ GetRemoteBackendUser(pid_t pid, int *error_code)
886891
shm_mq_set_sender(mq,proc);
887892
shm_mq_set_receiver(mq,MyProc);
888893

889-
sig_result=SendProcSignal(pid,RolePollReason,proc->backendId);
894+
sig_result=SendProcSignal(proc->pid,RolePollReason,proc->backendId);
890895
if (sig_result==-1)
891896
{
892897
*error_code=COULD_NOT_SEND_SIGNAL;
@@ -905,3 +910,113 @@ GetRemoteBackendUser(pid_t pid, int *error_code)
905910

906911
return*result;
907912
}
913+
914+
staticbool
915+
extract_worker_handles(PlanState*node,List**result)
916+
{
917+
if (node==NULL)
918+
return false;
919+
920+
if (IsA(node,GatherState))
921+
{
922+
GatherState*gather_node= (GatherState*)node;
923+
inti;
924+
925+
if (gather_node->pei)
926+
{
927+
for (i=0;i<gather_node->pei->pcxt->nworkers_launched;i++)
928+
{
929+
pid_tpid;
930+
BackgroundWorkerHandle*bgwh=gather_node->pei->pcxt->worker[i].bgwhandle;
931+
BgwHandleStatusstatus;
932+
933+
if (!bgwh)
934+
continue;
935+
936+
status=GetBackgroundWorkerPid(bgwh,&pid);
937+
if (status==BGWH_STARTED)
938+
*result=lcons_int(pid,*result);
939+
}
940+
}
941+
}
942+
returnplanstate_tree_walker(node,extract_worker_handles, (void*)result);
943+
}
944+
945+
typedefstruct
946+
{
947+
intnum;
948+
pid_tpids[FLEXIBLE_ARRAY_MEMBER];
949+
}workers_msg;
950+
951+
staticvoid
952+
SendWorkerPids(void)
953+
{
954+
ListCell*iter;
955+
List*all_workers=NIL;
956+
workers_msg*msg;
957+
intmsg_len;
958+
inti;
959+
shm_mq_handle*mqh=shm_mq_attach(mq,NULL,NULL);
960+
961+
foreach(iter,QueryDescStack)
962+
{
963+
QueryDesc*curQueryDesc= (QueryDesc*)lfirst(iter);
964+
List*bgworker_pids=NIL;
965+
966+
extract_worker_handles(curQueryDesc->planstate,&bgworker_pids);
967+
all_workers=list_concat(all_workers,bgworker_pids);
968+
}
969+
970+
msg_len= offsetof(workers_msg,pids)+sizeof(pid_t)*list_length(all_workers);
971+
msg=palloc(msg_len);
972+
msg->num=list_length(all_workers);
973+
i=0;
974+
foreach(iter,all_workers)
975+
msg->pids[i++]=lfirst_int(iter);
976+
977+
shm_mq_send(mqh,msg_len,msg, false);
978+
}
979+
980+
List*
981+
GetRemoteBackendWorkers(PGPROC*proc,int*error_code)
982+
{
983+
intsig_result;
984+
shm_mq_handle*mqh;
985+
shm_mq_resultmq_receive_result;
986+
workers_msg*msg;
987+
Sizemsg_len;
988+
inti;
989+
List*result=NIL;
990+
991+
if (proc->backendId==InvalidBackendId)
992+
{
993+
*error_code=NOT_BACKEND_PROCESS;
994+
returnInvalidOid;
995+
}
996+
997+
mq=shm_mq_create(mq,QUEUE_SIZE);
998+
shm_mq_set_sender(mq,proc);
999+
shm_mq_set_receiver(mq,MyProc);
1000+
1001+
sig_result=SendProcSignal(proc->pid,WorkerPollReason,proc->backendId);
1002+
if (sig_result==-1)
1003+
{
1004+
*error_code=COULD_NOT_SEND_SIGNAL;
1005+
returnInvalidOid;
1006+
}
1007+
1008+
mqh=shm_mq_attach(mq,NULL,NULL);
1009+
mq_receive_result=shm_mq_receive_with_timeout(mqh,&msg_len, (void**)&msg,1000);
1010+
if (mq_receive_result!=SHM_MQ_SUCCESS)
1011+
{
1012+
*error_code=INVALID_MQ_READ;
1013+
returnInvalidOid;
1014+
}
1015+
1016+
for (i=0;i<msg->num;i++)
1017+
result=lcons_int(msg->pids[i],result);
1018+
1019+
shm_mq_detach(mq);
1020+
1021+
returnresult;
1022+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp