Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6de2564

Browse files
anna-akentevadlepikhova
authored andcommitted
PGPRO-4197: code cleanup + send stuff by 1024 bytes now
1 parent0b3be2b commit6de2564

File tree

3 files changed

+19
-40
lines changed

3 files changed

+19
-40
lines changed

‎pg_query_state.c

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ static void qs_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction,
6060
#endif
6161
staticvoidqs_ExecutorFinish(QueryDesc*queryDesc);
6262

63+
staticshm_mq_resultreceive_msg_by_parts(shm_mq_handle*mqh,Size*total,
64+
void**datap,boolnowait);
65+
6366
/* Global variables */
6467
List*QueryDescStack=NIL;
6568
staticProcSignalReasonUserIdPollReason=INVALID_PROCSIGNAL;
@@ -777,7 +780,7 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh,
777780
{
778781
shm_mq_resultmq_receive_result;
779782

780-
mq_receive_result=shm_mq_receive(mqh,nbytesp,datap, true);
783+
mq_receive_result=receive_msg_by_parts(mqh,nbytesp,datap, true);
781784
if (mq_receive_result!=SHM_MQ_WOULD_BLOCK)
782785
returnmq_receive_result;
783786
if (rc&WL_TIMEOUT||delay <=0)
@@ -960,18 +963,9 @@ copy_msg(shm_mq_msg *msg)
960963
returnresult;
961964
}
962965

963-
// ----------------- DEBUG -----------------
964-
staticvoid
965-
print_recv_bytes(intnum,char*src,intoffset)
966-
{
967-
elog(INFO,"======= RECV MSG SEGMENT START (%d bytes) =======",num);
968-
for (inti=offset;i<offset+num;i++)
969-
elog(INFO,"RECV byte #%d = %02x",i, (unsignedchar)*(src+i));
970-
}
971-
// ----------------- DEBUG -----------------
972-
973966
staticshm_mq_result
974-
shm_mq_receive_by_bytes(shm_mq_handle*mqh,Size*total,void**datap)
967+
receive_msg_by_parts(shm_mq_handle*mqh,Size*total,void**datap,
968+
boolnowait)
975969
{
976970
shm_mq_resultmq_receive_result;
977971
shm_mq_msg*buff;
@@ -982,29 +976,25 @@ shm_mq_receive_by_bytes(shm_mq_handle *mqh, Size *total, void **datap)
982976

983977
/* Get the expected number of bytes in message */
984978
mq_receive_result=shm_mq_receive(mqh,&len, (void**)&expected, false);
979+
mq_receive_result=shm_mq_receive(mqh,&len, (void**)&expected,nowait);
985980
if (mq_receive_result!=SHM_MQ_SUCCESS)
986981
returnmq_receive_result;
987982
Assert(len==sizeof(int));
988-
//elog(INFO, "======= RECV MSG (expecting %d bytes) =======", *expected);
989983

990984
*datap=palloc0(*expected);
991985

992986
/* Get the message itself */
993-
for (offset=0,ii=0;offset<*expected;ii++)
987+
for (offset=0;offset<*expected; )
994988
{
995-
// Keep receiving new messages until we assemble the full message
996-
mq_receive_result=shm_mq_receive(mqh,&len, ((void**)&buff),false);
989+
/* Keep receiving new messages until we assemble the full message */
990+
mq_receive_result=shm_mq_receive(mqh,&len, ((void**)&buff),nowait);
997991
memcpy((char*)*datap+offset,buff,len);
998-
//print_recv_bytes(len, (char *) *datap, offset);
999992
offset+=len;
1000993
if (mq_receive_result!=SHM_MQ_SUCCESS)
1001994
returnmq_receive_result;
1002995
}
1003996

1004-
//elog(INFO, "RECV: END cycle - %d", ii);
1005997
*total=offset;
1006-
//mq_receive_result = shm_mq_receive(mqh, &len, (void **) &msg, false);
1007-
//*datap = buff;
1008998

1009999
returnmq_receive_result;
10101000
}
@@ -1081,7 +1071,8 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10811071
/* extract query state from leader process */
10821072
mqh=shm_mq_attach(mq,NULL,NULL);
10831073
elog(DEBUG1,"Wait response from leader %d",leader->pid);
1084-
mq_receive_result=shm_mq_receive_by_bytes(mqh,&len, ((void**)&msg));
1074+
mq_receive_result=receive_msg_by_parts(mqh,&len, (void**)&msg,
1075+
false);
10851076
if (mq_receive_result!=SHM_MQ_SUCCESS)
10861077
gotomq_error;
10871078
if (msg->reqid!=reqid)

‎pg_query_state.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
#include"storage/shm_mq.h"
1919

2020
#defineQUEUE_SIZE(16 * 1024)
21-
#defineBUF_SIZE7
21+
#defineMSG_MAX_SIZE1024
2222

2323
#defineTIMINIG_OFF_WARNING 1
2424
#defineBUFFERS_OFF_WARNING 2

‎signal_handler.c

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ typedef struct
2727
char*plan;
2828
}stack_frame;
2929

30-
staticvoidsend_msg_by_bits(shm_mq_handle*mqh,Sizenbytes,constvoid*data);
30+
staticvoidsend_msg_by_parts(shm_mq_handle*mqh,Sizenbytes,constvoid*data);
3131

3232
/*
3333
*Get List of stack_frames as a stack of function calls starting from outermost call.
@@ -150,33 +150,21 @@ serialize_stack(char *dest, List *qs_stack)
150150
serialize_stack_frame(&dest,qs_frame);
151151
}
152152
}
153-
// ----------------- DEBUG -----------------
154-
staticvoid
155-
print_sent_bytes(intnum,char*src,intoffset)
156-
{
157-
elog(INFO,"======= SEND MSG SEGMENT START (%d bytes) =======",num);
158-
for (inti=offset;i<offset+num;i++)
159-
elog(INFO,"SENT byte #%d = %02x",i, (unsignedchar)*(src+i));
160-
}
161-
// ----------------- DEBUG -----------------
162153

163154
staticvoid
164-
send_msg_by_bits(shm_mq_handle*mqh,Sizenbytes,constvoid*data)
155+
send_msg_by_pats(shm_mq_handle*mqh,Sizenbytes,constvoid*data)
165156
{
166157
intbytes_left;
167158
intbytes_send;
168159

169160
/* Send the expected message length */
170161
shm_mq_send(mqh,sizeof(int),&nbytes, false);
171162

172-
//elog(INFO, "======= SEND MSG (%lu bytes) =======", nbytes);
173163
for (intoffset=0;offset<nbytes;offset+=bytes_send)
174164
{
175165
bytes_left=nbytes-offset;
176-
bytes_send= (bytes_left<BUF_SIZE) ?bytes_left :BUF_SIZE;
166+
bytes_send= (bytes_left<MSG_MAX_SIZE) ?bytes_left :MSG_MAX_SIZE;
177167
shm_mq_send(mqh,bytes_send,&(((unsignedchar*)data)[offset]), false);
178-
// DEBUG: print message that we just sent
179-
//print_sent_bytes(bytes_send, (char *) data, offset);
180168
}
181169
}
182170

@@ -238,15 +226,15 @@ SendQueryState(void)
238226
{
239227
shm_mq_msgmsg= {reqid,BASE_SIZEOF_SHM_MQ_MSG,MyProc,STAT_DISABLED };
240228

241-
send_msg_by_bits(mqh,msg.length,&msg);
229+
send_msg_by_parts(mqh,msg.length,&msg);
242230
}
243231

244232
/* check if backend doesn't execute any query */
245233
elseif (list_length(QueryDescStack)==0)
246234
{
247235
shm_mq_msgmsg= {reqid,BASE_SIZEOF_SHM_MQ_MSG,MyProc,QUERY_NOT_RUNNING };
248236

249-
send_msg_by_bits(mqh,msg.length,&msg);
237+
send_msg_by_parts(mqh,msg.length,&msg);
250238
}
251239

252240
/* happy path */
@@ -269,7 +257,7 @@ SendQueryState(void)
269257

270258
msg->stack_depth=list_length(qs_stack);
271259
serialize_stack(msg->stack,qs_stack);
272-
send_msg_by_bits(mqh,msglen,msg);
260+
send_msg_by_parts(mqh,msglen,msg);
273261
}
274262
elog(DEBUG1,"Worker %d sends response for pg_query_state to %d",shm_mq_get_sender(mq)->pid,shm_mq_get_receiver(mq)->pid);
275263
DetachPeer();

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp