Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit18eec6f

Browse files
committed
Add a method to check if a log entry has been applied on a particular node.
1 parent252c0f6 commit18eec6f

File tree

2 files changed

+23
-7
lines changed

2 files changed

+23
-7
lines changed

‎include/raft.h‎

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,12 @@ bool raft_peer_down(raft_t r, int id);
5555

5656
// --- Log Actions ---
5757

58-
// Emit an 'update'. Returns true if emitted successfully.
59-
boolraft_emit(raft_tr,raft_update_tupdate);
58+
// Emit an 'update'. Returns the log index if emitted successfully, or -1
59+
// otherwise.
60+
intraft_emit(raft_tr,raft_update_tupdate);
61+
62+
// Checks whether an entry at 'index' has been applied by the peer named 'id'.
63+
boolraft_applied(raft_tt,intid,intindex);
6064

6165
// --- Control ---
6266

‎src/raft.c‎

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ typedef enum roles {
1919

2020
#defineUDP_SAFE_SIZE 508
2121

22-
// raft module does not care what you mean by action and argument
2322
typedefstructraft_entry_t {
2423
intterm;
2524
boolsnapshot;
@@ -46,6 +45,7 @@ typedef struct raft_peer_t {
4645

4746
intseqno;// the rpc sequence number
4847
raft_progress_tacked;// the number of entries:bytes acked by this peer
48+
intapplied;// the number of entries applied by this peer
4949

5050
char*host;
5151
intport;
@@ -111,6 +111,7 @@ typedef struct raft_msg_done_t {
111111
raft_msg_data_tmsg;
112112
intterm;// the term of the appended entry
113113
raft_progress_tprogress;// the progress after appending
114+
intapplied;
114115
boolsuccess;
115116
// the message is considered acked when the last chunk appends successfully
116117
}raft_msg_done_t;
@@ -177,6 +178,7 @@ static void raft_peer_init(raft_peer_t *p) {
177178
p->up= false;
178179
p->seqno=0;
179180
reset_progress(&p->acked);
181+
p->applied=0;
180182

181183
p->host=DEFAULT_LISTENHOST;
182184
p->port=DEFAULT_LISTENPORT;
@@ -633,7 +635,7 @@ static int raft_compact(raft_t raft) {
633635
returncompacted;
634636
}
635637

636-
boolraft_emit(raft_tr,raft_update_tupdate) {
638+
intraft_emit(raft_tr,raft_update_tupdate) {
637639
assert(r->leader==r->me);
638640
assert(r->role==LEADER);
639641

@@ -646,11 +648,12 @@ bool raft_emit(raft_t r, raft_update_t update) {
646648
"cannot emit new entries, the log is"
647649
" full and cannot be compacted\n"
648650
);
649-
returnfalse;
651+
return-1;
650652
}
651653
}
652654

653-
raft_entry_t*e=&RAFT_LOG(r,r->log.first+r->log.size);
655+
intnewindex=RAFT_LOG_LAST_INDEX(r)+1;
656+
raft_entry_t*e=&RAFT_LOG(r,newindex);
654657
e->term=r->term;
655658
assert(e->update.len==0);
656659
assert(e->update.data==NULL);
@@ -661,7 +664,13 @@ bool raft_emit(raft_t r, raft_update_t update) {
661664

662665
raft_beat(r,NOBODY);
663666
raft_reset_timer(r);
664-
return true;
667+
returnnewindex;
668+
}
669+
670+
boolraft_applied(raft_tr,intid,intindex) {
671+
raft_peer_t*p=r->peers+id;
672+
if (!p->up)return false;
673+
returnp->applied >=index;
665674
}
666675

667676
staticboolraft_restore(raft_tr,intprevindex,raft_entry_t*e) {
@@ -836,6 +845,7 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
836845
}else {
837846
reply.term=-1;
838847
}
848+
reply.applied=r->log.applied;
839849

840850
reply.success= true;
841851
finish:
@@ -863,6 +873,8 @@ static void raft_handle_done(raft_t r, raft_msg_done_t *m) {
863873
return;
864874
}
865875

876+
peer->applied=m->applied;
877+
866878
if (m->success) {
867879
debug("[from %d] ============= done\n",sender);
868880
peer->acked=m->progress;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp