backedpipe
packageThis package is not in the latest version of its module.
Details
Validgo.mod file
The Go module system was introduced in Go 1.11 and is the official dependency management solution for Go.
Redistributable license
Redistributable licenses place minimal restrictions on how software can be used, modified, and redistributed.
Tagged version
Modules with tagged versions give importers more predictable builds.
Stable version
When a project reaches major version v1 it is considered stable.
- Learn more about best practices
Repository
Links
Documentation¶
Index¶
- Constants
- Variables
- type BackedPipe
- type BackedReader
- func (br *BackedReader) Close() error
- func (br *BackedReader) Connected() bool
- func (br *BackedReader) Read(p []byte) (int, error)
- func (br *BackedReader) Reconnect(seqNum chan<- uint64, newR <-chan io.Reader)
- func (br *BackedReader) SequenceNum() uint64
- func (br *BackedReader) SetGeneration(generation uint64)
- type BackedWriter
- func (bw *BackedWriter) Close() error
- func (bw *BackedWriter) Connected() bool
- func (bw *BackedWriter) Reconnect(replayFromSeq uint64, newWriter io.Writer) error
- func (bw *BackedWriter) SequenceNum() uint64
- func (bw *BackedWriter) SetGeneration(generation uint64)
- func (bw *BackedWriter) Write(p []byte) (int, error)
- type ErrorEvent
- type Reconnector
Constants¶
const (// Default buffer capacity used by the writer - 64MBDefaultBufferSize = 64 * 1024 * 1024)Variables¶
var (ErrPipeClosed =xerrors.New("pipe is closed")ErrPipeAlreadyConnected =xerrors.New("pipe is already connected")ErrReconnectionInProgress =xerrors.New("reconnection already in progress")ErrReconnectFailed =xerrors.New("reconnect failed")ErrInvalidSequenceNumber =xerrors.New("remote sequence number exceeds local sequence")ErrReconnectWriterFailed =xerrors.New("reconnect writer failed"))
var (ErrWriterClosed =xerrors.New("cannot reconnect closed writer")ErrNilWriter =xerrors.New("new writer cannot be nil")ErrFutureSequence =xerrors.New("cannot replay from future sequence")ErrReplayDataUnavailable =xerrors.New("failed to read replay data")ErrReplayFailed =xerrors.New("replay failed")ErrPartialReplay =xerrors.New("partial replay"))
Functions¶
This section is empty.
Types¶
typeBackedPipe¶
type BackedPipe struct {// contains filtered or unexported fields}BackedPipe provides a reliable bidirectional byte stream over unreliable network connections.It orchestrates a BackedReader and BackedWriter to provide transparent reconnectionand data replay capabilities.
funcNewBackedPipe¶
func NewBackedPipe(ctxcontext.Context, reconnectorReconnector) *BackedPipe
NewBackedPipe creates a new BackedPipe with default options and the specified reconnector.The pipe starts disconnected and must be connected using Connect().
func (*BackedPipe)Close¶
func (bp *BackedPipe) Close()error
Close closes the pipe and all underlying connections.
func (*BackedPipe)Connect¶
func (bp *BackedPipe) Connect()error
Connect establishes the initial connection using the reconnect function.
func (*BackedPipe)Connected¶
func (bp *BackedPipe) Connected()bool
Connected returns whether the pipe is currently connected.
func (*BackedPipe)ForceReconnect¶
func (bp *BackedPipe) ForceReconnect()error
ForceReconnect forces a reconnection attempt immediately.This can be used to force a reconnection if a new connection is established.It prevents duplicate reconnections when called concurrently.
typeBackedReader¶
type BackedReader struct {// contains filtered or unexported fields}BackedReader wraps an unreliable io.Reader and makes it resilient to disconnections.It tracks sequence numbers for all bytes read and can handle reconnection,blocking reads when disconnected instead of erroring.
funcNewBackedReader¶
func NewBackedReader(errorEventChan chan<-ErrorEvent) *BackedReader
NewBackedReader creates a new BackedReader with generation-aware error reporting.The reader is initially disconnected and must be connected using Reconnect beforereads will succeed. The errorEventChan will receive ErrorEvent structs containingerror details, component info, and connection generation.
func (*BackedReader)Close¶
func (br *BackedReader) Close()error
Close the reader and wake up any blocked reads.After closing, all Read calls will return io.EOF.
func (*BackedReader)Connected¶
func (br *BackedReader) Connected()bool
Connected returns whether the reader is currently connected.
func (*BackedReader)Read¶
func (br *BackedReader) Read(p []byte) (int,error)
Read implements io.Reader. It blocks when disconnected until either:1. A reconnection is established2. The reader is closed
When connected, it reads from the underlying reader and updates sequence numbers.Connection failures are automatically detected and reported to the higher layer via callback.
func (*BackedReader)Reconnect¶
func (br *BackedReader) Reconnect(seqNum chan<-uint64, newR <-chanio.Reader)
Reconnect coordinates reconnection using channels for better synchronization.The seqNum channel is used to send the current sequence number to the caller.The newR channel is used to receive the new reader from the caller.This allows for better coordination during the reconnection process.
func (*BackedReader)SequenceNum¶
func (br *BackedReader) SequenceNum()uint64
SequenceNum returns the current sequence number (total bytes read).
func (*BackedReader)SetGeneration¶
func (br *BackedReader) SetGeneration(generationuint64)
SetGeneration sets the current connection generation for error reporting.
typeBackedWriter¶
type BackedWriter struct {// contains filtered or unexported fields}BackedWriter wraps an unreliable io.Writer and makes it resilient to disconnections.It maintains a ring buffer of recent writes for replay during reconnection.
funcNewBackedWriter¶
func NewBackedWriter(capacityint, errorEventChan chan<-ErrorEvent) *BackedWriter
NewBackedWriter creates a new BackedWriter with generation-aware error reporting.The writer is initially disconnected and will block writes until connected.The errorEventChan will receive ErrorEvent structs containing error details,component info, and connection generation. Capacity must be > 0.
func (*BackedWriter)Close¶
func (bw *BackedWriter) Close()error
Close closes the writer and prevents further writes.After closing, all Write calls will return os.ErrClosed.This code keeps the Close() signature consistent with io.Closer,but it never actually returns an error.
IMPORTANT: You must close the current underlying writer, if any, before callingthis method. Otherwise, if a Write operation is currently blocked in theunderlying writer's Write method, this method will deadlock waiting for themutex that Write holds.
func (*BackedWriter)Connected¶
func (bw *BackedWriter) Connected()bool
Connected returns whether the writer is currently connected.
func (*BackedWriter)Reconnect¶
func (bw *BackedWriter) Reconnect(replayFromSequint64, newWriterio.Writer)error
Reconnect replaces the current writer with a new one and replays data from the specifiedsequence number. If the requested sequence number is no longer in the buffer,returns an error indicating data loss.
IMPORTANT: You must close the current writer, if any, before calling this method.Otherwise, if a Write operation is currently blocked in the underlying writer'sWrite method, this method will deadlock waiting for the mutex that Write holds.
func (*BackedWriter)SequenceNum¶
func (bw *BackedWriter) SequenceNum()uint64
SequenceNum returns the current sequence number (total bytes written).
func (*BackedWriter)SetGeneration¶
func (bw *BackedWriter) SetGeneration(generationuint64)
SetGeneration sets the current connection generation for error reporting.
func (*BackedWriter)Write¶
func (bw *BackedWriter) Write(p []byte) (int,error)
Write implements io.Writer.When connected, it writes to both the ring buffer (to preserve data in case we need to replay it)and the underlying writer.If the underlying write fails, the writer is marked as disconnected and the write blocksuntil reconnection occurs.
typeErrorEvent¶
type ErrorEvent struct {ErrerrorComponentstring// "reader" or "writer"Generationuint64// connection generation when error occurred}ErrorEvent represents an error from a reader or writer with connection generation info.
typeReconnector¶
type Reconnector interface {Reconnect(ctxcontext.Context, readerSeqNumuint64) (connio.ReadWriteCloser, remoteReaderSeqNumuint64, errerror)}Reconnector is an interface for establishing connections when the BackedPipe needs to reconnect.Implementations should:1. Establish a new connection to the remote side2. Exchange sequence numbers with the remote side3. Return the new connection and the remote's reader sequence number
The readerSeqNum parameter is the local reader's current sequence number(total bytes successfully read from the remote). This must be sent to theremote so it can replay its data to us starting from this number.
The returned remoteReaderSeqNum should be the remote side's reader sequencenumber (how many bytes of our outbound data it has successfully read). Thisinforms our writer where to resume (i.e., which bytes to replay to the remote).