16
16
#include <netinet/tcp.h>
17
17
#include <netinet/in.h>
18
18
19
+ #ifdef USE_EPOLL
20
+ #include <sys/epoll.h>
21
+ #endif
22
+
19
23
#include "server.h"
20
24
#include "limits.h"
21
25
#include "util.h"
22
26
#include "sockhub.h"
23
27
24
28
typedef struct buffer_t {
25
- int ready ;// number of bytes that are ready to be sent/processed
29
+ int ready ;/* number of bytes that are ready to be sent/processed */
26
30
ShubMessageHdr * curmessage ;
27
- char * data ;// dynamically allocated buffer
31
+ char * data ;/* dynamically allocated buffer */
28
32
}buffer_t ;
29
33
30
34
typedef struct stream_data_t * stream_t ;
31
35
32
36
typedef struct client_data_t {
33
- stream_t stream ;// NULL: client value is empty
37
+ stream_t stream ;/* NULL: client value is empty */
34
38
void * userdata ;
35
39
unsignedint chan ;
36
40
}client_data_t ;
37
41
38
42
typedef struct stream_data_t {
39
43
int fd ;
40
- bool good ;// 'false': stop serving this stream and disconnect when possible
44
+ bool good ;/* 'false': stop serving this stream and disconnect when possible */
41
45
buffer_t input ;
42
46
buffer_t output ;
43
47
44
- // a map: 'chan' -> client_data_t
45
- // 'chan' is expected to be < MAX_FDS which is pretty low
46
- client_data_t * clients ;// dynamically allocated
48
+ /* a map: 'chan' -> client_data_t */
49
+ /* 'chan' is expected to be < MAX_FDS which is pretty low */
50
+ client_data_t * clients ;/* dynamically allocated */
51
+ struct stream_data_t * next ;
47
52
}stream_data_t ;
48
53
49
54
typedef struct server_data_t {
50
55
char * host ;
51
56
int port ;
52
57
53
- int listener ;// the listening socket
54
- fd_set all ;// all sockets including the listener
58
+ int listener ;/* the listening socket */
59
+ #ifdef USE_EPOLL
60
+ int epollfd ;
61
+ #else
62
+ fd_set all ;/* all sockets including the listener */
55
63
int maxfd ;
56
-
57
- int streamsnum ;
58
- stream_data_t streams [ MAX_STREAMS ] ;
64
+ #endif
65
+ stream_t used_chain ;
66
+ stream_t free_chain ;
59
67
60
68
onmessage_callback_t onmessage ;
61
69
onconnect_callback_t onconnect ;
62
70
ondisconnect_callback_t ondisconnect ;
63
71
}server_data_t ;
64
72
65
- // Returns the created socket, or -1 if failed.
73
+ /* Returns the created socket, or -1 if failed. */
66
74
static int create_listening_socket (const char * host ,int port ) {
67
75
int s = socket (AF_INET ,SOCK_STREAM ,0 );
68
76
if (s == -1 ) {
@@ -113,32 +121,56 @@ server_t server_init(
113
121
return server ;
114
122
}
115
123
124
+ bool register_socket (server_t server ,int fd ,stream_t stream )
125
+ {
126
+ #ifdef USE_EPOLL
127
+ struct epoll_event ev ;
128
+ ev .events = EPOLLIN ;
129
+ ev .data .ptr = (void * )stream ;
130
+ if (epoll_ctl (server -> epollfd ,EPOLL_CTL_ADD ,fd ,& ev )< 0 ) {
131
+ return false;
132
+ }
133
+ #else
134
+ FD_SET (fd ,& server -> all );
135
+ if (fd > server -> maxfd ) {
136
+ server -> maxfd = fd ;
137
+ }
138
+ #endif
139
+ return true;
140
+ }
141
+
116
142
bool server_start (server_t server ) {
117
143
debug ("starting the server\n" );
118
- server -> streamsnum = 0 ;
119
-
144
+ server -> free_chain = NULL ;
145
+ server -> used_chain = NULL ;
146
+
120
147
server -> listener = create_listening_socket (server -> host ,server -> port );
121
148
if (server -> listener == -1 ) {
122
149
return false;
123
150
}
124
151
152
+ #ifdef USE_EPOLL
153
+ server -> epollfd = epoll_create (MAX_EVENTS );
154
+ if (server -> epollfd < 0 ) {
155
+ return false;
156
+ }
157
+ #else
125
158
FD_ZERO (& server -> all );
126
- FD_SET (server -> listener ,& server -> all );
127
- server -> maxfd = server -> listener ;
128
-
129
- return true;
159
+ server -> maxfd = 0 ;
160
+ #endif
161
+ return register_socket (server ,server -> listener ,NULL );
130
162
}
131
163
132
164
static bool stream_flush (stream_t stream ) {
133
165
int tosend = stream -> output .ready ;
134
166
if (tosend == 0 ) {
135
- // nothing to do
167
+ /* nothing to do */
136
168
return true;
137
169
}
138
170
139
171
char * cursor = stream -> output .data ;
140
172
while (tosend > 0 ) {
141
- // repeat sending until we send everything
173
+ /* repeat sending until we send everything */
142
174
int sent = send (stream -> fd ,cursor ,tosend ,0 );
143
175
if (sent == -1 ) {
144
176
shout ("failed to flush the stream\n" );
@@ -153,7 +185,7 @@ static bool stream_flush(stream_t stream) {
153
185
stream -> output .ready = 0 ;
154
186
ShubMessageHdr * msg = stream -> output .curmessage ;
155
187
if (msg ) {
156
- // move the unfinished message to the start of the buffer
188
+ /* move the unfinished message to the start of the buffer */
157
189
memmove (stream -> output .data ,msg ,msg -> size + sizeof (ShubMessageHdr ));
158
190
stream -> output .curmessage = (ShubMessageHdr * )stream -> output .data ;
159
191
}
@@ -163,10 +195,9 @@ static bool stream_flush(stream_t stream) {
163
195
164
196
static void server_flush (server_t server ) {
165
197
debug ("flushing the streams\n" );
166
- int i ;
167
- for (i = 0 ;i < server -> streamsnum ;i ++ ) {
168
- stream_t stream = server -> streams + i ;
169
- stream_flush (stream );
198
+ stream_t s ;
199
+ for (s = server -> used_chain ;s != NULL ;s = s -> next ) {
200
+ stream_flush (s );
170
201
}
171
202
}
172
203
@@ -187,7 +218,7 @@ static void stream_init(stream_t stream, int fd) {
187
218
188
219
stream -> clients = malloc (MAX_TRANSACTIONS * sizeof (client_data_t ));
189
220
assert (stream -> clients );
190
- // mark all clients as empty
221
+ /* mark all clients as empty */
191
222
for (i = 0 ;i < MAX_TRANSACTIONS ;i ++ ) {
192
223
stream -> clients [i ].stream = NULL ;
193
224
}
@@ -207,36 +238,28 @@ static void server_stream_destroy(server_t server, stream_t stream) {
207
238
}
208
239
}
209
240
}
210
-
211
- FD_CLR (stream -> fd ,& server -> all );
241
+ #ifdef USE_EPOLL
242
+ epoll_ctl (server -> epollfd ,EPOLL_CTL_DEL ,stream -> fd ,NULL );
243
+ #else
244
+ FD_CLR (stream -> fd ,& server -> all );
245
+ #endif
212
246
close (stream -> fd );
213
247
free (stream -> clients );
214
248
free (stream -> input .data );
215
249
free (stream -> output .data );
216
250
}
217
251
218
- static void stream_move (stream_t dst ,stream_t src ) {
219
- int i ;
220
- * dst = * src ;
221
- for (i = 0 ;i < MAX_TRANSACTIONS ;i ++ ) {
222
- if (dst -> clients [i ].stream ) {
223
- dst -> clients [i ].stream = dst ;
224
- }
225
- }
226
- }
227
-
228
252
static void server_close_bad_streams (server_t server ) {
229
- int i ;
230
- for (i = server -> streamsnum - 1 ;i >=0 ;i -- ) {
231
- stream_t stream = server -> streams + i ;
232
- if (!stream -> good ) {
233
- server_stream_destroy (server ,stream );
234
- if (i != server -> streamsnum - 1 ) {
235
- // move the last one here
236
- * stream = server -> streams [server -> streamsnum - 1 ];
237
- stream_move (stream ,server -> streams + server -> streamsnum - 1 );
238
- }
239
- server -> streamsnum -- ;
253
+ stream_t s ,next ,* spp ;
254
+ for (spp = & server -> used_chain ; (s = * spp )!= NULL ;s = next ) {
255
+ next = s -> next ;
256
+ if (!s -> good ) {
257
+ server_stream_destroy (server ,s );
258
+ * spp = next ;
259
+ s -> next = server -> free_chain ;
260
+ server -> free_chain = s ;
261
+ }else {
262
+ spp = & s -> next ;
240
263
}
241
264
}
242
265
}
@@ -279,7 +302,7 @@ static bool stream_message_append(stream_t stream, size_t len, void *data) {
279
302
280
303
int newsize = stream -> output .curmessage -> size + sizeof (ShubMessageHdr )+ len ;
281
304
if (newsize > BUFFER_SIZE ) {
282
- // the flushing will not help here
305
+ /* the flushing will not help here */
283
306
shout ("the message cannot be bigger than the buffer size\n" );
284
307
stream -> good = false;
285
308
return false;
@@ -326,7 +349,8 @@ bool client_message_finish(client_t client) {
326
349
return stream_message_finish (client -> stream );
327
350
}
328
351
329
- bool client_message_shortcut (client_t client ,xid_t arg ) {
352
+ bool client_message_shortcut (client_t client ,xid_t arg )
353
+ {
330
354
if (!stream_message_start (client -> stream ,client -> chan )) {
331
355
return false;
332
356
}
@@ -348,36 +372,33 @@ static bool server_accept(server_t server) {
348
372
return false;
349
373
}
350
374
debug ("a new connection accepted\n" );
351
-
352
- if (server -> streamsnum >=MAX_STREAMS ) {
353
- shout ("streams limit hit, disconnecting the accepted connection\n" );
354
- close (fd );
355
- return false;
375
+
376
+ stream_t s = server -> free_chain ;
377
+ if (s == NULL ) {
378
+ s = malloc (sizeof (stream_data_t ));
379
+ }else {
380
+ server -> free_chain = s -> next ;
356
381
}
382
+ /* add new stream */
383
+ s -> next = server -> used_chain ;
384
+ server -> used_chain = s ;
357
385
358
- // add new stream
359
- stream_t s = server -> streams + server -> streamsnum ++ ;
360
386
stream_init (s ,fd );
361
387
362
- FD_SET (fd ,& server -> all );
363
- if (fd > server -> maxfd ) {
364
- server -> maxfd = fd ;
365
- }
366
-
367
- return true;
388
+ return register_socket (server ,fd ,s );
368
389
}
369
390
370
391
static client_t stream_get_client (stream_t stream ,unsignedint chan ,bool * isnew ) {
371
392
assert (chan < MAX_TRANSACTIONS );
372
393
client_t client = stream -> clients + chan ;
373
394
if (client -> stream == NULL ) {
374
- // client is new
395
+ /* client is new */
375
396
client -> stream = stream ;
376
397
client -> chan = chan ;
377
398
* isnew = true;
378
399
client -> userdata = NULL ;
379
400
}else {
380
- // collisions should not happen
401
+ /* collisions should not happen */
381
402
assert (client -> chan == chan );
382
403
* isnew = false;
383
404
}
@@ -412,7 +433,7 @@ static bool server_stream_handle(server_t server, stream_t stream) {
412
433
ShubMessageHdr * msg = (ShubMessageHdr * )cursor ;
413
434
int header_and_data = sizeof (ShubMessageHdr )+ msg -> size ;
414
435
if (header_and_data <=toprocess ) {
415
- // handle message
436
+ /* handle message */
416
437
bool isnew ;
417
438
client_t client = stream_get_client (stream ,msg -> chan ,& isnew );
418
439
if (isnew ) {
@@ -457,9 +478,30 @@ static bool server_stream_handle(server_t server, stream_t stream) {
457
478
void server_loop (server_t server ) {
458
479
while (1 ) {
459
480
int i ;
481
+ int numready ;
482
+ #ifdef USE_EPOLL
483
+ struct epoll_event events [MAX_EVENTS ];
484
+ numready = epoll_wait (server -> epollfd ,events ,MAX_EVENTS ,-1 );
485
+ if (numready < 0 ) {
486
+ shout ("failed to select: %s\n" ,strerror (errno ));
487
+ return ;
488
+ }
489
+ for (i = 0 ;i < numready ;i ++ ) {
490
+ stream_t stream = (stream_t )events [i ].data .ptr ;
491
+ if (stream == NULL ) {
492
+ server_accept (server );
493
+ }else {
494
+ if (events [i ].events & EPOLLERR ) {
495
+ stream -> good = false;
496
+ }else if (events [i ].events & EPOLLIN ) {
497
+ server_stream_handle (server ,stream );
498
+ }
499
+ }
500
+ }
501
+ #else
460
502
fd_set readfds = server -> all ;
461
- debug ("selecting\n" );
462
503
int numready = select (server -> maxfd + 1 ,& readfds ,NULL ,NULL ,NULL );
504
+ stream_t s ;
463
505
if (numready == -1 ) {
464
506
shout ("failed to select: %s\n" ,strerror (errno ));
465
507
return ;
@@ -470,14 +512,13 @@ void server_loop(server_t server) {
470
512
server_accept (server );
471
513
}
472
514
473
- for (i = 0 ; (i < server -> streamsnum )&& (numready > 0 );i ++ ) {
474
- stream_t stream = server -> streams + i ;
475
- if (FD_ISSET (stream -> fd ,& readfds )) {
476
- server_stream_handle (server ,stream );
515
+ for (s = server_used_chain ;s != NULL && numready > 0 ;s = s -> next ) {
516
+ if (FD_ISSET (s -> fd ,& readfds )) {
517
+ server_stream_handle (server ,s );
477
518
numready -- ;
478
519
}
479
520
}
480
-
521
+ #endif
481
522
server_close_bad_streams (server );
482
523
server_flush (server );
483
524
}
@@ -501,7 +542,7 @@ unsigned client_get_ip_addr(client_t client)
501
542
}
502
543
503
544
#if 0
504
- // usage example
545
+ /* usage example */
505
546
506
547
void test_onconnect (client_t client ) {
507
548
char * name = "hello" ;