Initial commit.

This commit is contained in:
Travis
2012-12-30 19:21:31 -08:00
commit 92623001b5
8 changed files with 345 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
.idea/*
*.sublime*

26
README.md Normal file
View File

@@ -0,0 +1,26 @@
This is just a simple prototype. To get to production, it would need:
- Routing table in central storage (mongo or IronCache) so all routers can write to it and read to get updates.
- Update routing table from central store every X minutes.
- Remove failed routes and start new workers when a route fails.
- Ability to start new workers if none are running.
- Ability to always keep a minimum number running at all times, like at least one (or not if on free account?).
- Ability to start new workers based on some auto scaling scheme.
- Authentication (same as always).
## Testing
- start helloserver.go
- start router.go
- ruby worker.rb a couple times
- ruby client.rb
What's going on?
- worker.rb connects to router and adds routes.
- client.rb connects to router which checks the routing table, proxies the request to one of the destinations and returns the response.
The idea here is that the IronWorker backend can tell the router that it has started a process, so the router can begin routing requests to it. Each endpoint should only be cached for 55 minutes or so.

168
balance.go Normal file
View File

@@ -0,0 +1,168 @@
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"container/heap"
"flag"
"fmt"
"math/rand"
"time"
)
// nRequester is the number of simulated clients generating work.
const nRequester = 100

// nWorker is the number of workers servicing requests.
const nWorker = 10

// roundRobin selects naive round-robin dispatch instead of the
// default least-loaded (heap-based) scheduling when -r is set.
var roundRobin = flag.Bool("r", false, "use round-robin scheduling")
// Simulation of some work: sleep a random number of seconds (0-4)
// and report how long the sleep was.
func op() int {
	delay := rand.Int63n(5)
	time.Sleep(time.Second * time.Duration(delay))
	return int(delay)
}
// Request pairs the work to perform (fn) with the channel (c) on
// which the result is delivered back to the requester.
type Request struct {
	fn func() int // operation to run
	c chan int    // result is sent here
}
// requester simulates one client: it repeatedly waits a random
// interval, submits a unit of work, and blocks until the answer
// comes back on its private result channel.
func requester(work chan Request) {
	results := make(chan int)
	for {
		// Pause up to nWorker-1 seconds between requests.
		pause := rand.Int63n(nWorker)
		time.Sleep(time.Duration(pause) * time.Second)
		work <- Request{op, results}
		<-results // wait for completion; the value itself is unused
	}
}
// Worker services requests from its own channel and tracks how many
// are outstanding. i is the worker's index in the balancer's heap so
// it can be located by heap.Remove.
type Worker struct {
	i int                  // index in the heap
	requests chan Request  // queued work for this worker
	pending int            // number of requests dispatched but not completed
}
// work pulls requests off this worker's queue forever: it runs each
// request's function, replies on the request's channel, then notifies
// the balancer via done so the load accounting can be updated.
func (w *Worker) work(done chan *Worker) {
	for {
		request := <-w.requests
		result := request.fn()
		request.c <- result
		done <- w
	}
}
// Pool is a priority queue of workers ordered by pending request
// count; it implements heap.Interface so the balancer can always pick
// the least-loaded worker.
type Pool []*Worker

// Len returns the number of workers in the pool.
func (p Pool) Len() int { return len(p) }

// Less orders workers by pending request count, fewest first.
func (p Pool) Less(i, j int) bool {
	return p[i].pending < p[j].pending
}

// Swap exchanges two workers and keeps their stored heap indices
// (Worker.i) in sync so heap.Remove can find them later. A value
// receiver suffices (and matches Len/Less): it still mutates the
// shared backing array.
func (p Pool) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
	p[i].i = i
	p[j].i = j
}

// Push appends a worker (called by heap.Push). The original reslice
// a[0:n+1] panics if the pool ever exceeds its initial capacity;
// append grows safely instead.
func (p *Pool) Push(x interface{}) {
	w := x.(*Worker)
	w.i = len(*p)
	*p = append(*p, w)
}

// Pop removes and returns the last worker (called by heap.Pop).
func (p *Pool) Pop() interface{} {
	a := *p
	n := len(a) - 1
	w := a[n]
	a[n] = nil // drop the reference so the backing array doesn't retain it
	w.i = -1   // for safety
	*p = a[:n]
	return w
}
// Balancer distributes incoming requests across a pool of workers.
type Balancer struct {
	pool Pool          // heap of workers ordered by load (unused as a heap in round-robin mode)
	done chan *Worker  // workers report here when they finish a request
	i int              // round-robin cursor (only used with -r)
}
// NewBalancer creates nWorker workers, starts each one's service
// goroutine, and returns a balancer ready to dispatch requests.
func NewBalancer() *Balancer {
	b := &Balancer{
		pool: make(Pool, 0, nWorker),
		done: make(chan *Worker, nWorker),
	}
	for n := 0; n < nWorker; n++ {
		worker := &Worker{requests: make(chan Request, nRequester)}
		heap.Push(&b.pool, worker)
		go worker.work(b.done)
	}
	return b
}
// balance runs the scheduling loop forever: dispatch new work as it
// arrives and update load accounting as workers report completion,
// printing the pool's load statistics after every event.
func (b *Balancer) balance(work chan Request) {
	for {
		select {
		case request := <-work:
			b.dispatch(request)
		case worker := <-b.done:
			b.completed(worker)
		}
		b.print()
	}
}
// print writes each worker's pending count followed by the mean and
// variance of the load across the pool.
func (b *Balancer) print() {
	total, totalSq := 0, 0
	for _, worker := range b.pool {
		fmt.Printf("%d ", worker.pending)
		total += worker.pending
		totalSq += worker.pending * worker.pending
	}
	size := float64(len(b.pool))
	mean := float64(total) / size
	variance := float64(totalSq)/size - mean*mean
	fmt.Printf(" %.2f %.2f\n", mean, variance)
}
// dispatch hands a request to a worker. In round-robin mode the
// workers simply take turns; otherwise the least-loaded worker is
// popped from the heap, given the request, and pushed back with its
// pending count bumped.
func (b *Balancer) dispatch(req Request) {
	if !*roundRobin {
		w := heap.Pop(&b.pool).(*Worker)
		w.requests <- req
		w.pending++
		// fmt.Printf("started %p; now %d\n", w, w.pending)
		heap.Push(&b.pool, w)
		return
	}
	next := b.pool[b.i]
	next.requests <- req
	next.pending++
	b.i = (b.i + 1) % len(b.pool) // advance the cursor, wrapping around
}
// completed records that a worker finished one request. Outside
// round-robin mode the worker is removed and re-pushed so the heap
// re-orders it under its new, lower pending count.
func (b *Balancer) completed(w *Worker) {
	w.pending--
	if *roundRobin {
		return
	}
	// fmt.Printf("finished %p; now %d\n", w, w.pending)
	heap.Remove(&b.pool, w.i)
	heap.Push(&b.pool, w)
}
// main parses flags, starts the simulated requesters, and runs the
// balancer's event loop forever.
func main() {
	flag.Parse()
	work := make(chan Request)
	for n := 0; n < nRequester; n++ {
		go requester(work)
	}
	NewBalancer().balance(work)
}

8
client.rb Normal file
View File

@@ -0,0 +1,8 @@
# Simple test client: fetches "/" through the router and prints the
# response body.
require 'rest'

client = Rest::Client.new
client.logger.level = Logger::DEBUG

response = client.get("http://localhost:8080/") # "http://www.github.com")
puts "body:"
puts response.body

22
goproxy1.go Normal file
View File

@@ -0,0 +1,22 @@
package main
import (
"flag"
"github.com/elazarl/goproxy"
"log"
"net/http"
)
// main starts an HTTP proxy on :8080 that tags every proxied request
// with an X-GoProxy header. The -v flag (default true) enables
// goproxy's verbose logging.
func main() {
	verbose := flag.Bool("v", true, "should every proxy request be logged to stdout")
	flag.Parse()

	server := goproxy.NewProxyHttpServer()
	server.Verbose = *verbose
	server.OnRequest().DoFunc(
		func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) {
			r.Header.Set("X-GoProxy", "yxorPoG-X")
			return r, nil
		})
	log.Fatal(http.ListenAndServe(":8080", server))
}

21
helloserver.go Normal file
View File

@@ -0,0 +1,21 @@
package main
import (
"fmt"
"github.com/gorilla/mux"
"log"
"net/http"
)
// main serves the Hello handler for every path on :8081.
func main() {
	router := mux.NewRouter()
	router.HandleFunc("/", Hello)
	http.Handle("/", router)
	log.Fatal(http.ListenAndServe(":8081", nil))
}
func Hello(w http.ResponseWriter, req *http.Request) {
fmt.Fprintln(w, "Hello world!")
}

88
src/router/router.go Normal file
View File

@@ -0,0 +1,88 @@
/*
For keeping a minimum running, perhaps when doing a routing table update, if destination hosts are all
expired or about to expire we start more.
*/
package main
import (
"encoding/json"
"fmt"
"github.com/gorilla/mux"
"log"
"math/rand"
"net/http"
"net/http/httputil"
"net/url"
"strings"
)
// routingTable maps a request host name to its registered destinations.
// NOTE(review): this map is read by ProxyFunc and written by AddWorker
// from concurrent HTTP handler goroutines with no locking — a data
// race. Presumably acceptable for this prototype, but it should be
// guarded by a mutex (or moved to central storage per the README)
// before production.
var routingTable = map[string]Route{}

