mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
multiple ajustments
- renamed WrapperJob (not exported anymore) - removed need for temp log file - not using titan models - using gin.Context as runner context
This commit is contained in:
1
.glide/cache/src/https-golang.org-x-net
vendored
Submodule
1
.glide/cache/src/https-golang.org-x-net
vendored
Submodule
Submodule .glide/cache/src/https-golang.org-x-net added at 6a513affb3
@@ -1,43 +0,0 @@
|
|||||||
package runner
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
dockercli "github.com/fsouza/go-dockerclient"
|
|
||||||
"github.com/iron-io/titan/runner/tasker"
|
|
||||||
titan_models "github.com/iron-io/titan/runner/tasker/client/models"
|
|
||||||
)
|
|
||||||
|
|
||||||
type WrapperJob struct {
|
|
||||||
auth tasker.Auther
|
|
||||||
log *os.File
|
|
||||||
m *titan_models.Job
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *WrapperJob) Command() string { return "" }
|
|
||||||
|
|
||||||
func (f *WrapperJob) EnvVars() map[string]string {
|
|
||||||
m := map[string]string{
|
|
||||||
"JOB_ID": f.Id(),
|
|
||||||
"PAYLOAD": f.m.Payload,
|
|
||||||
}
|
|
||||||
for k, v := range f.m.EnvVars {
|
|
||||||
m[k] = v
|
|
||||||
}
|
|
||||||
return m
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *WrapperJob) Id() string { return f.m.ID }
|
|
||||||
func (f *WrapperJob) Group() string { return f.m.GroupName }
|
|
||||||
func (f *WrapperJob) Image() string { return *f.m.Image }
|
|
||||||
func (f *WrapperJob) Timeout() uint { return uint(*f.m.Timeout) }
|
|
||||||
func (f *WrapperJob) Logger() (stdout, stderr io.Writer) { return f.log, f.log }
|
|
||||||
func (f *WrapperJob) Volumes() [][2]string { return [][2]string{} }
|
|
||||||
func (f *WrapperJob) WorkDir() string { return "" }
|
|
||||||
|
|
||||||
func (f *WrapperJob) Close() {}
|
|
||||||
|
|
||||||
func (f *WrapperJob) DockerAuth() []dockercli.AuthConfiguration {
|
|
||||||
return f.auth.Auth(f.Image())
|
|
||||||
}
|
|
||||||
@@ -1,9 +1,8 @@
|
|||||||
package runner
|
package runner
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
|
||||||
"os"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
@@ -15,10 +14,10 @@ import (
|
|||||||
"github.com/iron-io/titan/runner/drivers"
|
"github.com/iron-io/titan/runner/drivers"
|
||||||
"github.com/iron-io/titan/runner/drivers/docker"
|
"github.com/iron-io/titan/runner/drivers/docker"
|
||||||
"github.com/iron-io/titan/runner/drivers/mock"
|
"github.com/iron-io/titan/runner/drivers/mock"
|
||||||
titan_models "github.com/iron-io/titan/runner/tasker/client/models"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
|
Ctx context.Context
|
||||||
Route *models.Route
|
Route *models.Route
|
||||||
Endpoint string
|
Endpoint string
|
||||||
Payload string
|
Payload string
|
||||||
@@ -28,7 +27,8 @@ type Config struct {
|
|||||||
type Runner struct {
|
type Runner struct {
|
||||||
cfg *Config
|
cfg *Config
|
||||||
status string
|
status string
|
||||||
result []byte
|
out bytes.Buffer
|
||||||
|
err bytes.Buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(cfg *Config) *Runner {
|
func New(cfg *Config) *Runner {
|
||||||
@@ -37,70 +37,52 @@ func New(cfg *Config) *Runner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runner) Start() error {
|
func (r *Runner) Run() error {
|
||||||
image := r.cfg.Route.Image
|
|
||||||
payload := r.cfg.Payload
|
|
||||||
timeout := int32(r.cfg.Timeout.Seconds())
|
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
runnerConfig := configloader.RunnerConfiguration()
|
runnerConfig := configloader.RunnerConfiguration()
|
||||||
|
|
||||||
au := agent.ConfigAuth{runnerConfig.Registries}
|
au := agent.ConfigAuth{runnerConfig.Registries}
|
||||||
|
|
||||||
|
// TODO: Is this really required for Titan's driver?
|
||||||
|
// Can we remove it?
|
||||||
env := common.NewEnvironment(func(e *common.Environment) {})
|
env := common.NewEnvironment(func(e *common.Environment) {})
|
||||||
|
|
||||||
|
// TODO: Create a drivers.New(runnerConfig) in Titan
|
||||||
driver, err := selectDriver(env, runnerConfig)
|
driver, err := selectDriver(env, runnerConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
job := &titan_models.Job{
|
ctask := &containerTask{
|
||||||
NewJob: titan_models.NewJob{
|
cfg: r.cfg,
|
||||||
Image: &image,
|
|
||||||
Payload: payload,
|
|
||||||
Timeout: &timeout,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
tempLog, err := ensureLogFile(job)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer tempLog.Close()
|
|
||||||
|
|
||||||
wjob := &WrapperJob{
|
|
||||||
auth: &au,
|
auth: &au,
|
||||||
m: job,
|
stdout: &r.out,
|
||||||
log: tempLog,
|
stderr: &r.err,
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err := driver.Run(context.Background(), wjob)
|
result, err := driver.Run(r.cfg.Ctx, ctask)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
b, _ := ioutil.ReadFile(tempLog.Name())
|
|
||||||
r.result = b
|
|
||||||
r.status = result.Status()
|
r.status = result.Status()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r Runner) Result() []byte {
|
func (r *Runner) ReadOut() []byte {
|
||||||
return r.result
|
return r.out.Bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r Runner) ReadErr() []byte {
|
||||||
|
return r.err.Bytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r Runner) Status() string {
|
func (r Runner) Status() string {
|
||||||
return r.status
|
return r.status
|
||||||
}
|
}
|
||||||
|
|
||||||
func ensureLogFile(job *titan_models.Job) (*os.File, error) {
|
|
||||||
log, err := ioutil.TempFile("", fmt.Sprintf("titan-log-%s", job.ID))
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("couldn't open task log for writing: %v", err)
|
|
||||||
}
|
|
||||||
return log, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func selectDriver(env *common.Environment, conf *agent.Config) (drivers.Driver, error) {
|
func selectDriver(env *common.Environment, conf *agent.Config) (drivers.Driver, error) {
|
||||||
switch conf.Driver {
|
switch conf.Driver {
|
||||||
case "docker":
|
case "docker":
|
||||||
|
|||||||
38
api/runner/task.go
Normal file
38
api/runner/task.go
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
package runner
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
|
||||||
|
dockercli "github.com/fsouza/go-dockerclient"
|
||||||
|
"github.com/iron-io/titan/runner/tasker"
|
||||||
|
)
|
||||||
|
|
||||||
|
type containerTask struct {
|
||||||
|
auth tasker.Auther
|
||||||
|
stdout io.Writer
|
||||||
|
stderr io.Writer
|
||||||
|
cfg *Config
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *containerTask) Command() string { return "" }
|
||||||
|
|
||||||
|
func (t *containerTask) EnvVars() map[string]string {
|
||||||
|
env := map[string]string{
|
||||||
|
"PAYLOAD": t.cfg.Payload,
|
||||||
|
}
|
||||||
|
return env
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *containerTask) Id() string { return "" }
|
||||||
|
func (t *containerTask) Group() string { return "" }
|
||||||
|
func (t *containerTask) Image() string { return t.cfg.Route.Image }
|
||||||
|
func (t *containerTask) Timeout() uint { return uint(t.cfg.Timeout.Seconds()) }
|
||||||
|
func (t *containerTask) Logger() (stdout, stderr io.Writer) { return t.stdout, t.stderr }
|
||||||
|
func (t *containerTask) Volumes() [][2]string { return [][2]string{} }
|
||||||
|
func (t *containerTask) WorkDir() string { return "" }
|
||||||
|
|
||||||
|
func (t *containerTask) Close() {}
|
||||||
|
|
||||||
|
func (t *containerTask) DockerAuth() []dockercli.AuthConfiguration {
|
||||||
|
return t.auth.Auth(t.Image())
|
||||||
|
}
|
||||||
@@ -61,13 +61,14 @@ func handleRunner(c *gin.Context) {
|
|||||||
for _, el := range routes {
|
for _, el := range routes {
|
||||||
if el.Path == route {
|
if el.Path == route {
|
||||||
run := runner.New(&runner.Config{
|
run := runner.New(&runner.Config{
|
||||||
|
Ctx: c,
|
||||||
Route: el,
|
Route: el,
|
||||||
Endpoint: config.API,
|
Endpoint: config.API,
|
||||||
Payload: string(payload),
|
Payload: string(payload),
|
||||||
Timeout: 30 * time.Second,
|
Timeout: 30 * time.Second,
|
||||||
})
|
})
|
||||||
|
|
||||||
if err := run.Start(); err != nil {
|
if err := run.Run(); err != nil {
|
||||||
log.WithError(err).Error(models.ErrRunnerRunRoute)
|
log.WithError(err).Error(models.ErrRunnerRunRoute)
|
||||||
c.JSON(http.StatusInternalServerError, simpleError(models.ErrRunnerRunRoute))
|
c.JSON(http.StatusInternalServerError, simpleError(models.ErrRunnerRunRoute))
|
||||||
} else {
|
} else {
|
||||||
@@ -76,9 +77,9 @@ func handleRunner(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if run.Status() == "success" {
|
if run.Status() == "success" {
|
||||||
c.Data(http.StatusOK, "", bytes.Trim(run.Result(), "\x00"))
|
c.Data(http.StatusOK, "", bytes.Trim(run.ReadOut(), "\x00"))
|
||||||
} else {
|
} else {
|
||||||
c.Data(http.StatusInternalServerError, "", bytes.Trim(run.Result(), "\x00"))
|
c.Data(http.StatusInternalServerError, "", bytes.Trim(run.ReadErr(), "\x00"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user