Saturday 16 February 2013

Waiting for Golang channels to drain

Golang's channels map easily onto the producer-consumer pattern. Let's assume that everything that is produced needs to be consumed, even if the process receives a SIGTERM.

The following example shows how we can register channels, monitor for a kill signal, and then wait for everything to be consumed.



package main

import (
"log"
"os"
"os/signal"
"reflect"
"syscall"
"time"
)

// BufferSize is the capacity given to the demo channel c.
var BufferSize = 512

// MaxIter is how many consecutive polls a monitored channel may report an
// unchanged, non-zero length before the drain loop gives up on it.
var MaxIter = 10

// monitored holds every value handed to RegisterChannel.
var monitored []interface{}

// c is the buffered channel shared by the demo producer and consumer.
var c = make(chan int, BufferSize)

// stopping is set by main from the value received on the stop channel.
var stopping bool

// RegisterChannel records i in the package-level monitored list so that the
// SIGTERM handler can wait for it to empty. Any value is accepted here; the
// handler later skips entries that are not channels.
func RegisterChannel(i interface{}) {
	updated := append(monitored, i)
	monitored = updated
}

// MonitorSigTerm subscribes to SIGTERM and returns a channel on which true is
// sent once the signal has been received.
//
// After the caller has received that value, the background goroutine waits
// for every registered channel to drain and then terminates the process with
// exit status 1. The notification channel is unbuffered, so draining (and the
// exit) does not begin until the caller actually receives from it.
func MonitorSigTerm() chan bool {
	s := make(chan os.Signal, 1)
	b := make(chan bool)
	signal.Notify(s, syscall.SIGTERM)

	go func(sig chan os.Signal, notify chan bool) {
		<-sig
		log.Println("Cleaning up")
		// tell the caller
		notify <- true
		for _, i := range monitored {
			ch := reflect.ValueOf(i)
			if ch.Kind() != reflect.Chan {
				// Not a channel: nothing to wait for.
				continue
			}
			drainChannel(ch)
		}
		os.Exit(1)
	}(s, b)
	return b
}

// drainChannel polls ch once per second until it is empty, or until its
// length has stayed the same for MaxIter consecutive polls, in which case the
// remaining items are abandoned. It never receives from ch itself; it relies
// on other goroutines to consume the buffered items.
func drainChannel(ch reflect.Value) {
	prev := 0
	stalled := 0
	for {
		// Take a single snapshot per poll: consumers are still running, so
		// repeated Len calls within one iteration could disagree.
		n := ch.Len()
		if n == 0 {
			return
		}

		if n == prev {
			stalled++
			// enough?
			if stalled >= MaxIter {
				log.Println("Dropping")
				return
			}
		} else {
			stalled = 0
		}

		prev = n
		log.Printf("Draining:%v\n", prev)
		// other goroutines are working, let them
		time.Sleep(time.Second)
	}
}

// main wires up the demo: it registers c for drain monitoring, starts a
// producer that sends an incrementing integer every second and a slower
// consumer that logs one value every two seconds, then waits for SIGTERM.
// Once the signal arrives the producer is told to stop and main parks itself;
// the goroutine started by MonitorSigTerm ends the process.
func main() {
	RegisterChannel(c)
	stop := MonitorSigTerm()

	// quit is closed to tell the producer to stop. A closed channel is a
	// properly synchronised signal, unlike polling a plain bool that another
	// goroutine writes.
	quit := make(chan struct{})

	go func() {
		for i := 1; ; i++ {
			select {
			case <-quit:
				return
			default:
			}
			c <- i
			time.Sleep(time.Second)
		}
	}()

	go func() {
		for i := range c {
			log.Printf("rx:%v\n", i)
			// slower read
			time.Sleep(2 * time.Second)
		}
	}()

	// Only this goroutine touches stopping, so the write below is race-free.
	stopping = <-stop
	close(quit)

	// wait for cleanup to finish
	select {}
}