diff --git a/api/runner/job.go b/api/runner/job.go new file mode 100644 index 000000000..b6097cc81 --- /dev/null +++ b/api/runner/job.go @@ -0,0 +1,43 @@ +package runner + +import ( + "io" + "os" + + dockercli "github.com/fsouza/go-dockerclient" + "github.com/iron-io/titan/runner/tasker" + titan_models "github.com/iron-io/titan/runner/tasker/client/models" +) + +type WrapperJob struct { + auth tasker.Auther + log *os.File + m *titan_models.Job +} + +func (f *WrapperJob) Command() string { return "" } + +func (f *WrapperJob) EnvVars() map[string]string { + m := map[string]string{ + "JOB_ID": f.Id(), + "PAYLOAD": f.m.Payload, + } + for k, v := range f.m.EnvVars { + m[k] = v + } + return m +} + +func (f *WrapperJob) Id() string { return f.m.ID } +func (f *WrapperJob) Group() string { return f.m.GroupName } +func (f *WrapperJob) Image() string { return *f.m.Image } +func (f *WrapperJob) Timeout() uint { return uint(*f.m.Timeout) } +func (f *WrapperJob) Logger() (stdout, stderr io.Writer) { return f.log, f.log } +func (f *WrapperJob) Volumes() [][2]string { return [][2]string{} } +func (f *WrapperJob) WorkDir() string { return "" } + +func (f *WrapperJob) Close() {} + +func (f *WrapperJob) DockerAuth() []dockercli.AuthConfiguration { + return f.auth.Auth(f.Image()) +} diff --git a/api/runner/runner.go b/api/runner/runner.go index 6d833c768..0538204bf 100644 --- a/api/runner/runner.go +++ b/api/runner/runner.go @@ -1,136 +1,113 @@ package runner -import "github.com/iron-io/functions/api/models" -import "time" +import ( + "fmt" + "io/ioutil" + "os" + "time" -type RouteRunner struct { + "golang.org/x/net/context" + + "github.com/iron-io/functions/api/models" + "github.com/iron-io/titan/common" + "github.com/iron-io/titan/runner/agent" + "github.com/iron-io/titan/runner/configloader" + "github.com/iron-io/titan/runner/drivers" + "github.com/iron-io/titan/runner/drivers/docker" + "github.com/iron-io/titan/runner/drivers/mock" + titan_models "github.com/iron-io/titan/runner/tasker/client/models" +) + +type Config struct { Route *models.Route Endpoint string Payload string Timeout time.Duration } -// TODO: use Docker utils from docker-job for this and a few others in here -// func DockerRun(route *models.Route, c *gin.Context) error { -// image := route.Image -// payload := c.Value("payload").(string) +type Runner struct { + cfg *Config + status string + result []byte +} -// for k, v := range route.Headers { -// c.Header(k, v[0]) -// } +func New(cfg *Config) *Runner { + return &Runner{ + cfg: cfg, + } +} -// // TODO: swap all this out with Titan's running via API -// cmd := exec.Command("docker", "run", "--rm", "-i", "-e", fmt.Sprintf("PAYLOAD=%v", payload), image) -// stdout, err := cmd.StdoutPipe() -// if err != nil { -// log.Fatal(err) -// } -// stderr, err := cmd.StderrPipe() -// if err != nil { -// log.Fatal(err) -// } -// if err := cmd.Start(); err != nil { -// log.Fatal(err) -// } -// var b bytes.Buffer -// buff := bufio.NewWriter(&b) +func (r *Runner) Start() error { + image := r.cfg.Route.Image + payload := r.cfg.Payload + timeout := int32(r.cfg.Timeout.Seconds()) -// go io.Copy(buff, stdout) -// go io.Copy(buff, stderr) + var err error -// log.Printf("Waiting for command to finish...") -// if err = cmd.Wait(); err != nil { -// // job failed -// // log.Infoln("job finished with err:", err) -// // log.WithFields(log.Fields{"metric": "run.errors", "value": 1, "type": "count"}).Infoln("failed run") -// return err -// // TODO: wrap error in json "error": buff -// } + runnerConfig := configloader.RunnerConfiguration() -// // log.Infoln("Docker ran successfully:", b.String()) -// // print -// // log.WithFields(log.Fields{"metric": "run.success", "value": 1, "type": "count"}).Infoln("successful run") -// // log.WithFields(log.Fields{"metric": "run", "value": 1, "type": "count"}).Infoln("job ran") -// buff.Flush() + au := agent.ConfigAuth{runnerConfig.Registries} + env := common.NewEnvironment(func(e *common.Environment) {}) + driver, err := selectDriver(env, runnerConfig) + if err != nil { + return err + } -// c.Data(http.StatusOK, "", bytes.Trim(b.Bytes(), "\x00")) + job := &titan_models.Job{ + NewJob: titan_models.NewJob{ + Image: &image, + Payload: payload, + Timeout: &timeout, + }, + } -// return nil -// } + tempLog, err := ensureLogFile(job) + if err != nil { + return err + } + defer tempLog.Close() -// func DockerHost(el *models.Route, c *gin.Context) error { -// ra := runningImages[el.Image] -// if ra == nil { -// ra = &RunningApp{} -// ra.Route = el -// ra.Port = rand.Intn(9999-9000) + 9000 -// ra.ContainerName = fmt.Sprintf("c_%v", rand.Intn(10000)) -// runningImages[el.Image] = ra -// // TODO: timeout 59 minutes. Mark it in ra as terminated. -// cmd := exec.Command("docker", "run", "--name", ra.ContainerName, "--rm", "-i", "-p", fmt.Sprintf("%v:8080", ra.Port), el.Image) -// // TODO: What should we do with the output here? Store it? Send it to a log service? -// // cmd.Stdout = os.Stdout -// cmd.Stderr = os.Stderr -// // TODO: Need to catch interrupt and stop all containers that are started, see devo/dj for how to do this -// if err := cmd.Start(); err != nil { -// return err -// // TODO: What if the app fails to start? Don't want to keep starting the container -// } -// } else { -// // TODO: check if it's still running? -// // TODO: if ra.terminated, then start new container? -// } -// fmt.Println("RunningApp:", ra) -// // TODO: if connection fails, check if container still running? If not, start it again -// resp, err := http.Get(fmt.Sprintf("http://0.0.0.0:%v%v", ra.Port, el.ContainerPath)) -// if err != nil { -// return err -// } -// defer resp.Body.Close() -// body, err := ioutil.ReadAll(resp.Body) -// if err != nil { -// return err -// } + wjob := &WrapperJob{ + auth: &au, + m: job, + log: tempLog, + } -// c.Data(http.StatusOK, "", body) -// return nil -// } + result, err := driver.Run(context.Background(), wjob) + if err != nil { + return err + } -// func checkAndPull(image string) error { -// err := execAndPrint("docker", []string{"inspect", image}) -// if err != nil { -// // image does not exist, so let's pull -// fmt.Println("Image not found locally, will pull.", err) -// err = execAndPrint("docker", []string{"pull", image}) -// } -// return err -// } + b, _ := ioutil.ReadFile(tempLog.Name()) + r.result = b + r.status = result.Status() -// func execAndPrint(cmdstr string, args []string) error { -// var bout bytes.Buffer -// buffout := bufio.NewWriter(&bout) -// var berr bytes.Buffer -// bufferr := bufio.NewWriter(&berr) -// cmd := exec.Command(cmdstr, args...) -// stdout, err := cmd.StdoutPipe() -// if err != nil { -// return err -// } -// stderr, err := cmd.StderrPipe() -// if err != nil { -// return err -// } -// if err := cmd.Start(); err != nil { -// return err -// } -// go io.Copy(buffout, stdout) -// go io.Copy(bufferr, stderr) + return nil +} -// log.Printf("Waiting for cmd to finish...") -// err = cmd.Wait() -// if berr.Len() != 0 { -// fmt.Println("stderr:", berr.String()) -// } -// fmt.Println("stdout:", bout.String()) -// return err -// } +func (r Runner) Result() []byte { + return r.result +} + +func (r Runner) Status() string { + return r.status +} + +func ensureLogFile(job *titan_models.Job) (*os.File, error) { + log, err := ioutil.TempFile("", fmt.Sprintf("titan-log-%s", job.ID)) + if err != nil { + return nil, fmt.Errorf("couldn't open task log for writing: %v", err) + } + return log, nil +} + +func selectDriver(env *common.Environment, conf *agent.Config) (drivers.Driver, error) { + switch conf.Driver { + case "docker": + docker := docker.NewDocker(env, conf.DriverConfig) + return docker, nil + case "mock": + return mock.New(), nil + } + return nil, fmt.Errorf("driver %v not found", conf.Driver) +} diff --git a/api/runner/titan.go b/api/runner/titan.go deleted file mode 100644 index 30a7583a5..000000000 --- a/api/runner/titan.go +++ /dev/null @@ -1,117 +0,0 @@ -package runner - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "io/ioutil" - "net/http" - "time" - - "github.com/iron-io/functions/api/models" - tmodels "github.com/iron-io/titan/jobserver/models" -) - -type TitanJob struct { - runner *RouteRunner - resultChan chan error - result []byte -} - -var versionPath = "/v1" - -func CreateTitanJob(runner *RouteRunner) *TitanJob { - t := &TitanJob{ - runner: runner, - resultChan: make(chan error), - } - - go t.Start() - - return t -} - -func (t *TitanJob) Start() { - newjob := tmodels.JobsWrapper{ - Jobs: []*tmodels.Job{ - &tmodels.Job{ - NewJob: tmodels.NewJob{ - Image: &t.runner.Route.Image, - Payload: t.runner.Payload, - }, - }, - }, - } - - jobJSON, err := json.Marshal(newjob) - if err != nil { - t.resultChan <- models.ErrInvalidJSON - return - } - - resp, err := t.titanPOST(fmt.Sprintf("/groups/app-%s/jobs", t.runner.Route.AppName), bytes.NewBuffer(jobJSON)) - if err != nil { - t.resultChan <- models.ErrRunnerAPICantConnect - return - } - - var resultJobs tmodels.JobsWrapper - respBody, err := ioutil.ReadAll(resp.Body) - err = json.Unmarshal(respBody, &resultJobs) - if err != nil { - t.resultChan <- models.ErrInvalidJSON - return - } - - if resultJobs.Jobs == nil { - t.resultChan <- models.ErrRunnerAPICreateJob - return - } - - job := resultJobs.Jobs[0] - begin := time.Now() - for len(t.result) == 0 { - if time.Since(begin) > t.runner.Timeout { - t.resultChan <- models.ErrRunnerTimeout - return - } - - resp, err := t.titanGET(fmt.Sprintf("/groups/app-%s/jobs/%s/log", t.runner.Route.AppName, job.ID)) - if err == nil { - fmt.Println(resp.Status) - if resp.StatusCode == http.StatusOK { - resBody, err := ioutil.ReadAll(resp.Body) - fmt.Println(string(resBody)) - if err != nil { - t.resultChan <- models.ErrRunnerInvalidResponse - return - } - - t.result = resBody - continue - } - } - time.Sleep(100 * time.Millisecond) - } - - t.resultChan <- nil -} - -func (t *TitanJob) Wait() error { - return <-t.resultChan -} - -func (t TitanJob) Result() []byte { - return t.result -} - -func (t TitanJob) titanPOST(path string, body io.Reader) (*http.Response, error) { - fmt.Println(fmt.Sprintf("%s%s%s", t.runner.Endpoint, versionPath, path)) - return http.Post(fmt.Sprintf("%s%s%s", t.runner.Endpoint, versionPath, path), "application/json", body) -} - -func (t TitanJob) titanGET(path string) (*http.Response, error) { - fmt.Println(fmt.Sprintf("%s%s%s", t.runner.Endpoint, versionPath, path)) - return http.Get(fmt.Sprintf("%s%s%s", t.runner.Endpoint, versionPath, path)) -} diff --git a/api/server/router/runner.go b/api/server/router/runner.go index 89b85436f..fb57f36b0 100644 --- a/api/server/router/runner.go +++ b/api/server/router/runner.go @@ -60,14 +60,14 @@ func handleRunner(c *gin.Context) { for _, el := range routes { if el.Path == route { - titanJob := runner.CreateTitanJob(&runner.RouteRunner{ + run := runner.New(&runner.Config{ Route: el, Endpoint: config.API, Payload: string(payload), Timeout: 30 * time.Second, }) - if err := titanJob.Wait(); err != nil { + if err := run.Start(); err != nil { log.WithError(err).Error(models.ErrRunnerRunRoute) c.JSON(http.StatusInternalServerError, simpleError(models.ErrRunnerRunRoute)) } else { @@ -75,7 +75,11 @@ func handleRunner(c *gin.Context) { c.Header(k, v[0]) } - c.Data(http.StatusOK, "", bytes.Trim(titanJob.Result(), "\x00")) + if run.Status() == "success" { + c.Data(http.StatusOK, "", bytes.Trim(run.Result(), "\x00")) + } else { + c.Data(http.StatusInternalServerError, "", bytes.Trim(run.Result(), "\x00")) + } } return }