Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4c98dec

Browse files
authored
chore: add backed reader, writer and pipe implementation (#19147)
Relates to:#18101This PR introduces a new `backedpipe` package that provides reliablebidirectional byte streams over unreliable network connections. Theimplementation includes:- `BackedPipe`: Orchestrates a reader and writer to provide transparentreconnection and data replay- `BackedReader`: Handles reading with automatic reconnection, blockingreads when disconnected- `BackedWriter`: Maintains a ring buffer of recent writes for replayduring reconnection- `RingBuffer`: Efficient circular buffer implementation for storingdataThe package enables resilient connections by tracking sequence numbersand replaying missed data after reconnection. It handles connectionfailures gracefully, automatically reconnecting and resuming datatransfer from the appropriate point.
1 parente53bc24 commit4c98dec

File tree

8 files changed

+3737
-0
lines changed

8 files changed

+3737
-0
lines changed
Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
package backedpipe
2+
3+
import (
4+
"context"
5+
"io"
6+
"sync"
7+
8+
"golang.org/x/sync/errgroup"
9+
"golang.org/x/sync/singleflight"
10+
"golang.org/x/xerrors"
11+
)
12+
13+
var (
14+
ErrPipeClosed=xerrors.New("pipe is closed")
15+
ErrPipeAlreadyConnected=xerrors.New("pipe is already connected")
16+
ErrReconnectionInProgress=xerrors.New("reconnection already in progress")
17+
ErrReconnectFailed=xerrors.New("reconnect failed")
18+
ErrInvalidSequenceNumber=xerrors.New("remote sequence number exceeds local sequence")
19+
ErrReconnectWriterFailed=xerrors.New("reconnect writer failed")
20+
)
21+
22+
// connectionState represents the current state of the BackedPipe connection.
23+
typeconnectionStateint
24+
25+
const (
26+
// connected indicates the pipe is connected and operational.
27+
connectedconnectionState=iota
28+
// disconnected indicates the pipe is not connected but not closed.
29+
disconnected
30+
// reconnecting indicates a reconnection attempt is in progress.
31+
reconnecting
32+
// closed indicates the pipe is permanently closed.
33+
closed
34+
)
35+
36+
// ErrorEvent represents an error from a reader or writer with connection generation info.
37+
typeErrorEventstruct {
38+
Errerror
39+
Componentstring// "reader" or "writer"
40+
Generationuint64// connection generation when error occurred
41+
}
42+
43+
const (
44+
// Default buffer capacity used by the writer - 64MB
45+
DefaultBufferSize=64*1024*1024
46+
)
47+
48+
// Reconnector is an interface for establishing connections when the BackedPipe needs to reconnect.
49+
// Implementations should:
50+
// 1. Establish a new connection to the remote side
51+
// 2. Exchange sequence numbers with the remote side
52+
// 3. Return the new connection and the remote's reader sequence number
53+
//
54+
// The readerSeqNum parameter is the local reader's current sequence number
55+
// (total bytes successfully read from the remote). This must be sent to the
56+
// remote so it can replay its data to us starting from this number.
57+
//
58+
// The returned remoteReaderSeqNum should be the remote side's reader sequence
59+
// number (how many bytes of our outbound data it has successfully read). This
60+
// informs our writer where to resume (i.e., which bytes to replay to the remote).
61+
typeReconnectorinterface {
62+
Reconnect(ctx context.Context,readerSeqNumuint64) (conn io.ReadWriteCloser,remoteReaderSeqNumuint64,errerror)
63+
}
64+
65+
// BackedPipe provides a reliable bidirectional byte stream over unreliable network connections.
66+
// It orchestrates a BackedReader and BackedWriter to provide transparent reconnection
67+
// and data replay capabilities.
68+
typeBackedPipestruct {
69+
ctx context.Context
70+
cancel context.CancelFunc
71+
mu sync.RWMutex
72+
reader*BackedReader
73+
writer*BackedWriter
74+
reconnectorReconnector
75+
conn io.ReadWriteCloser
76+
77+
// State machine
78+
stateconnectionState
79+
connGenuint64// Increments on each successful reconnection
80+
81+
// Unified error handling with generation filtering
82+
errChanchanErrorEvent
83+
84+
// singleflight group to dedupe concurrent ForceReconnect calls
85+
sf singleflight.Group
86+
87+
// Track first error per generation to avoid duplicate reconnections
88+
lastErrorGenuint64
89+
}
90+
91+
// NewBackedPipe creates a new BackedPipe with default options and the specified reconnector.
92+
// The pipe starts disconnected and must be connected using Connect().
93+
funcNewBackedPipe(ctx context.Context,reconnectorReconnector)*BackedPipe {
94+
pipeCtx,cancel:=context.WithCancel(ctx)
95+
96+
errChan:=make(chanErrorEvent,1)
97+
98+
bp:=&BackedPipe{
99+
ctx:pipeCtx,
100+
cancel:cancel,
101+
reconnector:reconnector,
102+
state:disconnected,
103+
connGen:0,// Start with generation 0
104+
errChan:errChan,
105+
}
106+
107+
// Create reader and writer with typed error channel for generation-aware error reporting
108+
bp.reader=NewBackedReader(errChan)
109+
bp.writer=NewBackedWriter(DefaultBufferSize,errChan)
110+
111+
// Start error handler goroutine
112+
gobp.handleErrors()
113+
114+
returnbp
115+
}
116+
117+
// Connect establishes the initial connection using the reconnect function.
118+
func (bp*BackedPipe)Connect()error {
119+
bp.mu.Lock()
120+
deferbp.mu.Unlock()
121+
122+
ifbp.state==closed {
123+
returnErrPipeClosed
124+
}
125+
126+
ifbp.state==connected {
127+
returnErrPipeAlreadyConnected
128+
}
129+
130+
// Use internal context for the actual reconnect operation to ensure
131+
// Close() reliably cancels any in-flight attempt.
132+
returnbp.reconnectLocked()
133+
}
134+
135+
// Read implements io.Reader by delegating to the BackedReader.
136+
func (bp*BackedPipe)Read(p []byte) (int,error) {
137+
returnbp.reader.Read(p)
138+
}
139+
140+
// Write implements io.Writer by delegating to the BackedWriter.
141+
func (bp*BackedPipe)Write(p []byte) (int,error) {
142+
bp.mu.RLock()
143+
writer:=bp.writer
144+
state:=bp.state
145+
bp.mu.RUnlock()
146+
147+
ifstate==closed {
148+
return0,io.EOF
149+
}
150+
151+
returnwriter.Write(p)
152+
}
153+
154+
// Close closes the pipe and all underlying connections.
155+
func (bp*BackedPipe)Close()error {
156+
bp.mu.Lock()
157+
deferbp.mu.Unlock()
158+
159+
ifbp.state==closed {
160+
returnnil
161+
}
162+
163+
bp.state=closed
164+
bp.cancel()// Cancel main context
165+
166+
// Close all components in parallel to avoid deadlocks
167+
//
168+
// IMPORTANT: The connection must be closed first to unblock any
169+
// readers or writers that might be holding the mutex on Read/Write
170+
varg errgroup.Group
171+
172+
ifbp.conn!=nil {
173+
conn:=bp.conn
174+
g.Go(func()error {
175+
returnconn.Close()
176+
})
177+
bp.conn=nil
178+
}
179+
180+
ifbp.reader!=nil {
181+
reader:=bp.reader
182+
g.Go(func()error {
183+
returnreader.Close()
184+
})
185+
}
186+
187+
ifbp.writer!=nil {
188+
writer:=bp.writer
189+
g.Go(func()error {
190+
returnwriter.Close()
191+
})
192+
}
193+
194+
// Wait for all close operations to complete and return any error
195+
returng.Wait()
196+
}
197+
198+
// Connected returns whether the pipe is currently connected.
199+
func (bp*BackedPipe)Connected()bool {
200+
bp.mu.RLock()
201+
deferbp.mu.RUnlock()
202+
returnbp.state==connected&&bp.reader.Connected()&&bp.writer.Connected()
203+
}
204+
205+
// reconnectLocked handles the reconnection logic. Must be called with write lock held.
206+
func (bp*BackedPipe)reconnectLocked()error {
207+
ifbp.state==reconnecting {
208+
returnErrReconnectionInProgress
209+
}
210+
211+
bp.state=reconnecting
212+
deferfunc() {
213+
// Only reset to disconnected if we're still in reconnecting state
214+
// (successful reconnection will set state to connected)
215+
ifbp.state==reconnecting {
216+
bp.state=disconnected
217+
}
218+
}()
219+
220+
// Close existing connection if any
221+
ifbp.conn!=nil {
222+
_=bp.conn.Close()
223+
bp.conn=nil
224+
}
225+
226+
// Increment the generation and update both reader and writer.
227+
// We do it now to track even the connections that fail during
228+
// Reconnect.
229+
bp.connGen++
230+
bp.reader.SetGeneration(bp.connGen)
231+
bp.writer.SetGeneration(bp.connGen)
232+
233+
// Reconnect reader and writer
234+
seqNum:=make(chanuint64,1)
235+
newR:=make(chan io.Reader,1)
236+
237+
gobp.reader.Reconnect(seqNum,newR)
238+
239+
// Get the precise reader sequence number from the reader while it holds its lock
240+
readerSeqNum,ok:=<-seqNum
241+
if!ok {
242+
// Reader was closed during reconnection
243+
returnErrReconnectFailed
244+
}
245+
246+
// Perform reconnect using the exact sequence number we just received
247+
conn,remoteReaderSeqNum,err:=bp.reconnector.Reconnect(bp.ctx,readerSeqNum)
248+
iferr!=nil {
249+
// Unblock reader reconnect
250+
newR<-nil
251+
returnErrReconnectFailed
252+
}
253+
254+
// Provide the new connection to the reader (reader still holds its lock)
255+
newR<-conn
256+
257+
// Replay our outbound data from the remote's reader sequence number
258+
writerReconnectErr:=bp.writer.Reconnect(remoteReaderSeqNum,conn)
259+
ifwriterReconnectErr!=nil {
260+
returnErrReconnectWriterFailed
261+
}
262+
263+
// Success - update state
264+
bp.conn=conn
265+
bp.state=connected
266+
267+
returnnil
268+
}
269+
270+
// handleErrors listens for connection errors from reader/writer and triggers reconnection.
271+
// It filters errors from old connections and ensures only the first error per generation
272+
// triggers reconnection.
273+
func (bp*BackedPipe)handleErrors() {
274+
for {
275+
select {
276+
case<-bp.ctx.Done():
277+
return
278+
caseerrorEvt:=<-bp.errChan:
279+
bp.handleConnectionError(errorEvt)
280+
}
281+
}
282+
}
283+
284+
// handleConnectionError handles errors from either reader or writer components.
285+
// It filters errors from old connections and ensures only one reconnection per generation.
286+
func (bp*BackedPipe)handleConnectionError(errorEvtErrorEvent) {
287+
bp.mu.Lock()
288+
deferbp.mu.Unlock()
289+
290+
// Skip if already closed
291+
ifbp.state==closed {
292+
return
293+
}
294+
295+
// Filter errors from old connections (lower generation)
296+
iferrorEvt.Generation<bp.connGen {
297+
return
298+
}
299+
300+
// Skip if not connected (already disconnected or reconnecting)
301+
ifbp.state!=connected {
302+
return
303+
}
304+
305+
// Skip if we've already seen an error for this generation
306+
ifbp.lastErrorGen>=errorEvt.Generation {
307+
return
308+
}
309+
310+
// This is the first error for this generation
311+
bp.lastErrorGen=errorEvt.Generation
312+
313+
// Mark as disconnected
314+
bp.state=disconnected
315+
316+
// Try to reconnect using internal context
317+
reconnectErr:=bp.reconnectLocked()
318+
319+
ifreconnectErr!=nil {
320+
// Reconnection failed - log or handle as needed
321+
// For now, we'll just continue and wait for manual reconnection
322+
_=errorEvt.Err// Use the original error from the component
323+
_=errorEvt.Component// Component info available for potential logging by higher layers
324+
}
325+
}
326+
327+
// ForceReconnect forces a reconnection attempt immediately.
328+
// This can be used to force a reconnection if a new connection is established.
329+
// It prevents duplicate reconnections when called concurrently.
330+
func (bp*BackedPipe)ForceReconnect()error {
331+
// Deduplicate concurrent ForceReconnect calls so only one reconnection
332+
// attempt runs at a time from this API. Use the pipe's internal context
333+
// to ensure Close() cancels any in-flight attempt.
334+
_,err,_:=bp.sf.Do("force-reconnect",func() (interface{},error) {
335+
bp.mu.Lock()
336+
deferbp.mu.Unlock()
337+
338+
ifbp.state==closed {
339+
returnnil,io.EOF
340+
}
341+
342+
// Don't force reconnect if already reconnecting
343+
ifbp.state==reconnecting {
344+
returnnil,ErrReconnectionInProgress
345+
}
346+
347+
returnnil,bp.reconnectLocked()
348+
})
349+
returnerr
350+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp