Tags: #go #golang #concurrency #semaphore
If you find that you’re running out of system resources whilst trying to open lots of files, then you should consider limiting your concurrency.
One way would be to define a pool that controls the concurrency so that you don’t try to open too many files at once and exhaust your system resources.
There are two approaches you can take to implement that:
package main
import (
"fmt"
"log"
"sync"
"time"
)
const iterations = 5
func main() {
var wg sync.WaitGroup
startTime := time.Now()
// we use channels to coordinate the tasks
tasks := make(chan time.Duration, 10)
// we avoid creating 'unbounded' number of goroutines (10 in this case)
// we need to decide on the size of this 'worker pool' and so we'll pick 10 to match up with the channel
// but depending on how many tasks there are you'll need to be more selective on pool size
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
// we can do 10 things at a time due to the buffered task channel
for d := range tasks {
fmt.Printf("goroutine %d: will pause for %d milliseconds\n", i, d)
time.Sleep(d * time.Millisecond)
fmt.Printf("goroutine %d: finished task (%d)\n", i, d)
}
}(i)
}
// we use channels to coordinate and generate tasks
for i := 0; i < iterations; i++ {
tasks <- 100
tasks <- 5000
tasks <- 2000
}
close(tasks)
wg.Wait()
log.Println("time spent:", time.Since(startTime))
}
/*
OUTPUT: notice although we pause for long periods of time,
the overall time is just over 5 seconds because we
are parallelising the processing via goroutines.
goroutine 7: will pause for 5000 milliseconds
goroutine 3: will pause for 5000 milliseconds
goroutine 5: will pause for 100 milliseconds
goroutine 0: will pause for 100 milliseconds
goroutine 8: will pause for 2000 milliseconds
goroutine 2: will pause for 100 milliseconds
goroutine 9: will pause for 100 milliseconds
goroutine 6: will pause for 2000 milliseconds
goroutine 1: will pause for 5000 milliseconds
goroutine 4: will pause for 2000 milliseconds << first 10 tasks have filled up the channel!
goroutine 5: finished task (100)
goroutine 5: will pause for 5000 milliseconds << once a task has finished, the channel frees up one space.
goroutine 0: finished task (100)
goroutine 2: finished task (100)
goroutine 2: will pause for 100 milliseconds
goroutine 9: finished task (100)
goroutine 9: will pause for 5000 milliseconds
goroutine 0: will pause for 2000 milliseconds
goroutine 2: finished task (100)
goroutine 2: will pause for 2000 milliseconds
goroutine 8: finished task (2000)
goroutine 6: finished task (2000)
goroutine 4: finished task (2000)
goroutine 0: finished task (2000)
goroutine 2: finished task (2000)
goroutine 7: finished task (5000)
goroutine 3: finished task (5000)
goroutine 1: finished task (5000)
goroutine 9: finished task (5000)
goroutine 5: finished task (5000)
2019/02/08 14:31:30 time spent: 5.107444551s
*/
sem := make(chan struct{}, 12) // 12 is the maximum number of concurrent processes that may run at any time
func main() {
var wg sync.WaitGroup
wg.Add(1024 * 1024)
for i := 0; i < (1024 * 1024); i++ {
go func(index int) {
// if there are already 12 goroutines running,
// the buffered channel will block
// and a new file wont be opened
sem <- struct{}{}
// once this goroutine finishes, empty the buffer by one
// so the next process may start
//
// i.e. another goroutine blocked on the above channel 'send'
// will now be able to execute the statement and continue
defer func() { <-sem }()
// wg.Done must be called _before_ the read from the sem channel
// remember that defers are executed like a stack (LIFO)
// hence we defer wg.Done _after_ deferring the sem read
defer wg.Done()
if f, e := os.Open(strconv.Itoa(index)); e != nil {
// handle file open failure
return
}
defer f.Close()
// handle open file
}(i)
}
wg.Wait()
close(sem)
}
// Package middleware provides wrapper functions around the http.Handler
// interface, allowing for an incoming HTTP request to be modified or analysed.
package middleware
import (
"net/http"
"github.com/example/internal/pkg/settings"
"golang.org/x/sync/semaphore"
)
// LimitConcurrency will reject any new connections that exceed the service's
// ability to continue functioning.
func LimitConcurrency(handler http.Handler, config *settings.Config) http.Handler {
s := semaphore.NewWeighted(int64(config.ConcurrencyLimit))
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !s.TryAcquire(1) {
http.Error(w, "TOO_MANY_CONCURRENT_CONNECTIONS", http.StatusServiceUnavailable)
return
}
defer func() {
s.Release(1)
}()
handler.ServeHTTP(w, r)
})
}