[Limit Concurrency]

Tags: #go #golang #concurrency #semaphore

Limit Concurrency - An Explanation.md

If you find that you’re running out of system resources whilst trying to open lots of files, then you should consider limiting your concurrency.

One way would be to define a pool that controls the concurrency so that you don’t try to open too many files at once and exhaust your system resources.

There are three approaches you can take to implement that:

  1. Channels
  2. Semaphore (via channels)
  3. Semaphore (via golang.org/x/sync/semaphore)

Limit Concurrency - Channels.go

package main

import (
	"fmt"
	"log"
	"sync"
	"time"
)

const iterations = 5

func main() {
	var wg sync.WaitGroup

	startTime := time.Now()
  
	// we use a channel to coordinate the tasks
	tasks := make(chan time.Duration, 10)

	// we avoid creating an 'unbounded' number of goroutines by using a fixed
	// pool of workers (10 in this case, chosen to match the channel's buffer),
	// but depending on how many tasks there are you'll need to be more
	// selective about the pool size
	for i := 0; i < 10; i++ {
		wg.Add(1)

		go func(i int) {
			defer wg.Done()

			// each of the 10 workers pulls tasks off the channel,
			// so at most 10 tasks are processed at a time
			for d := range tasks {
				fmt.Printf("goroutine %d: will pause for %d milliseconds\n", i, d)
				time.Sleep(d * time.Millisecond)
				fmt.Printf("goroutine %d: finished task (%d)\n", i, d)
			}
		}(i)
	}

	// we use channels to coordinate and generate tasks
	for i := 0; i < iterations; i++ {
		tasks <- 100
		tasks <- 5000
		tasks <- 2000
	}

	close(tasks)
	wg.Wait()
	log.Println("time spent:", time.Since(startTime))
}

/*
OUTPUT: notice that although individual tasks pause for long periods,
        the overall time is just over 5 seconds because the work is
        spread across the 10 worker goroutines.

goroutine 7: will pause for 5000 milliseconds
goroutine 3: will pause for 5000 milliseconds
goroutine 5: will pause for 100 milliseconds
goroutine 0: will pause for 100 milliseconds
goroutine 8: will pause for 2000 milliseconds
goroutine 2: will pause for 100 milliseconds
goroutine 9: will pause for 100 milliseconds
goroutine 6: will pause for 2000 milliseconds
goroutine 1: will pause for 5000 milliseconds
goroutine 4: will pause for 2000 milliseconds << the first 10 tasks have been picked up, so all 10 workers are busy!
goroutine 5: finished task (100)
goroutine 5: will pause for 5000 milliseconds << once a worker finishes a task, it picks up the next one from the channel.
goroutine 0: finished task (100)
goroutine 2: finished task (100)
goroutine 2: will pause for 100 milliseconds
goroutine 9: finished task (100)
goroutine 9: will pause for 5000 milliseconds
goroutine 0: will pause for 2000 milliseconds
goroutine 2: finished task (100)
goroutine 2: will pause for 2000 milliseconds
goroutine 8: finished task (2000)
goroutine 6: finished task (2000)
goroutine 4: finished task (2000)
goroutine 0: finished task (2000)
goroutine 2: finished task (2000)
goroutine 7: finished task (5000)
goroutine 3: finished task (5000)
goroutine 1: finished task (5000)
goroutine 9: finished task (5000)
goroutine 5: finished task (5000)
2019/02/08 14:31:30 time spent: 5.107444551s
*/

Limit Concurrency - Semaphore (using channels).go

package main

import (
	"os"
	"strconv"
	"sync"
)

// 12 is the maximum number of concurrent processes that may run at any time
var sem = make(chan struct{}, 12)

func main() {
	var wg sync.WaitGroup
	wg.Add(1024 * 1024)

	for i := 0; i < (1024 * 1024); i++ {
		go func(index int) {
			// if there are already 12 goroutines running,
			// the buffered channel will block
			// and a new file won't be opened
			sem <- struct{}{}

			// once this goroutine finishes, empty the buffer by one
			// so the next process may start
			//
			// i.e. another goroutine blocked on the above channel 'send'
			// will now be able to execute the statement and continue
			defer func() { <-sem }()

			// wg.Done must be called _before_ the read from the sem channel
			// remember that defers are executed like a stack (LIFO)
			// hence we defer wg.Done _after_ deferring the sem read
			defer wg.Done()

			f, err := os.Open(strconv.Itoa(index))
			if err != nil {
				// handle file open failure
				return
			}
			defer f.Close()

			// handle open file
		}(i)
	}

	wg.Wait()
	close(sem)
}
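
For comparison, here is how the same file-opening example could look with the golang.org/x/sync/semaphore package (the third approach listed above). This is a minimal sketch under the same assumptions as the channel version: the numeric file names are purely illustrative, and a weight of 12 again caps the number of concurrent opens.

package main

import (
	"context"
	"os"
	"strconv"
	"sync"

	"golang.org/x/sync/semaphore"
)

func main() {
	var wg sync.WaitGroup

	// a weight of 12 means at most 12 goroutines may hold the semaphore at once
	sem := semaphore.NewWeighted(12)
	ctx := context.Background()

	for i := 0; i < 1024; i++ {
		wg.Add(1)

		go func(index int) {
			defer wg.Done()

			// Acquire blocks until a unit of the semaphore is available
			// (or the context is cancelled)
			if err := sem.Acquire(ctx, 1); err != nil {
				return
			}
			defer sem.Release(1)

			f, err := os.Open(strconv.Itoa(index)) // illustrative file name
			if err != nil {
				// handle file open failure
				return
			}
			defer f.Close()

			// handle open file
		}(i)
	}

	wg.Wait()
}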

Limit Concurrency - Semaphore (using middleware with the golang.org x sync semaphore package).go

// Package middleware provides wrapper functions around the http.Handler
// interface, allowing for an incoming HTTP request to be modified or analysed.
package middleware

import (
	"net/http"

	"github.com/example/internal/pkg/settings"
	"golang.org/x/sync/semaphore"
)

// LimitConcurrency will reject any new connections that exceed the service's
// ability to continue functioning.
func LimitConcurrency(handler http.Handler, config *settings.Config) http.Handler {
	s := semaphore.NewWeighted(int64(config.ConcurrencyLimit))

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !s.TryAcquire(1) {
			http.Error(w, "TOO_MANY_CONCURRENT_CONNECTIONS", http.StatusServiceUnavailable)
			return
		}
		defer s.Release(1)

		handler.ServeHTTP(w, r)
	})
}
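
A usage sketch for the middleware, assuming settings.Config exposes the ConcurrencyLimit field referenced above and that the middleware package lives alongside it in the same module. The import path for the middleware, the mux and the port are illustrative assumptions, not part of the original code.

package main

import (
	"log"
	"net/http"

	"github.com/example/internal/pkg/middleware" // assumed location of LimitConcurrency
	"github.com/example/internal/pkg/settings"
)

func main() {
	config := &settings.Config{ConcurrencyLimit: 100} // assumed field, per the middleware above

	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("hello"))
	})

	// wrap the whole mux so every route shares the same concurrency limit
	log.Fatal(http.ListenAndServe(":8080", middleware.LimitConcurrency(mux, config)))
}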