mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Caching routes.
This commit is contained in:
5
config_development.yml
Normal file
5
config_development.yml
Normal file
@@ -0,0 +1,5 @@
|
||||
iron:
|
||||
token: MWx0VfngzsCu0W8NAYw7S2lNrgo
|
||||
project_id: 4fd2729368a0197d1102056b
|
||||
|
||||
|
||||
38
register_host.rb
Normal file
38
register_host.rb
Normal file
@@ -0,0 +1,38 @@
|
||||
require 'rest'
|
||||
require 'uber_config'
|
||||
|
||||
@config = UberConfig.load('config_development.yml')
|
||||
|
||||
rest = Rest::Client.new
|
||||
rest.logger.level = Logger::DEBUG
|
||||
|
||||
project_id = @config[:project_id]
|
||||
token = @config[:token]
|
||||
# host name
|
||||
host_name = @config[:host_name] || "routertest.irondns.info"
|
||||
# which worker to run
|
||||
code_name = @config[:code_name] || "sinatra"
|
||||
|
||||
|
||||
response = rest.post(
|
||||
# "http://localhost:8080/",
|
||||
"http://router.irondns.info/?project_id=#{project_id}&token=#{token}&code_name=#{code_name}",
|
||||
headers: {"Iron-Router"=>"register"},
|
||||
body: {"host"=>host_name, "code"=>code_name)
|
||||
puts "body:"
|
||||
puts response.body
|
||||
puts "\n\n=======\n\n"
|
||||
|
||||
STDOUT.flush
|
||||
|
||||
# Now we start the actual worker
|
||||
##################################################################3
|
||||
|
||||
ENV['PORT'] = port.to_s # for sinatra
|
||||
my_app = Sinatra.new do
|
||||
set :port, port
|
||||
get('/') { "hi" }
|
||||
get('/*') { "you passed in #{params[:splat].inspect}" }
|
||||
end
|
||||
my_app.run!
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/iron-io/iron_go/cache"
|
||||
"github.com/iron-io/iron_go/worker"
|
||||
"github.com/iron-io/common"
|
||||
"log"
|
||||
"math/rand"
|
||||
// "net"
|
||||
@@ -21,22 +22,55 @@ import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
"runtime"
|
||||
"flag"
|
||||
)
|
||||
|
||||
var routingTable = map[string]*Route{}
|
||||
var icache = cache.New("routertest")
|
||||
var config struct {
|
||||
Iron struct {
|
||||
Token string `json:"token"`
|
||||
ProjectId string `json:"project_id"`
|
||||
} `json:"iron"`
|
||||
Logging struct {
|
||||
To string `json:"to"`
|
||||
Level string `json:"level"`
|
||||
Prefix string `json:"prefix"`
|
||||
}
|
||||
}
|
||||
|
||||
//var routingTable = map[string]*Route{}
|
||||
var icache = cache.New("routing-table")
|
||||
|
||||
func init() {
|
||||
icache.Settings.UseConfigMap(map[string]interface{}{"token": "MWx0VfngzsCu0W8NAYw7S2lNrgo", "project_id": "50e227be8e7d14359b001373"})
|
||||
runtime.GOMAXPROCS(runtime.NumCPU())
|
||||
log.Println("Running on", runtime.NumCPU(), "CPUs")
|
||||
|
||||
var configFile string
|
||||
var env string
|
||||
flag.StringVar(&configFile, "c", "", "Config file name")
|
||||
flag.StringVar(&env, "e", "development", "environment")
|
||||
|
||||
flag.Parse() // Scans the arg list and sets up flags
|
||||
|
||||
// Deployer is now passing -c in since we're using upstart and it doesn't know what directory to run in
|
||||
if configFile == "" {
|
||||
configFile = "config_" + env + ".json"
|
||||
}
|
||||
|
||||
common.LoadConfig("iron_mq", configFile, &config)
|
||||
common.SetLogLevel(config.Logging.Level)
|
||||
common.SetLogLocation(config.Logging.To, config.Logging.Prefix)
|
||||
|
||||
icache.Settings.UseConfigMap(map[string]interface{}{"token": config.Iron.Token, "project_id": config.Iron.ProjectId})
|
||||
}
|
||||
|
||||
type Route struct {
|
||||
// TODO: Change destinations to a simple cache so it can expire entries after 55 minutes (the one we use in common?)
|
||||
Host string
|
||||
Destinations []string
|
||||
ProjectId string
|
||||
Token string // store this so we can queue up new workers on demand
|
||||
CodeName string
|
||||
Host string `json:"host"`
|
||||
Destinations []string `json:"destinations"`
|
||||
ProjectId string `json:"project_id"`
|
||||
Token string `json:"token"` // store this so we can queue up new workers on demand
|
||||
CodeName string `json:"code_name"`
|
||||
}
|
||||
|
||||
// for adding new hosts
|
||||
@@ -46,8 +80,7 @@ type Route2 struct {
|
||||
}
|
||||
|
||||
func main() {
|
||||
// verbose := flag.Bool("v", true, "should every proxy request be logged to stdout")
|
||||
// flag.Parse()
|
||||
|
||||
|
||||
r := mux.NewRouter()
|
||||
s := r.Headers("Iron-Router", "").Subrouter()
|
||||
@@ -70,12 +103,17 @@ func ProxyFunc(w http.ResponseWriter, req *http.Request) {
|
||||
// 1) This host was never registered so we return 404
|
||||
// 2) This host has active workers so we do the proxy
|
||||
// 3) This host has no active workers so we queue one (or more) up and return a 503 or something with message that says "try again in a minute"
|
||||
route := routingTable[host]
|
||||
// route := routingTable[host]
|
||||
route, err := getRoute(host)
|
||||
// choose random dest
|
||||
if route == nil { // route.Host == "" {
|
||||
fmt.Fprintln(w, "Host not configured!")
|
||||
if err != nil {
|
||||
common.SendError(w, 400, fmt.Sprintln(w, "Host not configured or error!", err))
|
||||
return
|
||||
}
|
||||
// if route == nil { // route.Host == "" {
|
||||
// common.SendError(w, 400, fmt.Sprintln(w, "Host not configured!"))
|
||||
// return
|
||||
// }
|
||||
destIndex := rand.Intn(len(route.Destinations))
|
||||
destUrlString := route.Destinations[destIndex]
|
||||
// todo: should check if http:// already exists.
|
||||
@@ -97,13 +135,15 @@ func ProxyFunc(w http.ResponseWriter, req *http.Request) {
|
||||
if strings.Contains(etype.String(), "net.OpError") { // == reflect.TypeOf(net.OpError{}) { // couldn't figure out a better way to do this
|
||||
if len(route.Destinations) > 1 {
|
||||
fmt.Println("It's a network error, removing this destination from routing table.")
|
||||
route.Destinations = append(route.Destinations[:destIndex], route.Destinations[destIndex+1:]...)
|
||||
fmt.Println("New routing table:", routingTable)
|
||||
route.Destinations = append(route.Destinations[:destIndex], route.Destinations[destIndex + 1:]...)
|
||||
putRoute(route)
|
||||
fmt.Println("New route:", route)
|
||||
return
|
||||
} else {
|
||||
fmt.Println("It's a network error and no other destinations available so we're going to remove it and start new task.")
|
||||
route.Destinations = append(route.Destinations[:destIndex], route.Destinations[destIndex+1:]...)
|
||||
fmt.Println("New routing table:", routingTable)
|
||||
route.Destinations = append(route.Destinations[:destIndex], route.Destinations[destIndex + 1:]...)
|
||||
putRoute(route)
|
||||
fmt.Println("New route:", route)
|
||||
}
|
||||
// start new worker
|
||||
payload := map[string]interface{}{
|
||||
@@ -118,7 +158,7 @@ func ProxyFunc(w http.ResponseWriter, req *http.Request) {
|
||||
fmt.Println("Couldn't marshal json!", err)
|
||||
return
|
||||
}
|
||||
timeout := time.Second * 120
|
||||
timeout := time.Second*120
|
||||
task := worker.Task{
|
||||
CodeName: route.CodeName,
|
||||
Payload: string(jsonPayload),
|
||||
@@ -145,32 +185,62 @@ func ProxyFunc(w http.ResponseWriter, req *http.Request) {
|
||||
func AddWorker(w http.ResponseWriter, req *http.Request) {
|
||||
log.Println("AddWorker called!")
|
||||
|
||||
r2 := Route2{}
|
||||
decoder := json.NewDecoder(req.Body)
|
||||
decoder.Decode(&r2)
|
||||
// todo: do we need to close body?
|
||||
fmt.Println("DECODED:", r2)
|
||||
|
||||
// get project id and token
|
||||
projectId := req.FormValue("project_id")
|
||||
token := req.FormValue("token")
|
||||
codeName := req.FormValue("code_name")
|
||||
fmt.Println("project_id:", projectId, "token:", token, "code_name:", codeName)
|
||||
|
||||
// todo: routing table should be in mongo (or IronCache?) so all routers can update/read from it.
|
||||
// todo: one cache entry per host domain
|
||||
route := routingTable[r2.Host]
|
||||
if route == nil {
|
||||
route = &Route{}
|
||||
// check header for what operation to perform
|
||||
routerHeader := req.Header.Get("Iron-Router")
|
||||
if routerHeader == "register" {
|
||||
route := Route{}
|
||||
decoder := json.NewDecoder(req.Body)
|
||||
decoder.Decode(&route)
|
||||
route.ProjectId = projectId
|
||||
route.Token = token
|
||||
route.CodeName = codeName
|
||||
// todo: do we need to close body?
|
||||
fmt.Println("registered route:", route)
|
||||
|
||||
putRoute(route)
|
||||
|
||||
} else {
|
||||
r2 := Route2{}
|
||||
decoder := json.NewDecoder(req.Body)
|
||||
decoder.Decode(&r2)
|
||||
// todo: do we need to close body?
|
||||
fmt.Println("DECODED:", r2)
|
||||
|
||||
route, err := getRoute(r2.Host)
|
||||
// route := routingTable[r2.Host]
|
||||
if err != nil {
|
||||
common.SendError(w, 400, fmt.Sprintln(w, "This host is not registered!", err))
|
||||
return
|
||||
// route = &Route{}
|
||||
}
|
||||
fmt.Println("ROUTE:", route)
|
||||
route.Destinations = append(route.Destinations, r2.Dest)
|
||||
fmt.Println("ROUTE new:", route)
|
||||
|
||||
putRoute(route)
|
||||
// routingTable[r2.Host] = route
|
||||
// fmt.Println("New routing table:", routingTable)
|
||||
fmt.Fprintln(w, "Worker added")
|
||||
}
|
||||
fmt.Println("ROUTE:", route)
|
||||
route.Host = r2.Host
|
||||
route.Destinations = append(route.Destinations, r2.Dest)
|
||||
route.ProjectId = projectId
|
||||
route.Token = token
|
||||
route.CodeName = codeName
|
||||
fmt.Println("ROUTE:", route)
|
||||
routingTable[r2.Host] = route
|
||||
fmt.Println("New routing table:", routingTable)
|
||||
fmt.Fprintln(w, "Worker added")
|
||||
}
|
||||
|
||||
func getRoute(host string) (Route, error) {
|
||||
rx, err := icache.Get(host)
|
||||
route := Route{}
|
||||
if err == nil {
|
||||
route = rx.(Route)
|
||||
}
|
||||
return route, err
|
||||
}
|
||||
|
||||
func putRoute(route Route) {
|
||||
item := cache.Item{}
|
||||
item.Value = route
|
||||
icache.Put(route.Host, &item)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user