From 529c83acba8a5fe2c4bafb0ed26da09db1d513cd Mon Sep 17 00:00:00 2001 From: Travis Date: Tue, 15 Jan 2013 23:53:17 -0800 Subject: [PATCH] Caching routes. --- config_development.yml | 5 ++ register_host.rb | 38 +++++++++++ src/router/router.go | 148 ++++++++++++++++++++++++++++++----------- 3 files changed, 152 insertions(+), 39 deletions(-) create mode 100644 config_development.yml create mode 100644 register_host.rb diff --git a/config_development.yml b/config_development.yml new file mode 100644 index 000000000..a6d864c45 --- /dev/null +++ b/config_development.yml @@ -0,0 +1,5 @@ +iron: + token: MWx0VfngzsCu0W8NAYw7S2lNrgo + project_id: 4fd2729368a0197d1102056b + + diff --git a/register_host.rb b/register_host.rb new file mode 100644 index 000000000..f12886bce --- /dev/null +++ b/register_host.rb @@ -0,0 +1,38 @@ +require 'rest' +require 'uber_config' + +@config = UberConfig.load('config_development.yml') + +rest = Rest::Client.new +rest.logger.level = Logger::DEBUG + +project_id = @config[:project_id] +token = @config[:token] +# host name +host_name = @config[:host_name] || "routertest.irondns.info" +# which worker to run +code_name = @config[:code_name] || "sinatra" + + +response = rest.post( +# "http://localhost:8080/", + "http://router.irondns.info/?project_id=#{project_id}&token=#{token}&code_name=#{code_name}", + headers: {"Iron-Router"=>"register"}, + body: {"host"=>host_name, "code"=>code_name) +puts "body:" +puts response.body +puts "\n\n=======\n\n" + +STDOUT.flush + +# Now we start the actual worker +##################################################################3 + +ENV['PORT'] = port.to_s # for sinatra +my_app = Sinatra.new do + set :port, port + get('/') { "hi" } + get('/*') { "you passed in #{params[:splat].inspect}" } +end +my_app.run! + diff --git a/src/router/router.go b/src/router/router.go index 24984385d..2845e7133 100644 --- a/src/router/router.go +++ b/src/router/router.go @@ -13,6 +13,7 @@ import ( "github.com/gorilla/mux" "github.com/iron-io/iron_go/cache" "github.com/iron-io/iron_go/worker" + "github.com/iron-io/common" "log" "math/rand" // "net" @@ -21,22 +22,55 @@ import ( "reflect" "strings" "time" + "runtime" + "flag" ) -var routingTable = map[string]*Route{} -var icache = cache.New("routertest") +var config struct { + Iron struct { + Token string `json:"token"` + ProjectId string `json:"project_id"` +} `json:"iron"` + Logging struct { + To string `json:"to"` + Level string `json:"level"` + Prefix string `json:"prefix"` +} +} + +//var routingTable = map[string]*Route{} +var icache = cache.New("routing-table") func init() { - icache.Settings.UseConfigMap(map[string]interface{}{"token": "MWx0VfngzsCu0W8NAYw7S2lNrgo", "project_id": "50e227be8e7d14359b001373"}) + runtime.GOMAXPROCS(runtime.NumCPU()) + log.Println("Running on", runtime.NumCPU(), "CPUs") + + var configFile string + var env string + flag.StringVar(&configFile, "c", "", "Config file name") + flag.StringVar(&env, "e", "development", "environment") + + flag.Parse() // Scans the arg list and sets up flags + + // Deployer is now passing -c in since we're using upstart and it doesn't know what directory to run in + if configFile == "" { + configFile = "config_" + env + ".json" + } + + common.LoadConfig("iron_mq", configFile, &config) + common.SetLogLevel(config.Logging.Level) + common.SetLogLocation(config.Logging.To, config.Logging.Prefix) + + icache.Settings.UseConfigMap(map[string]interface{}{"token": config.Iron.Token, "project_id": config.Iron.ProjectId}) } type Route struct { // TODO: Change destinations to a simple cache so it can expire entries after 55 minutes (the one we use in common?) - Host string - Destinations []string - ProjectId string - Token string // store this so we can queue up new workers on demand - CodeName string + Host string `json:"host"` + Destinations []string `json:"destinations"` + ProjectId string `json:"project_id"` + Token string `json:"token"` // store this so we can queue up new workers on demand + CodeName string `json:"code_name"` } // for adding new hosts @@ -46,8 +80,7 @@ type Route2 struct { } func main() { - // verbose := flag.Bool("v", true, "should every proxy request be logged to stdout") - // flag.Parse() + r := mux.NewRouter() s := r.Headers("Iron-Router", "").Subrouter() @@ -70,12 +103,17 @@ func ProxyFunc(w http.ResponseWriter, req *http.Request) { // 1) This host was never registered so we return 404 // 2) This host has active workers so we do the proxy // 3) This host has no active workers so we queue one (or more) up and return a 503 or something with message that says "try again in a minute" - route := routingTable[host] + // route := routingTable[host] + route, err := getRoute(host) // choose random dest - if route == nil { // route.Host == "" { - fmt.Fprintln(w, "Host not configured!") + if err != nil { + common.SendError(w, 400, fmt.Sprintln(w, "Host not configured or error!", err)) return } + // if route == nil { // route.Host == "" { + // common.SendError(w, 400, fmt.Sprintln(w, "Host not configured!")) + // return + // } destIndex := rand.Intn(len(route.Destinations)) destUrlString := route.Destinations[destIndex] // todo: should check if http:// already exists. @@ -97,13 +135,15 @@ func ProxyFunc(w http.ResponseWriter, req *http.Request) { if strings.Contains(etype.String(), "net.OpError") { // == reflect.TypeOf(net.OpError{}) { // couldn't figure out a better way to do this if len(route.Destinations) > 1 { fmt.Println("It's a network error, removing this destination from routing table.") - route.Destinations = append(route.Destinations[:destIndex], route.Destinations[destIndex+1:]...) - fmt.Println("New routing table:", routingTable) + route.Destinations = append(route.Destinations[:destIndex], route.Destinations[destIndex + 1:]...) + putRoute(route) + fmt.Println("New route:", route) return } else { fmt.Println("It's a network error and no other destinations available so we're going to remove it and start new task.") - route.Destinations = append(route.Destinations[:destIndex], route.Destinations[destIndex+1:]...) - fmt.Println("New routing table:", routingTable) + route.Destinations = append(route.Destinations[:destIndex], route.Destinations[destIndex + 1:]...) + putRoute(route) + fmt.Println("New route:", route) } // start new worker payload := map[string]interface{}{ @@ -118,7 +158,7 @@ func ProxyFunc(w http.ResponseWriter, req *http.Request) { fmt.Println("Couldn't marshal json!", err) return } - timeout := time.Second * 120 + timeout := time.Second*120 task := worker.Task{ CodeName: route.CodeName, Payload: string(jsonPayload), @@ -145,32 +185,62 @@ func ProxyFunc(w http.ResponseWriter, req *http.Request) { func AddWorker(w http.ResponseWriter, req *http.Request) { log.Println("AddWorker called!") - r2 := Route2{} - decoder := json.NewDecoder(req.Body) - decoder.Decode(&r2) - // todo: do we need to close body? - fmt.Println("DECODED:", r2) - // get project id and token projectId := req.FormValue("project_id") token := req.FormValue("token") codeName := req.FormValue("code_name") fmt.Println("project_id:", projectId, "token:", token, "code_name:", codeName) - // todo: routing table should be in mongo (or IronCache?) so all routers can update/read from it. - // todo: one cache entry per host domain - route := routingTable[r2.Host] - if route == nil { - route = &Route{} + // check header for what operation to perform + routerHeader := req.Header.Get("Iron-Router") + if routerHeader == "register" { + route := Route{} + decoder := json.NewDecoder(req.Body) + decoder.Decode(&route) + route.ProjectId = projectId + route.Token = token + route.CodeName = codeName + // todo: do we need to close body? + fmt.Println("registered route:", route) + + putRoute(route) + + } else { + r2 := Route2{} + decoder := json.NewDecoder(req.Body) + decoder.Decode(&r2) + // todo: do we need to close body? + fmt.Println("DECODED:", r2) + + route, err := getRoute(r2.Host) + // route := routingTable[r2.Host] + if err != nil { + common.SendError(w, 400, fmt.Sprintln(w, "This host is not registered!", err)) + return + // route = &Route{} + } + fmt.Println("ROUTE:", route) + route.Destinations = append(route.Destinations, r2.Dest) + fmt.Println("ROUTE new:", route) + + putRoute(route) + // routingTable[r2.Host] = route + // fmt.Println("New routing table:", routingTable) + fmt.Fprintln(w, "Worker added") } - fmt.Println("ROUTE:", route) - route.Host = r2.Host - route.Destinations = append(route.Destinations, r2.Dest) - route.ProjectId = projectId - route.Token = token - route.CodeName = codeName - fmt.Println("ROUTE:", route) - routingTable[r2.Host] = route - fmt.Println("New routing table:", routingTable) - fmt.Fprintln(w, "Worker added") +} + +func getRoute(host string) (Route, error) { + rx, err := icache.Get(host) + route := Route{} + if err == nil { + route = rx.(Route) + } + return route, err +} + +func putRoute(route Route) { + item := cache.Item{} + item.Value = route + icache.Put(route.Host, &item) }