Go Concurrency: GoRoutines, Worker Pools and Throttling Made Simple

GoRoutines, WaitGroups, Workers.

Go’s concurrency is simple and very powerful. Here are the main use cases and how to handle each of them.

Setting the field

For the sake of these examples, here are the structs we’ll be using:

type Job struct {
    Id int
}

type JobResult struct {
    Output string
}

Launch and forget

Launch-and-forget is by far the easiest form of concurrency in Go.

Simply prepend go to the call of the function you want to run asynchronously, and that’s all.

For example:

go doSomething()

Alternatively, you can wrap a block of code with go func() { and }():

go func() {
    // do multiple things
}()

Keep in mind that if your program finishes before your goroutine, your goroutine may be cut short:

func main() {
    go func() {
        time.Sleep(time.Second * 3)
        println("World") // <------ this will never execute because the program will have already exited!
    }()
    println("Hello")
}

If you want to prevent that from happening, you might have to resort to a WaitGroup (I’ll talk about this later) or a channel to tell the program to wait until everything is done:

func main() {
    finished := make(chan bool)
    go func() {
        time.Sleep(time.Second * 3)
        println("World")
        finished <- true
    }()
    println("Hello")
    <-finished
}

If you’re curious about what finished := make(chan bool), finished <- true and <-finished do:

finished := make(chan bool) creates a channel that carries booleans, finished <- true sends the value true to the channel finished, and <-finished waits for a value to be sent to the channel finished.

Note that the result does not change whether you send true or false. In fact, you could’ve even used string instead of bool. The crucial part is that the program will wait for the channel to receive something before moving on.
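Since the value itself carries no information, a common variation is to use an empty struct as the element type and to close the channel instead of sending on it. The following is a minimal sketch of that idea, not part of the original examples; waitFor is a hypothetical helper name introduced only for illustration.

```go
package main

import "fmt"

// waitFor (hypothetical helper) runs work in a goroutine and blocks
// until it has finished. Closing the channel is the signal; no value
// is ever sent through it.
func waitFor(work func()) {
	done := make(chan struct{})
	go func() {
		defer close(done) // runs when work returns
		work()
	}()
	<-done // a receive on a closed channel returns immediately
}

func main() {
	message := "Hello"
	waitFor(func() { message += " World" })
	fmt.Println(message)
}
```

Closing has one practical advantage over sending: any number of goroutines can wait on the same channel, and all of them are released at once.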

Technically, wg.Wait() (from sync.WaitGroup, which I’ll cover below) is just like having one finished channel for each job.

In fact, channels are very powerful. For instance, the following:

func main() {
    worldChannel := make(chan string)
    dearChannel := make(chan string)
    go func() {
        time.Sleep(time.Second * 3)
        worldChannel <- "world"
    }()
    go func() {
        time.Sleep(time.Second * 2)
        dearChannel <- "dear"
    }()
    println("Hello", <-dearChannel, <-worldChannel)
}

would output Hello dear world in ~3 seconds, because the bottleneck here is the first goroutine, which takes 3 seconds to execute. The second goroutine only takes 2 seconds, so by the time the first goroutine finishes, it will already have sent its data through its own channel.

Anyways, I digress.

Launch and return

In order to return the result of multiple jobs, we need to wait until all jobs are done before returning.

To do that, we’ll use sync.WaitGroup, which will allow us to add the number of jobs that need to be completed to the WaitGroup and keep track of when all jobs are completed.

Furthermore, unlike in the previous examples, we need to keep track of the results. This means that we’d risk making concurrent changes to a slice, which would cause problems. To prevent that, we’ll use sync.RWMutex, which will allow us to prevent concurrent changes. Note that there are many alternatives to this, but this is the easiest method.

Because we want to return the result of each job, our function signature will look something like this:

func launchAndReturn(jobs []Job) []JobResult {

We’ll instantiate our slice of results, RWMutex as well as WaitGroup:

    var (
        results      []JobResult
        resultsMutex = sync.RWMutex{}
        wg           sync.WaitGroup
    )

Add the number of jobs to the WaitGroup:

    wg.Add(len(jobs))

And now that we’ve set everything up, we can iterate over each job, and start a new goroutine for each entry.

    for _, job := range jobs {
        go func(job Job) {

To make sure that the job is marked as done at the end of each goroutine, we’ll defer wg.Done() right now. This is important, because if wg.Done() is not called as many times as the len(jobs) passed to the earlier wg.Add(len(jobs)), the WaitGroup will hang forever later on (we’ll get to this).

            defer wg.Done()

Finally, we can execute the job

            jobResult := doSomething(job)

Now that we’ve executed the job asynchronously, we have to store the result. First, we need to lock the RWMutex, second, append the result, and finally, unlock the mutex:

            resultsMutex.Lock()
            results = append(results, jobResult)
            resultsMutex.Unlock()

And now we can close our goroutine

        }(job)
    }

The last step is to wait for the WaitGroup to complete by using wg.Wait(), and then we can return the results.

    wg.Wait()
    return results
}

That’s all! This will concurrently execute each job, and wait for all of them to complete. Here’s the final result:

package main

import (
    "fmt"
    "sync"
)

type Job struct {
    Id int
}

type JobResult struct {
    Output string
}

func main() {
    var jobs []Job
    for i := 0; i < 5; i++ {
        jobs = append(jobs, Job{Id: i})
    }
    jobResults := launchAndReturn(jobs)
    fmt.Println(jobResults)
}

func launchAndReturn(jobs []Job) []JobResult {
    var (
        results      []JobResult
        resultsMutex = sync.RWMutex{}
        wg           sync.WaitGroup
    )
    wg.Add(len(jobs))

    for _, job := range jobs {
        go func(job Job) {
            defer wg.Done()
            jobResult := doSomething(job)
            resultsMutex.Lock()
            results = append(results, jobResult)
            resultsMutex.Unlock()
        }(job)
    }

    wg.Wait()
    return results
}

func doSomething(job Job) JobResult {
    fmt.Printf("Running job #%d\n", job.Id)
    return JobResult{Output: "Success"}
}

Output:

Running job #0
Running job #4
Running job #2
Running job #3
Running job #1
[{Success} {Success} {Success} {Success} {Success}]
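The section above mentions that the mutex is only one of several options. One such alternative, sketched here under my own assumptions rather than taken from the original, is to allocate the results slice up front and let each goroutine write to its own index. Distinct slice elements are distinct variables, so no lock is needed, and the results come back in the same order as the jobs. The names launchAndReturnOrdered and process are hypothetical; process merely stands in for doSomething.

```go
package main

import (
	"fmt"
	"sync"
)

type Job struct {
	Id int
}

type JobResult struct {
	Output string
}

// process is a hypothetical stand-in for doSomething.
func process(job Job) JobResult {
	return JobResult{Output: fmt.Sprintf("job %d done", job.Id)}
}

// launchAndReturnOrdered runs every job concurrently and stores each
// result at the index of the job that produced it.
func launchAndReturnOrdered(jobs []Job) []JobResult {
	results := make([]JobResult, len(jobs)) // one slot per job
	var wg sync.WaitGroup
	wg.Add(len(jobs))
	for i, job := range jobs {
		go func(i int, job Job) {
			defer wg.Done()
			results[i] = process(job) // each goroutine owns exactly one slot
		}(i, job)
	}
	wg.Wait()
	return results
}

func main() {
	jobs := []Job{{Id: 0}, {Id: 1}, {Id: 2}}
	fmt.Println(launchAndReturnOrdered(jobs))
}
```

The trade-off is that the number of results has to be known in advance, which is the case here because every job yields exactly one result.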

Launch, throttle and forget

There are two kinds of throttling: a maximum number of concurrent workers, and a maximum number of jobs executed per unit of time.

Time-based throttling gets its own section later on, so I’ll take care of the former case here: a maximum number of concurrent workers.

There are a lot of ways to throttle, so I’ll stick to my favorite, which also happens to be the simplest. It involves running each worker on its own goroutine and having every worker listen to the same channel for new jobs.

As I just mentioned, the worker (which is just a function) will be launched on its own goroutine and will listen to the channel for new jobs:

func worker(id int, wg *sync.WaitGroup, jobChannel <-chan Job) {
    defer wg.Done()
    for job := range jobChannel {
        doSomething(id, job)
    }
}

You might be wondering, “Why is there a for loop on a channel when it’s not even a slice?”

Iterating over a channel means you’re listening to the channel, and this listening will continue until the channel is closed. Note that multiple workers will be listening to the same channel, but not to worry, since Go channels are made for the producer-consumer problem.
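To make that guarantee concrete, here is a small sketch (my own illustration, with the hypothetical helper consumeAll) in which several goroutines range over one channel while a counter records how many times each value was received. Every value ends up being received exactly once, and every loop ends once the channel is closed and drained.

```go
package main

import (
	"fmt"
	"sync"
)

// consumeAll (hypothetical helper) starts `workers` goroutines that all
// range over the same channel, and returns how many times each value
// was received.
func consumeAll(values []int, workers int) map[int]int {
	var (
		ch   = make(chan int)
		seen = make(map[int]int)
		mu   sync.Mutex // guards seen
		wg   sync.WaitGroup
	)
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for v := range ch { // ends when ch is closed and empty
				mu.Lock()
				seen[v]++
				mu.Unlock()
			}
		}()
	}
	for _, v := range values {
		ch <- v
	}
	close(ch)
	wg.Wait()
	return seen
}

func main() {
	seen := consumeAll([]int{1, 2, 3, 4, 5}, 3)
	fmt.Println(len(seen), "distinct values received")
}
```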

We’ll start by creating some fake jobs.

func main() {
    var jobs []Job
    for i := 0; i < 100; i++ {
        jobs = append(jobs, Job{Id: i})
    }

Now, we’ll create our WaitGroup and a constant with the desired number of workers:

    const NumberOfWorkers = 10
    var wg sync.WaitGroup

Because each worker will run on a different goroutine, we also need to set our WaitGroup to wait for our number of workers, as opposed to the previous use case, which required us to pass the total number of jobs instead.

    wg.Add(NumberOfWorkers)
    jobChannel := make(chan Job)

Now that our channel is ready, we can start the workers. For now, they’ll just sit there and wait, since we haven’t sent the jobs to the channel yet.

    // Start the workers
    for i := 0; i < NumberOfWorkers; i++ {
        go worker(i, &wg, jobChannel)
    }

We send the jobs to the channel here, which is being read by our workers. Note that a channel is like a pointer to a queue, meaning that two workers reading from the same queue won’t get the same jobs.

    // Send jobs to worker
    for _, job := range jobs {
        jobChannel <- job
    }

Since we have already sent every job to the channel, we can close the channel and wait for the workers to finish.

    close(jobChannel)
    wg.Wait()
}

Final result:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    Id int
}

type JobResult struct {
    Output string
}

const NumberOfWorkers = 10

func main() {
    start := time.Now()

    // Create fake jobs for testing purposes
    var jobs []Job
    for i := 0; i < 100; i++ {
        jobs = append(jobs, Job{Id: i})
    }

    var (
        wg         sync.WaitGroup
        jobChannel = make(chan Job)
    )
    wg.Add(NumberOfWorkers)

    // Start the workers
    for i := 0; i < NumberOfWorkers; i++ {
        go worker(i, &wg, jobChannel)
    }

    // Send jobs to worker
    for _, job := range jobs {
        jobChannel <- job
    }

    close(jobChannel)
    wg.Wait()
    fmt.Printf("Took %s\n", time.Since(start))
}

func worker(id int, wg *sync.WaitGroup, jobChannel <-chan Job) {
    defer wg.Done()
    for job := range jobChannel {
        doSomething(id, job)
    }
}

func doSomething(workerId int, job Job) JobResult {
    fmt.Printf("Worker #%d Running job #%d\n", workerId, job.Id)
    time.Sleep(1 * time.Second)
    return JobResult{Output: "Success"}
}
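As mentioned earlier in this section, there are many ways to throttle. Another well-known one, shown here only as a sketch and not as the approach this article uses, is a buffered channel acting as a semaphore: one goroutine is still started per job, but each must hold a slot in the channel while it runs. The helper name runLimited and the peak counter are assumptions added purely so the limit can be observed.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited (hypothetical helper) runs task(i) for every i in [0, n)
// with at most `limit` tasks in flight, and returns the highest number
// of tasks it saw running at the same time.
func runLimited(n, limit int, task func(i int)) int {
	var (
		sem     = make(chan struct{}, limit) // one slot per allowed task
		wg      sync.WaitGroup
		mu      sync.Mutex // guards running and peak
		running int
		peak    int
	)
	for i := 0; i < n; i++ {
		sem <- struct{}{} // blocks while `limit` tasks are running
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the task ends

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			task(i)

			mu.Lock()
			running--
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	return peak
}

func main() {
	peak := runLimited(20, 3, func(i int) {
		time.Sleep(10 * time.Millisecond) // simulated work
	})
	fmt.Printf("peak concurrency: %d (limit 3)\n", peak)
}
```

Compared with the worker pool above, this creates a goroutine per job rather than a fixed set of workers, which is simpler when jobs are heterogeneous but gives up the per-worker state (such as the worker id) that the pool provides.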

Launch, throttle and return

This use case is a mix of both previous sections, but you won’t need an RWMutex, because we’ll be reading the results synchronously.

Unlike before, our worker function now has a new parameter called jobResultChannel, where we’ll send our results.

func worker(id int, wg *sync.WaitGroup, jobChannel <-chan Job, jobResultChannel chan JobResult) {
    defer wg.Done()
    for job := range jobChannel {
        jobResultChannel <- doSomething(id, job)
    }
}

Nothing new here.

func main() {
    var jobs []Job
    for i := 0; i < 100; i++ {
        jobs = append(jobs, Job{Id: i})
    }
    const NumberOfWorkers = 10

Unlike previously, we’ll create another channel for the output as well as a slice to store our results.

    var (
        wg               sync.WaitGroup
        jobChannel       = make(chan Job)
        jobResultChannel = make(chan JobResult, len(jobs))
        jobResults       []JobResult
    )
    wg.Add(NumberOfWorkers)

Start the workers and send the jobs to the channel, which is being read by the workers.

    // Start the workers
    for i := 0; i < NumberOfWorkers; i++ {
        go worker(i, &wg, jobChannel, jobResultChannel)
    }

    // Send jobs to worker
    for _, job := range jobs {
        jobChannel <- job
    }

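Now that we’ve sent all jobs to the channel, we can close it. Once wg.Wait() returns, every worker has finished sending its results to jobResultChannel, so we can close that channel too. Note that this relies on jobResultChannel being buffered with room for len(jobs) results: with a smaller buffer, the workers could block on sending a result while main is still blocked on sending jobs, and the program would deadlock.

    close(jobChannel)
    wg.Wait()
    close(jobResultChannel)

Read all JobResults from the channel (this is synchronous), and then do whatever you want with the results.

    // Receive job results from workers
    for result := range jobResultChannel {
        jobResults = append(jobResults, result)
    }
    fmt.Println(jobResults)
}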

Final result:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    Id int
}

type JobResult struct {
    Output string
}

const NumberOfWorkers = 10

func main() {
    start := time.Now()

    // Create fake jobs for testing purposes
    var jobs []Job
    for i := 0; i < 100; i++ {
        jobs = append(jobs, Job{Id: i})
    }

    var wg sync.WaitGroup
    wg.Add(NumberOfWorkers)
    jobChannel := make(chan Job)
    jobResultChannel := make(chan JobResult, len(jobs))

    // Start the workers
    for i := 0; i < NumberOfWorkers; i++ {
        go worker(i, &wg, jobChannel, jobResultChannel)
    }

    // Send jobs to worker
    for _, job := range jobs {
        jobChannel <- job
    }

    close(jobChannel)
    wg.Wait()
    close(jobResultChannel)

    var jobResults []JobResult
    // Receive job results from workers
    for result := range jobResultChannel {
        jobResults = append(jobResults, result)
    }

    fmt.Println(jobResults)
    fmt.Printf("Took %s", time.Since(start))
}

func worker(id int, wg *sync.WaitGroup, jobChannel <-chan Job, resultChannel chan JobResult) {
    defer wg.Done()
    for job := range jobChannel {
        resultChannel <- doSomething(id, job)
    }
}

func doSomething(workerId int, job Job) JobResult {
    fmt.Printf("Worker #%d Running job #%d\n", workerId, job.Id)
    time.Sleep(500 * time.Millisecond)
    return JobResult{Output: "Success"}
}
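The program above works because the result channel has room for every result. A variation that does not depend on the buffer size, sketched here as my own assumption rather than as the article's method, is to send the jobs and close the result channel from separate goroutines, so the caller can start reading results right away. The name runPool is hypothetical, and the worker body is reduced to a placeholder.

```go
package main

import (
	"fmt"
	"sync"
)

type Job struct {
	Id int
}

type JobResult struct {
	Output string
}

// runPool (hypothetical helper) processes jobs with a fixed number of
// workers and collects the results over an unbuffered channel.
func runPool(jobs []Job, workers int) []JobResult {
	var (
		wg         sync.WaitGroup
		jobChannel = make(chan Job)
		resultChan = make(chan JobResult) // unbuffered on purpose
	)
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for job := range jobChannel {
				// placeholder for the real work
				resultChan <- JobResult{Output: fmt.Sprintf("job %d", job.Id)}
			}
		}()
	}

	// Feed the jobs from a goroutine so the caller is free to receive results.
	go func() {
		for _, job := range jobs {
			jobChannel <- job
		}
		close(jobChannel)
	}()

	// Close the result channel once every worker has returned.
	go func() {
		wg.Wait()
		close(resultChan)
	}()

	var results []JobResult
	for result := range resultChan {
		results = append(results, result)
	}
	return results
}

func main() {
	jobs := []Job{{Id: 0}, {Id: 1}, {Id: 2}, {Id: 3}}
	fmt.Println(len(runPool(jobs, 2)), "results collected")
}
```

Because results are consumed while jobs are still being sent, memory use no longer grows with the number of jobs, at the cost of two extra goroutines.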

Time-based throttling

Unlike the two previous examples, which both throttle based on the total number of workers, this one throttles based on both a maximum number of workers and a maximum number of executions per second.

By now, you get the hang of it, so we’ll go directly to the final result.

The only difference is, for testing purposes, we’ll use a random execution time for the doSomething function.

package main

import (
    "fmt"
    "math"
    "math/rand"
    "sync"
    "time"
)

type Job struct {
    Id int
}

type JobResult struct {
    Output string
}

const (
    NumberOfWorkers                    = 10
    MaximumNumberOfExecutionsPerSecond = 50
)

func main() {
    start := time.Now()

    // Create fake jobs for testing purposes
    var jobs []Job
    for i := 0; i < 500; i++ {
        jobs = append(jobs, Job{Id: i})
    }

    var (
        wg         sync.WaitGroup
        jobChannel = make(chan Job)
    )
    wg.Add(NumberOfWorkers)

    // Start the workers
    for i := 0; i < NumberOfWorkers; i++ {
        go worker(i, &wg, jobChannel)
    }

    // Send jobs to worker
    for _, job := range jobs {
        jobChannel <- job
    }

    close(jobChannel)
    wg.Wait()
    fmt.Printf("Took %s\n", time.Since(start))
}

func worker(id int, wg *sync.WaitGroup, jobChannel <-chan Job) {
    defer wg.Done()
    lastExecutionTime := time.Now()
    // Nanoseconds each worker must leave between two executions
    minimumTimeBetweenEachExecution := time.Duration(math.Ceil(1e9 / (MaximumNumberOfExecutionsPerSecond / float64(NumberOfWorkers))))
    for job := range jobChannel {
        timeUntilNextExecution := -(time.Since(lastExecutionTime) - minimumTimeBetweenEachExecution)
        if timeUntilNextExecution > 0 {
            fmt.Printf("Worker #%d backing off for %s\n", id, timeUntilNextExecution.String())
            time.Sleep(timeUntilNextExecution)
        } else {
            fmt.Printf("Worker #%d not backing off\n", id)
        }
        lastExecutionTime = time.Now()
        doSomething(id, job)
    }
}

func doSomething(workerId int, job Job) JobResult {
    simulatedExecutionTime := rand.Intn(1000)
    fmt.Printf("Worker #%d Running job #%d (simulatedExecutionTime=%dms)\n", workerId, job.Id, simulatedExecutionTime)
    time.Sleep(time.Duration(simulatedExecutionTime) * time.Millisecond)
    return JobResult{Output: "Success"}
}

What this does is divide the maximum number of executions per second by the number of workers to get a per-worker rate, and throttle each worker to that rate. It’s not the most accurate way of doing it, since some workers may get several jobs with long execution times in a row while other workers may get several jobs with short execution times in a row, but it’s the easiest way of doing it and it’s generally good enough.

In other words, if you have NumberOfWorkers set to 2 and MaximumNumberOfExecutionsPerSecond set to 10, each worker would be throttled to calling doSomething() at most once every 200ms.

Extra

Besides the patterns shown above, I have one more that I really like and occasionally use to greatly improve the performance of some applications.

Using a channel to perform bulk operations

Go makes it really easy to create a goroutine, and it’s not uncommon to pick up the habit of reaching for even more concurrency the moment you want to improve performance, but the truth is, a single worker performing bulk operations might be much better than several workers each performing one operation at a time.

package main

import (
    "fmt"
    "time"
)

type Job struct {
    Id int
}

const MaxBulkSize = 50

func main() {
    jobChannel := make(chan Job, 1000)
    go worker(jobChannel)
    for i := 0; i < 50000; i++ {
        jobChannel <- Job{Id: i}
    }
    // wait for channel to be empty
    // (this only checks the channel, not whether the worker has finished its last bulk)
    for len(jobChannel) != 0 {
        time.Sleep(100 * time.Millisecond)
    }
}

func worker(jobChannel <-chan Job) {
    var jobs []Job
    for {
        // Keep collecting jobs until the bulk is full or the channel is empty
        if len(jobChannel) > 0 && len(jobs) < MaxBulkSize {
            jobs = append(jobs, <-jobChannel)
            continue
        }
        if (len(jobChannel) == 0 && len(jobs) > 0) || len(jobs) == MaxBulkSize {
            fmt.Printf("processing bulk of %d jobs\n", len(jobs))
            // clear the list of jobs that were just processed
            jobs = jobs[:0]
        }
        // No jobs in the channel? back off.
        if len(jobChannel) == 0 {
            fmt.Println("Backing off")
            time.Sleep(500 * time.Millisecond)
        }
    }
}
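The worker above polls len(jobChannel) and sleeps when the channel is empty. A variation that avoids polling, offered here as a sketch under my own assumptions and not as the author's implementation, is to block until one job arrives and then drain whatever else is immediately available using select with a default case. The helper name collectBatch is hypothetical.

```go
package main

import "fmt"

type Job struct {
	Id int
}

// collectBatch (hypothetical helper) blocks until at least one job is
// available, then keeps taking jobs without blocking until the batch
// holds max jobs or the channel is momentarily empty. It returns nil
// once the channel is closed and fully drained.
func collectBatch(jobs <-chan Job, max int) []Job {
	first, ok := <-jobs // blocking receive: no sleep or polling needed
	if !ok {
		return nil
	}
	batch := []Job{first}
	for len(batch) < max {
		select {
		case job, ok := <-jobs:
			if !ok {
				return batch // channel closed: flush what we have
			}
			batch = append(batch, job)
		default:
			return batch // nothing waiting right now
		}
	}
	return batch
}

func main() {
	jobs := make(chan Job, 10)
	for i := 0; i < 5; i++ {
		jobs <- Job{Id: i}
	}
	close(jobs)

	for batch := collectBatch(jobs, 3); batch != nil; batch = collectBatch(jobs, 3) {
		fmt.Printf("processing bulk of %d jobs\n", len(batch))
	}
}
```

With five queued jobs and a maximum bulk size of 3, this processes one bulk of 3 jobs followed by one bulk of 2. Closing the channel also gives the worker a clean way to stop, which the polling loop lacks.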

What’s even more interesting is that using the job channel provides a form of rate limiting out of the box.

In fact, if the jobs came from HTTP requests, whose rate you have no control over, rather than from a list of jobs that you control, all you’d have to do is check whether the length of the channel is the same as the capacity of the channel, and if the length has reached the capacity, return 429 Too Many Requests.

e.g.:

if len(jobChannel) == cap(jobChannel) {
    // reached job channel capacity, return 429 here
    return
}
jobChannel <- Job{Id: i}
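One caveat with the check above is that it takes two steps: if several goroutines send at the same time, another sender can fill the last slot between the len/cap check and the send, and the send then blocks. A select with a default case does the check and the send in a single step. This is a minimal sketch; tryEnqueue is a hypothetical helper name, and the HTTP handling itself is left out.

```go
package main

import "fmt"

type Job struct {
	Id int
}

// tryEnqueue (hypothetical helper) queues job without ever blocking.
// It reports false when the channel's buffer is full, which is the
// point where an HTTP handler would answer with a 429.
func tryEnqueue(jobs chan<- Job, job Job) bool {
	select {
	case jobs <- job:
		return true
	default:
		return false
	}
}

func main() {
	jobs := make(chan Job, 2) // small capacity so the limit is easy to hit
	for i := 0; i < 3; i++ {
		if tryEnqueue(jobs, Job{Id: i}) {
			fmt.Printf("job #%d queued\n", i)
		} else {
			fmt.Printf("job #%d rejected, respond with 429\n", i)
		}
	}
}
```

With a capacity of 2 and no consumer running, the first two jobs are queued and the third is rejected.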
