Go: A Better Fan-out, Fan-in Example

A better example to understand the fan-out, fan-in concurrency pattern in Go.

Austin Burnett

6 minute read

As I’ve been learning Go, I’ve knowingly put off picking up the concurrency model, since concurrency is something I haven’t touched since college. Wanting an additional resource to dive back in, I’ve been reading Concurrency in Go by Katherine Cox-Buday. The book has been great for someone who generally understands the idioms of Go but may not know where to start when it comes to concurrency.

Popularized by the Go blog and referenced in the book is the fan-out, fan-in pattern. In my opinion, the usual examples suffer from being too generic: it was hard for me to look at them and understand the real benefit of using the pattern. So, I began thinking of examples that show why this is such a powerful pattern.

The context of our problem is a shipping warehouse. Items are sent from the shelves to a conveyor belt, where an employee packs each item into a box and places it on an outbound conveyor belt to be shipped. It’s important to note that we’ll have one item per box and that some items take more effort to package than others. Let’s first conquer the problem using a plain pipeline:

// https://play.golang.org/p/JWEIQDdxv6G
package main

import (
        "fmt"
        "time"
)

type Item struct {
        ID            int
        Name          string
        PackingEffort time.Duration
}

func PrepareItems(done <-chan bool) <-chan Item {
        items := make(chan Item)
        itemsToShip := []Item{
                Item{0, "Shirt", 1 * time.Second},
                Item{1, "Legos", 1 * time.Second},
                Item{2, "TV", 5 * time.Second},
                Item{3, "Bananas", 2 * time.Second},
                Item{4, "Hat", 1 * time.Second},
                Item{5, "Phone", 2 * time.Second},
                Item{6, "Plates", 3 * time.Second},
                Item{7, "Computer", 5 * time.Second},
                Item{8, "Pint Glass", 3 * time.Second},
                Item{9, "Watch", 2 * time.Second},
        }
        go func() {
                for _, item := range itemsToShip {
                        select {
                        case <-done:
                                return
                        case items <- item:
                        }
                }
                close(items)
        }()
        return items
}

func PackItems(done <-chan bool, items <-chan Item) <-chan int {
        packages := make(chan int)
        go func() {
                for item := range items {
                        select {
                        case <-done:
                                return
                        case packages <- item.ID:
                                time.Sleep(item.PackingEffort)
                                fmt.Printf("Shipping package no. %d\n", item.ID)
                        }
                }
                close(packages)
        }()
        return packages
}

func main() {
        done := make(chan bool)
        defer close(done)

        start := time.Now()

        packages := PackItems(done, PrepareItems(done))

        numPackages := 0
        for range packages {
                numPackages++
        }

        fmt.Printf("Took %fs to ship %d packages\n", time.Since(start).Seconds(), numPackages)
}

In this example, PrepareItems queues up items on an unbuffered channel; think of this as our conveyor belt from the shelves to the employee packing the items. PackItems pulls each item from that channel, places its ID on the packages channel (i.e., our shipping conveyor belt), and simulates the packing work by calling time.Sleep for the item’s PackingEffort. Because a single goroutine packs every item, the pipeline essentially runs serially: the packing efforts sum to 25 seconds (1+1+5+2+1+2+3+5+3+2), so that’s how long it takes.

A possible solution is adding more workers. Additional workers can pack items concurrently and make sure they make it onto the shipping belt. This is the essence of fan-out, fan-in: we introduce additional goroutines at an expensive stage to increase throughput, then merge their output back into a single pipeline. (Note: I’m going to hard-code the number of additional goroutines for simplicity. You may want to explore runtime.NumCPU() or experiment in your production environment to understand what suits your use case.)

  1// https://play.golang.org/p/_NAoJ3szSyo
  2package main
  3
  4import (
  5        "fmt"
  6        "sync"
  7        "time"
  8)
  9
 10type Item struct {
 11        ID            int
 12        Name          string
 13        PackingEffort time.Duration
 14}
 15
 16func PrepareItems(done <-chan bool) <-chan Item {
 17        items := make(chan Item)
 18        itemsToShip := []Item{
 19                Item{0, "Shirt", 1 * time.Second},
 20                Item{1, "Legos", 1 * time.Second},
 21                Item{2, "TV", 5 * time.Second},
 22                Item{3, "Bananas", 2 * time.Second},
 23                Item{4, "Hat", 1 * time.Second},
 24                Item{5, "Phone", 2 * time.Second},
 25                Item{6, "Plates", 3 * time.Second},
 26                Item{7, "Computer", 5 * time.Second},
 27                Item{8, "Pint Glass", 3 * time.Second},
 28                Item{9, "Watch", 2 * time.Second},
 29        }
 30        go func() {
 31                for _, item := range itemsToShip {
 32                        select {
 33                        case <-done:
 34                                return
 35                        case items <- item:
 36                        }
 37                }
 38                close(items)
 39        }()
 40        return items
 41}
 42
 43func PackItems(done <-chan bool, items <-chan Item, workerID int) <-chan int {
 44        packages := make(chan int)
 45        go func() {
 46                for item := range items {
 47                        select {
 48                        case <-done:
 49                                return
 50                        case packages <- item.ID:
 51                                time.Sleep(item.PackingEffort)
 52                                fmt.Printf("Worker #%d: Shipping package no. %d, took %ds to pack\n", workerID, item.ID, item.PackingEffort / time.Second)
 53                        }
 54                }
 55                close(packages)
 56        }()
 57        return packages
 58}
 59
 60func merge(done <-chan bool, channels ...<-chan int) <-chan int {
 61        var wg sync.WaitGroup
 62
 63        wg.Add(len(channels))
 64        outgoingPackages := make(chan int)
 65        multiplex := func(c <-chan int) {
 66                defer wg.Done()
 67                for i := range c {
 68                        select {
 69                        case <-done:
 70                                return
 71                        case outgoingPackages <- i:
 72                        }
 73                }
 74        }
 75        for _, c := range channels {
 76                go multiplex(c)
 77        }
 78        go func() {
 79                wg.Wait()
 80                close(outgoingPackages)
 81        }()
 82        return outgoingPackages
 83}
 84
 85func main() {
 86        done := make(chan bool)
 87        defer close(done)
 88
 89        start := time.Now()
 90
 91        items := PrepareItems(done)
 92
 93        workers := make([]<-chan int, 4)
 94        for i := 0; i<4; i++ {
 95                workers[i] = PackItems(done, items, i)
 96        }
 97
 98        numPackages := 0
 99        for range merge(done, workers...) {
100                numPackages++
101        }
102
103        fmt.Printf("Took %fs to ship %d packages\n", time.Since(start).Seconds(), numPackages)
104}

Here, you can see that on lines 93-96 we set up the additional “workers” to fan out the problem. This part didn’t make much sense to me the first few times I looked at it. The key is that every worker reads from the same channel of Items, and each worker returns its own channel that it writes to; at the end we “fan in” by merging those channels back into one single channel.
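As noted above, the example hard-codes four workers for simplicity. If you wanted to size the pool from the machine instead, here is a minimal sketch of an alternative main (my own addition, not part of the playground example) that reuses PrepareItems, PackItems, and merge from above, adds "runtime" to the imports, and starts one worker per logical CPU via runtime.NumCPU():

func main() {
        done := make(chan bool)
        defer close(done)

        start := time.Now()

        items := PrepareItems(done)

        // Size the worker pool from the machine rather than hard-coding it.
        numWorkers := runtime.NumCPU()

        workers := make([]<-chan int, numWorkers)
        for i := 0; i < numWorkers; i++ {
                workers[i] = PackItems(done, items, i)
        }

        numPackages := 0
        for range merge(done, workers...) {
                numPackages++
        }

        fmt.Printf("Took %fs to ship %d packages with %d workers\n", time.Since(start).Seconds(), numPackages, numWorkers)
}

Whether more workers actually help depends on how much of the packing stage is genuinely parallel work versus waiting, so it’s worth benchmarking with your own workload before settling on a number.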

By running this, you can actually see which “worker” is preparing a package:

Worker #1: Shipping package no. 1, took 1s to pack
Worker #0: Shipping package no. 0, took 1s to pack
Worker #1: Shipping package no. 4, took 1s to pack
Worker #3: Shipping package no. 3, took 2s to pack
Worker #0: Shipping package no. 5, took 2s to pack
Worker #1: Shipping package no. 6, took 3s to pack
Worker #2: Shipping package no. 2, took 5s to pack
Worker #0: Shipping package no. 8, took 3s to pack
Worker #3: Shipping package no. 7, took 5s to pack
Worker #1: Shipping package no. 9, took 2s to pack
Took 7.000000s to ship 10 packages

7 seconds is quite the improvement over 25! In fact, with four workers splitting 25 seconds of total packing effort, about 7 seconds is the best we could hope for. It’s important to note that the order is now scrambled. Fan-out, fan-in is useful when a specific stage of your pipeline is computationally expensive (i.e., takes a long time) and the order of your stream is not important.

I hope this serves as a more concrete, useful example of the fan-out, fan-in pattern; it has certainly helped me grasp it better. If you have any questions, feel free to reach me @austburn.