// Route holds the set of destination host:port strings for one host.
type Route struct {
	// TODO: Change destinations to a simple cache so it can expire entries after 55 minutes (the one we use in common?)
	Destinations []string
}

// Route2 is the JSON payload for adding a new host -> destination
// mapping (sent by worker.rb with the Iron-Router header set).
type Route2 struct {
	Host string `json:"host"`
	Dest string `json:"dest"`
}
// main wires the router's endpoints on :8080: requests carrying an
// Iron-Router header (or hitting /addworker) register a worker
// destination; everything else is proxied to a registered destination.
func main() {
	// verbose := flag.Bool("v", true, "should every proxy request be logged to stdout")
	// flag.Parse()
	router := mux.NewRouter()
	workerRoutes := router.Headers("Iron-Router", "").Subrouter()
	workerRoutes.HandleFunc("/", AddWorker)
	router.HandleFunc("/addworker", AddWorker)
	router.HandleFunc("/", ProxyFunc)
	http.Handle("/", router)
	log.Fatal(http.ListenAndServe(":8080", nil))
}
// ProxyFunc proxies the incoming request to one of the destinations
// registered for the request's host (port stripped), chosen at random.
// Writes "No matching routes!" when the host has no destinations.
func ProxyFunc(w http.ResponseWriter, req *http.Request) {
	fmt.Println("HOST:", req.Host)
	host := strings.Split(req.Host, ":")[0]
	route := routingTable[host]
	if len(route.Destinations) == 0 {
		fmt.Fprintln(w, "No matching routes!")
		return
	}
	// choose random dest
	dest := route.Destinations[rand.Intn(len(route.Destinations))]
	// Only prepend the scheme when it is missing (resolves the old
	// "should check if http:// already exists" TODO).
	if !strings.HasPrefix(dest, "http://") && !strings.HasPrefix(dest, "https://") {
		dest = "http://" + dest
	}
	destUrl, err := url.Parse(dest)
	if err != nil {
		// A malformed destination must not panic and kill the handler;
		// report a server error instead.
		http.Error(w, "invalid destination URL", http.StatusInternalServerError)
		return
	}
	proxy := httputil.NewSingleHostReverseProxy(destUrl)
	proxy.ServeHTTP(w, req)
	// todo: how to handle destination failures. I got this in log output when testing a bad endpoint:
	// 2012/12/26 23:22:08 http: proxy error: dial tcp 127.0.0.1:8082: connection refused
}
// AddWorker registers a new destination for a host: it decodes a
// Route2 JSON payload from the request body and appends the
// destination to the host's route in the routing table.
func AddWorker(w http.ResponseWriter, req *http.Request) {
	log.Println("AddWorker called!")
	r2 := Route2{}
	decoder := json.NewDecoder(req.Body)
	// A bad payload previously went unnoticed (Decode error ignored)
	// and silently registered an empty host/dest; reject it instead.
	if err := decoder.Decode(&r2); err != nil {
		http.Error(w, "invalid JSON body", http.StatusBadRequest)
		return
	}
	// No need to close req.Body: the http server closes it for us.
	fmt.Println("DECODED:", r2)
	// todo: routing table should be in mongo (or IronCache?) so all routers can update/read from it.
	route := routingTable[r2.Host]
	fmt.Println("ROUTE:", route)
	route.Destinations = append(route.Destinations, r2.Dest)
	fmt.Println("ROUTE:", route)
	routingTable[r2.Host] = route
	fmt.Println("New routing table:", routingTable)
	fmt.Fprintln(w, "Worker added")
}

10
worker.rb Normal file
View File

@@ -0,0 +1,10 @@
# Registers a worker with the router: POSTs a host/destination pair,
# flagged with the Iron-Router header so the router treats it as a
# registration instead of a request to proxy.
require 'rest'

client = Rest::Client.new
client.logger.level = Logger::DEBUG

response = client.post("http://localhost:8080/",
                       headers: {"Iron-Router" => "YES!"},
                       body: {"host" => "localhost", "dest" => "localhost:8082"})
puts "body:"
puts response.body