Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitaeab11f

Browse files
author
Maksim Milyutin
committed
Split GetRemoteBackendUserId functionality
1 parent7604feb commitaeab11f

File tree

4 files changed

+192
-133
lines changed

4 files changed

+192
-133
lines changed

‎Makefile‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# contrib/pg_query_state/Makefile
22

33
MODULE_big = pg_query_state
4-
OBJS = pg_query_state.o signal_handler.o$(WIN32RES)
4+
OBJS = pg_query_state.o signal_handler.ouserid_rpc.o$(WIN32RES)
55
EXTENSION = pg_query_state
66
EXTVERSION = 1.0
77
DATA =$(EXTENSION)--$(EXTVERSION).sql

‎pg_query_state.c‎

Lines changed: 51 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
22
* pg_query_state.c
3-
*Extract information about query stateof other backend
3+
*Extract information about query statefrom other backend
44
*
55
* Copyright (c) 2016-2016, Postgres Professional
66
*
@@ -31,7 +31,6 @@
3131
PG_MODULE_MAGIC;
3232
#endif
3333

34-
#defineQUEUE_SIZE(16 * 1024)
3534
#definePG_QS_MODULE_KEY0xCA94B108
3635
#definePG_QUERY_STATE_KEY0
3736
#defineEXECUTOR_TRACE_KEY1
@@ -66,7 +65,6 @@ static void qs_postExecProcNode(PlanState *planstate, TupleTableSlot *result);
6665
/* Global variables */
6766
List*QueryDescStack=NIL;
6867
staticProcSignalReasonQueryStatePollReason;
69-
staticProcSignalReasonRolePollReason;
7068
staticProcSignalReasonWorkerPollReason;
7169
staticboolmodule_initialized= false;
7270
staticconstchar*be_state_str[]= {/* BackendState -> string repr */
@@ -98,8 +96,6 @@ typedef struct
9896
pid_ttraceable;
9997
}trace_request;
10098

101-
staticvoidSendCurrentUserId(void);
102-
OidGetRemoteBackendUserId(PGPROC*proc);
10399
staticvoidSendWorkerPids(void);
104100
List*GetRemoteBackendWorkers(PGPROC*proc,int*error_code);
105101

@@ -108,6 +104,7 @@ shm_toc*toc = NULL;
108104
pg_qs_params*params=NULL;
109105
trace_request*trace_req=NULL;
110106
shm_mq*mq=NULL;
107+
void*grbui_shm=NULL;
111108

112109
/*
113110
* Estimate amount of shared memory needed.
@@ -121,11 +118,12 @@ pg_qs_shmem_size()
121118

122119
shm_toc_initialize_estimator(&e);
123120

124-
nkeys=3;
121+
nkeys=4;
125122

126123
shm_toc_estimate_chunk(&e,sizeof(trace_request));
127124
shm_toc_estimate_chunk(&e,sizeof(pg_qs_params));
128125
shm_toc_estimate_chunk(&e, (Size)QUEUE_SIZE);
126+
shm_toc_estimate_chunk(&e,grbui_EstimateShmemSize());
129127

130128
shm_toc_estimate_keys(&e,nkeys);
131129
size=shm_toc_estimate(&e);
@@ -156,6 +154,8 @@ pg_qs_shmem_startup(void)
156154
MemSet(trace_req,0,sizeof(trace_request));
157155
mq=shm_toc_allocate(toc,QUEUE_SIZE);
158156
shm_toc_insert(toc,num_toc++,mq);
157+
grbui_shm=shm_toc_allocate(toc,grbui_EstimateShmemSize());
158+
shm_toc_insert(toc,num_toc++,grbui_shm);
159159
}
160160
else
161161
{
@@ -164,7 +164,9 @@ pg_qs_shmem_startup(void)
164164
params=shm_toc_lookup(toc,num_toc++);
165165
trace_req=shm_toc_lookup(toc,num_toc++);
166166
mq=shm_toc_lookup(toc,num_toc++);
167+
grbui_shm=shm_toc_lookup(toc,num_toc++);
167168
}
169+
grbui_ShmemInit(grbui_shm,found);
168170

169171
if (prev_shmem_startup_hook)
170172
prev_shmem_startup_hook();
@@ -186,14 +188,13 @@ _PG_init(void)
186188
* the postmaster process.) We'll allocate or attach to the shared
187189
* resources in qs_shmem_startup().
188190
*/
189-
RequestAddinShmemSpace(QUEUE_SIZE);
191+
RequestAddinShmemSpace(pg_qs_shmem_size());
190192

191193
/* Register interrupt on custom signal of polling query state */
194+
RegisterGetRemoteBackendUserId();
192195
QueryStatePollReason=RegisterCustomProcSignalHandler(SendQueryState);
193-
RolePollReason=RegisterCustomProcSignalHandler(SendCurrentUserId);
194196
WorkerPollReason=RegisterCustomProcSignalHandler(SendWorkerPids);
195-
if (QueryStatePollReason==INVALID_PROCSIGNAL||RolePollReason==INVALID_PROCSIGNAL
196-
||WorkerPollReason==INVALID_PROCSIGNAL)
197+
if (QueryStatePollReason==INVALID_PROCSIGNAL||WorkerPollReason==INVALID_PROCSIGNAL)
197198
{
198199
ereport(WARNING, (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
199200
errmsg("pg_query_state isn't loaded: insufficient custom ProcSignal slots")));
@@ -802,85 +803,6 @@ executor_continue(PG_FUNCTION_ARGS)
802803
PG_RETURN_VOID();
803804
}
804805

805-
staticshm_mq_result
806-
shm_mq_receive_with_timeout(shm_mq_handle*mqh,Size*nbytesp,void**datap,longtimeout)
807-
{
808-
809-
#ifdefHAVE_INT64_TIMESTAMP
810-
#defineGetNowFloat()((float8) GetCurrentTimestamp() / 1000.0)
811-
#else
812-
#defineGetNowFloat()1000.0 * GetCurrentTimestamp()
813-
#endif
814-
815-
float8endtime=GetNowFloat()+timeout;
816-
intrc=0;
817-
818-
for (;;)
819-
{
820-
longdelay;
821-
shm_mq_resultmq_receive_result=shm_mq_receive(mqh,nbytesp,datap, true);
822-
823-
if (mq_receive_result!=SHM_MQ_WOULD_BLOCK)
824-
returnmq_receive_result;
825-
826-
if (rc&WL_TIMEOUT)
827-
returnSHM_MQ_WOULD_BLOCK;
828-
829-
delay= (long) (endtime-GetNowFloat());
830-
rc=WaitLatch(MyLatch,WL_LATCH_SET |WL_TIMEOUT,delay);
831-
CHECK_FOR_INTERRUPTS();
832-
ResetLatch(MyLatch);
833-
}
834-
}
835-
836-
staticvoid
837-
SendCurrentUserId(void)
838-
{
839-
shm_mq_handle*mqh=shm_mq_attach(mq,NULL,NULL);
840-
Oiduser_oid=GetUserId();
841-
842-
shm_mq_send(mqh,sizeof(Oid),&user_oid, false);
843-
}
844-
845-
#defineNOT_BACKEND_PROCESS1
846-
#defineCOULD_NOT_SEND_SIGNAL 2
847-
#defineINVALID_MQ_READ3
848-
849-
/*
850-
* Extract effective user id of external backend session
851-
* Assume `proc` is valid backend and doesn't point to current process
852-
*/
853-
Oid
854-
GetRemoteBackendUserId(PGPROC*proc)
855-
{
856-
intsig_result;
857-
shm_mq_handle*mqh;
858-
shm_mq_resultmq_receive_result;
859-
Oid*result;
860-
Sizeres_len;
861-
862-
Assert(proc&&proc!=MyProc&&proc->backendId!=InvalidBackendId);
863-
864-
mq=shm_mq_create(mq,QUEUE_SIZE);
865-
shm_mq_set_sender(mq,proc);
866-
shm_mq_set_receiver(mq,MyProc);
867-
868-
sig_result=SendProcSignal(proc->pid,RolePollReason,proc->backendId);
869-
if (sig_result==-1)
870-
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
871-
errmsg("invalid send signal")));
872-
873-
mqh=shm_mq_attach(mq,NULL,NULL);
874-
mq_receive_result=shm_mq_receive_with_timeout(mqh,&res_len, (void**)&result,1000);
875-
if (mq_receive_result!=SHM_MQ_SUCCESS)
876-
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
877-
errmsg("invalid read from message queue")));
878-
879-
shm_mq_detach(mq);
880-
881-
return*result;
882-
}
883-
884806
staticbool
885807
extract_worker_handles(PlanState*node,List**result)
886808
{
@@ -947,46 +869,43 @@ SendWorkerPids(void)
947869
shm_mq_send(mqh,msg_len,msg, false);
948870
}
949871

