Merge pull request #26 from pedronasser/fix-runner

Change the runner from the Titan HTTP API to Titan's in-process runner driver
This commit is contained in:
Travis Reeder
2016-07-29 11:41:16 -07:00
committed by GitHub
4 changed files with 120 additions and 236 deletions

View File

@@ -1,136 +1,95 @@
package runner
import "github.com/iron-io/functions/api/models"
import "time"
import (
"bytes"
"fmt"
"time"
type RouteRunner struct {
"golang.org/x/net/context"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/titan/common"
"github.com/iron-io/titan/runner/agent"
"github.com/iron-io/titan/runner/configloader"
"github.com/iron-io/titan/runner/drivers"
"github.com/iron-io/titan/runner/drivers/docker"
"github.com/iron-io/titan/runner/drivers/mock"
)
// Config carries everything needed to execute a single route invocation.
type Config struct {
	// Ctx is the request context passed through to the driver.
	// NOTE(review): storing a context in a struct is discouraged in Go;
	// consider passing it as the first argument to Run instead — confirm
	// all callers before changing.
	Ctx context.Context
	// Route is the route definition (image, headers, path) to execute.
	Route *models.Route
	// Endpoint is the base URL of the runner API.
	Endpoint string
	// Payload is handed to the container via the PAYLOAD env var.
	Payload string
	// Timeout bounds how long the task may run.
	Timeout time.Duration
}
// TODO: use Docker utils from docker-job for this and a few others in here
// func DockerRun(route *models.Route, c *gin.Context) error {
// image := route.Image
// payload := c.Value("payload").(string)
// Runner executes one route invocation through a Titan container driver,
// buffering the container's stdout and stderr in memory.
type Runner struct {
	cfg *Config
	// status holds the driver-reported terminal status (e.g. "success")
	// after Run completes.
	status string
	// out and err capture the container's stdout/stderr streams.
	out bytes.Buffer
	err bytes.Buffer
}
// for k, v := range route.Headers {
// c.Header(k, v[0])
// }
// New constructs a Runner bound to the given configuration.
func New(cfg *Config) *Runner {
	r := new(Runner)
	r.cfg = cfg
	return r
}
// // TODO: swap all this out with Titan's running via API
// cmd := exec.Command("docker", "run", "--rm", "-i", "-e", fmt.Sprintf("PAYLOAD=%v", payload), image)
// stdout, err := cmd.StdoutPipe()
// if err != nil {
// log.Fatal(err)
// }
// stderr, err := cmd.StderrPipe()
// if err != nil {
// log.Fatal(err)
// }
// if err := cmd.Start(); err != nil {
// log.Fatal(err)
// }
// var b bytes.Buffer
// buff := bufio.NewWriter(&b)
// Run executes the configured route in a container via Titan's driver
// infrastructure, capturing the container's stdout/stderr into the
// Runner's buffers and recording the driver-reported status.
//
// It returns an error only when the driver could not be selected or the
// container failed to run; a non-"success" task status is reported
// through Status, not through the returned error.
func (r *Runner) Run() error {
	runnerConfig := configloader.RunnerConfiguration()

	// Registry credentials used by the driver when pulling images.
	au := agent.ConfigAuth{runnerConfig.Registries}

	// TODO: Is this really required for Titan's driver? Can we remove it?
	env := common.NewEnvironment(func(e *common.Environment) {})

	// TODO: Create a drivers.New(runnerConfig) in Titan
	driver, err := selectDriver(env, runnerConfig)
	if err != nil {
		return err
	}

	// Adapt our config to the task interface the driver expects; the
	// container's stdout/stderr are written into the Runner's buffers.
	ctask := &containerTask{
		cfg:    r.cfg,
		auth:   &au,
		stdout: &r.out,
		stderr: &r.err,
	}

	result, err := driver.Run(r.cfg.Ctx, ctask)
	if err != nil {
		return err
	}

	r.status = result.Status()
	return nil
}
// func execAndPrint(cmdstr string, args []string) error {
// var bout bytes.Buffer
// buffout := bufio.NewWriter(&bout)
// var berr bytes.Buffer
// bufferr := bufio.NewWriter(&berr)
// cmd := exec.Command(cmdstr, args...)
// stdout, err := cmd.StdoutPipe()
// if err != nil {
// return err
// }
// stderr, err := cmd.StderrPipe()
// if err != nil {
// return err
// }
// if err := cmd.Start(); err != nil {
// return err
// }
// go io.Copy(buffout, stdout)
// go io.Copy(bufferr, stderr)
// ReadOut returns everything the container has written to stdout so far.
func (r *Runner) ReadOut() []byte {
	stdout := r.out.Bytes()
	return stdout
}
// log.Printf("Waiting for cmd to finish...")
// err = cmd.Wait()
// if berr.Len() != 0 {
// fmt.Println("stderr:", berr.String())
// }
// fmt.Println("stdout:", bout.String())
// return err
// }
// ReadErr returns everything the container has written to stderr so far.
//
// Pointer receiver for consistency with ReadOut, and because Runner
// contains bytes.Buffer fields that should not be copied per call.
func (r *Runner) ReadErr() []byte {
	return r.err.Bytes()
}
// Status returns the driver-reported terminal status of the last Run
// (empty until Run has completed).
//
// Pointer receiver for consistency with the other Runner methods and to
// avoid copying the Runner's buffer fields on every call.
func (r *Runner) Status() string {
	return r.status
}
// selectDriver instantiates the container driver named by conf.Driver.
// Supported drivers are "docker" and "mock"; any other name yields an
// error.
func selectDriver(env *common.Environment, conf *agent.Config) (drivers.Driver, error) {
	switch conf.Driver {
	case "docker":
		return docker.NewDocker(env, conf.DriverConfig), nil
	case "mock":
		return mock.New(), nil
	default:
		return nil, fmt.Errorf("driver %v not found", conf.Driver)
	}
}

38
api/runner/task.go Normal file
View File

@@ -0,0 +1,38 @@
package runner
import (
"io"
dockercli "github.com/fsouza/go-dockerclient"
"github.com/iron-io/titan/runner/tasker"
)
// containerTask adapts a runner Config to the task interface expected by
// Titan's container drivers.
type containerTask struct {
	// auth supplies Docker registry credentials for the task's image.
	auth tasker.Auther
	// stdout/stderr receive the container's output streams.
	stdout io.Writer
	stderr io.Writer
	cfg *Config
}

// Command is empty: the image's default entrypoint/command is used.
func (t *containerTask) Command() string { return "" }

// EnvVars exposes the request payload to the container via PAYLOAD.
func (t *containerTask) EnvVars() map[string]string {
	env := map[string]string{
		"PAYLOAD": t.cfg.Payload,
	}
	return env
}

// Id and Group are unused by this runner and left empty.
func (t *containerTask) Id() string     { return "" }
func (t *containerTask) Group() string  { return "" }

// Image returns the route's container image reference.
func (t *containerTask) Image() string  { return t.cfg.Route.Image }

// Timeout converts the configured duration to whole seconds.
// NOTE(review): durations under one second truncate to 0 — confirm the
// driver treats 0 as "no timeout" or reject sub-second timeouts upstream.
func (t *containerTask) Timeout() uint  { return uint(t.cfg.Timeout.Seconds()) }

// Logger hands the driver the writers that capture container output.
func (t *containerTask) Logger() (stdout, stderr io.Writer) { return t.stdout, t.stderr }

// Volumes and WorkDir: no mounts, container's default working directory.
func (t *containerTask) Volumes() [][2]string { return [][2]string{} }
func (t *containerTask) WorkDir() string      { return "" }

// Close is a no-op; the task owns no resources needing cleanup.
func (t *containerTask) Close() {}

// DockerAuth returns registry credentials applicable to this task's image.
func (t *containerTask) DockerAuth() []dockercli.AuthConfiguration {
	return t.auth.Auth(t.Image())
}

View File

@@ -1,117 +0,0 @@
package runner
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
"github.com/iron-io/functions/api/models"
tmodels "github.com/iron-io/titan/jobserver/models"
)
// TitanJob tracks one job submitted to the Titan API: the runner that
// created it, the channel delivering its terminal error, and its output.
type TitanJob struct {
	runner *RouteRunner
	// resultChan delivers the terminal error (nil on success) from Start;
	// consumed by Wait.
	resultChan chan error
	// result holds the job's log output once retrieved.
	result []byte
}
// versionPath is the Titan API version prefix prepended to every request path.
var versionPath = "/v1"
// CreateTitanJob builds a TitanJob for the given runner and immediately
// launches its Start goroutine; callers collect the outcome via Wait.
func CreateTitanJob(runner *RouteRunner) *TitanJob {
	job := &TitanJob{
		runner:     runner,
		resultChan: make(chan error),
	}
	go job.Start()
	return job
}
// Start submits the route as a Titan job, then polls the job-log endpoint
// until output appears or the runner's timeout elapses. The terminal
// error (nil on success) is delivered on t.resultChan; callers receive it
// via Wait.
func (t *TitanJob) Start() {
	newjob := tmodels.JobsWrapper{
		Jobs: []*tmodels.Job{
			&tmodels.Job{
				NewJob: tmodels.NewJob{
					Image:   &t.runner.Route.Image,
					Payload: t.runner.Payload,
				},
			},
		},
	}

	jobJSON, err := json.Marshal(newjob)
	if err != nil {
		t.resultChan <- models.ErrInvalidJSON
		return
	}

	resp, err := t.titanPOST(fmt.Sprintf("/groups/app-%s/jobs", t.runner.Route.AppName), bytes.NewBuffer(jobJSON))
	if err != nil {
		t.resultChan <- models.ErrRunnerAPICantConnect
		return
	}
	// BUG FIX: the response body was never closed, leaking the connection.
	defer resp.Body.Close()

	respBody, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		// BUG FIX: the ReadAll error was previously overwritten unchecked.
		t.resultChan <- models.ErrRunnerInvalidResponse
		return
	}

	var resultJobs tmodels.JobsWrapper
	if err := json.Unmarshal(respBody, &resultJobs); err != nil {
		t.resultChan <- models.ErrInvalidJSON
		return
	}
	// BUG FIX: a nil-check alone missed an empty non-nil slice, which
	// would panic on Jobs[0] below.
	if len(resultJobs.Jobs) == 0 {
		t.resultChan <- models.ErrRunnerAPICreateJob
		return
	}
	job := resultJobs.Jobs[0]

	// Poll for the job's log until it appears or we exceed the timeout.
	begin := time.Now()
	for len(t.result) == 0 {
		if time.Since(begin) > t.runner.Timeout {
			t.resultChan <- models.ErrRunnerTimeout
			return
		}
		resp, err := t.titanGET(fmt.Sprintf("/groups/app-%s/jobs/%s/log", t.runner.Route.AppName, job.ID))
		if err == nil {
			fmt.Println(resp.Status)
			if resp.StatusCode == http.StatusOK {
				resBody, readErr := ioutil.ReadAll(resp.Body)
				// BUG FIX: poll responses were leaked every iteration.
				resp.Body.Close()
				fmt.Println(string(resBody))
				if readErr != nil {
					t.resultChan <- models.ErrRunnerInvalidResponse
					return
				}
				t.result = resBody
				continue
			}
			// BUG FIX: close non-200 responses too, so the transport can
			// reuse the connection.
			resp.Body.Close()
		}
		time.Sleep(100 * time.Millisecond)
	}
	t.resultChan <- nil
}
// Wait blocks until the job's Start goroutine finishes and returns its
// terminal error (nil on success).
func (t *TitanJob) Wait() error {
	return <-t.resultChan
}

