Notice: The highest tagged major version is v4.

machine

package, module
v1.4.0 (Latest)

Warning: This package is not in the latest version of its module.

Published: Feb 19, 2021 | License: Apache-2.0 | Imports: 16 | Imported by: 5

Details

Repository: github.com/autom8ter/machine

README

Machine


import "github.com/autom8ter/machine"

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    m := machine.New(ctx,
        machine.WithMaxRoutines(10),
        machine.WithMiddlewares(machine.PanicRecover()),
    )
    defer m.Close()

    channelName := "acme.com"
    const publisherID = "publisher"

    // start another goroutine that publishes to the target channel every second for 5 seconds OR the routine's context cancels
    m.Go(func(routine machine.Routine) {
        fmt.Printf("%v | streaming msg to channel = %v stats = %s\n", routine.PID(), channelName, routine.Machine().Stats().String())
        // publish message to channel
        routine.Publish(channelName, "hey there bud!")
    }, machine.GoWithTags("publish"),
        machine.GoWithPID(publisherID),
        machine.GoWithTimeout(5*time.Second),
        machine.GoWithMiddlewares(
            // run every second until context cancels
            machine.Cron(time.NewTicker(1*time.Second)),
        ),
    )

    // start a goroutine that subscribes to all messages sent to the target channel for 3 seconds OR the routine's context cancels
    m.Go(func(routine machine.Routine) {
        routine.Subscribe(channelName, func(obj interface{}) bool {
            fmt.Printf("%v | subscription msg received! channel = %v msg = %v stats = %s\n", routine.PID(), channelName, obj, m.Stats().String())
            return true
        })
    }, machine.GoWithTags("subscribe"),
        machine.GoWithTimeout(3*time.Second),
    )

    m.Wait()

Machine is a zero dependency library for highly concurrent Go applications. It is inspired by errgroup.Group with extra bells & whistles:

  • throttled goroutines

  • self-cancellable goroutines with Context

  • global-cancellable goroutines with context (see Cancel)

  • goroutines have IDs and optional tags for easy debugging (see Stats)

  • native publish/subscribe implementation for broadcasting messages to active goroutines

  • middlewares for wrapping/decorating functions

  • "sub" machines for creating a dependency tree between groups of goroutines

  • goroutine leak prevention

  • native pprof & golang execution tracer integration

Use Cases

Machine is meant to be completely agnostic and dependency free; its use cases are expected to be emergent. Really, it can be used anywhere goroutines are used.

Highly concurrent and/or asynchronous applications include:

  • gRPC streaming servers

  • websocket servers

  • pubsub servers

  • reverse proxies

  • cron jobs

  • custom database/cache

  • ETL pipelines

  • log sink

  • filesystem walker

  • code generation

Examples

All examples are < 500 lines of code (excluding code generation).

Documentation


Constants

const DefaultMaxRoutines = 1000

Variables

This section is empty.

Functions

This section is empty.

Types

type Func

type Func func(routine Routine)

Func is the function passed into machine.Go. The Routine is passed into this function at runtime.

type GoOpt

type GoOpt func(o *goOpts)

GoOpt is a function that configures GoOpts

func GoWithDeadline (added in v0.2.0)

func GoWithDeadline(deadline time.Time) GoOpt

GoWithDeadline is a GoOpt that creates the Routine's context with the given deadline.

func GoWithMiddlewares (added in v0.0.8)

func GoWithMiddlewares(middlewares ...Middleware) GoOpt

GoWithMiddlewares wraps the given function with the input middlewares.

func GoWithPID (added in v0.0.8)

func GoWithPID(id string) GoOpt

GoWithPID is a GoOpt that sets/overrides the process ID of the Routine. A random id is assigned if this option is not used.

func GoWithTags (added in v0.0.8)

func GoWithTags(tags ...string) GoOpt

GoWithTags is a GoOpt that adds an array of strings as "tags" to the Routine.

func GoWithTimeout (added in v0.0.8)

func GoWithTimeout(to time.Duration) GoOpt

GoWithTimeout is a GoOpt that creates the Routine's context with the given timeout value.
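The effect of GoWithTimeout and GoWithDeadline can be approximated with the standard library alone. The sketch below is not this package's implementation and does not import it; runWithTimeout, timedOut and finishedEarly are hypothetical helpers that derive a per-goroutine context with context.WithTimeout and report whether the timeout fired.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runWithTimeout (hypothetical helper) runs fn in its own goroutine with a
// context that expires after d, waits for fn to return, and reports the
// context's error at that moment (nil if fn finished before the timeout).
func runWithTimeout(parent context.Context, d time.Duration, fn func(ctx context.Context)) error {
	ctx, cancel := context.WithTimeout(parent, d)
	defer cancel()

	done := make(chan struct{})
	go func() {
		defer close(done)
		fn(ctx)
	}()
	<-done
	return ctx.Err()
}

// timedOut reports whether a goroutine that blocks until its context is done
// was stopped by a timeout of the given number of milliseconds.
func timedOut(ms int) bool {
	err := runWithTimeout(context.Background(), time.Duration(ms)*time.Millisecond,
		func(ctx context.Context) { <-ctx.Done() })
	return err == context.DeadlineExceeded
}

// finishedEarly reports whether a goroutine that returns immediately leaves
// its context unexpired.
func finishedEarly() bool {
	return runWithTimeout(context.Background(), time.Minute, func(context.Context) {}) == nil
}

func main() {
	fmt.Println("timed out:", timedOut(20))
	fmt.Println("finished early:", finishedEarly())
}
```

A deadline-based variant would presumably swap context.WithTimeout for context.WithDeadline; the function passed in only needs to watch ctx.Done().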

func GoWithValues (added in v0.1.1)

func GoWithValues(key, val interface{}) GoOpt

GoWithValues adds the key/value pair to the routine's root context. It can be retrieved with routine.Context().Value().

type Machine

type Machine struct {
    // contains filtered or unexported fields
}

Machine is a zero dependency runtime for managed goroutines. It is inspired by errgroup.Group with extra bells & whistles.

func New

func New(ctx context.Context, options ...Opt) *Machine

New creates a new machine instance with the given root context and options.

Example
    package main

    import (
        "context"
        "fmt"
        "github.com/autom8ter/machine"
        "time"
    )

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
        m := machine.New(ctx,
            machine.WithMaxRoutines(10),
            machine.WithMiddlewares(machine.PanicRecover()),
        )
        defer m.Close()

        channelName := "acme.com"
        const publisherID = "publisher"

        // start another goroutine that publishes to the target channel every second for 5 seconds OR the routine's context cancels
        m.Go(func(routine machine.Routine) {
            fmt.Printf("%v | streaming msg to channel = %v stats = %s\n", routine.PID(), channelName, routine.Machine().Stats().String())
            // publish message to channel
            routine.Publish(channelName, "hey there bud!")
        }, machine.GoWithTags("publish"),
            machine.GoWithPID(publisherID),
            machine.GoWithTimeout(5*time.Second),
            machine.GoWithMiddlewares(
                // run every second until context cancels
                machine.Cron(time.NewTicker(1*time.Second)),
            ),
        )

        // start a goroutine that subscribes to all messages sent to the target channel for 3 seconds OR the routine's context cancels
        m.Go(func(routine machine.Routine) {
            routine.Subscribe(channelName, func(obj interface{}) bool {
                fmt.Printf("%v | subscription msg received! channel = %v msg = %v stats = %s\n", routine.PID(), channelName, obj, m.Stats().String())
                return true
            })
        }, machine.GoWithTags("subscribe"),
            machine.GoWithTimeout(3*time.Second),
        )

        // start a goroutine that subscribes to the channel until the publishing goroutine exits OR the routine's context cancels
        m.Go(func(routine machine.Routine) {
            routine.Subscribe(channelName, func(obj interface{}) bool {
                fmt.Printf("%v | subscription msg received! channel = %v msg = %v stats = %s\n", routine.PID(), channelName, obj, m.Stats().String())
                return m.HasRoutine(publisherID)
            })
        }, machine.GoWithTags("subscribeUntil"))

        m.Wait()
    }

func (*Machine) Active (added in v0.0.8)

func (p *Machine) Active() int

Active returns the current count of active managed goroutines.

func (*Machine) Cancel

func (p *Machine) Cancel()

Cancel cancels the context of every goroutine within the machine instance and its children.

func (*Machine) CancelRoutine (added in v0.2.0)

func (m *Machine) CancelRoutine(id string)

CancelRoutine cancels the context of the active routine with the given id if it exists.

func (*Machine) Close (added in v0.0.5)

func (m *Machine) Close()

Close completely closes the machine's pubsub instance and all of its closer functions. It also closes all of its child machines (if they exist).

func (*Machine) Go

func (m *Machine) Go(fn Func, opts ...GoOpt) string

Go calls the given function in a new goroutine and returns the goroutine's unique id. The function is passed information about the goroutine at runtime via the Routine interface.

func (*Machine) HasRoutine (added in v0.2.0)

func (m *Machine) HasRoutine(id string) bool

HasRoutine returns true if the machine has an active routine with the given id.

func (*Machine) ID (added in v0.2.0)

func (m *Machine) ID() string

ID returns the machine instance's unique id.

func (*Machine) Parent (added in v0.0.8)

func (m *Machine) Parent() *Machine

Parent returns the parent Machine instance if it exists and nil if not.

func (*Machine) PubSub (added in v1.0.1)

func (m *Machine) PubSub() pubsub.PubSub

PubSub returns the machine's underlying pubsub implementation

func (*Machine) Stats

func (m *Machine) Stats() *Stats

Stats returns goroutine information from the machine and all of its children.

func (*Machine) Sub (added in v0.0.8)

func (m *Machine) Sub(opts ...Opt) *Machine

Sub returns a nested Machine instance that is dependent on the parent machine's context. It inherits the parent's pubsub implementation and middlewares if none are provided. Sub machines do not inherit their parent's max routine setting.
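The phrase "dependent on the parent machine's context" can be illustrated with plain context values. This is a standard-library sketch of the dependency direction only, not the package's code; cancelChildFirst and cancelParentFirst are hypothetical names. Cancelling a derived context leaves its parent running, while cancelling the parent also cancels everything derived from it.

```go
package main

import (
	"context"
	"fmt"
)

// cancelChildFirst cancels only the derived (child) context and reports
// whether the parent and the child are done afterwards.
func cancelChildFirst() (parentDone, childDone bool) {
	parent, cancelParent := context.WithCancel(context.Background())
	defer cancelParent()
	child, cancelChild := context.WithCancel(parent)

	cancelChild()
	return parent.Err() != nil, child.Err() != nil
}

// cancelParentFirst cancels only the parent context and reports whether the
// parent and the child are done afterwards.
func cancelParentFirst() (parentDone, childDone bool) {
	parent, cancelParent := context.WithCancel(context.Background())
	child, cancelChild := context.WithCancel(parent)
	defer cancelChild()

	cancelParent()
	return parent.Err() != nil, child.Err() != nil
}

func main() {
	p, c := cancelChildFirst()
	fmt.Println("cancel child:  parent done =", p, "child done =", c)
	p, c = cancelParentFirst()
	fmt.Println("cancel parent: parent done =", p, "child done =", c)
}
```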

func (*Machine) Tags (added in v0.0.9)

func (p *Machine) Tags() []string

Tags returns the machine's tags

func (*Machine) Total

func (p *Machine) Total() int

Total returns the total number of goroutines that have been fully executed by the machine.

func (*Machine) Wait

func (m *Machine) Wait()

Wait blocks until the total active goroutine count reaches zero for the instance and all of its children. At least one goroutine must have finished in order for Wait to unblock.

type Middleware

type Middleware func(fn Func) Func

Middleware is a function that wraps/modifies the behavior of a machine.Func.
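A minimal sketch of the wrapping pattern follows. It is self-contained and does not import this package: Func here takes a trace slice instead of a Routine, and tracing and chain are hypothetical helpers. The order in which the real package applies multiple middlewares is not stated in this documentation; the sketch assumes the first middleware listed becomes the outermost wrapper.

```go
package main

import (
	"fmt"
	"strings"
)

// Func is a simplified stand-in for machine.Func: it records into a trace
// instead of receiving a Routine.
type Func func(trace *[]string)

// Middleware wraps a Func and returns the decorated Func.
type Middleware func(fn Func) Func

// tracing (hypothetical) records an entry before and after the wrapped Func.
func tracing(name string) Middleware {
	return func(fn Func) Func {
		return func(trace *[]string) {
			*trace = append(*trace, name+":before")
			fn(trace)
			*trace = append(*trace, name+":after")
		}
	}
}

// chain applies the middlewares so that the first one listed is the
// outermost wrapper (an assumption of this sketch).
func chain(fn Func, mws ...Middleware) Func {
	for i := len(mws) - 1; i >= 0; i-- {
		fn = mws[i](fn)
	}
	return fn
}

// run executes a trivial Func wrapped by two tracing middlewares and returns
// the recorded call order.
func run() string {
	var trace []string
	work := func(trace *[]string) { *trace = append(*trace, "work") }
	chain(work, tracing("a"), tracing("b"))(&trace)
	return strings.Join(trace, " ")
}

func main() {
	fmt.Println(run()) // prints: a:before b:before work b:after a:after
}
```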

func After (added in v0.0.4)

func After(afterFunc func(routine Routine)) Middleware

After executes the afterFunc after the main goroutine exits.

func Before (added in v0.0.4)

func Before(beforeFunc func(routine Routine)) Middleware

Before executes the beforeFunc before the main goroutine is executed.

func Cron

func Cron(ticker *time.Ticker) Middleware

Cron is a middleware that executes the function every time the ticker ticks until the goroutine's context cancels.
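The ticker-driven loop that Cron describes could look roughly like the following. This is a standard-library sketch under stated assumptions, not the package's source: Func takes a context.Context rather than a Routine, cron and countTicks are hypothetical names, and stopping the ticker on exit is a choice made here rather than documented behaviour.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Func is a simplified stand-in for machine.Func that receives only a context.
type Func func(ctx context.Context)

// Middleware wraps a Func and returns the decorated Func.
type Middleware func(fn Func) Func

// cron (hypothetical) runs fn once per tick until ctx is done.
func cron(ticker *time.Ticker) Middleware {
	return func(fn Func) Func {
		return func(ctx context.Context) {
			defer ticker.Stop() // assumption: this sketch owns the ticker
			for {
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
					fn(ctx)
				}
			}
		}
	}
}

// countTicks runs a counting job on a ticker with the given interval until
// the timeout expires, and returns how many times the job ran.
func countTicks(intervalMs, timeoutMs int) int {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()

	n := 0
	job := func(ctx context.Context) { n++ }
	cron(time.NewTicker(time.Duration(intervalMs) * time.Millisecond))(job)(ctx)
	return n
}

func main() {
	fmt.Println("job ran", countTicks(10, 200), "times")
}
```

The exact count depends on scheduling, so the sketch prints it rather than assuming a value.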

func Decider (added in v0.0.4)

func Decider(deciderFunc func(routine Routine) bool) Middleware

Decider executes the deciderFunc before the main goroutine is executed. If it returns false, the goroutine won't be executed.

func PanicRecover (added in v0.0.9)

func PanicRecover() Middleware

PanicRecover wraps a goroutine with a middleware that recovers from panics.
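A recovering middleware is typically a deferred recover around the wrapped function. The sketch below shows that idea with the standard library only. It is not the package's implementation: the real PanicRecover takes no arguments, whereas the hypothetical panicRecover here accepts a callback so the recovered value can be observed, and Func takes no parameters.

```go
package main

import "fmt"

// Func is a simplified stand-in for machine.Func with no parameters.
type Func func()

// Middleware wraps a Func and returns the decorated Func.
type Middleware func(fn Func) Func

// panicRecover (hypothetical) returns a middleware that recovers from a panic
// in the wrapped Func and hands the recovered value to onPanic.
func panicRecover(onPanic func(v interface{})) Middleware {
	return func(fn Func) Func {
		return func() {
			defer func() {
				if r := recover(); r != nil {
					onPanic(r)
				}
			}()
			fn()
		}
	}
}

// runSafely wraps fn with panicRecover, runs it, and returns the recovered
// value, or nil if fn did not panic.
func runSafely(fn Func) (recovered interface{}) {
	wrapped := panicRecover(func(v interface{}) { recovered = v })(fn)
	wrapped()
	return recovered
}

func main() {
	fmt.Println("recovered:", runSafely(func() { panic("boom") }))
	fmt.Println("recovered:", runSafely(func() {}))
}
```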

func While (added in v0.2.0)

func While(deciderFunc func(routine Routine) bool) Middleware

While is a middleware that executes the Func repeatedly for as long as deciderFunc returns true, stopping when the context cancels.
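Decider and While differ only in how often the wrapped function runs: at most once versus repeatedly. The following standard-library sketch contrasts the two. It assumes a Func that takes a context.Context instead of a Routine, and decider, while, runsWhenAllowed and runsWhileBelow are hypothetical names rather than the package's code.

```go
package main

import (
	"context"
	"fmt"
)

// Func is a simplified stand-in for machine.Func that receives only a context.
type Func func(ctx context.Context)

// Middleware wraps a Func and returns the decorated Func.
type Middleware func(fn Func) Func

// decider (hypothetical) runs fn once, and only if decide returns true.
func decider(decide func(ctx context.Context) bool) Middleware {
	return func(fn Func) Func {
		return func(ctx context.Context) {
			if decide(ctx) {
				fn(ctx)
			}
		}
	}
}

// while (hypothetical) keeps running fn until decide returns false or the
// context is cancelled.
func while(decide func(ctx context.Context) bool) Middleware {
	return func(fn Func) Func {
		return func(ctx context.Context) {
			for ctx.Err() == nil && decide(ctx) {
				fn(ctx)
			}
		}
	}
}

// runsWhenAllowed returns how many times a job runs behind decider.
func runsWhenAllowed(allow bool) int {
	n := 0
	gate := decider(func(context.Context) bool { return allow })
	gate(func(context.Context) { n++ })(context.Background())
	return n
}

// runsWhileBelow returns how many times a job runs while its own counter is
// below limit.
func runsWhileBelow(limit int) int {
	n := 0
	loop := while(func(context.Context) bool { return n < limit })
	loop(func(context.Context) { n++ })(context.Background())
	return n
}

func main() {
	fmt.Println(runsWhenAllowed(true), runsWhenAllowed(false), runsWhileBelow(5)) // prints: 1 0 5
}
```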

type Opt

type Opt func(o *option)

Opt is a single option when creating a machine instance with New
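Opt follows the functional options pattern: each option is a function that mutates an unexported settings struct. The sketch below reproduces the pattern in isolation. The settings struct, its fields, and the lower-case helpers are hypothetical, and using 1000 as the fallback simply mirrors the DefaultMaxRoutines constant listed above; how the package applies that default is an assumption.

```go
package main

import "fmt"

// defaultMaxRoutines mirrors the DefaultMaxRoutines constant; treating it as
// the fallback value is an assumption of this sketch.
const defaultMaxRoutines = 1000

// settings is a hypothetical stand-in for the package's unexported option struct.
type settings struct {
	maxRoutines int
	tags        []string
}

// Opt configures a settings value.
type Opt func(s *settings)

// withMaxRoutines (hypothetical) overrides the goroutine limit.
func withMaxRoutines(max int) Opt {
	return func(s *settings) { s.maxRoutines = max }
}

// withTags (hypothetical) sets the tags.
func withTags(tags ...string) Opt {
	return func(s *settings) { s.tags = tags }
}

// newSettings starts from the defaults and applies each option in order.
func newSettings(opts ...Opt) settings {
	s := settings{maxRoutines: defaultMaxRoutines}
	for _, opt := range opts {
		opt(&s)
	}
	return s
}

// maxRoutinesFor returns the limit that results from the given options.
func maxRoutinesFor(opts ...Opt) int {
	return newSettings(opts...).maxRoutines
}

func main() {
	fmt.Println(maxRoutinesFor())                    // prints: 1000
	fmt.Println(maxRoutinesFor(withMaxRoutines(10))) // prints: 10
	fmt.Println(newSettings(withTags("a", "b")).tags)
}
```

The pattern keeps New's signature stable: new settings can be added later as new Opt constructors without breaking existing callers.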

func WithChildren (added in v0.0.8)

func WithChildren(children ...*Machine) Opt

WithChildren sets the machine instance's children.

func WithClosers (added in v0.7.2)

func WithClosers(closers ...func()) Opt

WithClosers makes the Machine instance execute the given closers before it closes.

func WithDeadline (added in v0.2.0)

func WithDeadline(deadline time.Time) Opt

WithDeadline is an Opt that creates the Machine's context with the given deadline.

func WithID (added in v0.2.0)

func WithID(id string) Opt

WithID sets the machine instance's unique id. If one isn't provided, a unique id will be assigned.

func WithMaxRoutines

func WithMaxRoutines(max int) Opt

WithMaxRoutines throttles goroutines at the input number. It will panic if <= zero.
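Throttling of this kind is commonly built on a buffered channel used as a counting semaphore. The sketch below shows that technique with the standard library only; it is not taken from the package's source, and runThrottled is a hypothetical helper. It mirrors the documented behaviour of panicking on a non-positive limit.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runThrottled (hypothetical) starts `jobs` goroutines but lets at most `max`
// of them run at once. It returns the highest concurrency level observed.
func runThrottled(max, jobs int) int64 {
	if max <= 0 {
		panic("max must be greater than zero")
	}
	sem := make(chan struct{}, max) // counting semaphore with max slots
	var wg sync.WaitGroup
	var active, peak int64

	for i := 0; i < jobs; i++ {
		sem <- struct{}{} // blocks while max goroutines already hold a slot
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the job ends

			n := atomic.AddInt64(&active, 1)
			// Raise the recorded peak if this goroutine pushed it higher.
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			atomic.AddInt64(&active, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak concurrency:", runThrottled(3, 50))
}
```

Because a slot is acquired before each goroutine starts and released only after it finishes, the observed peak can never exceed max.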

func WithMiddlewares

func WithMiddlewares(middlewares ...Middleware) Opt

WithMiddlewares wraps every goroutine function executed by the machine with the given middlewares. Middlewares can be added to individual goroutines with GoWithMiddlewares.

func WithPubSub (added in v0.0.5)

func WithPubSub(pubsub pubsub.PubSub) Opt

WithPubSub sets the pubsub implementation for the machine instance. An in-memory implementation is used if none is provided.

func WithTags

func WithTags(tags ...string) Opt

WithTags sets the machine instance's tags.

func WithTimeout

func WithTimeout(to time.Duration) Opt

WithTimeout is an Opt that creates the Machine's context with the given timeout value.

func WithValue (added in v0.2.0)

func WithValue(key, val interface{}) Opt

WithValue adds the key/value pair to the Machine's root context. It can be retrieved with context.Value() in all sub routine contexts.
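Values stored on a root context are visible through every context derived from it, which is why routines can read them. The standard-library sketch below demonstrates only that inheritance; it does not call this package, and ctxKey, valueSeenByChild and missingValue are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is a private key type, which avoids collisions with keys defined by
// other packages.
type ctxKey string

// valueSeenByChild stores val under key on a root context and looks it up
// through a cancellable context derived from that root.
func valueSeenByChild(key, val string) interface{} {
	root := context.WithValue(context.Background(), ctxKey(key), val)
	child, cancel := context.WithCancel(root)
	defer cancel()
	return child.Value(ctxKey(key))
}

// missingValue looks up a key that was never stored.
func missingValue() interface{} {
	return context.Background().Value(ctxKey("missing"))
}

func main() {
	fmt.Println(valueSeenByChild("tenant", "acme")) // prints: acme
	fmt.Println(missingValue())                     // prints: <nil>
}
```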

type Routine

type Routine interface {
    // Context returns the goroutine's unique context that may be used for cancellation
    Context() context.Context
    // Cancel cancels the context returned from Context()
    Cancel()
    // PID() is the goroutine's unique process id
    PID() string
    // Tags() are the tags associated with the goroutine
    Tags() []string
    // Start is when the goroutine started
    Start() time.Time
    // Duration is the duration since the goroutine started
    Duration() time.Duration
    // Publish publishes the object to the given channel
    Publish(channel string, obj interface{}) error
    // Subscribe subscribes to a channel and executes the function on every message passed to it. It exits if the goroutine's context is cancelled.
    Subscribe(channel string, handler pubsub.Handler, options ...pubsub.SubOpt) error
    // TraceLog logs a message within the goroutine execution tracer. ref: https://golang.org/pkg/runtime/trace/#example_
    TraceLog(message string)
    // Machine returns the underlying routine's machine instance
    Machine() *Machine
}

Routine is an interface representing a goroutine
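The Publish and Subscribe methods rely on a pubsub implementation whose internals are not shown on this page. As a rough illustration of the idea, here is a small in-memory broker built on the standard library. Everything in it is a hypothetical sketch rather than the package's pubsub code: the broker type, its buffer size of 16, and the policy of dropping messages for slow subscribers are assumptions. The handler shape (return false to stop) follows the usage in the example under New.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Handler returns false to stop receiving messages.
type Handler func(msg interface{}) bool

// broker is a hypothetical in-memory publish/subscribe hub.
type broker struct {
	mu   sync.Mutex
	subs map[string][]chan interface{}
}

func newBroker() *broker {
	return &broker{subs: map[string][]chan interface{}{}}
}

// publish delivers msg to every subscriber of channel. A subscriber whose
// buffer is full misses the message (a policy chosen for this sketch).
func (b *broker) publish(channel string, msg interface{}) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs[channel] {
		select {
		case ch <- msg:
		default:
		}
	}
}

// subscribe registers h synchronously, then serves it from a new goroutine.
// The returned channel is closed once the subscription has ended.
func (b *broker) subscribe(ctx context.Context, channel string, h Handler) <-chan struct{} {
	ch := make(chan interface{}, 16)
	b.mu.Lock()
	b.subs[channel] = append(b.subs[channel], ch)
	b.mu.Unlock()

	done := make(chan struct{})
	go func() {
		defer close(done)
		defer b.remove(channel, ch)
		for {
			select {
			case <-ctx.Done():
				return
			case msg := <-ch:
				if !h(msg) {
					return
				}
			}
		}
	}()
	return done
}

// remove unregisters a subscriber channel.
func (b *broker) remove(channel string, ch chan interface{}) {
	b.mu.Lock()
	defer b.mu.Unlock()
	subs := b.subs[channel]
	for i, c := range subs {
		if c == ch {
			b.subs[channel] = append(subs[:i], subs[i+1:]...)
			return
		}
	}
}

// receivedBeforeStop publishes `sent` messages to a subscriber that stops
// after `limit` messages and returns how many it handled.
// It assumes 1 <= limit <= sent <= 16 (the buffer size).
func receivedBeforeStop(sent, limit int) int {
	b := newBroker()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	n := 0
	done := b.subscribe(ctx, "acme.com", func(interface{}) bool {
		n++
		return n < limit
	})
	for i := 0; i < sent; i++ {
		b.publish("acme.com", i)
	}
	<-done
	return n
}

func main() {
	fmt.Println("handled:", receivedBeforeStop(5, 3)) // prints: handled: 3
}
```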

type RoutineStats

type RoutineStats struct {
    PID      string        `json:"pid"`
    Start    time.Time     `json:"start"`
    Duration time.Duration `json:"duration"`
    Tags     []string      `json:"tags"`
}

RoutineStats holds information about a single goroutine

type Stats

type Stats struct {
    ID               string         `json:"id"`
    Tags             []string       `json:"tags"`
    TotalRoutines    int            `json:"totalRoutines"`
    ActiveRoutines   int            `json:"activeRoutines"`
    Routines         []RoutineStats `json:"routines"`
    TotalChildren    int            `json:"totalChildren"`
    HasParent        bool           `json:"hasParent"`
    TotalMiddlewares int            `json:"totalMiddlewares"`
    Timeout          time.Duration  `json:"timeout"`
    Deadline         time.Time      `json:"deadline"`
    Children         []*Stats       `json:"children"`
}

Stats holds information about goroutines

func (Stats) String

func (s Stats) String() string

String returns the stats as a pretty-printed JSON string.


Directories

Path                 Synopsis
examples (module)
cron (command)
