mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Added authentication and changed routing a bit.
This commit is contained in:
@@ -7,8 +7,15 @@
|
||||
"to": "papertrail",
|
||||
"level": "debug",
|
||||
"prefix": "router"
|
||||
}
|
||||
},
|
||||
"mongo_auth": {
|
||||
"hosts": [
|
||||
"irondbauth-staging-1.irondns.info:27017",
|
||||
"irondbauth-staging-2.irondns.info:27017",
|
||||
"irondbauth-staging-3.irondns.info:27017"
|
||||
],
|
||||
"database": "simpleworker_staging",
|
||||
"username": "sworker_stage",
|
||||
"password": "sup3rwork3r"
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -16,8 +16,9 @@ code_name = @config[:iron][:code_name] || "sinatra"
|
||||
|
||||
response = rest.post(
|
||||
# "http://localhost:8080/",
|
||||
"http://router.irondns.info/?project_id=#{project_id}&token=#{token}&code_name=#{code_name}",
|
||||
headers: {"Iron-Router"=>"register", "Content-Type"=>"application/json"},
|
||||
"http://router.irondns.info/1/#{project_id}/register",
|
||||
headers: {"Iron-Router"=>"register", "Content-Type"=>"application/json",
|
||||
"Authorization"=>"OAuth #{token}"},
|
||||
body: {"host"=>host_name, "code"=>code_name}
|
||||
)
|
||||
puts "response body:"
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"github.com/iron-io/iron_go/worker"
|
||||
"github.com/iron-io/common"
|
||||
"github.com/iron-io/golog"
|
||||
"labix.org/v2/mgo"
|
||||
"log"
|
||||
"math/rand"
|
||||
// "net"
|
||||
@@ -25,7 +26,7 @@ import (
|
||||
"time"
|
||||
"runtime"
|
||||
"flag"
|
||||
// "io/ioutil"
|
||||
// "io/ioutil"
|
||||
)
|
||||
|
||||
var config struct {
|
||||
@@ -33,6 +34,7 @@ var config struct {
|
||||
Token string `json:"token"`
|
||||
ProjectId string `json:"project_id"`
|
||||
} `json:"iron"`
|
||||
MongoAuth common.MongoConfig `json:"mongo_auth"`
|
||||
Logging struct {
|
||||
To string `json:"to"`
|
||||
Level string `json:"level"`
|
||||
@@ -43,6 +45,10 @@ var config struct {
|
||||
//var routingTable = map[string]*Route{}
|
||||
var icache = cache.New("routing-table")
|
||||
|
||||
var (
|
||||
ironAuth *common.IronAuth
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
||||
}
|
||||
@@ -84,13 +90,26 @@ func main() {
|
||||
runtime.GOMAXPROCS(runtime.NumCPU())
|
||||
log.Println("Running on", runtime.NumCPU(), "CPUs")
|
||||
|
||||
hosts := strings.Join(config.MongoAuth.Hosts, ",")
|
||||
session, err := mgo.Dial(hosts)
|
||||
if err != nil {
|
||||
log.Panicln(err)
|
||||
}
|
||||
err = session.DB(config.MongoAuth.Database).Login(config.MongoAuth.Username, config.MongoAuth.Password)
|
||||
if err != nil {
|
||||
log.Fatalln("Could not log in to db:", err)
|
||||
}
|
||||
ironAuth = common.NewIronAuth(session, config.MongoAuth.Database)
|
||||
|
||||
icache.Settings.UseConfigMap(map[string]interface{}{"token": config.Iron.Token, "project_id": config.Iron.ProjectId})
|
||||
|
||||
r := mux.NewRouter()
|
||||
s := r.Headers("Iron-Router", "").Subrouter()
|
||||
s.HandleFunc("/", AddWorker)
|
||||
r.HandleFunc("/addworker", AddWorker)
|
||||
r.HandleFunc("/ping", Ping) // for health
|
||||
// s := r.Headers("Iron-Router", "").Subrouter()
|
||||
s := r.Host("router.iron.io").Subrouter()
|
||||
s.Handle("/1/projects/{project_id:[0-9a-fA-F]{24}}/register", &common.AuthHandler{&WorkerHandler{}, ironAuth})
|
||||
s.HandleFunc("/ping", Ping)
|
||||
s.HandleFunc("/addworker", &WorkerHandler{})
|
||||
s.HandleFunc("/", Ping)
|
||||
|
||||
r.HandleFunc("/", ProxyFunc)
|
||||
|
||||
@@ -109,7 +128,7 @@ func ProxyFunc(w http.ResponseWriter, req *http.Request) {
|
||||
// 2) This host has active workers so we do the proxy
|
||||
// 3) This host has no active workers so we queue one (or more) up and return a 503 or something with message that says "try again in a minute"
|
||||
// route := routingTable[host]
|
||||
golog.Infoln("getting route for host:", host)
|
||||
golog.Infoln("getting route for host:", host)
|
||||
route, err := getRoute(host)
|
||||
// choose random dest
|
||||
if err != nil {
|
||||
@@ -142,9 +161,12 @@ func serveEndpoint(w http.ResponseWriter, req *http.Request, route *Route) {
|
||||
destUrlString2 := "http://" + destUrlString
|
||||
destUrl, err := url.Parse(destUrlString2)
|
||||
if err != nil {
|
||||
// todo: should remove destination here
|
||||
golog.Infoln("error!", err)
|
||||
panic(err)
|
||||
common.SendError(w, 500, fmt.Sprintln("Internal error occurred:", err))
|
||||
return
|
||||
}
|
||||
// todo: check destination runtime and remove it if it's expired so we don't send requests to an endpoint that is about to be killed
|
||||
golog.Infoln("proxying to", destUrl)
|
||||
proxy := NewSingleHostReverseProxy(destUrl)
|
||||
err = proxy.ServeHTTP(w, req)
|
||||
@@ -192,7 +214,7 @@ func startNewWorker(route *Route) (error) {
|
||||
golog.Infoln("Couldn't marshal json!", err)
|
||||
return err
|
||||
}
|
||||
timeout := time.Second * time.Duration(1800 + rand.Intn(600)) // a little random factor in here to spread out worker deaths
|
||||
timeout := time.Second*time.Duration(1800 + rand.Intn(600)) // a little random factor in here to spread out worker deaths
|
||||
task := worker.Task{
|
||||
CodeName: route.CodeName,
|
||||
Payload: string(jsonPayload),
|
||||
@@ -209,13 +231,49 @@ func startNewWorker(route *Route) (error) {
|
||||
return err
|
||||
}
|
||||
|
||||
// When a worker starts up, it calls this
|
||||
func Register(w http.ResponseWriter, req *http.Request) {
|
||||
log.Println("Register called!")
|
||||
|
||||
// s, err := ioutil.ReadAll(req.Body)
|
||||
// fmt.Println("req.body:", err, string(s))
|
||||
|
||||
// get project id and token
|
||||
vars := mux.Vars(req)
|
||||
projectId := vars["project_id"]
|
||||
// projectId := req.FormValue("project_id")
|
||||
// token := req.FormValue("token")
|
||||
// codeName := req.FormValue("code_name")
|
||||
token := ironAuth.GetToken(req)
|
||||
golog.Infoln("project_id:", projectId, "token:", token.Token)
|
||||
|
||||
route := Route{}
|
||||
if !common.ReadJSON(w, req, &route) {
|
||||
return
|
||||
}
|
||||
golog.Infoln("body read into route:", route)
|
||||
route.ProjectId = projectId
|
||||
route.Token = token.Token
|
||||
// todo: do we need to close body?
|
||||
err := putRoute(&route)
|
||||
if err != nil {
|
||||
golog.Infoln("couldn't register host:", err)
|
||||
common.SendError(w, 400, fmt.Sprintln("Could not register host!", err))
|
||||
return
|
||||
}
|
||||
golog.Infoln("registered route:", route)
|
||||
fmt.Fprintln(w, "Host registered successfully.")
|
||||
}
|
||||
|
||||
type WorkerHandler struct {
|
||||
}
|
||||
|
||||
// When a worker starts up, it calls this
|
||||
func AddWorker(w http.ResponseWriter, req *http.Request) {
|
||||
func (wh *WorkerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
log.Println("AddWorker called!")
|
||||
|
||||
// s, err := ioutil.ReadAll(req.Body)
|
||||
// fmt.Println("req.body:", err, string(s))
|
||||
// s, err := ioutil.ReadAll(req.Body)
|
||||
// fmt.Println("req.body:", err, string(s))
|
||||
|
||||
// get project id and token
|
||||
projectId := req.FormValue("project_id")
|
||||
|
||||
Reference in New Issue
Block a user