Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0b3be2b

Browse files
anna-akentevadlepikhova
authored andcommitted
PGPRO-4197: attempt to send stuff byte-wise in pg_query_state
1 parentfa5c7c0 commit0b3be2b

File tree

3 files changed

+85
-4
lines changed

3 files changed

+85
-4
lines changed

‎pg_query_state.c

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -960,6 +960,55 @@ copy_msg(shm_mq_msg *msg)
960960
returnresult;
961961
}
962962

963+
// ----------------- DEBUG -----------------
964+
staticvoid
965+
print_recv_bytes(intnum,char*src,intoffset)
966+
{
967+
elog(INFO,"======= RECV MSG SEGMENT START (%d bytes) =======",num);
968+
for (inti=offset;i<offset+num;i++)
969+
elog(INFO,"RECV byte #%d = %02x",i, (unsignedchar)*(src+i));
970+
}
971+
// ----------------- DEBUG -----------------
972+
973+
staticshm_mq_result
974+
shm_mq_receive_by_bytes(shm_mq_handle*mqh,Size*total,void**datap)
975+
{
976+
shm_mq_resultmq_receive_result;
977+
shm_mq_msg*buff;
978+
intii;
979+
intoffset;
980+
int*expected;
981+
Sizelen;
982+
983+
/* Get the expected number of bytes in message */
984+
mq_receive_result=shm_mq_receive(mqh,&len, (void**)&expected, false);
985+
if (mq_receive_result!=SHM_MQ_SUCCESS)
986+
returnmq_receive_result;
987+
Assert(len==sizeof(int));
988+
//elog(INFO, "======= RECV MSG (expecting %d bytes) =======", *expected);
989+
990+
*datap=palloc0(*expected);
991+
992+
/* Get the message itself */
993+
for (offset=0,ii=0;offset<*expected;ii++)
994+
{
995+
// Keep receiving new messages until we assemble the full message
996+
mq_receive_result=shm_mq_receive(mqh,&len, ((void**)&buff), false);
997+
memcpy((char*)*datap+offset,buff,len);
998+
//print_recv_bytes(len, (char *) *datap, offset);
999+
offset+=len;
1000+
if (mq_receive_result!=SHM_MQ_SUCCESS)
1001+
returnmq_receive_result;
1002+
}
1003+
1004+
//elog(INFO, "RECV: END cycle - %d", ii);
1005+
*total=offset;
1006+
//mq_receive_result = shm_mq_receive(mqh, &len, (void **) &msg, false);
1007+
//*datap = buff;
1008+
1009+
returnmq_receive_result;
1010+
}
1011+
9631012
staticList*
9641013
GetRemoteBackendQueryStates(PGPROC*leader,
9651014
List*pworkers,
@@ -1032,7 +1081,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10321081
/* extract query state from leader process */
10331082
mqh=shm_mq_attach(mq,NULL,NULL);
10341083
elog(DEBUG1,"Wait response from leader %d",leader->pid);
1035-
mq_receive_result=shm_mq_receive(mqh,&len, (void**)&msg, false);
1084+
mq_receive_result=shm_mq_receive_by_bytes(mqh,&len, ((void**)&msg));
10361085
if (mq_receive_result!=SHM_MQ_SUCCESS)
10371086
gotomq_error;
10381087
if (msg->reqid!=reqid)

‎pg_query_state.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include"storage/shm_mq.h"
1919

2020
#defineQUEUE_SIZE(16 * 1024)
21+
#defineBUF_SIZE7
2122

2223
#defineTIMINIG_OFF_WARNING 1
2324
#defineBUFFERS_OFF_WARNING 2

‎signal_handler.c

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ typedef struct
2727
char*plan;
2828
}stack_frame;
2929

30+
staticvoidsend_msg_by_bits(shm_mq_handle*mqh,Sizenbytes,constvoid*data);
31+
3032
/*
3133
*Get List of stack_frames as a stack of function calls starting from outermost call.
3234
*Each entry contains query text and query state in form of EXPLAIN ANALYZE output.
@@ -148,6 +150,35 @@ serialize_stack(char *dest, List *qs_stack)
148150
serialize_stack_frame(&dest,qs_frame);
149151
}
150152
}
153+
// ----------------- DEBUG -----------------
154+
staticvoid
155+
print_sent_bytes(intnum,char*src,intoffset)
156+
{
157+
elog(INFO,"======= SEND MSG SEGMENT START (%d bytes) =======",num);
158+
for (inti=offset;i<offset+num;i++)
159+
elog(INFO,"SENT byte #%d = %02x",i, (unsignedchar)*(src+i));
160+
}
161+
// ----------------- DEBUG -----------------
162+
163+
staticvoid
164+
send_msg_by_bits(shm_mq_handle*mqh,Sizenbytes,constvoid*data)
165+
{
166+
intbytes_left;
167+
intbytes_send;
168+
169+
/* Send the expected message length */
170+
shm_mq_send(mqh,sizeof(int),&nbytes, false);
171+
172+
//elog(INFO, "======= SEND MSG (%lu bytes) =======", nbytes);
173+
for (intoffset=0;offset<nbytes;offset+=bytes_send)
174+
{
175+
bytes_left=nbytes-offset;
176+
bytes_send= (bytes_left<BUF_SIZE) ?bytes_left :BUF_SIZE;
177+
shm_mq_send(mqh,bytes_send,&(((unsignedchar*)data)[offset]), false);
178+
// DEBUG: print message that we just sent
179+
//print_sent_bytes(bytes_send, (char *) data, offset);
180+
}
181+
}
151182

152183
/*
153184
* Send state of current query to shared queue.
@@ -207,15 +238,15 @@ SendQueryState(void)
207238
{
208239
shm_mq_msgmsg= {reqid,BASE_SIZEOF_SHM_MQ_MSG,MyProc,STAT_DISABLED };
209240

210-
shm_mq_send(mqh,msg.length,&msg, false);
241+
send_msg_by_bits(mqh,msg.length,&msg);
211242
}
212243

213244
/* check if backend doesn't execute any query */
214245
elseif (list_length(QueryDescStack)==0)
215246
{
216247
shm_mq_msgmsg= {reqid,BASE_SIZEOF_SHM_MQ_MSG,MyProc,QUERY_NOT_RUNNING };
217248

218-
shm_mq_send(mqh,msg.length,&msg, false);
249+
send_msg_by_bits(mqh,msg.length,&msg);
219250
}
220251

221252
/* happy path */
@@ -238,7 +269,7 @@ SendQueryState(void)
238269

239270
msg->stack_depth=list_length(qs_stack);
240271
serialize_stack(msg->stack,qs_stack);
241-
shm_mq_send(mqh,msglen,msg, false);
272+
send_msg_by_bits(mqh,msglen,msg);
242273
}
243274
elog(DEBUG1,"Worker %d sends response for pg_query_state to %d",shm_mq_get_sender(mq)->pid,shm_mq_get_receiver(mq)->pid);
244275
DetachPeer();

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp