@@ -227,6 +227,7 @@ type raft struct {
 	maybeLeader  bool // The group had a preferred leader. And is maybe already acting as leader prior to scale up.
 	paused       bool // Whether or not applies are paused
 	observer     bool // The node is observing, i.e. not able to become leader
+	repairing    bool // The node is being repaired, either due to being new, wiped or corrupted.
 	initializing bool // The node is new, and "empty log" checks can be temporarily relaxed.
 	scaleUp      bool // The node is part of a scale up, puts us in observer mode until the log contains data.
 }
@@ -476,6 +477,9 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
 	}

 	truncateAndErr := func(index uint64) {
+		// Mark ourselves as requiring our log to be repaired, this will ensure we can't become
+		// leader as long as at least a single server has a non-corrupt log.
+		n.markRepairing()
 		if err := n.wal.Truncate(index); err != nil {
 			n.setWriteErr(err)
 		}
@@ -488,6 +492,12 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
 	n.wal.FastState(&state)
 	n.bytes = state.Bytes

+	// If we've been restarted halfway through repairing our log,
+	// we need to mark ourselves as repairing again.
+	if n.needRepairing() {
+		n.repairing = true
+	}
+
 	if state.Msgs > 0 {
 		n.debug("Replaying state of %d entries", state.Msgs)
 		if first, err := n.loadFirstEntry(); err == nil {
@@ -557,14 +567,18 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
 	n.resetElectionTimeout()
 	n.llqrt = time.Now()

-	// If our log is empty, and we're initializing, relax the "empty log" checks temporarily.
-	if !cfg.Recovering && n.pindex == 0 {
-		n.initializing = true
-
-		// If we're scaling up and our log is empty, must put ourselves into observer
-		// and wait for data from the leader.
-		if !cfg.Observer && cfg.ScaleUp {
-			n.scaleUp = true
-			n.setObserverLocked(true, extUndetermined)
+	// If our log is empty, we might need to repair it if this node isn't new.
+	if n.pindex == 0 {
+		n.markRepairing()
+		// If we're new and meant to be initializing, relax the "empty log" checks temporarily.
+		if !cfg.Recovering {
+			n.initializing = true
+			// If we're scaling up, must put ourselves into observer and wait for data from the leader.
+			// This prevents us from starting new elections ourselves.
+			if !cfg.Observer && cfg.ScaleUp {
+				n.scaleUp = true
+				n.setObserverLocked(true, extUndetermined)
+			}
 		}
 	}
 	n.Unlock()
@@ -1760,7 +1774,7 @@ func (n *raft) CampaignImmediately() error {
 	n.Lock()
 	defer n.Unlock()
 	n.maybeLeader = true
-	n.resetInitializing()
+	n.resetRepairing()
 	return n.campaign(minCampaignTimeout / 2)
 }
@@ -3310,12 +3324,16 @@ func (n *raft) runAsCandidate() {
 			n.RLock()
 			nterm := n.term
 			csz := n.csz
+			repairing, initializing, pindex, group := n.repairing, n.initializing, n.pindex, n.group
 			n.RUnlock()

 			if vresp.granted && nterm == vresp.term {
 				// only track peers that would be our followers
 				n.trackPeer(vresp.peer)
-				if !vresp.empty {
+				// A vote only counts toward a majority if it's a non-empty vote from an intact server,
+				// and we're not repairing ourselves either (a more up-to-date server could exist).
+				if !vresp.empty && (!repairing || initializing) {
 					votes[vresp.peer] = struct{}{}
 				} else {
 					emptyVotes[vresp.peer] = struct{}{}
@@ -3328,7 +3346,20 @@
 			// Become LEADER if we've got voted in by ALL servers.
 			// We couldn't get quorum based on just our normal votes.
 			// But, we have heard from the full cluster, and some servers came up empty.
-			// We know for sure we have the most up-to-date log.
+			// We know for sure we have the most up-to-date log, but that log could also be empty.
+			if group == defaultMetaGroupName || initializing || !repairing {
+				n.warn("Self got voted leader by all servers, restarting WAL with %d entries", pindex)
+			} else {
+				// If we are here, this means all logs required repair (so none were complete),
+				// and we were the one with the most up-to-date log. We have either lost "all" or "some" data,
+				// but instead of halting, we prefer to become available again. All servers will agree on the
+				// new state of the log.
+				desc := "the log was fully lost"
+				if pindex > 0 {
+					desc = "the log was partially reset"
+				}
+				n.warn("Self got voted leader by all servers, restarting WAL with %d entries, %s", pindex, desc)
+			}
 			n.switchToLeader()
 			return
 		}
@@ -3429,6 +3460,7 @@ func (n *raft) truncateWAL(term, index uint64) {
 	n.debug("Truncating and repairing WAL to Term %d Index %d", term, index)

 	if term == 0 && index == 0 {
+		n.markRepairing()
 		if n.commit > 0 {
 			n.warn("Resetting WAL state")
 		} else {
@@ -3791,7 +3823,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
 			n.Unlock()
 			return
 		}
-		n.resetInitializing()

 		// Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry.
 		n.apply.push(newCommittedEntry(n.commit, ae.entries[:1]))
@@ -3822,7 +3853,6 @@ CONTINUE:
 			return
 		}
 		n.cachePendingEntry(ae)
-		n.resetInitializing()
 	} else {
 		// This is a replay on startup so just take the appendEntry version.
 		n.pterm = ae.term
@@ -3888,6 +3918,9 @@ CONTINUE:
 	// The only way for the leader to receive "success" MUST be through this path.
 	var ar *appendEntryResponse
 	if sub != nil && isNew {
+		// If we were marked as "repairing", reset that now since we're caught up
+		// and aligned to acknowledge new append entries.
+		n.resetRepairing()
 		ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, true)
 	}
 	n.Unlock()
@@ -3899,17 +3932,69 @@ CONTINUE:
 	}
 }

-// resetInitializing resets the notion of initializing.
+// markRepairing marks this server as requiring its log to be repaired.
 // Lock should be held.
+func (n *raft) markRepairing() {
+	n.repairing = true
+	n.writeRepairing()
+}
+
+// resetRepairing resets the notion of our log needing to be repaired, for example,
+// due to it being empty/incomplete/initializing/scaling up.
+// If we were scaling up, also leaves observer mode.
+// Lock should be held.
-func (n *raft) resetInitializing() {
+func (n *raft) resetRepairing() {
+	if !n.repairing {
+		return
+	}
+	n.repairing = false
+	os.Remove(filepath.Join(n.sd, repairFile))
 	n.initializing = false
 	if n.scaleUp {
 		n.scaleUp = false
 		n.setObserverLocked(false, extUndetermined)
 	}
 }

+const repairFile = "repair.idx"
+
+// needRepairing will return whether the log needs to be repaired.
+// If the file lookup succeeds but the file itself doesn't exist, we don't repair.
+// Lock should be held.
+func (n *raft) needRepairing() bool {
+	<-dios
+	_, err := os.ReadFile(filepath.Join(n.sd, repairFile))
+	dios <- struct{}{}
+	// Mark repairing if the file exists, or we can't access it.
+	return err == nil || !os.IsNotExist(err)
+}
+
+// writeRepairing will mark us as repairing on disk. Either we have an entirely empty log,
+// or our log is partially corrupt, and it needs to be repaired.
+// Lock should be held.
+func (n *raft) writeRepairing() {
+	// No need to write to disk if the log is memory-based,
+	// it will always come up empty after a restart.
+	if _, ok := n.wal.(*memStore); ok {
+		return
+	}
+	if err := writeRepairing(n.sd); err != nil && !n.isClosed() {
+		n.setWriteErrLocked(err)
+		n.warn("Error writing repair file for %q: %v", n.group, err)
+	}
+}
+
+// Writes out our repair file outside of a specific raft context.
+func writeRepairing(sd string) error {
+	rf := filepath.Join(sd, repairFile)
+	if _, err := os.Stat(rf); err != nil && !os.IsNotExist(err) {
+		return err
+	}
+	return writeFileWithSync(rf, nil, defaultFilePerms)
+}
+
 // processPeerState is called when a peer state entry is received
 // over the wire or when we're updating known peers.
 // Lock should be held.
@@ -4410,7 +4495,7 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
 	n.Lock()

-	vresp := &voteResponse{n.term, n.id, false, n.pindex == 0}
+	vresp := &voteResponse{n.term, n.id, false, n.repairing}
 	defer n.debug("Sending a voteResponse %+v -> %q", vresp, vr.reply)

 	// Ignore if we are newer. This is important so that we don't accidentally process
@@ -4648,6 +4733,7 @@ func (n *raft) switchToLeader() {
 	n.lxfer = false
 	n.updateLeader(n.id)
 	n.switchState(Leader)
+	n.resetRepairing()

 	// To send out our initial peer state.
 	// In our implementation this is equivalent to sending a NOOP-entry upon becoming leader.
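The core of the change above is a small on-disk marker ("repair.idx" in the group's store directory): it is written when the log is empty or has been truncated, checked again on restart, and removed once the node is caught up or becomes leader. Below is a minimal, self-contained sketch of that marker-file lifecycle for illustration only; the function names and the temp-directory usage here are not the server's API, and the real implementation serializes disk I/O through the dios semaphore and writes via writeFileWithSync, both omitted for brevity.

// Sketch of the repair-marker lifecycle, assuming a per-group store directory sd.
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

const repairMarker = "repair.idx"

// markRepairing creates the marker so a restart keeps treating the log as suspect.
func markRepairing(sd string) error {
	rf := filepath.Join(sd, repairMarker)
	if _, err := os.Stat(rf); err != nil && !os.IsNotExist(err) {
		return err // Could not check for an existing marker; surface the error.
	}
	return os.WriteFile(rf, nil, 0640) // Empty file, presence is the signal.
}

// needRepairing reports whether the marker exists (or cannot be checked).
func needRepairing(sd string) bool {
	_, err := os.Stat(filepath.Join(sd, repairMarker))
	return err == nil || !os.IsNotExist(err)
}

// resetRepairing removes the marker once the log is known to be intact again.
func resetRepairing(sd string) {
	os.Remove(filepath.Join(sd, repairMarker))
}

func main() {
	sd, _ := os.MkdirTemp("", "raft-sd")
	defer os.RemoveAll(sd)

	fmt.Println(needRepairing(sd)) // false: fresh directory, no marker
	_ = markRepairing(sd)          // e.g. after truncating a corrupt WAL
	fmt.Println(needRepairing(sd)) // true: the state survives a restart
	resetRepairing(sd)             // once caught up with the leader
	fmt.Println(needRepairing(sd)) // false again
}

Persisting the flag is what lets the "restarted halfway through repairing" check in initRaftNode work: the in-memory repairing bit alone would be lost on restart, while memory-based logs skip the write entirely since they always come up empty anyway.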