Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit94d09aa

Browse files
committed
Switch multimaster to the unified arbiter and sockhub implementation.
1 parent3f68177 commit94d09aa

File tree

69 files changed

+520
-5595
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+520
-5595
lines changed

‎contrib/arbiter/README‎

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ The commands:
5050
[RES_OK, min, max] if reserved a range [min, max]
5151
[RES_FAILED] on failure
5252

53-
'b': begin(size)
54-
Starts a global transaction and assign a 'xid' to it. 'size' is used
55-
for vote results calculation. The arbiter also creates and returns the
53+
'b': begin(), begin(size)
54+
Starts a global transaction and assign a 'xid' to it.Optional'size' is
55+
usedfor vote results calculation. The arbiter also creates and returns the
5656
snapshot.
5757

5858
The arbiter replies with:

‎contrib/arbiter/api/arbiter.c‎

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ void ArbiterInitSnapshot(Snapshot snapshot)
345345
}
346346
}
347347

348-
TransactionIdArbiterStartTransaction(Snapshotsnapshot,TransactionId*gxmin)
348+
TransactionIdArbiterStartTransaction(Snapshotsnapshot,TransactionId*gxmin,intnParticipants)
349349
{
350350
inti;
351351
xid_txid;
@@ -359,7 +359,11 @@ TransactionId ArbiterStartTransaction(Snapshot snapshot, TransactionId *gxmin)
359359
assert(snapshot!=NULL);
360360

361361
// command
362-
if (!arbiter_send_command(arbiter,CMD_BEGIN,0)) gotofailure;
362+
if (nParticipants) {
363+
if (!arbiter_send_command(arbiter,CMD_BEGIN,1,nParticipants)) gotofailure;
364+
}else {
365+
if (!arbiter_send_command(arbiter,CMD_BEGIN,0)) gotofailure;
366+
}
363367

364368
// results
365369
reslen=arbiter_recv_results(arbiter,RESULTS_SIZE,results);

‎contrib/arbiter/api/arbiter.h‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ void ArbiterInitSnapshot(Snapshot snapshot);
2121
* smallest xmin among all snapshots known to arbiter. Returns INVALID_XID
2222
* otherwise.
2323
*/
24-
TransactionIdArbiterStartTransaction(Snapshotsnapshot,TransactionId*gxmin);
24+
TransactionIdArbiterStartTransaction(Snapshotsnapshot,TransactionId*gxmin,intnParticipants);
2525

2626
/**
2727
* Asks the arbiter for a fresh snapshot. Fills the 'snapshot' and 'gxmin' on
Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,25 @@
11
#ifndefARBITER_LIMITS_H
22
#defineARBITER_LIMITS_H
33

4-
// how many xids are reserved per raft term
4+
/* how many xids are reserved per raft term */
55
#defineXIDS_PER_TERM 1000000
66

7-
// start a new term when this number of xids is left
7+
/* start a new term when this number of xids is left */
88
#defineNEW_TERM_THRESHOLD 100000
99

1010
#defineMAX_TRANSACTIONS 4096
1111
#defineMAX_SNAPSHOTS_PER_TRANS 8
1212

13-
#defineBUFFER_SIZE (64 * 1024)
13+
#defineBUFFER_SIZE (256 * 1024)
1414
#defineLISTEN_QUEUE_SIZE 100
1515
#defineMAX_STREAMS 4096
16+
#defineSOCKET_BUFFER_SIZE (1024 * 1024)
1617

1718
#defineMAX_SERVERS 16
1819
#defineHEARTBEAT_TIMEOUT_MS 20
1920
#defineELECTION_TIMEOUT_MS_MIN 150
2021
#defineELECTION_TIMEOUT_MS_MAX 300
2122
#defineRAFT_LOGLEN 1024
22-
#defineRAFT_KEEP_APPLIED 512 // how many applied entries to keep during compaction
23+
#defineRAFT_KEEP_APPLIED 512/* how many applied entries to keep during compaction */
2324

2425
#endif

‎contrib/arbiter/include/transaction.h‎

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ typedef struct Transaction {
2121
xid_txid;
2222
xid_txmin;
2323

24-
intsize;// number of paritcipants
24+
intsize;// number of participants
25+
boolfixed_size;
2526

2627
// for + against ≤ size
2728
intvotes_for;
@@ -57,8 +58,6 @@ static inline void l2_list_unlink(L2List* elem)
5758
elem->prev->next=elem->next;
5859
}
5960

60-
61-
6261
Snapshot*transaction_latest_snapshot(Transaction*t);
6362
Snapshot*transaction_snapshot(Transaction*t,intsnapno);
6463
Snapshot*transaction_next_snapshot(Transaction*t);

‎contrib/arbiter/sockhub/Makefile‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
CC = gcc
2-
CFLAGS = -c -I. -Wall -O3 -g -fPIC
2+
CFLAGS = -c -I. -Wall -O0 -g -fPIC
33
LD =$(CC)
44
LDFLAGS = -g
55
AR = ar

‎contrib/arbiter/sockhub/sockhub.c‎

Lines changed: 91 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,31 @@
1919

2020
#include"sockhub.h"
2121

22+
#defineSOCKHUB_BUFFER_SIZE (1024*1024)
23+
#defineERR_BUF_SIZE 1024
24+
25+
voidShubAddSocket(Shub*shub,intfd);
26+
27+
inlinevoidShubAddSocket(Shub*shub,intfd)
28+
{
29+
#ifdefUSE_EPOLL
30+
structepoll_eventev;
31+
ev.events=EPOLLIN;
32+
ev.data.fd=fd;
33+
if (epoll_ctl(shub->epollfd,EPOLL_CTL_ADD,fd,&ev)<0) {
34+
charbuf[ERR_BUF_SIZE];
35+
sprintf(buf,"Failed to add socket %d to epoll set",fd);
36+
shub->params->error_handler(buf,SHUB_FATAL_ERROR);
37+
}
38+
#else
39+
FD_SET(fd,&shub->inset);
40+
if (fd>shub->max_fd) {
41+
shub->max_fd=fd;
42+
}
43+
#endif
44+
}
45+
46+
2247
staticvoiddefault_error_handler(charconst*msg,ShubErrorSeverityseverity)
2348
{
2449
perror(msg);
@@ -116,8 +141,15 @@ static int resolve_host_by_name(const char *hostname, unsigned* addrs, unsigned*
116141

117142
staticvoidclose_socket(Shub*shub,intfd)
118143
{
119-
close(fd);
144+
#ifdefUSE_EPOLL
145+
if (epoll_ctl(shub->epollfd,EPOLL_CTL_DEL,fd,NULL)<0) {
146+
charbuf[ERR_BUF_SIZE];
147+
sprintf(buf,"Failed to remove socket %d from epoll set",fd);
148+
shub->params->error_handler(buf,SHUB_RECOVERABLE_ERROR);
149+
}
150+
#else
120151
FD_CLR(fd,&shub->inset);
152+
#endif
121153
}
122154

123155
intShubReadSocketEx(intsd,void*buf,intmin_size,intmax_size)
@@ -211,7 +243,12 @@ static void reconnect(Shub* shub)
211243
}else {
212244
intoptval=1;
213245
setsockopt(shub->output,IPPROTO_TCP,TCP_NODELAY, (charconst*)&optval,sizeof(optval));
214-
FD_SET(shub->output,&shub->inset);
246+
optval=SOCKHUB_BUFFER_SIZE;
247+
setsockopt(shub->output,SOL_SOCKET,SO_SNDBUF, (constchar*)&optval,sizeof(int));
248+
optval=SOCKHUB_BUFFER_SIZE;
249+
setsockopt(shub->output,SOL_SOCKET,SO_RCVBUF, (constchar*)&optval,sizeof(int));
250+
251+
ShubAddSocket(shub,shub->output);
215252
return;
216253
}
217254
}
@@ -238,6 +275,7 @@ static void notify_disconnect(Shub* shub, int chan)
238275

239276
staticvoidrecovery(Shub*shub)
240277
{
278+
#ifndefUSE_EPOLL
241279
inti,max_fd;
242280

243281
for (i=0,max_fd=shub->max_fd;i <=max_fd;i++) {
@@ -254,6 +292,7 @@ static void recovery(Shub* shub)
254292
}
255293
}
256294
}
295+
#endif
257296
}
258297

259298
voidShubInitialize(Shub*shub,ShubParams*params)
@@ -276,11 +315,17 @@ void ShubInitialize(Shub* shub, ShubParams* params)
276315
if (listen(shub->input,params->queue_size)<0) {
277316
shub->params->error_handler("Failed to listen local socket",SHUB_FATAL_ERROR);
278317
}
279-
FD_ZERO(&shub->inset);
280-
FD_SET(shub->input,&shub->inset);
281-
282318
shub->output=-1;
283-
shub->max_fd=shub->input;
319+
#ifdefUSE_EPOLL
320+
shub->epollfd=epoll_create(MAX_EVENTS);
321+
if (shub->epollfd<0) {
322+
shub->params->error_handler("Failed to create epoll",SHUB_FATAL_ERROR);
323+
}
324+
#else
325+
FD_ZERO(&shub->inset);
326+
shub->max_fd=0;
327+
#endif
328+
ShubAddSocket(shub,shub->input);
284329
reconnect(shub);
285330

286331
shub->in_buffer=malloc(params->buffer_size);
@@ -301,43 +346,61 @@ static void die(int sig)
301346
voidShubLoop(Shub*shub)
302347
{
303348
intbuffer_size=shub->params->buffer_size;
349+
sigset_tsset;
304350
signal(SIGINT,die);
305351
signal(SIGQUIT,die);
306352
signal(SIGTERM,die);
307-
// signal(SIGHUP, die);
308-
sigset_tsset;
353+
/* signal(SIGHUP, die); */
309354
sigfillset(&sset);
310355
sigprocmask(SIG_UNBLOCK,&sset,NULL);
311356

312-
while (!stop) {
357+
while (!stop) {
358+
inti,rc;
359+
#ifdefUSE_EPOLL
360+
structepoll_eventevents[MAX_EVENTS];
361+
rc=epoll_wait(shub->epollfd,events,MAX_EVENTS,shub->in_buffer_used==0 ?-1 :shub->params->delay);
362+
#else
313363
fd_setevents;
314364
structtimevaltm;
315-
inti,rc;
316365
intmax_fd=shub->max_fd;
317366

318367
tm.tv_sec=shub->params->delay/1000;
319368
tm.tv_usec=shub->params->delay %1000*1000;
320-
321369
events=shub->inset;
322370
rc=select(max_fd+1,&events,NULL,NULL,shub->in_buffer_used==0 ?NULL :&tm);
323-
if (rc<0) {
324-
if (errno!=EINTR) {
371+
#endif
372+
if (rc<0) {
373+
if (errno!=EINTR) {
325374
shub->params->error_handler("Select failed",SHUB_RECOVERABLE_ERROR);
326375
recovery(shub);
327376
}
328377
}else {
329378
if (rc>0) {
379+
#ifdefUSE_EPOLL
380+
intj;
381+
intn=rc;
382+
for (j=0;j<n;j++) {
383+
i=events[j].data.fd;
384+
if (events[j].events&EPOLLERR) {
385+
if (i==shub->input) {
386+
shub->params->error_handler("Input socket error",SHUB_FATAL_ERROR);
387+
}elseif (i==shub->output) {
388+
reconnect(shub);
389+
}else {
390+
notify_disconnect(shub,i);
391+
close_socket(shub,i);
392+
}
393+
}elseif (events[j].events&EPOLLIN) {
394+
#else
330395
for (i=0;i <=max_fd;i++) {
331396
if (FD_ISSET(i,&events)) {
332-
if (i==shub->input) {/* accept incomming connection */
397+
#endif
398+
if (i==shub->input) {/* accept incomming connection */
333399
ints=accept(i,NULL,NULL);
334400
if (s<0) {
335401
shub->params->error_handler("Failed to accept socket",SHUB_RECOVERABLE_ERROR);
336402
}else {
337-
if (s>shub->max_fd) {
338-
shub->max_fd=s;
339-
}
340-
FD_SET(s,&shub->inset);
403+
ShubAddSocket(shub,s);
341404
}
342405
}elseif (i==shub->output) {/* receive response from server */
343406
/* try to read as much as possible */
@@ -424,8 +487,11 @@ void ShubLoop(Shub* shub)
424487
assert(sizeof(ShubMessageHdr)>available);
425488
/* read as much as possible */
426489
rc=ShubReadSocketEx(chan,&shub->in_buffer[pos+available],sizeof(ShubMessageHdr)-available,buffer_size-pos-available);
427-
if (rc<sizeof(ShubMessageHdr)-available) {
428-
shub->params->error_handler("Failed to read local socket",SHUB_RECOVERABLE_ERROR);
490+
if (rc<sizeof(ShubMessageHdr)-available) {
491+
charbuf[ERR_BUF_SIZE];
492+
sprintf(buf,"Failed to read local socket chan=%d, rc=%d, min requested=%ld, max requested=%d, errno=%d",chan,rc,sizeof(ShubMessageHdr)-available,buffer_size-pos-available,errno);
493+
shub->params->error_handler(buf,SHUB_RECOVERABLE_ERROR);
494+
//shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
429495
close_socket(shub,i);
430496
shub->in_buffer_used=pos;
431497
notify_disconnect(shub,i);
@@ -460,8 +526,11 @@ void ShubLoop(Shub* shub)
460526
/* fetch rest of message body */
461527
do {
462528
unsignedintn=processed+size>buffer_size ?buffer_size-processed :size;
463-
if (chan >=0&& !ShubReadSocket(chan,shub->in_buffer+processed,n)) {
464-
shub->params->error_handler("Failed to read local socket",SHUB_RECOVERABLE_ERROR);
529+
if (chan >=0&& !ShubReadSocket(chan,shub->in_buffer+processed,n)) {
530+
charbuf[ERR_BUF_SIZE];
531+
sprintf(buf,"Failed to read local socket rc=%d, len=%d, errno=%d",rc,n,errno);
532+
shub->params->error_handler(buf,SHUB_RECOVERABLE_ERROR);
533+
//shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
465534
close_socket(shub,chan);
466535
if (hdr!=NULL) {/* if message header is not yet sent to the server... */
467536
/* ... then skip this message */

‎contrib/arbiter/sockhub/sockhub.h‎

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,18 @@
11
#ifndef__SOCKHUB_H__
22
#define__SOCKHUB_H__
33

4+
5+
#ifdef__linux__
6+
#defineUSE_EPOLL 1
7+
#endif
8+
9+
#ifdefUSE_EPOLL
10+
#include<sys/epoll.h>
11+
#defineMAX_EVENTS 1024
12+
#else
413
#include<sys/select.h>
14+
#endif
15+
516

617
typedefstruct {
718
unsignedintsize :24;/* size of message without header */
@@ -47,15 +58,20 @@ typedef struct
4758
{
4859
intoutput;
4960
intinput;
61+
#ifdefUSE_EPOLL
62+
intepollfd;
63+
#else
5064
intmax_fd;
5165
fd_setinset;
66+
#endif
5267
char*in_buffer;
5368
char*out_buffer;
5469
intin_buffer_used;
5570
intout_buffer_used;
5671
ShubParams*params;
5772
}Shub;
5873

74+
5975
intShubReadSocketEx(intsd,void*buf,intmin_size,intmax_size);
6076
intShubReadSocket(intsd,void*buf,intsize);
6177
intShubWriteSocket(intsd,voidconst*buf,intsize);
@@ -66,5 +82,3 @@ void ShubInitialize(Shub* shub, ShubParams* params);
6682
voidShubLoop(Shub*shub);
6783

6884
#endif
69-
70-
// vim: sts=4 ts=4 sw=4 expandtab

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp