[Feature] Function status

This commit is contained in:
Denis Makogon
2017-06-06 14:12:50 -07:00
parent 6334f44a72
commit 3f065ce6bf
29 changed files with 1165 additions and 564 deletions

View File

@@ -88,14 +88,14 @@ func deleteTask(url string, task *models.Task) error {
}
// RunAsyncRunner pulls tasks off a queue and processes them
func RunAsyncRunner(ctx context.Context, tasksrv string, tasks chan task.Request, rnr *Runner) {
func RunAsyncRunner(ctx context.Context, tasksrv string, tasks chan task.Request, rnr *Runner, ds models.Datastore) {
u := tasksrvURL(tasksrv)
startAsyncRunners(ctx, u, tasks, rnr)
startAsyncRunners(ctx, u, tasks, rnr, ds)
<-ctx.Done()
}
func startAsyncRunners(ctx context.Context, url string, tasks chan task.Request, rnr *Runner) {
func startAsyncRunners(ctx context.Context, url string, tasks chan task.Request, rnr *Runner, ds models.Datastore) {
var wg sync.WaitGroup
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"runner": "async"})
for {
@@ -129,23 +129,24 @@ func startAsyncRunners(ctx context.Context, url string, tasks chan task.Request,
log.Debug("Running task:", task.ID)
wg.Add(1)
go func() {
defer wg.Done()
// Process Task
if _, err := RunTask(tasks, ctx, getCfg(task)); err != nil {
_, err := RunTrackedTask(task, tasks, ctx, getCfg(task), ds)
if err != nil {
log.WithError(err).Error("Cannot run task")
}
log.Debug("Processed task")
}()
log.Debug("Processed task")
// Delete task from queue
if err := deleteTask(url, task); err != nil {
log.WithError(err).Error("Cannot delete task")
continue
}
log.Info("Task complete")
log.Info("Task complete")
}
}
}

View File

@@ -18,6 +18,8 @@ import (
"gitlab-odx.oracle.com/odx/functions/api/models"
"gitlab-odx.oracle.com/odx/functions/api/mqs"
"gitlab-odx.oracle.com/odx/functions/api/runner/task"
"gitlab-odx.oracle.com/odx/functions/api/datastore"
"gitlab-odx.oracle.com/odx/functions/api/runner/drivers"
)
func setLogBuffer() *bytes.Buffer {
@@ -198,6 +200,15 @@ func testRunner(t *testing.T) (*Runner, context.CancelFunc) {
return r, cancel
}
type RunResult struct {
drivers.RunResult
}
func (r RunResult) Status() string {
return "success"
}
func TestAsyncRunnersGracefulShutdown(t *testing.T) {
buf := setLogBuffer()
mockTask := getMockTask()
@@ -211,16 +222,15 @@ func TestAsyncRunnersGracefulShutdown(t *testing.T) {
go func() {
for t := range tasks {
t.Response <- task.Response{
Result: nil,
Result: RunResult{},
Err: nil,
}
}
}()
rnr, cancel := testRunner(t)
defer cancel()
startAsyncRunners(ctx, ts.URL+"/tasks", tasks, rnr)
startAsyncRunners(ctx, ts.URL+"/tasks", tasks, rnr, datastore.NewMock())
if err := ctx.Err(); err != context.DeadlineExceeded {
t.Log(buf.String())

View File

@@ -216,7 +216,7 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask
// NOTE: this is hyper-sensitive and may not be correct like this even, but it passes old tests
// task.Command() in swapi is always "sh /mnt/task/.runtask" so fields is safe
cmd = strings.Fields(task.Command())
logrus.WithFields(logrus.Fields{"task_id": task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command")
logrus.WithFields(logrus.Fields{"call_id": task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command")
}
envvars := make([]string, 0, len(task.EnvVars()))
@@ -251,11 +251,11 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask
container.Config.Volumes[containerDir] = struct{}{}
mapn := fmt.Sprintf("%s:%s", hostDir, containerDir)
container.HostConfig.Binds = append(container.HostConfig.Binds, mapn)
logrus.WithFields(logrus.Fields{"volumes": mapn, "task_id": task.Id()}).Debug("setting volumes")
logrus.WithFields(logrus.Fields{"volumes": mapn, "call_id": task.Id()}).Debug("setting volumes")
}
if wd := task.WorkDir(); wd != "" {
logrus.WithFields(logrus.Fields{"wd": wd, "task_id": task.Id()}).Debug("setting work dir")
logrus.WithFields(logrus.Fields{"wd": wd, "call_id": task.Id()}).Debug("setting work dir")
container.Config.WorkingDir = wd
}
@@ -270,7 +270,7 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask
if err != nil {
// since we retry under the hood, if the container gets created and retry fails, we can just ignore error
if err != docker.ErrContainerAlreadyExists {
logrus.WithFields(logrus.Fields{"task_id": task.Id(), "command": container.Config.Cmd, "memory": container.Config.Memory,
logrus.WithFields(logrus.Fields{"call_id": task.Id(), "command": container.Config.Cmd, "memory": container.Config.Memory,
"cpu_shares": container.Config.CPUShares, "hostname": container.Config.Hostname, "name": container.Name,
"image": container.Config.Image, "volumes": container.Config.Volumes, "binds": container.HostConfig.Binds, "container": containerName,
}).WithError(err).Error("Could not create container")
@@ -473,7 +473,7 @@ func (drv *DockerDriver) collectStats(ctx context.Context, container string, tas
})
if err != nil && err != io.ErrClosedPipe {
logrus.WithError(err).WithFields(logrus.Fields{"container": container, "task_id": task.Id()}).Error("error streaming docker stats for task")
logrus.WithError(err).WithFields(logrus.Fields{"container": container, "call_id": task.Id()}).Error("error streaming docker stats for task")
}
}()

View File

@@ -10,9 +10,11 @@ import (
"github.com/Sirupsen/logrus"
uuid "github.com/satori/go.uuid"
"gitlab-odx.oracle.com/odx/functions/api/models"
"gitlab-odx.oracle.com/odx/functions/api/runner/drivers"
"gitlab-odx.oracle.com/odx/functions/api/runner/protocol"
"gitlab-odx.oracle.com/odx/functions/api/runner/task"
"github.com/go-openapi/strfmt"
)
// hot functions - theory of operation
@@ -62,6 +64,25 @@ import (
// Terminate
// (internal clock)
// RunTrackedTask is just a wrapper for shared logic for async/sync runners
func RunTrackedTask(newTask *models.Task, tasks chan task.Request, ctx context.Context, cfg *task.Config, ds models.Datastore) (drivers.RunResult, error) {
startedAt := strfmt.DateTime(time.Now())
newTask.StartedAt = startedAt
result, err := RunTask(tasks, ctx, cfg)
completedAt := strfmt.DateTime(time.Now())
status := result.Status()
newTask.CompletedAt = completedAt
newTask.Status = status
err = ds.InsertTask(ctx, newTask)
return result, err
}
// RunTask helps sending a task.Request into the common concurrency stream.
// Refer to StartWorkers() to understand what this is about.
func RunTask(tasks chan task.Request, ctx context.Context, cfg *task.Config) (drivers.RunResult, error) {