mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
refactor runner
This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
@@ -24,24 +24,15 @@ type Config struct {
|
||||
Timeout time.Duration
|
||||
RequestURL string
|
||||
AppName string
|
||||
Stdout io.Writer
|
||||
Stderr io.Writer
|
||||
}
|
||||
|
||||
type Runner struct {
|
||||
cfg *Config
|
||||
status string
|
||||
out bytes.Buffer
|
||||
err bytes.Buffer
|
||||
driver drivers.Driver
|
||||
}
|
||||
|
||||
func New(cfg *Config) *Runner {
|
||||
return &Runner{
|
||||
cfg: cfg,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner) Run() error {
|
||||
var err error
|
||||
|
||||
func New() (*Runner, error) {
|
||||
// TODO: Is this really required for Titan's driver?
|
||||
// Can we remove it?
|
||||
env := common.NewEnvironment(func(e *common.Environment) {})
|
||||
@@ -49,36 +40,28 @@ func (r *Runner) Run() error {
|
||||
// TODO: Create a drivers.New(runnerConfig) in Titan
|
||||
driver, err := selectDriver("docker", env, &driverscommon.Config{})
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Runner{
|
||||
driver: driver,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error) {
|
||||
var err error
|
||||
|
||||
ctask := &containerTask{
|
||||
cfg: r.cfg,
|
||||
cfg: cfg,
|
||||
auth: &agent.ConfigAuth{},
|
||||
stdout: &r.out,
|
||||
stderr: &r.err,
|
||||
}
|
||||
|
||||
result, err := driver.Run(r.cfg.Ctx, ctask)
|
||||
result, err := r.driver.Run(ctx, ctask)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r.status = result.Status()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) ReadOut() []byte {
|
||||
return r.out.Bytes()
|
||||
}
|
||||
|
||||
func (r Runner) ReadErr() []byte {
|
||||
return r.err.Bytes()
|
||||
}
|
||||
|
||||
func (r Runner) Status() string {
|
||||
return r.status
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func selectDriver(driver string, env *common.Environment, conf *driverscommon.Config) (drivers.Driver, error) {
|
||||
|
||||
@@ -11,6 +11,13 @@ import (
|
||||
)
|
||||
|
||||
func TestRunnerHello(t *testing.T) {
|
||||
runner, err := New()
|
||||
if err != nil {
|
||||
t.Fatalf("Test error during New() - %s", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
for i, test := range []struct {
|
||||
route *models.Route
|
||||
payload string
|
||||
@@ -21,33 +28,43 @@ func TestRunnerHello(t *testing.T) {
|
||||
{&models.Route{Image: "iron/hello"}, ``, "success", "Hello World!", ""},
|
||||
{&models.Route{Image: "iron/hello"}, `{"name": "test"}`, "success", "Hello test!", ""},
|
||||
} {
|
||||
runner := New(&Config{
|
||||
var stdout, stderr bytes.Buffer
|
||||
cfg := &Config{
|
||||
ID: fmt.Sprintf("task-hello-%d-%d", i, time.Now().Unix()),
|
||||
Ctx: context.Background(),
|
||||
Route: test.route,
|
||||
Timeout: 5 * time.Second,
|
||||
Payload: test.payload,
|
||||
})
|
||||
Stdout: &stdout,
|
||||
Stderr: &stderr,
|
||||
}
|
||||
|
||||
if err := runner.Run(); err != nil {
|
||||
result, err := runner.Run(ctx, cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d: error during Run() - %s", i, err)
|
||||
}
|
||||
|
||||
if test.expectedStatus != runner.Status() {
|
||||
t.Fatalf("Test %d: expected result status to be `%s` but it was `%s`", i, test.expectedStatus, runner.Status())
|
||||
if test.expectedStatus != result.Status() {
|
||||
t.Fatalf("Test %d: expected result status to be `%s` but it was `%s`", i, test.expectedStatus, result.Status())
|
||||
}
|
||||
|
||||
if !bytes.Contains(runner.ReadOut(), []byte(test.expectedOut)) {
|
||||
t.Fatalf("Test %d: expected output log to contain `%s` in `%s`", i, test.expectedOut, runner.ReadOut())
|
||||
if !bytes.Contains(stdout.Bytes(), []byte(test.expectedOut)) {
|
||||
t.Fatalf("Test %d: expected output log to contain `%s` in `%s`", i, test.expectedOut, stdout.String())
|
||||
}
|
||||
|
||||
if !bytes.Contains(runner.ReadErr(), []byte(test.expectedErr)) {
|
||||
t.Fatalf("Test %d: expected error log to contain `%s` in `%s`", i, test.expectedErr, runner.ReadErr())
|
||||
if !bytes.Contains(stderr.Bytes(), []byte(test.expectedErr)) {
|
||||
t.Fatalf("Test %d: expected error log to contain `%s` in `%s`", i, test.expectedErr, stderr.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerError(t *testing.T) {
|
||||
runner, err := New()
|
||||
if err != nil {
|
||||
t.Fatalf("Test error during New() - %s", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
for i, test := range []struct {
|
||||
route *models.Route
|
||||
payload string
|
||||
@@ -58,28 +75,31 @@ func TestRunnerError(t *testing.T) {
|
||||
{&models.Route{Image: "iron/error"}, ``, "error", "", "RuntimeError"},
|
||||
{&models.Route{Image: "iron/error"}, `{"name": "test"}`, "error", "", "RuntimeError"},
|
||||
} {
|
||||
runner := New(&Config{
|
||||
ID: fmt.Sprintf("task-error-%d-%d", i, time.Now().Unix()),
|
||||
Ctx: context.Background(),
|
||||
var stdout, stderr bytes.Buffer
|
||||
cfg := &Config{
|
||||
ID: fmt.Sprintf("task-err-%d-%d", i, time.Now().Unix()),
|
||||
Route: test.route,
|
||||
Timeout: 5 * time.Second,
|
||||
Payload: test.payload,
|
||||
})
|
||||
Stdout: &stdout,
|
||||
Stderr: &stderr,
|
||||
}
|
||||
|
||||
if err := runner.Run(); err != nil {
|
||||
result, err := runner.Run(ctx, cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d: error during Run() - %s", i, err)
|
||||
}
|
||||
|
||||
if test.expectedStatus != runner.Status() {
|
||||
t.Fatalf("Test %d: expected result status to be `%s` but it was `%s`", i, test.expectedStatus, runner.Status())
|
||||
if test.expectedStatus != result.Status() {
|
||||
t.Fatalf("Test %d: expected result status to be `%s` but it was `%s`", i, test.expectedStatus, result.Status())
|
||||
}
|
||||
|
||||
if !bytes.Contains(runner.ReadOut(), []byte(test.expectedOut)) {
|
||||
t.Fatalf("Test %d: expected output log to contain `%s` in `%s`", i, test.expectedOut, runner.ReadOut())
|
||||
if !bytes.Contains(stdout.Bytes(), []byte(test.expectedOut)) {
|
||||
t.Fatalf("Test %d: expected output log to contain `%s` in `%s`", i, test.expectedOut, stdout.String())
|
||||
}
|
||||
|
||||
if !bytes.Contains(runner.ReadErr(), []byte(test.expectedErr)) {
|
||||
t.Fatalf("Test %d: expected error log to contain `%s` in `%s`", i, test.expectedErr, runner.ReadErr())
|
||||
if !bytes.Contains(stderr.Bytes(), []byte(test.expectedErr)) {
|
||||
t.Fatalf("Test %d: expected error log to contain `%s` in `%s`", i, test.expectedErr, stderr.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,8 +9,6 @@ import (
|
||||
|
||||
type containerTask struct {
|
||||
auth tasker.Auther
|
||||
stdout io.Writer
|
||||
stderr io.Writer
|
||||
cfg *Config
|
||||
}
|
||||
|
||||
@@ -34,7 +32,7 @@ func (t *containerTask) Id() string { return t.cfg.ID }
|
||||
func (t *containerTask) Group() string { return "" }
|
||||
func (t *containerTask) Image() string { return t.cfg.Route.Image }
|
||||
func (t *containerTask) Timeout() uint { return uint(t.cfg.Timeout.Seconds()) }
|
||||
func (t *containerTask) Logger() (stdout, stderr io.Writer) { return t.stdout, t.stderr }
|
||||
func (t *containerTask) Logger() (stdout, stderr io.Writer) { return t.cfg.Stdout, t.cfg.Stderr }
|
||||
func (t *containerTask) Volumes() [][2]string { return [][2]string{} }
|
||||
func (t *containerTask) WorkDir() string { return "" }
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
)
|
||||
|
||||
func TestAppCreate(t *testing.T) {
|
||||
New(&datastore.Mock{}, &models.Config{})
|
||||
New(&models.Config{}, &datastore.Mock{}, testRunner(t))
|
||||
router := testRouter()
|
||||
|
||||
for i, test := range []struct {
|
||||
@@ -52,7 +52,7 @@ func TestAppCreate(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAppDelete(t *testing.T) {
|
||||
New(&datastore.Mock{}, &models.Config{})
|
||||
New(&models.Config{}, &datastore.Mock{}, testRunner(t))
|
||||
router := testRouter()
|
||||
|
||||
for i, test := range []struct {
|
||||
@@ -83,7 +83,7 @@ func TestAppDelete(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAppList(t *testing.T) {
|
||||
New(&datastore.Mock{}, &models.Config{})
|
||||
New(&models.Config{}, &datastore.Mock{}, testRunner(t))
|
||||
router := testRouter()
|
||||
|
||||
for i, test := range []struct {
|
||||
@@ -113,7 +113,7 @@ func TestAppList(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAppGet(t *testing.T) {
|
||||
New(&datastore.Mock{}, &models.Config{})
|
||||
New(&models.Config{}, &datastore.Mock{}, testRunner(t))
|
||||
router := testRouter()
|
||||
|
||||
for i, test := range []struct {
|
||||
@@ -143,7 +143,7 @@ func TestAppGet(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAppUpdate(t *testing.T) {
|
||||
New(&datastore.Mock{}, &models.Config{})
|
||||
New(&models.Config{}, &datastore.Mock{}, testRunner(t))
|
||||
router := testRouter()
|
||||
|
||||
for i, test := range []struct {
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/iron-io/functions/api/models"
|
||||
"github.com/iron-io/functions/api/runner"
|
||||
titancommon "github.com/iron-io/titan/common"
|
||||
)
|
||||
|
||||
@@ -47,6 +48,14 @@ func testRouter() *gin.Engine {
|
||||
return r
|
||||
}
|
||||
|
||||
func testRunner(t *testing.T) *runner.Runner {
|
||||
r, err := runner.New()
|
||||
if err != nil {
|
||||
t.Fatal("Test: failed to create new runner")
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func routerRequest(t *testing.T, router *gin.Engine, method, path string, body io.Reader) (*http.Request, *httptest.ResponseRecorder) {
|
||||
req, err := http.NewRequest(method, "http://localhost:8080"+path, body)
|
||||
if err != nil {
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
)
|
||||
|
||||
func TestRouteCreate(t *testing.T) {
|
||||
New(&datastore.Mock{}, &models.Config{})
|
||||
New(&models.Config{}, &datastore.Mock{}, testRunner(t))
|
||||
router := testRouter()
|
||||
|
||||
for i, test := range []struct {
|
||||
@@ -52,7 +52,7 @@ func TestRouteCreate(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRouteDelete(t *testing.T) {
|
||||
New(&datastore.Mock{}, &models.Config{})
|
||||
New(&models.Config{}, &datastore.Mock{}, testRunner(t))
|
||||
router := testRouter()
|
||||
|
||||
for i, test := range []struct {
|
||||
@@ -83,7 +83,7 @@ func TestRouteDelete(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRouteList(t *testing.T) {
|
||||
New(&datastore.Mock{}, &models.Config{})
|
||||
New(&models.Config{}, &datastore.Mock{}, testRunner(t))
|
||||
router := testRouter()
|
||||
|
||||
for i, test := range []struct {
|
||||
@@ -113,7 +113,7 @@ func TestRouteList(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRouteGet(t *testing.T) {
|
||||
New(&datastore.Mock{}, &models.Config{})
|
||||
New(&models.Config{}, &datastore.Mock{}, testRunner(t))
|
||||
router := testRouter()
|
||||
|
||||
for i, test := range []struct {
|
||||
@@ -143,7 +143,7 @@ func TestRouteGet(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRouteUpdate(t *testing.T) {
|
||||
New(&datastore.Mock{}, &models.Config{})
|
||||
New(&models.Config{}, &datastore.Mock{}, testRunner(t))
|
||||
router := testRouter()
|
||||
|
||||
for i, test := range []struct {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
@@ -81,7 +82,6 @@ func handleRunner(c *gin.Context) {
|
||||
c.JSON(http.StatusBadRequest, simpleError(models.ErrAppsNotFound))
|
||||
return
|
||||
}
|
||||
|
||||
route := c.Param("route")
|
||||
if route == "" {
|
||||
route = c.Request.URL.Path
|
||||
@@ -109,17 +109,19 @@ func handleRunner(c *gin.Context) {
|
||||
log.WithField("routes", routes).Debug("Got routes from datastore")
|
||||
for _, el := range routes {
|
||||
if el.Path == route {
|
||||
run := runner.New(&runner.Config{
|
||||
Ctx: c,
|
||||
var stdout, stderr bytes.Buffer
|
||||
cfg := &runner.Config{
|
||||
Route: el,
|
||||
Payload: string(payload),
|
||||
Timeout: 30 * time.Second,
|
||||
ID: reqID,
|
||||
RequestURL: c.Request.URL.String(),
|
||||
AppName: appName,
|
||||
})
|
||||
Stdout: &stdout,
|
||||
Stderr: &stderr,
|
||||
}
|
||||
|
||||
if err := run.Run(); err != nil {
|
||||
if result, err := Api.Runner.Run(c, cfg); err != nil {
|
||||
log.WithError(err).Error(models.ErrRunnerRunRoute)
|
||||
c.JSON(http.StatusInternalServerError, simpleError(models.ErrRunnerRunRoute))
|
||||
} else {
|
||||
@@ -127,10 +129,11 @@ func handleRunner(c *gin.Context) {
|
||||
c.Header(k, v[0])
|
||||
}
|
||||
|
||||
if run.Status() == "success" {
|
||||
c.Data(http.StatusOK, "", run.ReadOut())
|
||||
if result.Status() == "success" {
|
||||
c.Data(http.StatusOK, "", stdout.Bytes())
|
||||
} else {
|
||||
c.Data(http.StatusInternalServerError, "", run.ReadErr())
|
||||
log.WithFields(logrus.Fields{"app": appName, "route": el, "req_id": reqID}).Debug(stderr.String())
|
||||
c.AbortWithStatus(http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
return
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
)
|
||||
|
||||
func TestRouteRunnerGet(t *testing.T) {
|
||||
New(&datastore.Mock{}, &models.Config{})
|
||||
New(&models.Config{}, &datastore.Mock{}, testRunner(t))
|
||||
router := testRouter()
|
||||
|
||||
for i, test := range []struct {
|
||||
@@ -45,7 +45,7 @@ func TestRouteRunnerGet(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRouteRunnerPost(t *testing.T) {
|
||||
New(&datastore.Mock{}, &models.Config{})
|
||||
New(&models.Config{}, &datastore.Mock{}, testRunner(t))
|
||||
router := testRouter()
|
||||
|
||||
for i, test := range []struct {
|
||||
@@ -81,12 +81,12 @@ func TestRouteRunnerPost(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRouteRunnerExecution(t *testing.T) {
|
||||
New(&datastore.Mock{
|
||||
New(&models.Config{}, &datastore.Mock{
|
||||
FakeRoutes: []*models.Route{
|
||||
{Path: "/myroute", Image: "iron/hello", Headers: map[string][]string{"X-Function": []string{"Test"}}},
|
||||
{Path: "/myerror", Image: "iron/error", Headers: map[string][]string{"X-Function": []string{"Test"}}},
|
||||
},
|
||||
}, &models.Config{})
|
||||
}, testRunner(t))
|
||||
router := testRouter()
|
||||
|
||||
for i, test := range []struct {
|
||||
@@ -97,6 +97,10 @@ func TestRouteRunnerExecution(t *testing.T) {
|
||||
}{
|
||||
{"/r/myapp/myroute", ``, http.StatusOK, map[string][]string{"X-Function": []string{"Test"}}},
|
||||
{"/r/myapp/myerror", ``, http.StatusInternalServerError, map[string][]string{"X-Function": []string{"Test"}}},
|
||||
|
||||
// Added same tests again to check if time is reduced by the auth cache
|
||||
{"/r/myapp/myroute", ``, http.StatusOK, map[string][]string{"X-Function": []string{"Test"}}},
|
||||
{"/r/myapp/myerror", ``, http.StatusInternalServerError, map[string][]string{"X-Function": []string{"Test"}}},
|
||||
} {
|
||||
body := bytes.NewBuffer([]byte(test.body))
|
||||
_, rec := routerRequest(t, router, "GET", test.path, body)
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/iron-io/functions/api/ifaces"
|
||||
"github.com/iron-io/functions/api/models"
|
||||
"github.com/iron-io/functions/api/runner"
|
||||
titancommon "github.com/iron-io/titan/common"
|
||||
)
|
||||
|
||||
@@ -17,6 +18,7 @@ import (
|
||||
var Api *Server
|
||||
|
||||
type Server struct {
|
||||
Runner *runner.Runner
|
||||
Router *gin.Engine
|
||||
Config *models.Config
|
||||
Datastore models.Datastore
|
||||
@@ -24,11 +26,12 @@ type Server struct {
|
||||
SpecialHandlers []ifaces.SpecialHandler
|
||||
}
|
||||
|
||||
func New(ds models.Datastore, config *models.Config) *Server {
|
||||
func New(c *models.Config, ds models.Datastore, r *runner.Runner) *Server {
|
||||
Api = &Server{
|
||||
Router: gin.Default(),
|
||||
Config: config,
|
||||
Config: c,
|
||||
Datastore: ds,
|
||||
Runner: r,
|
||||
}
|
||||
return Api
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ func TestFullStack(t *testing.T) {
|
||||
ds, close := prepareBolt(t)
|
||||
defer close()
|
||||
|
||||
New(ds, &models.Config{})
|
||||
New(&models.Config{}, ds, testRunner(t))
|
||||
router := testRouter()
|
||||
|
||||
for i, test := range []struct {
|
||||
|
||||
8
main.go
8
main.go
@@ -5,6 +5,7 @@ import (
|
||||
"github.com/iron-io/functions/api/config"
|
||||
"github.com/iron-io/functions/api/datastore"
|
||||
"github.com/iron-io/functions/api/models"
|
||||
"github.com/iron-io/functions/api/runner"
|
||||
"github.com/iron-io/functions/api/server"
|
||||
"github.com/spf13/viper"
|
||||
"golang.org/x/net/context"
|
||||
@@ -26,6 +27,11 @@ func main() {
|
||||
log.WithError(err).Fatalln("Invalid DB url.")
|
||||
}
|
||||
|
||||
srv := server.New(ds, c)
|
||||
runner, err := runner.New()
|
||||
if err != nil {
|
||||
log.WithError(err).Fatalln("Failed to create a runner")
|
||||
}
|
||||
|
||||
srv := server.New(c, ds, runner)
|
||||
srv.Run(ctx)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user