99#define QUEUE_SIZE (shm_mq_minimum_size + sizeof(Oid))
1010
1111/* shared objects */
12- static shm_mq * mq = NULL ;
12+ typedef struct
13+ {
14+ LWLock * lock ;
15+ Oid userid ;
16+ }uirpcFuncResult ;
17+ static uirpcFuncResult * resptr = NULL ;
1318
1419/* global variables */
15- static ProcSignalReason UserPollReason ;
20+ static ProcSignalReason UserPollReason = INVALID_PROCSIGNAL ;
1621
1722/*
1823 * Receive a message from a shared message queue until timeout is exceeded.
@@ -57,9 +62,9 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh, Size *nbytesp, void **datap,
5762 * Returns estimated size of shared memory needed for `GetRemoteBackendUserId`.
5863 */
5964Size
60- grbui_EstimateShmemSize ()
65+ uirpcEstimateShmemSize ()
6166{
62- return QUEUE_SIZE ;
67+ return MAXALIGN ( sizeof ( uirpcFuncResult )) ;
6368}
6469
6570/*
@@ -69,22 +74,28 @@ grbui_EstimateShmemSize()
6974 * Flag `initialized` specifies the case when shmem is already initialized.
7075 */
7176void
72- grbui_ShmemInit ( void * address , bool initialized )
77+ uirpcShmemInit ( )
7378{
74- mq = address ;
79+ bool found ;
80+
81+ resptr = ShmemInitStruct ("userid_rpc" ,
82+ sizeof (uirpcFuncResult ),
83+ & found );
84+ if (!found )
85+ /* First time through ... */
86+ resptr -> lock = & (GetNamedLWLockTranche ("userid_rpc" ))-> lock ;
87+ resptr -> userid = InvalidOid ;
7588}
7689
7790static void
7891SendCurrentUserId (void )
7992{
80- shm_mq_handle * mqh = shm_mq_attach (mq ,NULL ,NULL );
81- Oid user_oid = GetUserId ();
82-
83- shm_mq_send (mqh ,sizeof (Oid ),& user_oid , false);
93+ LWLockUpdateVar (resptr -> lock , (uint64 * )& resptr -> userid ,GetUserId ());
8494}
8595
8696/*
87- * Register `GetRemoteBackendUserId` function as RPC
97+ * Register `GetRemoteBackendUserId` function as RPC and request necessary
98+ * locks.
8899 */
89100void
90101RegisterGetRemoteBackendUserId ()
@@ -93,42 +104,41 @@ RegisterGetRemoteBackendUserId()
93104if (UserPollReason == INVALID_PROCSIGNAL )
94105ereport (WARNING , (errcode (ERRCODE_INSUFFICIENT_RESOURCES ),
95106errmsg ("pg_query_state isn't loaded: insufficient custom ProcSignal slots" )));
107+
108+ RequestNamedLWLockTranche ("userid_rpc" ,1 );
96109}
97110
98111/*
99112 * Extract effective user id from external backend session specified by `proc`.
100113 *
114+ * This function must be called after `UserPollReason` register and
115+ * initialization specialized structures in shared memory.
101116 * Assume `proc` is valid backend and doesn't point to current process.
102117 */
103118Oid
104119GetRemoteBackendUserId (PGPROC * proc )
105120{
106- int sig_result ;
107- shm_mq_handle * mqh ;
108- shm_mq_result mq_receive_result ;
109- Oid * result ;
110- Size res_len ;
121+ int sig_result ;
122+ Oid result = InvalidOid ;
111123
124+ Assert (UserPollReason != INVALID_PROCSIGNAL );
125+ Assert (resptr != NULL );
112126Assert (proc && proc != MyProc && proc -> backendId != InvalidBackendId );
113127
114- mq = shm_mq_create (mq ,QUEUE_SIZE );
115- shm_mq_set_sender (mq ,proc );
116- shm_mq_set_receiver (mq ,MyProc );
128+ LWLockAcquire (resptr -> lock ,LW_EXCLUSIVE );
117129
118130sig_result = SendProcSignal (proc -> pid ,UserPollReason ,proc -> backendId );
119131if (sig_result == -1 )
120132ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
121133errmsg ("invalid send signal" )));
122134
123- mqh = shm_mq_attach (mq ,NULL ,NULL );
124- mq_receive_result = shm_mq_receive_with_timeout (mqh ,
125- & res_len ,
126- (void * * )& result ,
127- TIMEOUT_MSEC );
128- if (mq_receive_result != SHM_MQ_SUCCESS )
129- ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
130- errmsg ("invalid read from message queue" )));
131- shm_mq_detach (mq );
135+ while (result == InvalidOid )
136+ LWLockWaitForVar (resptr -> lock ,
137+ (uint64 * )& resptr -> userid ,
138+ result ,
139+ (uint64 * )& result );
140+
141+ LWLockRelease (resptr -> lock );
132142
133- return * result ;
143+ return result ;
134144}