Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit06e1028

Browse files
committed
Add more sckhub tests
1 parent582d9f5 commit06e1028

File tree

7 files changed

+266
-27
lines changed

7 files changed

+266
-27
lines changed

‎contrib/pg_dtm/sockhub/Makefile‎

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
CC = gcc
2-
CFLAGS = -c -I. -Wall -O2 -g -fPIC
2+
CFLAGS = -c -I. -Wall -O0 -g -fPIC
33
LD =$(CC)
44
LDFLAGS = -g
55
AR = ar
@@ -9,7 +9,7 @@ all: sockhub library tests
99

1010
library: libsockhub.a
1111

12-
tests: test-client test-server
12+
tests: test-client test-async-client test-server
1313

1414
sockhup.o: sockhub.c sockhub.h
1515
$(CC)$(CFLAGS) sockhub.c
@@ -30,6 +30,12 @@ test-client.o: test-client.c sockhub.h
3030
test-client: test-client.o libsockhub.a
3131
$(LD)$(LDFLAGS) -o test-client test-client.o libsockhub.a
3232

33+
test-async-client.o: test-async-client.c sockhub.h
34+
$(CC)$(CFLAGS) test-async-client.c
35+
36+
test-async-client: test-async-client.o libsockhub.a
37+
$(LD)$(LDFLAGS) -o test-async-client test-async-client.o libsockhub.a
38+
3339
test-server.o: test-server.c sockhub.h
3440
$(CC)$(CFLAGS) test-server.c
3541

‎contrib/pg_dtm/sockhub/sockhub.c‎

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include<string.h>
1616
#include<signal.h>
1717
#include<errno.h>
18+
#include<assert.h>
1819

1920
#include"sockhub.h"
2021

@@ -70,11 +71,11 @@ static void close_socket(Shub* shub, int fd)
7071
FD_CLR(fd,&shub->inset);
7172
}
7273