950-
List*
951-
GetRemoteBackendWorkers(PGPROC*proc,int*error_code)
952-
{
953-
intsig_result;
954-
shm_mq_handle*mqh;
955-
shm_mq_resultmq_receive_result;
956-
workers_msg*msg;
957-
Sizemsg_len;
958-
inti;
959-
List*result=NIL;
960-
961-
if (proc->backendId==InvalidBackendId)
962-
{
963-
*error_code=NOT_BACKEND_PROCESS;
964-
returnNIL;
965-
}
966-
967-
mq=shm_mq_create(mq,QUEUE_SIZE);
968-
shm_mq_set_sender(mq,proc);
969-
shm_mq_set_receiver(mq,MyProc);
970-
971-
sig_result=SendProcSignal(proc->pid,WorkerPollReason,proc->backendId);
972-
if (sig_result==-1)
973-
{
974-
*error_code=COULD_NOT_SEND_SIGNAL;
975-
returnNIL;
976-
}
977-
978-
mqh=shm_mq_attach(mq,NULL,NULL);
979-
mq_receive_result=shm_mq_receive_with_timeout(mqh,&msg_len, (void**)&msg,1000);
980-
if (mq_receive_result!=SHM_MQ_SUCCESS)
981-
{
982-
*error_code=INVALID_MQ_READ;
983-
returnNIL;
984-
}
985-
986-
for (i=0;i<msg->num;i++)
987-
result=lcons_int(msg->pids[i],result);
988-
989-
shm_mq_detach(mq);
990-
991-
returnresult;
992-
}
872+
// List *
873+
// GetRemoteBackendWorkers(PGPROC *proc, int *error_code)
874+
// {
875+
// intsig_result;
876+
// shm_mq_handle*mqh;
877+
// shm_mq_result mq_receive_result;
878+
// workers_msg*msg;
879+
// Sizemsg_len;
880+
// inti;
881+
// List*result = NIL;
882+
883+
// if (proc->backendId == InvalidBackendId)
884+
// {
885+
// return NIL;
886+
// }
887+
888+
// mq = shm_mq_create(mq, QUEUE_SIZE);
889+
// shm_mq_set_sender(mq, proc);
890+
// shm_mq_set_receiver(mq, MyProc);
891+
892+
// sig_result = SendProcSignal(proc->pid, WorkerPollReason, proc->backendId);
893+
// if (sig_result == -1)
894+
// {
895+
// return NIL;
896+
// }
897+
898+
// mqh = shm_mq_attach(mq, NULL, NULL);
899+
// mq_receive_result = shm_mq_receive_with_timeout(mqh, &msg_len, (void **) &msg, 1000);
900+
// if (mq_receive_result != SHM_MQ_SUCCESS)
901+
// {
902+
// return NIL;
903+
// }
904+
905+
// for (i = 0; i < msg->num; i++)
906+
// result = lcons_int(msg->pids[i], result);
907+
908+
// shm_mq_detach(mq);
909+
910+
// return result;
911+
// }

‎pg_query_state.h‎

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
#include"nodes/pg_list.h"
1717
#include"storage/shm_mq.h"
1818

19+
#defineQUEUE_SIZE(16 * 1024)
20+
1921
#defineTIMINIG_OFF_WARNING 0b01
2022
#defineBUFFERS_OFF_WARNING 0b10
2123

@@ -65,5 +67,9 @@ extern shm_mq *mq;
6567

6668
/* signal_handler.c */
6769
externvoidSendQueryState(void);
70+
externvoidRegisterGetRemoteBackendUserId(void);
71+
externOidGetRemoteBackendUserId(PGPROC*proc);
72+
externSizegrbui_EstimateShmemSize(void);
73+
externvoidgrbui_ShmemInit(void*address,boolinitialized);
6874

6975
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp