Practical concurrency guide in Go, communication by channels, patterns
luk4z7/go-concurrency-guide
This guide is built on top of some examples from the books *Concurrency in Go* and *The Go Programming Language*.
- Race Condition and Data Race
- Memory Access Synchronization
- Deadlocks, Livelocks and Starvation
- Channels
- Patterns
- Scheduler Runtime
- References
A race condition occurs when two or more operations must execute in the correct order, but the program has not been written so that this order is guaranteed to be maintained.
A data race occurs when one concurrent operation attempts to read a variable while, at some undetermined time, another concurrent operation is attempting to write to the same variable. In the example below, `main` runs in the main goroutine, and the anonymous function runs in a second goroutine.
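```go
package main

import "fmt"

func main() {
	var data int
	go func() {
		data++ // write, in a second goroutine
	}()
	if data == 0 { // read, in the main goroutine: nothing orders it relative to the write
		fmt.Printf("the value is %d", data)
	}
}
```

Depending on scheduling, this program may print nothing, print "the value is 0", or print "the value is 1".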
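Running the example above with the race detector (`go run -race`) reports the data race. One way to remove both problems is to make the order explicit. A minimal sketch, assuming we want the read to happen after the write: `wg.Wait()` is a join point that guarantees the increment has completed before `data` is read (`readAfterWrite` is a hypothetical name).

```go
package main

import (
	"fmt"
	"sync"
)

// readAfterWrite increments data in a second goroutine and only reads it
// after that goroutine has finished, so the order is guaranteed.
func readAfterWrite() int {
	var data int
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		data++ // write
	}()
	wg.Wait() // join point: the write happens before the read below
	return data
}

func main() {
	fmt.Printf("the value is %d\n", readAfterWrite()) // the value is 1
}
```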
The `sync` package contains the concurrency primitives that are most useful for low-level memory access synchronization. A critical section is a part of your program that needs exclusive access to shared memory.
Mutex stands for “mutual exclusion” and is a way to protect critical sections of your program.
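```go
type Counter struct {
	mu    sync.Mutex
	value int
}

func (c *Counter) Increment() {
	c.mu.Lock()
	defer c.mu.Unlock() // release the lock even if the critical section panics
	c.value++           // critical section
}
```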
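A usage sketch for `Counter`, assuming 1000 goroutines that each increment it once. `Value` is a helper added here (it is not part of the example above) so that the count is also read under the lock.

```go
package main

import (
	"fmt"
	"sync"
)

// Counter is the mutex-protected counter from the example above.
type Counter struct {
	mu    sync.Mutex
	value int
}

// Increment adds one to the counter inside the critical section.
func (c *Counter) Increment() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
}

// Value reads the counter under the same lock (hypothetical helper).
func (c *Counter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value
}

// incrementConcurrently starts n goroutines that each call Increment once.
func incrementConcurrently(n int) int {
	var c Counter
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Increment()
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(incrementConcurrently(1000)) // 1000
}
```

Without the mutex, concurrent `c.value++` operations could overwrite each other and the final count could be lower than 1000.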
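`sync.WaitGroup` waits for a group of goroutines to finish: call `Add` to add goroutines to the group, `Done` when each one finishes, and `Wait` to block until all of them are done.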
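```go
var wg sync.WaitGroup
for _, salutation := range []string{"hello", "greetings", "good day"} {
	wg.Add(1) // one more goroutine to wait for
	go func(salutation string) {
		defer wg.Done() // signal completion
		fmt.Println(salutation)
	}(salutation) // pass the loop variable explicitly (required before Go 1.22)
}
wg.Wait() // block until all three goroutines have called Done
```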
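`sync.RWMutex` gives more fine-grained control over memory: you can request a read-only lock, which any number of readers can hold at the same time as long as no writer holds the lock. The benchmark below compares it with a plain `Mutex` as the number of readers grows: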
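```go
package main

import (
	"fmt"
	"math"
	"os"
	"sync"
	"text/tabwriter"
	"time"
)

func main() {
	// producer takes the write lock five times, sleeping briefly between attempts.
	producer := func(wg *sync.WaitGroup, l sync.Locker) {
		defer wg.Done()
		for i := 5; i > 0; i-- {
			l.Lock()
			l.Unlock()
			time.Sleep(1) // 1 nanosecond
		}
	}

	// observer takes the lock once (a read lock when given m.RLocker()).
	observer := func(wg *sync.WaitGroup, l sync.Locker) {
		defer wg.Done()
		l.Lock()
		defer l.Unlock()
	}

	// test runs one producer and count observers and reports the elapsed time.
	test := func(count int, mutex, rwMutex sync.Locker) time.Duration {
		var wg sync.WaitGroup
		wg.Add(count + 1)
		beginTestTime := time.Now()
		go producer(&wg, mutex)
		for i := count; i > 0; i-- {
			go observer(&wg, rwMutex)
		}
		wg.Wait()
		return time.Since(beginTestTime)
	}

	tw := tabwriter.NewWriter(os.Stdout, 0, 1, 2, ' ', 0)
	defer tw.Flush()

	var m sync.RWMutex
	fmt.Fprintf(tw, "Readers\tRWMutex\tMutex\n")
	for i := 0; i < 20; i++ {
		count := int(math.Pow(2, float64(i)))
		fmt.Fprintf(
			tw,
			"%d\t%v\t%v\n",
			count,
			test(count, &m, m.RLocker()), // observers share read locks
			test(count, &m, &m),          // observers take exclusive locks
		)
	}
}
```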
It would be better if there were some kind of way for a goroutine to efficiently sleep until it was signaled to wake and check its condition. This is exactly what the Cond type does for us.
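`Cond` has two methods for notifying goroutines blocked on a `Wait` call that the condition has been triggered: `Signal` wakes one waiting goroutine, and `Broadcast` wakes all of them. The example below uses `Broadcast` to notify every registered click handler: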
```go
package main

import (
	"fmt"
	"sync"
)

type Button struct {
	Clicked *sync.Cond
}

func main() {
	button := Button{Clicked: sync.NewCond(&sync.Mutex{})}

	// subscribe runs fn in a new goroutine once the condition is broadcast,
	// and does not return until that goroutine holds the lock and is about to Wait.
	subscribe := func(c *sync.Cond, param string, fn func(s string)) {
		var goroutineRunning sync.WaitGroup
		goroutineRunning.Add(1)
		go func(p string) {
			c.L.Lock() // critical section
			defer c.L.Unlock()
			goroutineRunning.Done()
			fmt.Println("Registered and waiting ...")
			c.Wait() // atomically unlocks c.L and suspends until Broadcast
			fn(p)
		}(param)
		goroutineRunning.Wait()
	}

	var clickRegistered sync.WaitGroup
	for _, v := range []string{"Maximizing window.", "Displaying annoying dialog box!", "Mouse clicked."} {
		clickRegistered.Add(1)
		subscribe(button.Clicked, v, func(s string) {
			fmt.Println(s)
			clickRegistered.Done()
		})
	}

	// Taking the lock guarantees every subscriber is already inside Wait,
	// so the Broadcast cannot be missed.
	button.Clicked.L.Lock()
	button.Clicked.Broadcast()
	button.Clicked.L.Unlock()

	clickRegistered.Wait()
}
```
`sync.Once` ensures that a function is executed only once, even when `Do` is called from several goroutines:
```go
var count int
increment := func() {
	count++
}

var once sync.Once
var increments sync.WaitGroup
increments.Add(100)
for i := 0; i < 100; i++ {
	go func() {
		defer increments.Done()
		once.Do(increment) // only the first call runs increment
	}()
}
increments.Wait()
fmt.Printf("Count is %d\n", count) // Count is 1
```
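`sync.Pool` manages a pool of reusable instances (for example, buffers or connections), so that expensive allocations can be reused instead of being created again. Note that any item stored in the pool may be removed automatically at any time, without notification.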
```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	myPool := &sync.Pool{
		New: func() interface{} {
			fmt.Println("Creating new instance.")
			return struct{}{}
		},
	}

	// Get calls the New function defined in the pool if no instance is available.
	myPool.Get()
	instance := myPool.Get()
	fmt.Println("instance", instance)

	// Put a previously retrieved instance back into the pool;
	// this increases the number of available instances to one.
	myPool.Put(instance)

	// This call is likely to reuse the previously allocated instance
	// instead of calling New again.
	myPool.Get()

	var numCalcsCreated int32
	calcPool := &sync.Pool{
		New: func() interface{} {
			fmt.Println("new calc pool")
			atomic.AddInt32(&numCalcsCreated, 1) // New may run concurrently
			mem := make([]byte, 1024)
			return &mem
		},
	}

	fmt.Println("calcPool.New", calcPool.New())
	calcPool.Put(calcPool.New())
	calcPool.Put(calcPool.New())
	calcPool.Put(calcPool.New())
	calcPool.Put(calcPool.New())
	calcPool.Get()

	const numWorkers = 1024 * 1024
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for i := numWorkers; i > 0; i-- {
		go func() {
			defer wg.Done()
			mem := calcPool.Get().(*[]byte)
			defer calcPool.Put(mem)
			// Assume something interesting, but quick, is being done with
			// this memory.
		}()
	}
	wg.Wait()
	fmt.Printf("%d calculators were created.\n", atomic.LoadInt32(&numCalcsCreated))
}
```
A deadlocked program is one in which all concurrent processes are waiting on one another; it will never recover without outside intervention.
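```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type value struct {
	mu    sync.Mutex
	value int
}

func main() {
	var wg sync.WaitGroup
	printSum := func(v1, v2 *value) {
		defer wg.Done()
		v1.mu.Lock()
		defer v1.mu.Unlock()

		// Sleep so that both goroutines hold their first lock before
		// trying to take the second one: this makes the deadlock practically certain.
		time.Sleep(2 * time.Second)
		v2.mu.Lock()
		defer v2.mu.Unlock()

		fmt.Printf("sum=%v\n", v1.value+v2.value)
	}

	var a, b value
	wg.Add(2)
	go printSum(&a, &b) // locks a, then waits for b
	go printSum(&b, &a) // locks b, then waits for a
	wg.Wait()           // fatal error: all goroutines are asleep - deadlock!
}
```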
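An unbuffered channel deadlocks the same way when the main goroutine sends or receives and no other goroutine will ever perform the matching operation:

```go
package main

func main() {
	message := make(chan string)
	// The main goroutine tries to send a message, but nobody will ever receive it.
	message <- "message" // fatal error: all goroutines are asleep - deadlock!
}
```

```go
package main

func main() {
	message := make(chan string)
	// The main goroutine tries to receive a message, but nobody will ever send one.
	<-message // fatal error: all goroutines are asleep - deadlock!
}
```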
Livelocks are programs that are actively performing concurrent operations, but these operations do nothing to move the state of the program forward.
```go
package main

import (
	"bytes"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	cadence := sync.NewCond(&sync.Mutex{})
	go func() {
		for range time.Tick(1 * time.Millisecond) {
			cadence.Broadcast()
		}
	}()

	takeStep := func() {
		cadence.L.Lock()
		cadence.Wait()
		cadence.L.Unlock()
	}

	tryDir := func(dirName string, dir *int32, out *bytes.Buffer) bool {
		fmt.Fprintf(out, " %v", dirName)
		atomic.AddInt32(dir, 1)
		takeStep()
		if atomic.LoadInt32(dir) == 1 {
			fmt.Fprint(out, ". Success!")
			return true
		}
		takeStep()
		atomic.AddInt32(dir, -1)
		return false
	}

	var left, right int32
	tryLeft := func(out *bytes.Buffer) bool { return tryDir("left", &left, out) }
	tryRight := func(out *bytes.Buffer) bool { return tryDir("right", &right, out) }

	walk := func(walking *sync.WaitGroup, name string) {
		var out bytes.Buffer
		defer func() { fmt.Println(out.String()) }()
		defer walking.Done()
		fmt.Fprintf(&out, "%v is trying to scoot:", name)
		for i := 0; i < 5; i++ {
			if tryLeft(&out) || tryRight(&out) {
				return
			}
		}
		fmt.Fprintf(&out, "\n%v tosses her hands up in exasperation", name)
	}

	var peopleInHallway sync.WaitGroup
	peopleInHallway.Add(2)
	go walk(&peopleInHallway, "Alice")
	go walk(&peopleInHallway, "Barbara")
	peopleInHallway.Wait()
}
```
Starvation is any situation where a concurrent process cannot get all the resources it needs to perform work.
```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	var sharedLock sync.Mutex
	const runtime = 1 * time.Second

	// greedyWorker holds the shared lock for its whole work loop.
	greedyWorker := func() {
		defer wg.Done()
		var count int
		for begin := time.Now(); time.Since(begin) <= runtime; {
			sharedLock.Lock()
			time.Sleep(3 * time.Nanosecond)
			sharedLock.Unlock()
			count++
		}
		fmt.Printf("Greedy worker was able to execute %v work loops\n", count)
	}

	// politeWorker does the same work but only locks when it needs to.
	politeWorker := func() {
		defer wg.Done()
		var count int
		for begin := time.Now(); time.Since(begin) <= runtime; {
			sharedLock.Lock()
			time.Sleep(1 * time.Nanosecond)
			sharedLock.Unlock()

			sharedLock.Lock()
			time.Sleep(1 * time.Nanosecond)
			sharedLock.Unlock()

			sharedLock.Lock()
			time.Sleep(1 * time.Nanosecond)
			sharedLock.Unlock()

			count++
		}
		fmt.Printf("Polite worker was able to execute %v work loops\n", count)
	}

	wg.Add(2)
	go greedyWorker()
	go politeWorker()
	wg.Wait()
}
```
Channels are one of the synchronization primitives in Go derived from Hoare’s CSP. While they can be used to synchronize access to memory, they are best used to communicate information between goroutines. The zero value of a channel is `nil`.
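To declare a bidirectional channel (one you can both read from and send to):

```go
stream := make(chan interface{})
```

To declare a unidirectional channel that can only be read from (receive-only):

```go
stream := make(<-chan interface{})
```

To declare a unidirectional channel that can only be sent to (send-only):

```go
stream := make(chan<- interface{})
```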
Unidirectional channels are rarely instantiated directly; they usually appear as function parameters and return types. This is convenient because Go implicitly converts a bidirectional channel to a unidirectional one when needed:
```go
var receiveChan <-chan interface{}
var sendChan chan<- interface{}
dataStream := make(chan interface{})

// Valid statements:
receiveChan = dataStream
sendChan = dataStream
```
To receive:

```go
<-stream
```

To send:

```go
stream <- "Hello world"
```
Ranging over a channel: the `for range` loop exits automatically once the channel is closed and all values have been received.
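```go
intStream := make(chan int)
go func() {
	defer close(intStream) // ends the range loop below once all values are sent
	for i := 1; i <= 5; i++ {
		intStream <- i
	}
}()

for integer := range intStream {
	fmt.Printf("%v ", integer) // 1 2 3 4 5
}
```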
unbuffered channel
A send operation on an unbuffered channel blocks the sending goroutine until another goroutine performs a corresponding receive on the same channel; at that point the value is passed, and both goroutines can continue. Conversely, if a receive operation is attempted first, the receiving goroutine is blocked until another goroutine performs a send on the same channel. Communication over an unbuffered channel makes the sending and receiving goroutines synchronize. Because of this, unbuffered channels are sometimes called synchronous channels. When a value is sent over an unbuffered channel, the receipt of the value happens before the sending goroutine wakes up again.

In discussions of concurrency, when we say that x happens before y, we do not simply mean that x occurs earlier than y in time; we mean that this ordering is guaranteed, and that all of x's prior effects, such as updates to variables, are complete and can be relied upon. When x neither happens before y nor after y, we say that x is concurrent with y. This does not mean that x and y are necessarily simultaneous; it just means that we cannot assume anything about their order.
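A minimal sketch of this guarantee, based on the Go memory model rule that a send on a channel is synchronized before the completion of the corresponding receive: the write to `msg` happens before the send, so it is visible to the receiver once the receive completes. `handOff` is a hypothetical name.

```go
package main

import "fmt"

// handOff writes msg in a goroutine and then signals over an unbuffered
// channel; once the receive completes, the write is guaranteed to be visible.
func handOff() string {
	var msg string
	done := make(chan struct{}) // unbuffered
	go func() {
		msg = "hello, world" // effect that happens before the send
		done <- struct{}{}
	}()
	<-done // synchronizes with the send above
	return msg
}

func main() {
	fmt.Println(handOff()) // hello, world
}
```

Without the channel, reading `msg` from the caller would be a data race, just like the first example in this guide.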
buffered channel
On a buffered channel, a write blocks when the buffer is full and a read blocks when the buffer is empty. This declares a buffered channel with a capacity of 4:

```go
var dataStream chan interface{}
dataStream = make(chan interface{}, 4)
```
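A small sketch showing that a buffered channel accepts sends without any receiver until it is full. It uses a `select` with a `default` case to attempt each send without blocking; `trySend` and `countAccepted` are illustrative helpers, not library functions.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan interface{}, v interface{}) bool {
	select {
	case ch <- v:
		return true
	default: // the send would block because the buffer is full
		return false
	}
}

// countAccepted makes `attempts` non-blocking sends into a buffered channel
// of the given capacity (with no receiver) and counts how many succeed.
func countAccepted(capacity, attempts int) int {
	dataStream := make(chan interface{}, capacity)
	accepted := 0
	for i := 0; i < attempts; i++ {
		if trySend(dataStream, i) {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println(countAccepted(4, 5)) // 4: a plain fifth send would have blocked
}
```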
Reading from or writing to a `nil` channel (the zero value of a channel type) blocks forever; if no other goroutine can make progress, the runtime aborts with a deadlock error. Reading:

```go
var dataStream chan interface{}
<-dataStream
```

This fails with:

```
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive (nil chan)]:
main.main()
	/tmp/babel-23079IVB/go-src-23079O4q.go:9 +0x3f
exit status 2
```

Writing:

```go
var dataStream chan interface{}
dataStream <- struct{}{}
```

This produces:

```
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send (nil chan)]:
main.main()
	/tmp/babel-23079IVB/go-src-23079dnD.go:9 +0x77
exit status 2
```

And closing a `nil` channel causes a panic:

```go
var dataStream chan interface{}
close(dataStream)
```

This produces:

```
panic: close of nil channel

goroutine 1 [running]:
panic(0x45b0c0, 0xc42000a160)
	/usr/local/lib/go/src/runtime/panic.go:500 +0x1a1
main.main()
	/tmp/babel-23079IVB/go-src-230794uu.go:9 +0x2a
exit status 2
```

Result of each channel operation, by channel state:
| Operation | Channel state | Result |
|---|---|---|
| Read | nil | Block |
| _ | Open and not empty | Value |
| _ | Open and empty | Block |
| _ | Closed | Zero value, `false` (once drained) |
| _ | Send-only | Compilation error |
| Write | nil | Block |
| _ | Open and full | Block |
| _ | Open and not full | Writes value |
| _ | Closed | panic |
| _ | Receive-only | Compilation error |
| Close | nil | panic |
| _ | Open and not empty | Closes channel; reads succeed until the channel is drained, then reads produce the zero value |
| _ | Open and empty | Closes channel; reads produce the zero value |
| _ | Closed | panic |
TIP: closing a receive-only channel is a compilation error.
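A short sketch of the "Read / Closed" row of the table: values buffered before `close` can still be received, and once the channel is drained the comma-ok form returns the zero value and `false`. `drainClosed` is a hypothetical helper.

```go
package main

import "fmt"

// drainClosed buffers two values, closes the channel, then performs three
// receives using the comma-ok form.
func drainClosed() ([]int, []bool) {
	ch := make(chan int, 2)
	ch <- 10
	ch <- 20
	close(ch) // buffered values remain readable after close
	var vals []int
	var oks []bool
	for i := 0; i < 3; i++ {
		v, ok := <-ch
		vals = append(vals, v)
		oks = append(oks, ok)
	}
	return vals, oks
}

func main() {
	vals, oks := drainClosed()
	fmt.Println(vals, oks) // [10 20 0] [true true false]
}
```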
Let's start with channel owners. The goroutine that owns a channel should:
- 1 - Instantiate the channel.
- 2 - Perform writes, or pass ownership to another goroutine.
- 3 - Close the channel.
- 4 - Encapsulate the previous three things in this list and expose them via a reader channel.
When we assign these responsibilities to channel owners, a few things happen:
- 1 - Because we’re the one initializing the channel, we remove the risk of deadlocking by writing to a nil channel.
- 2 - Because we’re the one initializing the channel, we remove the risk of panicking by closing a nil channel.
- 3 - Because we’re the one who decides when the channel gets closed, we remove the risk of panicking by writing to a closed channel.
- 4 - Because we’re the one who decides when the channel gets closed, we remove the risk of panicking by closing a channel more than once.
- 5 - We wield the type checker at compile time to prevent improper writes to our channel.
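```go
chanOwner := func() <-chan int {
	resultStream := make(chan int, 5) // the owner instantiates the channel
	go func() {
		defer close(resultStream) // the owner closes it when done writing
		for i := 0; i <= 5; i++ {
			resultStream <- i // the owner performs the writes
		}
	}()
	return resultStream // exposed to consumers as receive-only
}

resultStream := chanOwner()
for result := range resultStream {
	fmt.Printf("Received: %d\n", result)
}
fmt.Println("Done receiving!")
```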
Explicitly creating channel owners gives greater control over when a channel is closed and how it is used, instead of delegating these responsibilities to other functions in the system. This avoids mistakes such as sending data to a channel that has already been closed, or closing it twice.
select
The `case` statements of a `select` do not work like those of a `switch`: they are not tested sequentially, and execution does not automatically fall through if none of the criteria is met.
```go
var c1, c2 <-chan interface{}
var c3 chan<- interface{}
select {
case <-c1:
	// Do something
case <-c2:
	// Do something
case c3 <- struct{}{}:
}
```
Instead, all channel reads and writes are considered simultaneously to see if any of them are ready: channels that are populated or closed in the case of reads, and channels that are not at capacity in the case of writes. If none of the channels are ready, the entire `select` statement blocks. Then, when one of the channels is ready, that operation proceeds and its corresponding statements are executed.
```go
start := time.Now()
c := make(chan interface{})
go func() {
	time.Sleep(5 * time.Second)
	close(c)
}()

fmt.Println("Blocking on read...")
select {
case <-c:
	fmt.Printf("Unblocked %v later.\n", time.Since(start))
}
```
Questions that come up when working with `select` and channels:
1 - What happens when multiple channels have something to read?
```go
c1 := make(chan interface{})
close(c1)
c2 := make(chan interface{})
close(c2)

var c1Count, c2Count int
for i := 1000; i >= 0; i-- {
	select {
	case <-c1:
		c1Count++
	case <-c2:
		c2Count++
	}
}

fmt.Printf("c1Count: %d\nc2Count: %d\n", c1Count, c2Count)
```
This produces:

```
c1Count: 505
c2Count: 496
```
Roughly half of the reads go to `c1` and half to `c2`, but the exact split cannot be predicted and will usually differ between runs. The Go runtime knows nothing about the intent behind your `select` statement (here, two closed channels that are always ready), so it performs a pseudo-random uniform selection over the set of ready cases. This just means that from your set of cases, each one has the same chance of being selected as all the others.
A good way to do this is to introduce a random variable into your equation - in this case, which channel to select from. By weighing the chance that each channel is used equally, all Go programs that use the select statement will perform well in the average case.
2 - What if there are never any channels that become ready?
```go
var c <-chan int
select {
case <-c: // never ready: c is nil
case <-time.After(1 * time.Second):
	fmt.Println("Timed out.")
}
```
To avoid blocking forever when no channel ever becomes ready, add a timeout with `time.After`, as in the example above, or use a `default` case to do something else.
3 - What if we want to do something while no channels are ready? Use `default`:
```go
start := time.Now()
var c1, c2 <-chan int
select {
case <-c1:
case <-c2:
default:
	fmt.Printf("In default after %v\n\n", time.Since(start))
}
```
Exiting a `for`-`select` loop with a labeled `break`:
```go
done := make(chan interface{})
go func() {
	time.Sleep(5 * time.Second)
	close(done)
}()

workCounter := 0
loop:
for {
	select {
	case <-done:
		break loop // a plain break would only exit the select
	default:
	}

	// Simulate work
	workCounter++
	time.Sleep(1 * time.Second)
}

fmt.Printf("Achieved %v cycles of work before signalled to stop.\n", workCounter)
```
Block forever:

```go
select {}
```

GOMAXPROCS
Prior to Go 1.5, GOMAXPROCS was always set to one, and usually you’d find this snippet in most Go programs:
```go
runtime.GOMAXPROCS(runtime.NumCPU())
```
This function controls the number of OS threads that will host so-called “work queues.” Since Go 1.5, `GOMAXPROCS` defaults to the number of available CPUs, so this call is rarely needed.
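A small sketch for checking the current setting. Passing a value less than 1 to `runtime.GOMAXPROCS` only queries the setting without changing it; `currentMaxProcs` is a hypothetical helper name. By default the value usually matches `runtime.NumCPU()`, but it can differ, for example when the `GOMAXPROCS` environment variable is set.

```go
package main

import (
	"fmt"
	"runtime"
)

// currentMaxProcs returns the current GOMAXPROCS value without changing it.
func currentMaxProcs() int {
	return runtime.GOMAXPROCS(0) // n < 1 leaves the setting unchanged
}

func main() {
	fmt.Println("NumCPU:", runtime.NumCPU())
	fmt.Println("GOMAXPROCS:", currentMaxProcs())
}
```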
Use a sync.Mutex or a channel?
As a general guide, though:
| Channel | Mutex |
|---|---|
| passing ownership of data, distributing units of work, communicating async results | caches, state |
"Do not communicate by sharing memory; instead, share memory by communicating. (copies)"
Confinement is the simple yet powerful idea of ensuring information is only ever available from one concurrent process. There are two kinds of confinement possible: ad hoc and lexical.
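Ad hoc confinement is when you achieve confinement through a convention. In this example, by convention only `loopData` reads the `data` slice, but nothing in the language enforces it: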
```go
data := make([]int, 4)

loopData := func(handleData chan<- int) {
	defer close(handleData)
	for i := range data {
		handleData <- data[i]
	}
}

handleData := make(chan int)
go loopData(handleData)

for num := range handleData {
	fmt.Println(num)
}
```
Lexical confinement involves using lexical scope to expose only the correct data and concurrency primitives for multiple concurrent processes to use.
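```go
chanOwner := func() <-chan int {
	results := make(chan int, 5) // write access is confined to this closure
	go func() {
		defer close(results)
		for i := 0; i <= 5; i++ {
			results <- i
		}
	}()
	return results
}

// consumer can only read: its parameter is a receive-only channel.
consumer := func(results <-chan int) {
	for result := range results {
		fmt.Printf("Received: %d\n", result)
	}
	fmt.Println("Done receiving!")
}

results := chanOwner()
consumer(results)
```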
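Preventing goroutine leaks: goroutines are not garbage collected, so a goroutine that blocks forever stays in memory for the lifetime of the process. By convention, the parent passes the child a read-only `done` channel and closes it to tell the child to stop:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	doWork := func(
		done <-chan interface{},
		strings <-chan string,
	) <-chan interface{} {
		terminated := make(chan interface{})
		go func() {
			defer fmt.Println("doWork exited.")
			defer close(terminated)
			for {
				select {
				case s := <-strings:
					// Do something interesting
					fmt.Println(s)
				case <-done: // the parent asked us to stop
					return
				}
			}
		}()
		return terminated
	}

	done := make(chan interface{})
	terminated := doWork(done, nil) // a nil strings channel never delivers a value

	go func() {
		// Cancel the operation after 1 second.
		time.Sleep(1 * time.Second)
		fmt.Println("Canceling doWork goroutine...")
		close(done)
	}()

	<-terminated // join point: wait for doWork to exit
	fmt.Println("Done.")
}
```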
At times you may find yourself wanting to combine one or more done channels into a single done channel that closes if any of its component channels close.
```go
package main

import (
	"fmt"
	"time"
)

func main() {
	var or func(channels ...<-chan interface{}) <-chan interface{}
	or = func(channels ...<-chan interface{}) <-chan interface{} {
		switch len(channels) {
		case 0:
			return nil
		case 1:
			return channels[0]
		}

		orDone := make(chan interface{})
		go func() {
			defer close(orDone)

			switch len(channels) {
			case 2:
				select {
				case <-channels[0]:
				case <-channels[1]:
				}
			default:
				select {
				case <-channels[0]:
				case <-channels[1]:
				case <-channels[2]:
				case <-or(append(channels[3:], orDone)...): // recurse on the rest
				}
			}
		}()
		return orDone
	}

	// sig returns a channel that is closed after the given duration.
	sig := func(after time.Duration) <-chan interface{} {
		c := make(chan interface{})
		go func() {
			defer close(c)
			time.Sleep(after)
		}()
		return c
	}

	start := time.Now()
	<-or(
		sig(2*time.Hour),
		sig(5*time.Minute),
		sig(1*time.Second),
		sig(1*time.Hour),
		sig(1*time.Minute),
	)
	fmt.Printf("done after %v", time.Since(start))
}
```
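Error handling: a concurrent process should send its errors to another part of the program that has complete information about the program's state and can make a more informed decision about what to do. Here each `Result` couples the possible error with the response:

```go
package main

import (
	"fmt"
	"net/http"
)

// Result couples a possible error with the response it relates to.
type Result struct {
	Error    error
	Response *http.Response
}

func main() {
	checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result {
		results := make(chan Result)
		go func() {
			defer close(results)
			for _, url := range urls {
				resp, err := http.Get(url)
				result := Result{Error: err, Response: resp}
				select {
				case <-done:
					return
				case results <- result: // send the error along with the result
				}
			}
		}()
		return results
	}

	done := make(chan interface{})
	defer close(done)

	urls := []string{"https://www.google.com", "https://badhost"}
	for result := range checkStatus(done, urls...) {
		if result.Error != nil {
			fmt.Printf("error: %v\n", result.Error)
			continue
		}
		fmt.Printf("Response: %v\n", result.Response.Status)
		result.Response.Body.Close() // release the connection
	}
}
```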
A pipeline is just another tool you can use to form an abstraction in your system.
```go
multiply := func(values []int, multiplier int) []int {
	multipliedValues := make([]int, len(values))
	for i, v := range values {
		multipliedValues[i] = v * multiplier
	}
	return multipliedValues
}

add := func(values []int, additive int) []int {
	addedValues := make([]int, len(values))
	for i, v := range values {
		addedValues[i] = v + additive
	}
	return addedValues
}

ints := []int{1, 2, 3, 4}
for _, v := range add(multiply(ints, 2), 1) {
	fmt.Println(v)
}
```
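The batch version above processes a whole slice at every stage. A minimal sketch of a stream-oriented version of the same pipeline, assuming one goroutine per stage connected by unbuffered channels and a `done` channel for cancellation; `generator` and `runPipeline` are illustrative names.

```go
package main

import "fmt"

// generator turns a list of ints into a stream, stopping early if done closes.
func generator(done <-chan struct{}, ints ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, i := range ints {
			select {
			case <-done:
				return
			case out <- i:
			}
		}
	}()
	return out
}

// multiply is a stage that multiplies each value it receives.
func multiply(done <-chan struct{}, in <-chan int, multiplier int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			select {
			case <-done:
				return
			case out <- v * multiplier:
			}
		}
	}()
	return out
}

// add is a stage that adds a constant to each value it receives.
func add(done <-chan struct{}, in <-chan int, additive int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			select {
			case <-done:
				return
			case out <- v + additive:
			}
		}
	}()
	return out
}

// runPipeline wires generator -> multiply -> add and collects the output.
func runPipeline(ints ...int) []int {
	done := make(chan struct{})
	defer close(done) // stops every stage if we return early
	var results []int
	for v := range add(done, multiply(done, generator(done, ints...), 2), 1) {
		results = append(results, v)
	}
	return results
}

func main() {
	fmt.Println(runPipeline(1, 2, 3, 4)) // [3 5 7 9]
}
```

Unlike the batch version, each stage here can start working on the first value before the previous stage has processed the rest.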
Fan-out is a term to describe the process of starting multiple goroutines to handle pipeline input, and fan-in is a term to describe the process of combining multiple outputs into one channel.
```go
package main

import "fmt"

type data int

// worker is one of several uniform actors sharing the work:
// no data item is processed twice.
// wch receives work items; res sends the responses.
func worker(wch <-chan data, res chan<- data) {
	for {
		w, ok := <-wch
		if !ok {
			return // return when wch is closed
		}
		w *= 2
		res <- w
	}
}

func main() {
	work := []data{1, 2, 3, 4, 5}

	const numWorkers = 3

	wch := make(chan data, len(work))
	res := make(chan data, len(work))

	// fan-out: one input channel shared by all workers
	for i := 0; i < numWorkers; i++ {
		go worker(wch, res)
	}

	// send the work items
	for _, w := range work {
		fmt.Println("send to wch : ", w)
		wch <- w
	}
	close(wch)

	// fan-in: one result channel
	for range work {
		w := <-res
		fmt.Println("receive from res : ", w)
	}
}
```
The or-done channel encapsulates the verbosity of reading from a channel while also watching a `done` channel, which prevents goroutine leaks when the consumer must stop early. A simple range loop over `myChan` has to grow into a `for`-`select` loop once it must also watch `done`; a closure can encapsulate that verbosity:
```go
for val := range myChan {
	// Do something with val
}
```

becomes:

```go
loop:
for {
	select {
	case <-done:
		break loop
	case maybeVal, ok := <-myChan:
		if ok == false {
			return // or maybe break from for
		}
		// Do something with maybeVal
	}
}
```
We can isolate this logic in a closure that starts a single goroutine:
```go
orDone := func(done, c <-chan interface{}) <-chan interface{} {
	valStream := make(chan interface{})
	go func() {
		defer close(valStream)
		for {
			select {
			case <-done:
				return
			case v, ok := <-c:
				if ok == false {
					return
				}
				select {
				case valStream <- v:
				case <-done:
				}
			}
		}
	}()
	return valStream
}

for val := range orDone(done, myChan) {
	// Do something with val
}
```
The tee-channel: pass it a channel to read from, and it returns two separate channels that both receive the same values. It reuses the `orDone` closure from above:
```go
tee := func(
	done <-chan interface{},
	in <-chan interface{},
) (_, _ <-chan interface{}) {
	out1 := make(chan interface{})
	out2 := make(chan interface{})
	go func() {
		defer close(out1)
		defer close(out2)
		for val := range orDone(done, in) {
			var out1, out2 = out1, out2 // local copies we can set to nil
			for i := 0; i < 2; i++ {
				select {
				case <-done:
				case out1 <- val:
					out1 = nil // block this case so the next send goes to out2
				case out2 <- val:
					out2 = nil
				}
			}
		}
	}()
	return out1, out2
}
```
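A minimal usage sketch, with `orDone` and `tee` repeated as top-level functions so the block compiles on its own. It assumes the consumer reads both outputs in lockstep, because `tee` does not move on to the next value until both channels have received the current one; `teeInts` is a hypothetical helper.

```go
package main

import "fmt"

// orDone forwards values from c until c is closed or done is closed.
func orDone(done, c <-chan interface{}) <-chan interface{} {
	valStream := make(chan interface{})
	go func() {
		defer close(valStream)
		for {
			select {
			case <-done:
				return
			case v, ok := <-c:
				if !ok {
					return
				}
				select {
				case valStream <- v:
				case <-done:
				}
			}
		}
	}()
	return valStream
}

// tee copies every value from in to both returned channels.
func tee(done <-chan interface{}, in <-chan interface{}) (<-chan interface{}, <-chan interface{}) {
	out1 := make(chan interface{})
	out2 := make(chan interface{})
	go func() {
		defer close(out1)
		defer close(out2)
		for val := range orDone(done, in) {
			var out1, out2 = out1, out2 // local copies we can set to nil
			for i := 0; i < 2; i++ {
				select {
				case <-done:
				case out1 <- val:
					out1 = nil
				case out2 <- val:
					out2 = nil
				}
			}
		}
	}()
	return out1, out2
}

// teeInts feeds 1..n through tee and reads both outputs in lockstep.
func teeInts(n int) (a, b []int) {
	done := make(chan interface{})
	defer close(done)

	in := make(chan interface{})
	go func() {
		defer close(in)
		for i := 1; i <= n; i++ {
			in <- i
		}
	}()

	out1, out2 := tee(done, in)
	for v1 := range out1 {
		a = append(a, v1.(int))
		b = append(b, (<-out2).(int))
	}
	return a, b
}

func main() {
	a, b := teeInts(3)
	fmt.Println(a, b) // [1 2 3] [1 2 3]
}
```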
With the bridge-channel pattern, you can write a function that destructures a channel of channels into a single channel:
```go
bridge := func(
	done <-chan interface{},
	chanStream <-chan <-chan interface{},
) <-chan interface{} {
	valStream := make(chan interface{})
	go func() {
		defer close(valStream)
		for {
			var stream <-chan interface{}
			select {
			case maybeStream, ok := <-chanStream:
				if ok == false {
					return
				}
				stream = maybeStream
			case <-done:
				return
			}
			for val := range orDone(done, stream) {
				select {
				case valStream <- val:
				case <-done:
				}
			}
		}
	}()
	return valStream
}

genVals := func() <-chan <-chan interface{} {
	chanStream := make(chan (<-chan interface{}))
	go func() {
		defer close(chanStream)
		for i := 0; i < 10; i++ {
			stream := make(chan interface{}, 1)
			stream <- i
			close(stream)
			chanStream <- stream
		}
	}()
	return chanStream
}

done := make(chan interface{})
defer close(done)

for v := range bridge(done, genVals()) {
	fmt.Printf("%v ", v)
}
```
A buffered channel is a type of queue. Adding queuing prematurely can hide synchronization issues such as deadlocks. The example below uses a buffered channel to limit how many goroutines run at once: when the buffer is full, `limit <- struct{}{}` blocks until a running goroutine releases a slot with `<-limit`. Without the limit, all 50 goroutines would be started at the same time.
```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	// limit is a buffered channel used as a counting semaphore.
	limit := make(chan interface{}, runtime.NumCPU())

	fmt.Printf("Started, Limit %d\n", cap(limit))

	workers := func(limit chan interface{}, wg *sync.WaitGroup) {
		for i := 0; i < 50; i++ {
			limit <- struct{}{} // blocks while cap(limit) goroutines are running
			wg.Add(1)
			go func(x int, w *sync.WaitGroup) {
				defer w.Done()
				time.Sleep(1 * time.Second)
				fmt.Printf("Process %d\n", x)
				<-limit // release the slot
			}(i, wg)
		}
	}

	workers(limit, &wg)
	wg.Wait()
	fmt.Println("Finished")
}
```
In concurrent programs, it’s often necessary to preempt operations because of timeouts, cancellation, or failure of another portion of the system. We’ve looked at the idiom of creating a done channel, which flows through your program and cancels all blocking concurrent operations. This works well, but it’s also somewhat limited.
It would be useful if we could communicate extra information alongside the simple notification to cancel: why the cancellation was occurring, or whether or not our function has a deadline by which it needs to complete.
The `context` package serves two primary purposes:
- To provide an API for canceling branches of your call-graph.
- To provide a data-bag for transporting request-scoped data through your call-graph
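The following example passes request-scoped values through a context. It uses an unexported key type, as the `context` documentation recommends, so that keys cannot collide with those defined in other packages:

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is an unexported type for context keys, which avoids collisions.
type ctxKey string

const (
	userIDKey    ctxKey = "userID"
	authTokenKey ctxKey = "authToken"
)

func main() {
	ProcessRequest("jane", "abc123")
}

func ProcessRequest(userID, authToken string) {
	ctx := context.WithValue(context.Background(), userIDKey, userID)
	ctx = context.WithValue(ctx, authTokenKey, authToken)
	HandleResponse(ctx)
}

func HandleResponse(ctx context.Context) {
	fmt.Printf(
		"handling response for %v (%v)",
		ctx.Value(userIDKey),
		ctx.Value(authTokenKey),
	)
}
```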
Cancellation in a function has three aspects:
- A goroutine’s parent may want to cancel it.
- A goroutine may want to cancel its children.
- Any blocking operations within a goroutine need to be preemptable so that it may be canceled.
The context package helps manage all three of these.
The following example uses `context.WithCancel` and `context.WithTimeout`:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := printGreeting(ctx); err != nil {
			fmt.Printf("cannot print greeting: %v\n", err)
			cancel() // cancel the sibling goroutine as well
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := printFarewell(ctx); err != nil {
			fmt.Printf("cannot print farewell: %v\n", err)
		}
	}()

	wg.Wait()
}

func printGreeting(ctx context.Context) error {
	greeting, err := genGreeting(ctx)
	if err != nil {
		return err
	}
	fmt.Printf("%s world!\n", greeting)
	return nil
}

func printFarewell(ctx context.Context) error {
	farewell, err := genFarewell(ctx)
	if err != nil {
		return err
	}
	fmt.Printf("%s world!\n", farewell)
	return nil
}

func genGreeting(ctx context.Context) (string, error) {
	// genGreeting gives up after one second.
	ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
	defer cancel()

	switch locale, err := locale(ctx); {
	case err != nil:
		return "", err
	case locale == "EN/US":
		return "hello", nil
	}
	return "", fmt.Errorf("unsupported locale")
}

func genFarewell(ctx context.Context) (string, error) {
	switch locale, err := locale(ctx); {
	case err != nil:
		return "", err
	case locale == "EN/US":
		return "goodbye", nil
	}
	return "", fmt.Errorf("unsupported locale")
}

func locale(ctx context.Context) (string, error) {
	// Fail fast if the deadline leaves less than the minute this lookup needs.
	if deadline, ok := ctx.Deadline(); ok {
		if deadline.Sub(time.Now().Add(1*time.Minute)) <= 0 {
			return "", context.DeadlineExceeded
		}
	}

	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case <-time.After(1 * time.Minute): // simulate a slow lookup
	}
	return "EN/US", nil
}
```

Because `genGreeting`'s one-second deadline cannot accommodate `locale`'s one-minute lookup, the greeting fails immediately with `context deadline exceeded`, and the resulting `cancel()` makes the farewell fail with `context canceled`.
Heartbeats are a way for concurrent processes to signal life to outside parties. They get their name from human anatomy wherein a heartbeat signifies life to an observer. Heartbeats have been around since before Go, and remain useful within it.
There are two different types of heartbeats:
- Heartbeats that occur on a time interval.
- Heartbeats that occur at the beginning of a unit of work
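A minimal sketch of the first kind (a heartbeat on a time interval), assuming a hypothetical worker that doubles integers; the names `doWork` and `collect` are illustrative. Pulses go to a channel with a buffer of one and are dropped when nobody is listening, so the heartbeat never blocks the work itself, and the consumer treats a long silence (no pulse and no result) as a sign that the worker is unhealthy.

```go
package main

import (
	"fmt"
	"time"
)

// doWork doubles each value and sends it on results, emitting a pulse on
// heartbeat every pulseInterval while it runs or waits to send.
func doWork(done <-chan struct{}, pulseInterval time.Duration, values []int) (<-chan struct{}, <-chan int) {
	heartbeat := make(chan struct{}, 1) // room for one pending pulse
	results := make(chan int)
	go func() {
		defer close(heartbeat)
		defer close(results) // runs first: results closes before heartbeat

		pulse := time.NewTicker(pulseInterval)
		defer pulse.Stop()

		sendPulse := func() {
			select {
			case heartbeat <- struct{}{}:
			default: // nobody is listening: drop the pulse instead of blocking
			}
		}
		// sendResult keeps pulsing while it waits for a receiver.
		sendResult := func(r int) bool {
			for {
				select {
				case <-done:
					return false
				case <-pulse.C:
					sendPulse()
				case results <- r:
					return true
				}
			}
		}

		for _, v := range values {
			if !sendResult(v * 2) {
				return
			}
		}
	}()
	return heartbeat, results
}

// collect reads all results, giving up if the worker stays silent too long.
func collect(values []int) (results []int, healthy bool) {
	const pulseInterval = 50 * time.Millisecond
	done := make(chan struct{})
	defer close(done)

	heartbeat, out := doWork(done, pulseInterval, values)
	for {
		select {
		case _, ok := <-heartbeat:
			if !ok {
				heartbeat = nil // closed: stop selecting on it
			}
			// otherwise a pulse: the worker is alive, so loop and reset the timeout
		case r, ok := <-out:
			if !ok {
				return results, true
			}
			results = append(results, r)
		case <-time.After(20 * pulseInterval):
			return results, false // no sign of life for too long
		}
	}
}

func main() {
	results, healthy := collect([]int{1, 2, 3})
	fmt.Println(results, healthy)
}
```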
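Replicated requests: for some applications, receiving a response as quickly as possible is the top priority. In that case you can replicate the request to multiple handlers and use the result from whichever responds first. You should only replicate requests like this to handlers that have different runtime conditions: different processes, machines, paths to a data store, or access to different data stores. While this can be expensive to set up and maintain, if speed is your goal this is a valuable technique. It also naturally provides fault tolerance and scalability.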
The only caveat to this approach is that all handlers need to have an equal opportunity to fulfill the request. In other words, you won't have a chance to get the fastest time from a handler that can't fulfill the request. As mentioned, whatever resources the handlers are using to do their work also need to be replicated. A different symptom of the same problem is uniformity: if your handlers are too similar, the chances that any one of them is an outlier are smaller.
```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	doWork := func(
		done <-chan interface{},
		id int,
		wg *sync.WaitGroup,
		result chan<- int,
	) {
		started := time.Now()
		defer wg.Done()

		// Simulate random load
		simulatedLoadTime := time.Duration(1+rand.Intn(5)) * time.Second
		select {
		case <-done:
		case <-time.After(simulatedLoadTime):
		}

		select {
		case <-done:
		case result <- id:
		}

		took := time.Since(started)
		// Display how long handlers would have taken
		if took < simulatedLoadTime {
			took = simulatedLoadTime
		}
		fmt.Printf("%v took %v\n", id, took)
	}

	done := make(chan interface{})
	result := make(chan int)

	var wg sync.WaitGroup
	wg.Add(10)

	// Here we start 10 handlers to handle our requests.
	for i := 0; i < 10; i++ {
		go doWork(done, i, &wg, result)
	}

	// This line grabs the first returned value from the group of handlers.
	firstReturned := <-result

	// Here we cancel all the remaining handlers.
	// This ensures they don’t continue to do unnecessary work.
	close(done)
	wg.Wait()

	fmt.Printf("Received an answer from #%v\n", firstReturned)
}
```
Go will handle multiplexing goroutines onto OS threads for you.
The algorithm it uses to do this is known as a work-stealing strategy.
Compare this with fair scheduling. In an effort to ensure all processors are equally utilized, we could evenly distribute the load between all available processors. Imagine there are n processors and x tasks to perform. In the fair scheduling strategy, each processor would get x/n tasks.
Go models concurrency using a fork-join model.
As a refresher, remember that Go follows a fork-join model for concurrency. Forks are when goroutines are started, and join points are when two or more goroutines are synchronized through channels or types in the sync package. The work stealing algorithm follows a few basic rules. Given a thread of execution:
At a fork point, add tasks to the tail of the deque associated with the thread.
The Go scheduler’s job is to distribute runnable goroutines over multiple worker OS threads that run on one or more processors. In multi-threaded computation, two paradigms have emerged in scheduling: work sharing and work stealing.
- Work-sharing: When a processor generates new threads, it attempts to migrate some of them to the other processors with the hopes of them being utilized by the idle/underutilized processors.
- Work-stealing: An underutilized processor actively looks for other processor’s threads and “steal” some.
The migration of threads occurs less frequently with work stealing than with work sharing. When all processors have work to run, no threads are being migrated. And as soon as there is an idle processor, migration is considered.
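A toy sketch of the deque rules described above, assuming tasks that never fork further and a fixed set of workers: each worker pops from the tail of its own deque and, when it runs dry, steals from the head of another worker's deque. For simplicity it scans victims in order instead of choosing one at random and protects each deque with a mutex; it only illustrates the idea and is not how the Go runtime implements its scheduler. `deque` and `runWorkStealing` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// deque is a mutex-protected double-ended queue of tasks (illustrative only).
type deque struct {
	mu    sync.Mutex
	tasks []func()
}

// pushTail adds a task at the tail: what the owner does at a fork point.
func (d *deque) pushTail(t func()) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.tasks = append(d.tasks, t)
}

// popTail takes the most recently added task: the owner's end of the deque.
func (d *deque) popTail() (func(), bool) {
	d.mu.Lock()
	defer d.mu.Unlock()
	n := len(d.tasks)
	if n == 0 {
		return nil, false
	}
	t := d.tasks[n-1]
	d.tasks = d.tasks[:n-1]
	return t, true
}

// stealHead takes the oldest task: the end that idle workers steal from.
func (d *deque) stealHead() (func(), bool) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if len(d.tasks) == 0 {
		return nil, false
	}
	t := d.tasks[0]
	d.tasks = d.tasks[1:]
	return t, true
}

// runWorkStealing puts every task on worker 0's deque, then lets all workers
// run; a worker whose deque is empty steals from the others. It returns how
// many tasks ran and how many of those were stolen.
func runWorkStealing(numWorkers, numTasks int) (executed, stolen int64) {
	deques := make([]*deque, numWorkers)
	for i := range deques {
		deques[i] = &deque{}
	}
	for i := 0; i < numTasks; i++ {
		deques[0].pushTail(func() { atomic.AddInt64(&executed, 1) })
	}

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				if t, ok := deques[id].popTail(); ok {
					t()
					continue
				}
				// Own deque is empty: scan the others and steal from a head.
				stole := false
				for off := 1; off < numWorkers && !stole; off++ {
					if t, ok := deques[(id+off)%numWorkers].stealHead(); ok {
						atomic.AddInt64(&stolen, 1)
						t()
						stole = true
					}
				}
				if !stole {
					return // nothing left anywhere; tasks never spawn new tasks here
				}
			}
		}(w)
	}
	wg.Wait()
	return executed, stolen
}

func main() {
	executed, stolen := runWorkStealing(4, 1000)
	fmt.Printf("executed %d tasks, %d of them stolen\n", executed, stolen)
}
```

How many tasks get stolen depends on scheduling; what the sketch guarantees is that every task runs exactly once.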
Go has had a work-stealing scheduler since Go 1.1, contributed by Dmitry Vyukov. The rest of this section explains what work-stealing schedulers are and how Go implements one.
Scheduling basics
Go has an M:N scheduler that can also utilize multiple processors. At any time, M goroutines need to be scheduled on N OS threads that run on at most GOMAXPROCS processors. The Go scheduler uses the following terminology for goroutines, threads, and processors:
- G: goroutine
- M: OS thread (machine)
- P: processor
There is a P-specific local goroutine queue and a global goroutine queue. Each M should be assigned to a P. Ps may have no Ms if they are blocked or in a system call. At any time, there are at most GOMAXPROCS Ps, and only one M can run per P. More Ms can be created by the scheduler if required (see the `runtime` package documentation).
Why have a scheduler?
Goroutines are user-space threads: conceptually similar to kernel threads managed by the OS, but managed entirely by the Go runtime.
They are lighter-weight and cheaper than kernel threads:
- smaller memory footprint:
  - initial goroutine stack = 2KB; default thread stack = 8MB (typical on Linux)
- less state-tracking overhead
- faster creation, destruction, and context switches:
  - goroutine switches = ~tens of ns; thread switches = ~1 µs
The Go scheduler puts goroutines on kernel threads, which run on the CPUs.