73-
staticintread_socket_ex(intsd,char*buf,intmin_size,intmax_size)
74+
intShubReadSocketEx(intsd,void*buf,intmin_size,intmax_size)
7475
{
7576
intreceived=0;
7677
while (received<min_size) {
77-
intn=recv(sd,buf+received,max_size-received,0);
78+
intn=recv(sd,(char*)buf+received,max_size-received,0);
7879
if (n <=0) {
7980
break;
8081
}
@@ -83,20 +84,21 @@ static int read_socket_ex(int sd, char* buf, int min_size, int max_size)
8384
returnreceived;
8485
}
8586

86-
staticintread_socket(intsd,char*buf,intsize)
87+
intShubReadSocket(intsd,void*buf,intsize)
8788
{
88-
returnread_socket_ex(sd,buf,size,size)==size;
89+
returnShubReadSocketEx(sd,buf,size,size)==size;
8990
}
9091

91-
staticintwrite_socket(intsd,charconst*buf,intsize)
92+
intShubWriteSocket(intsd,voidconst*buf,intsize)
9293
{
94+
char*src= (char*)buf;
9395
while (size!=0) {
94-
intn=send(sd,buf,size,0);
96+
intn=send(sd,src,size,0);
9597
if (n <=0) {
9698
return0;
9799
}
98100
size-=n;
99-
buf+=n;
101+
src+=n;
100102
}
101103
return1;
102104
}
@@ -180,7 +182,7 @@ static void notify_disconnect(Shub* shub, int chan)
180182
hdr->code=MSG_DISCONNECT;
181183
shub->in_buffer_used+=sizeof(ShubMessageHdr);
182184
if (shub->in_buffer_used+sizeof(ShubMessageHdr)>shub->params->buffer_size) {
183-
while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
185+
while (!ShubWriteSocket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
184186
shub->params->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR);
185187
reconnect(shub);
186188
}
@@ -291,7 +293,7 @@ void ShubLoop(Shub* shub)
291293
}
292294
}elseif (i==shub->output) {/* receive response from server */
293295
/* try to read as much as possible */
294-
intavailable=read_socket_ex(shub->output,shub->out_buffer+shub->out_buffer_used,sizeof(ShubMessageHdr),buffer_size-shub->out_buffer_used);
296+
intavailable=ShubReadSocketEx(shub->output,shub->out_buffer+shub->out_buffer_used,sizeof(ShubMessageHdr),buffer_size-shub->out_buffer_used);
295297
intpos=0;
296298
if (available<sizeof(ShubMessageHdr)) {
297299
shub->params->error_handler("Failed to read inet socket",SHUB_RECOVERABLE_ERROR);
@@ -307,7 +309,7 @@ void ShubLoop(Shub* shub)
307309
unsignedintn=pos+sizeof(ShubMessageHdr)+hdr->size <=shub->out_buffer_used
308310
?hdr->size+sizeof(ShubMessageHdr)
309311
:shub->out_buffer_used-pos;
310-
if (!write_socket(chan, (char*)hdr,n)) {
312+
if (!ShubWriteSocket(chan, (char*)hdr,n)) {
311313
shub->params->error_handler("Failed to write to local socket",SHUB_RECOVERABLE_ERROR);
312314
close_socket(shub,chan);
313315
notify_disconnect(shub,chan);
@@ -318,12 +320,12 @@ void ShubLoop(Shub* shub)
318320
inttail=hdr->size+sizeof(ShubMessageHdr)-n;
319321
do {
320322
n=tail<buffer_size ?tail :buffer_size;
321-
if (!read_socket(shub->output,shub->out_buffer,n)) {
323+
if (!ShubReadSocket(shub->output,shub->out_buffer,n)) {
322324
shub->params->error_handler("Failed to read inet socket",SHUB_RECOVERABLE_ERROR);
323325
reconnect(shub);
324326
continue;
325327
}
326-
if (chan >=0&& !write_socket(chan,shub->out_buffer,n)) {
328+
if (chan >=0&& !ShubWriteSocket(chan,shub->out_buffer,n)) {
327329
shub->params->error_handler("Failed to write to local socket",SHUB_RECOVERABLE_ERROR);
328330
close_socket(shub,chan);
329331
notify_disconnect(shub,chan);
@@ -344,7 +346,8 @@ void ShubLoop(Shub* shub)
344346
intchan=i;
345347
intavailable=0;
346348
while (1) {
347-
available+=read_socket_ex(chan,&shub->in_buffer[shub->in_buffer_used+available],sizeof(ShubMessageHdr)-available,buffer_size-shub->in_buffer_used-available);
349+
assert(sizeof(ShubMessageHdr)>available);
350+
available+=ShubReadSocketEx(chan,&shub->in_buffer[shub->in_buffer_used+available],sizeof(ShubMessageHdr)-available,buffer_size-shub->in_buffer_used-available);
348351
if (available<sizeof(ShubMessageHdr)) {
349352
shub->params->error_handler("Failed to read local socket",SHUB_RECOVERABLE_ERROR);
350353
close_socket(shub,i);
@@ -366,12 +369,12 @@ void ShubLoop(Shub* shub)
366369
/* message doesn't completely fit in buffer */
367370
if (shub->in_buffer_used!=0) {/* if buffer is not empty...*/
368371
/* ... then send it */
369-
while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
372+
while (!ShubWriteSocket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
370373
shub->params->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR);
371374
reconnect(shub);
372375
}
373376
/* move received message header to the beginning of the buffer */
374-
memcpy(shub->in_buffer,shub->in_buffer+shub->in_buffer_used,buffer_size-shub->in_buffer_used);
377+
memcpy(shub->in_buffer,shub->in_buffer+shub->in_buffer_used,available-shub->in_buffer_used);
375378
shub->in_buffer_used=0;
376379
}
377380
}
@@ -381,7 +384,7 @@ void ShubLoop(Shub* shub)
381384
do {
382385
unsignedintn=size+shub->in_buffer_used>buffer_size ?buffer_size-shub->in_buffer_used :size;
383386
/* fetch rest of message body */
384-
if (chan >=0&& !read_socket(chan,shub->in_buffer+shub->in_buffer_used,n)) {
387+
if (chan >=0&& !ShubReadSocket(chan,shub->in_buffer+shub->in_buffer_used,n)) {
385388
shub->params->error_handler("Failed to read local socket",SHUB_RECOVERABLE_ERROR);
386389
close_socket(shub,chan);
387390
if (hdr!=NULL) {/* if message header is not yet sent to the server... */
@@ -398,7 +401,7 @@ void ShubLoop(Shub* shub)
398401
/* if there is no more free space in the buffer to receive new message header... */
399402
if (shub->in_buffer_used+sizeof(ShubMessageHdr)>buffer_size) {
400403
/* ... then send buffer to the server */
401-
while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
404+
while (!ShubWriteSocket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
402405
shub->params->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR);
403406
reconnect(shub);
404407
}
@@ -417,7 +420,7 @@ void ShubLoop(Shub* shub)
417420
if (chan >=0&&pos!=available) {/* partly fetched message header */
418421
if (shub->in_buffer_used+sizeof(ShubMessageHdr)>buffer_size) {
419422
/* message doesn't completely fit in buffer */
420-
while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
423+
while (!ShubWriteSocket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
421424
shub->params->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR);
422425
reconnect(shub);
423426
}
@@ -448,7 +451,7 @@ void ShubLoop(Shub* shub)
448451
printf("Average sent buffer size: %ld\n",total_sent/total_count);
449452
}
450453
#endif
451-
while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
454+
while (!ShubWriteSocket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
452455
shub->params->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR);
453456
reconnect(shub);
454457
}

‎contrib/pg_dtm/sockhub/sockhub.h‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ typedef struct
4949
ShubParams*params;
5050
}Shub;
5151

52+
intShubReadSocketEx(intsd,void*buf,intmin_size,intmax_size);
53+
intShubReadSocket(intsd,void*buf,intsize);
54+
intShubWriteSocket(intsd,voidconst*buf,intsize);
55+
5256
voidShubInitParams(ShubParams*params);
5357
voidShubInitialize(Shub*shub,ShubParams*params);
5458
voidShubLoop(Shub*shub);
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
n_clients=10
2+
n_iters=1000
3+
pkill -9 sockub
4+
pkill -9 test-async-client
5+
./sockhub -h$1 -p 5001 -f /tmp/p5002&
6+
for((i=0;i<n_clients;i++))
7+
do
8+
./test-async-client -h localhost -p 5002 -i$n_iters&
9+
done

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp