Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3080468

Browse files
author
Maksim Milyutin
committed
Make local version of function to release deadlock cases
1 parent6e459f8 commit3080468

File tree

4 files changed

+122
-202
lines changed

4 files changed

+122
-202
lines changed

‎Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# contrib/pg_query_state/Makefile
22

33
MODULE_big = pg_query_state
4-
OBJS = pg_query_state.o signal_handler.ouserid_rpc.o$(WIN32RES)
4+
OBJS = pg_query_state.o signal_handler.o$(WIN32RES)
55
EXTENSION = pg_query_state
66
EXTVERSION = 1.0
77
DATA =$(EXTENSION)--$(EXTVERSION).sql

‎pg_query_state.c

Lines changed: 119 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include"pgstat.h"
2222
#include"postmaster/bgworker.h"
2323
#include"storage/ipc.h"
24+
#include"storage/s_lock.h"
25+
#include"storage/spin.h"
2426
#include"storage/procarray.h"
2527
#include"storage/procsignal.h"
2628
#include"storage/shm_toc.h"
@@ -64,6 +66,7 @@ static void qs_postExecProcNode(PlanState *planstate, TupleTableSlot *result);
6466

6567
/* Global variables */
6668
List*QueryDescStack=NIL;
69+
staticProcSignalReasonUserIdPollReason;
6770
staticProcSignalReasonQueryStatePollReason;
6871
staticProcSignalReasonWorkerPollReason;
6972
staticboolmodule_initialized= false;
@@ -77,6 +80,13 @@ static const char*be_state_str[] = {/* BackendState -> string repr */
7780
"disabled",/* STATE_DISABLED */
7881
};
7982

83+
typedefstruct
84+
{
85+
slock_tmutex;/* protect concurrent access to `userid` */
86+
Oiduserid;
87+
Latch*caller;
88+
}RemoteUserIdResult;
89+
8090
/*
8191
* Kinds of trace commands
8292
*/
@@ -96,15 +106,17 @@ typedef struct
96106
pid_ttraceable;
97107
}trace_request;
98108

109+
staticvoidSendCurrentUserId(void);
99110
staticvoidSendWorkerPids(void);
100-
List*GetRemoteBackendWorkers(PGPROC*proc,int*error_code);
111+
staticOidGetRemoteBackendUserId(PGPROC*proc);
112+
staticList*GetRemoteBackendWorkers(PGPROC*proc,int*error_code);
101113

102114
/* Shared memory variables */
103115
shm_toc*toc=NULL;
116+
RemoteUserIdResult*counterpart_userid=NULL;
104117
pg_qs_params*params=NULL;
105118
trace_request*trace_req=NULL;
106119
shm_mq*mq=NULL;
107-
void*grbui_shm=NULL;
108120

109121
/*
110122
* Estimate amount of shared memory needed.
@@ -118,8 +130,9 @@ pg_qs_shmem_size()
118130

119131
shm_toc_initialize_estimator(&e);
120132

121-
nkeys=3;
133+
nkeys=4;
122134

135+
shm_toc_estimate_chunk(&e,sizeof(RemoteUserIdResult));
123136
shm_toc_estimate_chunk(&e,sizeof(trace_request));
124137
shm_toc_estimate_chunk(&e,sizeof(pg_qs_params));
125138
shm_toc_estimate_chunk(&e, (Size)QUEUE_SIZE);
@@ -146,25 +159,30 @@ pg_qs_shmem_startup(void)
146159
{
147160
toc=shm_toc_create(PG_QS_MODULE_KEY,shmem,shmem_size);
148161

162+
counterpart_userid=shm_toc_allocate(toc,sizeof(RemoteUserIdResult));
163+
shm_toc_insert(toc,num_toc++,counterpart_userid);
164+
SpinLockInit(&counterpart_userid->mutex);
165+
149166
params=shm_toc_allocate(toc,sizeof(pg_qs_params));
150167
shm_toc_insert(toc,num_toc++,params);
168+
151169
trace_req=shm_toc_allocate(toc,sizeof(trace_request));
152170
shm_toc_insert(toc,num_toc++,trace_req);
153171
MemSet(trace_req,0,sizeof(trace_request));
172+
154173
mq=shm_toc_allocate(toc,QUEUE_SIZE);
155174
shm_toc_insert(toc,num_toc++,mq);
156175
}
157176
else
158177
{
159178
toc=shm_toc_attach(PG_QS_MODULE_KEY,shmem);
160179

180+
counterpart_userid=shm_toc_lookup(toc,num_toc++);
161181
params=shm_toc_lookup(toc,num_toc++);
162182
trace_req=shm_toc_lookup(toc,num_toc++);
163183
mq=shm_toc_lookup(toc,num_toc++);
164184
}
165185

166-
uirpcShmemInit();
167-
168186
if (prev_shmem_startup_hook)
169187
prev_shmem_startup_hook();
170188

@@ -185,13 +203,15 @@ _PG_init(void)
185203
* the postmaster process.) We'll allocate or attach to the shared
186204
* resources in qs_shmem_startup().
187205
*/
188-
RequestAddinShmemSpace(pg_qs_shmem_size()+uirpcEstimateShmemSize());
206+
RequestAddinShmemSpace(pg_qs_shmem_size());
189207

190208
/* Register interrupt on custom signal of polling query state */
191-
RegisterGetRemoteBackendUserId();
209+
UserIdPollReason=RegisterCustomProcSignalHandler(SendCurrentUserId);
192210
QueryStatePollReason=RegisterCustomProcSignalHandler(SendQueryState);
193211
WorkerPollReason=RegisterCustomProcSignalHandler(SendWorkerPids);
194-
if (QueryStatePollReason==INVALID_PROCSIGNAL||WorkerPollReason==INVALID_PROCSIGNAL)
212+
if (QueryStatePollReason==INVALID_PROCSIGNAL
213+
||WorkerPollReason==INVALID_PROCSIGNAL
214+
||UserIdPollReason==INVALID_PROCSIGNAL)
195215
{
196216
ereport(WARNING, (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
197217
errmsg("pg_query_state isn't loaded: insufficient custom ProcSignal slots")));
@@ -580,11 +600,6 @@ pg_query_state(PG_FUNCTION_ARGS)
580600
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
581601
errmsg("backend with pid=%d not found",pid)));
582602

583-
counterpart_user_id=GetRemoteBackendUserId(proc);
584-
if (!(superuser()||GetUserId()==counterpart_user_id))
585-
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
586-
errmsg("permission denied")));
587-
588603
if (TEXT_CSTR_CMP(format_text,"text")==0)
589604
format=EXPLAIN_FORMAT_TEXT;
590605
elseif (TEXT_CSTR_CMP(format_text,"xml")==0)
@@ -603,6 +618,11 @@ pg_query_state(PG_FUNCTION_ARGS)
603618
init_lock_tag(&tag,PG_QUERY_STATE_KEY);
604619
LockAcquire(&tag,ExclusiveLock, false, false);
605620

621+
counterpart_user_id=GetRemoteBackendUserId(proc);
622+
if (!(superuser()||GetUserId()==counterpart_user_id))
623+
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
624+
errmsg("permission denied")));
625+
606626
/* fill in parameters of query state request */
607627
params->verbose=verbose;
608628
params->costs=costs;
@@ -800,6 +820,50 @@ executor_continue(PG_FUNCTION_ARGS)
800820
PG_RETURN_VOID();
801821
}
802822

823+
staticvoid
824+
SendCurrentUserId(void)
825+
{
826+
SpinLockAcquire(&counterpart_userid->mutex);
827+
counterpart_userid->userid=GetUserId();
828+
SpinLockRelease(&counterpart_userid->mutex);
829+
830+
SetLatch(counterpart_userid->caller);
831+
}
832+
833+
/*
834+
* Extract effective user id from backend on which `proc` points.
835+
*
836+
* Assume the `proc` points on valid backend and it's not current process.
837+
*
838+
* This fuction must be called after registeration of `UserIdPollReason` and
839+
* initialization `RemoteUserIdResult` object in shared memory.
840+
*/
841+
staticOid
842+
GetRemoteBackendUserId(PGPROC*proc)
843+
{
844+
Oidresult;
845+
846+
counterpart_userid->userid=InvalidOid;
847+
counterpart_userid->caller=MyLatch;
848+
849+
SendProcSignal(proc->pid,UserIdPollReason,proc->backendId);
850+
for (;;)
851+
{
852+
SpinLockAcquire(&counterpart_userid->mutex);
853+
result=counterpart_userid->userid;
854+
SpinLockRelease(&counterpart_userid->mutex);
855+
856+
if (result!=InvalidOid)
857+
break;
858+
859+
WaitLatch(MyLatch,WL_LATCH_SET,0);
860+
CHECK_FOR_INTERRUPTS();
861+
ResetLatch(MyLatch);
862+
}
863+
864+
returnresult;
865+
}
866+
803867
staticbool
804868
extract_worker_handles(PlanState*node,List**result)
805869
{
@@ -866,43 +930,45 @@ SendWorkerPids(void)
866930
shm_mq_send(mqh,msg_len,msg, false);
867931
}
868932

869-
// List *
870-
// GetRemoteBackendWorkers(PGPROC *proc, int *error_code)
871-
// {
872-
// intsig_result;
873-
// shm_mq_handle*mqh;
874-
// shm_mq_result mq_receive_result;
875-
// workers_msg*msg;
876-
// Sizemsg_len;
877-
// inti;
878-
// List*result = NIL;
879-
880-
// if (proc->backendId == InvalidBackendId)
881-
// {
882-
// return NIL;
883-
// }
884-
885-
// mq = shm_mq_create(mq, QUEUE_SIZE);
886-
// shm_mq_set_sender(mq, proc);
887-
// shm_mq_set_receiver(mq, MyProc);
888-
889-
// sig_result = SendProcSignal(proc->pid, WorkerPollReason, proc->backendId);
890-
// if (sig_result == -1)
891-
// {
892-
// return NIL;
893-
// }
894-
895-
// mqh = shm_mq_attach(mq, NULL, NULL);
896-
// mq_receive_result = shm_mq_receive_with_timeout(mqh, &msg_len, (void **) &msg, 1000);
897-
// if (mq_receive_result != SHM_MQ_SUCCESS)
898-
// {
899-
// return NIL;
900-
// }
901-
902-
// for (i = 0; i < msg->num; i++)
903-
// result = lcons_int(msg->pids[i], result);
904-
905-
// shm_mq_detach(mq);
906-
907-
// return result;
908-
// }
933+
/*
934+
List *
935+
GetRemoteBackendWorkers(PGPROC *proc, int *error_code)
936+
{
937+
intsig_result;
938+
shm_mq_handle*mqh;
939+
shm_mq_result mq_receive_result;
940+
workers_msg*msg;
941+
Sizemsg_len;
942+
inti;
943+
List*result = NIL;
944+
945+
if (proc->backendId == InvalidBackendId)
946+
{
947+
return NIL;
948+
}
949+
950+
mq = shm_mq_create(mq, QUEUE_SIZE);
951+
shm_mq_set_sender(mq, proc);
952+
shm_mq_set_receiver(mq, MyProc);
953+
954+
sig_result = SendProcSignal(proc->pid, WorkerPollReason, proc->backendId);
955+
if (sig_result == -1)
956+
{
957+
return NIL;
958+
}
959+
960+
mqh = shm_mq_attach(mq, NULL, NULL);
961+
mq_receive_result = shm_mq_receive_with_timeout(mqh, &msg_len, (void **) &msg, 1000);
962+
if (mq_receive_result != SHM_MQ_SUCCESS)
963+
{
964+
return NIL;
965+
}
966+
967+
for (i = 0; i < msg->num; i++)
968+
result = lcons_int(msg->pids[i], result);
969+
970+
shm_mq_detach(mq);
971+
972+
return result;
973+
}
974+
*/

‎pg_query_state.h

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
#defineQUEUE_SIZE(16 * 1024)
2020

21-
#defineTIMINIG_OFF_WARNING0b01
22-
#defineBUFFERS_OFF_WARNING0b10
21+
#defineTIMINIG_OFF_WARNING1
22+
#defineBUFFERS_OFF_WARNING2
2323

2424
/*
2525
* Result status on query state request from asked backend
@@ -67,9 +67,5 @@ extern shm_mq *mq;
6767

6868
/* signal_handler.c */
6969
externvoidSendQueryState(void);
70-
externvoidRegisterGetRemoteBackendUserId(void);
71-
externOidGetRemoteBackendUserId(PGPROC*proc);
72-
externSizeuirpcEstimateShmemSize(void);
73-
externvoiduirpcShmemInit(void);
7470

7571
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp