Updated README and simplified/cleaned up some code.

This commit is contained in:
Travis Reeder
2016-10-12 01:35:44 -07:00
parent e85c7560c3
commit 25f582b180
11 changed files with 182 additions and 84 deletions

View File

@@ -48,11 +48,13 @@ type Task struct {
*/
Error string `json:"error,omitempty"`
/* Route this task belongs to.
/* App this task belongs to.
Read Only: true
*/
RouteName string `json:"route_name,omitempty"`
AppName string `json:"route_name,omitempty"`
Path string `json:"path"`
/* Machine usable reason for task being in this state.
Valid values for error status are `timeout | killed | bad_exit`.

View File

@@ -12,12 +12,16 @@ import (
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/Sirupsen/logrus"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/runner/common"
"github.com/iron-io/runner/drivers"
)
func getTask(url string) (*models.Task, error) {
func getTask(ctx context.Context, url string) (*models.Task, error) {
// log := common.Logger(ctx)
// log.Infoln("Getting task from URL:", url)
resp, err := http.Get(url)
if err != nil {
return nil, err
@@ -41,8 +45,8 @@ func getTask(url string) (*models.Task, error) {
}
func getCfg(task *models.Task) *Config {
var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader
stderr := NewFuncLogger(task.RouteName, "", *task.Image, task.ID) // TODO: missing path here, how do i get that?
// TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader
stderr := NewFuncLogger(task.AppName, task.Path, *task.Image, task.ID) // TODO: missing path here, how do i get that?
if task.Timeout == nil {
timeout := int32(30)
task.Timeout = &timeout
@@ -51,8 +55,8 @@ func getCfg(task *models.Task) *Config {
Image: *task.Image,
Timeout: time.Duration(*task.Timeout) * time.Second,
ID: task.ID,
AppName: task.RouteName,
Stdout: &stdout,
AppName: task.AppName,
Stdout: stderr,
Stderr: stderr,
Env: task.EnvVars,
}
@@ -84,10 +88,9 @@ func deleteTask(url string, task *models.Task) error {
return nil
}
func runTask(task *models.Task) (drivers.RunResult, error) {
func runTask(ctx context.Context, task *models.Task) (drivers.RunResult, error) {
// Set up runner and process task
cfg := getCfg(task)
ctx := context.Background()
rnr, err := New(NewMetricLogger())
if err != nil {
return nil, err
@@ -96,8 +99,8 @@ func runTask(task *models.Task) (drivers.RunResult, error) {
}
// RunAsyncRunner pulls tasks off a queue and processes them
func RunAsyncRunner(ctx context.Context, tasksrv, port string, n int) {
u, h := tasksrvURL(tasksrv, port)
func RunAsyncRunner(ctx context.Context, tasksrv string, n int) {
u, h := tasksrvURL(tasksrv)
if isHostOpen(h) {
return
}
@@ -105,7 +108,7 @@ func RunAsyncRunner(ctx context.Context, tasksrv, port string, n int) {
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go startAsyncRunners(ctx, &wg, i, u, runTask)
go startAsyncRunners(ctx, &wg, i, u)
}
wg.Wait()
@@ -121,7 +124,8 @@ func isHostOpen(host string) bool {
return available
}
func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url string, runTask func(task *models.Task) (drivers.RunResult, error)) {
func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url string) {
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"async_runner": i})
defer wg.Done()
for {
select {
@@ -129,8 +133,12 @@ func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url strin
return
default:
task, err := getTask(url)
task, err := getTask(ctx, url)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
log.Infoln("Could not fetch task, timeout. Probably no tasks to run.")
continue
}
log.WithError(err).Error("Could not fetch task")
time.Sleep(1 * time.Second)
continue
@@ -139,33 +147,36 @@ func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url strin
time.Sleep(1 * time.Second)
continue
}
log.Debug("Picked up task:", task.ID)
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": task.ID})
log.Debug("Running task:", task.ID)
// Process Task
if _, err := runTask(task); err != nil {
log.WithError(err).WithFields(log.Fields{"async runner": i, "task_id": task.ID}).Error("Cannot run task")
if _, err := runTask(ctx, task); err != nil {
log.WithError(err).Error("Cannot run task")
continue
}
log.Debug("Processed task:", task.ID)
log.Debug("Processed task")
// Delete task from queue
if err := deleteTask(url, task); err != nil {
log.WithError(err).WithFields(log.Fields{"async runner": i, "task_id": task.ID}).Error("Cannot delete task")
log.WithError(err).Error("Cannot delete task")
continue
}
log.Debug("Deleted task:", task.ID)
log.Info("Task complete:", task.ID)
log.Info("Task complete:")
}
}
}
func tasksrvURL(tasksrv, port string) (parsedURL, host string) {
func tasksrvURL(tasksrv string) (parsedURL, host string) {
parsed, err := url.Parse(tasksrv)
if err != nil {
log.Fatalf("cannot parse TASKSRV endpoint: %v", err)
logrus.WithError(err).Fatalln("cannot parse TASKSRV endpoint")
}
// host, port, err := net.SplitHostPort(parsed.Host)
// if err != nil {
// log.WithError(err).Fatalln("net.SplitHostPort")
// }
if parsed.Scheme == "" {
parsed.Scheme = "http"
@@ -175,9 +186,9 @@ func tasksrvURL(tasksrv, port string) (parsedURL, host string) {
parsed.Path = "/tasks"
}
if _, _, err := net.SplitHostPort(parsed.Host); err != nil {
parsed.Host = net.JoinHostPort(parsed.Host, port)
}
// if _, _, err := net.SplitHostPort(parsed.Host); err != nil {
// parsed.Host = net.JoinHostPort(parsed.Host, parsed)
// }
return parsed.String(), parsed.Host
}

View File

@@ -37,7 +37,7 @@ func getMockTask() models.Task {
task := &models.Task{}
task.Image = &image
task.ID = fmt.Sprintf("ID-%d", rand.Int31()%1000)
task.RouteName = fmt.Sprintf("RouteName-%d", rand.Int31()%1000)
task.AppName = fmt.Sprintf("RouteName-%d", rand.Int31()%1000)
task.Priority = &priority
return *task
}

View File

@@ -19,7 +19,7 @@ func NewFuncLogger(appName, path, function, requestID string) io.Writer {
r: r,
w: w,
}
log := logrus.WithFields(logrus.Fields{"user_log": true, "app_name": appName, "path": path, "function": function, "request_id": requestID})
log := logrus.WithFields(logrus.Fields{"user_log": true, "app_name": appName, "path": path, "function": function, "call_id": requestID})
go func(reader io.Reader) {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {

View File

@@ -27,7 +27,8 @@ func setLogBuffer() *bytes.Buffer {
func TestAppCreate(t *testing.T) {
buf := setLogBuffer()
New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t))
router := testRouter()
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t))
router := testRouter(s)
for i, test := range []struct {
path string
@@ -70,8 +71,8 @@ func TestAppCreate(t *testing.T) {
func TestAppDelete(t *testing.T) {
buf := setLogBuffer()
New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t))
router := testRouter()
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t))
router := testRouter(s)
for i, test := range []struct {
path string
@@ -104,8 +105,8 @@ func TestAppDelete(t *testing.T) {
func TestAppList(t *testing.T) {
buf := setLogBuffer()
New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t))
router := testRouter()
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t))
router := testRouter(s)
for i, test := range []struct {
path string
@@ -137,8 +138,8 @@ func TestAppList(t *testing.T) {
func TestAppGet(t *testing.T) {
buf := setLogBuffer()
New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t))
router := testRouter()
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t))
router := testRouter(s)
for i, test := range []struct {
path string
@@ -170,8 +171,8 @@ func TestAppGet(t *testing.T) {
func TestAppUpdate(t *testing.T) {
buf := setLogBuffer()
New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t))
router := testRouter()
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t))
router := testRouter(s)
for i, test := range []struct {
path string

90
api/server/helpers.go Normal file
View File

@@ -0,0 +1,90 @@
package server
// TODO: this whole file shouldn't be in a non test file
import (
"context"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"github.com/gin-gonic/gin"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/functions/api/runner"
"github.com/iron-io/runner/common"
)
type appResponse struct {
Message string `json:"message"`
App *models.App `json:"app"`
}
type appsResponse struct {
Message string `json:"message"`
Apps models.Apps `json:"apps"`
}
type routeResponse struct {
Message string `json:"message"`
Route *models.Route `json:"route"`
}
type routesResponse struct {
Message string `json:"message"`
Routes models.Routes `json:"routes"`
}
type tasksResponse struct {
Message string `json:"message"`
Task models.Task `json:"tasksResponse"`
}
func testRouter(s *Server) *gin.Engine {
r := gin.Default()
ctx := context.Background()
r.Use(func(c *gin.Context) {
ctx, _ := common.LoggerWithFields(ctx, extractFields(c))
c.Set("ctx", ctx)
c.Next()
})
s.bindHandlers()
return r
}
func testRunner(t *testing.T) *runner.Runner {
r, err := runner.New(runner.NewMetricLogger())
if err != nil {
t.Fatal("Test: failed to create new runner")
}
return r
}
func routerRequest(t *testing.T, router *gin.Engine, method, path string, body io.Reader) (*http.Request, *httptest.ResponseRecorder) {
req, err := http.NewRequest(method, "http://localhost:8080"+path, body)
if err != nil {
t.Fatalf("Test: Could not create %s request to %s: %v", method, path, err)
}
rec := httptest.NewRecorder()
router.ServeHTTP(rec, req)
return req, rec
}
func getErrorResponse(t *testing.T, rec *httptest.ResponseRecorder) models.Error {
respBody, err := ioutil.ReadAll(rec.Body)
if err != nil {
t.Error("Test: Expected not empty response body")
}
var errResp models.Error
err = json.Unmarshal(respBody, &errResp)
if err != nil {
t.Error("Test: Expected response body to be a valid models.Error object")
}
return errResp
}

View File

@@ -42,12 +42,11 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
}
ctx := c.MustGet("ctx").(context.Context)
log := common.Logger(ctx)
reqID := uuid.NewV5(uuid.Nil, fmt.Sprintf("%s%s%d", c.Request.RemoteAddr, c.Request.URL.Path, time.Now().Unix())).String()
c.Set("reqID", reqID) // todo: put this in the ctx instead of gin's
ctx, log = common.LoggerWithFields(ctx, logrus.Fields{"call_id": reqID})
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": reqID})
var err error
var payload io.Reader
@@ -164,7 +163,8 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
task := &models.Task{}
task.Image = &cfg.Image
task.ID = cfg.ID
task.RouteName = cfg.AppName
task.Path = el.Path
task.AppName = cfg.AppName
task.Priority = &priority
task.EnvVars = cfg.Env
task.Payload = string(pl)

View File

@@ -131,7 +131,6 @@ func extractFields(c *gin.Context) logrus.Fields {
for _, param := range c.Params {
fields[param.Key] = param.Value
}
return fields
}
@@ -142,7 +141,7 @@ func (s *Server) Run(ctx context.Context) {
c.Next()
})
bindHandlers(s.Router, s.handleRunnerRequest, s.handleTaskRequest)
s.bindHandlers()
// By default it serves on :8080 unless a
// PORT environment variable was defined.
@@ -150,8 +149,9 @@ func (s *Server) Run(ctx context.Context) {
<-ctx.Done()
}
func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context), taskHandler func(ginC *gin.Context)) {
engine.Use(gin.Logger())
func (s *Server) bindHandlers() {
engine := s.Router
engine.GET("/", handlePing)
engine.GET("/version", handleVersion)
@@ -177,9 +177,9 @@ func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context), taskHa
}
}
engine.DELETE("/tasks", taskHandler)
engine.GET("/tasks", taskHandler)
engine.Any("/r/:app/*route", reqHandler)
engine.DELETE("/tasks", s.handleTaskRequest)
engine.GET("/tasks", s.handleTaskRequest)
engine.Any("/r/:app/*route", s.handleRunnerRequest)
// This final route is used for extensions, see Server.Add
engine.NoRoute(handleSpecial)

View File

@@ -263,10 +263,6 @@ definitions:
allOf:
- type: object
properties:
name:
type: string
description: "Route name"
readOnly: true
app_name:
type: string
description: "App this route belongs to."