Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf77065d

Browse files
committed
Implement leader stepdown when quorum lost.
1 parent1ffbabe commitf77065d

File tree

1 file changed

+64
-28
lines changed

1 file changed

+64
-28
lines changed

‎src/raft.c‎

Lines changed: 64 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ typedef struct raft_peer_t {
5050
char*host;
5151
intport;
5252
structsockaddr_inaddr;
53+
54+
intsilent_ms;// how long was this peer silent
5355
}raft_peer_t;
5456

5557
typedefstructraft_data_t {
@@ -178,6 +180,7 @@ static void raft_peer_init(raft_peer_t *p) {
178180

179181
p->host=DEFAULT_LISTENHOST;
180182
p->port=DEFAULT_LISTENPORT;
183+
p->silent_ms=0;
181184
}
182185

183186
staticvoidraft_entry_init(raft_entry_t*e) {
@@ -549,6 +552,22 @@ static void raft_refresh_acked(raft_t r) {
549552
}
550553
}
551554

555+
staticintraft_increase_silent_time(raft_tr,intms) {
556+
intrecent_peers=1;// count myself as recent
557+
558+
for (inti=0;i<r->config.peernum_max;i++) {
559+
if (!r->peers[i].up)continue;
560+
if (i==r->me)continue;
561+
562+
r->peers[i].silent_ms+=ms;
563+
if (r->peers[i].silent_ms<r->config.election_ms_max) {
564+
recent_peers++;
565+
}
566+
}
567+
568+
returnrecent_peers;
569+
}
570+
552571
voidraft_tick(raft_tr,intmsec) {
553572
r->timer-=msec;
554573
if (r->timer<0) {
@@ -578,6 +597,13 @@ void raft_tick(raft_t r, int msec) {
578597
raft_reset_timer(r);
579598
}
580599
raft_refresh_acked(r);
600+
601+
intrecent_peers=raft_increase_silent_time(r,msec);
602+
if ((r->role==LEADER)&& (recent_peers*2 <=r->peernum)) {
603+
shout("lost quorum, demoting\n");
604+
r->leader=NOBODY;
605+
r->role=FOLLOWER;
606+
}
581607
}
582608

583609
staticintraft_compact(raft_traft) {
@@ -757,6 +783,7 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
757783
r->leader=sender;
758784
}
759785

786+
r->peers[sender].silent_ms=0;
760787
raft_reset_timer(r);
761788

762789
if (m->acked>r->log.acked) {
@@ -768,38 +795,37 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
768795
p->acked.entries=r->log.acked;
769796
}
770797

771-
if (m->empty) {
772-
// just a heartbeat
773-
return;
774-
}
775-
776-
if (m->offset>e->bytes) {
777-
shout("unexpectedly large offset %d for a chunk, ignoring to avoid gaps\n",m->offset);
778-
gotofinish;
779-
}
798+
if (!m->empty) {
799+
if (m->offset>e->bytes) {
800+
shout("unexpectedly large offset %d for a chunk, ignoring to avoid gaps\n",m->offset);
801+
gotofinish;
802+
}
780803

781-
u->len=m->totallen;
782-
u->data=realloc(u->data,m->totallen);
804+
u->len=m->totallen;
805+
u->data=realloc(u->data,m->totallen);
783806

784-
memcpy(u->data+m->offset,m->data,m->len);
785-
e->term=m->term;
786-
e->bytes=m->offset+m->len;
787-
assert(e->bytes <=u->len);
807+
memcpy(u->data+m->offset,m->data,m->len);
808+
e->term=m->term;
809+
e->bytes=m->offset+m->len;
810+
assert(e->bytes <=u->len);
788811

789-
e->snapshot=m->snapshot;
812+
e->snapshot=m->snapshot;
790813

791-
if (e->bytes==u->len) {
792-
if (m->snapshot) {
793-
if (!raft_restore(r,m->previndex,e)) {
794-
shout("restore from snapshot failed\n");
795-
gotofinish;
796-
}
797-
}else {
798-
if (!raft_append(r,m->previndex,m->prevterm,e)) {
799-
debug("log_append failed\n");
800-
gotofinish;
814+
if (e->bytes==u->len) {
815+
if (m->snapshot) {
816+
if (!raft_restore(r,m->previndex,e)) {
817+
shout("restore from snapshot failed\n");
818+
gotofinish;
819+
}
820+
}else {
821+
if (!raft_append(r,m->previndex,m->prevterm,e)) {
822+
debug("log_append failed\n");
823+
gotofinish;
824+
}
801825
}
802826
}
827+
}else {
828+
// just a heartbeat
803829
}
804830

805831
reply.progress.entries=RAFT_LOG_LAST_INDEX(r)+1;
@@ -839,6 +865,7 @@ static void raft_handle_done(raft_t r, raft_msg_done_t *m) {
839865
if (m->success) {
840866
debug("[from %d] ============= done\n",sender);
841867
peer->acked=m->progress;
868+
peer->silent_ms=0;
842869
}else {
843870
debug("[from %d] ============= refused\n",sender);
844871
if (peer->acked.entries>0) {
@@ -872,7 +899,7 @@ static void raft_handle_claim(raft_t r, raft_msg_claim_t *m) {
872899

873900
if (m->msg.term >=r->term) {
874901
if (r->role!=FOLLOWER) {
875-
shout("demoting myself\n");
902+
shout("There is another candidate,demoting myself\n");
876903
}
877904
if (m->msg.term>r->term) {
878905
raft_set_term(r,m->term);
@@ -913,6 +940,14 @@ static void raft_reset_bytes_acked(raft_t r) {
913940
}
914941
}
915942

943+
staticvoidraft_reset_silent_time(raft_tr,intid) {
944+
for (inti=0;i<r->config.peernum_max;i++) {
945+
if ((i==id)|| (id==NOBODY)) {
946+
r->peers[i].silent_ms=0;
947+
}
948+
}
949+
}
950+
916951
staticvoidraft_handle_vote(raft_tr,raft_msg_vote_t*m) {
917952
intsender=m->msg.from;
918953
raft_peer_t*peer=r->peers+sender;
@@ -931,14 +966,15 @@ static void raft_handle_vote(raft_t r, raft_msg_vote_t *m) {
931966
r->role=LEADER;
932967
r->leader=r->me;
933968
raft_reset_bytes_acked(r);
969+
raft_reset_silent_time(r,NOBODY);
934970
raft_reset_timer(r);
935971
}
936972
}
937973

938974
voidraft_handle_message(raft_tr,raft_msg_tm) {
939975
if (m->term>r->term) {
940976
if (r->role!=FOLLOWER) {
941-
shout("demoting myself\n");
977+
shout("I have an old term,demoting myself\n");
942978
}
943979
raft_set_term(r,m->term);
944980
r->role=FOLLOWER;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp