eventbus
packageThis package is not in the latest version of its module.
Details
Validgo.mod file
The Go module system was introduced in Go 1.11 and is the official dependency management solution for Go.
Redistributable license
Redistributable licenses place minimal restrictions on how software can be used, modified, and redistributed.
Tagged version
Modules with tagged versions give importers more predictable builds.
Stable version
When a project reaches major version v1 it is considered stable.
- Learn more about best practices
Repository
Links
Documentation¶
Overview¶
Package eventbus provides an in-process event bus.
An event bus connects publishers of typed events with subscribersinterested in those events. Typically, there is one global eventbus per process.
Usage¶
To send or receive events, first useBus.Client to register withthe bus. Clients should register with a human-readable name thatidentifies the code using the client, to aid in debugging.
To publish events, usePublish on a Client to get a typedpublisher for your event type, then callPublisher.Publish asneeded. If your event is expensive to construct, you can optionallyusePublisher.ShouldPublish to skip the work if nobody islistening for the event.
To receive events, useSubscribe to get a typed subscriber foreach event type you're interested in. Receive the events themselvesby selecting over all yourSubscriber.Events channels, as well asSubscriber.Done for shutdown notifications.
Concurrency properties¶
The bus serializes all published events across all publishers, andpreserves that ordering when delivering to subscribers that areattached to the same Client. In more detail:
- An event is published to the bus at some instant between thestart and end of the call toPublisher.Publish.
- Two events cannot be published at the same instant, and so aretotally ordered by their publication time. Given two events E1and E2, either E1 happens before E2, or E2 happens before E1.
- Clients dispatch events to their Subscribers in publicationorder: if E1 happens before E2, the client always delivers E1before E2.
- Clients do not synchronize subscriptions with each other: givenclients C1 and C2, both subscribed to events E1 and E2, C1 maydeliver both E1 and E2 before C2 delivers E1.
Less formally: there is one true timeline of all published events.If you make a Client and subscribe to events, you will receiveevents one at a time, in the same order as the one truetimeline. You will "skip over" events you didn't subscribe to, butyour view of the world always moves forward in time, neverbackwards, and you will observe events in the same order aseveryone else.
However, you cannot assume that what your client see as "now" isthe same as what other clients. They may be further behind you inworking through the timeline, or running ahead of you. This meansyou should be careful about reaching out to another componentdirectly after receiving an event, as its view of the world may notyet (or ever) be exactly consistent with yours.
To make your code more testable and understandable, you should tryto structure it following the actor model: you have some localstate over which you have authority, but your only way to interactwith state elsewhere in the program is to receive and processevents coming from elsewhere, or to emit events of your own.
Expected subscriber behavior¶
Subscribers are expected to promptly receive their events onSubscriber.Events. The bus has a small, fixed amount of internalbuffering, meaning that a slow subscriber will eventually causebackpressure and block publication of all further events.
In general, you should receive from your subscriber(s) in a loop,and only do fast state updates within that loop. Any heavier workshould be offloaded to another goroutine.
Causing publishers to block from backpressure is considered a bugin the slow subscriber causing the backpressure, and should beaddressed there. Publishers should assume that Publish will notblock for extended periods of time, and should not make exceptionaleffort to behave gracefully if they do get blocked.
These blocking semantics are provisional and subject tochange. Please speak up if this causes development pain, so that wecan adapt the semantics to better suit our needs.
Debugging facilities¶
TheDebugger, obtained throughBus.Debugger, providesintrospection facilities to monitor events flowing through the bus,and inspect publisher and subscriber state.
Additionally, a debug command exists for monitoring the eventbus:
tailscale debug daemon-bus-events
Testing facilities¶
Helpers for testing code with the eventbus can be found in:
eventbus/eventbustest
Index¶
- type Bus
- type BusOptions
- type Client
- type DebugEvent
- type DebugTopic
- type DebugTopics
- type Debugger
- func (d *Debugger) Clients() []*Client
- func (d *Debugger) PublishQueue() []PublishedEvent
- func (d *Debugger) PublishTypes(client *Client) []reflect.Type
- func (d *Debugger) RegisterHTTP(td *tsweb.DebugHandler)
- func (d *Debugger) SubscribeQueue(client *Client) []DeliveredEvent
- func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type
- func (d *Debugger) WatchBus() *Subscriber[RoutedEvent]
- func (d *Debugger) WatchPublish(client *Client) *Subscriber[PublishedEvent]
- func (d *Debugger) WatchSubscribe(client *Client) *Subscriber[DeliveredEvent]
- type DeliveredEvent
- type Monitor
- type PublishedEvent
- type Publisher
- type RoutedEvent
- type Subscriber
- type SubscriberFunc
Constants¶
This section is empty.
Variables¶
This section is empty.
Functions¶
This section is empty.
Types¶
typeBus¶
type Bus struct {// contains filtered or unexported fields}Bus is an event bus that distributes published events to interestedsubscribers.
funcNew¶
func New() *Bus
New returns a new bus with default options. It is equivalent tocallingNewWithOptions with zeroBusOptions.
funcNewWithOptions¶added inv1.90.6
func NewWithOptions(optsBusOptions) *Bus
NewWithOptions returns a newBus with the specifiedBusOptions.UseBus.Client to construct clients on the bus.UsePublish to make event publishers.UseSubscribe andSubscribeFunc to make event subscribers.
func (*Bus)Client¶
Client returns a new client with no subscriptions. UseSubscribeto receive events, andPublish to emit events.
The client's name is used only for debugging, to tell humans whatpiece of code a publisher/subscriber belongs to. Aim for somethingshort but unique, for example "kernel-route-monitor" or "taildrop",not "watcher".
typeBusOptions¶added inv1.90.6
type BusOptions struct {// Logf, if non-nil, is used for debug logs emitted by the bus and clients,// publishers, and subscribers under its care. If it is nil, logs are sent// to [log.Printf].Logflogger.Logf}BusOptions are optional parameters for aBus. A zero value is ready foruse and provides defaults as described.
typeClient¶
type Client struct {// contains filtered or unexported fields}A Client can publish and subscribe to events on its attachedbus. SeePublish to publish events, andSubscribe to receiveevents.
Subscribers that share the same client receive events one at atime, in the order they were published.
func (*Client)Close¶
func (c *Client) Close()
Close closes the client. It implicitly closes all publishers andsubscribers obtained from this client.
func (*Client)Done¶added inv1.90.0
func (c *Client) Done() <-chan struct{}
Done returns a channel that is closed whenClient.Close is called.The channel is closed after all the publishers and subscribers governed bythe client have been closed.
func (*Client)Monitor¶added inv1.90.0
Monitor executes f in a new goroutine attended by aMonitor. The calleris responsible for waiting for the goroutine to complete, by calling eitherMonitor.Close orMonitor.Wait.
typeDebugEvent¶added inv1.86.0
DebugEvent is a representation of an event used for debug clients.
typeDebugTopic¶added inv1.86.0
DebugTopic provides the JSON encoding of publishers and subscribers for agiven topic.
typeDebugTopics¶added inv1.86.0
type DebugTopics struct {Topics []DebugTopic}DebugTopics provides the JSON encoding as a wrapper for a collection ofDebugTopic.
typeDebugger¶
type Debugger struct {// contains filtered or unexported fields}A Debugger offers access to a bus's privileged introspection anddebugging facilities.
The debugger's functionality is intended for humans and their toolsto examine and troubleshoot bus clients, and should not be used innormal codepaths.
In particular, the debugger provides access to information that isdeliberately withheld from bus clients to encourage more robust andmaintainable code - for example, the sender of an event, or theevent streams of other clients. Please don't use the debugger tocircumvent these restrictions for purposes other than debugging.
func (*Debugger)PublishQueue¶
func (d *Debugger) PublishQueue() []PublishedEvent
PublishQueue returns the contents of the publish queue.
The publish queue contains events that have been accepted by thebus from Publish() calls, but have not yet been routed to relevantsubscribers.
This queue is expected to be almost empty in normal operation. Afull publish queue indicates that a slow subscriber downstream iscausing backpressure and stalling the bus.
func (*Debugger)PublishTypes¶
PublishTypes returns the list of types being published by client.
The returned types are those for which the client has obtained aPublisher. The client may not have ever sent the type inquestion.
func (*Debugger)RegisterHTTP¶
func (d *Debugger) RegisterHTTP(td *tsweb.DebugHandler)
func (*Debugger)SubscribeQueue¶
func (d *Debugger) SubscribeQueue(client *Client) []DeliveredEvent
SubscribeQueue returns the contents of the given client's subscribequeue.
The subscribe queue contains events that are to be delivered to theclient, but haven't yet been handed off to the relevantSubscriber.
This queue is expected to be almost empty in normal operation. Afull subscribe queue indicates that the client is accepting eventstoo slowly, and may be causing the rest of the bus to stall.
func (*Debugger)SubscribeTypes¶
SubscribeTypes returns the list of types being subscribed to byclient.
The returned types are those for which the client has obtained aSubscriber. The client may not have ever received the type inquestion, and here may not be any publishers of the type.
func (*Debugger)WatchBus¶
func (d *Debugger) WatchBus() *Subscriber[RoutedEvent]
WatchBus streams information about all events passing through thebus.
Monitored events are delivered in the bus's global publicationorder (see "Concurrency properties" in the package docs).
The caller must consume monitoring events promptly to avoidstalling the bus (see "Expected subscriber behavior" in the packagedocs).
func (*Debugger)WatchPublish¶
func (d *Debugger) WatchPublish(client *Client) *Subscriber[PublishedEvent]
WatchPublish streams information about all events published by thegiven client.
Monitored events are delivered in the bus's global publicationorder (see "Concurrency properties" in the package docs).
The caller must consume monitoring events promptly to avoidstalling the bus (see "Expected subscriber behavior" in the packagedocs).
func (*Debugger)WatchSubscribe¶
func (d *Debugger) WatchSubscribe(client *Client) *Subscriber[DeliveredEvent]
WatchSubscribe streams information about all events received by thegiven client.
Monitored events are delivered in the bus's global publicationorder (see "Concurrency properties" in the package docs).
The caller must consume monitoring events promptly to avoidstalling the bus (see "Expected subscriber behavior" in the packagedocs).
typeDeliveredEvent¶
typeMonitor¶added inv1.90.0
type Monitor struct {// contains filtered or unexported fields}A Monitor monitors the execution of a goroutine processing events from aClient, allowing the caller to block until it is complete. The zero valueof m is valid; its Close and Wait methods return immediately, and its Donemethod returns an already-closed channel.
func (Monitor)Close¶added inv1.90.0
func (mMonitor) Close()
Close closes the client associated with m and blocks until the processinggoroutine is complete.
typePublishedEvent¶
typePublisher¶
type Publisher[Tany] struct {// contains filtered or unexported fields}
A Publisher publishes typed events on a bus.
funcPublish¶
Publish returns a publisher for event type T using the given client.It panics if c is closed.
func (*Publisher[T])Close¶
func (p *Publisher[T]) Close()
Close closes the publisher.
Calls to Publish after Close silently do nothing.
If the Bus or Client from which the Publisher was created is closed,the Publisher is implicitly closed and does not need to be closedseparately.
func (*Publisher[T])Publish¶
func (p *Publisher[T]) Publish(v T)
Publish publishes event v on the bus.
func (*Publisher[T])ShouldPublish¶
ShouldPublish reports whether anyone is subscribed to the eventsthat this publisher emits.
ShouldPublish can be used to skip expensive event construction ifnobody seems to care. Publishers must not assume that someone willdefinitely receive an event if ShouldPublish returns true.
typeRoutedEvent¶
typeSubscriber¶
type Subscriber[Tany] struct {// contains filtered or unexported fields}
A Subscriber delivers one type of event from aClient.Events are sent to theSubscriber.Events channel.
funcSubscribe¶
func Subscribe[Tany](c *Client) *Subscriber[T]
Subscribe requests delivery of events of type T through the given client.It panics if c already has a subscriber for type T, or if c is closed.
func (*Subscriber[T])Close¶
func (s *Subscriber[T]) Close()
Close closes the Subscriber, indicating the caller no longer wishesto receive this event type. After Close, receives onSubscriber.Events block for ever.
If the Bus from which the Subscriber was created is closed,the Subscriber is implicitly closed and does not need to be closedseparately.
func (*Subscriber[T])Done¶
func (s *Subscriber[T]) Done() <-chan struct{}
Done returns a channel that is closed when the subscriber isclosed.
func (*Subscriber[T])Events¶
func (s *Subscriber[T]) Events() <-chan T
Events returns a channel on which the subscriber's events aredelivered.
typeSubscriberFunc¶added inv1.90.0
type SubscriberFunc[Tany] struct {// contains filtered or unexported fields}
A SubscriberFunc delivers one type of event from aClient.Events are forwarded synchronously to a function provided at construction.
funcSubscribeFunc¶added inv1.90.0
func SubscribeFunc[Tany](c *Client, f func(T)) *SubscriberFunc[T]
SubscribeFunc is likeSubscribe, but calls the provided func for eachevent of type T.
A SubscriberFunc calls f synchronously from the client's goroutine.This means the callback must not block for an extended period of time,as this will block the subscriber and slow event processing for allsubscriptions on c.
func (*SubscriberFunc[T])Close¶added inv1.90.0
func (s *SubscriberFunc[T]) Close()
Close closes the SubscriberFunc, indicating the caller no longer wishes toreceive this event type. After Close, no further events will be passed tothe callback.
If theBus from which s was created is closed, s is implicitly closed anddoes not need to be closed separately.
Source Files¶
Directories¶
| Path | Synopsis |
|---|---|
debug-demo is a program that serves a bus's debug interface over HTTP, then generates some fake traffic from a handful of clients. | debug-demo is a program that serves a bus's debug interface over HTTP, then generates some fake traffic from a handful of clients. |
Package eventbustest provides helper methods for testing an eventbus.Bus. | Package eventbustest provides helper methods for testing an eventbus.Bus. |