Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1db67dc

Browse files
committed
Support replication of DDL
1 parent29c7901 commit1db67dc

File tree

4 files changed

+258
-87
lines changed

4 files changed

+258
-87
lines changed

‎contrib/multimaster/dtmd/src/server.c‎

Lines changed: 116 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -16,53 +16,61 @@
1616
#include<netinet/tcp.h>
1717
#include<netinet/in.h>
1818

19+
#ifdefUSE_EPOLL
20+
#include<sys/epoll.h>
21+
#endif
22+
1923
#include"server.h"
2024
#include"limits.h"
2125
#include"util.h"
2226
#include"sockhub.h"
2327

2428
typedefstructbuffer_t {
25-
intready;// number of bytes that are ready to be sent/processed
29+
intready;/* number of bytes that are ready to be sent/processed */
2630
ShubMessageHdr*curmessage;
27-
char*data;// dynamically allocated buffer
31+
char*data;/* dynamically allocated buffer */
2832
}buffer_t;
2933

3034
typedefstructstream_data_t*stream_t;
3135

3236
typedefstructclient_data_t {
33-
stream_tstream;// NULL: client value is empty
37+
stream_tstream;/* NULL: client value is empty */
3438
void*userdata;
3539
unsignedintchan;
3640
}client_data_t;
3741

3842
typedefstructstream_data_t {
3943
intfd;
40-
boolgood;// 'false': stop serving this stream and disconnect when possible
44+
boolgood;/* 'false': stop serving this stream and disconnect when possible */
4145
buffer_tinput;
4246
buffer_toutput;
4347

44-
// a map: 'chan' -> client_data_t
45-
// 'chan' is expected to be < MAX_FDS which is pretty low
46-
client_data_t*clients;// dynamically allocated
48+
/* a map: 'chan' -> client_data_t */
49+
/* 'chan' is expected to be < MAX_FDS which is pretty low */
50+
client_data_t*clients;/* dynamically allocated */
51+
structstream_data_t*next;
4752
}stream_data_t;
4853

4954
typedefstructserver_data_t {
5055
char*host;
5156
intport;
5257

53-
intlistener;// the listening socket
54-
fd_setall;// all sockets including the listener
58+
intlistener;/* the listening socket */
59+
#ifdefUSE_EPOLL
60+
intepollfd;
61+
#else
62+
fd_setall;/* all sockets including the listener */
5563
intmaxfd;
56-
57-
intstreamsnum;
58-
stream_data_tstreams[MAX_STREAMS];
64+
#endif
65+
stream_tused_chain;
66+
stream_tfree_chain;
5967

6068
onmessage_callback_tonmessage;
6169
onconnect_callback_tonconnect;
6270
ondisconnect_callback_tondisconnect;
6371
}server_data_t;
6472

65-
// Returns the created socket, or -1 if failed.
73+
/* Returns the created socket, or -1 if failed. */
6674
staticintcreate_listening_socket(constchar*host,intport) {
6775
ints=socket(AF_INET,SOCK_STREAM,0);
6876
if (s==-1) {
@@ -113,32 +121,56 @@ server_t server_init(
113121
returnserver;
114122
}
115123

124+
boolregister_socket(server_tserver,intfd,stream_tstream)
125+
{
126+
#ifdefUSE_EPOLL
127+
structepoll_eventev;
128+
ev.events=EPOLLIN;
129+
ev.data.ptr= (void*)stream;
130+
if (epoll_ctl(server->epollfd,EPOLL_CTL_ADD,fd,&ev)<0) {
131+
return false;
132+
}
133+
#else
134+
FD_SET(fd,&server->all);
135+
if (fd>server->maxfd) {
136+
server->maxfd=fd;
137+
}
138+
#endif
139+
return true;
140+
}
141+
116142
boolserver_start(server_tserver) {
117143
debug("starting the server\n");
118-
server->streamsnum=0;
119-
144+
server->free_chain=NULL;
145+
server->used_chain=NULL;
146+
120147
server->listener=create_listening_socket(server->host,server->port);
121148
if (server->listener==-1) {
122149
return false;
123150
}
124151

152+
#ifdefUSE_EPOLL
153+
server->epollfd=epoll_create(MAX_EVENTS);
154+
if (server->epollfd<0) {
155+
return false;
156+
}
157+
#else
125158
FD_ZERO(&server->all);
126-
FD_SET(server->listener,&server->all);
127-
server->maxfd=server->listener;
128-
129-
return true;
159+
server->maxfd=0;
160+
#endif
161+
returnregister_socket(server,server->listener,NULL);
130162
}
131163

132164
staticboolstream_flush(stream_tstream) {
133165
inttosend=stream->output.ready;
134166
if (tosend==0) {
135-
// nothing to do
167+
/* nothing to do */
136168
return true;
137169
}
138170

139171
char*cursor=stream->output.data;
140172
while (tosend>0) {
141-
// repeat sending until we send everything
173+
/* repeat sending until we send everything */
142174
intsent=send(stream->fd,cursor,tosend,0);
143175
if (sent==-1) {
144176
shout("failed to flush the stream\n");
@@ -153,7 +185,7 @@ static bool stream_flush(stream_t stream) {
153185
stream->output.ready=0;
154186
ShubMessageHdr*msg=stream->output.curmessage;
155187
if (msg) {
156-
// move the unfinished message to the start of the buffer
188+
/* move the unfinished message to the start of the buffer */
157189
memmove(stream->output.data,msg,msg->size+sizeof(ShubMessageHdr));
158190
stream->output.curmessage= (ShubMessageHdr*)stream->output.data;
159191
}
@@ -163,10 +195,9 @@ static bool stream_flush(stream_t stream) {
163195

164196
staticvoidserver_flush(server_tserver) {
165197
debug("flushing the streams\n");
166-
inti;
167-
for (i=0;i<server->streamsnum;i++) {
168-
stream_tstream=server->streams+i;
169-
stream_flush(stream);
198+
stream_ts;
199+
for (s=server->used_chain;s!=NULL;s=s->next) {
200+
stream_flush(s);
170201
}
171202
}
172203

@@ -187,7 +218,7 @@ static void stream_init(stream_t stream, int fd) {
187218

188219
stream->clients=malloc(MAX_TRANSACTIONS*sizeof(client_data_t));
189220
assert(stream->clients);
190-
// mark all clients as empty
221+
/* mark all clients as empty */
191222
for (i=0;i<MAX_TRANSACTIONS;i++) {
192223
stream->clients[i].stream=NULL;
193224
}
@@ -207,36 +238,28 @@ static void server_stream_destroy(server_t server, stream_t stream) {
207238
}
208239
}
209240
}
210-
211-
FD_CLR(stream->fd,&server->all);
241+
#ifdefUSE_EPOLL
242+
epoll_ctl(server->epollfd,EPOLL_CTL_DEL,stream->fd,NULL);
243+
#else
244+
FD_CLR(stream->fd,&server->all);
245+
#endif
212246
close(stream->fd);
213247
free(stream->clients);
214248
free(stream->input.data);
215249
free(stream->output.data);
216250
}
217251

218-
staticvoidstream_move(stream_tdst,stream_tsrc) {
219-
inti;
220-
*dst=*src;
221-
for (i=0;i<MAX_TRANSACTIONS;i++) {
222-
if (dst->clients[i].stream) {
223-
dst->clients[i].stream=dst;
224-
}
225-
}
226-
}
227-
228252
staticvoidserver_close_bad_streams(server_tserver) {
229-
inti;
230-
for (i=server->streamsnum-1;i >=0;i--) {
231-
stream_tstream=server->streams+i;
232-
if (!stream->good) {
233-
server_stream_destroy(server,stream);
234-
if (i!=server->streamsnum-1) {
235-
// move the last one here
236-
*stream=server->streams[server->streamsnum-1];
237-
stream_move(stream,server->streams+server->streamsnum-1);
238-
}
239-
server->streamsnum--;
253+
stream_ts,next,*spp;
254+
for (spp=&server->used_chain; (s=*spp)!=NULL;s=next) {
255+
next=s->next;
256+
if (!s->good) {
257+
server_stream_destroy(server,s);
258+
*spp=next;
259+
s->next=server->free_chain;
260+
server->free_chain=s;
261+
}else {
262+
spp=&s->next;
240263
}
241264
}
242265
}
@@ -279,7 +302,7 @@ static bool stream_message_append(stream_t stream, size_t len, void *data) {
279302

280303
intnewsize=stream->output.curmessage->size+sizeof(ShubMessageHdr)+len;
281304
if (newsize>BUFFER_SIZE) {
282-
// the flushing will not help here
305+
/* the flushing will not help here */
283306
shout("the message cannot be bigger than the buffer size\n");
284307
stream->good= false;
285308
return false;
@@ -326,7 +349,8 @@ bool client_message_finish(client_t client) {
326349
returnstream_message_finish(client->stream);
327350
}
328351

329-
boolclient_message_shortcut(client_tclient,xid_targ) {
352+
boolclient_message_shortcut(client_tclient,xid_targ)
353+
{
330354
if (!stream_message_start(client->stream,client->chan)) {
331355
return false;
332356
}
@@ -348,36 +372,33 @@ static bool server_accept(server_t server) {
348372
return false;
349373
}
350374
debug("a new connection accepted\n");
351-
352-
if (server->streamsnum >=MAX_STREAMS) {
353-
shout("streams limit hit, disconnecting the accepted connection\n");
354-
close(fd);
355-
return false;
375+
376+
stream_ts=server->free_chain;
377+
if (s==NULL) {
378+
s=malloc(sizeof(stream_data_t));
379+
}else {
380+
server->free_chain=s->next;
356381
}
382+
/* add new stream */
383+
s->next=server->used_chain;
384+
server->used_chain=s;
357385

358-
// add new stream
359-
stream_ts=server->streams+server->streamsnum++;
360386
stream_init(s,fd);
361387

362-
FD_SET(fd,&server->all);
363-
if (fd>server->maxfd) {
364-
server->maxfd=fd;
365-
}
366-
367-
return true;
388+
returnregister_socket(server,fd,s);
368389
}
369390

370391
staticclient_tstream_get_client(stream_tstream,unsignedintchan,bool*isnew) {
371392
assert(chan<MAX_TRANSACTIONS);
372393
client_tclient=stream->clients+chan;
373394
if (client->stream==NULL) {
374-
// client is new
395+
/* client is new */
375396
client->stream=stream;
376397
client->chan=chan;
377398
*isnew= true;
378399
client->userdata=NULL;
379400
}else {
380-
// collisions should not happen
401+
/* collisions should not happen */
381402
assert(client->chan==chan);
382403
*isnew= false;
383404
}
@@ -412,7 +433,7 @@ static bool server_stream_handle(server_t server, stream_t stream) {
412433
ShubMessageHdr*msg= (ShubMessageHdr*)cursor;
413434
intheader_and_data=sizeof(ShubMessageHdr)+msg->size;
414435
if (header_and_data <=toprocess) {
415-
// handle message
436+
/* handle message */
416437
boolisnew;
417438
client_tclient=stream_get_client(stream,msg->chan,&isnew);
418439
if (isnew) {
@@ -457,9 +478,30 @@ static bool server_stream_handle(server_t server, stream_t stream) {
457478
voidserver_loop(server_tserver) {
458479
while (1) {
459480
inti;
481+
intnumready;
482+
#ifdefUSE_EPOLL
483+
structepoll_eventevents[MAX_EVENTS];
484+
numready=epoll_wait(server->epollfd,events,MAX_EVENTS,-1);
485+
if (numready<0) {
486+
shout("failed to select: %s\n",strerror(errno));
487+
return;
488+
}
489+
for (i=0;i<numready;i++) {
490+
stream_tstream= (stream_t)events[i].data.ptr;
491+
if (stream==NULL) {
492+
server_accept(server);
493+
}else {
494+
if (events[i].events&EPOLLERR) {
495+
stream->good= false;
496+
}elseif (events[i].events&EPOLLIN) {
497+
server_stream_handle(server,stream);
498+
}
499+
}
500+
}
501+
#else
460502
fd_setreadfds=server->all;
461-
debug("selecting\n");
462503
intnumready=select(server->maxfd+1,&readfds,NULL,NULL,NULL);
504+
stream_ts;
463505
if (numready==-1) {
464506
shout("failed to select: %s\n",strerror(errno));
465507
return;
@@ -470,14 +512,13 @@ void server_loop(server_t server) {
470512
server_accept(server);
471513
}
472514

473-
for (i=0; (i<server->streamsnum)&& (numready>0);i++) {
474-
stream_tstream=server->streams+i;
475-
if (FD_ISSET(stream->fd,&readfds)) {
476-
server_stream_handle(server,stream);
515+
for (s=server_used_chain;s!=NULL&&numready>0;s=s->next) {
516+
if (FD_ISSET(s->fd,&readfds)) {
517+
server_stream_handle(server,s);
477518
numready--;
478519
}
479520
}
480-
521+
#endif
481522
server_close_bad_streams(server);
482523
server_flush(server);
483524
}
@@ -501,7 +542,7 @@ unsigned client_get_ip_addr(client_t client)
501542
}
502543

503544
#if0
504-
// usage example
545+
/* usage example */
505546

506547
voidtest_onconnect(client_tclient) {
507548
char*name="hello";

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp