Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb383d38

Browse files
author
Maksim Milyutin
committed
Replace mq to transfer user id on shmem slot with lwlock
1 parentaeab11f commitb383d38

File tree

3 files changed

+50
-43
lines changed

3 files changed

+50
-43
lines changed

‎pg_query_state.c

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,11 @@ pg_qs_shmem_size()
118118

119119
shm_toc_initialize_estimator(&e);
120120

121-
nkeys=4;
121+
nkeys=3;
122122

123123
shm_toc_estimate_chunk(&e,sizeof(trace_request));
124124
shm_toc_estimate_chunk(&e,sizeof(pg_qs_params));
125125
shm_toc_estimate_chunk(&e, (Size)QUEUE_SIZE);
126-
shm_toc_estimate_chunk(&e,grbui_EstimateShmemSize());
127126

128127
shm_toc_estimate_keys(&e,nkeys);
129128
size=shm_toc_estimate(&e);
@@ -154,8 +153,6 @@ pg_qs_shmem_startup(void)
154153
MemSet(trace_req,0,sizeof(trace_request));
155154
mq=shm_toc_allocate(toc,QUEUE_SIZE);
156155
shm_toc_insert(toc,num_toc++,mq);
157-
grbui_shm=shm_toc_allocate(toc,grbui_EstimateShmemSize());
158-
shm_toc_insert(toc,num_toc++,grbui_shm);
159156
}
160157
else
161158
{
@@ -164,9 +161,9 @@ pg_qs_shmem_startup(void)
164161
params=shm_toc_lookup(toc,num_toc++);
165162
trace_req=shm_toc_lookup(toc,num_toc++);
166163
mq=shm_toc_lookup(toc,num_toc++);
167-
grbui_shm=shm_toc_lookup(toc,num_toc++);
168164
}
169-
grbui_ShmemInit(grbui_shm,found);
165+
166+
uirpcShmemInit();
170167

171168
if (prev_shmem_startup_hook)
172169
prev_shmem_startup_hook();
@@ -188,7 +185,7 @@ _PG_init(void)
188185
* the postmaster process.) We'll allocate or attach to the shared
189186
* resources in qs_shmem_startup().
190187
*/
191-
RequestAddinShmemSpace(pg_qs_shmem_size());
188+
RequestAddinShmemSpace(pg_qs_shmem_size()+uirpcEstimateShmemSize());
192189

193190
/* Register interrupt on custom signal of polling query state */
194191
RegisterGetRemoteBackendUserId();
@@ -583,6 +580,11 @@ pg_query_state(PG_FUNCTION_ARGS)
583580
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
584581
errmsg("backend with pid=%d not found",pid)));
585582

583+
counterpart_user_id=GetRemoteBackendUserId(proc);
584+
if (!(superuser()||GetUserId()==counterpart_user_id))
585+
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
586+
errmsg("permission denied")));
587+
586588
if (TEXT_CSTR_CMP(format_text,"text")==0)
587589
format=EXPLAIN_FORMAT_TEXT;
588590
elseif (TEXT_CSTR_CMP(format_text,"xml")==0)
@@ -601,11 +603,6 @@ pg_query_state(PG_FUNCTION_ARGS)
601603
init_lock_tag(&tag,PG_QUERY_STATE_KEY);
602604
LockAcquire(&tag,ExclusiveLock, false, false);
603605

604-
counterpart_user_id=GetRemoteBackendUserId(proc);
605-
if (!(superuser()||GetUserId()==counterpart_user_id))
606-
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
607-
errmsg("permission denied")));
608-
609606
/* fill in parameters of query state request */
610607
params->verbose=verbose;
611608
params->costs=costs;

‎pg_query_state.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ extern shm_mq *mq;
6969
externvoidSendQueryState(void);
7070
externvoidRegisterGetRemoteBackendUserId(void);
7171
externOidGetRemoteBackendUserId(PGPROC*proc);
72-
externSizegrbui_EstimateShmemSize(void);
73-
externvoidgrbui_ShmemInit(void*address,boolinitialized);
72+
externSizeuirpcEstimateShmemSize(void);
73+
externvoiduirpcShmemInit(void);
7474

7575
#endif

‎userid_rpc.c

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,15 @@
99
#defineQUEUE_SIZE(shm_mq_minimum_size + sizeof(Oid))
1010

1111
/* shared objects */
12-
staticshm_mq*mq=NULL;
12+
typedefstruct
13+
{
14+
LWLock*lock;
15+
Oiduserid;
16+
}uirpcFuncResult;
17+
staticuirpcFuncResult*resptr=NULL;
1318

