Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf3bb0b7

Browse files
author
Maksim Milyutin
committed
Add timeout support to sh_mq and example call of function on external backend
1 parent9a31e28 commitf3bb0b7

File tree

1 file changed

+95
-1
lines changed

1 file changed

+95
-1
lines changed

‎pg_query_state.c

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
#include"miscadmin.h"
1818
#include"pgstat.h"
1919
#include"storage/ipc.h"
20+
#include"storage/procarray.h"
2021
#include"storage/procsignal.h"
2122
#include"storage/shm_toc.h"
2223
#include"utils/guc.h"
24+
#include"utils/timestamp.h"
2325

2426
#ifdefPG_MODULE_MAGIC
2527
PG_MODULE_MAGIC;
@@ -60,6 +62,7 @@ static void qs_postExecProcNode(PlanState *planstate, TupleTableSlot *result);
6062
/* Global variables */
6163
List*QueryDescStack=NIL;
6264
staticProcSignalReasonQueryStatePollReason;
65+
staticProcSignalReasonRolePollReason;
6366
staticboolmodule_initialized= false;
6467
staticconstchar*be_state_str[]= {/* BackendState -> string repr */
6568
"undefined",/* STATE_UNDEFINED */
@@ -90,6 +93,9 @@ typedef struct
9093
pid_ttraceable;
9194
}trace_request;
9295

96+
staticvoidSendCurrentRoleOid(void);
97+
OidGetRemoteBackendUser(pid_tpid,int*error_code);
98+
9399
/* Shared memory variables */
94100
shm_toc*toc=NULL;
95101
user_data*caller=NULL;
@@ -181,7 +187,8 @@ _PG_init(void)
181187

182188
/* Register interrupt on custom signal of polling query state */
183189
QueryStatePollReason=RegisterCustomProcSignalHandler(SendQueryState);
184-
if (QueryStatePollReason==INVALID_PROCSIGNAL)
190+
RolePollReason=RegisterCustomProcSignalHandler(SendCurrentRoleOid);
191+
if (QueryStatePollReason==INVALID_PROCSIGNAL||RolePollReason==INVALID_PROCSIGNAL)
185192
{
186193
ereport(WARNING, (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
187194
errmsg("pg_query_state isn't loaded: insufficient custom ProcSignal slots")));
@@ -608,6 +615,9 @@ pg_query_state(PG_FUNCTION_ARGS)
608615
init_lock_tag(&tag,PG_QUERY_STATE_KEY);
609616
LockAcquire(&tag,ExclusiveLock, false, false);
610617

618+
interror_code;
619+
Oiduser_id=GetRemoteBackendUser(pid,&error_code);
620+
611621
/* fill in caller's user data */
612622
caller->user_id=GetUserId();
613623
caller->superuser=superuser();
@@ -811,3 +821,87 @@ executor_continue(PG_FUNCTION_ARGS)
811821

812822
PG_RETURN_VOID();
813823
}
824+
825+
staticshm_mq_result
826+
shm_mq_receive_with_timeout(shm_mq_handle*mqh,Size*nbytesp,void**datap,longtimeout)
827+
{
828+
829+
#ifdefHAVE_INT64_TIMESTAMP
830+
#defineGetNowLong()((long) GetCurrentTimestamp() / 1000)
831+
#else
832+
#defineGetNowLong()1000 * GetCurrentTimestamp()
833+
#endif
834+
835+
longendtime=GetNowLong()+timeout;
836+
837+
for (;;)
838+
{
839+
intrc;
840+
longdelay;
841+
shm_mq_resultmq_receive_result=shm_mq_receive(mqh,nbytesp,datap, true);
842+
843+
if (mq_receive_result!=SHM_MQ_WOULD_BLOCK)
844+
returnmq_receive_result;
845+
846+
delay=endtime-GetNowLong();
847+
rc=WaitLatch(MyLatch,WL_LATCH_SET |WL_TIMEOUT,delay);
848+
CHECK_FOR_INTERRUPTS();
849+
ResetLatch(MyLatch);
850+
851+
if (rc&WL_TIMEOUT)
852+
returnSHM_MQ_WOULD_BLOCK;
853+
}
854+
}
855+
856+
staticvoid
857+
SendCurrentRoleOid(void)
858+
{
859+
shm_mq_handle*mqh=shm_mq_attach(mq,NULL,NULL);
860+
Oidrole_oid=GetUserId();
861+
862+
shm_mq_send(mqh,sizeof(Oid),&role_oid, false);
863+
}
864+
865+
#defineNOT_BACKEND_PROCESS1
866+
#defineCOULD_NOT_SEND_SIGNAL 2
867+
#defineINVALID_MQ_READ3
868+
869+
Oid
870+
GetRemoteBackendUser(pid_tpid,int*error_code)
871+
{
872+
PGPROC*proc=BackendPidGetProc(pid);
873+
intsig_result;
874+
shm_mq_handle*mqh;
875+
shm_mq_resultmq_receive_result;
876+
Oid*result;
877+
Sizeres_len;
878+
879+
if (proc==NULL||proc->backendId==InvalidBackendId)
880+
{
881+
*error_code=NOT_BACKEND_PROCESS;
882+
returnInvalidOid;
883+
}
884+
885+
mq=shm_mq_create(mq,QUEUE_SIZE);
886+
shm_mq_set_sender(mq,proc);
887+
shm_mq_set_receiver(mq,MyProc);
888+
889+
sig_result=SendProcSignal(pid,RolePollReason,proc->backendId);
890+
if (sig_result==-1)
891+
{
892+
*error_code=COULD_NOT_SEND_SIGNAL;
893+
returnInvalidOid;
894+
}
895+
896+
mqh=shm_mq_attach(mq,NULL,NULL);
897+
mq_receive_result=shm_mq_receive_with_timeout(mqh,&res_len, (void**)&result,1000);
898+
if (mq_receive_result!=SHM_MQ_SUCCESS)
899+
{
900+
*error_code=INVALID_MQ_READ;
901+
returnInvalidOid;
902+
}
903+
904+
shm_mq_detach(mq);
905+
906+
return*result;
907+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp