Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commita390ab0

Browse files
committed
Implement Raft protocol without dynamic configuration.
0 parents  commita390ab0

File tree

6 files changed

+1498
-0
lines changed

6 files changed

+1498
-0
lines changed

‎Makefile‎

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#override CC := clang
2+
overrideCFLAGS += -Wfatal-errors -O0 -g
3+
overrideCPPFLAGS += -I. -Iinclude -DDEBUG
4+
overrideSERVER_LDFLAGS += -Llib -lraft -ljansson
5+
overrideCLIENT_LDFLAGS += -ljansson
6+
7+
AR = ar
8+
ARFLAGS = -cru
9+
10+
.PHONY: all clean bindir objdir libdir
11+
12+
all: lib/libraft.a bin/heart
13+
@echo Done.
14+
15+
lib/libraft.a: obj/raft.o obj/util.o | libdir objdir
16+
$(AR)$(ARFLAGS) lib/libraft.a obj/raft.o obj/util.o
17+
18+
bin/heart: obj/heart.o lib/libraft.a | bindir objdir
19+
$(CC) -o bin/heart$(CFLAGS)$(CPPFLAGS)\
20+
obj/heart.o$(SERVER_LDFLAGS)
21+
22+
obj/%.o: src/%.c | objdir
23+
$(CC)$(CFLAGS)$(CPPFLAGS) -c -o$@$<
24+
25+
obj/%.o: example/%.c | objdir
26+
$(CC)$(CFLAGS)$(CPPFLAGS) -c -o$@$<
27+
28+
bindir:
29+
mkdir -p bin
30+
31+
objdir:
32+
mkdir -p obj
33+
34+
libdir:
35+
mkdir -p lib
36+
37+
clean:
38+
rm -rfv bin obj lib

‎example/heart.c‎

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
#include<stdio.h>
2+
#include<stdlib.h>
3+
#include<string.h>
4+
#include<unistd.h>
5+
#include<assert.h>
6+
#include<time.h>
7+
#include<signal.h>
8+
#include<errno.h>
9+
#include<sys/wait.h>
10+
#include<sys/time.h>
11+
#include<stdbool.h>
12+
13+
#include<jansson.h>
14+
15+
#include"raft.h"
16+
#include"util.h"
17+
18+
staticvoidapplier(void*state,raft_update_tupdate,boolsnapshot) {
19+
json_error_terror;
20+
21+
json_t*patch=json_loadb(update.data,update.len,0,&error);
22+
if (!patch) {
23+
shout(
24+
"error parsing json at position %d: %s\n",
25+
error.column,
26+
error.text
27+
);
28+
}
29+
30+
if (snapshot) {
31+
json_object_clear(state);
32+
}
33+
34+
if (json_object_update(state,patch)) {
35+
shout("error updating state\n");
36+
}
37+
38+
json_decref(patch);
39+
40+
char*encoded=json_dumps(state,JSON_INDENT(4) |JSON_SORT_KEYS);
41+
if (encoded) {
42+
debug(
43+
"applied %s: new state is %s\n",
44+
snapshot ?"a snapshot" :"an update",
45+
encoded
46+
);
47+
}else {
48+
shout(
49+
"applied %s, but new state could not be encoded\n",
50+
snapshot ?"a snapshot" :"an update"
51+
);
52+
}
53+
free(encoded);
54+
}
55+
56+
staticraft_update_tsnapshooter(void*state) {
57+
json_error_terror;
58+
59+
raft_update_tshot;
60+
shot.data=json_dumps(state,JSON_SORT_KEYS);
61+
shot.len=strlen(shot.data);
62+
if (shot.data) {
63+
debug("snapshot taken: %.*s\n",shot.len,shot.data);
64+
}else {
65+
shout("failed to take a snapshot\n");
66+
}
67+
68+
returnshot;
69+
}
70+
71+
staticvoiddie(intsignum) {
72+
debug("dying\n");
73+
exit(signum);
74+
}
75+
76+
staticvoidusage(char*prog) {
77+
printf(
78+
"Usage: %s -i ID -r ID:HOST:PORT [-r ID:HOST:PORT ...] [-l LOGFILE]\n"
79+
" -l : Run as a daemon and write output to LOGFILE.\n",
80+
prog
81+
);
82+
}
83+
84+
raft_traft;
85+
86+
staticvoidmain_loop(char*host,intport) {
87+
mstimer_tt;
88+
mstimer_reset(&t);
89+
90+
// create a UDP socket for raft
91+
intr=raft_create_udp_socket(raft);
92+
if (r==NOBODY) {
93+
die(EXIT_FAILURE);
94+
}
95+
96+
#defineEMIT_EVERY_MS 1000
97+
intemit_ms=0;
98+
while (true) {
99+
intms;
100+
raft_msg_tm;
101+
intapplied;
102+
103+
ms=mstimer_reset(&t);
104+
105+
raft_tick(raft,ms);
106+
m=raft_recv_message(raft);
107+
if (m) {
108+
raft_handle_message(raft,m);
109+
}
110+
111+
if (raft_is_leader(raft)) {
112+
emit_ms+=ms;
113+
while (emit_ms>EMIT_EVERY_MS) {
114+
emit_ms-=EMIT_EVERY_MS;
115+
charbuf[1000];
116+
charkey='a'+rand() %5;
117+
intvalue=rand() %10000;
118+
sprintf(buf,"{\"key-%c\":%d}",key,value);
119+
shout("emit update: = %s\n",buf);
120+
raft_update_tupdate= {strlen(buf),buf,NULL};
121+
raft_emit(raft,update);
122+
}
123+
}
124+
}
125+
126+
close(r);
127+
}
128+
129+
intmain(intargc,char**argv) {
130+
char*logfilename=NULL;
131+
booldaemonize= false;
132+
133+
intmyid=NOBODY;
134+
intid;
135+
char*host;
136+
char*str;
137+
intport;
138+
intopt;
139+
140+
char*myhost=NULL;
141+
intmyport;
142+
143+
json_t*state=json_object();
144+
145+
raft_config_trc;
146+
rc.peernum_max=64;
147+
rc.heartbeat_ms=20;
148+
rc.election_ms_min=150;
149+
rc.election_ms_max=300;
150+
rc.log_len=10;
151+
rc.chunk_len=4;
152+
rc.msg_len_max=500;
153+
rc.applier=applier;
154+
rc.snapshooter=snapshooter;
155+
raft=raft_init(&rc);
156+
157+
intpeernum=0;
158+
while ((opt=getopt(argc,argv,"hi:r:l:"))!=-1) {
159+
switch (opt) {
160+
case'i':
161+
myid=atoi(optarg);
162+
break;
163+
case'r':
164+
if (myid==NOBODY) {
165+
usage(argv[0]);
166+
returnEXIT_FAILURE;
167+
}
168+
169+
str=strtok(optarg,":");
170+
if (str) {
171+
id=atoi(str);
172+
}else {
173+
usage(argv[0]);
174+
returnEXIT_FAILURE;
175+
}
176+
177+
host=strtok(NULL,":");
178+
179+
str=strtok(NULL,":");
180+
if (str) {
181+
port=atoi(str);
182+
}else {
183+
usage(argv[0]);
184+
returnEXIT_FAILURE;
185+
}
186+
187+
if (!raft_peer_up(raft,id,host,port,id==myid)) {
188+
usage(argv[0]);
189+
returnEXIT_FAILURE;
190+
}
191+
if (id==myid) {
192+
myhost=host;
193+
myport=port;
194+
}
195+
peernum++;
196+
break;
197+
case'l':
198+
logfilename=optarg;
199+
daemonize= true;
200+
break;
201+
case'h':
202+
usage(argv[0]);
203+
returnEXIT_SUCCESS;
204+
default:
205+
usage(argv[0]);
206+
returnEXIT_FAILURE;
207+
}
208+
}
209+
if (!myhost) {
210+
usage(argv[0]);
211+
returnEXIT_FAILURE;
212+
}
213+
214+
if (logfilename) {
215+
if (!freopen(logfilename,"a",stdout)) {
216+
// nowhere to report this failure
217+
returnEXIT_FAILURE;
218+
}
219+
if (!freopen(logfilename,"a",stderr)) {
220+
// nowhere to report this failure
221+
returnEXIT_FAILURE;
222+
}
223+
}
224+
225+
if (daemonize) {
226+
if (daemon(true, true)==-1) {
227+
shout("could not daemonize: %s\n",strerror(errno));
228+
returnEXIT_FAILURE;
229+
}
230+
}
231+
232+
signal(SIGTERM,die);
233+
signal(SIGINT,die);
234+
235+
main_loop(myhost,myport);
236+
237+
returnEXIT_SUCCESS;
238+
}

