From 92623001b5f63a8ecee77525dc0b853f8ba5fe79 Mon Sep 17 00:00:00 2001 From: Travis Date: Sun, 30 Dec 2012 19:21:31 -0800 Subject: [PATCH] Initial commit. --- .gitignore | 2 + README.md | 26 +++++++ balance.go | 168 +++++++++++++++++++++++++++++++++++++++++++ client.rb | 8 +++ goproxy1.go | 22 ++++++ helloserver.go | 21 ++++++ src/router/router.go | 88 +++++++++++++++++++++++ worker.rb | 10 +++ 8 files changed, 345 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 balance.go create mode 100644 client.rb create mode 100644 goproxy1.go create mode 100644 helloserver.go create mode 100644 src/router/router.go create mode 100644 worker.rb diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..d0741d053 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.idea/* +*.sublime* diff --git a/README.md b/README.md new file mode 100644 index 000000000..7f39e68eb --- /dev/null +++ b/README.md @@ -0,0 +1,26 @@ + +This is just a simple prototype. To get to production would need: + +- Routing table in central storage (mongo or IronCache) so all routers can write to it and read to get updates. +- Update routing table from central store every X minutes. +- Remove failed routes and start new workers if it failed. +- Ability to start new workers if none are running. +- Ability to always keep a minimum number running at all times, like at least one (or not if on free account?). +- Ability to start new workers based on some auto scaling scheme. +- Authentication (same as always). + + +## Testing + +- start helloserver.go +- start router.go +- ruby worker.rb a couple times +- ruby client.rb + +What's going on? + +- worker.rb connects to router and adds routes. +- client.rb connects to router which checks the routing table, proxies the request to one of the destinations and returns the response. + +The idea here is that IronWorker backend can tell the router that it started a process and to start routing requests. The endpoint should only be cached for 55 minutes or so. + diff --git a/balance.go b/balance.go new file mode 100644 index 000000000..7c189a106 --- /dev/null +++ b/balance.go @@ -0,0 +1,168 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +import ( + "container/heap" + "flag" + "fmt" + "math/rand" + "time" +) + +const nRequester = 100 +const nWorker = 10 + +var roundRobin = flag.Bool("r", false, "use round-robin scheduling") + +// Simulation of some work: just sleep for a while and report how long. +func op() int { + n := rand.Int63n(5) + time.Sleep(time.Duration(n) * time.Second) + return int(n) +} + +type Request struct { + fn func() int + c chan int +} + +func requester(work chan Request) { + c := make(chan int) + for { + time.Sleep(time.Duration(rand.Int63n(nWorker)) * time.Second) + work <- Request{op, c} + <-c + } +} + +type Worker struct { + i int + requests chan Request + pending int +} + +func (w *Worker) work(done chan *Worker) { + for { + req := <-w.requests + req.c <- req.fn() + done <- w + } +} + +type Pool []*Worker + +func (p Pool) Len() int { return len(p) } + +func (p Pool) Less(i, j int) bool { + return p[i].pending < p[j].pending +} + +func (p *Pool) Swap(i, j int) { + a := *p + a[i], a[j] = a[j], a[i] + a[i].i = i + a[j].i = j +} + +func (p *Pool) Push(x interface{}) { + a := *p + n := len(a) + a = a[0 : n+1] + w := x.(*Worker) + a[n] = w + w.i = n + *p = a +} + +func (p *Pool) Pop() interface{} { + a := *p + *p = a[0 : len(a)-1] + w := a[len(a)-1] + w.i = -1 // for safety + return w +} + +type Balancer struct { + pool Pool + done chan *Worker + i int +} + +func NewBalancer() *Balancer { + done := make(chan *Worker, nWorker) + b := &Balancer{make(Pool, 0, nWorker), done, 0} + for i := 0; i < nWorker; i++ { + w := &Worker{requests: make(chan Request, nRequester)} + heap.Push(&b.pool, w) + go w.work(b.done) + } + return b +} + +func (b *Balancer) balance(work chan Request) { + for { + select { + case req := <-work: + b.dispatch(req) + case w := <-b.done: + b.completed(w) + } + b.print() + } +} + +func (b *Balancer) print() { + sum := 0 + sumsq := 0 + for _, w := range b.pool { + fmt.Printf("%d ", w.pending) + sum += w.pending + sumsq += w.pending * w.pending + } + avg := float64(sum) / float64(len(b.pool)) + variance := float64(sumsq)/float64(len(b.pool)) - avg*avg + fmt.Printf(" %.2f %.2f\n", avg, variance) +} + +func (b *Balancer) dispatch(req Request) { + if *roundRobin { + w := b.pool[b.i] + w.requests <- req + w.pending++ + b.i++ + if b.i >= len(b.pool) { + b.i = 0 + } + return + } + + w := heap.Pop(&b.pool).(*Worker) + w.requests <- req + w.pending++ + // fmt.Printf("started %p; now %d\n", w, w.pending) + heap.Push(&b.pool, w) +} + +func (b *Balancer) completed(w *Worker) { + if *roundRobin { + w.pending-- + return + } + + w.pending-- + // fmt.Printf("finished %p; now %d\n", w, w.pending) + heap.Remove(&b.pool, w.i) + heap.Push(&b.pool, w) +} + +func main() { + flag.Parse() + work := make(chan Request) + for i := 0; i < nRequester; i++ { + go requester(work) + } + NewBalancer().balance(work) +} diff --git a/client.rb b/client.rb new file mode 100644 index 000000000..d73e35ab0 --- /dev/null +++ b/client.rb @@ -0,0 +1,8 @@ +require 'rest' + +rest = Rest::Client.new +rest.logger.level = Logger::DEBUG +response = rest.get("http://localhost:8080/") # "http://www.github.com") + +puts "body:" +puts response.body diff --git a/goproxy1.go b/goproxy1.go new file mode 100644 index 000000000..6664f6906 --- /dev/null +++ b/goproxy1.go @@ -0,0 +1,22 @@ +package main + +import ( + "flag" + "github.com/elazarl/goproxy" + "log" + "net/http" +) + +func main() { + verbose := flag.Bool("v", true, "should every proxy request be logged to stdout") + flag.Parse() + proxy := goproxy.NewProxyHttpServer() + proxy.Verbose = *verbose + proxy.OnRequest().DoFunc( + func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) { + r.Header.Set("X-GoProxy", "yxorPoG-X") + return r, nil + }) + + log.Fatal(http.ListenAndServe(":8080", proxy)) +} diff --git a/helloserver.go b/helloserver.go new file mode 100644 index 000000000..322d2b101 --- /dev/null +++ b/helloserver.go @@ -0,0 +1,21 @@ +package main + +import ( + "fmt" + "github.com/gorilla/mux" + "log" + "net/http" +) + +func main() { + + r := mux.NewRouter() + r.HandleFunc("/", Hello) + + http.Handle("/", r) + log.Fatal(http.ListenAndServe(":8081", nil)) +} + +func Hello(w http.ResponseWriter, req *http.Request) { + fmt.Fprintln(w, "Hello world!") +} diff --git a/src/router/router.go b/src/router/router.go new file mode 100644 index 000000000..7b4a7abc0 --- /dev/null +++ b/src/router/router.go @@ -0,0 +1,88 @@ +/* + +For keeping a minimum running, perhaps when doing a routing table update, if destination hosts are all + expired or about to expire we start more. + +*/ + +package main + +import ( + "encoding/json" + "fmt" + "github.com/gorilla/mux" + "log" + "math/rand" + "net/http" + "net/http/httputil" + "net/url" + "strings" +) + +var routingTable = map[string]Route{} + +type Route struct { + // TODO: Change destinations to a simple cache so it can expire entries after 55 minutes (the one we use in common?) + Destinations []string +} + +// for adding new hosts +type Route2 struct { + Host string `json:"host"` + Dest string `json:"dest"` +} + +func main() { + // verbose := flag.Bool("v", true, "should every proxy request be logged to stdout") + // flag.Parse() + + r := mux.NewRouter() + s := r.Headers("Iron-Router", "").Subrouter() + s.HandleFunc("/", AddWorker) + r.HandleFunc("/addworker", AddWorker) + + r.HandleFunc("/", ProxyFunc) + + http.Handle("/", r) + log.Fatal(http.ListenAndServe(":8080", nil)) +} + +func ProxyFunc(w http.ResponseWriter, req *http.Request) { + fmt.Println("HOST:", req.Host) + host := strings.Split(req.Host, ":")[0] + route := routingTable[host] + // choose random dest + if len(route.Destinations) == 0 { + fmt.Fprintln(w, "No matching routes!") + return + } + destUrls := route.Destinations[rand.Intn(len(route.Destinations))] + // todo: should check if http:// already exists. + destUrls = "http://" + destUrls + destUrl, err := url.Parse(destUrls) + if err != nil { + panic(err) + } + proxy := httputil.NewSingleHostReverseProxy(destUrl) + proxy.ServeHTTP(w, req) + // todo: how to handle destination failures. I got this in log output when testing a bad endpoint: + // 2012/12/26 23:22:08 http: proxy error: dial tcp 127.0.0.1:8082: connection refused +} + +func AddWorker(w http.ResponseWriter, req *http.Request) { + log.Println("AddWorker called!") + r2 := Route2{} + decoder := json.NewDecoder(req.Body) + decoder.Decode(&r2) + // todo: do we need to close body? + fmt.Println("DECODED:", r2) + + // todo: routing table should be in mongo (or IronCache?) so all routers can update/read from it. + route := routingTable[r2.Host] + fmt.Println("ROUTE:", route) + route.Destinations = append(route.Destinations, r2.Dest) + fmt.Println("ROUTE:", route) + routingTable[r2.Host] = route + fmt.Println("New routing table:", routingTable) + fmt.Fprintln(w, "Worker added") +} diff --git a/worker.rb b/worker.rb new file mode 100644 index 000000000..20cbc9700 --- /dev/null +++ b/worker.rb @@ -0,0 +1,10 @@ +require 'rest' + +rest = Rest::Client.new +rest.logger.level = Logger::DEBUG +response = rest.post("http://localhost:8080/", + headers: {"Iron-Router"=>"YES!"}, + body: {"host"=>"localhost", "dest"=>"localhost:8082"}) +puts "body:" +puts response.body +