diff --git a/api/server/config.go b/api/models/config.go similarity index 82% rename from api/server/config.go rename to api/models/config.go index d4ceb87c2..9bd1f7e21 100644 --- a/api/server/config.go +++ b/api/models/config.go @@ -1,7 +1,8 @@ -package server +package models type Config struct { DatabaseURL string `json:"db"` + API string `json:"api"` Logging struct { To string `json:"to"` Level string `json:"level"` @@ -10,6 +11,5 @@ type Config struct { } func (c *Config) Validate() error { - // TODO: return nil } diff --git a/api/models/datastore.go b/api/models/datastore.go index abc269e39..3481ca86f 100644 --- a/api/models/datastore.go +++ b/api/models/datastore.go @@ -17,5 +17,6 @@ func ApplyAppFilter(app *App, filter *AppFilter) bool { } func ApplyRouteFilter(route *Route, filter *RouteFilter) bool { - return true + return (filter.Path != "" && route.Path == filter.Path) && + (filter.AppName != "" && route.AppName == filter.AppName) } diff --git a/api/models/route.go b/api/models/route.go index 46b2b1020..501426671 100644 --- a/api/models/route.go +++ b/api/models/route.go @@ -54,5 +54,6 @@ func (r *Route) Validate() error { } type RouteFilter struct { + Path string AppName string } diff --git a/api/models/runner.go b/api/models/runner.go new file mode 100644 index 000000000..0d51734e6 --- /dev/null +++ b/api/models/runner.go @@ -0,0 +1,13 @@ +package models + +import "errors" + +var ( + ErrRunnerRouteNotFound = errors.New("Route not found on that application") + ErrRunnerInvalidPayload = errors.New("Invalid payload") + ErrRunnerRunRoute = errors.New("Couldn't run this route in the job server") + ErrRunnerAPICantConnect = errors.New("Couldn`t connect to the job server API") + ErrRunnerAPICreateJob = errors.New("Could not create a job in job server") + ErrRunnerInvalidResponse = errors.New("Invalid response") + ErrRunnerTimeout = errors.New("Timed out") +) diff --git a/api/runner/runner.go b/api/runner/runner.go index c72741dfe..6d833c768 100644 --- a/api/runner/runner.go +++ b/api/runner/runner.go @@ -1,208 +1,136 @@ package runner -import ( - "bufio" - "bytes" - "errors" - "fmt" - "io" - "io/ioutil" - "log" - "math/rand" - "net/http" - "os" - "os/exec" - "strings" +import "github.com/iron-io/functions/api/models" +import "time" - "github.com/Sirupsen/logrus" - "github.com/gin-gonic/gin" - "github.com/iron-io/functions/api/models" -) - -type RunningApp struct { - Route *models.Route - Port int - ContainerName string -} - -var ( - ErrRunnerRouteNotFound = errors.New("Route not found on that application") -) - -var runningImages map[string]*RunningApp - -func init() { - runningImages = make(map[string]*RunningApp) - fmt.Println("ENV:", os.Environ()) -} - -func Run(c *gin.Context) error { - log := c.MustGet("log").(logrus.FieldLogger) - store := c.MustGet("store").(models.Datastore) - - appName := c.Param("app") - - if appName == "" { - host := strings.Split(c.Request.Host, ":")[0] - appName = strings.Split(host, ".")[0] - } - - filter := &models.RouteFilter{ - AppName: appName, - } - - routes, err := store.GetRoutes(filter) - if err != nil { - return err - } - - route := c.Param("route") - - log.WithFields(logrus.Fields{"app": appName}).Debug("Running app") - - for _, el := range routes { - if el.Path == route { - err = checkAndPull(el.Image) - if err != nil { - return err - } - if el.Type == "app" { - return DockerHost(el, c) - } else { - return DockerRun(el, c) - } - } - } - - return ErrRunnerRouteNotFound +type RouteRunner struct { + Route *models.Route + Endpoint string + Payload string + Timeout time.Duration } // TODO: use Docker utils from docker-job for this and a few others in here -func DockerRun(route *models.Route, c *gin.Context) error { - image := route.Image - payload, err := ioutil.ReadAll(c.Request.Body) - if err != nil { - return err - } - // log.WithField("payload", "---"+string(payload)+"---").Infoln("incoming request") - // log.WithField("image", image).Infoln("About to run using this image") +// func DockerRun(route *models.Route, c *gin.Context) error { +// image := route.Image +// payload := c.Value("payload").(string) - for k, v := range route.Headers { - c.Header(k, v[0]) - } +// for k, v := range route.Headers { +// c.Header(k, v[0]) +// } - // TODO: swap all this out with Titan's running via API - cmd := exec.Command("docker", "run", "--rm", "-i", "-e", fmt.Sprintf("PAYLOAD=%v", string(payload)), image) - stdout, err := cmd.StdoutPipe() - if err != nil { - log.Fatal(err) - } - stderr, err := cmd.StderrPipe() - if err != nil { - log.Fatal(err) - } - if err := cmd.Start(); err != nil { - log.Fatal(err) - } - var b bytes.Buffer - buff := bufio.NewWriter(&b) +// // TODO: swap all this out with Titan's running via API +// cmd := exec.Command("docker", "run", "--rm", "-i", "-e", fmt.Sprintf("PAYLOAD=%v", payload), image) +// stdout, err := cmd.StdoutPipe() +// if err != nil { +// log.Fatal(err) +// } +// stderr, err := cmd.StderrPipe() +// if err != nil { +// log.Fatal(err) +// } +// if err := cmd.Start(); err != nil { +// log.Fatal(err) +// } +// var b bytes.Buffer +// buff := bufio.NewWriter(&b) - go io.Copy(buff, stdout) - go io.Copy(buff, stderr) +// go io.Copy(buff, stdout) +// go io.Copy(buff, stderr) - log.Printf("Waiting for command to finish...") - if err = cmd.Wait(); err != nil { - // job failed - // log.Infoln("job finished with err:", err) - // log.WithFields(log.Fields{"metric": "run.errors", "value": 1, "type": "count"}).Infoln("failed run") - return err - // TODO: wrap error in json "error": buff - } +// log.Printf("Waiting for command to finish...") +// if err = cmd.Wait(); err != nil { +// // job failed +// // log.Infoln("job finished with err:", err) +// // log.WithFields(log.Fields{"metric": "run.errors", "value": 1, "type": "count"}).Infoln("failed run") +// return err +// // TODO: wrap error in json "error": buff +// } - // log.Infoln("Docker ran successfully:", b.String()) - // print - // log.WithFields(log.Fields{"metric": "run.success", "value": 1, "type": "count"}).Infoln("successful run") - // log.WithFields(log.Fields{"metric": "run", "value": 1, "type": "count"}).Infoln("job ran") - buff.Flush() +// // log.Infoln("Docker ran successfully:", b.String()) +// // print +// // log.WithFields(log.Fields{"metric": "run.success", "value": 1, "type": "count"}).Infoln("successful run") +// // log.WithFields(log.Fields{"metric": "run", "value": 1, "type": "count"}).Infoln("job ran") +// buff.Flush() - c.Data(http.StatusOK, "", bytes.Trim(b.Bytes(), "\x00")) +// c.Data(http.StatusOK, "", bytes.Trim(b.Bytes(), "\x00")) - return nil -} +// return nil +// } -func DockerHost(el *models.Route, c *gin.Context) error { - ra := runningImages[el.Image] - if ra == nil { - ra = &RunningApp{} - ra.Route = el - ra.Port = rand.Intn(9999-9000) + 9000 - ra.ContainerName = fmt.Sprintf("c_%v", rand.Intn(10000)) - runningImages[el.Image] = ra - // TODO: timeout 59 minutes. Mark it in ra as terminated. - cmd := exec.Command("docker", "run", "--name", ra.ContainerName, "--rm", "-i", "-p", fmt.Sprintf("%v:8080", ra.Port), el.Image) - // TODO: What should we do with the output here? Store it? Send it to a log service? - // cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - // TODO: Need to catch interrupt and stop all containers that are started, see devo/dj for how to do this - if err := cmd.Start(); err != nil { - return err - // TODO: What if the app fails to start? Don't want to keep starting the container - } - } else { - // TODO: check if it's still running? - // TODO: if ra.terminated, then start new container? - } - fmt.Println("RunningApp:", ra) - // TODO: if connection fails, check if container still running? If not, start it again - resp, err := http.Get(fmt.Sprintf("http://0.0.0.0:%v%v", ra.Port, el.ContainerPath)) - if err != nil { - return err - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return err - } +// func DockerHost(el *models.Route, c *gin.Context) error { +// ra := runningImages[el.Image] +// if ra == nil { +// ra = &RunningApp{} +// ra.Route = el +// ra.Port = rand.Intn(9999-9000) + 9000 +// ra.ContainerName = fmt.Sprintf("c_%v", rand.Intn(10000)) +// runningImages[el.Image] = ra +// // TODO: timeout 59 minutes. Mark it in ra as terminated. +// cmd := exec.Command("docker", "run", "--name", ra.ContainerName, "--rm", "-i", "-p", fmt.Sprintf("%v:8080", ra.Port), el.Image) +// // TODO: What should we do with the output here? Store it? Send it to a log service? +// // cmd.Stdout = os.Stdout +// cmd.Stderr = os.Stderr +// // TODO: Need to catch interrupt and stop all containers that are started, see devo/dj for how to do this +// if err := cmd.Start(); err != nil { +// return err +// // TODO: What if the app fails to start? Don't want to keep starting the container +// } +// } else { +// // TODO: check if it's still running? +// // TODO: if ra.terminated, then start new container? +// } +// fmt.Println("RunningApp:", ra) +// // TODO: if connection fails, check if container still running? If not, start it again +// resp, err := http.Get(fmt.Sprintf("http://0.0.0.0:%v%v", ra.Port, el.ContainerPath)) +// if err != nil { +// return err +// } +// defer resp.Body.Close() +// body, err := ioutil.ReadAll(resp.Body) +// if err != nil { +// return err +// } - c.Data(http.StatusOK, "", body) - return nil -} +// c.Data(http.StatusOK, "", body) +// return nil +// } -func checkAndPull(image string) error { - err := execAndPrint("docker", []string{"inspect", image}) - if err != nil { - // image does not exist, so let's pull - fmt.Println("Image not found locally, will pull.", err) - err = execAndPrint("docker", []string{"pull", image}) - } - return err -} +// func checkAndPull(image string) error { +// err := execAndPrint("docker", []string{"inspect", image}) +// if err != nil { +// // image does not exist, so let's pull +// fmt.Println("Image not found locally, will pull.", err) +// err = execAndPrint("docker", []string{"pull", image}) +// } +// return err +// } -func execAndPrint(cmdstr string, args []string) error { - var bout bytes.Buffer - buffout := bufio.NewWriter(&bout) - var berr bytes.Buffer - bufferr := bufio.NewWriter(&berr) - cmd := exec.Command(cmdstr, args...) - stdout, err := cmd.StdoutPipe() - if err != nil { - return err - } - stderr, err := cmd.StderrPipe() - if err != nil { - return err - } - if err := cmd.Start(); err != nil { - return err - } - go io.Copy(buffout, stdout) - go io.Copy(bufferr, stderr) +// func execAndPrint(cmdstr string, args []string) error { +// var bout bytes.Buffer +// buffout := bufio.NewWriter(&bout) +// var berr bytes.Buffer +// bufferr := bufio.NewWriter(&berr) +// cmd := exec.Command(cmdstr, args...) +// stdout, err := cmd.StdoutPipe() +// if err != nil { +// return err +// } +// stderr, err := cmd.StderrPipe() +// if err != nil { +// return err +// } +// if err := cmd.Start(); err != nil { +// return err +// } +// go io.Copy(buffout, stdout) +// go io.Copy(bufferr, stderr) - log.Printf("Waiting for cmd to finish...") - err = cmd.Wait() - if berr.Len() != 0 { - fmt.Println("stderr:", berr.String()) - } - fmt.Println("stdout:", bout.String()) - return err -} +// log.Printf("Waiting for cmd to finish...") +// err = cmd.Wait() +// if berr.Len() != 0 { +// fmt.Println("stderr:", berr.String()) +// } +// fmt.Println("stdout:", bout.String()) +// return err +// } diff --git a/api/runner/titan.go b/api/runner/titan.go new file mode 100644 index 000000000..30a7583a5 --- /dev/null +++ b/api/runner/titan.go @@ -0,0 +1,117 @@ +package runner + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "time" + + "github.com/iron-io/functions/api/models" + tmodels "github.com/iron-io/titan/jobserver/models" +) + +type TitanJob struct { + runner *RouteRunner + resultChan chan error + result []byte +} + +var versionPath = "/v1" + +func CreateTitanJob(runner *RouteRunner) *TitanJob { + t := &TitanJob{ + runner: runner, + resultChan: make(chan error), + } + + go t.Start() + + return t +} + +func (t *TitanJob) Start() { + newjob := tmodels.JobsWrapper{ + Jobs: []*tmodels.Job{ + &tmodels.Job{ + NewJob: tmodels.NewJob{ + Image: &t.runner.Route.Image, + Payload: t.runner.Payload, + }, + }, + }, + } + + jobJSON, err := json.Marshal(newjob) + if err != nil { + t.resultChan <- models.ErrInvalidJSON + return + } + + resp, err := t.titanPOST(fmt.Sprintf("/groups/app-%s/jobs", t.runner.Route.AppName), bytes.NewBuffer(jobJSON)) + if err != nil { + t.resultChan <- models.ErrRunnerAPICantConnect + return + } + + var resultJobs tmodels.JobsWrapper + respBody, err := ioutil.ReadAll(resp.Body) + err = json.Unmarshal(respBody, &resultJobs) + if err != nil { + t.resultChan <- models.ErrInvalidJSON + return + } + + if resultJobs.Jobs == nil { + t.resultChan <- models.ErrRunnerAPICreateJob + return + } + + job := resultJobs.Jobs[0] + begin := time.Now() + for len(t.result) == 0 { + if time.Since(begin) > t.runner.Timeout { + t.resultChan <- models.ErrRunnerTimeout + return + } + + resp, err := t.titanGET(fmt.Sprintf("/groups/app-%s/jobs/%s/log", t.runner.Route.AppName, job.ID)) + if err == nil { + fmt.Println(resp.Status) + if resp.StatusCode == http.StatusOK { + resBody, err := ioutil.ReadAll(resp.Body) + fmt.Println(string(resBody)) + if err != nil { + t.resultChan <- models.ErrRunnerInvalidResponse + return + } + + t.result = resBody + continue + } + } + time.Sleep(100 * time.Millisecond) + } + + t.resultChan <- nil +} + +func (t *TitanJob) Wait() error { + return <-t.resultChan +} + +func (t TitanJob) Result() []byte { + return t.result +} + +func (t TitanJob) titanPOST(path string, body io.Reader) (*http.Response, error) { + fmt.Println(fmt.Sprintf("%s%s%s", t.runner.Endpoint, versionPath, path)) + return http.Post(fmt.Sprintf("%s%s%s", t.runner.Endpoint, versionPath, path), "application/json", body) +} + +func (t TitanJob) titanGET(path string) (*http.Response, error) { + fmt.Println(fmt.Sprintf("%s%s%s", t.runner.Endpoint, versionPath, path)) + return http.Get(fmt.Sprintf("%s%s%s", t.runner.Endpoint, versionPath, path)) +} diff --git a/api/server/datastore/postgres/postgres.go b/api/server/datastore/postgres/postgres.go index b87637608..8adf6d52e 100644 --- a/api/server/datastore/postgres/postgres.go +++ b/api/server/datastore/postgres/postgres.go @@ -266,6 +266,10 @@ func buildFilterQuery(filter *models.RouteFilter) string { filterQuery := "" filterQueries := []string{} + if filter.Path != "" { + filterQueries = append(filterQueries, fmt.Sprintf("path = '%s'", filter.Path)) + } + if filter.AppName != "" { filterQueries = append(filterQueries, fmt.Sprintf("app_name = '%s'", filter.AppName)) } diff --git a/api/server/router/router.go b/api/server/router/router.go index 4eac85f0c..acf7129d1 100644 --- a/api/server/router/router.go +++ b/api/server/router/router.go @@ -29,7 +29,8 @@ func Start(engine *gin.Engine) { } - engine.GET("/r/:app/*route", handleRunner) + engine.Any("/r/:app/*route", handleRunner) + engine.NoRoute(handleRunner) } func simpleError(err error) *models.Error { diff --git a/api/server/router/runner.go b/api/server/router/runner.go index 438dbe91e..abd8b9088 100644 --- a/api/server/router/runner.go +++ b/api/server/router/runner.go @@ -1,19 +1,84 @@ package router import ( + "bytes" + "io/ioutil" "net/http" + "strings" + "time" "github.com/Sirupsen/logrus" "github.com/gin-gonic/gin" + "github.com/iron-io/functions/api/models" "github.com/iron-io/functions/api/runner" ) func handleRunner(c *gin.Context) { log := c.MustGet("log").(logrus.FieldLogger) + store := c.MustGet("store").(models.Datastore) + config := c.MustGet("config").(*models.Config) - err := runner.Run(c) - if err != nil { - log.Debug(err) - c.JSON(http.StatusInternalServerError, simpleError(err)) + var err error + + var payload []byte + if c.Request.Method == "POST" || c.Request.Method == "PUT" { + payload, err = ioutil.ReadAll(c.Request.Body) + } else if c.Request.Method == "GET" { + qPL := c.Request.URL.Query()["payload"] + if len(qPL) > 0 { + payload = []byte(qPL[0]) + } } + + log.WithField("payload", string(payload)).Debug("Got payload") + + appName := c.Param("app") + if appName == "" { + host := strings.Split(c.Request.Header.Get("Host"), ":")[0] + appName = strings.Split(host, ".")[0] + } + + route := c.Param("route") + if route == "" { + route = c.Request.URL.Path + } + + filter := &models.RouteFilter{ + Path: route, + AppName: appName, + } + + log.WithFields(logrus.Fields{"app": appName, "path": route}).Debug("Finding route on datastore") + + routes, err := store.GetRoutes(filter) + if err != nil { + log.WithError(err).Error(models.ErrRoutesList) + c.JSON(http.StatusInternalServerError, simpleError(models.ErrRoutesList)) + } + + log.WithField("routes", routes).Debug("Got routes from datastore") + + for _, el := range routes { + if el.Path == route { + titanJob := runner.CreateTitanJob(&runner.RouteRunner{ + Route: el, + Endpoint: config.API, + Payload: string(payload), + Timeout: 30 * time.Second, + }) + + if err := titanJob.Wait(); err != nil { + log.WithError(err).Error(models.ErrRunnerRunRoute) + c.JSON(http.StatusInternalServerError, simpleError(models.ErrRunnerRunRoute)) + } else { + for k, v := range el.Headers { + c.Header(k, v[0]) + } + + c.Data(http.StatusOK, "", bytes.Trim(titanJob.Result(), "\x00")) + } + return + } + } + } diff --git a/api/server/server.go b/api/server/server.go index dcd75e02b..4c15cd018 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -7,16 +7,17 @@ import ( "github.com/Sirupsen/logrus" "github.com/gin-gonic/gin" + "github.com/iron-io/functions/api/models" "github.com/iron-io/functions/api/server/datastore" "github.com/iron-io/functions/api/server/router" ) type Server struct { router *gin.Engine - cfg *Config + cfg *models.Config } -func New(config *Config) *Server { +func New(config *models.Config) *Server { return &Server{ router: gin.Default(), cfg: config, @@ -37,6 +38,10 @@ func (s *Server) Start() { s.cfg.DatabaseURL = fmt.Sprintf("bolt://%s/bolt.db?bucket=funcs", cwd) } + if s.cfg.API == "" { + s.cfg.API = "http://localhost:8080" + } + ds, err := datastore.New(s.cfg.DatabaseURL) if err != nil { logrus.WithError(err).Fatalln("Invalid DB url.") @@ -46,6 +51,7 @@ func (s *Server) Start() { logrus.SetLevel(logrus.DebugLevel) s.router.Use(func(c *gin.Context) { + c.Set("config", s.cfg) c.Set("store", ds) c.Set("log", logrus.WithFields(extractFields(c))) c.Next() diff --git a/balancer/balance.go b/balancer/balance.go deleted file mode 100644 index 7c189a106..000000000 --- a/balancer/balance.go +++ /dev/null @@ -1,168 +0,0 @@ -// Copyright 2010 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package main - -import ( - "container/heap" - "flag" - "fmt" - "math/rand" - "time" -) - -const nRequester = 100 -const nWorker = 10 - -var roundRobin = flag.Bool("r", false, "use round-robin scheduling") - -// Simulation of some work: just sleep for a while and report how long. -func op() int { - n := rand.Int63n(5) - time.Sleep(time.Duration(n) * time.Second) - return int(n) -} - -type Request struct { - fn func() int - c chan int -} - -func requester(work chan Request) { - c := make(chan int) - for { - time.Sleep(time.Duration(rand.Int63n(nWorker)) * time.Second) - work <- Request{op, c} - <-c - } -} - -type Worker struct { - i int - requests chan Request - pending int -} - -func (w *Worker) work(done chan *Worker) { - for { - req := <-w.requests - req.c <- req.fn() - done <- w - } -} - -type Pool []*Worker - -func (p Pool) Len() int { return len(p) } - -func (p Pool) Less(i, j int) bool { - return p[i].pending < p[j].pending -} - -func (p *Pool) Swap(i, j int) { - a := *p - a[i], a[j] = a[j], a[i] - a[i].i = i - a[j].i = j -} - -func (p *Pool) Push(x interface{}) { - a := *p - n := len(a) - a = a[0 : n+1] - w := x.(*Worker) - a[n] = w - w.i = n - *p = a -} - -func (p *Pool) Pop() interface{} { - a := *p - *p = a[0 : len(a)-1] - w := a[len(a)-1] - w.i = -1 // for safety - return w -} - -type Balancer struct { - pool Pool - done chan *Worker - i int -} - -func NewBalancer() *Balancer { - done := make(chan *Worker, nWorker) - b := &Balancer{make(Pool, 0, nWorker), done, 0} - for i := 0; i < nWorker; i++ { - w := &Worker{requests: make(chan Request, nRequester)} - heap.Push(&b.pool, w) - go w.work(b.done) - } - return b -} - -func (b *Balancer) balance(work chan Request) { - for { - select { - case req := <-work: - b.dispatch(req) - case w := <-b.done: - b.completed(w) - } - b.print() - } -} - -func (b *Balancer) print() { - sum := 0 - sumsq := 0 - for _, w := range b.pool { - fmt.Printf("%d ", w.pending) - sum += w.pending - sumsq += w.pending * w.pending - } - avg := float64(sum) / float64(len(b.pool)) - variance := float64(sumsq)/float64(len(b.pool)) - avg*avg - fmt.Printf(" %.2f %.2f\n", avg, variance) -} - -func (b *Balancer) dispatch(req Request) { - if *roundRobin { - w := b.pool[b.i] - w.requests <- req - w.pending++ - b.i++ - if b.i >= len(b.pool) { - b.i = 0 - } - return - } - - w := heap.Pop(&b.pool).(*Worker) - w.requests <- req - w.pending++ - // fmt.Printf("started %p; now %d\n", w, w.pending) - heap.Push(&b.pool, w) -} - -func (b *Balancer) completed(w *Worker) { - if *roundRobin { - w.pending-- - return - } - - w.pending-- - // fmt.Printf("finished %p; now %d\n", w, w.pending) - heap.Remove(&b.pool, w.i) - heap.Push(&b.pool, w) -} - -func main() { - flag.Parse() - work := make(chan Request) - for i := 0; i < nRequester; i++ { - go requester(work) - } - NewBalancer().balance(work) -} diff --git a/main.go b/main.go index 8bcb920a1..411514290 100644 --- a/main.go +++ b/main.go @@ -11,12 +11,14 @@ import ( "os" log "github.com/Sirupsen/logrus" + "github.com/iron-io/functions/api/models" "github.com/iron-io/functions/api/server" ) func main() { - config := &server.Config{} + config := &models.Config{} config.DatabaseURL = os.Getenv("DB") + config.API = os.Getenv("API") err := config.Validate() if err != nil {