Machine is a zero dependency library for highly concurrent Go applications. It is inspired by errgroup.Group with extra bells & whistles

autom8ter/machine

Repository files navigation

concurrency

import "github.com/autom8ter/machine/v4"

Machine is a zero dependency library for highly concurrent Go applications. It is inspired by errgroup.Group with extra bells & whistles:

  • In-memory publish/subscribe for asynchronously broadcasting & consuming messages
  • Asynchronous worker groups similar to errgroup.Group
  • Throttled max active goroutine count
  • Asynchronous error handling (see WithErrorHandler to override the default error handler)
  • Asynchronous cron jobs: Cron()
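
To make the "throttled max active goroutine count" idea concrete, here is a minimal sketch built only on the standard library. It is not machine's implementation or API: runThrottled and demoThrottle are hypothetical names, and the buffered-channel semaphore is just one common way to cap concurrency.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runThrottled starts one goroutine per job but lets at most `limit` of them
// run at the same time. It returns the highest concurrency it observed.
// Hypothetical helper: a standard-library sketch, not machine's code.
func runThrottled(ctx context.Context, limit int, jobs []func(context.Context)) int64 {
	var (
		wg      sync.WaitGroup
		active  int64
		maxSeen int64
		sem     = make(chan struct{}, limit) // counting semaphore
	)
	for _, job := range jobs {
		select {
		case sem <- struct{}{}: // block until a slot is free
		case <-ctx.Done(): // stop scheduling once the context is done
			wg.Wait()
			return atomic.LoadInt64(&maxSeen)
		}
		wg.Add(1)
		go func(job func(context.Context)) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot after the job finishes
			n := atomic.AddInt64(&active, 1)
			for { // record the peak number of running jobs
				old := atomic.LoadInt64(&maxSeen)
				if n <= old || atomic.CompareAndSwapInt64(&maxSeen, old, n) {
					break
				}
			}
			job(ctx)
			atomic.AddInt64(&active, -1)
		}(job)
	}
	wg.Wait()
	return atomic.LoadInt64(&maxSeen)
}

// demoThrottle runs n short sleeping jobs under the given limit.
func demoThrottle(limit, n int) int64 {
	jobs := make([]func(context.Context), n)
	for i := range jobs {
		jobs[i] = func(context.Context) { time.Sleep(5 * time.Millisecond) }
	}
	return runThrottled(context.Background(), limit, jobs)
}

func main() {
	// The reported peak never exceeds the limit passed in.
	fmt.Println("max concurrent jobs:", demoThrottle(2, 8))
}
```

The scheduling loop blocks while the limit is reached, so callers feel back-pressure instead of piling up goroutines.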

Use Cases

Machine is meant to be completely agnostic and dependency free. Its use cases are expected to be emergent; really, it can be used anywhere goroutines are used.

Highly concurrent and/or asynchronous applications include:

  • gRPC streaming servers

  • websocket servers

  • pubsub servers

  • reverse proxies

  • cron jobs

  • custom database/cache

  • ETL pipelines

  • log sink

  • filesystem walker

  • code generation

    // Machine is an interface for highly asynchronous Go applications
    type Machine interface {
        // Publish synchronously publishes the Message
        Publish(ctx context.Context, msg Message)
        // Subscribe synchronously subscribes to messages on a given channel, executing the given HandlerFunc UNTIL the context cancels OR false is returned by the HandlerFunc.
        // Glob matching IS supported for subscribing to multiple channels at once.
        Subscribe(ctx context.Context, channel string, handler MessageHandlerFunc, opts ...SubscriptionOpt)
        // Subscribers returns total number of subscribers to the given channel
        Subscribers(channel string) int
        // Channels returns the channel names that messages have been sent to
        Channels() []string
        // Go asynchronously executes the given Func
        Go(ctx context.Context, fn Func)
        // Cron asynchronously executes the given function on a timed interval UNTIL the context cancels OR false is returned by the CronFunc
        Cron(ctx context.Context, interval time.Duration, fn CronFunc)
        // Current returns the number of active jobs that are running concurrently
        Current() int
        // Wait blocks until all active async functions (Go, Cron) exit
        Wait()
        // Close blocks until all active routines exit (calls Wait) then closes all active subscriptions
        Close()
    }
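
The Cron contract described above (run on an interval until the context cancels or the function returns false) can be sketched with a time.Ticker. This is an illustration under assumptions, not the library's code: cronFunc, runCron, and demoCron are hypothetical names, and unlike the interface's Cron, runCron blocks the caller instead of running asynchronously.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// cronFunc mirrors the documented contract: return false to stop the job.
// The name and signature are assumptions made for this sketch.
type cronFunc func(ctx context.Context) bool

// runCron calls fn once per interval until ctx is done or fn returns false.
func runCron(ctx context.Context, interval time.Duration, fn cronFunc) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if !fn(ctx) {
				return
			}
		}
	}
}

// demoCron runs a job that stops itself on its stopAfter-th call and
// returns how many times the job ran.
func demoCron(stopAfter int) int {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	calls := 0
	runCron(ctx, time.Millisecond, func(context.Context) bool {
		calls++
		return calls < stopAfter
	})
	return calls
}

func main() {
	fmt.Println("job ran", demoCron(3), "times") // prints "job ran 3 times"
}
```

Returning a bool from the job keeps the stop decision local to the job itself, while the context remains the external off switch.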

Example

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    var (
        m       = machine.New()
        results []string
        mu      sync.RWMutex
    )
    defer m.Close()

    m.Go(ctx, func(ctx context.Context) error {
        m.Subscribe(ctx, "accounting.*", func(ctx context.Context, msg machine.Message) (bool, error) {
            mu.Lock()
            results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
            mu.Unlock()
            return true, nil
        })
        return nil
    })
    m.Go(ctx, func(ctx context.Context) error {
        m.Subscribe(ctx, "engineering.*", func(ctx context.Context, msg machine.Message) (bool, error) {
            mu.Lock()
            results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
            mu.Unlock()
            return true, nil
        })
        return nil
    })
    m.Go(ctx, func(ctx context.Context) error {
        m.Subscribe(ctx, "human_resources.*", func(ctx context.Context, msg machine.Message) (bool, error) {
            mu.Lock()
            results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
            mu.Unlock()
            return true, nil
        })
        return nil
    })
    m.Go(ctx, func(ctx context.Context) error {
        m.Subscribe(ctx, "*", func(ctx context.Context, msg machine.Message) (bool, error) {
            mu.Lock()
            results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
            mu.Unlock()
            return true, nil
        })
        return nil
    })
    <-time.After(1 * time.Second)
    m.Publish(ctx, machine.Message{
        Channel: "human_resources.chat_room6",
        Body:    "hello world human resources",
    })
    m.Publish(ctx, machine.Message{
        Channel: "accounting.chat_room2",
        Body:    "hello world accounting",
    })
    m.Publish(ctx, machine.Message{
        Channel: "engineering.chat_room1",
        Body:    "hello world engineering",
    })
    m.Wait()
    sort.Strings(results)
    for _, res := range results {
        fmt.Print(res)
    }
    // Output:
    //(accounting.chat_room2) received msg: hello world accounting
    //(accounting.chat_room2) received msg: hello world accounting
    //(engineering.chat_room1) received msg: hello world engineering
    //(engineering.chat_room1) received msg: hello world engineering
    //(human_resources.chat_room6) received msg: hello world human resources
    //(human_resources.chat_room6) received msg: hello world human resources
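
Each message appears twice in the output because it matches two subscriptions: its department pattern and the catch-all "*". The sketch below reproduces that count using path.Match from the standard library as a stand-in glob matcher. That choice is an assumption: machine's own matcher may behave differently, and matchChannel, deliveries, and examplePatterns are hypothetical names.

```go
package main

import (
	"fmt"
	"path"
)

// matchChannel reports whether a subscription pattern covers a channel name.
// path.Match is a stand-in here; machine's own glob matcher may differ.
func matchChannel(pattern, channel string) bool {
	ok, err := path.Match(pattern, channel)
	return err == nil && ok
}

// deliveries counts how many patterns would receive a message on channel.
func deliveries(patterns []string, channel string) int {
	n := 0
	for _, p := range patterns {
		if matchChannel(p, channel) {
			n++
		}
	}
	return n
}

// examplePatterns are the four subscriptions used in the example.
var examplePatterns = []string{"accounting.*", "engineering.*", "human_resources.*", "*"}

func main() {
	channels := []string{
		"human_resources.chat_room6",
		"accounting.chat_room2",
		"engineering.chat_room1",
	}
	for _, ch := range channels {
		fmt.Printf("%s -> %d deliveries\n", ch, deliveries(examplePatterns, ch))
	}
}
```

Under this matcher every published channel is delivered twice, which lines up with the six output lines for three published messages.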

Extended Examples

All examples are under 500 lines of code (excluding code generation).
