all: rename hot containers to hot functions (#465)

Author: C Cirello
Date: 2016-12-29 20:07:41 +01:00
Committed by: GitHub
parent e968576d96
commit 2f0520521c
12 changed files with 73 additions and 73 deletions


@@ -14,26 +14,26 @@ import (
"github.com/iron-io/runner/drivers"
)
-// Hot containers - theory of operation
+// hot functions - theory of operation
//
-// A function is converted into a hot container if its `Format` is
+// A function is converted into a hot function if its `Format` is
// a streamable format/protocol. At the very first task request, a hot
-// container is started to run it. Each hot container has an internal
+// container is started to run it. Each hot function has an internal
// clock that actually halts the container if it goes idle long enough. In the
// absence of workload, it just stops the whole clockwork.
//
-// Internally, the hot container uses a modified Config whose Stdin and Stdout
+// Internally, the hot function uses a modified Config whose Stdin and Stdout
// are bound to an internal pipe. This pipe is fed with each incoming task's
// Stdin, and each task's Stdout is read back from it.
//
-// Each execution alternates between feeding the hot container's stdin with the
+// Each execution alternates between feeding the hot function's stdin with the
// task's stdin, and reading the answer back from the container's stdout. For all
// `Format`s, we embed metadata into the message so the container knows when
// to stop reading from its stdin, and Functions expects the container to do the
// same. Refer to api/runner/protocol.go for details of these communications.
//
-// The hot containers implementation relies on two moving parts (drawn below):
-// htcntrmgr and htcntr. Refer to their respective comments for
+// The hot functions implementation relies on two moving parts (drawn below):
+// htfnmgr and htfn. Refer to their respective comments for
// details.
// │
// Incoming
@@ -55,15 +55,15 @@ import (
// ▼ ▼ ▼ ▼
// ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
// │ Hot │ │ Hot │ │ Hot │ │ Cold │
-// │ Container │ │ Container │ │ Container │ │ Container │
+// │ Function │ │ Function │ │ Function │ │ Function │
// └───────────┘ └───────────┘ └───────────┘ └───────────┘
// Timeout
// Terminate
// (internal clock)
const (
-// Terminate hot container after this timeout
-htcntrScaleDownTimeout = 30 * time.Second
+// Terminate hot function after this timeout
+htfnScaleDownTimeout = 30 * time.Second
)
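
For context, the "internal clock" described in the comment above is the standard Go idle-timeout pattern: each loop iteration arms a fresh timer, serving a task re-arms it, and the timer firing ends the loop. A minimal runnable sketch of that pattern (the names are illustrative, not the actual implementation):

package main

import (
	"fmt"
	"time"
)

// serveUntilIdle drains tasks until no work arrives for `idle`,
// mirroring how a hot function halts after htfnScaleDownTimeout.
func serveUntilIdle(tasks <-chan string, idle time.Duration) {
	for {
		select {
		case t := <-tasks:
			fmt.Println("served", t) // stand-in for running one task
		case <-time.After(idle):
			fmt.Println("idle timeout, stopping")
			return
		}
	}
}

func main() {
	tasks := make(chan string, 1)
	tasks <- "task-1"
	serveUntilIdle(tasks, 100*time.Millisecond)
}
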
// RunTask helps send a task.Request into the common concurrency stream.
@@ -78,11 +78,11 @@ func RunTask(tasks chan task.Request, ctx context.Context, cfg *task.Config) (dr
// StartWorkers operates the common concurrency stream, i.e., it will process all
// IronFunctions tasks, either sync or async. In the process, it also dispatches
-// the workload to either regular or hot containers.
+// the workload to either regular or hot functions.
func StartWorkers(ctx context.Context, rnr *Runner, tasks <-chan task.Request) {
var wg sync.WaitGroup
defer wg.Wait()
-var hcmgr htcntrmgr
+var hcmgr htfnmgr
for {
select {
@@ -105,15 +105,15 @@ func StartWorkers(ctx context.Context, rnr *Runner, tasks <-chan task.Request) {
}
}
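
The "common concurrency stream" works as a request/response channel: RunTask wraps the caller's config in a task.Request that carries its own reply channel, sends it into the shared stream, and blocks for the answer. A self-contained sketch of that shape, with stand-in types rather than the real task.Request and task.Config:

package main

import (
	"context"
	"fmt"
)

// request is a stand-in for task.Request: a payload plus a reply channel.
type request struct {
	payload  string
	response chan string
}

// runTask mirrors RunTask's shape: enqueue, then wait for the reply.
func runTask(ctx context.Context, tasks chan<- request, payload string) (string, error) {
	req := request{payload: payload, response: make(chan string, 1)}
	select {
	case tasks <- req:
	case <-ctx.Done():
		return "", ctx.Err()
	}
	select {
	case out := <-req.response:
		return out, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	tasks := make(chan request)
	go func() { // stand-in worker, in the role of StartWorkers
		req := <-tasks
		req.response <- "done: " + req.payload
	}()
	out, err := runTask(context.Background(), tasks, "hello")
	fmt.Println(out, err)
}
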
-// htcntrmgr is the intermediary between the common concurrency stream and
-// hot containers. All hot containers share a single task.Request stream per
-// function (chn), but each function may have more than one hot container (hc).
-type htcntrmgr struct {
+// htfnmgr is the intermediary between the common concurrency stream and
+// hot functions. All hot functions share a single task.Request stream per
+// function (chn), but each function may have more than one hot function (hc).
+type htfnmgr struct {
chn map[string]chan task.Request
-hc map[string]*htcntrsvr
+hc map[string]*htfnsvr
}
-func (h *htcntrmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config) chan task.Request {
+func (h *htfnmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config) chan task.Request {
isStream, err := protocol.IsStreamable(cfg.Format)
if err != nil {
logrus.WithError(err).Info("could not detect container IO protocol")
@@ -124,7 +124,7 @@ func (h *htcntrmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config)
if h.chn == nil {
h.chn = make(map[string]chan task.Request)
-h.hc = make(map[string]*htcntrsvr)
+h.hc = make(map[string]*htfnsvr)
}
// TODO(ccirello): re-implement this without memory allocation (fmt.Sprint)
@@ -133,9 +133,9 @@ func (h *htcntrmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config)
if !ok {
h.chn[fn] = make(chan task.Request)
tasks = h.chn[fn]
-svr := newhtcntrsvr(ctx, cfg, rnr, tasks)
+svr := newhtfnsvr(ctx, cfg, rnr, tasks)
if err := svr.launch(ctx); err != nil {
logrus.WithError(err).Error("cannot start hot container supervisor")
logrus.WithError(err).Error("cannot start hot function supervisor")
return nil
}
h.hc[fn] = svr
@@ -144,11 +144,11 @@ func (h *htcntrmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config)
return tasks
}
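
getPipe above is lazy bookkeeping: the first request for a given function allocates its channel and starts its supervisor; later requests reuse both. A reduced sketch of just the channel bookkeeping (the pipes type and key format are illustrative; the real code keys on the task configuration):

package main

import "fmt"

// pipes keeps one task stream per function, created on first use,
// echoing the chn map in htfnmgr.
type pipes struct {
	chn map[string]chan string
}

func (p *pipes) getPipe(fn string) chan string {
	if p.chn == nil {
		p.chn = make(map[string]chan string)
	}
	ch, ok := p.chn[fn]
	if !ok {
		ch = make(chan string)
		p.chn[fn] = ch // first request for fn: create its stream
	}
	return ch
}

func main() {
	var p pipes
	a := p.getPipe("iron/hello:0.0.1")
	b := p.getPipe("iron/hello:0.0.1")
	fmt.Println(a == b) // true: same function, same stream
}
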
-// htcntrsvr is part of htcntrmgr, abstracted apart for simplicity; its only
-// purpose is to test for hot container saturation and try starting as many as
+// htfnsvr is part of htfnmgr, abstracted apart for simplicity; its only
+// purpose is to test for hot function saturation and try starting as many as
// needed. In the absence of workload, it will stop trying to start new hot
// containers.
-type htcntrsvr struct {
+type htfnsvr struct {
cfg *task.Config
rnr *Runner
tasksin <-chan task.Request
@@ -156,8 +156,8 @@ type htcntrsvr struct {
maxc chan struct{}
}
-func newhtcntrsvr(ctx context.Context, cfg *task.Config, rnr *Runner, tasks <-chan task.Request) *htcntrsvr {
-svr := &htcntrsvr{
+func newhtfnsvr(ctx context.Context, cfg *task.Config, rnr *Runner, tasks <-chan task.Request) *htfnsvr {
+svr := &htfnsvr{
cfg: cfg,
rnr: rnr,
tasksin: tasks,
@@ -166,23 +166,23 @@ func newhtcntrsvr(ctx context.Context, cfg *task.Config, rnr *Runner, tasks <-ch
}
// This pipe will take all incoming tasks and just forward them to the
-// started hot containers. The catch here is that it feeds a buffered
+// started hot functions. The catch here is that it feeds a buffered
// channel from an unbuffered one. And this buffered channel is
-// then used to determine the presence of running hot containers.
-// If no hot container is available, tasksout will fill up to its
+// then used to determine the presence of running hot functions.
+// If no hot function is available, tasksout will fill up to its
// capacity and pipe() will start them.
go svr.pipe(ctx)
return svr
}
-func (svr *htcntrsvr) pipe(ctx context.Context) {
+func (svr *htfnsvr) pipe(ctx context.Context) {
for {
select {
case t := <-svr.tasksin:
svr.tasksout <- t
if len(svr.tasksout) > 0 {
if err := svr.launch(ctx); err != nil {
logrus.WithError(err).Error("cannot start more hot containers")
logrus.WithError(err).Error("cannot start more hot functions")
}
}
case <-ctx.Done():
@@ -191,10 +191,10 @@ func (svr *htcntrsvr) pipe(ctx context.Context) {
}
}
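
The buffered-from-unbuffered trick in pipe() deserves a closer look: tasksout has capacity, so if a send leaves len(tasksout) > 0, no hot function drained it immediately and another one should be launched. A sketch of that mechanic, with launch reduced to a stub that spins up one consumer goroutine:

package main

import (
	"context"
	"fmt"
)

// pipe forwards from an unbuffered `in` to a buffered `out`; a backlog
// left in `out` means no consumer kept up, so another one is launched.
func pipe(ctx context.Context, in <-chan int, out chan int, launch func()) {
	for {
		select {
		case t := <-in:
			out <- t
			if len(out) > 0 { // nobody drained it yet
				launch()
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	in, out := make(chan int), make(chan int, 1)
	done := make(chan struct{})
	launch := func() { // stub for svr.launch: start one consumer
		go func() { fmt.Println("consumed", <-out); close(done) }()
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go pipe(ctx, in, out, launch)
	in <- 42
	<-done
}
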
-func (svr *htcntrsvr) launch(ctx context.Context) error {
+func (svr *htfnsvr) launch(ctx context.Context) error {
select {
case svr.maxc <- struct{}{}:
-hc, err := newhtcntr(
+hc, err := newhtfn(
svr.cfg,
protocol.Protocol(svr.cfg.Format),
svr.tasksout,
@@ -213,10 +213,10 @@ func (svr *htcntrsvr) launch(ctx context.Context) error {
return nil
}
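
launch() caps per-function concurrency with a channel used as a counting semaphore: sending into the buffered maxc acquires a slot, receiving from it releases one. A minimal sketch of the pattern, assuming a non-blocking default branch makes launch a no-op at capacity (the hunk above elides the rest of the select):

package main

import "fmt"

// tryLaunch acquires a slot from a counting semaphore; the default
// branch makes it a no-op once the per-function maximum is running.
func tryLaunch(maxc chan struct{}, start func(release func())) bool {
	select {
	case maxc <- struct{}{}: // acquire a slot
		start(func() { <-maxc }) // the hot function releases it on exit
		return true
	default: // at capacity: don't start another
		return false
	}
}

func main() {
	maxc := make(chan struct{}, 1) // capacity: at most one hot function
	hold := func(release func()) { /* runs the container; release only fires on exit */ }
	fmt.Println(tryLaunch(maxc, hold)) // true: slot acquired and still held
	fmt.Println(tryLaunch(maxc, hold)) // false: at capacity
}
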
-// htcntr actually bridges an incoming task from the common concurrency
+// htfn actually bridges an incoming task from the common concurrency
// stream into a long-lived container. If idle long enough, it will stop. It
// uses route configuration to determine which protocol to use.
-type htcntr struct {
+type htfn struct {
cfg *task.Config
proto protocol.ContainerIO
tasks <-chan task.Request
@@ -233,7 +233,7 @@ type htcntr struct {
rnr *Runner
}
-func newhtcntr(cfg *task.Config, proto protocol.Protocol, tasks <-chan task.Request, rnr *Runner) (*htcntr, error) {
+func newhtfn(cfg *task.Config, proto protocol.Protocol, tasks <-chan task.Request, rnr *Runner) (*htfn, error) {
stdinr, stdinw := io.Pipe()
stdoutr, stdoutw := io.Pipe()
@@ -242,7 +242,7 @@ func newhtcntr(cfg *task.Config, proto protocol.Protocol, tasks <-chan task.Requ
return nil, err
}
-hc := &htcntr{
+hc := &htfn{
cfg: cfg,
proto: p,
tasks: tasks,
@@ -259,14 +259,14 @@ func newhtcntr(cfg *task.Config, proto protocol.Protocol, tasks <-chan task.Requ
return hc, nil
}
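
newhtfn binds the container's Stdin and Stdout to io.Pipe pairs, so the runner can write a task's input into one end and read the answer from the other while the container stays up. A runnable sketch with a goroutine standing in for the long-lived container:

package main

import (
	"bufio"
	"fmt"
	"io"
)

func main() {
	stdinr, stdinw := io.Pipe()
	stdoutr, stdoutw := io.Pipe()

	// Stand-in for the long-lived container: echo each stdin line.
	go func() {
		sc := bufio.NewScanner(stdinr)
		for sc.Scan() {
			fmt.Fprintln(stdoutw, "echo: "+sc.Text())
		}
	}()

	io.WriteString(stdinw, "task body\n") // feed one task's stdin
	line, _ := bufio.NewReader(stdoutr).ReadString('\n')
	fmt.Print(line) // and read its stdout back
}
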
-func (hc *htcntr) serve(ctx context.Context) {
+func (hc *htfn) serve(ctx context.Context) {
lctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
-inactivity := time.After(htcntrScaleDownTimeout)
+inactivity := time.After(htfnScaleDownTimeout)
select {
case <-lctx.Done():
@@ -309,10 +309,10 @@ func (hc *htcntr) serve(ctx context.Context) {
// Stderr, on the other hand, can be written to by anything at any time:
// failures between requests, failures inside requests, and messages sent
// right after stdout has finished being transmitted. Thus, with
-// hot containers, there is not a 1:1 relation between stderr and tasks.
+// hot functions, there is not a 1:1 relation between stderr and tasks.
//
// Still, we do pass - at the protocol level - a Task-ID header, which
-// the application running inside the hot container can use to identify
+// the application running inside the hot function can use to identify
// its own stderr output.
errr, errw := io.Pipe()
cfg.Stderr = errw
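
Since stderr is shared across tasks, the Task-ID header is what lets output be attributed. A hypothetical illustration (not part of this commit) of an app inside a hot function prefixing its stderr lines with the Task-ID it was handed:

package main

import (
	"fmt"
	"os"
)

// logf tags a stderr line with the task's ID so a log consumer can
// attribute it, since hot function stderr is not 1:1 with tasks.
func logf(taskID, msg string) {
	fmt.Fprintf(os.Stderr, "task_id=%s %s\n", taskID, msg)
}

func main() {
	logf("abc-123", "processing failed: bad input")
}
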
@@ -334,12 +334,12 @@ func (hc *htcntr) serve(ctx context.Context) {
result, err := hc.rnr.Run(lctx, &cfg)
if err != nil {
logrus.WithError(err).Error("hot container failure detected")
logrus.WithError(err).Error("hot function failure detected")
}
cancel()
errw.Close()
wg.Wait()
logrus.WithField("result", result).Info("hot container terminated")
logrus.WithField("result", result).Info("hot function terminated")
}
func runTaskReq(rnr *Runner, wg *sync.WaitGroup, t task.Request) {