machine
This package is not in the latest version of its module.
README
Machine

import "github.com/autom8ter/machine/v4"
Machine is a zero dependency library for highly concurrent Go applications. It is inspired by errgroup.Group with extra bells & whistles:
- In memory Publish Subscribe for asynchronously broadcasting & consuming messages in memory
- Asynchronous worker groups similar to errgroup.Group
- Throttled max active goroutine count
- Asynchronous error handling (see WithErrorHandler to override the default error handler)
- Asynchronous cron jobs: Cron()
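The cron feature in the list above can be pictured with a small standard-library sketch. It is not the library's implementation: `cronFunc`, `runCron`, and `countTicks` are hypothetical names, and the callback signature is an assumption based only on the documented rule that a job runs on an interval until the context cancels or the function returns false.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// cronFunc is a hypothetical stand-in for the library's CronFunc type:
// returning false ends the job.
type cronFunc func(ctx context.Context) bool

// runCron invokes fn once per interval until ctx is cancelled or fn returns
// false, and reports how many times fn ran.
func runCron(ctx context.Context, interval time.Duration, fn cronFunc) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	calls := 0
	for {
		select {
		case <-ctx.Done():
			return calls
		case <-ticker.C:
			calls++
			if !fn(ctx) {
				return calls
			}
		}
	}
}

// countTicks runs a job that asks to stop on its limit-th invocation.
func countTicks(limit int) int {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	seen := 0
	return runCron(ctx, 5*time.Millisecond, func(ctx context.Context) bool {
		seen++
		return seen < limit
	})
}

func main() {
	fmt.Println("job ran", countTicks(3), "times")
}
```

This sketch blocks the caller; an asynchronous variant would start `runCron` in its own goroutine and let a wait group track it.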
Use Cases
Machine is meant to be completely agnostic and dependency free; its use cases are expected to be emergent. Really, it can be used anywhere goroutines are used.
Highly concurrent and/or asynchronous applications include:
- gRPC streaming servers
- websocket servers
- pubsub servers
- reverse proxies
- cron jobs
- custom database/cache
- ETL pipelines
- log sink
- filesystem walker
- code generation
```go
// Machine is an interface for highly asynchronous Go applications
type Machine interface {
	// Publish synchronously publishes the Message
	Publish(ctx context.Context, msg Message)
	// Subscribe synchronously subscribes to messages on a given channel, executing the given HandlerFunc UNTIL the context cancels OR false is returned by the HandlerFunc.
	// Glob matching IS supported for subscribing to multiple channels at once.
	Subscribe(ctx context.Context, channel string, handler MessageHandlerFunc, opts ...SubscriptionOpt)
	// Subscribers returns total number of subscribers to the given channel
	Subscribers(channel string) int
	// Channels returns the channel names that messages have been sent to
	Channels() []string
	// Go asynchronously executes the given Func
	Go(ctx context.Context, fn Func)
	// Cron asynchronously executes the given function on a timed interval UNTIL the context cancels OR false is returned by the CronFunc
	Cron(ctx context.Context, interval time.Duration, fn CronFunc)
	// Current returns the number of active jobs that are running concurrently
	Current() int
	// Wait blocks until all active async functions(Go, Cron) exit
	Wait()
	// Close blocks until all active routine's exit(calls Wait) then closes all active subscriptions
	Close()
}
```
Example
```go
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var (
	m       = machine.New()
	results []string
	mu      sync.RWMutex
)
defer m.Close()

m.Go(ctx, func(ctx context.Context) error {
	m.Subscribe(ctx, "accounting.*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
	return nil
})
m.Go(ctx, func(ctx context.Context) error {
	m.Subscribe(ctx, "engineering.*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
	return nil
})
m.Go(ctx, func(ctx context.Context) error {
	m.Subscribe(ctx, "human_resources.*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
	return nil
})
m.Go(ctx, func(ctx context.Context) error {
	m.Subscribe(ctx, "*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
	return nil
})

<-time.After(1 * time.Second)
m.Publish(ctx, machine.Message{
	Channel: "human_resources.chat_room6",
	Body:    "hello world human resources",
})
m.Publish(ctx, machine.Message{
	Channel: "accounting.chat_room2",
	Body:    "hello world accounting",
})
m.Publish(ctx, machine.Message{
	Channel: "engineering.chat_room1",
	Body:    "hello world engineering",
})
m.Wait()

sort.Strings(results)
for _, res := range results {
	fmt.Print(res)
}
// Output:
//(accounting.chat_room2) received msg: hello world accounting
//(accounting.chat_room2) received msg: hello world accounting
//(engineering.chat_room1) received msg: hello world engineering
//(engineering.chat_room1) received msg: hello world engineering
//(human_resources.chat_room6) received msg: hello world human resources
//(human_resources.chat_room6) received msg: hello world human resources
```
Extended Examples
All examples are < 500 lines of code (excluding code generation)
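Each message in the README example's output appears twice because two subscriptions match it: the department pattern (for example accounting.*) and the catch-all *. The sketch below reproduces that matching with the standard library's `path.Match`. This is an illustration only: which glob implementation machine uses internally is not stated here, and `matchingPatterns` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"path"
)

// matchingPatterns returns the subscription patterns that match channel.
// path.Match is used purely for illustration; the library's own glob
// matcher may behave differently on edge cases.
func matchingPatterns(patterns []string, channel string) []string {
	var matched []string
	for _, p := range patterns {
		ok, err := path.Match(p, channel)
		if err != nil {
			continue // malformed pattern: skip it
		}
		if ok {
			matched = append(matched, p)
		}
	}
	return matched
}

func main() {
	patterns := []string{"accounting.*", "engineering.*", "human_resources.*", "*"}
	channels := []string{
		"accounting.chat_room2",
		"engineering.chat_room1",
		"human_resources.chat_room6",
	}
	for _, ch := range channels {
		fmt.Println(ch, "->", matchingPatterns(patterns, ch))
	}
}
```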
Documentation
Types
type Machine
```go
type Machine interface {
	// Publish synchronously publishes the Message
	Publish(ctx context.Context, msg Message)
	// Subscribe synchronously subscribes to messages on a given channel, executing the given HandlerFunc UNTIL the context cancels OR false is returned by the HandlerFunc.
	// Glob matching IS supported for subscribing to multiple channels at once.
	Subscribe(ctx context.Context, channel string, handler MessageHandlerFunc, options ...SubscriptionOpt) error
	// Go asynchronously executes the given Func
	Go(ctx context.Context, fn Func)
	// Current returns the number of active jobs that are running concurrently
	Current() int
	// Wait blocks until all active async functions(Go) exit
	Wait() error
	// Close blocks until all active routine's exit(calls Wait) then closes all active subscriptions
	Close()
}
```
Machine is an interface for highly asynchronous Go applications
func New
New creates a new Machine instance with the given options (if present)
Example
```go
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var (
	m       = machine.New()
	results []string
	mu      sync.RWMutex
)
defer m.Close()

m.Go(ctx, func(ctx context.Context) error {
	return m.Subscribe(ctx, "accounting.chat_room2", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
})
m.Go(ctx, func(ctx context.Context) error {
	return m.Subscribe(ctx, "engineering.chat_room1", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
})
m.Go(ctx, func(ctx context.Context) error {
	return m.Subscribe(ctx, "human_resources.chat_room6", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
})
m.Go(ctx, func(ctx context.Context) error {
	return m.Subscribe(ctx, "*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
})

<-time.After(1 * time.Second)
m.Publish(ctx, machine.Message{
	Channel: "human_resources.chat_room6",
	Body:    "sending message to human resources",
})
m.Publish(ctx, machine.Message{
	Channel: "accounting.chat_room2",
	Body:    "sending message to accounting",
})
m.Publish(ctx, machine.Message{
	Channel: "engineering.chat_room1",
	Body:    "sending message to engineering",
})
if err := m.Wait(); err != nil {
	panic(err)
}

sort.Strings(results)
for _, res := range results {
	fmt.Print(res)
}
```
Output:
```
(accounting.chat_room2) received msg: sending message to accounting
(accounting.chat_room2) received msg: sending message to accounting
(engineering.chat_room1) received msg: sending message to engineering
(engineering.chat_room1) received msg: sending message to engineering
(human_resources.chat_room6) received msg: sending message to human resources
(human_resources.chat_room6) received msg: sending message to human resources
```
type MessageFilterFunc
Filter is a first-class function that filters out messages before they reach a subscription's primary Handler. Return true to indicate that a message passes the filter.
type MessageHandlerFunc
Handler is a first-class function that is executed against each inbound message in a subscription. Return false to indicate that the subscription should end.
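The way a filter and a handler cooperate can be sketched without the library. Everything here is hypothetical: `message`, `filterFunc`, `handlerFunc`, and `consume` are simplified stand-ins that drop the context argument, the filter signature is an assumption (it is not shown on this page), and stopping on a handler error is a choice made for the sketch rather than documented behavior.

```go
package main

import "fmt"

// message is a simplified stand-in for machine.Message.
type message struct {
	Channel string
	Body    string
}

// filterFunc reports whether a message should reach the handler (assumed shape).
type filterFunc func(msg message) bool

// handlerFunc processes a message; returning false ends the subscription.
type handlerFunc func(msg message) (bool, error)

// consume feeds msgs through the optional filter and then the handler.
// It returns how many messages the handler saw.
func consume(msgs []message, filter filterFunc, handler handlerFunc) (int, error) {
	handled := 0
	for _, msg := range msgs {
		if filter != nil && !filter(msg) {
			continue // filtered out: the handler never sees it
		}
		handled++
		keepGoing, err := handler(msg)
		if err != nil {
			return handled, err
		}
		if !keepGoing {
			break // handler asked to end the subscription
		}
	}
	return handled, nil
}

func main() {
	msgs := []message{
		{Channel: "chat", Body: "spam"},
		{Channel: "chat", Body: "hello"},
		{Channel: "chat", Body: "bye"},
		{Channel: "chat", Body: "too late"},
	}
	noSpam := func(m message) bool { return m.Body != "spam" }
	untilBye := func(m message) (bool, error) {
		fmt.Println("handled:", m.Body)
		return m.Body != "bye", nil
	}
	n, err := consume(msgs, noSpam, untilBye)
	fmt.Println("messages handled:", n, "error:", err)
}
```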
type Opt
type Opt func(o *Options)
Opt configures a machine instance
func WithThrottledRoutines
WithThrottledRoutines throttles the maximum number of active routines spawned by the Machine.
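A common way to cap active goroutines is a buffered channel used as a counting semaphore, configured through a functional option. The sketch below shows that technique under stated assumptions: the lowercase `options`, `opt`, `withThrottledRoutines`, and `runThrottled` are hypothetical, the integer parameter is a guess (the real signature is not shown on this page), and nothing here claims to mirror the library's internals.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// options and opt mirror the functional-option shape (type Opt func(o *Options)).
type options struct{ maxRoutines int }

type opt func(o *options)

// withThrottledRoutines caps the number of concurrently running jobs (assumed parameter).
func withThrottledRoutines(limit int) opt {
	return func(o *options) { o.maxRoutines = limit }
}

// runThrottled starts jobs goroutines, never letting more than the configured
// limit run at once, and returns the peak concurrency it observed.
func runThrottled(jobs int, opts ...opt) int {
	o := &options{maxRoutines: jobs} // default: no effective throttle
	for _, apply := range opts {
		apply(o)
	}
	if o.maxRoutines < 1 {
		o.maxRoutines = 1 // avoid an unbuffered semaphore that would block forever
	}
	sem := make(chan struct{}, o.maxRoutines)
	var (
		wg           sync.WaitGroup
		mu           sync.Mutex
		active, peak int
	)
	for i := 0; i < jobs; i++ {
		sem <- struct{}{} // blocks while the limit is reached
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot after the job finishes
			mu.Lock()
			active++
			if active > peak {
				peak = active
			}
			mu.Unlock()
			time.Sleep(time.Millisecond) // simulated work
			mu.Lock()
			active--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	peak := runThrottled(20, withThrottledRoutines(3))
	fmt.Println("peak concurrency stayed within limit:", peak <= 3)
}
```

Acquiring the slot before starting the goroutine means the spawning loop itself blocks when the limit is reached, which keeps the number of live goroutines bounded rather than only the number doing work.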
type Options
type Options struct {
	// contains filtered or unexported fields
}
Options holds config options for a machine instance
type SubscriptionOpt
type SubscriptionOpt func(options *SubscriptionOptions)
SubscriptionOpt configures a subscription
func WithFilter
func WithFilter(filter MessageFilterFunc) SubscriptionOpt
WithFilter is a subscription option that filters messages
func WithSubscriptionID
func WithSubscriptionID(id string) SubscriptionOpt
WithSubscriptionID is a subscription option that sets the subscription id. If multiple consumers share the same subscription id, a message is delivered to just one of them.
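The "one consumer per shared id" rule resembles a consumer group. The sketch below models it with round-robin selection inside each group. This is an assumption: the page does not say how the library picks the receiving consumer, and `subscriber`, `broker`, `newBroker`, and `deliver` are hypothetical names that do not exist in the package.

```go
package main

import "fmt"

// subscriber is a hypothetical consumer; an empty id means it has no shared group.
type subscriber struct {
	id   string
	name string
}

// broker tracks subscribers and a round-robin cursor per subscription id.
type broker struct {
	subs []subscriber
	next map[string]int
}

func newBroker(subs ...subscriber) *broker {
	return &broker{subs: subs, next: make(map[string]int)}
}

// deliver returns the names of the consumers that would receive one message:
// every subscriber without an id, plus exactly one member of each id group.
func (b *broker) deliver() []string {
	var receivers []string
	groups := make(map[string][]subscriber)
	var order []string // keeps group iteration deterministic
	for _, s := range b.subs {
		if s.id == "" {
			receivers = append(receivers, s.name)
			continue
		}
		if _, seen := groups[s.id]; !seen {
			order = append(order, s.id)
		}
		groups[s.id] = append(groups[s.id], s)
	}
	for _, id := range order {
		members := groups[id]
		pick := members[b.next[id]%len(members)] // round robin within the group
		b.next[id]++
		receivers = append(receivers, pick.name)
	}
	return receivers
}

func main() {
	b := newBroker(
		subscriber{id: "workers", name: "worker-1"},
		subscriber{id: "workers", name: "worker-2"},
		subscriber{name: "audit-log"},
	)
	for i := 1; i <= 3; i++ {
		fmt.Println("message", i, "->", b.deliver())
	}
}
```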
type SubscriptionOptions
type SubscriptionOptions struct {
	// contains filtered or unexported fields
}
SubscriptionOptions holds config options for a subscription