‎include/raft.h‎

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#ifndefRAFT_H
2+
#defineRAFT_H
3+
4+
#defineNOBODY -1
5+
6+
typedefstructraft_data_t*raft_t;
7+
typedefstructraft_msg_data_t*raft_msg_t;
8+
9+
typedefstructraft_update_t {
10+
intlen;
11+
char*data;
12+
void*userdata;// use this to track which query caused this update
13+
}raft_update_t;
14+
15+
// --- Callbacks ---
16+
17+
// This should be a function that applies an 'update' to the state machine.
18+
// 'snapshot' is true if 'update' contains a snapshot. 'userdata' is the
19+
// userdata that raft was configured with.
20+
typedefvoid (*raft_applier_t)(void*userdata,raft_update_tupdate,boolsnapshot);
21+
22+
// This should be a function that makes a snapshot of the state machine. Used
23+
// for raft log compaction. 'userdata' is the userdata that raft was configured
24+
// with.
25+
typedefraft_update_t (*raft_snapshooter_t)(void*userdata);
26+
27+
// --- Configuration ---
28+
29+
typedefstructraft_config_t {
30+
intpeernum_max;
31+
32+
intheartbeat_ms;
33+
intelection_ms_min;
34+
intelection_ms_max;
35+
36+
intlog_len;
37+
38+
intchunk_len;
39+
intmsg_len_max;
40+
41+
void*userdata;// this will get passed to applier() and snapshooter()
42+
raft_applier_tapplier;
43+
raft_snapshooter_tsnapshooter;
44+
}raft_config_t;
45+
46+
// Initialize a raft instance. Returns NULL on failure.
47+
raft_traft_init(raft_config_t*config);
48+
49+
// Add a peer named 'id'. 'self' should be true, if that peer is this instance.
50+
// Only one peer should have 'self' == true.
51+
boolraft_peer_up(raft_tr,intid,char*host,intport,boolself);
52+
53+
// Remove a previously added peer named 'id'.
54+
boolraft_peer_down(raft_tr,intid);
55+
56+
// --- Log Actions ---
57+
58+
// Emit an 'update'. Returns true if emitted successfully.
59+
boolraft_emit(raft_tr,raft_update_tupdate);
60+
61+
// --- Control ---
62+
63+
// Note, that UDP socket and raft messages are exposed to the user. This gives
64+
// the user the opportunity to incorporate the socket with other sockets in
65+
// select() or poll(). Thus, the messages will be processed as soon as they
66+
// come, not as soon as we call raft_tick().
67+
68+
// Perform various raft logic tied to time. Call this function once in a while
69+
// and pass the elapsed 'msec' from the previous call. This function will only
70+
// trigger time-related events, and will not receive and process messages (see
71+
// the note above).
72+
voidraft_tick(raft_tr,intmsec);
73+
74+
// Receive a raft message. Returns NULL if no message available.
75+
raft_msg_traft_recv_message(raft_tr);
76+
77+
// Process the message.
78+
voidraft_handle_message(raft_tr,raft_msg_tm);
79+
80+
// Create the raft socket.
81+
intraft_create_udp_socket(raft_tr);
82+
83+
// Returns true if this peer thinks it is the leader.
84+
boolraft_is_leader(raft_tr);
85+
86+
// Returns the id of the current leader, or NOBODY if no leader.
87+
intraft_get_leader(raft_tr);
88+
89+
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp