Go: A Better Fan-out, Fan-in Example
A better example to understand the fan-out, fan-in concurrency pattern in Go.
As I’ve been learning Go, I’ve knowingly put off picking up its concurrency model, since concurrency is something I haven’t touched since college. Wanting an additional resource to dive back in, I’ve been reading Concurrency in Go by Katherine Cox-Buday. This book has been great for someone like me, who generally understands the idioms of Go but may not know where to start when it comes to concurrency.
Popularized by the Go blog and referenced in the book is the fan-out, fan-in pattern. In my opinion, the examples suffer from being too generic. It was hard for me to reference the examples and understand the real benefit of using this pattern. So, I began thinking of examples that showed why this was a powerful pattern.
The context of our problem is a shipping warehouse. Items are sent from the shelves to a conveyor belt. An employee packs the items and places them on an outbound conveyor belt to be shipped. It’s important to note here that we’ll have one item per box and that some items take more effort to package than others. Let’s first tackle the problem using a plain pipeline:
// https://play.golang.org/p/JWEIQDdxv6G
package main

import (
	"fmt"
	"time"
)

// Item is a single product that has to be boxed and shipped.
type Item struct {
	ID            int
	Name          string
	PackingEffort time.Duration
}

// PrepareItems puts every item on the inbound belt (an unbuffered channel)
// and closes the belt once all items are sent or done is closed.
func PrepareItems(done <-chan bool) <-chan Item {
	items := make(chan Item)
	itemsToShip := []Item{
		{0, "Shirt", 1 * time.Second},
		{1, "Legos", 1 * time.Second},
		{2, "TV", 5 * time.Second},
		{3, "Bananas", 2 * time.Second},
		{4, "Hat", 1 * time.Second},
		{5, "Phone", 2 * time.Second},
		{6, "Plates", 3 * time.Second},
		{7, "Computer", 5 * time.Second},
		{8, "Pint Glass", 3 * time.Second},
		{9, "Watch", 2 * time.Second},
	}
	go func() {
		// Deferred so the channel is also closed on the early return below.
		defer close(items)
		for _, item := range itemsToShip {
			select {
			case <-done:
				return
			case items <- item:
			}
		}
	}()
	return items
}

// PackItems takes items off the inbound belt, puts each item's ID on the
// outbound belt, and sleeps for PackingEffort to simulate the packing work.
func PackItems(done <-chan bool, items <-chan Item) <-chan int {
	packages := make(chan int)
	go func() {
		// Deferred so the channel is also closed on the early return below.
		defer close(packages)
		for item := range items {
			select {
			case <-done:
				return
			case packages <- item.ID:
				time.Sleep(item.PackingEffort)
				fmt.Printf("Shipping package no. %d\n", item.ID)
			}
		}
	}()
	return packages
}

func main() {
	done := make(chan bool)
	defer close(done)

	start := time.Now()

	packages := PackItems(done, PrepareItems(done))

	// Drain the outbound belt, counting packages until it is closed.
	numPackages := 0
	for range packages {
		numPackages++
	}

	fmt.Printf("Took %fs to ship %d packages\n", time.Since(start).Seconds(), numPackages)
}
In this example, PrepareItems queues up items on an unbuffered channel; think of this as our conveyor belt from the shelves to the employee packing the items. PackItems pulls from this channel, places each item’s ID on the packages channel (i.e., our shipping conveyor belt), and simulates the packing work by calling time.Sleep for the item’s PackingEffort. Because a single goroutine does all of the packing, this pipeline essentially runs serially and takes 25s, the sum of every item’s PackingEffort. A possible solution to this problem is adding more workers. Additional workers can pack items and make sure they make it onto the shipping belt. This is the essence of fan-out, fan-in: we run additional goroutines in one stage to increase our throughput (fan-out), then merge their results back into a single channel (fan-in). (Note: For simplicity, I’m going to hard-code the number of additional goroutines. You may want to explore runtime.NumCPU() or experiment in your production environment to understand what suits your use case.)
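On the note about runtime.NumCPU(): here is a minimal sketch of one way to pick the worker count, assuming you want to keep a hard-coded value as an option. The workerCount helper is a hypothetical name I made up for illustration and is not part of the programs in this post.

```go
package main

import (
	"fmt"
	"runtime"
)

// workerCount is a hypothetical helper: it returns the requested number of
// workers, or falls back to runtime.NumCPU() when the caller passes zero or
// a negative number.
func workerCount(requested int) int {
	if requested > 0 {
		return requested
	}
	return runtime.NumCPU()
}

func main() {
	fmt.Println("hard-coded:", workerCount(4))
	fmt.Println("from the machine:", workerCount(0))
}
```

The number of CPUs is a reasonable starting point when the slow stage is CPU-bound. A stage that mostly waits, like the time.Sleep calls in this post, is not limited by core count, which is one more reason to measure rather than guess.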
// https://play.golang.org/p/_NAoJ3szSyo
package main

import (
	"fmt"
	"sync"
	"time"
)

// Item is a single product that has to be boxed and shipped.
type Item struct {
	ID            int
	Name          string
	PackingEffort time.Duration
}

// PrepareItems puts every item on the inbound belt (an unbuffered channel)
// and closes the belt once all items are sent or done is closed.
func PrepareItems(done <-chan bool) <-chan Item {
	items := make(chan Item)
	itemsToShip := []Item{
		{0, "Shirt", 1 * time.Second},
		{1, "Legos", 1 * time.Second},
		{2, "TV", 5 * time.Second},
		{3, "Bananas", 2 * time.Second},
		{4, "Hat", 1 * time.Second},
		{5, "Phone", 2 * time.Second},
		{6, "Plates", 3 * time.Second},
		{7, "Computer", 5 * time.Second},
		{8, "Pint Glass", 3 * time.Second},
		{9, "Watch", 2 * time.Second},
	}
	go func() {
		// Deferred so the channel is also closed on the early return below.
		defer close(items)
		for _, item := range itemsToShip {
			select {
			case <-done:
				return
			case items <- item:
			}
		}
	}()
	return items
}

// PackItems is one worker: it takes items off the shared inbound belt, puts
// each item's ID on its own outbound channel, and sleeps for PackingEffort
// to simulate the packing work.
func PackItems(done <-chan bool, items <-chan Item, workerID int) <-chan int {
	packages := make(chan int)
	go func() {
		// Deferred so merge never waits on a channel that is left open.
		defer close(packages)
		for item := range items {
			select {
			case <-done:
				return
			case packages <- item.ID:
				time.Sleep(item.PackingEffort)
				fmt.Printf("Worker #%d: Shipping package no. %d, took %ds to pack\n", workerID, item.ID, item.PackingEffort/time.Second)
			}
		}
	}()
	return packages
}

// merge is the fan-in step: it forwards everything from the workers'
// channels onto one outgoing channel and closes it once all workers finish.
func merge(done <-chan bool, channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup

	wg.Add(len(channels))
	outgoingPackages := make(chan int)
	multiplex := func(c <-chan int) {
		defer wg.Done()
		for i := range c {
			select {
			case <-done:
				return
			case outgoingPackages <- i:
			}
		}
	}
	for _, c := range channels {
		go multiplex(c)
	}
	go func() {
		wg.Wait()
		close(outgoingPackages)
	}()
	return outgoingPackages
}

func main() {
	done := make(chan bool)
	defer close(done)

	start := time.Now()

	items := PrepareItems(done)

	// Fan-out: four workers all read from the same items channel.
	workers := make([]<-chan int, 4)
	for i := 0; i < 4; i++ {
		workers[i] = PackItems(done, items, i)
	}

	// Fan-in: merge the workers' channels and count what comes out.
	numPackages := 0
	for range merge(done, workers...) {
		numPackages++
	}

	fmt.Printf("Took %fs to ship %d packages\n", time.Since(start).Seconds(), numPackages)
}
Here, the loop in main that calls PackItems four times sets up the additional “workers” that fan out the problem. This part actually didn’t make much sense to me the first few times I looked at it. The key is that the workers all read from the same channel of Items, so each item goes to exactly one of them, and each worker returns its own channel that it writes to. In the end we “fan in” by merging these channels back into one single channel.
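If the shared channel is the part that feels like magic, it can be isolated in a much smaller program. This is only a sketch: distribute is a hypothetical helper I made up for illustration, and plain integer IDs stand in for the Item values. It starts several goroutines that range over one channel and records which goroutine received each ID.

```go
package main

import (
	"fmt"
	"sync"
)

// distribute is a hypothetical helper: it starts `workers` goroutines that
// all range over one shared channel carrying the IDs 0..total-1, and
// returns which worker received which IDs.
func distribute(workers, total int) map[int][]int {
	ids := make(chan int)
	go func() {
		defer close(ids)
		for id := 0; id < total; id++ {
			ids <- id
		}
	}()

	var (
		mu       sync.Mutex
		wg       sync.WaitGroup
		received = make(map[int][]int) // worker number -> IDs it received
	)
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func(w int) {
			defer wg.Done()
			// Each value sent on ids is delivered to exactly one receiver.
			for id := range ids {
				mu.Lock()
				received[w] = append(received[w], id)
				mu.Unlock()
			}
		}(w)
	}
	wg.Wait()
	return received
}

func main() {
	count := 0
	for w, ids := range distribute(4, 10) {
		fmt.Printf("worker %d received %v\n", w, ids)
		count += len(ids)
	}
	// The split between workers varies from run to run; the total does not.
	fmt.Println("total received:", count)
}
```

No ID is ever delivered twice, and none is dropped. That property is what lets the workers in this post share the items channel without any extra coordination.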
By running this, you can actually see which “worker” is preparing a package:
Worker #1: Shipping package no. 1, took 1s to pack
Worker #0: Shipping package no. 0, took 1s to pack
Worker #1: Shipping package no. 4, took 1s to pack
Worker #3: Shipping package no. 3, took 2s to pack
Worker #0: Shipping package no. 5, took 2s to pack
Worker #1: Shipping package no. 6, took 3s to pack
Worker #2: Shipping package no. 2, took 5s to pack
Worker #0: Shipping package no. 8, took 3s to pack
Worker #3: Shipping package no. 7, took 5s to pack
Worker #1: Shipping package no. 9, took 2s to pack
Took 7.000000s to ship 10 packages
Seven seconds is quite the improvement over 25! It’s important to note that the packages now arrive out of order. Fan-out, fan-in is useful when a specific stage of your pipeline is computationally expensive (i.e., takes a long time) and the order of your stream is not important.
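The 25s and 7s figures can be sanity-checked with a little arithmetic, without running any goroutines. The sketch below is a simplified model, not part of the original program: it assumes each item goes to whichever worker is free first and that handing an item over a channel takes no time. The makespan function and the packingEfforts variable are names I made up for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// packingEfforts lists the PackingEffort of the ten items, in belt order.
var packingEfforts = []time.Duration{
	1 * time.Second, 1 * time.Second, 5 * time.Second, 2 * time.Second, 1 * time.Second,
	2 * time.Second, 3 * time.Second, 5 * time.Second, 3 * time.Second, 2 * time.Second,
}

// makespan estimates how long `workers` packers need to get through
// `efforts` when each item goes to whichever packer is free first.
// Simplifying assumption: channel handoffs are instantaneous.
func makespan(efforts []time.Duration, workers int) time.Duration {
	if workers < 1 {
		workers = 1
	}
	freeAt := make([]time.Duration, workers) // when each packer is next idle
	for _, effort := range efforts {
		next := 0
		for w := range freeAt {
			if freeAt[w] < freeAt[next] {
				next = w
			}
		}
		freeAt[next] += effort
	}
	var longest time.Duration
	for _, t := range freeAt {
		if t > longest {
			longest = t
		}
	}
	return longest
}

func main() {
	fmt.Println(makespan(packingEfforts, 1))  // 25s
	fmt.Println(makespan(packingEfforts, 4))  // 7s
	fmt.Println(makespan(packingEfforts, 10)) // 5s
}
```

Under this model, one worker needs the full 25s and four workers need 7s, matching the runs above. Even with one worker per item the total cannot drop below 5s, because the TV and the Computer each take that long to pack.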
I hope that this serves as a slightly more verbose but more useful example of the fan-out, fan-in pattern. Working through it has really helped me grasp the pattern better. If you have any questions, feel free to reach me @austburn.