refactor runner using titan

This commit is contained in:
Pedro Nasser
2016-07-24 17:46:08 -03:00
parent a8e984f834
commit 14cc57fd9c
12 changed files with 339 additions and 369 deletions

View File

@@ -1,7 +1,8 @@
package server
package models
type Config struct {
DatabaseURL string `json:"db"`
API string `json:"api"`
Logging struct {
To string `json:"to"`
Level string `json:"level"`
@@ -10,6 +11,5 @@ type Config struct {
}
// Validate checks the configuration for consistency. It currently performs no
// checks and always returns nil.
func (c *Config) Validate() error {
// TODO: implement real validation; every configuration is accepted for now.
return nil
}

View File

@@ -17,5 +17,6 @@ func ApplyAppFilter(app *App, filter *AppFilter) bool {
}
// ApplyRouteFilter reports whether route satisfies filter.
//
// An empty filter field places no constraint on the route, so a filter that
// only sets AppName matches every route of that application regardless of its
// path. A nil filter matches every route.
func ApplyRouteFilter(route *Route, filter *RouteFilter) bool {
	if filter == nil {
		return true
	}
	if filter.Path != "" && route.Path != filter.Path {
		return false
	}
	if filter.AppName != "" && route.AppName != filter.AppName {
		return false
	}
	return true
}

View File

@@ -54,5 +54,6 @@ func (r *Route) Validate() error {
}
// RouteFilter holds the criteria used to select routes.
type RouteFilter struct {
// Path is compared against a route's Path.
Path string
// AppName is compared against a route's AppName.
AppName string
}

13
api/models/runner.go Normal file
View File

@@ -0,0 +1,13 @@
package models
import "errors"
// Errors reported while running a route on the job server.
var (
	// ErrRunnerRouteNotFound means the requested route does not exist on the application.
	ErrRunnerRouteNotFound = errors.New("Route not found on that application")
	// ErrRunnerInvalidPayload means the request payload could not be used.
	ErrRunnerInvalidPayload = errors.New("Invalid payload")
	// ErrRunnerRunRoute is the generic error returned to clients when running a route fails.
	ErrRunnerRunRoute = errors.New("Couldn't run this route in the job server")
	// ErrRunnerAPICantConnect means the request to the job server API failed.
	ErrRunnerAPICantConnect = errors.New("Couldn't connect to the job server API")
	// ErrRunnerAPICreateJob means the job server did not return a created job.
	ErrRunnerAPICreateJob = errors.New("Could not create a job in job server")
	// ErrRunnerInvalidResponse means a job server response could not be read.
	ErrRunnerInvalidResponse = errors.New("Invalid response")
	// ErrRunnerTimeout means no job result was available before the runner's timeout.
	ErrRunnerTimeout = errors.New("Timed out")
)

View File

@@ -1,208 +1,136 @@
package runner
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
"net/http"
"os"
"os/exec"
"strings"
import "github.com/iron-io/functions/api/models"
import "time"
"github.com/Sirupsen/logrus"
"github.com/gin-gonic/gin"
"github.com/iron-io/functions/api/models"
)
type RunningApp struct {
type RouteRunner struct {
Route *models.Route
Port int
ContainerName string
}
var (
ErrRunnerRouteNotFound = errors.New("Route not found on that application")
)
var runningImages map[string]*RunningApp
func init() {
runningImages = make(map[string]*RunningApp)
fmt.Println("ENV:", os.Environ())
}
func Run(c *gin.Context) error {
log := c.MustGet("log").(logrus.FieldLogger)
store := c.MustGet("store").(models.Datastore)
appName := c.Param("app")
if appName == "" {
host := strings.Split(c.Request.Host, ":")[0]
appName = strings.Split(host, ".")[0]
}
filter := &models.RouteFilter{
AppName: appName,
}
routes, err := store.GetRoutes(filter)
if err != nil {
return err
}
route := c.Param("route")
log.WithFields(logrus.Fields{"app": appName}).Debug("Running app")
for _, el := range routes {
if el.Path == route {
err = checkAndPull(el.Image)
if err != nil {
return err
}
if el.Type == "app" {
return DockerHost(el, c)
} else {
return DockerRun(el, c)
}
}
}
return ErrRunnerRouteNotFound
Endpoint string
Payload string
Timeout time.Duration
}
// TODO: use Docker utils from docker-job for this and a few others in here
func DockerRun(route *models.Route, c *gin.Context) error {
image := route.Image
payload, err := ioutil.ReadAll(c.Request.Body)
if err != nil {
return err
}
// log.WithField("payload", "---"+string(payload)+"---").Infoln("incoming request")
// log.WithField("image", image).Infoln("About to run using this image")
// func DockerRun(route *models.Route, c *gin.Context) error {
// image := route.Image
// payload := c.Value("payload").(string)
for k, v := range route.Headers {
c.Header(k, v[0])
}
// for k, v := range route.Headers {
// c.Header(k, v[0])
// }
// TODO: swap all this out with Titan's running via API
cmd := exec.Command("docker", "run", "--rm", "-i", "-e", fmt.Sprintf("PAYLOAD=%v", string(payload)), image)
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Fatal(err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
log.Fatal(err)
}
if err := cmd.Start(); err != nil {
log.Fatal(err)
}
var b bytes.Buffer
buff := bufio.NewWriter(&b)
// // TODO: swap all this out with Titan's running via API
// cmd := exec.Command("docker", "run", "--rm", "-i", "-e", fmt.Sprintf("PAYLOAD=%v", payload), image)
// stdout, err := cmd.StdoutPipe()
// if err != nil {
// log.Fatal(err)
// }
// stderr, err := cmd.StderrPipe()
// if err != nil {
// log.Fatal(err)
// }
// if err := cmd.Start(); err != nil {
// log.Fatal(err)
// }
// var b bytes.Buffer
// buff := bufio.NewWriter(&b)
go io.Copy(buff, stdout)
go io.Copy(buff, stderr)
// go io.Copy(buff, stdout)
// go io.Copy(buff, stderr)
log.Printf("Waiting for command to finish...")
if err = cmd.Wait(); err != nil {
// job failed
// log.Infoln("job finished with err:", err)
// log.WithFields(log.Fields{"metric": "run.errors", "value": 1, "type": "count"}).Infoln("failed run")
return err
// TODO: wrap error in json "error": buff
}
// log.Printf("Waiting for command to finish...")
// if err = cmd.Wait(); err != nil {
// // job failed
// // log.Infoln("job finished with err:", err)
// // log.WithFields(log.Fields{"metric": "run.errors", "value": 1, "type": "count"}).Infoln("failed run")
// return err
// // TODO: wrap error in json "error": buff
// }
// log.Infoln("Docker ran successfully:", b.String())
// print
// log.WithFields(log.Fields{"metric": "run.success", "value": 1, "type": "count"}).Infoln("successful run")
// log.WithFields(log.Fields{"metric": "run", "value": 1, "type": "count"}).Infoln("job ran")
buff.Flush()
// // log.Infoln("Docker ran successfully:", b.String())
// // print
// // log.WithFields(log.Fields{"metric": "run.success", "value": 1, "type": "count"}).Infoln("successful run")
// // log.WithFields(log.Fields{"metric": "run", "value": 1, "type": "count"}).Infoln("job ran")
// buff.Flush()
c.Data(http.StatusOK, "", bytes.Trim(b.Bytes(), "\x00"))
// c.Data(http.StatusOK, "", bytes.Trim(b.Bytes(), "\x00"))
return nil
}
// return nil
// }
func DockerHost(el *models.Route, c *gin.Context) error {
ra := runningImages[el.Image]
if ra == nil {
ra = &RunningApp{}
ra.Route = el
ra.Port = rand.Intn(9999-9000) + 9000
ra.ContainerName = fmt.Sprintf("c_%v", rand.Intn(10000))
runningImages[el.Image] = ra
// TODO: timeout 59 minutes. Mark it in ra as terminated.
cmd := exec.Command("docker", "run", "--name", ra.ContainerName, "--rm", "-i", "-p", fmt.Sprintf("%v:8080", ra.Port), el.Image)
// TODO: What should we do with the output here? Store it? Send it to a log service?
// cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
// TODO: Need to catch interrupt and stop all containers that are started, see devo/dj for how to do this
if err := cmd.Start(); err != nil {
return err
// TODO: What if the app fails to start? Don't want to keep starting the container
}
} else {
// TODO: check if it's still running?
// TODO: if ra.terminated, then start new container?
}
fmt.Println("RunningApp:", ra)
// TODO: if connection fails, check if container still running? If not, start it again
resp, err := http.Get(fmt.Sprintf("http://0.0.0.0:%v%v", ra.Port, el.ContainerPath))
if err != nil {
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
// func DockerHost(el *models.Route, c *gin.Context) error {
// ra := runningImages[el.Image]
// if ra == nil {
// ra = &RunningApp{}
// ra.Route = el
// ra.Port = rand.Intn(9999-9000) + 9000
// ra.ContainerName = fmt.Sprintf("c_%v", rand.Intn(10000))
// runningImages[el.Image] = ra
// // TODO: timeout 59 minutes. Mark it in ra as terminated.
// cmd := exec.Command("docker", "run", "--name", ra.ContainerName, "--rm", "-i", "-p", fmt.Sprintf("%v:8080", ra.Port), el.Image)
// // TODO: What should we do with the output here? Store it? Send it to a log service?
// // cmd.Stdout = os.Stdout
// cmd.Stderr = os.Stderr
// // TODO: Need to catch interrupt and stop all containers that are started, see devo/dj for how to do this
// if err := cmd.Start(); err != nil {
// return err
// // TODO: What if the app fails to start? Don't want to keep starting the container
// }
// } else {
// // TODO: check if it's still running?
// // TODO: if ra.terminated, then start new container?
// }
// fmt.Println("RunningApp:", ra)
// // TODO: if connection fails, check if container still running? If not, start it again
// resp, err := http.Get(fmt.Sprintf("http://0.0.0.0:%v%v", ra.Port, el.ContainerPath))
// if err != nil {
// return err
// }
// defer resp.Body.Close()
// body, err := ioutil.ReadAll(resp.Body)
// if err != nil {
// return err
// }
c.Data(http.StatusOK, "", body)
return nil
}
// c.Data(http.StatusOK, "", body)
// return nil
// }
func checkAndPull(image string) error {
err := execAndPrint("docker", []string{"inspect", image})
if err != nil {
// image does not exist, so let's pull
fmt.Println("Image not found locally, will pull.", err)
err = execAndPrint("docker", []string{"pull", image})
}
return err
}
// func checkAndPull(image string) error {
// err := execAndPrint("docker", []string{"inspect", image})
// if err != nil {
// // image does not exist, so let's pull
// fmt.Println("Image not found locally, will pull.", err)
// err = execAndPrint("docker", []string{"pull", image})
// }
// return err
// }
func execAndPrint(cmdstr string, args []string) error {
var bout bytes.Buffer
buffout := bufio.NewWriter(&bout)
var berr bytes.Buffer
bufferr := bufio.NewWriter(&berr)
cmd := exec.Command(cmdstr, args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
stderr, err := cmd.StderrPipe()
if err != nil {
return err
}
if err := cmd.Start(); err != nil {
return err
}
go io.Copy(buffout, stdout)
go io.Copy(bufferr, stderr)
// func execAndPrint(cmdstr string, args []string) error {
// var bout bytes.Buffer
// buffout := bufio.NewWriter(&bout)
// var berr bytes.Buffer
// bufferr := bufio.NewWriter(&berr)
// cmd := exec.Command(cmdstr, args...)
// stdout, err := cmd.StdoutPipe()
// if err != nil {
// return err
// }
// stderr, err := cmd.StderrPipe()
// if err != nil {
// return err
// }
// if err := cmd.Start(); err != nil {
// return err
// }
// go io.Copy(buffout, stdout)
// go io.Copy(bufferr, stderr)
log.Printf("Waiting for cmd to finish...")
err = cmd.Wait()
if berr.Len() != 0 {
fmt.Println("stderr:", berr.String())
}
fmt.Println("stdout:", bout.String())
return err
}
// log.Printf("Waiting for cmd to finish...")
// err = cmd.Wait()
// if berr.Len() != 0 {
// fmt.Println("stderr:", berr.String())
// }
// fmt.Println("stdout:", bout.String())
// return err
// }

117
api/runner/titan.go Normal file
View File

@@ -0,0 +1,117 @@
package runner
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
"github.com/iron-io/functions/api/models"
tmodels "github.com/iron-io/titan/jobserver/models"
)
// TitanJob tracks one execution of a route on the Titan job server.
type TitanJob struct {
// runner supplies the route, endpoint, payload and timeout for the job.
runner *RouteRunner
// resultChan carries the single outcome sent by Start and received by Wait.
resultChan chan error
// result holds the job log body fetched by Start; Result returns it.
result []byte
}
// versionPath is the API version prefix inserted between the endpoint and the
// request path by titanPOST and titanGET.
var versionPath = "/v1"
// CreateTitanJob builds a TitanJob for runner and immediately starts executing
// it on a new goroutine. Call Wait on the returned job to obtain the outcome
// and Result to read the job output afterwards.
func CreateTitanJob(runner *RouteRunner) *TitanJob {
	t := &TitanJob{
		runner: runner,
		// Start sends exactly one value. A buffer of one lets that send
		// complete even if the caller never calls Wait, so the goroutine
		// below cannot be leaked blocked on the channel.
		resultChan: make(chan error, 1),
	}
	go t.Start()
	return t
}
// Start submits the route's image and payload to the Titan job server as a new
// job, then polls the job's log endpoint every 100ms until it returns a
// non-empty body or the runner's timeout elapses.
//
// Exactly one value is sent on the result channel: nil once the log has been
// stored in t.result, or one of the models errors describing the failure.
func (t *TitanJob) Start() {
	newjob := tmodels.JobsWrapper{
		Jobs: []*tmodels.Job{
			&tmodels.Job{
				NewJob: tmodels.NewJob{
					Image:   &t.runner.Route.Image,
					Payload: t.runner.Payload,
				},
			},
		},
	}

	jobJSON, err := json.Marshal(newjob)
	if err != nil {
		t.resultChan <- models.ErrInvalidJSON
		return
	}

	jobsPath := fmt.Sprintf("/groups/app-%s/jobs", t.runner.Route.AppName)
	resp, err := t.titanPOST(jobsPath, bytes.NewBuffer(jobJSON))
	if err != nil {
		t.resultChan <- models.ErrRunnerAPICantConnect
		return
	}
	respBody, err := ioutil.ReadAll(resp.Body)
	// Close right away: the body is fully consumed and the polling loop
	// below may run for a long time.
	resp.Body.Close()
	if err != nil {
		t.resultChan <- models.ErrRunnerInvalidResponse
		return
	}

	var resultJobs tmodels.JobsWrapper
	if err := json.Unmarshal(respBody, &resultJobs); err != nil {
		t.resultChan <- models.ErrInvalidJSON
		return
	}

	// Guard against an empty (not just nil) list and a null entry before
	// dereferencing the first job.
	if len(resultJobs.Jobs) == 0 || resultJobs.Jobs[0] == nil {
		t.resultChan <- models.ErrRunnerAPICreateJob
		return
	}
	job := resultJobs.Jobs[0]
	logPath := fmt.Sprintf("/groups/app-%s/jobs/%s/log", t.runner.Route.AppName, job.ID)

	begin := time.Now()
	for {
		if time.Since(begin) > t.runner.Timeout {
			t.resultChan <- models.ErrRunnerTimeout
			return
		}

		// A failed request or a non-200 status means the log is not
		// available yet; keep polling.
		resp, err := t.titanGET(logPath)
		if err == nil {
			if resp.StatusCode != http.StatusOK {
				resp.Body.Close()
			} else {
				resBody, readErr := ioutil.ReadAll(resp.Body)
				resp.Body.Close()
				if readErr != nil {
					t.resultChan <- models.ErrRunnerInvalidResponse
					return
				}
				if len(resBody) > 0 {
					t.result = resBody
					break
				}
				// An empty log falls through to the sleep below rather
				// than re-requesting immediately.
			}
		}

		time.Sleep(100 * time.Millisecond)
	}

	t.resultChan <- nil
}
// Wait blocks until an outcome arrives on the job's result channel and returns
// it: nil on success, otherwise the error that was sent.
func (t *TitanJob) Wait() error {
	outcome := <-t.resultChan
	return outcome
}
// Result returns the job output stored in the result field; it is nil until a
// result has been stored.
func (t TitanJob) Result() []byte {
	output := t.result
	return output
}
// titanPOST sends body as application/json to path on the Titan API, resolved
// as <endpoint><versionPath><path>.
//
// The request is bounded by the runner's timeout so an unresponsive job server
// cannot block the caller indefinitely; a zero timeout means no limit. The
// caller is responsible for closing the response body.
func (t TitanJob) titanPOST(path string, body io.Reader) (*http.Response, error) {
	target := t.runner.Endpoint + versionPath + path
	client := &http.Client{Timeout: t.runner.Timeout}
	return client.Post(target, "application/json", body)
}
// titanGET fetches path from the Titan API, resolved as
// <endpoint><versionPath><path>.
//
// The request is bounded by the runner's timeout so an unresponsive job server
// cannot block the caller indefinitely; a zero timeout means no limit. The
// caller is responsible for closing the response body.
func (t TitanJob) titanGET(path string) (*http.Response, error) {
	target := t.runner.Endpoint + versionPath + path
	client := &http.Client{Timeout: t.runner.Timeout}
	return client.Get(target)
}

View File

@@ -266,6 +266,10 @@ func buildFilterQuery(filter *models.RouteFilter) string {
filterQuery := ""
filterQueries := []string{}
if filter.Path != "" {
filterQueries = append(filterQueries, fmt.Sprintf("path = '%s'", filter.Path))
}
if filter.AppName != "" {
filterQueries = append(filterQueries, fmt.Sprintf("app_name = '%s'", filter.AppName))
}

View File

@@ -29,7 +29,8 @@ func Start(engine *gin.Engine) {
}
engine.GET("/r/:app/*route", handleRunner)
engine.Any("/r/:app/*route", handleRunner)
engine.NoRoute(handleRunner)
}
func simpleError(err error) *models.Error {

View File

@@ -1,19 +1,84 @@
package router
import (
"bytes"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/gin-gonic/gin"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/functions/api/runner"
)
// handleRunner resolves the route addressed by the request and runs it on the
// job server, writing the job output back as the response body.
//
// The payload is the request body for POST and PUT, or the "payload" query
// parameter for GET. The application name comes from the :app path parameter
// and falls back to the first label of the request host; the route path comes
// from the *route parameter and falls back to the URL path.
//
// Responses: 200 with the job output and the route's configured headers,
// 400 if the body cannot be read, 404 if no route matches, 500 if the route
// lookup or the job fails.
func handleRunner(c *gin.Context) {
	log := c.MustGet("log").(logrus.FieldLogger)
	store := c.MustGet("store").(models.Datastore)
	config := c.MustGet("config").(*models.Config)

	var payload []byte
	switch c.Request.Method {
	case "POST", "PUT":
		body, err := ioutil.ReadAll(c.Request.Body)
		if err != nil {
			log.WithError(err).Error(models.ErrRunnerInvalidPayload)
			c.JSON(http.StatusBadRequest, simpleError(models.ErrRunnerInvalidPayload))
			return
		}
		payload = body
	case "GET":
		if qPL := c.Request.URL.Query()["payload"]; len(qPL) > 0 {
			payload = []byte(qPL[0])
		}
	}

	log.WithField("payload", string(payload)).Debug("Got payload")

	appName := c.Param("app")
	if appName == "" {
		// On the server side net/http moves the Host header into
		// Request.Host, so it must be read from there, not from Header.
		host := strings.Split(c.Request.Host, ":")[0]
		appName = strings.Split(host, ".")[0]
	}

	route := c.Param("route")
	if route == "" {
		route = c.Request.URL.Path
	}

	filter := &models.RouteFilter{
		Path:    route,
		AppName: appName,
	}

	log.WithFields(logrus.Fields{"app": appName, "path": route}).Debug("Finding route on datastore")

	routes, err := store.GetRoutes(filter)
	if err != nil {
		log.WithError(err).Error(models.ErrRoutesList)
		c.JSON(http.StatusInternalServerError, simpleError(models.ErrRoutesList))
		return
	}

	log.WithField("routes", routes).Debug("Got routes from datastore")

	for _, el := range routes {
		if el.Path != route {
			continue
		}

		titanJob := runner.CreateTitanJob(&runner.RouteRunner{
			Route:    el,
			Endpoint: config.API,
			Payload:  string(payload),
			Timeout:  30 * time.Second,
		})

		if err := titanJob.Wait(); err != nil {
			log.WithError(err).Error(models.ErrRunnerRunRoute)
			c.JSON(http.StatusInternalServerError, simpleError(models.ErrRunnerRunRoute))
			return
		}

		for k, v := range el.Headers {
			// Skip headers configured without a value instead of
			// panicking on v[0].
			if len(v) > 0 {
				c.Header(k, v[0])
			}
		}
		c.Data(http.StatusOK, "", bytes.Trim(titanJob.Result(), "\x00"))
		return
	}

	// No stored route matched: answer explicitly rather than leaving the
	// response unwritten.
	log.WithFields(logrus.Fields{"app": appName, "path": route}).Debug(models.ErrRunnerRouteNotFound)
	c.JSON(http.StatusNotFound, simpleError(models.ErrRunnerRouteNotFound))
}

View File

@@ -7,16 +7,17 @@ import (
"github.com/Sirupsen/logrus"
"github.com/gin-gonic/gin"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/functions/api/server/datastore"
"github.com/iron-io/functions/api/server/router"
)
type Server struct {
router *gin.Engine
cfg *Config
cfg *models.Config
}
func New(config *Config) *Server {
func New(config *models.Config) *Server {
return &Server{
router: gin.Default(),
cfg: config,
@@ -37,6 +38,10 @@ func (s *Server) Start() {
s.cfg.DatabaseURL = fmt.Sprintf("bolt://%s/bolt.db?bucket=funcs", cwd)
}
if s.cfg.API == "" {
s.cfg.API = "http://localhost:8080"
}
ds, err := datastore.New(s.cfg.DatabaseURL)
if err != nil {
logrus.WithError(err).Fatalln("Invalid DB url.")
@@ -46,6 +51,7 @@ func (s *Server) Start() {
logrus.SetLevel(logrus.DebugLevel)
s.router.Use(func(c *gin.Context) {
c.Set("config", s.cfg)
c.Set("store", ds)
c.Set("log", logrus.WithFields(extractFields(c)))
c.Next()

View File

@@ -1,168 +0,0 @@
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"container/heap"
"flag"
"fmt"
"math/rand"
"time"
)
// nRequester is the number of requester goroutines started by main; it is also
// the buffer size of each worker's request channel.
const nRequester = 100
// nWorker is the number of workers in the pool; it also bounds the random
// delay, in seconds, a requester waits before issuing each request.
const nWorker = 10
// roundRobin selects round-robin dispatch instead of least-loaded dispatch.
var roundRobin = flag.Bool("r", false, "use round-robin scheduling")
// op simulates a unit of work: it sleeps for a random whole number of seconds
// in [0, 5) and returns how many seconds it slept.
func op() int {
	secs := rand.Int63n(5)
	pause := time.Duration(secs) * time.Second
	time.Sleep(pause)
	return int(secs)
}
// Request is a unit of work submitted to the balancer.
type Request struct {
// fn is the operation a worker runs.
fn func() int
// c receives fn's return value once the worker has run it.
c chan int
}
// requester generates load forever: it waits a random number of seconds in
// [0, nWorker), submits an op request on work, and blocks until the result
// comes back on its private reply channel.
func requester(work chan Request) {
	reply := make(chan int)
	for {
		delay := time.Duration(rand.Int63n(nWorker)) * time.Second
		time.Sleep(delay)
		work <- Request{fn: op, c: reply}
		<-reply
	}
}
// Worker runs requests taken from its own channel.
type Worker struct {
// i is the worker's index in the Pool (maintained by Swap, Push and Pop).
i int
// requests is the channel from which the worker receives its requests.
requests chan Request
// pending counts requests dispatched to the worker and not yet completed.
pending int
}
// work is the worker's main loop. For every request received it runs the
// request's function, delivers the result on the request's channel, and then
// announces itself on done.
func (w *Worker) work(done chan *Worker) {
	for {
		job := <-w.requests
		result := job.fn()
		job.c <- result
		done <- w
	}
}
// Pool is a collection of workers that implements heap.Interface (through
// *Pool), ordered by ascending number of pending requests.
type Pool []*Worker

// Len returns the number of workers in the pool.
func (p Pool) Len() int { return len(p) }

// Less reports whether worker i has fewer pending requests than worker j.
func (p Pool) Less(i, j int) bool {
	return p[i].pending < p[j].pending
}

// Swap exchanges workers i and j and updates the index each one records.
func (p *Pool) Swap(i, j int) {
	a := *p
	a[i], a[j] = a[j], a[i]
	a[i].i = i
	a[j].i = j
}

// Push appends x, which must be a *Worker, to the pool and records its index.
// The pool grows as needed, so it is not limited to its initial capacity.
func (p *Pool) Push(x interface{}) {
	w := x.(*Worker)
	w.i = len(*p)
	*p = append(*p, w)
}

// Pop removes and returns the last worker in the pool, marking it as no
// longer belonging to the pool by setting its index to -1.
func (p *Pool) Pop() interface{} {
	a := *p
	last := len(a) - 1
	w := a[last]
	*p = a[:last]
	w.i = -1 // for safety
	return w
}
// Balancer distributes requests across a pool of workers.
type Balancer struct {
// pool holds the workers; heap operations order it by pending count.
pool Pool
// done receives a worker each time it finishes a request.
done chan *Worker
// i is the index of the next worker used in round-robin mode.
i int
}
// NewBalancer creates a balancer with nWorker workers. Each worker gets a
// request channel buffered for nRequester requests, is pushed onto the pool
// heap, and has its work loop started on its own goroutine.
func NewBalancer() *Balancer {
	b := &Balancer{
		pool: make(Pool, 0, nWorker),
		done: make(chan *Worker, nWorker),
		i:    0,
	}
	for n := 0; n < nWorker; n++ {
		worker := &Worker{requests: make(chan Request, nRequester)}
		heap.Push(&b.pool, worker)
		go worker.work(b.done)
	}
	return b
}
// balance is the balancer's event loop. It never returns: each iteration
// either dispatches a newly arrived request or records a worker's completion,
// then prints the current load distribution.
func (b *Balancer) balance(work chan Request) {
	for {
		select {
		case incoming := <-work:
			b.dispatch(incoming)
		case finished := <-b.done:
			b.completed(finished)
		}
		b.print()
	}
}
// print writes one line to standard output: every worker's pending count
// followed by the mean and the variance of those counts.
func (b *Balancer) print() {
	total, totalSq := 0, 0
	for _, worker := range b.pool {
		fmt.Printf("%d ", worker.pending)
		total += worker.pending
		totalSq += worker.pending * worker.pending
	}
	mean := float64(total) / float64(len(b.pool))
	variance := float64(totalSq)/float64(len(b.pool)) - mean*mean
	fmt.Printf(" %.2f %.2f\n", mean, variance)
}
// dispatch hands req to a worker and increments that worker's pending count.
//
// In round-robin mode workers are used in index order, wrapping around at the
// end of the pool. Otherwise the worker at the top of the heap (fewest pending
// requests) is popped, given the request, and pushed back.
func (b *Balancer) dispatch(req Request) {
	if !*roundRobin {
		least := heap.Pop(&b.pool).(*Worker)
		least.requests <- req
		least.pending++
		// fmt.Printf("started %p; now %d\n", least, least.pending)
		heap.Push(&b.pool, least)
		return
	}

	next := b.pool[b.i]
	next.requests <- req
	next.pending++
	b.i++
	if b.i >= len(b.pool) {
		b.i = 0
	}
}
// completed records that w has finished a request by decrementing its pending
// count. Outside round-robin mode the worker is also removed from the heap and
// pushed back so that its position reflects the new count.
func (b *Balancer) completed(w *Worker) {
	w.pending--
	if *roundRobin {
		return
	}
	// fmt.Printf("finished %p; now %d\n", w, w.pending)
	heap.Remove(&b.pool, w.i)
	heap.Push(&b.pool, w)
}
// main parses the command-line flags, starts nRequester requester goroutines
// that share one work channel, and then runs the balancer loop on that
// channel.
func main() {
	flag.Parse()

	work := make(chan Request)
	for started := 0; started < nRequester; started++ {
		go requester(work)
	}

	balancer := NewBalancer()
	balancer.balance(work)
}

View File

@@ -11,12 +11,14 @@ import (
"os"
log "github.com/Sirupsen/logrus"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/functions/api/server"
)
func main() {
config := &server.Config{}
config := &models.Config{}
config.DatabaseURL = os.Getenv("DB")
config.API = os.Getenv("API")
err := config.Validate()
if err != nil {