Movatterモバイル変換


[0]ホーム

URL:


machine

packagemodule
v4.0.0Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 1, 2023 License:Apache-2.0Imports:5Imported by:3

Details

Repository

github.com/autom8ter/machine

Links

README

MachineGoDoc

concurrency

import "github.com/autom8ter/machine/v4"

Machine is a zero dependency library for highly concurrent Go applications.It is inspired byerrgroup.Group with extra bells & whistles:

  • In memory Publish Subscribe for asynchronously broadcasting & consuming messages in memory
  • Asynchronous worker groups similar to errgroup.Group
  • Throttled max active goroutine count
  • Asynchronous error handling(seeWithErrorHandler to override default error handler)
  • Asynchronous cron jobs-Cron()

Use Cases

Machine is meant to be completely agnostic and dependency free- its use cases are expected to be emergent.Really, it can be usedanywhere goroutines are used.

Highly concurrent and/or asynchronous applications include:

  • gRPC streaming servers

  • websocket servers

  • pubsub servers

  • reverse proxies

  • cron jobs

  • custom database/cache

  • ETL pipelines

  • log sink

  • filesystem walker

  • code generation

// Machine is an interface for highly asynchronous Go applicationstype Machine interface {// Publish synchronously publishes the MessagePublish(ctx context.Context, msg Message)// Subscribe synchronously subscribes to messages on a given channel,  executing the given HandlerFunc UNTIL the context cancels OR false is returned by the HandlerFunc.// Glob matching IS supported for subscribing to multiple channels at once.Subscribe(ctx context.Context, channel string, handler MessageHandlerFunc, opts ...SubscriptionOpt)// Subscribers returns total number of subscribers to the given channelSubscribers(channel string) int// Channels returns the channel names that messages have been sent toChannels() []string// Go asynchronously executes the given FuncGo(ctx context.Context, fn Func)// Cron asynchronously executes the given function on a timed interval UNTIL the context cancels OR false is returned by the CronFuncCron(ctx context.Context, interval time.Duration, fn CronFunc)// Current returns the number of active jobs that are running concurrentlyCurrent() int// Wait blocks until all active async functions(Go, Cron) exitWait()// Close blocks until all active routine's exit(calls Wait) then closes all active subscriptionsClose()}

Example

        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)  defer cancel()  var (  m       = machine.New()  results []string  mu      sync.RWMutex  )  defer m.Close()    m.Go(ctx, func(ctx context.Context) error {  m.Subscribe(ctx, "accounting.*", func(ctx context.Context, msg machine.Message) (bool, error) {  mu.Lock()  results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))  mu.Unlock()  return true, nil  })  return nil  })  m.Go(ctx, func(ctx context.Context) error {  m.Subscribe(ctx, "engineering.*", func(ctx context.Context, msg machine.Message) (bool, error) {  mu.Lock()  results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))  mu.Unlock()  return true, nil  })  return nil  })  m.Go(ctx, func(ctx context.Context) error {  m.Subscribe(ctx, "human_resources.*", func(ctx context.Context, msg machine.Message) (bool, error) {  mu.Lock()  results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))  mu.Unlock()  return true, nil  })  return nil  })  m.Go(ctx, func(ctx context.Context) error {  m.Subscribe(ctx, "*", func(ctx context.Context, msg machine.Message) (bool, error) {  mu.Lock()  results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))  mu.Unlock()  return true, nil  })  return nil  })  <-time.After(1 * time.Second)  m.Publish(ctx, machine.Message{  Channel: "human_resources.chat_room6",  Body:    "hello world human resources",  })  m.Publish(ctx, machine.Message{  Channel: "accounting.chat_room2",  Body:    "hello world accounting",  })  m.Publish(ctx, machine.Message{  Channel: "engineering.chat_room1",  Body:    "hello world engineering",  })  m.Wait()  sort.Strings(results)  for _, res := range results {  fmt.Print(res)  }  // Output:  //(accounting.chat_room2) received msg: hello world accounting  //(accounting.chat_room2) received msg: hello world accounting  //(engineering.chat_room1) received msg: hello world engineering  //(engineering.chat_room1) received msg: hello world engineering  //(human_resources.chat_room6) received msg: hello world human resources  //(human_resources.chat_room6) received msg: hello world human resources
Extended Examples

All examples are < 500 lines of code(excluding code generation)

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

typeFunc

type Func func(ctxcontext.Context)error

Func is a first class function that is asynchronously executed.

typeMachine

type Machine interface {// Publish synchronously publishes the MessagePublish(ctxcontext.Context, msgMessage)// Subscribe synchronously subscribes to messages on a given channel,  executing the given HandlerFunc UNTIL the context cancels OR false is returned by the HandlerFunc.// Glob matching IS supported for subscribing to multiple channels at once.Subscribe(ctxcontext.Context, channelstring, handlerMessageHandlerFunc, options ...SubscriptionOpt)error// Go asynchronously executes the given FuncGo(ctxcontext.Context, fnFunc)// Current returns the number of active jobs that are running concurrentlyCurrent()int// Wait blocks until all active async functions(Go) exitWait()error// Close blocks until all active routine's exit(calls Wait) then closes all active subscriptionsClose()}

Machine is an interface for highly asynchronous Go applications

funcNew

func New(opts ...Opt)Machine

New creates a new Machine instance with the given options(if present)

Example
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)defer cancel()var (m       = machine.New()results []stringmu      sync.RWMutex)defer m.Close()m.Go(ctx, func(ctx context.Context) error {return m.Subscribe(ctx, "accounting.chat_room2", func(ctx context.Context, msg machine.Message) (bool, error) {mu.Lock()results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))mu.Unlock()return true, nil})})m.Go(ctx, func(ctx context.Context) error {return m.Subscribe(ctx, "engineering.chat_room1", func(ctx context.Context, msg machine.Message) (bool, error) {mu.Lock()results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))mu.Unlock()return true, nil})})m.Go(ctx, func(ctx context.Context) error {return m.Subscribe(ctx, "human_resources.chat_room6", func(ctx context.Context, msg machine.Message) (bool, error) {mu.Lock()results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))mu.Unlock()return true, nil})})m.Go(ctx, func(ctx context.Context) error {return m.Subscribe(ctx, "*", func(ctx context.Context, msg machine.Message) (bool, error) {mu.Lock()results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))mu.Unlock()return true, nil})})<-time.After(1 * time.Second)m.Publish(ctx, machine.Message{Channel: "human_resources.chat_room6",Body:    "sending message to human resources",})m.Publish(ctx, machine.Message{Channel: "accounting.chat_room2",Body:    "sending message to accounting",})m.Publish(ctx, machine.Message{Channel: "engineering.chat_room1",Body:    "sending message to engineering",})if err := m.Wait(); err != nil {panic(err)}sort.Strings(results)for _, res := range results {fmt.Print(res)}
Output:(accounting.chat_room2) received msg: sending message to accounting(accounting.chat_room2) received msg: sending message to accounting(engineering.chat_room1) received msg: sending message to engineering(engineering.chat_room1) received msg: sending message to engineering(human_resources.chat_room6) received msg: sending message to human resources(human_resources.chat_room6) received msg: sending message to human resources

typeMessage

type Message struct {ChannelstringBody    interface{}}

typeMessageFilterFunc

type MessageFilterFunc func(ctxcontext.Context, msgMessage) (bool,error)

Filter is a first class function to filter out messages before they reach a subscriptions primary HandlerReturn true to indicate that a message passes the filter

typeMessageHandlerFunc

type MessageHandlerFunc func(ctxcontext.Context, msgMessage) (bool,error)

Handler is a first class that is executed against the inbound message in a subscription.Return false to indicate that the subscription should end

typeOpt

type Opt func(o *Options)

Opt configures a machine instance

funcWithThrottledRoutines

func WithThrottledRoutines(maxint)Opt

WithThrottledRoutines throttles the max number of active routine's spawned by the Machine.

typeOptions

type Options struct {// contains filtered or unexported fields}

Options holds config options for a machine instance

typeSubscriptionOpt

type SubscriptionOpt func(options *SubscriptionOptions)

SubscriptionOpt configures a subscription

funcWithFilter

func WithFilter(filterMessageFilterFunc)SubscriptionOpt

WithFilter is a subscription option that filters messages

funcWithSubscriptionID

func WithSubscriptionID(idstring)SubscriptionOpt

WithSubscriptionID is a subscription option that sets the subscription id - if multiple consumers have the same subscritpion id,a message will be broadcasted to just one of the consumers

typeSubscriptionOptions

type SubscriptionOptions struct {// contains filtered or unexported fields}

SubscriptionOptions holds config options for a subscription

Source Files

View all Source files

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f orF : Jump to
y orY : Canonical URL
go.dev uses cookies from Google to deliver and enhance the quality of its services and to analyze traffic.Learn more.

[8]ページ先頭

©2009-2025 Movatter.jp