Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite436c65

Browse files
committed
Bulk read of client requests in sockhub
1 parent65c7884 commite436c65

File tree

4 files changed

+117
-71
lines changed

4 files changed

+117
-71
lines changed

‎contrib/pg_dtm/sockhub/sockhub.c‎

Lines changed: 114 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -69,19 +69,24 @@ static void close_socket(Shub* shub, int fd)
6969
FD_CLR(fd,&shub->inset);
7070
}
7171

72-
staticintread_socket(intsd,char*buf,intsize)
72+
staticintread_socket_ex(intsd,char*buf,intmin_size,intmax_size)
7373
{
74-
while (size!=0) {
75-
intn=recv(sd,buf,size,0);
74+
intreceived=0;
75+
while (received<min_size) {
76+
intn=recv(sd,buf+received,max_size-received,0);
7677
if (n <=0) {
77-
return0;
78+
break;
7879
}
79-
size-=n;
80-
buf+=n;
80+
received+=n;
8181
}
82-
return1;
82+
returnreceived;
8383
}
8484

85+
staticintread_socket(intsd,char*buf,intsize)
86+
{
87+
returnread_socket_ex(sd,buf,size,size)==size;
88+
}
89+
8590
staticintwrite_socket(intsd,charconst*buf,intsize)
8691
{
8792
while (size!=0) {
@@ -235,9 +240,9 @@ void ShubLoop(Shub* shub)
235240
}
236241
}elseif (i==shub->output) {/* receive response from server */
237242
/* try to read as much as possible */
238-
intavailable=recv(shub->output,shub->out_buffer+shub->out_buffer_used,buffer_size-shub->out_buffer_used,0);
243+
intavailable=read_socket_ex(shub->output,shub->out_buffer+shub->out_buffer_used,sizeof(ShubMessageHdr),buffer_size-shub->out_buffer_used);
239244
intpos=0;
240-
if (available <=0) {
245+
if (available<sizeof(ShubMessageHdr)) {
241246
shub->params->error_handler("Failed to read inet socket",SHUB_RECOVERABLE_ERROR);
242247
reconnect(shub);
243248
continue;
@@ -279,74 +284,116 @@ void ShubLoop(Shub* shub)
279284
}
280285
pos+=n;
281286
}
282-
/* Move partly fetched message header (if any) to the beginning ofbyffer */
287+
/* Move partly fetched message header (if any) to the beginning ofbuffer */
283288
memcpy(shub->out_buffer,shub->out_buffer+pos,shub->out_buffer_used-pos);
284289
shub->out_buffer_used-=pos;
285290
}else {/* receive request from client */
286-
ShubMessageHdr*hdr= (ShubMessageHdr*)&shub->in_buffer[shub->in_buffer_used];
287291
intchan=i;
288-
if (!read_socket(chan, (char*)hdr,sizeof(ShubMessageHdr))) {/* fetch message header */
289-
shub->params->error_handler("Failed to read local socket",SHUB_RECOVERABLE_ERROR);
290-
close_socket(shub,i);
291-
}else {
292-
unsignedintsize=hdr->size;
293-
hdr->chan=chan;/* remember socket descriptor from which this message was read */
294-
if (size+shub->in_buffer_used+sizeof(ShubMessageHdr)>buffer_size) {
295-
/* message doesn't completely fit in buffer */
296-
if (shub->in_buffer_used!=0) {/* if buffer is not empty...*/
297-
/* ... then send it */
298-
while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
299-
shub->params->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR);
300-
reconnect(shub);
301-
}
302-
/* move received message header to the beginning of the buffer */
303-
memcpy(shub->in_buffer,shub->in_buffer+shub->in_buffer_used,sizeof(ShubMessageHdr));
304-
shub->in_buffer_used=0;
305-
}
306-
}
307-
shub->in_buffer_used+=sizeof(ShubMessageHdr);
308-
309-
do {
310-
unsignedintn=size+shub->in_buffer_used>buffer_size ?buffer_size-shub->in_buffer_used :size;
311-
/* fetch message body */
312-
if (chan >=0&& !read_socket(chan,shub->in_buffer+shub->in_buffer_used,n)) {
313-
shub->params->error_handler("Failed to read local socket",SHUB_RECOVERABLE_ERROR);
314-
close_socket(shub,chan);
315-
if (hdr!=NULL) {/* if message header is not yet sent to the server... */
316-
/* ... then skip this message */
317-
shub->in_buffer_used= (char*)hdr-shub->in_buffer;
318-
break;
319-
}else {/* if message was partly sent to the server, we can not skip it, so we have to send garbage to the server */
320-
chan=-1;/* do not try to read rest of body of this message */
292+
intavailable=0;
293+
while (1) {
294+
available+=read_socket_ex(chan,&shub->in_buffer[shub->in_buffer_used+available],sizeof(ShubMessageHdr)-available,buffer_size-shub->in_buffer_used-available);
295+
if (available<sizeof(ShubMessageHdr)) {
296+
shub->params->error_handler("Failed to read local socket",SHUB_RECOVERABLE_ERROR);
297+
close_socket(shub,i);
298+
}else {
299+
intpos=0;
300+
/* loop through all fetched messages */
301+
while (pos+sizeof(ShubMessageHdr) <=available) {
302+
ShubMessageHdr*hdr= (ShubMessageHdr*)&shub->in_buffer[shub->in_buffer_used];
303+
unsignedintsize=hdr->size;
304+
pos+=sizeof(ShubMessageHdr)+size;
305+
hdr->chan=chan;/* remember socket descriptor from which this message was read */
306+
if (pos <=available) {
307+
shub->in_buffer_used+=sizeof(ShubMessageHdr)+size;
308+
continue;
321309
}
322-
}
323-
shub->in_buffer_used+=n;
324-
size-=n;
325-
/* if there is no more free space in the buffer to receive new message header... */
326-
if (shub->in_buffer_used+sizeof(ShubMessageHdr)>buffer_size) {
327310

328-
/* ... then send buffer to the server */
329-
while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
330-
shub->params->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR);
331-
reconnect(shub);
332-
}
333-
hdr=NULL;/* message is partly sent to the server: can not skip it any more */
334-
shub->in_buffer_used=0;
311+
if (shub->in_buffer_used+sizeof(ShubMessageHdr)+size>buffer_size) {
312+
/* message doesn't completely fit in buffer */
313+
if (shub->in_buffer_used!=0) {/* if buffer is not empty...*/
314+
/* ... then send it */
315+
while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
316+
shub->params->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR);
317+
reconnect(shub);
318+
}
319+
/* move received message header to the beginning of the buffer */
320+
memcpy(shub->in_buffer,shub->in_buffer+shub->in_buffer_used,buffer_size-shub->in_buffer_used);
321+
shub->in_buffer_used=0;
322+
}
323+
}
324+
shub->in_buffer_used+=sizeof(ShubMessageHdr)+size- (pos-available);
325+
size=pos-available;
326+
327+
do {
328+
unsignedintn=size+shub->in_buffer_used>buffer_size ?buffer_size-shub->in_buffer_used :size;
329+
/* fetch rest of message body */
330+
if (chan >=0&& !read_socket(chan,shub->in_buffer+shub->in_buffer_used,n)) {
331+
shub->params->error_handler("Failed to read local socket",SHUB_RECOVERABLE_ERROR);
332+
close_socket(shub,chan);
333+
if (hdr!=NULL) {/* if message header is not yet sent to the server... */
334+
/* ... then skip this message */
335+
shub->in_buffer_used= (char*)hdr-shub->in_buffer;
336+
break;
337+
}else {/* if message was partly sent to the server, we can not skip it, so we have to send garbage to the server */
338+
chan=-1;/* do not try to read rest of body of this message */
339+
}
340+
}
341+
shub->in_buffer_used+=n;
342+
size-=n;
343+
/* if there is no more free space in the buffer to receive new message header... */
344+
if (shub->in_buffer_used+sizeof(ShubMessageHdr)>buffer_size) {
345+
/* ... then send buffer to the server */
346+
while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
347+
shub->params->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR);
348+
reconnect(shub);
349+
}
350+
hdr=NULL;/* message is partly sent to the server: can not skip it any more */
351+
shub->in_buffer_used=0;
352+
}
353+
}while (size!=0);/* repeat until all message body is received */
354+
355+
pos=available;
356+
break;
335357
}
336-
}while (size!=0);/* repeat until all message body is received */
337-
}
358+
if (chan >=0&&pos!=available) {/* partly fetched message header */
359+
if (shub->in_buffer_used+sizeof(ShubMessageHdr)>buffer_size) {
360+
/* message doesn't completely fit in buffer */
361+
while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
362+
shub->params->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR);
363+
reconnect(shub);
364+
}
365+
/* move received message header to the beginning of the buffer */
366+
memcpy(shub->in_buffer,shub->in_buffer+shub->in_buffer_used,available-pos);
367+
shub->in_buffer_used=0;
368+
}
369+
available-=pos;
370+
continue;
371+
}
372+
}
373+
break;
374+
}
338375
}
339376
}
340377
}
341-
}else {/* timeout expired */
342-
if (shub->in_buffer_used!=0) {/* if buffer is not empty... */
343-
/* ...then send it */
344-
while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
345-
shub->params->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR);
346-
reconnect(shub);
347-
}
348-
shub->in_buffer_used=0;
349-
}
378+
if (shub->params->delay!=0) {
379+
continue;
380+
}
381+
}
382+
if (shub->in_buffer_used!=0) {/* if buffer is not empty... */
383+
/* ...then send it */
384+
#ifSHOW_SENT_STATISTIC
385+
staticsize_ttotal_sent;
386+
staticsize_ttotal_count;
387+
total_sent+=shub->in_buffer_used;
388+
if (++total_count %1024==0) {
389+
printf("Average sent buffer size: %ld\n",total_sent/total_count);
390+
}
391+
#endif
392+
while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
393+
shub->params->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR);
394+
reconnect(shub);
395+
}
396+
shub->in_buffer_used=0;
350397
}
351398
}
352399
}

‎contrib/pg_dtm/sockhub/start-clients.sh‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
n_clients=10
1+
n_clients=100
22
n_iters=100000
33
./sockhub -h$1 -p 5001 -f /tmp/p5002&
44
for((i=0;i<n_clients;i++))

‎contrib/pg_dtm/tests/pg_shard_transfers.go‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ import (
1111
)
1212

1313
const (
14-
TRANSFER_CONNECTIONS=8
14+
TRANSFER_CONNECTIONS=50
1515
INIT_AMOUNT=10000
16-
N_ITERATIONS=10000
16+
N_ITERATIONS=2000
1717
N_ACCOUNTS=2*100000
1818
)
1919

‎contrib/pg_dtm/tests/transfers.go‎

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,6 @@ func prepare_one(connstr string, wg *sync.WaitGroup) {
204204
exec(conn,"drop table if exists t")
205205
exec(conn,"create table t(u int primary key, v int)")
206206

207-
exec(conn,"begin transaction isolation level "+cfg.Isolation)
208207
exec(conn,"begin transaction isolation level "+cfg.Isolation)
209208

210209
start:=time.Now()

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp