functions: hot containers (#332)

* functions: modify datastore to accommodate hot containers support

* functions: protocol between functions and hot containers

* functions: add hot containers clockwork

* fn: add hot containers support
C Cirello
2016-11-28 18:45:35 +01:00
committed by Pedro Nasser
parent d0429c3dfd
commit ac0044f7d9
31 changed files with 809 additions and 170 deletions

View File

@@ -14,6 +14,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/functions/api/runner/task"
"github.com/iron-io/runner/common"
)
@@ -40,18 +41,18 @@ func getTask(ctx context.Context, url string) (*models.Task, error) {
return &task, nil
}
func getCfg(task *models.Task) *Config {
// TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader
if task.Timeout == nil {
func getCfg(t *models.Task) *task.Config {
if t.Timeout == nil {
timeout := int32(30)
task.Timeout = &timeout
t.Timeout = &timeout
}
cfg := &Config{
Image: *task.Image,
Timeout: time.Duration(*task.Timeout) * time.Second,
ID: task.ID,
AppName: task.AppName,
Env: task.EnvVars,
cfg := &task.Config{
Image: *t.Image,
Timeout: time.Duration(*t.Timeout) * time.Second,
ID: t.ID,
AppName: t.AppName,
Env: t.EnvVars,
}
return cfg
}
@@ -82,14 +83,14 @@ func deleteTask(url string, task *models.Task) error {
}
// RunAsyncRunner pulls tasks off a queue and processes them
func RunAsyncRunner(ctx context.Context, tasksrv string, tasks chan TaskRequest, rnr *Runner) {
func RunAsyncRunner(ctx context.Context, tasksrv string, tasks chan task.Request, rnr *Runner) {
u := tasksrvURL(tasksrv)
startAsyncRunners(ctx, u, tasks, rnr)
<-ctx.Done()
}
func startAsyncRunners(ctx context.Context, url string, tasks chan TaskRequest, rnr *Runner) {
func startAsyncRunners(ctx context.Context, url string, tasks chan task.Request, rnr *Runner) {
var wg sync.WaitGroup
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"runner": "async"})
for {

View File

@@ -17,6 +17,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/functions/api/mqs"
"github.com/iron-io/functions/api/runner/task"
)
func setLogBuffer() *bytes.Buffer {
@@ -202,13 +203,13 @@ func TestAsyncRunnersGracefulShutdown(t *testing.T) {
ts := getTestServer([]*models.Task{&mockTask})
defer ts.Close()
tasks := make(chan TaskRequest)
tasks := make(chan task.Request)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
defer close(tasks)
go func() {
for t := range tasks {
t.Response <- TaskResponse{
t.Response <- task.Response{
Result: nil,
Err: nil,
}

View File

@@ -0,0 +1,19 @@
package protocol
import (
"context"
"github.com/iron-io/functions/api/runner/task"
)
// DefaultProtocol is the protocol used by cold containers
type DefaultProtocol struct {
}
func (p *DefaultProtocol) IsStreamable() bool {
return false
}
func (p *DefaultProtocol) Dispatch(ctx context.Context, t task.Request) error {
return nil
}

View File

@@ -0,0 +1,52 @@
package protocol
import (
"context"
"errors"
"io"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/functions/api/runner/task"
)
var errInvalidProtocol = errors.New("Invalid Protocol")
// ContainerIO defines the interface used to talk to a hot container.
// Internally, a protocol must know when to alternate between stdin and stdout.
// It returns any protocol error, if present.
type ContainerIO interface {
IsStreamable() bool
Dispatch(ctx context.Context, t task.Request) error
}
// Protocol defines all protocols that operate a ContainerIO.
type Protocol string
// Hot container protocols
const (
Default Protocol = models.FormatDefault
HTTP Protocol = models.FormatHTTP
)
// New creates a valid protocol handler from an I/O pipe representing the
// container's stdin/stdout.
func New(p Protocol, in io.Writer, out io.Reader) (ContainerIO, error) {
switch p {
case HTTP:
return &HTTPProtocol{in, out}, nil
case Default:
return &DefaultProtocol{}, nil
default:
return nil, errInvalidProtocol
}
}
// IsStreamable says whether the given protocol can be used for streaming into
// hot containers.
func IsStreamable(p string) (bool, error) {
proto, err := New(Protocol(p), nil, nil)
if err != nil {
return false, err
}
return proto.IsStreamable(), nil
}
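
For orientation, here is a hedged usage sketch of this factory: it checks whether a route's format is streamable and, if so, builds a protocol handler over an in-memory pipe standing in for the container's stdin/stdout. Everything in it (the pipe variables, the literal "http" format value, the wireProtocol name) is illustrative and not part of this diff.

package sketch

import (
	"io"
	"log"

	"github.com/iron-io/functions/api/runner/protocol"
)

func wireProtocol() {
	ok, err := protocol.IsStreamable("http") // same string as models.FormatHTTP
	if err != nil || !ok {
		log.Println("not streamable; take the cold-container path")
		return
	}

	// stdinR/stdoutW stand in for the container side of the pipes.
	stdinR, stdinW := io.Pipe()
	stdoutR, stdoutW := io.Pipe()
	_, _ = stdinR, stdoutW

	p, err := protocol.New(protocol.HTTP, stdinW, stdoutR)
	if err != nil {
		log.Fatal(err)
	}
	_ = p // p.Dispatch(ctx, taskRequest) would frame one task over the pipes
}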

View File

@@ -0,0 +1,73 @@
package protocol
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httputil"
"time"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/functions/api/runner/task"
)
// HTTPProtocol converts stdin/stdout streams into HTTP/1.1 compliant
// communication. It relies on Content-Length to know when to stop reading from
// the container's stdout. It also requires valid HTTP headers in both
// directions, returning errors in case of parsing problems.
type HTTPProtocol struct {
in io.Writer
out io.Reader
}
func (p *HTTPProtocol) IsStreamable() bool {
return true
}
func (p *HTTPProtocol) Dispatch(ctx context.Context, t task.Request) error {
var retErr error
done := make(chan struct{})
go func() {
var body bytes.Buffer
io.Copy(&body, t.Config.Stdin)
req, err := http.NewRequest("GET", "/", &body)
if err != nil {
retErr = err
return
}
for k, v := range t.Config.Env {
req.Header.Set(k, v)
}
req.Header.Set("Content-Length", fmt.Sprint(body.Len()))
req.Header.Set("Task-ID", t.Config.ID)
raw, err := httputil.DumpRequest(req, true)
if err != nil {
retErr = err
return
}
p.in.Write(raw)
res, err := http.ReadResponse(bufio.NewReader(p.out), req)
if err != nil {
retErr = err
return
}
io.Copy(t.Config.Stdout, res.Body)
done <- struct{}{}
}()
timeout := time.After(t.Config.Timeout)
select {
case <-ctx.Done():
return ctx.Err()
case <-timeout:
return models.ErrRunnerTimeout
case <-done:
return retErr
}
}
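
Seen from inside the container, Dispatch's counterpart is a loop that reads one HTTP/1.1 request at a time from stdin and answers with one HTTP/1.1 response carrying a Content-Length header on stdout. The following is a minimal sketch of such a hot function, assuming the HTTP format above; the greeting logic is made up and the code is not part of this commit.

package main

import (
	"bufio"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
)

func main() {
	in := bufio.NewReader(os.Stdin)
	for {
		// Blocks until the runner's Dispatch writes the next framed request.
		req, err := http.ReadRequest(in)
		if err != nil {
			return // pipe closed: the hot container is being torn down
		}
		body, _ := ioutil.ReadAll(req.Body)
		req.Body.Close()

		out := fmt.Sprintf("Hello %s!", string(body))
		// Content-Length is how the runner knows where this response ends.
		fmt.Fprintf(os.Stdout,
			"HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\n%s",
			len(out), out)
	}
}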

View File

@@ -5,7 +5,6 @@ import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"runtime"
@@ -15,6 +14,7 @@ import (
"time"
"github.com/Sirupsen/logrus"
"github.com/iron-io/functions/api/runner/task"
"github.com/iron-io/runner/common"
"github.com/iron-io/runner/drivers"
driverscommon "github.com/iron-io/runner/drivers"
@@ -22,19 +22,6 @@ import (
"github.com/iron-io/runner/drivers/mock"
)
type Config struct {
ID string
Image string
Timeout time.Duration
AppName string
Path string
Memory uint64
Env map[string]string
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
}
type Runner struct {
driver drivers.Driver
taskQueue chan *containerTask
@@ -155,7 +142,7 @@ func (r *Runner) checkMemAndUse(req uint64) bool {
return true
}
func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error) {
func (r *Runner) Run(ctx context.Context, cfg *task.Config) (drivers.RunResult, error) {
var err error
if cfg.Memory == 0 {
@@ -224,7 +211,7 @@ func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error
return result, nil
}
func (r Runner) EnsureImageExists(ctx context.Context, cfg *Config) error {
func (r Runner) EnsureImageExists(ctx context.Context, cfg *task.Config) error {
ctask := &containerTask{
cfg: cfg,
}

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/functions/api/runner/task"
)
func TestRunnerHello(t *testing.T) {
@@ -31,7 +32,7 @@ func TestRunnerHello(t *testing.T) {
{&models.Route{Image: "iron/hello"}, `{"name": "test"}`, "success", "Hello test!", ""},
} {
var stdout, stderr bytes.Buffer
cfg := &Config{
cfg := &task.Config{
ID: fmt.Sprintf("hello-%d-%d", i, time.Now().Unix()),
Image: test.route.Image,
Timeout: 5 * time.Second,
@@ -84,7 +85,7 @@ func TestRunnerError(t *testing.T) {
{&models.Route{Image: "iron/error"}, `{"name": "test"}`, "error", "", "RuntimeError"},
} {
var stdout, stderr bytes.Buffer
cfg := &Config{
cfg := &task.Config{
ID: fmt.Sprintf("err-%d-%d", i, time.Now().Unix()),
Image: test.route.Image,
Timeout: 5 * time.Second,

View File

@@ -6,12 +6,13 @@ import (
"time"
"github.com/fsouza/go-dockerclient"
"github.com/iron-io/functions/api/runner/task"
"github.com/iron-io/runner/drivers"
)
type containerTask struct {
ctx context.Context
cfg *Config
cfg *task.Config
canRun chan bool
}

api/runner/task/task.go (new file, 40 lines)
View File

@@ -0,0 +1,40 @@
package task
import (
"context"
"io"
"time"
"github.com/iron-io/runner/drivers"
)
type Config struct {
ID string
Path string
Image string
Timeout time.Duration
AppName string
Memory uint64
Env map[string]string
Format string
MaxConcurrency int
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
}
// Request stores the task to be executed by the common concurrency stream,
// whatever type the task actually is, either sync or async. It carries the
// channel through which its response is returned to the caller.
type Request struct {
Ctx context.Context
Config *Config
Response chan Response
}
// Response holds the response meta-information of a Request
type Response struct {
Result drivers.RunResult
Err error
}
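
As a usage note, the round trip these two types define looks roughly like the sketch below (RunTask in worker.go wraps exactly this pattern). The app name, path, image, and ID are made-up illustration values, and tasks is assumed to be the channel served by StartWorkers; none of this is part of the diff.

package sketch

import (
	"context"
	"log"
	"os"
	"strings"
	"time"

	"github.com/iron-io/functions/api/runner/task"
)

func submitOne(ctx context.Context, tasks chan task.Request) {
	cfg := &task.Config{
		ID:             "task-0001",
		AppName:        "myapp",
		Path:           "/hello",
		Image:          "iron/hello",
		Format:         "http", // streamable, hence eligible for a hot container
		MaxConcurrency: 1,
		Timeout:        30 * time.Second,
		Stdin:          strings.NewReader(`{"name": "test"}`),
		Stdout:         os.Stdout,
		Stderr:         os.Stderr,
	}

	resp := make(chan task.Response)
	tasks <- task.Request{Ctx: ctx, Config: cfg, Response: resp}

	r := <-resp // the worker sends at most one Response on this channel
	if r.Err != nil {
		log.Println("task failed:", r.Err)
	}
}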

View File

@@ -1,52 +1,367 @@
package runner
import (
"bufio"
"context"
"fmt"
"io"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/iron-io/functions/api/runner/protocol"
"github.com/iron-io/functions/api/runner/task"
"github.com/iron-io/runner/drivers"
)
type TaskRequest struct {
Ctx context.Context
Config *Config
Response chan TaskResponse
}
// Hot containers - theory of operation
//
// A function is converted into a hot container if its `Format` is a
// streamable protocol. At the very first task request a hot container is
// started and kept running. Each hot container has an internal clock that
// halts the container once it has been idle for long enough. In the absence
// of workload, the whole clockwork simply stops.
//
// Internally, the hot container uses a modified Config whose Stdin and Stdout
// are bound to an internal pipe. This internal pipe is fed with incoming
// tasks' Stdin and feeds incoming tasks' Stdout.
//
// Each execution alternates between feeding the hot container's stdin with
// the task's stdin and reading the answer back from the container's stdout.
// For every `Format`, metadata is embedded in the message so that the
// container knows when to stop reading from its stdin, and Functions expects
// the container to do the same on its stdout. Refer to api/runner/protocol.go
// for details of this communication.
//
// The Hot Containers implementation relies on two moving parts (drawn below):
// htcntrmgr and htcntr. Refer to their respective comments for details.
// │
// Incoming
// Task
// │
// ▼
// ┌───────────────┐
// │ Task Request │
// │ Main Loop │
// └───────────────┘
// │
// ┌──────▼────────┐
// ┌┴──────────────┐│
// │ Per Function ││ non-streamable f()
// ┌───────│ Container │├──────┐───────────────┐
// │ │ Manager ├┘ │ │
// │ └───────────────┘ │ │
// │ │ │ │
// ▼ ▼ ▼ ▼
// ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
// │ Hot │ │ Hot │ │ Hot │ │ Cold │
// │ Container │ │ Container │ │ Container │ │ Container │
// └───────────┘ └───────────┘ └───────────┘ └───────────┘
// Timeout
// Terminate
// (internal clock)
type TaskResponse struct {
Result drivers.RunResult
Err error
}
const (
// Terminate hot container after this timeout
htcntrScaleDownTimeout = 30 * time.Second
)
// StartWorkers handle incoming tasks and spawns self-regulating container
// workers.
func StartWorkers(ctx context.Context, rnr *Runner, tasks <-chan TaskRequest) {
var wg sync.WaitGroup
for {
select {
case <-ctx.Done():
wg.Wait()
return
case task := <-tasks:
wg.Add(1)
go func(task TaskRequest) {
defer wg.Done()
result, err := rnr.Run(task.Ctx, task.Config)
select {
case task.Response <- TaskResponse{result, err}:
close(task.Response)
default:
}
}(task)
}
}
}
func RunTask(tasks chan TaskRequest, ctx context.Context, cfg *Config) (drivers.RunResult, error) {
tresp := make(chan TaskResponse)
treq := TaskRequest{Ctx: ctx, Config: cfg, Response: tresp}
// RunTask helps send a task.Request into the common concurrency stream.
// Refer to StartWorkers() to understand what this is about.
func RunTask(tasks chan task.Request, ctx context.Context, cfg *task.Config) (drivers.RunResult, error) {
tresp := make(chan task.Response)
treq := task.Request{Ctx: ctx, Config: cfg, Response: tresp}
tasks <- treq
resp := <-treq.Response
return resp.Result, resp.Err
}
// StartWorkers operates the common concurrency stream, i.e., it processes all
// IronFunctions tasks, either sync or async. In the process, it also dispatches
// the workload to either cold or hot containers.
func StartWorkers(ctx context.Context, rnr *Runner, tasks <-chan task.Request) {
var wg sync.WaitGroup
defer wg.Wait()
var hcmgr htcntrmgr
for {
select {
case <-ctx.Done():
return
case task := <-tasks:
p := hcmgr.getPipe(ctx, rnr, task.Config)
if p == nil {
wg.Add(1)
go runTaskReq(rnr, &wg, task)
continue
}
select {
case <-ctx.Done():
return
case p <- task:
}
}
}
}
// htcntrmgr is the intermediary between the common concurrency stream and
// hot containers. All hot containers share a single task.Request stream per
// function (chn), but each function may have more than one hot container (hc).
type htcntrmgr struct {
chn map[string]chan task.Request
hc map[string]*htcntrsvr
}
func (h *htcntrmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config) chan task.Request {
isStream, err := protocol.IsStreamable(cfg.Format)
if err != nil {
logrus.WithError(err).Info("could not detect container IO protocol")
return nil
} else if !isStream {
return nil
}
if h.chn == nil {
h.chn = make(map[string]chan task.Request)
h.hc = make(map[string]*htcntrsvr)
}
// TODO(ccirello): re-implement this without memory allocation (fmt.Sprint)
fn := fmt.Sprint(cfg.AppName, ",", cfg.Path, cfg.Image, cfg.Timeout, cfg.Memory, cfg.Format, cfg.MaxConcurrency)
tasks, ok := h.chn[fn]
if !ok {
h.chn[fn] = make(chan task.Request)
tasks = h.chn[fn]
svr := newhtcntrsvr(ctx, cfg, rnr, tasks)
if err := svr.launch(ctx); err != nil {
logrus.WithError(err).Error("cannot start hot container supervisor")
return nil
}
h.hc[fn] = svr
}
return tasks
}
// htcntrsvr is part of htcntrmgr, split out for simplicity. Its only purpose
// is to detect hot-container saturation and start as many containers as
// needed. In the absence of workload, it stops trying to start new hot
// containers.
type htcntrsvr struct {
cfg *task.Config
rnr *Runner
tasksin <-chan task.Request
tasksout chan task.Request
maxc chan struct{}
}
func newhtcntrsvr(ctx context.Context, cfg *task.Config, rnr *Runner, tasks <-chan task.Request) *htcntrsvr {
svr := &htcntrsvr{
cfg: cfg,
rnr: rnr,
tasksin: tasks,
tasksout: make(chan task.Request, 1),
maxc: make(chan struct{}, cfg.MaxConcurrency),
}
// This pipe takes all incoming tasks and simply forwards them to the
// started hot containers. The catch is that it feeds a buffered channel
// from an unbuffered one, and the buffered channel is then used to detect
// the presence of running hot containers: if none is available, tasksout
// fills up to its capacity and pipe() starts more.
go svr.pipe(ctx)
return svr
}
func (svr *htcntrsvr) pipe(ctx context.Context) {
for {
select {
case t := <-svr.tasksin:
svr.tasksout <- t
if len(svr.tasksout) > 0 {
if err := svr.launch(ctx); err != nil {
logrus.WithError(err).Error("cannot start more hot containers")
}
}
case <-ctx.Done():
return
}
}
}
func (svr *htcntrsvr) launch(ctx context.Context) error {
select {
case svr.maxc <- struct{}{}:
hc, err := newhtcntr(
svr.cfg,
protocol.Protocol(svr.cfg.Format),
svr.tasksout,
svr.rnr,
)
if err != nil {
return err
}
go func() {
hc.serve(ctx)
<-svr.maxc
}()
default:
}
return nil
}
// htcntr interfaces incoming tasks from the common concurrency stream with a
// long-lived container. If it stays idle long enough, it stops. It uses the
// route configuration to determine which protocol to use.
type htcntr struct {
cfg *task.Config
proto protocol.ContainerIO
tasks <-chan task.Request
// Side of the pipe that takes information from the outside world
// and injects it into the container.
in io.Writer
out io.Reader
// Receiving side of the container.
containerIn io.Reader
containerOut io.Writer
rnr *Runner
}
func newhtcntr(cfg *task.Config, proto protocol.Protocol, tasks <-chan task.Request, rnr *Runner) (*htcntr, error) {
stdinr, stdinw := io.Pipe()
stdoutr, stdoutw := io.Pipe()
p, err := protocol.New(proto, stdinw, stdoutr)
if err != nil {
return nil, err
}
hc := &htcntr{
cfg: cfg,
proto: p,
tasks: tasks,
in: stdinw,
out: stdoutr,
containerIn: stdinr,
containerOut: stdoutw,
rnr: rnr,
}
return hc, nil
}
func (hc *htcntr) serve(ctx context.Context) {
lctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
inactivity := time.After(htcntrScaleDownTimeout)
select {
case <-lctx.Done():
return
case <-inactivity:
cancel()
case t := <-hc.tasks:
if err := hc.proto.Dispatch(lctx, t); err != nil {
logrus.WithField("ctx", lctx).Info("task failed")
t.Response <- task.Response{
&runResult{StatusValue: "error", error: err},
err,
}
continue
}
t.Response <- task.Response{
&runResult{StatusValue: "success"},
nil,
}
}
}
}()
cfg := *hc.cfg
cfg.Timeout = 0 // hot containers have no per-task timeout; set one here to simulate an abnormal-end failure
cfg.Stdin = hc.containerIn
cfg.Stdout = hc.containerOut
// Why can we not attach stderr to the task like we do for stdin and
// stdout?
//
// Stdin/Stdout are completely known to the scope of the task. You must
// have a task stdin to feed containers stdin, and also the other way
// around when reading from stdout. So both are directly related to the
// life cycle of the request.
//
// Stderr, on the other hand, can be written by anything at any time:
// failures between requests, failures inside requests, and messages sent
// right after stdout has finished being transmitted. Thus, with
// hot containers, there is not a 1:1 relation between stderr and tasks.
//
// Still, we do pass - at the protocol level - a Task-ID header, which
// the application running inside the hot container can use to identify
// its own stderr output.
errr, errw := io.Pipe()
cfg.Stderr = errw
wg.Add(1)
go func() {
defer wg.Done()
scanner := bufio.NewScanner(errr)
for scanner.Scan() {
logrus.WithFields(logrus.Fields{
"app": cfg.AppName,
"route": cfg.Path,
"image": cfg.Image,
"memory": cfg.Memory,
"format": cfg.Format,
"max_concurrency": cfg.MaxConcurrency,
}).Info(scanner.Text())
}
}()
result, err := hc.rnr.Run(lctx, &cfg)
if err != nil {
logrus.WithError(err).Error("hot container failure detected")
}
cancel()
errw.Close()
wg.Wait()
logrus.WithField("result", result).Info("hot container terminated")
}
func runTaskReq(rnr *Runner, wg *sync.WaitGroup, t task.Request) {
defer wg.Done()
result, err := rnr.Run(t.Ctx, t.Config)
select {
case t.Response <- task.Response{result, err}:
close(t.Response)
default:
}
}
type runResult struct {
error
StatusValue string
}
func (r *runResult) Error() string {
if r.error == nil {
return ""
}
return r.error.Error()
}
func (r *runResult) Status() string { return r.StatusValue }
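
Finally, a wiring sketch of how the pieces above are intended to fit together: one shared task.Request stream, one StartWorkers loop that routes each task to a cold or hot container, and RunTask as the blocking entry point. The serve function and the cfgs channel are illustrative assumptions, not code from this commit; rnr is assumed to come from this package's existing Runner constructor.

package sketch

import (
	"context"
	"log"

	"github.com/iron-io/functions/api/runner"
	"github.com/iron-io/functions/api/runner/task"
)

// serve assumes rnr was built elsewhere and that cfgs is wherever task
// configurations come from (HTTP handlers, the async queue, tests, ...).
func serve(ctx context.Context, rnr *runner.Runner, cfgs <-chan *task.Config) {
	tasks := make(chan task.Request)
	go runner.StartWorkers(ctx, rnr, tasks) // cold vs. hot dispatch happens here

	for cfg := range cfgs {
		// RunTask blocks until this task's Response arrives.
		result, err := runner.RunTask(tasks, ctx, cfg)
		if err != nil {
			log.Println("task failed:", err)
			continue
		}
		log.Println("task status:", result.Status())
	}
}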