Go 并发设计模式

并发编程常见问题

  1. 资源共享问题。竞争条件(race condition), 死锁(deadlock),资源争用(contention)

    • 比如并发读写原生的 map 导致 panic

    • 读锁重入导致思索。获取读锁的协程,不要二次重入再去获取读锁

  2. 协程管理

  3. 通道使用

屏障模式(Barrier Mode)

屏障模式(Barrier Mode),用来阻塞goroutine直到聚合所有goroutine返回结果,可以用通道实现。使用场景:

  1. 多个网络请求并发,聚合结果

  2. 粗粒度任务拆分并发执行,聚合结果

packagemainimport("fmt""io/ioutil""net/http""time")// 封装屏障模式响应的结构体(一般就是返回值和错误)typeBarrierResponsestruct{ErrerrorRespstringStatusint}// 执行单个请求,结果放入 channelfuncdoRequest(outchan<-BarrierResponse,urlstring){res:=BarrierResponse{}client:=http.Client{Timeout:time.Duration(3*time.Second)}resp,err:=client.Get(url)ifresp!=nil{res.Status=resp.StatusCode}iferr!=nil{res.Err=errout<-resreturn}b,err:=ioutil.ReadAll(resp.Body)deferresp.Body.Close()iferr!=nil{res.Err=errout<-resreturn}res.Resp=string(b)out<-res//  结果放入通道}// 并发请求并聚合结果funcBarrier(urls...string){n:=len(urls)in:=make(chanBarrierResponse,n)response:=make([]BarrierResponse,n)deferclose(in)for_,url:=rangeurls{godoRequest(in,url)}varhasErrorboolfori:=0;i<n;i++{resp:=<-inifresp.Err!=nil{fmt.Println("Error: ",resp.Err,resp.Status)hasError=true}response[i]=resp}if!hasError{for_,resp:=rangeresponse{fmt.Println(resp.Status)}}}funcmain(){urls:=[]string{"https://www.baidu.com","https://www.weibo.com","https://www.zhihu.com",}Barrier(urls...)}

未来模式(Future Mode)

Future模式(也称为Promise Mode)。使用fire-and-forget 方式,主进程不等子进程执行完就直接返回,然后等到未来执行完的时候再去获取结果。未来模式中主goroutine不用等待子goroutine返回的结果,可以先去做其他事情,等未来需要子goroutine结果的时候再来取。如果子goroutine还没有返回结果,则一直等待。以下简单的代码示例说明了该模式的原理:

c:=make(chanint)// futuregofunc(){c<-f()}()// asyncvalue:=<-c// await

可以针对 future 模式做一个统一的封装,方便后续使用,代码示例如下:

/* https://github.com/golang-collections/go-datastructures/blob/59788d5eb259/futures/futures.goPackage futures is useful for broadcasting an identical message to a multitudeof listeners as opposed to channels which will choose a listener at randomif multiple listeners are listening to the same channel.  The future willalso cache the result so any future interest will be immediately returnedto the consumer.*/packagemainimport("fmt""sync""time")// Completer is a channel that the future expects to receive// a result on.  The future only receives on this channel.typeCompleter<-chaninterface{}// Future represents an object that can be used to perform asynchronous// tasks.  The constructor of the future will complete it, and listeners// will block on getresult until a result is received.  This is different// from a channel in that the future is only completed once, and anyone// listening on the future will get the result, regardless of the number// of listeners.typeFuturestruct{triggeredbool// because item can technically be nil and still be validiteminterface{}errerrorlocksync.Mutexwgsync.WaitGroup}// GetResult will immediately fetch the result if it exists// or wait on the result until it is ready.func(f*Future)GetResult()(interface{},error){f.lock.Lock()iff.triggered{f.lock.Unlock()returnf.item,f.err}f.lock.Unlock()f.wg.Wait()returnf.item,f.err}func(f*Future)setItem(iteminterface{},errerror){f.lock.Lock()f.triggered=truef.item=itemf.err=errf.lock.Unlock()f.wg.Done()}funclistenForResult(f*Future,chCompleter,timeouttime.Duration,wg*sync.WaitGroup){wg.Done()select{caseitem:=<-ch:f.setItem(item,nil)case<-time.After(timeout):f.setItem(nil,fmt.Errorf(`Timeout after %f seconds.`,timeout.Seconds()))}}// New is the constructor to generate a new future.  Pass the completed// item to the toComplete channel and any listeners will get// notified.  If timeout is hit before toComplete is called,// any listeners will get passed an error.funcNew(completerCompleter,timeouttime.Duration)*Future{f:=&Future{}f.wg.Add(1)varwgsync.WaitGroupwg.Add(1)golistenForResult(f,completer,timeout,&wg)wg.Wait()returnf}// 使用示例funcmain(){c:=make(chaninterface{})gofunc(){time.Sleep(time.Second)c<-"hehe"}()f:=New(c,time.Second*3)res,err:=f.GetResult()fmt.Println(res,err)}

管道模式(Pipeline Mode)

也称作流水线模式,一般有以下几个步骤:

  1. 流水线由一道道工序构成,每道工序通过通道把数据传递到下一个工序

  2. 每道工序一般会对应一个函数,函数里有协程和通道,协程一般用于处理数据并把它放入通道中,每道工序会返回这个通道以供下一道工序使用

  3. 最终要有一个组织者(示例中的main()函数)把这些工序串起来,这样就形成了一个完整的流水线,对于数据来说就是数据流

// 以组装计算机为例。三道工序:配件采购(Buy)-> 配件组装(Build) -> 打包成品(Pack)funcBuy(nint)<-chanstring{out:=make(chanstring)gofunc(){deferclose(out)fori:=1;i<=n;i++{out<-fmt.Sprintf("配件%d",i)}}()returnout}funcBuild(in<-chanstring)<-chanstring{out:=make(chanstring)gofunc(){deferclose(out)forc:=rangein{out<-fmt.Sprintf("组装(%s)",c)}}()returnout}funcPack(in<-chanstring)<-chanstring{out:=make(chanstring)gofunc(){deferclose(out)forc:=rangein{out<-fmt.Sprintf("打包(%s)",c)}}()returnout}funcmain(){accessories:=Buy(6)computers:=Build(accessories)packs:=Pack(computers)forp:=rangepacks{fmt.Println(p)}}
packagemainimport"fmt"// 工序 1:数组生成器funcGenerator(maxint)<-chanint{out:=make(chanint,100)gofunc(){fori:=1;i<=max;i++{out<-i}close(out)}()returnout}// 工序 2:求整数的平方funcSquare(in<-chanint)<-chanint{out:=make(chanint,100)gofunc(){forv:=rangein{out<-v*v}close(out)}()returnout}// 工序 3:求和funcSum(in<-chanint)<-chanint{out:=make(chanint,100)gofunc(){varsumintforv:=rangein{sum+=v}out<-sumclose(out)}()returnout}funcmain(){arr:=Generator(5)squ:=Square(arr)sum:=<-Sum(squ)fmt.Println(sum)}

扇出和扇入模式(Fan-out Fan-in)

扇出(Fan-out)是指多个函数可以从同一个通道读取数据,直到该通道关闭。扇入(Fan-in)是指一个函数可以从多个输入中读取数据并继续进行,直到所有输入都关闭。扇出和扇入模式的方法是将输入通道多路复用到一个通道上,当所有输入都关闭时,该通道才关闭。扇出的数据流向是发散传递出去,是输出流;扇入的数据流向是汇聚进来,是输入流。

../_images/%E6%89%87%E5%87%BA%E6%89%87%E5%85%A5.png
// 扇入函数,把多个channel 中的数据发送到一个 channel 中funcMerge(ins...<-chanstring)<-chanstring{varwgsync.WaitGroupout:=make(chanstring)p:=func(in<-chanstring){deferwg.Done()forc:=rangein{out<-c}}wg.Add(len(ins))// 扇入for_,cs:=rangeins{gop(cs)}gofunc(){wg.Wait()close(out)}()returnout}funcmain(){accessories:=Buy(12)computers1:=Build(accessories)computers2:=Build(accessories)computers3:=Build(accessories)computers:=Merge(computers1,computers2,computers3)packs:=Pack(computers)forp:=rangepacks{fmt.Println(p)}}

协程池模式

即便 go 的协程比较轻量,但是当需要操作大量 goroutine 的时候,依然有内存开销和 GC 的压力。可以考虑使用协程池减少频繁创建销毁协程的开销。

packagemainimport("fmt""sync""sync/atomic")// 任务处理器typeTaskHandlerfunc(interface{})// 任务结构体typeTaskstruct{Paraminterface{}HandlerTaskHandler}// 协程池接口typeWorkerPoolImplinterface{AddWorker()SendTask(Task)Release()}// 协程池typeWorkerPoolstruct{wgsync.WaitGroupinChchanTask}func(d*WorkerPool)AddWorker(){d.wg.Add(1)gofunc(){deferd.wg.Done()fortask:=ranged.inCh{task.Handler(task.Param)}}()}func(d*WorkerPool)Release(){close(d.inCh)d.wg.Wait()}func(d*WorkerPool)SendTask(tTask){d.inCh<-t}funcNewWorkerPool(bufferint)WorkerPoolImpl{return&WorkerPool{inCh:make(chanTask,buffer),}}funcmain(){bufferSize:=100varworkerPool=NewWorkerPool(bufferSize)workers:=4fori:=0;i<workers;i++{workerPool.AddWorker()}varsumint32testFunc:=func(iinterface{}){n:=i.(int32)atomic.AddInt32(&sum,n)}vari,nint32n=100for;i<n;i++{task:=Task{i,testFunc,}workerPool.SendTask(task)}workerPool.Release()fmt.Println(sum)// 4950}

发布订阅模式

基于消息通知的并发设计模式。发送者发送消息,订阅者通过订阅感兴趣的主题(Topic) 接收消息。

packagemainimport("fmt""strings""time")import("sync")type(//订阅者通道Subscriberchaninterface{}//主题函数TopicFuncfunc(vinterface{})bool)//发布者结构体typePublisherstruct{// subscribers 是程序的核心,订阅者都会注册在这里,// publisher发布消息的时候也会从这里开始subscribersmap[Subscriber]TopicFuncbufferint// 订阅者的缓冲区长度timeouttime.Duration// publisher 发送消息的超时时间// m 用来保护 subscribers// 当修改 subscribers 的时候(即新加订阅者或删除订阅者)使用写锁// 当向某个订阅者发送消息的时候(即向某个 Subscriber channel 中写入数据),使用读锁msync.RWMutex}//实例化funcNewPublisher(publishTimeouttime.Duration,bufferint)*Publisher{return&Publisher{buffer:buffer,timeout:publishTimeout,subscribers:make(map[Subscriber]TopicFunc),}}//发布者订阅方法func(p*Publisher)Subscribe()Subscriber{returnp.SubscribeTopic(nil)}//发布者订阅主题func(p*Publisher)SubscribeTopic(topicTopicFunc)Subscriber{ch:=make(Subscriber,p.buffer)p.m.Lock()p.subscribers[ch]=topicp.m.Unlock()returnch}//Delete 删除掉某个订阅者func(p*Publisher)Delete(subSubscriber){p.m.Lock()deferp.m.Unlock()delete(p.subscribers,sub)close(sub)}//发布者发布func(p*Publisher)Publish(vinterface{}){p.m.RLock()deferp.m.RUnlock()varwgsync.WaitGroup// 同时向所有订阅者写消息,订阅者利用 topic 过滤消息forsub,topic:=rangep.subscribers{wg.Add(1)gop.sendTopic(sub,topic,v,&wg)}wg.Wait()}//Close 关闭 Publisher,删除所有订阅者func(p*Publisher)Close(){p.m.Lock()deferp.m.Unlock()forsub:=rangep.subscribers{delete(p.subscribers,sub)close(sub)}}//发送主题func(p*Publisher)sendTopic(subSubscriber,topicTopicFunc,vinterface{},wg*sync.WaitGroup){deferwg.Done()iftopic!=nil&&!topic(v){return}select{casesub<-v:case<-time.After(p.timeout):}}funcmain(){//实例化p:=NewPublisher(100*time.Millisecond,10)deferp.Close()// 订阅者订阅所有消息all:=p.Subscribe()//订阅者仅订阅包含 golang 的消息golang:=p.SubscribeTopic(func(vinterface{})bool{ifs,ok:=v.(string);ok{returnstrings.Contains(s,"golang")}returnfalse})//发布消息p.Publish("hello, world!")p.Publish("hello, golang!")//加锁varwgsync.WaitGroupwg.Add(2)//开启goroutinegofunc(){formsg:=rangeall{_,ok:=msg.(string)fmt.Println(ok)}wg.Done()}()//开启goroutinegofunc(){formsg:=rangegolang{v,ok:=msg.(string)fmt.Println(v)fmt.Println(ok)}wg.Done()}()p.Close()wg.Wait()}

参考:《Go 语言高级开发与实战》