changing titan from API to interface

Pedro Nasser · 2016-07-28 01:02:22 -03:00
parent 2578530822 · commit a92dffb3fc
4 changed files with 142 additions and 235 deletions

api/runner/job.go (new file)

@@ -0,0 +1,43 @@
package runner

import (
	"io"
	"os"

	dockercli "github.com/fsouza/go-dockerclient"
	"github.com/iron-io/titan/runner/tasker"
	titan_models "github.com/iron-io/titan/runner/tasker/client/models"
)

// WrapperJob wraps a Titan job model so that it satisfies the job
// interface the Titan runner drivers consume.
type WrapperJob struct {
	auth tasker.Auther
	log  *os.File
	m    *titan_models.Job
}

func (f *WrapperJob) Command() string { return "" }

func (f *WrapperJob) EnvVars() map[string]string {
	m := map[string]string{
		"JOB_ID":  f.Id(),
		"PAYLOAD": f.m.Payload,
	}
	for k, v := range f.m.EnvVars {
		m[k] = v
	}
	return m
}

func (f *WrapperJob) Id() string      { return f.m.ID }
func (f *WrapperJob) Group() string   { return f.m.GroupName }
func (f *WrapperJob) Image() string   { return *f.m.Image }
func (f *WrapperJob) Timeout() uint   { return uint(*f.m.Timeout) }
func (f *WrapperJob) Logger() (stdout, stderr io.Writer) { return f.log, f.log }
func (f *WrapperJob) Volumes() [][2]string { return [][2]string{} }
func (f *WrapperJob) WorkDir() string { return "" }
func (f *WrapperJob) Close()          {}

func (f *WrapperJob) DockerAuth() []dockercli.AuthConfiguration {
	return f.auth.Auth(f.Image())
}
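A minimal sketch (not part of the commit) of how this wrapper is exercised: build a Titan job model, wrap it, and read the env vars a driver would inject into the container. The function name, image, and payload values are hypothetical, and it assumes it lives inside package runner with fmt imported.

func exampleWrapperJob() {
	image := "iron/hello"
	timeout := int32(30)
	job := &titan_models.Job{
		NewJob: titan_models.NewJob{
			Image:   &image,
			Payload: `{"name":"world"}`,
			Timeout: &timeout,
		},
	}
	// auth is left nil because DockerAuth() is never called in this sketch.
	wjob := &WrapperJob{m: job, log: os.Stdout}
	fmt.Println(wjob.EnvVars()["PAYLOAD"]) // prints the payload set above
}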


@@ -1,136 +1,113 @@
 package runner
 
-import "github.com/iron-io/functions/api/models"
-import "time"
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"time"
+
+	"golang.org/x/net/context"
+
+	"github.com/iron-io/functions/api/models"
+	"github.com/iron-io/titan/common"
+	"github.com/iron-io/titan/runner/agent"
+	"github.com/iron-io/titan/runner/configloader"
+	"github.com/iron-io/titan/runner/drivers"
+	"github.com/iron-io/titan/runner/drivers/docker"
+	"github.com/iron-io/titan/runner/drivers/mock"
+	titan_models "github.com/iron-io/titan/runner/tasker/client/models"
+)
 
-type RouteRunner struct {
+type Config struct {
 	Route    *models.Route
 	Endpoint string
 	Payload  string
 	Timeout  time.Duration
 }
 
-// TODO: use Docker utils from docker-job for this and a few others in here
-// func DockerRun(route *models.Route, c *gin.Context) error {
-// image := route.Image
-// payload := c.Value("payload").(string)
-// for k, v := range route.Headers {
-// c.Header(k, v[0])
-// }
-// // TODO: swap all this out with Titan's running via API
-// cmd := exec.Command("docker", "run", "--rm", "-i", "-e", fmt.Sprintf("PAYLOAD=%v", payload), image)
-// stdout, err := cmd.StdoutPipe()
-// if err != nil {
-// log.Fatal(err)
-// }
-// stderr, err := cmd.StderrPipe()
-// if err != nil {
-// log.Fatal(err)
-// }
-// if err := cmd.Start(); err != nil {
-// log.Fatal(err)
-// }
-// var b bytes.Buffer
-// buff := bufio.NewWriter(&b)
-// go io.Copy(buff, stdout)
-// go io.Copy(buff, stderr)
-// log.Printf("Waiting for command to finish...")
-// if err = cmd.Wait(); err != nil {
-// // job failed
-// // log.Infoln("job finished with err:", err)
-// // log.WithFields(log.Fields{"metric": "run.errors", "value": 1, "type": "count"}).Infoln("failed run")
-// return err
-// // TODO: wrap error in json "error": buff
-// }
-// // log.Infoln("Docker ran successfully:", b.String())
-// // print
-// // log.WithFields(log.Fields{"metric": "run.success", "value": 1, "type": "count"}).Infoln("successful run")
-// // log.WithFields(log.Fields{"metric": "run", "value": 1, "type": "count"}).Infoln("job ran")
-// buff.Flush()
-// c.Data(http.StatusOK, "", bytes.Trim(b.Bytes(), "\x00"))
-// return nil
-// }
-
-// func DockerHost(el *models.Route, c *gin.Context) error {
-// ra := runningImages[el.Image]
-// if ra == nil {
-// ra = &RunningApp{}
-// ra.Route = el
-// ra.Port = rand.Intn(9999-9000) + 9000
-// ra.ContainerName = fmt.Sprintf("c_%v", rand.Intn(10000))
-// runningImages[el.Image] = ra
-// // TODO: timeout 59 minutes. Mark it in ra as terminated.
-// cmd := exec.Command("docker", "run", "--name", ra.ContainerName, "--rm", "-i", "-p", fmt.Sprintf("%v:8080", ra.Port), el.Image)
-// // TODO: What should we do with the output here? Store it? Send it to a log service?
-// // cmd.Stdout = os.Stdout
-// cmd.Stderr = os.Stderr
-// // TODO: Need to catch interrupt and stop all containers that are started, see devo/dj for how to do this
-// if err := cmd.Start(); err != nil {
-// return err
-// // TODO: What if the app fails to start? Don't want to keep starting the container
-// }
-// } else {
-// // TODO: check if it's still running?
-// // TODO: if ra.terminated, then start new container?
-// }
-// fmt.Println("RunningApp:", ra)
-// // TODO: if connection fails, check if container still running? If not, start it again
-// resp, err := http.Get(fmt.Sprintf("http://0.0.0.0:%v%v", ra.Port, el.ContainerPath))
-// if err != nil {
-// return err
-// }
-// defer resp.Body.Close()
-// body, err := ioutil.ReadAll(resp.Body)
-// if err != nil {
-// return err
-// }
-// c.Data(http.StatusOK, "", body)
-// return nil
-// }
-
-// func checkAndPull(image string) error {
-// err := execAndPrint("docker", []string{"inspect", image})
-// if err != nil {
-// // image does not exist, so let's pull
-// fmt.Println("Image not found locally, will pull.", err)
-// err = execAndPrint("docker", []string{"pull", image})
-// }
-// return err
-// }
-
-// func execAndPrint(cmdstr string, args []string) error {
-// var bout bytes.Buffer
-// buffout := bufio.NewWriter(&bout)
-// var berr bytes.Buffer
-// bufferr := bufio.NewWriter(&berr)
-// cmd := exec.Command(cmdstr, args...)
-// stdout, err := cmd.StdoutPipe()
-// if err != nil {
-// return err
-// }
-// stderr, err := cmd.StderrPipe()
-// if err != nil {
-// return err
-// }
-// if err := cmd.Start(); err != nil {
-// return err
-// }
-// go io.Copy(buffout, stdout)
-// go io.Copy(bufferr, stderr)
-// log.Printf("Waiting for cmd to finish...")
-// err = cmd.Wait()
-// if berr.Len() != 0 {
-// fmt.Println("stderr:", berr.String())
-// }
-// fmt.Println("stdout:", bout.String())
-// return err
-// }
+type Runner struct {
+	cfg    *Config
+	status string
+	result []byte
+}
+
+func New(cfg *Config) *Runner {
+	return &Runner{
+		cfg: cfg,
+	}
+}
+
+func (r *Runner) Start() error {
+	image := r.cfg.Route.Image
+	payload := r.cfg.Payload
+	timeout := int32(r.cfg.Timeout.Seconds())
+
+	var err error
+
+	runnerConfig := configloader.RunnerConfiguration()
+
+	au := agent.ConfigAuth{runnerConfig.Registries}
+	env := common.NewEnvironment(func(e *common.Environment) {})
+	driver, err := selectDriver(env, runnerConfig)
+	if err != nil {
+		return err
+	}
+
+	job := &titan_models.Job{
+		NewJob: titan_models.NewJob{
+			Image:   &image,
+			Payload: payload,
+			Timeout: &timeout,
+		},
+	}
+
+	tempLog, err := ensureLogFile(job)
+	if err != nil {
+		return err
+	}
+	defer tempLog.Close()
+
+	wjob := &WrapperJob{
+		auth: &au,
+		m:    job,
+		log:  tempLog,
+	}
+
+	result, err := driver.Run(context.Background(), wjob)
+	if err != nil {
+		return err
+	}
+
+	b, _ := ioutil.ReadFile(tempLog.Name())
+	r.result = b
+	r.status = result.Status()
+
+	return nil
+}
+
+func (r Runner) Result() []byte {
+	return r.result
+}
+
+func (r Runner) Status() string {
+	return r.status
+}
+
+func ensureLogFile(job *titan_models.Job) (*os.File, error) {
+	log, err := ioutil.TempFile("", fmt.Sprintf("titan-log-%s", job.ID))
+	if err != nil {
+		return nil, fmt.Errorf("couldn't open task log for writing: %v", err)
+	}
+	return log, nil
+}
+
+func selectDriver(env *common.Environment, conf *agent.Config) (drivers.Driver, error) {
+	switch conf.Driver {
+	case "docker":
+		docker := docker.NewDocker(env, conf.DriverConfig)
+		return docker, nil
+	case "mock":
+		return mock.New(), nil
+	}
+	return nil, fmt.Errorf("driver %v not found", conf.Driver)
+}
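A usage sketch for the new in-process path (not part of the commit; route, app name, and payload values are hypothetical): the API builds a Config from the route and request payload, then runs it through the selected driver instead of calling out to a Titan API server.

func exampleRunRoute() error {
	r := New(&Config{
		Route:   &models.Route{Image: "iron/hello", AppName: "myapp"},
		Payload: `{"name":"world"}`,
		Timeout: 30 * time.Second,
	})
	// Start blocks until the driver finishes the job or fails.
	if err := r.Start(); err != nil {
		return err
	}
	fmt.Println(r.Status(), string(r.Result()))
	return nil
}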


@@ -1,117 +0,0 @@
package runner

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"time"

	"github.com/iron-io/functions/api/models"
	tmodels "github.com/iron-io/titan/jobserver/models"
)

type TitanJob struct {
	runner     *RouteRunner
	resultChan chan error
	result     []byte
}

var versionPath = "/v1"

func CreateTitanJob(runner *RouteRunner) *TitanJob {
	t := &TitanJob{
		runner:     runner,
		resultChan: make(chan error),
	}
	go t.Start()
	return t
}

func (t *TitanJob) Start() {
	newjob := tmodels.JobsWrapper{
		Jobs: []*tmodels.Job{
			&tmodels.Job{
				NewJob: tmodels.NewJob{
					Image:   &t.runner.Route.Image,
					Payload: t.runner.Payload,
				},
			},
		},
	}
	jobJSON, err := json.Marshal(newjob)
	if err != nil {
		t.resultChan <- models.ErrInvalidJSON
		return
	}
	resp, err := t.titanPOST(fmt.Sprintf("/groups/app-%s/jobs", t.runner.Route.AppName), bytes.NewBuffer(jobJSON))
	if err != nil {
		t.resultChan <- models.ErrRunnerAPICantConnect
		return
	}
	var resultJobs tmodels.JobsWrapper
	respBody, err := ioutil.ReadAll(resp.Body)
	err = json.Unmarshal(respBody, &resultJobs)
	if err != nil {
		t.resultChan <- models.ErrInvalidJSON
		return
	}
	if resultJobs.Jobs == nil {
		t.resultChan <- models.ErrRunnerAPICreateJob
		return
	}
	job := resultJobs.Jobs[0]
	begin := time.Now()
	for len(t.result) == 0 {
		if time.Since(begin) > t.runner.Timeout {
			t.resultChan <- models.ErrRunnerTimeout
			return
		}
		resp, err := t.titanGET(fmt.Sprintf("/groups/app-%s/jobs/%s/log", t.runner.Route.AppName, job.ID))
		if err == nil {
			fmt.Println(resp.Status)
			if resp.StatusCode == http.StatusOK {
				resBody, err := ioutil.ReadAll(resp.Body)
				fmt.Println(string(resBody))
				if err != nil {
					t.resultChan <- models.ErrRunnerInvalidResponse
					return
				}
				t.result = resBody
				continue
			}
		}
		time.Sleep(100 * time.Millisecond)
	}
	t.resultChan <- nil
}

func (t *TitanJob) Wait() error {
	return <-t.resultChan
}

func (t TitanJob) Result() []byte {
	return t.result
}

func (t TitanJob) titanPOST(path string, body io.Reader) (*http.Response, error) {
	fmt.Println(fmt.Sprintf("%s%s%s", t.runner.Endpoint, versionPath, path))
	return http.Post(fmt.Sprintf("%s%s%s", t.runner.Endpoint, versionPath, path), "application/json", body)
}

func (t TitanJob) titanGET(path string) (*http.Response, error) {
	fmt.Println(fmt.Sprintf("%s%s%s", t.runner.Endpoint, versionPath, path))
	return http.Get(fmt.Sprintf("%s%s%s", t.runner.Endpoint, versionPath, path))
}
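For contrast, a sketch of how this removed HTTP path was driven (not part of the commit; assumes a RouteRunner populated with Endpoint, Route, Payload, and Timeout):

func exampleOldTitanFlow(rr *RouteRunner) error {
	// Old path: enqueue the job via the Titan jobserver HTTP API,
	// then poll the log endpoint until output arrives or the timeout hits.
	t := CreateTitanJob(rr)
	if err := t.Wait(); err != nil {
		return err
	}
	fmt.Println(string(t.Result()))
	return nil
}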