Go Concurrency Patterns

For the Real World

25 April 2013

Bryan Mills

Disclaimer

The opinions and advice presented here are my own, and are not necessarily shared by my employer or the Go Authors.

(But I hope they agree anyway!)

Basic concurrency: goroutines

If we want to make more efficient use of our processors, we need to give them more work to do concurrently.

Instead of this:

package main

import (
	"log"
	"time"
)

const nTasks = 4

type task int

func doTask(t task) {
	log.Printf("starting task %v\n", t)
	time.Sleep(time.Second)
	log.Printf("finished task %v\n", t)
}

func main() {
    for i := 0; i < nTasks; i++ {
        doTask(task(i))
    }
}

We can do this:

package main

import (
	"log"
	"time"
)

const nTasks = 4

type task int

func doTask(t task) {
	log.Printf("starting task %v\n", t)
	time.Sleep(time.Second)
	log.Printf("finished task %v\n", t)
}

func main() {
    for i := 0; i < nTasks; i++ {
        go doTask(task(i))
    }
}

Basic concurrency: goroutines

Ok, so maybe that didn't work so well. But we can fix it!

The moral: know when your goroutines will finish.

package main

import (
	"log"
	"sync"
	"time"
)

const nTasks = 4

type task int

func doTask(t task) {
	log.Printf("starting task %v\n", t)
	time.Sleep(time.Second)
	log.Printf("finished task %v\n", t)
}

func main() {
    var wg sync.WaitGroup
    defer wg.Wait()

    for i := 0; i < nTasks; i++ {
        wg.Add(1)
        go func(t task) {
            doTask(t)
            wg.Done()
        }(task(i))
    }
}

Basic concurrency: channels

This is Go - we don't ignore errors, we handle them.

package main

import (
	"errors"
	"log"
	"time"
)

const nTasks = 4

var errHalfway = errors.New("error: halfway done")

type task int

func doTask(t task) error {
	log.Printf("starting task %v\n", t)
	time.Sleep(time.Second)
	log.Printf("finished task %v\n", t)
	if t == nTasks/2 {
		return errHalfway
	}
	return nil
}

func doTasks() error {
    errs := make(chan error)

    for i := 0; i < nTasks; i++ {
        go func(t task) { errs <- doTask(t) }(task(i))
    }

    for i := 0; i < nTasks; i++ {
        if err := <-errs; err != nil {
            return err
        }
    }
    return nil
}

func main() {
	if err := doTasks(); err != nil {
		log.Print(err)
	}
}

Basic concurrency: buffered channels

But wait, is that a leak? (Know when your goroutines will finish!)

package main

import (
	"errors"
	"log"
	"time"
)

const nTasks = 4

var errHalfway = errors.New("error: halfway done")

type task int

func doTask(t task) error {
	log.Printf("starting task %v\n", t)
	time.Sleep(time.Second)
	log.Printf("finished task %v\n", t)
	if t == nTasks/2 {
		return errHalfway
	}
	return nil
}

func doTasks() error {
    errs := make(chan error, nTasks)

    for i := 0; i < nTasks; i++ {
        go func(t task) { errs <- doTask(t) }(task(i))
    }

    for i := 0; i < nTasks; i++ {
        if err := <-errs; err != nil {
            return err
        }
    }
    return nil
}

func main() {
	if err := doTasks(); err != nil {
		log.Print(err)
	}
}

Basic concurrency: close

To indicate the end of data on a channel, we can close it. Receiving from a closed channel yields the zero value of the channel's element type, or we can use the two-value form of the receive operation (v, ok := <-c) to detect that the channel has been closed.

package main

import (
	"encoding/binary"
	"io"
	"log"
	"time"
)

const nTasks = 4

type task int

func doTask(t task) {
	log.Printf("starting task %v\n", t)
	time.Sleep(time.Second)
	log.Printf("finished task %v\n", t)
}

func unmarshalUints(r io.Reader, c chan<- uint32) error {
    buf := make([]byte, 4)
    _, err := io.ReadFull(r, buf)
    for ; err == nil; _, err = io.ReadFull(r, buf) {
        c <- binary.BigEndian.Uint32(buf)
    }
    close(c)
    if err == io.EOF {
        return nil // a clean end of stream is not an error
    }
    return err
}
func marshalUints(w io.WriteCloser, c <-chan uint32) error {
    buf := make([]byte, 4)
    for i, ok := <-c; ok; i, ok = <-c {
        binary.BigEndian.PutUint32(buf, i)
        if _, err := w.Write(buf); err != nil {
            return err
        }
    }
    return w.Close()
}

func main() {
	r, w := io.Pipe()
	in, out := make(chan uint32), make(chan uint32)
	go marshalUints(w, in)
	go unmarshalUints(r, out)
	go func() {
		for i := uint32(0); i < nTasks; i++ {
			in <- i
		}
		close(in)
	}()
	for i, ok := <-out; ok; i, ok = <-out {
		doTask(task(i))
	}
}

Basic concurrency: range

Or we can range over the channel with a for loop; the loop exits once the channel has been closed and drained.

package main

import (
	"encoding/binary"
	"io"
	"log"
	"time"
)

const nTasks = 4

type task int

func doTask(t task) {
	log.Printf("starting task %v\n", t)
	time.Sleep(time.Second)
	log.Printf("finished task %v\n", t)
}

func unmarshalUints(r io.Reader, c chan<- uint32) error {
	buf := make([]byte, 4)
	_, err := io.ReadFull(r, buf)
	for ; err == nil; _, err = io.ReadFull(r, buf) {
		c <- binary.BigEndian.Uint32(buf)
	}
	close(c)
	if err == io.EOF {
		return nil // a clean end of stream is not an error
	}
	return err
}

func marshalUints(w io.WriteCloser, c <-chan uint32) error {
    buf := make([]byte, 4)
    for i := range c {
        binary.BigEndian.PutUint32(buf, i)
        if _, err := w.Write(buf); err != nil {
            return err
        }
    }
    return w.Close()
}

func main() {
	r, w := io.Pipe()
	in, out := make(chan uint32), make(chan uint32)
	go marshalUints(w, in)
	go unmarshalUints(r, out)
	go func() {
		for i := uint32(0); i < nTasks; i++ {
			in <- i
		}
		close(in)
	}()
	for i := range out {
		doTask(task(i))
	}
}

Basic concurrency: select

The real power of channels, though, is the select statement.
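
A minimal sketch of the statement itself (an illustrative example, not one of the talk's demos): select blocks until one of its cases can proceed, choosing pseudo-randomly if several are ready at once.

package main

import (
	"log"
	"time"
)

func main() {
	in := make(chan int)
	go func() { in <- 42 }()

	// Whichever case becomes ready first wins.
	select {
	case v := <-in:
		log.Printf("received %v", v)
	case <-time.After(time.Second):
		log.Print("timed out")
	}
}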

Basic concurrency: select

We can use select in a lot of different ways. One simple application is to multiplex between input and output channels.

package main

import (
	"log"
	"math/rand"
	"time"
)

const (
	duration = 5 * time.Second
	period   = time.Second / 2
	maxEvent = 4
)

type event int

func producer(events chan<- event) {
	defer close(events)
	done := time.After(duration)
	for {
		select {
		case <-done:
			return
		case events <- event(rand.Intn(maxEvent)):
			time.Sleep(time.Duration(rand.Int63n(int64(period * 2))))
		}
	}
}

func consumer(aggregates <-chan map[event]int) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for range ticker.C {
		if agg, ok := <-aggregates; ok {
			log.Printf("%v", agg)
		} else {
			return
		}
	}
}

func aggregator(events <-chan event, aggregates chan<- map[event]int) {
    defer close(aggregates)
    agg := make(map[event]int)
    for {
        select {
        case e, ok := <-events:
            if !ok {
                aggregates <- agg
                return
            }
            agg[e]++
        case aggregates <- agg:
            agg = make(map[event]int)
        }
    }
}

func main() {
	events := make(chan event)
	aggregates := make(chan map[event]int)

	go producer(events)
	go aggregator(events, aggregates)
	consumer(aggregates)
}

Basic concurrency: nil channels

Operations on a nil channel block forever, so a select case on a nil channel is never chosen.
We can use nil to disable sending in our aggregator when it has no events buffered.

package main

import (
	"log"
	"math/rand"
	"time"
)

const (
	duration = 5 * time.Second
	period   = time.Second / 2
	maxEvent = 4
)

type event int

func producer(events chan<- event) {
	defer close(events)
	done := time.After(duration)
	for {
		select {
		case <-done:
			return
		case events <- event(rand.Intn(maxEvent)):
			time.Sleep(time.Duration(rand.Int63n(int64(period * 2))))
		}
	}
}

func consumer(aggregates <-chan map[event]int) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for range ticker.C {
		if agg, ok := <-aggregates; ok {
			log.Printf("%v", agg)
		} else {
			return
		}
	}
}

func aggregator(events <-chan event, aggregates chan<- map[event]int) {
    defer close(aggregates)
    var aggc chan<- map[event]int // aggregates or nil
    agg := make(map[event]int)
    for events != nil || len(agg) > 0 {
        select {
        case e, ok := <-events:
            if !ok {
                events = nil
                continue
            }
            agg[e]++
            aggc = aggregates
        case aggc <- agg:
            agg = make(map[event]int)
            aggc = nil
        }
    }
}

func main() {
	events := make(chan event)
	aggregates := make(chan map[event]int)

	go producer(events)
	go aggregator(events, aggregates)
	consumer(aggregates)
}

WARNING

MORE CODE AHEAD

Now that we've covered the basics, let's see how we can use channels and concurrency in some other applications.

Rate limiter

Go lets us write efficient programs - sometimes too efficient.
We need to play nice with slower services. How about a rate-limiter for outgoing requests?

package main

import (
	"log"
	"sync"
	"time"
)

const nTasks = 6

type task int

func doTask(t task) {
	log.Printf("starting task %v\n", t)
	time.Sleep(time.Second)
	log.Printf("finished task %v\n", t)
}

type Token struct{}
type Limiter struct {
	tokens chan Token
}

func NewLimiter(period time.Duration, burst int) Limiter {
    l := Limiter{tokens: make(chan Token, burst)}
    go l.loop(period)
    return l
}
func (l Limiter) loop(period time.Duration) {
    for {
        l.tokens <- Token{}
        time.Sleep(period)
    }
}
func (l Limiter) Wait() {
    <-l.tokens
}

func main() {
	var wg sync.WaitGroup
	defer wg.Wait()

	l := NewLimiter(2*time.Second, 2)
	for i := 0; i < nTasks; i++ {
		l.Wait()
		wg.Add(1)
		go func(t task) {
			doTask(t)
			wg.Done()
		}(task(i))
	}
}

Rate limiter

Hey, no littering! (Know when your goroutines will finish!)

package main

import (
	"log"
	"sync"
	"time"
)

const nTasks = 6

type task int

func doTask(t task) {
	log.Printf("starting task %v\n", t)
	time.Sleep(time.Second)
	log.Printf("finished task %v\n", t)
}

type Token struct{}
type Limiter struct {
	tokens    chan Token
	cycleTime time.Duration
}

func NewLimiter(period time.Duration, burst int) Limiter {
    l := Limiter{
        tokens:    make(chan Token, burst),
        cycleTime: period * time.Duration(burst), // one full trip around the token cycle
    }
    go l.fill(period, burst)
    return l
}

// fill seeds the bucket with burst tokens, one per period, then exits.
func (l Limiter) fill(period time.Duration, burst int) {
    for i := 0; i < burst; i++ {
        l.tokens <- Token{}
        time.Sleep(period)
    }
}

// Wait claims a token and schedules its return one cycle later,
// keeping the steady-state rate at one token per period.
func (l Limiter) Wait() {
    <-l.tokens
    time.AfterFunc(l.cycleTime, func() { l.tokens <- Token{} })
}

func main() {
	var wg sync.WaitGroup
	defer wg.Wait()

	l := NewLimiter(2*time.Second, 2)
	for i := 0; i < nTasks; i++ {
		l.Wait()
		wg.Add(1)
		go func(t task) {
			doTask(t)
			wg.Done()
		}(task(i))
	}
}

Packet switch

We can use buffered channels and select to help an overloaded Go server, too.

package main

import (
	"log"
	"math/rand"
	"time"
)

const (
	duration   = 5 * time.Second
	period     = time.Second / 2
	maxAddr    = 4
	bufferSize = 4
)

type address int
type Packet struct {
	from address
	to   address
}

func randPacket() Packet {
	return Packet{
		from: address(rand.Intn(maxAddr)),
		to:   address(rand.Intn(maxAddr)),
	}
}

func producer(packets chan<- Packet) {
	defer close(packets)
	done := time.After(duration)
	packet := randPacket()
	for {
		select {
		case <-done:
			return
		case packets <- packet:
			time.Sleep(time.Duration(rand.Int63n(int64(period * 2))))
			packet = randPacket()
		}
	}
}

func connect(addr address, buffer <-chan Packet) {
	go func() {
		for packet := range buffer {
			log.Printf("sent %v: %+v", addr, packet)
		}
	}()
}

func packetSwitch(in <-chan Packet) {
    buffers := make(map[address]chan<- Packet)
    for packet := range in {
        buf := buffers[packet.to]
        if buf == nil {
            newBuf := make(chan Packet, bufferSize)
            buffers[packet.to] = newBuf
            connect(packet.to, newBuf)
            buf = newBuf
        }

        select {
        case buf <- packet:
        default:
            log.Printf("packet dropped: %+v", packet)
        }
    }
    for _, buf := range buffers {
        close(buf)
    }
}

func main() {
	in := make(chan Packet)
	go producer(in)
	packetSwitch(in)
}

Double-buffered renderer

If we need to reduce pressure on the garbage collector, we can reuse memory.

package main

import (
	"log"
	"time"
)

const (
	frameSize = 4096
	nFrames   = 4
)

func render(i int, buf []byte) {
	log.Printf("rendering(%d, %p)\n", i, buf)
	time.Sleep(1 * time.Second)
	log.Printf("rendered(%d, %p)\n", i, buf)
}

func display(i int, buf []byte) {
	log.Printf("displaying(%d, %p)\n", i, buf)
	time.Sleep(2 * time.Second)
	log.Printf("displayed(%d, %p)\n", i, buf)
}

func main() {
    ready := make(chan []byte)
    free := make(chan []byte, 1) // holds an idle buffer while the other is in use

    go func() {
        for i := 0; i < nFrames; i++ {
            frame := <-free
            render(i, frame)
            ready <- frame
        }
        close(ready)
        // Drain both buffers so the final sends to free in the
        // display loop below can never block.
        <-free
        <-free
    }()
    free <- make([]byte, frameSize)
    free <- make([]byte, frameSize)

    i := 0
    for frame := range ready {
        display(i, frame)
        free <- frame
        i++
    }
}

Classic concurrency

We've seen some applications, but what about traditional concurrency primitives?
As it turns out, we can implement those with channels, too.

Mutex

A mutex is really just a special case of a channel.

package main

import (
	"log"
	"sync"
	"time"
)

const nTasks = 4

type task int
type Resource struct{}

func doTask(t task, r Resource) {
	log.Printf("starting task %v\n", t)
	time.Sleep(time.Second)
	log.Printf("finished task %v\n", t)
}

type token struct{}
type Mutex struct {
	c chan token
}

func NewMutex() Mutex {
    m := Mutex{
        c: make(chan token, 1),
    }
    m.c <- token{}
    return m
}
func (m Mutex) Lock() {
    <-m.c
}
func (m Mutex) Unlock() {
    m.c <- token{}
}

func main() {
	m := NewMutex()
	r := Resource{}
	var wg sync.WaitGroup
	defer wg.Wait()

	for i := 0; i < nTasks; i++ {
		wg.Add(1)
		t := task(i)
		go func() {
			m.Lock()
			defer m.Unlock()
			defer wg.Done()
			doTask(t, r)
		}()
	}
}

Mutex

Lock and Unlock are error-prone. Could we use callbacks instead?

package main

import (
	"log"
	"sync"
	"time"
)

const nTasks = 4

type task int
type Resource struct{}

func doTask(t task, r Resource) {
	log.Printf("starting task %v\n", t)
	time.Sleep(time.Second)
	log.Printf("finished task %v\n", t)
}

type CallbackMutex struct {
	in   chan func(Resource)
	done chan bool
}

func NewCallbackMutex(r Resource) CallbackMutex {
    m := CallbackMutex{
        in:   make(chan func(Resource)),
        done: make(chan bool),
    }
    go m.loop(r)
    return m
}
func (m CallbackMutex) loop(r Resource) {
    for f := range m.in {
        f(r)
        m.done <- true
    }
}
func (m CallbackMutex) Do(f func(Resource)) {
    m.in <- f
    <-m.done
}
func (m CallbackMutex) Close() error {
    close(m.in)
    return nil
}
func (m CallbackMutex) Try(f func(Resource)) bool {
	select {
	case m.in <- f:
		return <-m.done
	default:
	}
	return false
}

func main() {
	m := NewCallbackMutex(Resource{})
	defer m.Close()

	var wg sync.WaitGroup
	defer wg.Wait()

	for i := 0; i < nTasks; i++ {
		wg.Add(1)
		t := task(i)
		go m.Do(func(r Resource) {
			defer wg.Done()
			doTask(t, r)
		})
	}
}

Mutex

It's usually clearer to send resources instead of callbacks.
(Don't communicate by sharing memory; share memory by communicating.)

package main

import (
	"log"
	"sync"
	"time"
)

const nTasks = 4

type task int
type Resource struct{}

func doTask(t task, r Resource) {
	log.Printf("starting task %v\n", t)
	time.Sleep(time.Second)
	log.Printf("finished task %v\n", t)
}

type ChanMutex struct {
	c chan Resource
}

func NewChanMutex(r Resource) ChanMutex {
    m := ChanMutex{
        c: make(chan Resource, 1),
    }
    m.c <- r
    return m
}
func (m ChanMutex) Do(f func(Resource)) {
    r := <-m.c
    f(r)
    m.c <- r
}
func (m ChanMutex) Try(f func(Resource)) (ok bool) {
	select {
	case r := <-m.c:
		f(r)
		m.c <- r
		return true
	default:
	}
	return false
}

func main() {
	m := NewChanMutex(Resource{})

	var wg sync.WaitGroup
	defer wg.Wait()

	for i := 0; i < nTasks; i++ {
		wg.Add(1)
		t := task(i)
		go m.Do(func(r Resource) {
			defer wg.Done()
			doTask(t, r)
		})
	}
}

In this case, it's also more efficient and easier to use: we don't have an extra goroutine hanging around, and we don't have to remember to Close it.

Mutex

Either way, we can use select to add flexibility.

package main

import (
	"log"
	"sync"
	"time"
)

const nTasks = 4

type task int
type Resource struct{}

func doTask(t task, r Resource) {
	log.Printf("starting task %v\n", t)
	time.Sleep(time.Second)
	log.Printf("finished task %v\n", t)
}

type CallbackMutex struct {
	in   chan func(Resource)
	done chan bool
}

func NewCallbackMutex(r Resource) CallbackMutex {
	m := CallbackMutex{
		in:   make(chan func(Resource)),
		done: make(chan bool),
	}
	go m.loop(r)
	return m
}
func (m CallbackMutex) loop(r Resource) {
	for f := range m.in {
		f(r)
		m.done <- true
	}
}
func (m CallbackMutex) Do(f func(Resource)) {
	m.in <- f
	<-m.done
}
func (m CallbackMutex) Close() error {
	close(m.in)
	return nil
}

func (m CallbackMutex) Try(f func(Resource)) bool {
    select {
    case m.in <- f:
        return <-m.done
    default:
    }
    return false
}

func main() {
	m := NewCallbackMutex(Resource{})
	defer m.Close()

	var wg sync.WaitGroup
	defer wg.Wait()

	for i := 0; i < nTasks; i++ {
		wg.Add(1)
		t := task(i)
		go m.Do(func(r Resource) {
			defer wg.Done()
			doTask(t, r)
		})
	}
}
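
The channel-based mutex gets its Try the same way: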
package main

import (
	"log"
	"sync"
	"time"
)

const nTasks = 4

type task int
type Resource struct{}

func doTask(t task, r Resource) {
	log.Printf("starting task %v\n", t)
	time.Sleep(time.Second)
	log.Printf("finished task %v\n", t)
}

type ChanMutex struct {
	c chan Resource
}

func NewChanMutex(r Resource) ChanMutex {
	m := ChanMutex{
		c: make(chan Resource, 1),
	}
	m.c <- r
	return m
}
func (m ChanMutex) Do(f func(Resource)) {
	r := <-m.c
	f(r)
	m.c <- r
}

func (m ChanMutex) Try(f func(Resource)) (ok bool) {
    select {
    case r := <-m.c:
        f(r)
        m.c <- r
        return true
    default:
    }
    return false
}

func main() {
	m := NewChanMutex(Resource{})

	var wg sync.WaitGroup
	defer wg.Wait()

	for i := 0; i < nTasks; i++ {
		wg.Add(1)
		t := task(i)
		go m.Do(func(r Resource) {
			defer wg.Done()
			doTask(t, r)
		})
	}
}

Semaphore

Here's a cute channel-based implementation of semaphores.

package main

import (
	"log"
	"sync"
	"time"
)

const (
	nTasks  = 4
	semSize = 2
)

type task int

func doTask(t task) {
	log.Printf("starting task %v\n", t)
	time.Sleep(time.Second)
	log.Printf("finished task %v\n", t)
}

type token struct{}

type Semaphore struct {
	c chan token
}

func NewSemaphore(size int) Semaphore {
    s := Semaphore{c: make(chan token, size)}
    for i := 0; i < size; i++ {
        s.c <- token{}
    }
    return s
}
func (s Semaphore) Acquire() {
    <-s.c
}
func (s Semaphore) Release() {
    s.c <- token{}
}

func main() {
	s := NewSemaphore(semSize)

	var wg sync.WaitGroup
	defer wg.Wait()

	for i := 0; i < nTasks; i++ {
		wg.Add(1)
		s.Acquire()
		go func(t task) {
			defer wg.Done()
			defer s.Release()
			doTask(t)
		}(task(i))
	}
}

Semaphore

Making it more space-efficient gets a bit tricky, though.

package main

import (
	"log"
	"sync"
	"time"
)

const (
	nTasks  = 4
	semSize = 2
)

type task int

func doTask(t task) {
	log.Printf("starting task %v\n", t)
	time.Sleep(time.Second)
	log.Printf("finished task %v\n", t)
}

type signal struct{}

type Semaphore struct {
	nWaiters chan int
	signals  chan signal
}

func NewSemaphore(size int) Semaphore {
    s := Semaphore{
        nWaiters: make(chan int, 1),
        signals:  make(chan signal),
    }
    // The buffered channel holds a single counter: holders minus size.
    // Negative values count free slots; positive values count waiters.
    s.nWaiters <- -size
    return s
}
func (s Semaphore) Acquire() {
    w := <-s.nWaiters + 1
    s.nWaiters <- w
    if w > 0 {
        <-s.signals // no slot free; block until a Release signals us
    }
}
func (s Semaphore) Release() {
    w := <-s.nWaiters
    s.nWaiters <- w - 1
    if w > 0 {
        s.signals <- signal{} // wake one waiter
    }
}

func main() {
	s := NewSemaphore(semSize)

	var wg sync.WaitGroup
	defer wg.Wait()

	for i := 0; i < nTasks; i++ {
		wg.Add(1)
		s.Acquire()
		go func(t task) {
			defer wg.Done()
			defer s.Release()
			doTask(t)
		}(task(i))
	}
}

Semaphore

So don't be afraid to use the tools in the sync package when appropriate.

package main

import (
	"log"
	"sync"
	"time"
)

const (
	nTasks  = 4
	semSize = 2
)

type task int

func doTask(t task) {
	log.Printf("starting task %v\n", t)
	time.Sleep(time.Second)
	log.Printf("finished task %v\n", t)
}

type Semaphore struct {
	mutex     sync.Mutex
	cond      sync.Cond
	remaining int
}

func NewSemaphore(size int) *Semaphore {
    s := &Semaphore{remaining: size}
    s.cond.L = &s.mutex
    return s
}
func (s *Semaphore) Acquire() {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    for s.remaining < 1 {
        s.cond.Wait()
    }
    s.remaining--
}
func (s *Semaphore) Release() {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    s.remaining++
    s.cond.Signal()
}

func main() {
	s := NewSemaphore(semSize)

	var wg sync.WaitGroup
	defer wg.Wait()

	for i := 0; i < nTasks; i++ {
		wg.Add(1)
		s.Acquire()
		go func(t task) {
			defer wg.Done()
			defer s.Release()
			doTask(t)
		}(task(i))
	}
}

Worker pool

But why bother with a Semaphore when we can just limit the number of goroutines?

package main

import (
	"log"
	"sync"
	"time"
)

const (
	nTasks   = 4
	nWorkers = 2
)

type task int
type worker int

func (w worker) doTask(t task) {
	log.Printf("worker %v: starting task %v\n", w, t)
	time.Sleep(time.Second)
	log.Printf("worker %v: finished task %v\n", w, t)
}

func main() {
    var wg sync.WaitGroup
    defer wg.Wait()

    tasks := make(chan task)
    defer close(tasks)

    for i := 0; i < nWorkers; i++ {
        wg.Add(1)
        go func(w worker) {
            defer wg.Done()
            for t := range tasks {
                w.doTask(t)
            }
        }(worker(i))
    }

    for i := 0; i < nTasks; i++ {
        tasks <- task(i)
    }
}

Worker pool

If we hit an error, we can even bail out early.

    var wg sync.WaitGroup
    tasks := make(chan task)
    errc := make(chan error, 1)

    for i := 0; i < nWorkers; i++ {
        wg.Add(1)
        go func(w worker) {
            defer wg.Done()
            for t := range tasks {
                if err := w.doTask(t); err != nil {
                    select {
                    case errc <- err:
                    default:
                    }
                    return
                }
            }
        }(worker(i))
    }

Worker pool

(cont'd)

    var wg sync.WaitGroup
    tasks := make(chan task)
    errc := make(chan error, 1)
[...]
package main

import (
	"errors"
	"log"
	"sync"
	"time"
)

const (
	nTasks   = 4
	nWorkers = 2
)

var errHalfway = errors.New("error: halfway done")

type task int
type worker int

func (w worker) doTask(t task) error {
	log.Printf("worker %v: starting task %v\n", w, t)
	time.Sleep(time.Second)
	log.Printf("worker %v: finished task %v\n", w, t)
	if t == nTasks/2 {
		return errHalfway
	}
	return nil
}

func doTasks(nTasks int) error {
	var wg sync.WaitGroup
	tasks := make(chan task)
	errc := make(chan error, 1)
	for i := 0; i < nWorkers; i++ {
		wg.Add(1)
		go func(w worker) {
			defer wg.Done()
			for t := range tasks {
				if err := w.doTask(t); err != nil {
					select {
					case errc <- err:
					default:
					}
					return
				}
			}
		}(worker(i))
	}

	for i := 0; i < nTasks; i++ {
		select {
		case err := <-errc:
			close(tasks)
			wg.Wait()
			return err
		case tasks <- task(i):
		}
	}
	close(tasks)
	wg.Wait()
	// errc is never closed, so don't range over it: a final
	// non-blocking receive picks up any error from the last tasks.
	select {
	case err := <-errc:
		return err
	default:
		return nil
	}
}

func main() {
	if err := doTasks(nTasks); err != nil {
		log.Print(err)
	}
}

Some assembly required

In real Go code, we don't usually need to implement pure abstractions like Mutex or Semaphore. As we've seen, we often need to incorporate other constraints such as resource limits, timeouts, error handling, and memory efficiency.

Using channels and select, we can recognize these kinds of patterns and easily combine them. The result is often simpler and more responsive than mutex- or callback-driven alternatives.
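
For example, here's a rough sketch (an illustration, not one of the talk's demos) that combines a worker pool with a deadline; the select on each send drops the tasks that can't start in time instead of leaking goroutines.

package main

import (
	"log"
	"sync"
	"time"
)

const (
	nTasks   = 8
	nWorkers = 2
	deadline = 3 * time.Second
)

func main() {
	var wg sync.WaitGroup
	tasks := make(chan int)
	timeout := time.After(deadline)

	// A worker pool bounds concurrency without a Semaphore.
	for i := 0; i < nWorkers; i++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for t := range tasks {
				log.Printf("worker %v: task %v", w, t)
				time.Sleep(time.Second)
			}
		}(i)
	}

	// A select on each send bolts on the deadline.
loop:
	for i := 0; i < nTasks; i++ {
		select {
		case tasks <- i:
		case <-timeout:
			log.Print("deadline reached; dropping remaining tasks")
			break loop
		}
	}
	close(tasks)
	wg.Wait()
}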

close(slides)

for questions != nil || comments != nil {
    select {
    case question, ok := <-questions:
        if !ok {
            questions = nil
            continue
        }
        answer(question)
    case comment, ok := <-comments:
        if !ok {
            comments = nil
            continue
        }
        respond(comment)
    case <-timeout:
        return
    }
}

Thank you
