backedpipe

package
Version: v2.28.5
Note: This package is not in the latest version of its module.
Published: Dec 1, 2025
License: AGPL-3.0
Imports: 7
Imported by: 0

Repository

github.com/coder/coder

Documentation

Constants

const (
	// Default buffer capacity used by the writer - 64MB
	DefaultBufferSize = 64 * 1024 * 1024
)

Variables

var (
	ErrPipeClosed             = xerrors.New("pipe is closed")
	ErrPipeAlreadyConnected   = xerrors.New("pipe is already connected")
	ErrReconnectionInProgress = xerrors.New("reconnection already in progress")
	ErrReconnectFailed        = xerrors.New("reconnect failed")
	ErrInvalidSequenceNumber  = xerrors.New("remote sequence number exceeds local sequence")
	ErrReconnectWriterFailed  = xerrors.New("reconnect writer failed")
)

var (
	ErrWriterClosed          = xerrors.New("cannot reconnect closed writer")
	ErrNilWriter             = xerrors.New("new writer cannot be nil")
	ErrFutureSequence        = xerrors.New("cannot replay from future sequence")
	ErrReplayDataUnavailable = xerrors.New("failed to read replay data")
	ErrReplayFailed          = xerrors.New("replay failed")
	ErrPartialReplay         = xerrors.New("partial replay")
)
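Sentinel errors like these are usually matched with errors.Is, which also sees through wrapping. The sketch below is a minimal illustration under assumptions: it declares local stand-ins for two of the sentinels with errors.New (the package itself uses xerrors.New), and the classify helper and its return values are hypothetical. Whether the package wraps these errors before returning them is not stated on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for two of the package's sentinel errors. The real package
// creates them with xerrors.New; errors.New keeps this sketch stdlib-only.
var (
	ErrPipeClosed      = errors.New("pipe is closed")
	ErrReconnectFailed = errors.New("reconnect failed")
)

// classify is a hypothetical helper showing how a caller might react to an
// error returned by the pipe.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrPipeClosed):
		return "stop"
	case errors.Is(err, ErrReconnectFailed):
		return "retry"
	default:
		return "unknown"
	}
}

func main() {
	// errors.Is matches the sentinel even when it has been wrapped with %w.
	wrapped := fmt.Errorf("connect: %w", ErrReconnectFailed)
	fmt.Println(classify(wrapped))
	fmt.Println(classify(ErrPipeClosed))
}
```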

Functions

This section is empty.

Types

type BackedPipe

type BackedPipe struct {
	// contains filtered or unexported fields
}

BackedPipe provides a reliable bidirectional byte stream over unreliable network connections. It orchestrates a BackedReader and BackedWriter to provide transparent reconnection and data replay capabilities.

func NewBackedPipe

func NewBackedPipe(ctx context.Context, reconnector Reconnector) *BackedPipe

NewBackedPipe creates a new BackedPipe with default options and the specified reconnector. The pipe starts disconnected and must be connected using Connect().

func (*BackedPipe) Close

func (bp *BackedPipe) Close() error

Close closes the pipe and all underlying connections.

func (*BackedPipe) Connect

func (bp *BackedPipe) Connect() error

Connect establishes the initial connection using the reconnect function.

func (*BackedPipe) Connected

func (bp *BackedPipe) Connected() bool

Connected returns whether the pipe is currently connected.

func (*BackedPipe) ForceReconnect

func (bp *BackedPipe) ForceReconnect() error

ForceReconnect forces a reconnection attempt immediately. This can be used to force a reconnection if a new connection is established. It prevents duplicate reconnections when called concurrently.
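One way to prevent duplicate reconnections is to guard the attempt with an in-progress flag. The sketch below is an assumption, not the package's implementation: reconnectGuard and errInProgress are hypothetical names, and this page does not say whether the real ForceReconnect rejects a concurrent call or coalesces it with the one already running.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errInProgress is a hypothetical local sentinel, modeled on the message of
// ErrReconnectionInProgress.
var errInProgress = errors.New("reconnection already in progress")

// reconnectGuard lets only one reconnection attempt run at a time.
type reconnectGuard struct {
	mu         sync.Mutex
	inProgress bool
}

// do runs attempt unless another attempt is already running.
func (g *reconnectGuard) do(attempt func() error) error {
	g.mu.Lock()
	if g.inProgress {
		g.mu.Unlock()
		return errInProgress
	}
	g.inProgress = true
	g.mu.Unlock()

	// Clear the flag when the attempt finishes, whether or not it succeeded.
	defer func() {
		g.mu.Lock()
		g.inProgress = false
		g.mu.Unlock()
	}()
	return attempt()
}

func main() {
	var g reconnectGuard
	var nested error
	outer := g.do(func() error {
		// A second call made while the first is still running is rejected.
		nested = g.do(func() error { return nil })
		return nil
	})
	fmt.Println("outer:", outer)
	fmt.Println("nested:", nested)
}
```

The mutex is released before attempt runs, so a slow reconnection does not block callers that only need to learn that one is already under way.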

func (*BackedPipe) Read

func (bp *BackedPipe) Read(p []byte) (int, error)

Read implements io.Reader by delegating to the BackedReader.

func (*BackedPipe) Write

func (bp *BackedPipe) Write(p []byte) (int, error)

Write implements io.Writer by delegating to the BackedWriter.

type BackedReader

type BackedReader struct {
	// contains filtered or unexported fields
}

BackedReader wraps an unreliable io.Reader and makes it resilient to disconnections. It tracks sequence numbers for all bytes read and can handle reconnection, blocking reads when disconnected instead of erroring.

func NewBackedReader

func NewBackedReader(errorEventChan chan<- ErrorEvent) *BackedReader

NewBackedReader creates a new BackedReader with generation-aware error reporting. The reader is initially disconnected and must be connected using Reconnect before reads will succeed. The errorEventChan will receive ErrorEvent structs containing error details, component info, and connection generation.

func (*BackedReader) Close

func (br *BackedReader) Close() error

Close the reader and wake up any blocked reads. After closing, all Read calls will return io.EOF.

func (*BackedReader) Connected

func (br *BackedReader) Connected() bool

Connected returns whether the reader is currently connected.

func (*BackedReader) Read

func (br *BackedReader) Read(p []byte) (int, error)

Read implements io.Reader. It blocks when disconnected until either:
1. A reconnection is established
2. The reader is closed

When connected, it reads from the underlying reader and updates sequence numbers. Connection failures are automatically detected and reported to the higher layer via callback.
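The blocking behaviour described above can be sketched with a sync.Cond. This is a simplified illustration under assumptions, not the package's code: blockingReader, attach, and demo are hypothetical names, error reporting to the higher layer is left out, and the lock is held during the underlying read for brevity.

```go
package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

// blockingReader is a hypothetical stand-in, not the package's BackedReader.
type blockingReader struct {
	mu     sync.Mutex
	cond   *sync.Cond
	r      io.Reader // nil while disconnected
	closed bool
	seq    uint64 // total bytes read so far
}

func newBlockingReader() *blockingReader {
	br := &blockingReader{}
	br.cond = sync.NewCond(&br.mu)
	return br
}

// Read waits while disconnected, then reads and advances the sequence number.
func (br *blockingReader) Read(p []byte) (int, error) {
	br.mu.Lock()
	defer br.mu.Unlock()
	for {
		// Wait instead of failing while there is no connection.
		for br.r == nil && !br.closed {
			br.cond.Wait()
		}
		if br.closed {
			return 0, io.EOF
		}
		// Simplification: the lock is held during the underlying Read.
		n, err := br.r.Read(p)
		br.seq += uint64(n)
		if err != nil {
			br.r = nil // treat any failure as a disconnect
		}
		if n > 0 || err == nil {
			return n, nil
		}
	}
}

// attach installs a new underlying reader and wakes blocked reads.
func (br *blockingReader) attach(r io.Reader) {
	br.mu.Lock()
	br.r = r
	br.mu.Unlock()
	br.cond.Broadcast()
}

// Close wakes blocked reads; later reads return io.EOF.
func (br *blockingReader) Close() error {
	br.mu.Lock()
	br.closed = true
	br.mu.Unlock()
	br.cond.Broadcast()
	return nil
}

// demo returns the first chunk read, the sequence number, and whether a read
// after Close reported io.EOF.
func demo() (string, uint64, bool) {
	br := newBlockingReader()
	got := make(chan string, 1)
	go func() {
		buf := make([]byte, 16)
		n, _ := br.Read(buf) // waits here if attach has not happened yet
		got <- string(buf[:n])
	}()
	br.attach(strings.NewReader("hello"))
	first := <-got
	br.Close()
	_, err := br.Read(make([]byte, 1))
	return first, br.seq, err == io.EOF
}

func main() {
	fmt.Println(demo())
}
```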

func (*BackedReader) Reconnect

func (br *BackedReader) Reconnect(seqNum chan<- uint64, newR <-chan io.Reader)

Reconnect coordinates reconnection using channels for better synchronization. The seqNum channel is used to send the current sequence number to the caller. The newR channel is used to receive the new reader from the caller. This allows for better coordination during the reconnection process.
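The two-channel handshake can be sketched as follows. The seqReader type, the handshake helper, and the choice of buffered channels are assumptions for illustration; only the parameter shape (a send-only sequence channel and a receive-only reader channel) comes from the signature above.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// seqReader is a hypothetical stand-in for a reader that tracks bytes read.
type seqReader struct {
	r   io.Reader
	seq uint64
}

// reconnect mirrors the documented channel shape: it reports the current
// sequence number, then waits for the caller to hand over a new reader.
func (s *seqReader) reconnect(seqNum chan<- uint64, newR <-chan io.Reader) {
	seqNum <- s.seq
	s.r = <-newR
}

// handshake plays the caller's role. payload stands for everything the remote
// side has ever sent; the new reader resumes at the reported sequence number.
func handshake(s *seqReader, payload string) uint64 {
	seqNum := make(chan uint64, 1)
	newR := make(chan io.Reader, 1)
	done := make(chan struct{})
	go func() {
		defer close(done)
		s.reconnect(seqNum, newR)
	}()
	seq := <-seqNum // the caller learns where the remote should resume
	newR <- strings.NewReader(payload[int(seq):])
	<-done
	return seq
}

// demo reconnects a reader that has already consumed 6 bytes.
func demo() (uint64, string) {
	s := &seqReader{seq: 6}
	seq := handshake(s, "hello world")
	rest, _ := io.ReadAll(s.r)
	return seq, string(rest)
}

func main() {
	seq, rest := demo()
	fmt.Println(seq, rest)
}
```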

func (*BackedReader) SequenceNum

func (br *BackedReader) SequenceNum() uint64

SequenceNum returns the current sequence number (total bytes read).

func (*BackedReader) SetGeneration

func (br *BackedReader) SetGeneration(generation uint64)

SetGeneration sets the current connection generation for error reporting.

type BackedWriter

type BackedWriter struct {
	// contains filtered or unexported fields
}

BackedWriter wraps an unreliable io.Writer and makes it resilient to disconnections. It maintains a ring buffer of recent writes for replay during reconnection.

func NewBackedWriter

func NewBackedWriter(capacity int, errorEventChan chan<- ErrorEvent) *BackedWriter

NewBackedWriter creates a new BackedWriter with generation-aware error reporting. The writer is initially disconnected and will block writes until connected. The errorEventChan will receive ErrorEvent structs containing error details, component info, and connection generation. Capacity must be > 0.

func (*BackedWriter) Close

func (bw *BackedWriter) Close() error

Close closes the writer and prevents further writes. After closing, all Write calls will return os.ErrClosed. This code keeps the Close() signature consistent with io.Closer, but it never actually returns an error.

IMPORTANT: You must close the current underlying writer, if any, before calling this method. Otherwise, if a Write operation is currently blocked in the underlying writer's Write method, this method will deadlock waiting for the mutex that Write holds.
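The ordering requirement can be illustrated with a small stand-in. In this sketch, lockedWriter and demo are hypothetical; the only property borrowed from the text is that Write holds a mutex while it calls the underlying writer. An io.Pipe with no reader plays the part of a stalled connection.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"sync"
)

// lockedWriter is a hypothetical stand-in that, like the documented
// BackedWriter, holds its mutex while calling the underlying Write.
type lockedWriter struct {
	mu     sync.Mutex
	w      io.Writer
	closed bool
}

func (lw *lockedWriter) Write(p []byte) (int, error) {
	lw.mu.Lock()
	defer lw.mu.Unlock()
	if lw.closed {
		return 0, os.ErrClosed
	}
	return lw.w.Write(p) // may block indefinitely on a stalled connection
}

func (lw *lockedWriter) Close() error {
	lw.mu.Lock() // waits for any in-flight Write to return
	defer lw.mu.Unlock()
	lw.closed = true
	return nil
}

// demo closes the underlying writer first, then the wrapper, and returns the
// error seen by the pending Write.
func demo() error {
	_, pw := io.Pipe() // nothing reads the other end, so writes to pw block
	lw := &lockedWriter{w: pw}
	writeErr := make(chan error, 1)
	go func() {
		_, err := lw.Write([]byte("stuck"))
		writeErr <- err
	}()
	// Step 1: close the underlying writer. A Write blocked inside pw.Write
	// returns with an error and releases the mutex.
	pw.Close()
	// Step 2: the wrapper's Close can now take the mutex.
	lw.Close()
	return <-writeErr
}

func main() {
	fmt.Println("pending write failed:", demo() != nil)
}
```

Reversing the two steps risks the deadlock the warning describes: Close would wait for the mutex while Write waits on a pipe that nothing is going to drain or close.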

func (*BackedWriter) Connected

func (bw *BackedWriter) Connected() bool

Connected returns whether the writer is currently connected.

func (*BackedWriter) Reconnect

func (bw *BackedWriter) Reconnect(replayFromSeq uint64, newWriter io.Writer) error

Reconnect replaces the current writer with a new one and replays data from the specified sequence number. If the requested sequence number is no longer in the buffer, returns an error indicating data loss.

IMPORTANT: You must close the current writer, if any, before calling this method. Otherwise, if a Write operation is currently blocked in the underlying writer's Write method, this method will deadlock waiting for the mutex that Write holds.
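Replay from a sequence number can be sketched with a bounded buffer of the most recent bytes. This is a simplification under assumptions: replayBuffer uses a trimmed slice rather than a true ring buffer, and the type, its methods, and the two local error values are hypothetical names modeled on the messages of ErrFutureSequence and ErrReplayDataUnavailable.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// Hypothetical local errors for this sketch.
var (
	errFuture      = errors.New("cannot replay from future sequence")
	errUnavailable = errors.New("replay data no longer buffered")
)

// replayBuffer keeps the most recent capacity bytes written.
type replayBuffer struct {
	capacity int
	data     []byte // newest bytes, at most capacity long
	seq      uint64 // total bytes ever written
}

// write records p and evicts the oldest bytes beyond capacity.
func (b *replayBuffer) write(p []byte) {
	b.data = append(b.data, p...)
	if extra := len(b.data) - b.capacity; extra > 0 {
		b.data = b.data[extra:]
	}
	b.seq += uint64(len(p))
}

// replay writes every buffered byte from sequence number from onward to w.
func (b *replayBuffer) replay(from uint64, w io.Writer) error {
	if from > b.seq {
		return errFuture // the peer claims bytes that were never written
	}
	oldest := b.seq - uint64(len(b.data))
	if from < oldest {
		return errUnavailable // those bytes were already evicted
	}
	_, err := w.Write(b.data[int(from-oldest):])
	return err
}

// demo writes 11 bytes into an 8-byte buffer and replays from sequence 6.
func demo() (string, error) {
	b := &replayBuffer{capacity: 8}
	b.write([]byte("hello "))
	b.write([]byte("world"))
	var out bytes.Buffer
	err := b.replay(6, &out)
	return out.String(), err
}

func main() {
	replayed, err := demo()
	fmt.Println(replayed, err)

	b := &replayBuffer{capacity: 8}
	b.write([]byte("hello world"))
	fmt.Println(b.replay(2, io.Discard))  // bytes 0-2 were evicted
	fmt.Println(b.replay(12, io.Discard)) // only 11 bytes were written
}
```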

func (*BackedWriter) SequenceNum

func (bw *BackedWriter) SequenceNum() uint64

SequenceNum returns the current sequence number (total bytes written).

func (*BackedWriter) SetGeneration

func (bw *BackedWriter) SetGeneration(generation uint64)

SetGeneration sets the current connection generation for error reporting.

func (*BackedWriter) Write

func (bw *BackedWriter) Write(p []byte) (int, error)

Write implements io.Writer. When connected, it writes to both the ring buffer (to preserve data in case we need to replay it) and the underlying writer. If the underlying write fails, the writer is marked as disconnected and the write blocks until reconnection occurs.

type ErrorEvent

type ErrorEvent struct {
	Err        error
	Component  string // "reader" or "writer"
	Generation uint64 // connection generation when error occurred
}

ErrorEvent represents an error from a reader or writer with connection generation info.
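A plausible use of the Generation field is to discard errors that belong to a connection that has already been replaced. The sketch below mirrors the struct locally; shouldReconnect and the equality rule it applies are assumptions, since this page does not describe how the pipe consumes these events.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrorEvent mirrors the documented struct so the sketch is self-contained.
type ErrorEvent struct {
	Err        error
	Component  string // "reader" or "writer"
	Generation uint64 // connection generation when error occurred
}

// shouldReconnect is a hypothetical filter: only an error from the current
// connection generation is worth acting on.
func shouldReconnect(ev ErrorEvent, current uint64) bool {
	return ev.Generation == current
}

func main() {
	events := make(chan ErrorEvent, 2)
	events <- ErrorEvent{Err: errors.New("broken pipe"), Component: "writer", Generation: 1}
	events <- ErrorEvent{Err: errors.New("connection reset"), Component: "reader", Generation: 2}
	close(events)

	const current = 2
	for ev := range events {
		if shouldReconnect(ev, current) {
			fmt.Printf("reconnect: %s error in generation %d: %v\n", ev.Component, ev.Generation, ev.Err)
		} else {
			fmt.Printf("ignore: stale %s error from generation %d\n", ev.Component, ev.Generation)
		}
	}
}
```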

type Reconnector

type Reconnector interface {
	Reconnect(ctx context.Context, readerSeqNum uint64) (conn io.ReadWriteCloser, remoteReaderSeqNum uint64, err error)
}

Reconnector is an interface for establishing connections when the BackedPipe needs to reconnect. Implementations should:
1. Establish a new connection to the remote side
2. Exchange sequence numbers with the remote side
3. Return the new connection and the remote's reader sequence number

The readerSeqNum parameter is the local reader's current sequence number (total bytes successfully read from the remote). This must be sent to the remote so it can replay its data to us starting from this number.

The returned remoteReaderSeqNum should be the remote side's reader sequence number (how many bytes of our outbound data it has successfully read). This informs our writer where to resume (i.e., which bytes to replay to the remote).
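The three steps can be sketched against a local copy of the interface. Everything beyond the interface itself is an assumption for illustration: dialReconnector, its dial field, exchangeSeq, and the 8-byte big-endian exchange are hypothetical and are not the wire format used by the real package. An in-memory net.Pipe stands in for the network.

```go
package main

import (
	"context"
	"encoding/binary"
	"fmt"
	"io"
	"net"
)

// Reconnector mirrors the documented interface so the sketch is self-contained.
type Reconnector interface {
	Reconnect(ctx context.Context, readerSeqNum uint64) (conn io.ReadWriteCloser, remoteReaderSeqNum uint64, err error)
}

// dialReconnector is a hypothetical implementation built on a dial function.
type dialReconnector struct {
	dial func(ctx context.Context) (net.Conn, error)
}

func (d dialReconnector) Reconnect(ctx context.Context, readerSeqNum uint64) (io.ReadWriteCloser, uint64, error) {
	conn, err := d.dial(ctx) // 1. establish a new connection
	if err != nil {
		return nil, 0, err
	}
	remote, err := exchangeSeq(conn, readerSeqNum) // 2. exchange sequence numbers
	if err != nil {
		conn.Close()
		return nil, 0, err
	}
	return conn, remote, nil // 3. return the connection and the remote's number
}

// exchangeSeq sends local and receives the peer's number concurrently, so it
// also works on unbuffered transports such as net.Pipe.
func exchangeSeq(conn io.ReadWriter, local uint64) (uint64, error) {
	writeErr := make(chan error, 1)
	go func() {
		writeErr <- binary.Write(conn, binary.BigEndian, local)
	}()
	var remote uint64
	if err := binary.Read(conn, binary.BigEndian, &remote); err != nil {
		return 0, err
	}
	return remote, <-writeErr
}

// demo connects the two ends of an in-memory pipe. The local side has read 42
// bytes; the simulated peer has read 70.
func demo() (remote uint64, peerSaw uint64, err error) {
	client, server := net.Pipe()
	defer server.Close()
	peerGot := make(chan uint64, 1)
	go func() {
		got, _ := exchangeSeq(server, 70)
		peerGot <- got
	}()

	var r Reconnector = dialReconnector{
		dial: func(context.Context) (net.Conn, error) { return client, nil },
	}
	conn, remote, err := r.Reconnect(context.Background(), 42)
	if err != nil {
		return 0, 0, err
	}
	defer conn.Close()
	return remote, <-peerGot, nil
}

func main() {
	remote, peerSaw, err := demo()
	fmt.Println(remote, peerSaw, err)
}
```

The value returned as remote is what a writer would use as its replay starting point, while the value the peer receives tells it where to resume sending.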