1419
/* global variables */
15-
staticProcSignalReasonUserPollReason;
20+
staticProcSignalReasonUserPollReason=INVALID_PROCSIGNAL;
1621

1722
/*
1823
* Receive a message from a shared message queue until timeout is exceeded.
@@ -57,9 +62,9 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh, Size *nbytesp, void **datap,
5762
* Returns estimated size of shared memory needed for `GetRemoteBackendUserId`.
5863
*/
5964
Size
60-
grbui_EstimateShmemSize()
65+
uirpcEstimateShmemSize()
6166
{
62-
returnQUEUE_SIZE;
67+
returnMAXALIGN(sizeof(uirpcFuncResult));
6368
}
6469

6570
/*
@@ -69,22 +74,28 @@ grbui_EstimateShmemSize()
6974
* Flag `initialized` specifies the case when shmem is already initialized.
7075
*/
7176
void
72-
grbui_ShmemInit(void*address,boolinitialized)
77+
uirpcShmemInit()
7378
{
74-
mq=address;
79+
boolfound;
80+
81+
resptr=ShmemInitStruct("userid_rpc",
82+
sizeof(uirpcFuncResult),
83+
&found);
84+
if (!found)
85+
/* First time through ... */
86+
resptr->lock=&(GetNamedLWLockTranche("userid_rpc"))->lock;
87+
resptr->userid=InvalidOid;
7588
}
7689

7790
staticvoid
7891
SendCurrentUserId(void)
7992
{
80-
shm_mq_handle*mqh=shm_mq_attach(mq,NULL,NULL);
81-
Oiduser_oid=GetUserId();
82-
83-
shm_mq_send(mqh,sizeof(Oid),&user_oid, false);
93+
LWLockUpdateVar(resptr->lock, (uint64*)&resptr->userid,GetUserId());
8494
}
8595

8696
/*
87-
* Register `GetRemoteBackendUserId` function as RPC
97+
* Register `GetRemoteBackendUserId` function as RPC and request necessary
98+
* locks.
8899
*/
89100
void
90101
RegisterGetRemoteBackendUserId()
@@ -93,42 +104,41 @@ RegisterGetRemoteBackendUserId()
93104
if (UserPollReason==INVALID_PROCSIGNAL)
94105
ereport(WARNING, (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
95106
errmsg("pg_query_state isn't loaded: insufficient custom ProcSignal slots")));
107+
108+
RequestNamedLWLockTranche("userid_rpc",1);
96109
}
97110

98111
/*
99112
* Extract effective user id from external backend session specified by `proc`.
100113
*
114+
* This function must be called after `UserPollReason` register and
115+
* initialization specialized structures in shared memory.
101116
* Assume `proc` is valid backend and doesn't point to current process.
102117
*/
103118
Oid
104119
GetRemoteBackendUserId(PGPROC*proc)
105120
{
106-
intsig_result;
107-
shm_mq_handle*mqh;
108-
shm_mq_resultmq_receive_result;
109-
Oid*result;
110-
Sizeres_len;
121+
intsig_result;
122+
Oidresult=InvalidOid;
111123

124+
Assert(UserPollReason!=INVALID_PROCSIGNAL);
125+
Assert(resptr!=NULL);
112126
Assert(proc&&proc!=MyProc&&proc->backendId!=InvalidBackendId);
113127

114-
mq=shm_mq_create(mq,QUEUE_SIZE);
115-
shm_mq_set_sender(mq,proc);
116-
shm_mq_set_receiver(mq,MyProc);
128+
LWLockAcquire(resptr->lock,LW_EXCLUSIVE);
117129

118130
sig_result=SendProcSignal(proc->pid,UserPollReason,proc->backendId);
119131
if (sig_result==-1)
120132
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
121133
errmsg("invalid send signal")));
122134

123-
mqh=shm_mq_attach(mq,NULL,NULL);
124-
mq_receive_result=shm_mq_receive_with_timeout(mqh,
125-
&res_len,
126-
(void**)&result,
127-
TIMEOUT_MSEC);
128-
if (mq_receive_result!=SHM_MQ_SUCCESS)
129-
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
130-
errmsg("invalid read from message queue")));
131-
shm_mq_detach(mq);
135+
while (result==InvalidOid)
136+
LWLockWaitForVar(resptr->lock,
137+
(uint64*)&resptr->userid,
138+
result,
139+
(uint64*)&result);
140+
141+
LWLockRelease(resptr->lock);
132142

133-
return*result;
143+
returnresult;
134144
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp