Go Concurrency
At this month's meetup of Cambridge Gophers, Firat Nur gave us an excellent introduction to writing concurrent programs in Go. His talk covered goroutines, channels, pipelines, and race detection. He ended with an example of a simple online multiplayer game; this had been intended as an exercise, but unfortunately we ran out of time at the end.
One of his early examples demonstrated the importance of wait groups and mutexes:
package main

import "fmt"

func main() {
	var accum int
	for i := 0; i <= 100; i++ {
		go func(delta int) {
			accum += delta
		}(i)
	}
	fmt.Println(accum)
}
Those of you who recall the anecdote about the schoolboy Gauss will be expecting the answer 5050, but we get a different result every time we run this program.
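Firat also showed us the race detector, and this program is exactly the kind of thing it is for. Running with the -race flag (a sketch of the invocation, not output from the talk):

$ go run -race main.go

should produce a WARNING: DATA RACE report pointing at the unsynchronized access to accum.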
The first problem is that we do not wait for all of the goroutines to complete; this is addressed by the use of a WaitGroup to track the running goroutines:
package main

import (
	"fmt"
	"sync"
)

func main() {
	var accum int
	var wg sync.WaitGroup
	for i := 0; i <= 100; i++ {
		wg.Add(1)
		go func(delta int) {
			defer wg.Done()
			accum += delta
		}(i)
	}
	wg.Wait()
	fmt.Println(accum)
}
The second problem is that we have a race condition, with the different goroutines vying to add to the accumulator at the same time. This can be fixed by using a Mutex to protect access to the shared variable:
package main

import (
	"fmt"
	"sync"
)

func main() {
	var accum int
	var wg sync.WaitGroup
	var mu sync.Mutex
	for i := 0; i <= 100; i++ {
		wg.Add(1)
		go func(delta int) {
			defer wg.Done()
			mu.Lock()
			accum += delta
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	fmt.Println(accum)
}
Our program now gives the correct answer, but I wondered if using the sync/atomic package might give us better performance. Here are two versions of the accumulator function, one using a mutex and the other using atomic.AddUint64:
package adder

import (
	"sync"
	"sync/atomic"
)

// AccumAtomic sums the integers 1..n, with each addition performed in its
// own goroutine using atomic.AddUint64.
func AccumAtomic(n uint64) uint64 {
	var accum uint64
	var wg sync.WaitGroup
	for i := uint64(1); i <= n; i++ {
		wg.Add(1)
		go func(delta uint64) {
			defer wg.Done()
			atomic.AddUint64(&accum, delta)
		}(i)
	}
	wg.Wait()
	return accum
}

// AccumMutex sums the integers 1..n, with each addition performed in its
// own goroutine under a sync.Mutex.
func AccumMutex(n uint64) uint64 {
	var accum uint64
	var mu sync.Mutex
	var wg sync.WaitGroup
	for i := uint64(1); i <= n; i++ {
		wg.Add(1)
		go func(delta uint64) {
			defer wg.Done()
			mu.Lock()
			accum += delta
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	return accum
}
Now we can write benchmarks for these two functions:
package adder

import (
	"testing"
)

func BenchmarkAccumAtomic(b *testing.B) {
	for n := 0; n < b.N; n++ {
		AccumAtomic(10000)
	}
}

func BenchmarkAccumMutex(b *testing.B) {
	for n := 0; n < b.N; n++ {
		AccumMutex(10000)
	}
}
As expected, the atomic version gives slightly better performance:
$ go test -bench=.
goos: linux
goarch: amd64
BenchmarkAccumAtomic-8 500 3222956 ns/op
BenchmarkAccumMutex-8 500 3640759 ns/op
PASS
ok _/home/ray/Workspace/personal/go-examples/adder 4.151s
I'm always nervous about using the functions from the sync/atomic
package, whose documentation warns:
These functions require great care to be used correctly. Except for special, low-level applications, synchronization is better done with channels or the facilities of the sync package.
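The warning also points back to channels, which Firat covered earlier in the evening. As a rough sketch (not one of his examples), our accumulator could avoid the shared variable entirely by having each goroutine send its delta on a channel and doing all of the adding in a single receiving goroutine:

package adder

import "sync"

// AccumChannel is a sketch of the channel-based style the sync/atomic
// documentation points towards: every goroutine sends its delta on a
// channel, and a single receiver does all of the adding, so no mutex or
// atomic operation is needed.
func AccumChannel(n uint64) uint64 {
	deltas := make(chan uint64)
	var wg sync.WaitGroup
	for i := uint64(1); i <= n; i++ {
		wg.Add(1)
		go func(delta uint64) {
			defer wg.Done()
			deltas <- delta
		}(i)
	}
	// Close the channel once every sender has finished, so the range
	// loop below terminates.
	go func() {
		wg.Wait()
		close(deltas)
	}()
	var accum uint64
	for delta := range deltas {
		accum += delta
	}
	return accum
}

With only one goroutine performing the additions there is nothing to lock, though we now pay for a channel send per delta.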
The atomic functions do indeed need care: if we needed to access the accumulator while the goroutines were still running (rather than at the end, after the call to wg.Wait()), we would have to use atomic.LoadUint64 to read the value safely. I know I've forgotten this on at least one occasion and have a bug in that code waiting to bite me.
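As a reminder to myself, here is a sketch of that situation; the reportProgress helper and its done channel are hypothetical, not code from the talk:

package adder

import (
	"fmt"
	"sync/atomic"
	"time"
)

// reportProgress polls a counter that other goroutines are still updating
// with atomic.AddUint64. Reading *accum directly here would be a data race;
// atomic.LoadUint64 pairs the read correctly with those atomic writes.
func reportProgress(accum *uint64, done <-chan struct{}) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("so far:", atomic.LoadUint64(accum))
		case <-done:
			return
		}
	}
}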
See Go by Example: Atomic Counters for a more detailed walkthrough.