// Result returns the job's log output; it is empty until Wait has
// returned successfully.
func (t TitanJob) Result() []byte {
	return t.result
}
// titanPOST issues a JSON POST to the Titan API at the given path,
// relative to the runner's endpoint and API version prefix.
func (t TitanJob) titanPOST(path string, body io.Reader) (*http.Response, error) {
	// Build the URL once instead of formatting it twice; Printf replaces
	// the redundant Println(Sprintf(...)) pattern.
	url := fmt.Sprintf("%s%s%s", t.runner.Endpoint, versionPath, path)
	fmt.Println(url)
	return http.Post(url, "application/json", body)
}
func (t TitanJob) titanGET(path string) (*http.Response, error) {
fmt.Println(fmt.Sprintf("%s%s%s", t.runner.Endpoint, versionPath, path))
return http.Get(fmt.Sprintf("%s%s%s", t.runner.Endpoint, versionPath, path))
}

View File

@@ -1,7 +1,6 @@
package router
import (
"bytes"
"io/ioutil"
"net/http"
"strings"
@@ -60,14 +59,15 @@ func handleRunner(c *gin.Context) {
for _, el := range routes {
if el.Path == route {
titanJob := runner.CreateTitanJob(&runner.RouteRunner{
run := runner.New(&runner.Config{
Ctx: c,
Route: el,
Endpoint: config.API,
Payload: string(payload),
Timeout: 30 * time.Second,
})
if err := titanJob.Wait(); err != nil {
if err := run.Run(); err != nil {
log.WithError(err).Error(models.ErrRunnerRunRoute)
c.JSON(http.StatusInternalServerError, simpleError(models.ErrRunnerRunRoute))
} else {
@@ -75,7 +75,11 @@ func handleRunner(c *gin.Context) {
c.Header(k, v[0])
}
c.Data(http.StatusOK, "", bytes.Trim(titanJob.Result(), "\x00"))
if run.Status() == "success" {
c.Data(http.StatusOK, "", run.ReadOut())
} else {
c.Data(http.StatusInternalServerError, "", run.ReadErr())
}
}
return
}