Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

NRG: WAL requires repair after truncation #7587

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
MauriceVanVeen wants to merge 5 commits into main
base:main
Choose a base branch
Loading
frommaurice/nrg-truncate-recovery
Draft
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 102 additions & 16 deletions server/raft.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -227,6 +227,7 @@ type raft struct {
maybeLeader bool // The group had a preferred leader. And is maybe already acting as leader prior to scale up.
paused bool // Whether or not applies are paused
observer bool // The node is observing, i.e. not able to become leader
repairing bool // The node is being repaired, either due to being new, wiped or corrupted.
initializing bool // The node is new, and "empty log" checks can be temporarily relaxed.
scaleUp bool // The node is part of a scale up, puts us in observer mode until the log contains data.
}
Expand DownExpand Up@@ -476,6 +477,9 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
}

truncateAndErr := func(index uint64) {
// Mark ourselves as requiring our log to be repaired, this will ensure we can't become
// leader as long as at least a single server has a non-corrupt log.
n.markRepairing()
if err := n.wal.Truncate(index); err != nil {
n.setWriteErr(err)
}
Expand All@@ -488,6 +492,12 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
n.wal.FastState(&state)
n.bytes = state.Bytes

// If we've been restarted halfway through repairing our log,
// we need to mark ourselves as repairing again.
if n.needRepairing() {
n.repairing = true
}

if state.Msgs > 0 {
n.debug("Replaying state of %d entries", state.Msgs)
if first, err := n.loadFirstEntry(); err == nil {
Expand DownExpand Up@@ -557,14 +567,18 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
n.resetElectionTimeout()
n.llqrt = time.Now()

// If our log is empty, and we're initializing, relax the "empty log" checks temporarily.
if !cfg.Recovering && n.pindex == 0 {
n.initializing = true
// If we're scaling up and our log is empty, must put ourselves into observer
// and wait for data from the leader.
if !cfg.Observer && cfg.ScaleUp {
n.scaleUp = true
n.setObserverLocked(true, extUndetermined)
// If our log is empty, we might need to repair it if this node isn't new.
if n.pindex == 0 {
n.markRepairing()
// If we're new and meant to be initializing, relax the "empty log" checks temporarily.
if !cfg.Recovering {
n.initializing = true
// If we're scaling up, must put ourselves into observer and wait for data from the leader.
// This prevents us from starting new elections ourselves.
if !cfg.Observer && cfg.ScaleUp {
n.scaleUp = true
n.setObserverLocked(true, extUndetermined)
}
}
}
n.Unlock()
Expand DownExpand Up@@ -1760,7 +1774,7 @@ func (n *raft) CampaignImmediately() error {
n.Lock()
defer n.Unlock()
n.maybeLeader = true
n.resetInitializing()
n.resetRepairing()
return n.campaign(minCampaignTimeout / 2)
}

Expand DownExpand Up@@ -3310,12 +3324,16 @@ func (n *raft) runAsCandidate() {
n.RLock()
nterm := n.term
csz := n.csz
repairing, initializing, pindex, group := n.repairing, n.initializing, n.pindex, n.group
n.RUnlock()

if vresp.granted && nterm == vresp.term {
// only track peers that would be our followers
n.trackPeer(vresp.peer)
if !vresp.empty {

// A vote only counts toward a majority if it's a non-empty vote from an intact server,
// and we're not repairing ourselves either (a more up-to-date server could exist).
if !vresp.empty && (!repairing || initializing) {
votes[vresp.peer] = struct{}{}
} else {
emptyVotes[vresp.peer] = struct{}{}
Expand All@@ -3328,7 +3346,20 @@ func (n *raft) runAsCandidate() {
// Become LEADER if we've got voted in by ALL servers.
// We couldn't get quorum based on just our normal votes.
// But, we have heard from the full cluster, and some servers came up empty.
// We know for sure we have the most up-to-date log.
// We know for sure we have the most up-to-date log, but that log could also be empty.
if group == defaultMetaGroupName || initializing || !repairing {
n.warn("Self got voted leader by all servers, restarting WAL with %d entries", pindex)
} else {
// If we are here, this means all logs required repair (so none were complete),
// and we were the one with the most up-to-date log. We have either lost "all" or "some" data,
// but instead of halting, we prefer to become available again. All servers will agree on the
// new state of the log.
desc := "the log was fully lost"
if pindex > 0 {
desc = "the log was partially reset"
}
n.warn("Self got voted leader by all servers, restarting WAL with %d entries, %s", pindex, desc)
}
n.switchToLeader()
return
}
Expand DownExpand Up@@ -3429,6 +3460,7 @@ func (n *raft) truncateWAL(term, index uint64) {
n.debug("Truncating and repairing WAL to Term %d Index %d", term, index)

if term == 0 && index == 0 {
n.markRepairing()
if n.commit > 0 {
n.warn("Resetting WAL state")
} else {
Expand DownExpand Up@@ -3791,7 +3823,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.Unlock()
return
}
n.resetInitializing()

// Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry.
n.apply.push(newCommittedEntry(n.commit, ae.entries[:1]))
Expand DownExpand Up@@ -3822,7 +3853,6 @@ CONTINUE:
return
}
n.cachePendingEntry(ae)
n.resetInitializing()
} else {
// This is a replay on startup so just take the appendEntry version.
n.pterm = ae.term
Expand DownExpand Up@@ -3888,6 +3918,9 @@ CONTINUE:
// The only way for the leader to receive "success" MUST be through this path.
var ar *appendEntryResponse
if sub != nil && isNew {
// If we were marked as "repairing", reset that now since we're caught up
// and aligned to acknowledge new append entries.
n.resetRepairing()
ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, true)
}
n.Unlock()
Expand All@@ -3899,17 +3932,69 @@ CONTINUE:
}
}

// resetInitializing resets the notion of initializing.
// markRepairing flags this server's log as needing repair: it sets the
// in-memory repairing flag and then hands off to writeRepairing to persist
// the marker.
// Lock should be held.
func (n *raft) markRepairing() {
	// Flag in memory first, then persist.
	n.repairing = true
	n.writeRepairing()
}

// resetRepairing resets the notion of our log needing to be repaired, for example,
// due to it being empty/incomplete/initializing/scaling up.
// If we were scaling up, also leaves observer mode.
// It is a no-op (initializing and scaleUp are left untouched) when the
// repairing flag is not set.
// Lock should be held.
func (n *raft) resetInitializing() {
func (n *raft) resetRepairing() {
// Nothing to reset if we were never marked as repairing.
if !n.repairing {
return
}
n.repairing = false
// Remove the on-disk marker too. The error from os.Remove is not checked:
// if the file is left behind, needRepairing will report true again the
// next time it is consulted.
os.Remove(filepath.Join(n.sd, repairFile))

// Clearing the repairing state also ends the initializing phase.
n.initializing = false
if n.scaleUp {
// We were put into observer mode for the scale up, leave it now.
n.scaleUp = false
n.setObserverLocked(false, extUndetermined)
}
}

// repairFile is the name of the marker file kept in the raft store directory.
// Its presence marks the log as needing to be repaired.
const repairFile = "repair.idx"

// needRepairing reports whether the on-disk repair marker says the log must
// be repaired. Only a definite "file does not exist" answer yields false; a
// successful read, or any other error (i.e. we could not determine whether
// the marker is there), yields true.
// Lock should be held.
func (n *raft) needRepairing() bool {
	marker := filepath.Join(n.sd, repairFile)

	// Take a token from dios around the read and hand it back afterwards
	// (presumably this bounds concurrent disk IO — confirm at its declaration).
	<-dios
	_, err := os.ReadFile(marker)
	dios <- struct{}{}

	// os.IsNotExist(nil) is false, so a successful read also reports true.
	return !os.IsNotExist(err)
}

// writeRepairing persists the "repairing" marker for this node by writing the
// repair file into n.sd. It is used when the log is entirely empty or
// partially corrupt and therefore needs to be repaired.
// A failed write is recorded as a write error and logged, unless the node is
// already closed.
// Lock should be held.
func (n *raft) writeRepairing() {
	// A memory-based log always comes up empty after a restart,
	// so there is nothing worth persisting.
	if _, inMemory := n.wal.(*memStore); inMemory {
		return
	}

	err := writeRepairing(n.sd)
	if err == nil || n.isClosed() {
		return
	}
	n.setWriteErrLocked(err)
	n.warn("Error writing repair file for %q: %v", n.group, err)
}

// writeRepairing writes the (empty) repair marker file into the store
// directory sd, outside of a specific raft context.
// It returns the os.Stat error if the marker path cannot be inspected for a
// reason other than the file not existing; otherwise it returns the result of
// writing the file.
func writeRepairing(sd string) error {
	marker := filepath.Join(sd, repairFile)

	// Bail out early only on unexpected stat failures; a missing file is fine,
	// and an existing file is simply written again.
	_, err := os.Stat(marker)
	if err != nil && !os.IsNotExist(err) {
		return err
	}
	return writeFileWithSync(marker, nil, defaultFilePerms)
}

// processPeerState is called when a peer state entry is received
// over the wire or when we're updating known peers.
// Lock should be held.
Expand DownExpand Up@@ -4410,7 +4495,7 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {

n.Lock()

vresp := &voteResponse{n.term, n.id, false, n.pindex == 0}
vresp := &voteResponse{n.term, n.id, false, n.repairing}
defer n.debug("Sending a voteResponse %+v -> %q", vresp, vr.reply)

// Ignore if we are newer. This is important so that we don't accidentally process
Expand DownExpand Up@@ -4648,6 +4733,7 @@ func (n *raft) switchToLeader() {
n.lxfer = false
n.updateLeader(n.id)
n.switchState(Leader)
n.resetRepairing()

// To send out our initial peer state.
// In our implementation this is equivalent to sending a NOOP-entry upon becoming leader.
Expand Down
5 changes: 3 additions & 2 deletionsserver/raft_helpers_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -148,12 +148,13 @@ func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf s
for _, s := range servers {
var cfg *RaftConfig
if st == FileStorage {
sd := c.t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir:c.t.TempDir(), BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute},
FileStoreConfig{StoreDir:sd, BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute},
StreamConfig{Name: name, Storage: FileStorage},
)
require_NoError(c.t, err)
cfg = &RaftConfig{Name: name, Store:c.t.TempDir(), Log: fs}
cfg = &RaftConfig{Name: name, Store:sd, Log: fs}
} else {
ms, err := newMemStore(&StreamConfig{Name: name, Storage: MemoryStorage})
require_NoError(c.t, err)
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp