Using super token to queue up new tasks.

This commit is contained in:
Travis
2013-01-27 15:52:57 -08:00
parent f68d100380
commit 31cdc21fc2
3 changed files with 39 additions and 39 deletions

View File

@@ -22,16 +22,17 @@ This is just a simple prototype. To get to production would need:
## Testing ## Testing
- start helloserver.go - go build router.go
- start router.go - sudo ./router.go
- ruby worker.rb a couple times - ruby register_host.rb
- ruby client.rb - not a good way to really test beyond that locally
## Testing for reals ## Testing for reals
- start router.go on remote server (there's a test project on SD already: http://www.simpledeployer.com/projects/ea129e74-52fa-11e2-a91a-12313d008ea2/servers) - start router.go on remote server (there's a test project on SD already: http://www.simpledeployer.com/projects/ea129e74-52fa-11e2-a91a-12313d008ea2/servers)
- go build ./src/router; sudo ./router - go build ./src/router; sudo ./router
- iron_worker upload sinatra - iron_worker upload sinatra
- iron_worker queue sinatra - ruby register_host.rb
- ruby client.rb - visit http://routertest.iron.io (or ruby client.rb)
- BOOM! - BOOM!

View File

@@ -1,7 +1,8 @@
{ {
"iron": { "iron": {
"token": "MWx0VfngzsCu0W8NAYw7S2lNrgo", "token": "MWx0VfngzsCu0W8NAYw7S2lNrgo",
"project_id": "4fd2729368a0197d1102056b" "project_id": "4fd2729368a0197d1102056b",
"super_token": "QpJzvVokpKwi9vjQfSU9ZTWUGhU"
}, },
"logging": { "logging": {
"to": "papertrail", "to": "papertrail",

View File

@@ -11,43 +11,45 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/iron-io/iron_go/cache"
"github.com/iron-io/iron_go/worker"
"github.com/iron-io/common" "github.com/iron-io/common"
"github.com/iron-io/golog" "github.com/iron-io/golog"
"github.com/iron-io/iron_go/cache"
"github.com/iron-io/iron_go/worker"
"labix.org/v2/mgo" "labix.org/v2/mgo"
"log" "log"
"math/rand" "math/rand"
// "net" // "net"
"flag"
"net/http" "net/http"
"net/url" "net/url"
"reflect" "reflect"
"runtime"
"strings" "strings"
"time" "time"
"runtime"
"flag"
// "io/ioutil" // "io/ioutil"
) )
var config struct { var config struct {
Iron struct { Iron struct {
Token string `json:"token"` Token string `json:"token"`
ProjectId string `json:"project_id"` ProjectId string `json:"project_id"`
} `json:"iron"` SuperToken string `json:"super_token"`
MongoAuth common.MongoConfig `json:"mongo_auth"` } `json:"iron"`
Logging struct { MongoAuth common.MongoConfig `json:"mongo_auth"`
To string `json:"to"` Logging struct {
Level string `json:"level"` To string `json:"to"`
Prefix string `json:"prefix"` Level string `json:"level"`
} Prefix string `json:"prefix"`
}
} }
var version = "0.0.14" var version = "0.0.14"
//var routingTable = map[string]*Route{} //var routingTable = map[string]*Route{}
var icache = cache.New("routing-table") var icache = cache.New("routing-table")
var ( var (
ironAuth *common.IronAuth ironAuth *common.IronAuth
) )
func init() { func init() {
@@ -56,11 +58,11 @@ func init() {
type Route struct { type Route struct {
// TODO: Change destinations to a simple cache so it can expire entries after 55 minutes (the one we use in common?) // TODO: Change destinations to a simple cache so it can expire entries after 55 minutes (the one we use in common?)
Host string `json:"host"` Host string `json:"host"`
Destinations []string `json:"destinations"` Destinations []string `json:"destinations"`
ProjectId string `json:"project_id"` ProjectId string `json:"project_id"`
Token string `json:"token"` // store this so we can queue up new workers on demand Token string `json:"token"` // store this so we can queue up new workers on demand
CodeName string `json:"code_name"` CodeName string `json:"code_name"`
} }
// for adding new hosts // for adding new hosts
@@ -196,8 +198,8 @@ func serveEndpoint(w http.ResponseWriter, req *http.Request, route *Route) {
golog.Infoln("Served!") golog.Infoln("Served!")
} }
func removeDestination(route *Route, destIndex int, w http.ResponseWriter){ func removeDestination(route *Route, destIndex int, w http.ResponseWriter) {
route.Destinations = append(route.Destinations[:destIndex], route.Destinations[destIndex + 1:]...) route.Destinations = append(route.Destinations[:destIndex], route.Destinations[destIndex+1:]...)
err := putRoute(route) err := putRoute(route)
if err != nil { if err != nil {
golog.Infoln("Couldn't update routing table:", err) golog.Infoln("Couldn't update routing table:", err)
@@ -212,11 +214,11 @@ func removeDestination(route *Route, destIndex int, w http.ResponseWriter){
} }
} }
func startNewWorker(route *Route) (error) { func startNewWorker(route *Route) error {
golog.Infoln("Starting a new worker") golog.Infoln("Starting a new worker")
// start new worker // start new worker
payload := map[string]interface{}{ payload := map[string]interface{}{
"token": route.Token, "token": config.Iron.SuperToken,
"project_id": route.ProjectId, "project_id": route.ProjectId,
"code_name": route.CodeName, "code_name": route.CodeName,
} }
@@ -227,7 +229,7 @@ func startNewWorker(route *Route) (error) {
golog.Infoln("Couldn't marshal json!", err) golog.Infoln("Couldn't marshal json!", err)
return err return err
} }
timeout := time.Second*time.Duration(1800 + rand.Intn(600)) // a little random factor in here to spread out worker deaths timeout := time.Second * time.Duration(1800+rand.Intn(600)) // a little random factor in here to spread out worker deaths
task := worker.Task{ task := worker.Task{
CodeName: route.CodeName, CodeName: route.CodeName,
Payload: string(jsonPayload), Payload: string(jsonPayload),
@@ -244,7 +246,7 @@ func startNewWorker(route *Route) (error) {
return err return err
} }
type Register struct {} type Register struct{}
// This registers a new host // This registers a new host
func (r *Register) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (r *Register) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@@ -297,8 +299,8 @@ func (wh *WorkerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// get project id and token // get project id and token
projectId := req.FormValue("project_id") projectId := req.FormValue("project_id")
token := req.FormValue("token") token := req.FormValue("token")
codeName := req.FormValue("code_name") // codeName := req.FormValue("code_name")
golog.Infoln("project_id:", projectId, "token:", token, "code_name:", codeName) golog.Infoln("project_id:", projectId, "token:", token)
// check header for what operation to perform // check header for what operation to perform
routerHeader := req.Header.Get("Iron-Router") routerHeader := req.Header.Get("Iron-Router")
@@ -312,11 +314,9 @@ func (wh *WorkerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// todo: do we need to close body? // todo: do we need to close body?
golog.Infoln("DECODED:", r2) golog.Infoln("DECODED:", r2)
route, err := getRoute(r2.Host) route, err := getRoute(r2.Host)
// route := routingTable[r2.Host]
if err != nil { if err != nil {
common.SendError(w, 400, fmt.Sprintln("This host is not registered!", err)) common.SendError(w, 400, fmt.Sprintln("This host is not registered!", err))
return return
// route = &Route{}
} }
golog.Infoln("ROUTE:", route) golog.Infoln("ROUTE:", route)
route.Destinations = append(route.Destinations, r2.Dest) route.Destinations = append(route.Destinations, r2.Dest)
@@ -327,8 +327,6 @@ func (wh *WorkerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
common.SendError(w, 400, fmt.Sprintln("Could not register host!", err)) common.SendError(w, 400, fmt.Sprintln("Could not register host!", err))
return return
} }
// routingTable[r2.Host] = route
// fmt.Println("New routing table:", routingTable)
fmt.Fprintln(w, "Worker added") fmt.Fprintln(w, "Worker added")
} }
} }
@@ -347,7 +345,7 @@ func getRoute(host string) (*Route, error) {
return &route, err return &route, err
} }
func putRoute(route *Route) (error) { func putRoute(route *Route) error {
item := cache.Item{} item := cache.Item{}
v, err := json.Marshal(route) v, err := json.Marshal(route)
if err != nil { if err != nil {