Movatterモバイル変換


[0]ホーム

URL:


eventbus

package
v1.92.2Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2025 License:BSD-3-ClauseImports:24Imported by:1

Details

Repository

github.com/tailscale/tailscale

Links

Documentation

Overview

Package eventbus provides an in-process event bus.

An event bus connects publishers of typed events with subscribersinterested in those events. Typically, there is one global eventbus per process.

Usage

To send or receive events, first useBus.Client to register withthe bus. Clients should register with a human-readable name thatidentifies the code using the client, to aid in debugging.

To publish events, usePublish on a Client to get a typedpublisher for your event type, then callPublisher.Publish asneeded. If your event is expensive to construct, you can optionallyusePublisher.ShouldPublish to skip the work if nobody islistening for the event.

To receive events, useSubscribe to get a typed subscriber foreach event type you're interested in. Receive the events themselvesby selecting over all yourSubscriber.Events channels, as well asSubscriber.Done for shutdown notifications.

Concurrency properties

The bus serializes all published events across all publishers, andpreserves that ordering when delivering to subscribers that areattached to the same Client. In more detail:

  • An event is published to the bus at some instant between thestart and end of the call toPublisher.Publish.
  • Two events cannot be published at the same instant, and so aretotally ordered by their publication time. Given two events E1and E2, either E1 happens before E2, or E2 happens before E1.
  • Clients dispatch events to their Subscribers in publicationorder: if E1 happens before E2, the client always delivers E1before E2.
  • Clients do not synchronize subscriptions with each other: givenclients C1 and C2, both subscribed to events E1 and E2, C1 maydeliver both E1 and E2 before C2 delivers E1.

Less formally: there is one true timeline of all published events.If you make a Client and subscribe to events, you will receiveevents one at a time, in the same order as the one truetimeline. You will "skip over" events you didn't subscribe to, butyour view of the world always moves forward in time, neverbackwards, and you will observe events in the same order aseveryone else.

However, you cannot assume that what your client see as "now" isthe same as what other clients. They may be further behind you inworking through the timeline, or running ahead of you. This meansyou should be careful about reaching out to another componentdirectly after receiving an event, as its view of the world may notyet (or ever) be exactly consistent with yours.

To make your code more testable and understandable, you should tryto structure it following the actor model: you have some localstate over which you have authority, but your only way to interactwith state elsewhere in the program is to receive and processevents coming from elsewhere, or to emit events of your own.

Expected subscriber behavior

Subscribers are expected to promptly receive their events onSubscriber.Events. The bus has a small, fixed amount of internalbuffering, meaning that a slow subscriber will eventually causebackpressure and block publication of all further events.

In general, you should receive from your subscriber(s) in a loop,and only do fast state updates within that loop. Any heavier workshould be offloaded to another goroutine.

Causing publishers to block from backpressure is considered a bugin the slow subscriber causing the backpressure, and should beaddressed there. Publishers should assume that Publish will notblock for extended periods of time, and should not make exceptionaleffort to behave gracefully if they do get blocked.

These blocking semantics are provisional and subject tochange. Please speak up if this causes development pain, so that wecan adapt the semantics to better suit our needs.

Debugging facilities

TheDebugger, obtained throughBus.Debugger, providesintrospection facilities to monitor events flowing through the bus,and inspect publisher and subscriber state.

Additionally, a debug command exists for monitoring the eventbus:

tailscale debug daemon-bus-events

Testing facilities

Helpers for testing code with the eventbus can be found in:

eventbus/eventbustest

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

typeBus

type Bus struct {// contains filtered or unexported fields}

Bus is an event bus that distributes published events to interestedsubscribers.

funcNew

func New() *Bus

New returns a new bus with default options. It is equivalent tocallingNewWithOptions with zeroBusOptions.

funcNewWithOptionsadded inv1.90.6

func NewWithOptions(optsBusOptions) *Bus

NewWithOptions returns a newBus with the specifiedBusOptions.UseBus.Client to construct clients on the bus.UsePublish to make event publishers.UseSubscribe andSubscribeFunc to make event subscribers.

func (*Bus)Client

func (b *Bus) Client(namestring) *Client

Client returns a new client with no subscriptions. UseSubscribeto receive events, andPublish to emit events.

The client's name is used only for debugging, to tell humans whatpiece of code a publisher/subscriber belongs to. Aim for somethingshort but unique, for example "kernel-route-monitor" or "taildrop",not "watcher".

func (*Bus)Close

func (b *Bus) Close()

Close closes the bus. It implicitly closes all clients, publishers andsubscribers attached to the bus.

Close blocks until the bus is fully shut down. The bus ispermanently unusable after closing.

func (*Bus)Debugger

func (b *Bus) Debugger() *Debugger

Debugger returns the debugging facility for the bus.

typeBusOptionsadded inv1.90.6

type BusOptions struct {// Logf, if non-nil, is used for debug logs emitted by the bus and clients,// publishers, and subscribers under its care. If it is nil, logs are sent// to [log.Printf].Logflogger.Logf}

BusOptions are optional parameters for aBus. A zero value is ready foruse and provides defaults as described.

typeClient

type Client struct {// contains filtered or unexported fields}

A Client can publish and subscribe to events on its attachedbus. SeePublish to publish events, andSubscribe to receiveevents.

Subscribers that share the same client receive events one at atime, in the order they were published.

func (*Client)Close

func (c *Client) Close()

Close closes the client. It implicitly closes all publishers andsubscribers obtained from this client.

func (*Client)Doneadded inv1.90.0

func (c *Client) Done() <-chan struct{}

Done returns a channel that is closed whenClient.Close is called.The channel is closed after all the publishers and subscribers governed bythe client have been closed.

func (*Client)Monitoradded inv1.90.0

func (c *Client) Monitor(f func(*Client))Monitor

Monitor executes f in a new goroutine attended by aMonitor. The calleris responsible for waiting for the goroutine to complete, by calling eitherMonitor.Close orMonitor.Wait.

func (*Client)Name

func (c *Client) Name()string

typeDebugEventadded inv1.86.0

type DebugEvent struct {CountintTypestringFromstringTo    []stringEventany}

DebugEvent is a representation of an event used for debug clients.

typeDebugTopicadded inv1.86.0

type DebugTopic struct {NamestringPublisherstringSubscribers []string}

DebugTopic provides the JSON encoding of publishers and subscribers for agiven topic.

typeDebugTopicsadded inv1.86.0

type DebugTopics struct {Topics []DebugTopic}

DebugTopics provides the JSON encoding as a wrapper for a collection ofDebugTopic.

typeDebugger

type Debugger struct {// contains filtered or unexported fields}

A Debugger offers access to a bus's privileged introspection anddebugging facilities.

The debugger's functionality is intended for humans and their toolsto examine and troubleshoot bus clients, and should not be used innormal codepaths.

In particular, the debugger provides access to information that isdeliberately withheld from bus clients to encourage more robust andmaintainable code - for example, the sender of an event, or theevent streams of other clients. Please don't use the debugger tocircumvent these restrictions for purposes other than debugging.

func (*Debugger)Clients

func (d *Debugger) Clients() []*Client

Clients returns a list of all clients attached to the bus.

func (*Debugger)PublishQueue

func (d *Debugger) PublishQueue() []PublishedEvent

PublishQueue returns the contents of the publish queue.

The publish queue contains events that have been accepted by thebus from Publish() calls, but have not yet been routed to relevantsubscribers.

This queue is expected to be almost empty in normal operation. Afull publish queue indicates that a slow subscriber downstream iscausing backpressure and stalling the bus.

func (*Debugger)PublishTypes

func (d *Debugger) PublishTypes(client *Client) []reflect.Type

PublishTypes returns the list of types being published by client.

The returned types are those for which the client has obtained aPublisher. The client may not have ever sent the type inquestion.

func (*Debugger)RegisterHTTP

func (d *Debugger) RegisterHTTP(td *tsweb.DebugHandler)

func (*Debugger)SubscribeQueue

func (d *Debugger) SubscribeQueue(client *Client) []DeliveredEvent

SubscribeQueue returns the contents of the given client's subscribequeue.

The subscribe queue contains events that are to be delivered to theclient, but haven't yet been handed off to the relevantSubscriber.

This queue is expected to be almost empty in normal operation. Afull subscribe queue indicates that the client is accepting eventstoo slowly, and may be causing the rest of the bus to stall.

func (*Debugger)SubscribeTypes

func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type

SubscribeTypes returns the list of types being subscribed to byclient.

The returned types are those for which the client has obtained aSubscriber. The client may not have ever received the type inquestion, and here may not be any publishers of the type.

func (*Debugger)WatchBus

func (d *Debugger) WatchBus() *Subscriber[RoutedEvent]

WatchBus streams information about all events passing through thebus.

Monitored events are delivered in the bus's global publicationorder (see "Concurrency properties" in the package docs).

The caller must consume monitoring events promptly to avoidstalling the bus (see "Expected subscriber behavior" in the packagedocs).

func (*Debugger)WatchPublish

func (d *Debugger) WatchPublish(client *Client) *Subscriber[PublishedEvent]

WatchPublish streams information about all events published by thegiven client.

Monitored events are delivered in the bus's global publicationorder (see "Concurrency properties" in the package docs).

The caller must consume monitoring events promptly to avoidstalling the bus (see "Expected subscriber behavior" in the packagedocs).

func (*Debugger)WatchSubscribe

func (d *Debugger) WatchSubscribe(client *Client) *Subscriber[DeliveredEvent]

WatchSubscribe streams information about all events received by thegiven client.

Monitored events are delivered in the bus's global publicationorder (see "Concurrency properties" in the package docs).

The caller must consume monitoring events promptly to avoidstalling the bus (see "Expected subscriber behavior" in the packagedocs).

typeDeliveredEvent

type DeliveredEvent struct {EventanyFrom  *ClientTo    *Client}

typeMonitoradded inv1.90.0

type Monitor struct {// contains filtered or unexported fields}

A Monitor monitors the execution of a goroutine processing events from aClient, allowing the caller to block until it is complete. The zero valueof m is valid; its Close and Wait methods return immediately, and its Donemethod returns an already-closed channel.

func (Monitor)Closeadded inv1.90.0

func (mMonitor) Close()

Close closes the client associated with m and blocks until the processinggoroutine is complete.

func (Monitor)Doneadded inv1.90.0

func (mMonitor) Done() <-chan struct{}

Done returns a channel that is closed when the monitored goroutine hasfinished executing.

func (Monitor)Waitadded inv1.90.0

func (mMonitor) Wait()

Wait blocks until the goroutine monitored by m has finished executing, butdoes not close the associated client. It is safe to call Wait repeatedly,and from multiple concurrent goroutines.

typePublishedEvent

type PublishedEvent struct {EventanyFrom  *Client}

typePublisher

type Publisher[Tany] struct {// contains filtered or unexported fields}

A Publisher publishes typed events on a bus.

funcPublish

func Publish[Tany](c *Client) *Publisher[T]

Publish returns a publisher for event type T using the given client.It panics if c is closed.

func (*Publisher[T])Close

func (p *Publisher[T]) Close()

Close closes the publisher.

Calls to Publish after Close silently do nothing.

If the Bus or Client from which the Publisher was created is closed,the Publisher is implicitly closed and does not need to be closedseparately.

func (*Publisher[T])Publish

func (p *Publisher[T]) Publish(v T)

Publish publishes event v on the bus.

func (*Publisher[T])ShouldPublish

func (p *Publisher[T]) ShouldPublish()bool

ShouldPublish reports whether anyone is subscribed to the eventsthat this publisher emits.

ShouldPublish can be used to skip expensive event construction ifnobody seems to care. Publishers must not assume that someone willdefinitely receive an event if ShouldPublish returns true.

typeRoutedEvent

type RoutedEvent struct {EventanyFrom  *ClientTo    []*Client}

typeSubscriber

type Subscriber[Tany] struct {// contains filtered or unexported fields}

A Subscriber delivers one type of event from aClient.Events are sent to theSubscriber.Events channel.

funcSubscribe

func Subscribe[Tany](c *Client) *Subscriber[T]

Subscribe requests delivery of events of type T through the given client.It panics if c already has a subscriber for type T, or if c is closed.

func (*Subscriber[T])Close

func (s *Subscriber[T]) Close()

Close closes the Subscriber, indicating the caller no longer wishesto receive this event type. After Close, receives onSubscriber.Events block for ever.

If the Bus from which the Subscriber was created is closed,the Subscriber is implicitly closed and does not need to be closedseparately.

func (*Subscriber[T])Done

func (s *Subscriber[T]) Done() <-chan struct{}

Done returns a channel that is closed when the subscriber isclosed.

func (*Subscriber[T])Events

func (s *Subscriber[T]) Events() <-chan T

Events returns a channel on which the subscriber's events aredelivered.

typeSubscriberFuncadded inv1.90.0

type SubscriberFunc[Tany] struct {// contains filtered or unexported fields}

A SubscriberFunc delivers one type of event from aClient.Events are forwarded synchronously to a function provided at construction.

funcSubscribeFuncadded inv1.90.0

func SubscribeFunc[Tany](c *Client, f func(T)) *SubscriberFunc[T]

SubscribeFunc is likeSubscribe, but calls the provided func for eachevent of type T.

A SubscriberFunc calls f synchronously from the client's goroutine.This means the callback must not block for an extended period of time,as this will block the subscriber and slow event processing for allsubscriptions on c.

func (*SubscriberFunc[T])Closeadded inv1.90.0

func (s *SubscriberFunc[T]) Close()

Close closes the SubscriberFunc, indicating the caller no longer wishes toreceive this event type. After Close, no further events will be passed tothe callback.

If theBus from which s was created is closed, s is implicitly closed anddoes not need to be closed separately.

Source Files

View all Source files

Directories

PathSynopsis
debug-demo is a program that serves a bus's debug interface over HTTP, then generates some fake traffic from a handful of clients.
debug-demo is a program that serves a bus's debug interface over HTTP, then generates some fake traffic from a handful of clients.
Package eventbustest provides helper methods for testing an eventbus.Bus.
Package eventbustest provides helper methods for testing an eventbus.Bus.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f orF : Jump to
y orY : Canonical URL
go.dev uses cookies from Google to deliver and enhance the quality of its services and to analyze traffic.Learn more.

[8]ページ先頭

©2009-2025 Movatter.jp