I have re-written the core functionality of the data store mentioned in The Reliability Of Go and was surprised as to how few lines of code it boiled down to using the latest version of go.
It's here quickbeam.
devHell
Saturday, 7 June 2014
Wednesday, 6 November 2013
Dynamic Input Channels
Suppose a function produces some output on a channel, and that the output of this function is to be conditionally consumed dependant on the current system state. Also, lets suppose, the system contains a restriction in that the output channel of the producer function must remain fixed.
The question is then, how to route the output to the correct consuming logic, without reassigning the output channel.
One solution is to dynamically assign the consumer's input channel. Go provides a mechanism for this; the channel itself.
s := make(chan chan int)
c := make(chan int)
for {
select {
case c = <- s:
case v := <- c:
logic(v)
}
}
With this logic in place the consumer's input channel can be dynamically reassigned whilst the producer remains agnostic.
Here is a complete example of the mechanism at work.
Tuesday, 24 September 2013
Bullhorn
Bullhorn is a golang package that provides lightweight type-agnostic publish / subscribe messaging to goroutines.
You can find it here.
The model has been extracted from various time sensitive messaging applications I have worked with over the past few years.
Here is an example limit order / price matching procedure written using bullhorn subscriptions and events.
Running the code, will give you something like...
Got order:{CBG 0 1.05 100}
Got order:{CBG 1 1.05 200}
Matching:{CBG 0 1.05 100} against:CBG: 1655 0.8704 1.4305 2797
Matching:{CBG 1 1.05 200} against:CBG: 1655 0.8704 1.4305 2797
Matching:{CBG 0 1.05 100} against:CBG: 9414 0.8847 1.9453 9074
Matching:{CBG 1 1.05 200} against:CBG: 9414 0.8847 1.9453 9074
Matching:{CBG 0 1.05 100} against:CBG: 8787 0.9490 1.0373 12530 - Execute Order!
Matching:{CBG 1 1.05 200} against:CBG: 8787 0.9490 1.0373 12530
Matching:{CBG 1 1.05 200} against:CBG: 5840 0.9519 1.5722 12054
Matching:{CBG 1 1.05 200} against:CBG: 2485 0.9425 1.7220 11042
Matching:{CBG 1 1.05 200} against:CBG: 5235 1.0583 1.5840 4333 - Execute Order!
Here is an example limit order / price matching procedure written using bullhorn subscriptions and events.
Running the code, will give you something like...
Got order:{CBG 0 1.05 100}
Got order:{CBG 1 1.05 200}
Matching:{CBG 0 1.05 100} against:CBG: 1655 0.8704 1.4305 2797
Matching:{CBG 1 1.05 200} against:CBG: 1655 0.8704 1.4305 2797
Matching:{CBG 0 1.05 100} against:CBG: 9414 0.8847 1.9453 9074
Matching:{CBG 1 1.05 200} against:CBG: 9414 0.8847 1.9453 9074
Matching:{CBG 0 1.05 100} against:CBG: 8787 0.9490 1.0373 12530 - Execute Order!
Matching:{CBG 1 1.05 200} against:CBG: 8787 0.9490 1.0373 12530
Matching:{CBG 1 1.05 200} against:CBG: 5840 0.9519 1.5722 12054
Matching:{CBG 1 1.05 200} against:CBG: 2485 0.9425 1.7220 11042
Matching:{CBG 1 1.05 200} against:CBG: 5235 1.0583 1.5840 4333 - Execute Order!
Labels:
channels,
golang,
goroutines,
pub/sub
Thursday, 5 September 2013
Sorting share trading books with Golang
Golang's sort package provides a set of primitives that allows us to define the order of our own structures. Furthermore we can define various ways of sorting the structures by extending the base collection.
type Order struct {
Ind string // buy sell indicator
Epic string
Price float64
Quantity float64
Time int64
}
type Orders []Order
func (o Orders) Len() int {
return len(o)
}
func (o Orders) Swap(i, j int) {
o[i], o[j] = o[j], o[i]
}
type BuyOrders struct {
Orders
}
type SellOrders struct {
Orders
}
func (o BuyOrders) Less(i, j int) bool {
// buy orders are reversed
return true
}
return false
}
// because of this element we cannot simply call reverse
return o.Orders[i].Time < o.Orders[j].Time
}
func (o SellOrders) Less(i, j int) bool {
return true
}
return false
}
return o.Orders[i].Time < o.Orders[j].Time
}
Labels:
golang,
share trading,
sorting
Tuesday, 14 May 2013
The Reliability of Go
As
part of the Canonical Cloud Sprint taking place in San Francisco last
week I attended Dave Cheney's talk at the GoSF meetup on the porting and
extension of juju. Juju is an open-source cloud management and
service orchestration tool that if you haven't heard of yet, you soon
will have.
After the talk an audience member asked if Go was reliable. Having used Go in production for coming up to three years now, without incident, this came as a bit of a surprise to me. Prior to moving to Canonical I worked for one of the UK's largest market makers. A market maker is basically a wholesaler for institutional share traders and stock brokers. During my time there I replaced several key systems components with Go.
System monitoring.
The services within the system were monitored by a python script, pinging each node, discovering services, connecting the networking dots, checking health etc. Due to the complex nature of the system this script could take up to three minutes to scan nodes and process the results. The script would often stall whilst processing the vast amounts of data produced. After porting the script to Go the runtime was reduced to under one second, and we never saw a single stall when processing.
Data store.
A legacy relational database was replaced with a Go based key/value store to remove bottlenecks at market open. This service is now the key piece of architecture in the system, processing all inbound and outbound quotes/orders to and from the London Stock Exchange, the Multi-lateral Trading Facilities, and key exchanges across Europe. This service processes instructions at an average of 7 microseconds (actually, 6 under Go1.1), and never once failed, even at peaks, processing tens of thousands of instructions per second. Go is currently providing key infrastructure components within the finance industry.
As I left my old position I was in the process of swapping the messaging middleware and the third-party price feeds with services written in Go.
After the talk an audience member asked if Go was reliable. Having used Go in production for coming up to three years now, without incident, this came as a bit of a surprise to me. Prior to moving to Canonical I worked for one of the UK's largest market makers. A market maker is basically a wholesaler for institutional share traders and stock brokers. During my time there I replaced several key systems components with Go.
System monitoring.
The services within the system were monitored by a python script, pinging each node, discovering services, connecting the networking dots, checking health etc. Due to the complex nature of the system this script could take up to three minutes to scan nodes and process the results. The script would often stall whilst processing the vast amounts of data produced. After porting the script to Go the runtime was reduced to under one second, and we never saw a single stall when processing.
Data store.
A legacy relational database was replaced with a Go based key/value store to remove bottlenecks at market open. This service is now the key piece of architecture in the system, processing all inbound and outbound quotes/orders to and from the London Stock Exchange, the Multi-lateral Trading Facilities, and key exchanges across Europe. This service processes instructions at an average of 7 microseconds (actually, 6 under Go1.1), and never once failed, even at peaks, processing tens of thousands of instructions per second. Go is currently providing key infrastructure components within the finance industry.
As I left my old position I was in the process of swapping the messaging middleware and the third-party price feeds with services written in Go.
Go's adoption is gathering pace thanks to the terse syntax, straightforward powerful
standard library, excellent tooling and concurrency primitives.
Go shows real maturity beyond its relatively young age due to the experience of the core development team and the consideration that is shown when introducing language constructs and extending the standard library.
I changed positions so that I could work with Go full-time. Ask anyone that knows me and they'll tell you that I'm not a betting man; you better believe Go is reliable.
Go shows real maturity beyond its relatively young age due to the experience of the core development team and the consideration that is shown when introducing language constructs and extending the standard library.
I changed positions so that I could work with Go full-time. Ask anyone that knows me and they'll tell you that I'm not a betting man; you better believe Go is reliable.
Labels:
go,
golang,
reliability
Saturday, 16 February 2013
Waiting for Golang channels to drain
Golang's channels easily map onto the producer consumer pattern. Lets assume that everything that is produced needs to be consumed, even if the process receives a SIGTERM.
The following example shows how we can register channels, monitor for a kill signal, and then wait for everything to be consumed.
package main
import (
"log"
"os"
"os/signal"
"reflect"
"syscall"
"time"
)
var (
BufferSize = 512
MaxIter = 10
monitored []interface{}
c = make(chan int, BufferSize)
stopping bool
)
func RegisterChannel(i interface{}) {
monitored = append(monitored, i)
}
func MonitorSigTerm() chan bool {
s := make(chan os.Signal, 1)
b := make(chan bool)
signal.Notify(s, syscall.SIGTERM)
go func(c chan os.Signal, b chan bool) {
_ = <-c
log.Println("Cleaning up")
// tell the caller
b <- true
for _, i := range monitored {
ch := reflect.ValueOf(i)
if ch.Kind() != reflect.Chan {
continue
}
prev := 0
iteration := 0
for {
if ch.Len() == 0 {
break
}
if prev == ch.Len() {
iteration++
// enough?
if iteration >= MaxIter {
log.Println("Dropping")
break
}
} else {
iteration = 0
}
prev = ch.Len()
log.Printf("Draining:%v\n", prev)
// other goroutines are working, let them
time.Sleep(1e9)
}
}
os.Exit(1)
}(s, b)
return b
}
func main() {
RegisterChannel(c)
stop := MonitorSigTerm()
go func() {
i := 0
for {
if stopping {
break
}
i++
c <- i
time.Sleep(1e9)
}
}()
go func() {
for {
i := <-c
log.Printf("rx:%v\n", i)
// slower read
time.Sleep(2e9)
}
}()
stopping = <-stop
// wait for cleanup to finish
select {}
}
The following example shows how we can register channels, monitor for a kill signal, and then wait for everything to be consumed.
package main
import (
"log"
"os"
"os/signal"
"reflect"
"syscall"
"time"
)
var (
BufferSize = 512
MaxIter = 10
monitored []interface{}
c = make(chan int, BufferSize)
stopping bool
)
func RegisterChannel(i interface{}) {
monitored = append(monitored, i)
}
func MonitorSigTerm() chan bool {
s := make(chan os.Signal, 1)
b := make(chan bool)
signal.Notify(s, syscall.SIGTERM)
go func(c chan os.Signal, b chan bool) {
_ = <-c
log.Println("Cleaning up")
// tell the caller
b <- true
for _, i := range monitored {
ch := reflect.ValueOf(i)
if ch.Kind() != reflect.Chan {
continue
}
prev := 0
iteration := 0
for {
if ch.Len() == 0 {
break
}
if prev == ch.Len() {
iteration++
// enough?
if iteration >= MaxIter {
log.Println("Dropping")
break
}
} else {
iteration = 0
}
prev = ch.Len()
log.Printf("Draining:%v\n", prev)
// other goroutines are working, let them
time.Sleep(1e9)
}
}
os.Exit(1)
}(s, b)
return b
}
func main() {
RegisterChannel(c)
stop := MonitorSigTerm()
go func() {
i := 0
for {
if stopping {
break
}
i++
c <- i
time.Sleep(1e9)
}
}()
go func() {
for {
i := <-c
log.Printf("rx:%v\n", i)
// slower read
time.Sleep(2e9)
}
}()
stopping = <-stop
// wait for cleanup to finish
select {}
}
Labels:
channels,
golang,
reflection,
signals
Sunday, 20 January 2013
Golang: Overflowing JSON
Go's json.Unmarshal function works perfectly by taking a JSON blob and attempting to create a known structure from the data.
type S struct {
A int
B string
}
var s S
json.Unmarshal([]byte(`{"A": 42, "B": "b","C": "c"}`),&s)
I recently had to construct a structure in this way, but also needed to store data that overflowed the structure.
I came up with this little function to do just that.
func UnmarshalJSON(src []byte, dst interface{}) (remainder []byte, err error) {
var m map[string]interface{}
o := make(map[string]interface{})
_ = json.Unmarshal(src, &m)
// put anything that doesnt match dst into a map
rv := reflect.ValueOf(dst).Elem()
for k, v := range m {
if rv.FieldByName(k).IsValid() == false {
o[k] = v
}
}
// marshal the map to JSON
remainder, _ = json.Marshal(o)
// now fill the dst
err = json.Unmarshal(src, dst)
return
}
Now if you set src bytes to the remainder bytes in the call you can use this function to 'consume' structures from JSON blobs.
src, _ = UnmarshalJSON(src,&myStruct)
type S struct {
A int
B string
}
var s S
json.Unmarshal([]byte(`{"A": 42, "B": "b","C": "c"}`),&s)
I recently had to construct a structure in this way, but also needed to store data that overflowed the structure.
I came up with this little function to do just that.
func UnmarshalJSON(src []byte, dst interface{}) (remainder []byte, err error) {
var m map[string]interface{}
o := make(map[string]interface{})
_ = json.Unmarshal(src, &m)
// put anything that doesnt match dst into a map
rv := reflect.ValueOf(dst).Elem()
for k, v := range m {
if rv.FieldByName(k).IsValid() == false {
o[k] = v
}
}
// marshal the map to JSON
remainder, _ = json.Marshal(o)
// now fill the dst
err = json.Unmarshal(src, dst)
return
}
Now if you set src bytes to the remainder bytes in the call you can use this function to 'consume' structures from JSON blobs.
src, _ = UnmarshalJSON(src,&myStruct)
Labels:
golang JSON
Subscribe to:
Posts (Atom)