fn-serverless/api/agent/agent.go
Reed Allman 3a9c48b8a3 http-stream format (#1202)
* POC code for inotify UDS-io-socket

* http-stream format

introducing `http-stream` format support in fn. there are many details to
this, none of which can be linked from github :( -- docs are coming (I could
even try to add some here?). this is kinda MVP-level and does not implement
the remaining spec, i.e. the 'headers' fixing up / invoke fixing up. the
thinking is we can land this to test fdks / the cli against and start
splitting up work on top of it. all other formats work the same as before (no
breakage, only new stuff)

with the cli you can set `format: http-stream` and deploy, and then invoke a
function via the `http-stream` format. this uses a unix domain socket (uds) on
the container instead of the previous stdin/stdout, and fdks will have to
support it in a new fashion (will see about getting docs up here). fdk-go
already works, see https://github.com/fnproject/fdk-go/pull/30 . the output
looks the same as an `http` format function when invoking. wahoo.
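
to make the contract concrete, here's a rough sketch of the fdk side: create
`lsnr.sock` inside the io directory and serve plain http over it. the
`FN_IOFS` env var and the handler body below are hypothetical stand-ins -- only
the `lsnr.sock` name, the `FN_CALL_ID` header, and the socket-permissions
caveat come from this change:

```go
package main

import (
	"fmt"
	"net"
	"net/http"
	"os"
)

func main() {
	// hypothetical env var for wherever the io directory lands in the container;
	// the agent only watches for a file literally named "lsnr.sock" inside it
	sock := os.Getenv("FN_IOFS") + "/lsnr.sock"

	l, err := net.Listen("unix", sock)
	if err != nil {
		panic(err)
	}
	// fdks must give write permissions on the sock to other users, since the
	// agent dials in from outside (see the permissions bullet below)
	if err := os.Chmod(sock, 0666); err != nil {
		panic(err)
	}

	// each invocation arrives as a plain http request over the socket, with
	// FN_CALL_ID set by the agent
	err = http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "hello, call %s\n", r.Header.Get("FN_CALL_ID"))
	}))
	if err != nil {
		panic(err)
	}
}
```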

there's some amount of stuff we can clean up here, enumerated:

* the cleanup of the sock files is iffy, high pri here

* permissions are a pain in the ass and i punted on dealing with them. you can
run `sudo ./fnserver` if running locally; it may or may not work ootb in dind(?)

* no pipe usage at all (yay). we could still reduce buffer usage around the
pipe behavior, and potentially clean that up before removal (and add tests)

* my brain can’t figure out if dispatchOldFormats changes pipe behavior, but
tests work

* i marked XXX where some cleanup will follow soon… need this to test fdks tho,
so meh; any thoughts on the marked spots would be appreciated (one less
decision for me). mostly happy w/ the general shape/plumbing tho

* there are no tests atm, this is a tricky dance indeed. attempts were made.
need to futz with the permission stuff before committing to adding any tests
here, which I don't like either. also need to get the fdk-go based test image
updated to match the fdk-go changes, and there's a dance there too. rumba time..

* delaying the big big cleanup until we have good enough fdk support to kill
all the other formats.

open to ideas on how to maneuver landing stuff...

* fix unmount

* see if the tests work on ci...

* add call id header

* fix up makefile

* add configurable iofs opts

* add format file describing http-stream contract

* rm some cruft

* default iofs to /tmp, remove mounting

out of the box, fn can't mount. /tmp will provide a memory backed fs for us
on most systems; this will be fine for local development, and it can be
configured to be wherever for anyone who wants to make things more difficult
for themselves.

this also removes the mounting, since it has to be done as root and we can't
do that in oss fn (short of requesting root, but no). in the future we may
want a knob in fn that allows further configuration here. since we don't
really know what we need in this dept yet, not doing that now (it may be
possible to handle it operationally outside of fn, eg, but not if each
directory needs to be configured itself, which seems likely, anyway...). a
sketch of the removed mount follows at the end of this list.

* add WIP note just in case...
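
for reference, here's a minimal sketch of the root-only mount that got pulled,
reconstructed from the commented-out code in `createIOFS` (linux-only; the
flags and opts mirror what's disabled there; `mountIOFS` is a hypothetical
name):

```go
package agent

import (
	"fmt"
	"syscall"
)

// mountIOFS mounts a small tmpfs over the per-container iofs dir; this is
// the root-only operation removed by this change.
func mountIOFS(iofsDir string) error {
	opts := "size=1k,nr_inodes=8,mode=0777"
	err := syscall.Mount("tmpfs", iofsDir, "tmpfs",
		syscall.MS_NOEXEC|syscall.MS_NOSUID|syscall.MS_NODEV, opts)
	if err != nil {
		return fmt.Errorf("cannot mount/create tmpfs=%s: %v", iofsDir, err)
	}
	// the matching cleanup on container close would be:
	// syscall.Unmount(iofsDir, 0)
	return nil
}
```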
2018-09-14 10:59:12 +01:00

1355 lines
42 KiB
Go

package agent
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/fnproject/fn/api/agent/drivers"
"github.com/fnproject/fn/api/agent/protocol"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/id"
"github.com/fnproject/fn/api/models"
"github.com/fnproject/fn/fnext"
"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
"go.opencensus.io/stats"
"go.opencensus.io/trace"
)
// TODO we should prob store async calls in db immediately since we're returning id (will 404 until post-execution)
// TODO async calls need to add route.Headers as well
// TODO handle timeouts / no response in sync & async (sync is json+503 atm, not 504, async is empty log+status)
// see also: server/runner.go wrapping the response writer there, but need to handle async too (push down?)
// TODO storing logs / call can push call over the timeout
// TODO async is still broken, but way less so. we need to modify mq semantics
// to be much more robust. now we're at least running it if we delete the msg,
// but we may never store info about that execution so still broked (if fn
// dies). need coordination w/ db.
// TODO if a cold call times out but container is created but hasn't replied, could
// end up that the client doesn't get a reply until long after the timeout (b/c of container removal, async it?)
// TODO if async would store requests (or interchange format) it would be slick, but
// if we're going to store full calls in db maybe we should only queue pointers to ids?
// TODO examine cases where hot can't start a container and the user would never see an error
// about why that may be so (say, whatever it is takes longer than the timeout, e.g.)
// TODO if an image is not found or similar issues in getting a slot, then async should probably
// mark the call as errored rather than forever trying & failing to run it
// TODO it would be really nice if we made the ramToken wrap the driver cookie (less brittle,
// if those leak the container leaks too...) -- not the allocation, but the token.Close and cookie.Close
// TODO if machine is out of ram, just timeout immediately / wait for hot slot? (discuss policy)
// Agent exposes an api to create calls from various parameters and then submit
// those calls, it also exposes a 'safe' shutdown mechanism via its Close method.
// Agent has a few roles:
// * manage the memory pool for a given server
// * manage the container lifecycle for calls (hot+cold)
// * execute calls against containers
// * invoke Start and End for each call appropriately
// * check the mq for any async calls, and submit them
//
// Overview:
// Upon submission of a call, Agent will start the call's timeout timer
// immediately. If the call is hot, Agent will attempt to find an active hot
// container for that route, and if necessary launch another container. Cold
// calls will launch one container each. Cold calls will get container input
// and output directly, whereas hot calls will be able to read/write directly
// from/to a pipe in a container via Dispatch. If it's necessary to launch a
// container, first an attempt will be made to try to reserve the ram required
// while waiting for any hot 'slot' to become available [if applicable]. If
// there is an error launching the container, an error will be returned
// provided the call has not yet timed out or found another hot 'slot' to
// execute in [if applicable]. call.Start will be called immediately before
// starting a container, if cold (i.e. after pulling), or immediately before
// sending any input, if hot. call.End will be called regardless of the
// timeout timer's status if the call was executed, and the error it returns may
// be returned from Submit.
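//
// A minimal usage sketch (hypothetical names; `da` is any CallHandler and the
// CallOpts come from this package's constructors):
//
//	a := New(da)
//	defer a.Close()
//	call, err := a.GetCall( /* CallOpts built from an inbound request */ )
//	if err == nil {
//		err = a.Submit(call) // returns the execution error, if any
//	}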
type Agent interface {
// GetCall will return a Call that is executable by the Agent, which
// can be built via various CallOpt's provided to the method.
GetCall(...CallOpt) (Call, error)
// Submit will attempt to execute a call locally, a Call may store information
// about itself in its Start and End methods, which will be called in Submit
// immediately before and after the Call is executed, respectively. An error
// will be returned if there is an issue executing the call or the error
// may be from the call's execution itself (if, say, the container dies,
// or the call times out).
Submit(Call) error
// Close will wait for any outstanding calls to complete and then exit.
// Closing the agent will invoke Close on the underlying DataAccess.
// Close is not safe to be called from multiple threads.
io.Closer
AddCallListener(fnext.CallListener)
}
type agent struct {
cfg Config
da CallHandler
callListeners []fnext.CallListener
driver drivers.Driver
slotMgr *slotQueueMgr
evictor Evictor
// track usage
resources ResourceTracker
// used to track running calls / safe shutdown
shutWg *common.WaitGroup
shutonce sync.Once
disableAsyncDequeue bool
callOverrider CallOverrider
// deferred actions to call at end of initialisation
onStartup []func()
}
// Option configures an agent at startup
type Option func(*agent) error
// New creates an Agent that executes functions locally as Docker containers.
func New(da CallHandler, options ...Option) Agent {
cfg, err := NewConfig()
if err != nil {
logrus.WithError(err).Fatalf("error in agent config cfg=%+v", cfg)
}
a := &agent{
cfg: *cfg,
}
a.shutWg = common.NewWaitGroup()
a.da = da
a.slotMgr = NewSlotQueueMgr()
a.evictor = NewEvictor()
// Allow overriding config
for _, option := range options {
err = option(a)
if err != nil {
logrus.WithError(err).Fatalf("error in agent options")
}
}
logrus.Infof("agent starting cfg=%+v", a.cfg)
if a.driver == nil {
d, err := NewDockerDriver(&a.cfg)
if err != nil {
logrus.WithError(err).Fatal("failed to create docker driver")
}
a.driver = d
}
a.resources = NewResourceTracker(&a.cfg)
for _, sup := range a.onStartup {
sup()
}
return a
}
func (a *agent) addStartup(sup func()) {
a.onStartup = append(a.onStartup, sup)
}
// WithAsync enables async operations on the agent
func WithAsync(dqda DequeueDataAccess) Option {
return func(a *agent) error {
if !a.shutWg.AddSession(1) {
logrus.Fatalf("cannot start agent, unable to add session")
}
a.addStartup(func() {
go a.asyncDequeue(dqda) // safe shutdown can nanny this fine
})
return nil
}
}
// WithConfig sets the agent config to the provided config
func WithConfig(cfg *Config) Option {
return func(a *agent) error {
a.cfg = *cfg
return nil
}
}
// WithDockerDriver provides a custom driver to the agent
func WithDockerDriver(drv drivers.Driver) Option {
return func(a *agent) error {
if a.driver != nil {
return errors.New("cannot add driver to agent, driver already exists")
}
a.driver = drv
return nil
}
}
// WithCallOverrider registers a CallOverrider to modify a Call and its extensions on call construction
func WithCallOverrider(fn CallOverrider) Option {
return func(a *agent) error {
if a.callOverrider != nil {
return errors.New("lb-agent call overriders already exists")
}
a.callOverrider = fn
return nil
}
}
// NewDockerDriver creates a default docker driver from agent config
func NewDockerDriver(cfg *Config) (drivers.Driver, error) {
return drivers.New("docker", drivers.Config{
DockerNetworks: cfg.DockerNetworks,
DockerLoadFile: cfg.DockerLoadFile,
ServerVersion: cfg.MinDockerVersion,
PreForkPoolSize: cfg.PreForkPoolSize,
PreForkImage: cfg.PreForkImage,
PreForkCmd: cfg.PreForkCmd,
PreForkUseOnce: cfg.PreForkUseOnce,
PreForkNetworks: cfg.PreForkNetworks,
MaxTmpFsInodes: cfg.MaxTmpFsInodes,
EnableReadOnlyRootFs: !cfg.DisableReadOnlyRootFs,
EnableTini: !cfg.DisableTini,
})
}
func (a *agent) Close() error {
var err error
// wait for ongoing sessions
a.shutWg.CloseGroup()
a.shutonce.Do(func() {
// now close docker layer
if a.driver != nil {
err = a.driver.Close()
}
})
return err
}
func (a *agent) Submit(callI Call) error {
if !a.shutWg.AddSession(1) {
return models.ErrCallTimeoutServerBusy
}
call := callI.(*call)
ctx := call.req.Context()
ctx, span := trace.StartSpan(ctx, "agent_submit")
defer span.End()
err := a.submit(ctx, call)
return err
}
func (a *agent) startStateTrackers(ctx context.Context, call *call) {
if !protocol.IsStreamable(protocol.Protocol(call.Format)) {
// For cold containers, we track the container state in call
call.containerState = NewContainerState()
}
call.requestState = NewRequestState()
}
func (a *agent) endStateTrackers(ctx context.Context, call *call) {
call.requestState.UpdateState(ctx, RequestStateDone, call.slots)
// For cold containers, we are done with the container.
if call.containerState != nil {
call.containerState.UpdateState(ctx, ContainerStateDone, call.slots)
}
}
func (a *agent) submit(ctx context.Context, call *call) error {
statsEnqueue(ctx)
a.startStateTrackers(ctx, call)
defer a.endStateTrackers(ctx, call)
slot, err := a.getSlot(ctx, call)
if err != nil {
return a.handleCallEnd(ctx, call, slot, err, false)
}
err = call.Start(ctx)
if err != nil {
return a.handleCallEnd(ctx, call, slot, err, false)
}
statsDequeueAndStart(ctx)
// We are about to execute the function, set container Exec Deadline (call.Timeout)
slotCtx, cancel := context.WithTimeout(ctx, time.Duration(call.Timeout)*time.Second)
defer cancel()
// Pass this error (nil or otherwise) to end directly, to store status, etc.
err = slot.exec(slotCtx, call)
return a.handleCallEnd(ctx, call, slot, err, true)
}
func (a *agent) handleCallEnd(ctx context.Context, call *call, slot Slot, err error, isStarted bool) error {
if slot != nil {
slot.Close(common.BackgroundContext(ctx))
}
// This means call was routed (executed)
if isStarted {
call.End(ctx, err)
}
handleStatsEnd(ctx, err)
a.shutWg.DoneSession()
return transformTimeout(err, !isStarted)
}
func transformTimeout(e error, isRetriable bool) error {
if e == context.DeadlineExceeded {
if isRetriable {
return models.ErrCallTimeoutServerBusy
}
return models.ErrCallTimeout
} else if e == CapacityFull {
return models.ErrCallTimeoutServerBusy
}
return e
}
// handleStatsDequeue handles stats for the early-exit (getSlot or Start) dequeue
// cases. Only timeouts count as a simple dequeue; other cases are actual errors.
func handleStatsDequeue(ctx context.Context, err error) {
if err == context.DeadlineExceeded {
statsDequeue(ctx)
statsTooBusy(ctx)
} else {
statsDequeueAndFail(ctx)
statsErrors(ctx)
}
}
// handleStatsEnd handles stats after a call has run, depending on the error.
func handleStatsEnd(ctx context.Context, err error) {
if err == nil {
// decrement running count, increment completed count
statsComplete(ctx)
} else {
// decrement running count, increment failed count
statsFailed(ctx)
// increment the timeout or errors count, as appropriate
if err == context.DeadlineExceeded {
statsTimedout(ctx)
} else {
statsErrors(ctx)
}
}
}
// getSlot returns a Slot (or error) for the request to run. Depending on hot/cold
// request type, this may launch a new container or wait for other containers to become idle
// or it may wait for resources to become available to launch a new container.
func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
if call.Type == models.TypeAsync {
// *) for async, slot deadline is also call.Timeout. This is because we would like to
// allocate enough time for docker-pull, slot-wait, docker-start, etc.
// and also make sure we have call.Timeout inside the container. Total time
// to run an async becomes 2 * call.Timeout.
// *) for sync, there's no slot deadline, the timeout is controlled by http-client
// context (or runner gRPC context)
tmp, cancel := context.WithTimeout(ctx, time.Duration(call.Timeout)*time.Second)
ctx = tmp
defer cancel()
}
ctx, span := trace.StartSpan(ctx, "agent_get_slot")
defer span.End()
if protocol.IsStreamable(protocol.Protocol(call.Format)) {
// For hot requests, we use a long lived slot queue, which we use to manage hot containers
var isNew bool
if call.slotHashId == "" {
call.slotHashId = getSlotQueueKey(call)
}
call.slots, isNew = a.slotMgr.getSlotQueue(call.slotHashId)
call.requestState.UpdateState(ctx, RequestStateWait, call.slots)
if isNew {
go a.hotLauncher(ctx, call)
}
s, err := a.waitHot(ctx, call)
return s, err
}
call.requestState.UpdateState(ctx, RequestStateWait, call.slots)
return a.launchCold(ctx, call)
}
// hotLauncher is spawned in a goroutine for each slot queue to monitor stats and launch hot
// containers if needed. Upon shutdown or activity timeout, hotLauncher exits and during exit,
// it destroys the slot queue.
func (a *agent) hotLauncher(ctx context.Context, call *call) {
// Use the configured HotLauncherTimeout or 2 * IdleTimeout as the hot queue idle
// timeout, whichever is longer. If there's no activity within this window, then
// we destroy the hot queue.
timeout := a.cfg.HotLauncherTimeout
idleTimeout := time.Duration(call.IdleTimeout) * time.Second * 2
if timeout < idleTimeout {
timeout = idleTimeout
}
logger := common.Logger(ctx)
logger.WithField("launcher_timeout", timeout).Info("Hot function launcher starting")
// IMPORTANT: get a context that has a child span / logger but NO timeout
// TODO this is a 'FollowsFrom'
ctx = common.BackgroundContext(ctx)
ctx, span := trace.StartSpan(ctx, "agent_hot_launcher")
defer span.End()
var notifyChan chan error
for {
ctx, cancel := context.WithTimeout(ctx, timeout)
a.checkLaunch(ctx, call, notifyChan)
notifyChan = nil
select {
case <-a.shutWg.Closer(): // server shutdown
cancel()
return
case <-ctx.Done(): // timed out
cancel()
if a.slotMgr.deleteSlotQueue(call.slots) {
logger.Info("Hot function launcher timed out")
return
}
case notifyChan = <-call.slots.signaller:
cancel()
}
}
}
func tryNotify(notifyChan chan error, err error) {
if notifyChan != nil && err != nil {
select {
case notifyChan <- err:
default:
}
}
}
func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan error) {
curStats := call.slots.getStats()
isAsync := call.Type == models.TypeAsync
isNB := a.cfg.EnableNBResourceTracker
if !isNewContainerNeeded(&curStats) {
return
}
state := NewContainerState()
state.UpdateState(ctx, ContainerStateWait, call.slots)
mem := call.Memory + uint64(call.TmpFsSize)
// WARNING: Tricky flow below. We are here because: isNewContainerNeeded is true,
// in other words, we need to launch a new container at this time due to high load.
//
// For non-blocking mode, this means, if we cannot acquire resources (cpu+mem), then we need
// to notify the caller through notifyChan. This is not perfect as the callers and
// checkLaunch do not match 1-1. But this is OK, we can notify *any* waiter that
// has signalled us, this is because non-blocking mode is a system wide setting.
// The notifications are lossy, but callers will signal/poll again if this is the case
// or this may not matter if they've already acquired an empty slot.
//
// For non-blocking mode, a.cfg.HotPoll should not be set too high, since a missed
// notify event from here adds up to a.cfg.HotPoll msec of latency. Tuning a.cfg.HotPoll
// may be an acceptable short-term workaround, since non-blocking mode is likely to
// reduce the number of waiters, which could compensate for more frequent polling.
//
// Non-blocking mode only applies to cpu+mem, and if isNewContainerNeeded decided that we do not
// need to start a new container, then waiters will wait.
select {
case tok := <-a.resources.GetResourceToken(ctx, mem, uint64(call.CPUs), isAsync, isNB):
if tok != nil && tok.Error() != nil {
// before returning error response, as a last resort, try evicting idle containers.
if tok.Error() != CapacityFull || !a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs)) {
tryNotify(notifyChan, tok.Error())
}
} else if a.shutWg.AddSession(1) {
go func() {
// NOTE: runHot will not inherit the timeout from ctx (ignore timings)
a.runHot(ctx, call, tok, state)
a.shutWg.DoneSession()
}()
// early return (do not allow container state to switch to ContainerStateDone)
return
}
if tok != nil {
tok.Close()
}
// Request routines are polling us with this a.cfg.HotPoll frequency. We can use this
// same timer to assume that we waited for cpu/mem long enough. Let's try to evict an
// idle container.
case <-time.After(a.cfg.HotPoll):
a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs))
case <-ctx.Done(): // timeout
case <-a.shutWg.Closer(): // server shutdown
}
state.UpdateState(ctx, ContainerStateDone, call.slots)
}
// waitHot pings and waits for a hot container from the slot queue
func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
ctx, span := trace.StartSpan(ctx, "agent_wait_hot")
defer span.End()
ctx, cancel := context.WithCancel(ctx)
defer cancel() // shut down dequeuer if we grab a slot
ch := call.slots.startDequeuer(ctx)
notifyChan := make(chan error)
// 1) if we can get a slot immediately, grab it.
// 2) if we don't, send a signaller every x msecs until we do.
sleep := 1 * time.Microsecond // pad, so time.After doesn't send immediately
for {
select {
case err := <-notifyChan:
return nil, err
case s := <-ch:
if call.slots.acquireSlot(s) {
if s.slot.Error() != nil {
s.slot.Close(ctx)
return nil, s.slot.Error()
}
return s.slot, nil
}
// we failed to take ownership of the token (eg. container idle timeout) => try again
case <-ctx.Done():
return nil, ctx.Err()
case <-a.shutWg.Closer(): // server shutdown
return nil, models.ErrCallTimeoutServerBusy
case <-time.After(sleep):
// ping dequeuer again
}
// set sleep to x msecs after first iteration
sleep = a.cfg.HotPoll
// send a notification to launchHot()
select {
case call.slots.signaller <- notifyChan:
default:
}
}
}
// launchCold waits for necessary resources to launch a new container, then
// returns the slot for that new container to run the request on.
func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
isAsync := call.Type == models.TypeAsync
isNB := a.cfg.EnableNBResourceTracker
ch := make(chan Slot)
ctx, span := trace.StartSpan(ctx, "agent_launch_cold")
defer span.End()
call.containerState.UpdateState(ctx, ContainerStateWait, call.slots)
mem := call.Memory + uint64(call.TmpFsSize)
select {
case tok := <-a.resources.GetResourceToken(ctx, mem, uint64(call.CPUs), isAsync, isNB):
if tok.Error() != nil {
return nil, tok.Error()
}
go a.prepCold(ctx, call, tok, ch)
case <-ctx.Done():
return nil, ctx.Err()
}
// wait for launch err or a slot to open up
select {
case s := <-ch:
if s.Error() != nil {
s.Close(ctx)
return nil, s.Error()
}
return s, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
// implements Slot
type coldSlot struct {
cookie drivers.Cookie
tok ResourceToken
fatalErr error
}
func (s *coldSlot) Error() error {
return s.fatalErr
}
func (s *coldSlot) exec(ctx context.Context, call *call) error {
ctx, span := trace.StartSpan(ctx, "agent_cold_exec")
defer span.End()
call.requestState.UpdateState(ctx, RequestStateExec, call.slots)
call.containerState.UpdateState(ctx, ContainerStateBusy, call.slots)
waiter, err := s.cookie.Run(ctx)
if err != nil {
return err
}
res := waiter.Wait(ctx)
if res.Error() != nil {
// check for call error (oom/exit) and beam it up
return res.Error()
}
// nil or timed out
return ctx.Err()
}
func (s *coldSlot) Close(ctx context.Context) error {
if s.cookie != nil {
s.cookie.Close(ctx)
}
if s.tok != nil {
s.tok.Close()
}
return nil
}
// implements Slot
type hotSlot struct {
done chan struct{} // signal we are done with slot
errC <-chan error // container error
container *container // TODO mask this
cfg *Config
udsClient http.Client
fatalErr error
containerSpan trace.SpanContext
}
func (s *hotSlot) Close(ctx context.Context) error {
close(s.done)
return nil
}
func (s *hotSlot) Error() error {
return s.fatalErr
}
func (s *hotSlot) trySetError(err error) {
if s.fatalErr == nil {
s.fatalErr = err
}
}
func (s *hotSlot) exec(ctx context.Context, call *call) error {
ctx, span := trace.StartSpan(ctx, "agent_hot_exec")
defer span.End()
call.requestState.UpdateState(ctx, RequestStateExec, call.slots)
// link the container id and call id in the logs [for us!]
common.Logger(ctx).WithField("container_id", s.container.id).Info("starting call")
// link the container span to ours for additional context (start/freeze/etc.)
span.AddLink(trace.Link{
TraceID: s.containerSpan.TraceID,
SpanID: s.containerSpan.SpanID,
Type: trace.LinkTypeChild,
})
call.req = call.req.WithContext(ctx) // TODO this is funny biz reed is bad
var errApp chan error
if call.Format == models.FormatHTTPStream {
errApp = s.dispatch(ctx, call)
} else { // TODO remove this block one glorious day
errApp = s.dispatchOldFormats(ctx, call)
}
select {
case err := <-s.errC: // error from container
s.trySetError(err)
return err
case err := <-errApp: // from dispatch
if err != nil {
if models.IsAPIError(err) {
s.trySetError(err)
} else if err == protocol.ErrExcessData {
s.trySetError(err)
// suppress excess data error, but do shutdown the container
return nil
}
}
return err
case <-ctx.Done(): // call timeout
s.trySetError(ctx.Err())
return ctx.Err()
}
}
func (s *hotSlot) dispatch(ctx context.Context, call *call) chan error {
ctx, span := trace.StartSpan(ctx, "agent_dispatch_httpstream")
defer span.End()
// TODO we can't trust that resp.Write doesn't timeout, even if the http
// client should respect the request context (right?) so we still need this (right?)
errApp := make(chan error, 1)
req := call.req
req.RequestURI = "" // we have to clear this before using it as a client request, see https://golang.org/pkg/net/http/#Request
//req.Header.Set("FN_DEADLINE", ci.Deadline().String())
req.Header.Set("FN_CALL_ID", call.ID)
go func() {
resp, err := s.udsClient.Do(req)
if err != nil {
errApp <- err
return
}
defer resp.Body.Close()
select {
case errApp <- writeResp(resp, call.w):
case <-ctx.Done():
errApp <- ctx.Err()
}
}()
return errApp
}
// XXX(reed): dupe code in http proto (which will die...)
func writeResp(resp *http.Response, w io.Writer) error {
rw, ok := w.(http.ResponseWriter)
if !ok {
return resp.Write(w)
}
// if we're writing directly to the response writer, we need to set headers
// and status code, and only copy the body. resp.Write would copy a full
// http request into the response body (not what we want).
for k, vs := range resp.Header {
for _, v := range vs {
rw.Header().Add(k, v)
}
}
if resp.StatusCode > 0 {
rw.WriteHeader(resp.StatusCode)
}
_, err := io.Copy(rw, resp.Body)
return err
}
// TODO remove
func (s *hotSlot) dispatchOldFormats(ctx context.Context, call *call) chan error {
errApp := make(chan error, 1)
go func() {
// XXX(reed): this may be liable to leave the pipes fucked up if dispatch times out, eg
// we may need ye ole close() func to put the Close()/swapBack() in from the caller
// swap in fresh pipes & stat accumulator to not interlace with other calls that used this slot [and timed out]
stdinRead, stdinWrite := io.Pipe()
stdoutRead, stdoutWritePipe := io.Pipe()
defer stdinRead.Close()
defer stdoutWritePipe.Close()
// NOTE: stderr is limited separately (though line writer is vulnerable to attack?)
// limit the bytes allowed to be written to the stdout pipe, which handles any
// buffering overflows (json to a string, http to a buffer, etc)
stdoutWrite := common.NewClampWriter(stdoutWritePipe, s.cfg.MaxResponseSize, models.ErrFunctionResponseTooBig)
swapBack := s.container.swap(stdinRead, stdoutWrite, call.stderr, &call.Stats)
defer swapBack() // NOTE: it's important this runs before the pipes are closed.
// TODO this should get killed completely
// TODO we could alternatively dial in and use the conn as stdin/stdout for an interim solution
// XXX(reed): ^^^ do we need that for the cloud event dance ????
proto := protocol.New(protocol.Protocol(call.Format), stdinWrite, stdoutRead)
ci := protocol.NewCallInfo(call.IsCloudEvent, call.Call, call.req)
errApp <- proto.Dispatch(ctx, ci, call.w)
}()
return errApp
}
func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch chan Slot) {
ctx, span := trace.StartSpan(ctx, "agent_prep_cold")
defer span.End()
call.containerState.UpdateState(ctx, ContainerStateStart, call.slots)
deadline := time.Now().Add(time.Duration(call.Timeout) * time.Second)
// add Fn-specific information to the config to shove everything into env vars for cold
call.Config["FN_DEADLINE"] = common.DateTime(deadline).String()
call.Config["FN_METHOD"] = call.Model().Method
call.Config["FN_REQUEST_URL"] = call.Model().URL
call.Config["FN_CALL_ID"] = call.Model().ID
// User headers are prefixed with FN_HEADER and shoved in the env vars too
for k, v := range call.Headers {
k = "FN_HEADER_" + k
call.Config[k] = strings.Join(v, ", ")
}
container := &container{
id: id.New().String(), // XXX we could just let docker generate ids...
image: call.Image,
env: map[string]string(call.Config),
memory: call.Memory,
cpus: uint64(call.CPUs),
fsSize: a.cfg.MaxFsSize,
timeout: time.Duration(call.Timeout) * time.Second, // this is unnecessary, but in case removal fails...
logCfg: drivers.LoggerConfig{
URL: strings.TrimSpace(call.SyslogURL),
Tags: []drivers.LoggerTag{
{Name: "app_name", Value: call.AppName},
{Name: "func_name", Value: call.Path},
},
},
stdin: call.req.Body,
stdout: common.NewClampWriter(call.w, a.cfg.MaxResponseSize, models.ErrFunctionResponseTooBig),
stderr: call.stderr,
stats: &call.Stats,
}
cookie, err := a.driver.CreateCookie(ctx, container)
if err == nil {
// pull & create container before we return a slot, so as to be friendly
// about timing out if this takes a while...
err = a.driver.PrepareCookie(ctx, cookie)
}
call.containerState.UpdateState(ctx, ContainerStateIdle, call.slots)
slot := &coldSlot{cookie, tok, err}
select {
case ch <- slot:
case <-ctx.Done():
slot.Close(ctx)
}
}
func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state ContainerState) {
// IMPORTANT: get a context that has a child span / logger but NO timeout
// TODO this is a 'FollowsFrom'
ctx = common.BackgroundContext(ctx)
ctx, span := trace.StartSpan(ctx, "agent_run_hot")
defer span.End()
defer tok.Close() // IMPORTANT: this MUST get called
state.UpdateState(ctx, ContainerStateStart, call.slots)
defer state.UpdateState(ctx, ContainerStateDone, call.slots)
container, err := newHotContainer(ctx, call, &a.cfg)
if err != nil {
call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err})
return
}
defer container.Close()
// NOTE: soon this isn't assigned in a branch...
var udsClient http.Client
udsAwait := make(chan error)
if call.Format == models.FormatHTTPStream {
// start our listener before starting the container, so we don't miss the pretty things whispered in our ears
// XXX(reed): figure out cleaner way to carry around the directory and expose the lsnr.sock file
go inotifyUDS(ctx, container.UDSPath(), udsAwait)
udsClient = http.Client{
Transport: &http.Transport{
// XXX(reed): other settings ?
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
var d net.Dialer
return d.DialContext(ctx, "unix", container.UDSPath()+"/lsnr.sock") // XXX(reed): hardcoded lsnr.sock
},
},
}
} else {
close(udsAwait) // XXX(reed): short case first / kill this
}
logger := logrus.WithFields(logrus.Fields{"id": container.id, "app_id": call.AppID, "route": call.Path, "image": call.Image, "memory": call.Memory, "cpus": call.CPUs, "format": call.Format, "idle_timeout": call.IdleTimeout})
ctx = common.WithLogger(ctx, logger)
cookie, err := a.driver.CreateCookie(ctx, container)
if err != nil {
call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err})
return
}
defer cookie.Close(ctx)
err = a.driver.PrepareCookie(ctx, cookie)
if err != nil {
call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err})
return
}
waiter, err := cookie.Run(ctx)
if err != nil {
call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err})
return
}
// now we wait for the socket to be created before handing out any slots
select {
case err := <-udsAwait: // XXX(reed): need to leave a note about pairing ctx here?
// sends a nil error if all is good, we can proceed...
if err != nil {
call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err})
return
}
case <-ctx.Done():
call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: ctx.Err()})
return
}
// container is running
state.UpdateState(ctx, ContainerStateIdle, call.slots)
// buffered, in case someone has slot when waiter returns but isn't yet listening
errC := make(chan error, 1)
ctx, shutdownContainer := context.WithCancel(ctx)
defer shutdownContainer() // close this if our waiter returns, to call off slots
go func() {
defer shutdownContainer() // also close if we get an agent shutdown / idle timeout
for {
select { // make sure everything is up before trying to send slot
case <-ctx.Done(): // container shutdown
return
case <-a.shutWg.Closer(): // server shutdown
return
default: // ok
}
slot := &hotSlot{
done: make(chan struct{}),
errC: errC,
container: container,
cfg: &a.cfg,
udsClient: udsClient,
containerSpan: trace.FromContext(ctx).SpanContext(),
}
if !a.runHotReq(ctx, call, state, logger, cookie, slot) {
return
}
// wait for this call to finish
// NOTE do NOT select with shutdown / other channels. slot handles this.
<-slot.done
if slot.fatalErr != nil {
logger.WithError(slot.fatalErr).Info("hot function terminating")
return
}
}
}()
res := waiter.Wait(ctx)
if res.Error() != nil {
errC <- res.Error() // TODO: race condition, no guaranteed delivery fix this...
}
logger.WithError(res.Error()).Info("hot function terminated")
}
func createIOFS(cfg *Config) (string, error) {
// XXX(reed): need to ensure these are cleaned up if any of these ops in here fail...
dir := cfg.IOFSPath
if dir == "" {
// /tmp should be a memory backed filesystem, where we can get user perms
// on the socket file (fdks must give write permissions to users on sock).
// /var/run is root only, hence this...
dir = "/tmp"
}
// create a tmpdir
iofsDir, err := ioutil.TempDir(dir, "iofs")
if err != nil {
return "", fmt.Errorf("cannot create tmpdir for iofs: %v", err)
}
opts := cfg.IOFSOpts
if opts == "" {
// opts = "size=1k,nr_inodes=8,mode=0777"
}
_ = opts // only consumed by the (currently disabled) tmpfs mount below
// under tmpdir, create tmpfs
// TODO uh, yea, idk
//if cfg.IOFSPath != "" {
//err = syscall.Mount("tmpfs", iofsDir, "tmpfs", uintptr( [>syscall.MS_NOEXEC|syscall.MS_NOSUID|syscall.MS_NODEV<] 0), opts)
//if err != nil {
//return "", fmt.Errorf("cannot mount/create tmpfs=%s", iofsDir)
//}
//}
return iofsDir, nil
}
func inotifyUDS(ctx context.Context, iofsDir string, awaitUDS chan<- error) {
// XXX(reed): I forgot how to plumb channels temporarily forgive me for this sin (inotify will timeout, this is just bad programming)
err := inotifyAwait(ctx, iofsDir)
select {
case awaitUDS <- err:
case <-ctx.Done():
}
}
func inotifyAwait(ctx context.Context, iofsDir string) error {
ctx, span := trace.StartSpan(ctx, "inotify_await")
defer span.End()
fsWatcher, err := fsnotify.NewWatcher()
if err != nil {
return fmt.Errorf("error getting fsnotify watcher: %v", err)
}
defer fsWatcher.Close()
err = fsWatcher.Add(iofsDir)
if err != nil {
return fmt.Errorf("error adding iofs dir to fswatcher: %v", err)
}
for {
select {
case <-ctx.Done():
// XXX(reed): damn it would sure be nice to tell users they didn't make a uds and that's why it timed out
return ctx.Err()
case err := <-fsWatcher.Errors:
return fmt.Errorf("error watching for iofs: %v", err)
case event := <-fsWatcher.Events:
common.Logger(ctx).WithField("event", event).Debug("fsnotify event")
if event.Op&fsnotify.Create == fsnotify.Create && event.Name == iofsDir+"/lsnr.sock" {
// XXX(reed): hardcoded /lsnr.sock path
// wait until the socket file is created by the container
return nil
}
}
}
}
// runHotReq enqueues a free slot to the slot queue manager and watches various timers and the consumer until
// the slot is consumed. A return value of false means the container should shut down and no subsequent
// calls should be made to this function.
func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState, logger logrus.FieldLogger, cookie drivers.Cookie, slot *hotSlot) bool {
var err error
isFrozen := false
isEvictable := false
freezeTimer := time.NewTimer(a.cfg.FreezeIdle)
idleTimer := time.NewTimer(time.Duration(call.IdleTimeout) * time.Second)
ejectTimer := time.NewTimer(a.cfg.EjectIdle)
defer freezeTimer.Stop()
defer idleTimer.Stop()
defer ejectTimer.Stop()
// log if any error is encountered
defer func() {
if err != nil {
logger.WithError(err).Error("hot function failure")
}
}()
// if an immediate freeze is requested, freeze first before enqueuing at all.
if a.cfg.FreezeIdle == time.Duration(0) && !isFrozen {
err = cookie.Freeze(ctx)
if err != nil {
return false
}
isFrozen = true
state.UpdateState(ctx, ContainerStatePaused, call.slots)
}
evictor := a.evictor.GetEvictor(call.ID, call.slotHashId, call.Memory+uint64(call.TmpFsSize), uint64(call.CPUs))
if !isFrozen {
state.UpdateState(ctx, ContainerStateIdle, call.slots)
}
s := call.slots.queueSlot(slot)
isEvictEvent := false
for {
select {
case <-s.trigger: // slot already consumed
case <-ctx.Done(): // container shutdown
case <-a.shutWg.Closer(): // server shutdown
case <-idleTimer.C:
case <-freezeTimer.C:
if !isFrozen {
err = cookie.Freeze(ctx)
if err != nil {
return false
}
isFrozen = true
state.UpdateState(ctx, ContainerStatePaused, call.slots)
}
continue
case <-evictor.C:
logger.Debug("attempting hot function eject")
isEvictEvent = true
case <-ejectTimer.C:
// we've been idle too long, now we are ejectable
a.evictor.RegisterEvictor(evictor)
isEvictable = true
continue
}
break
}
if isEvictable {
a.evictor.UnregisterEvictor(evictor)
}
// if we can acquire token, that means we are here due to
// abort/shutdown/timeout, attempt to acquire and terminate,
// otherwise continue processing the request
if call.slots.acquireSlot(s) {
slot.Close(ctx)
if isEvictEvent {
statsContainerEvicted(ctx)
}
return false
}
// In case a timer or acquireSlot failure landed us here, make
// sure to unfreeze.
if isFrozen {
err = cookie.Unfreeze(ctx)
if err != nil {
return false
}
isFrozen = false
}
state.UpdateState(ctx, ContainerStateBusy, call.slots)
return true
}
// container implements drivers.ContainerTask. a container is the execution of a
// single container, which may run multiple functions [consecutively]. the id
// and stderr can be swapped out by new calls in the container. input and
// output must be copied in and out.
type container struct {
id string // contrived
image string
env map[string]string
extensions map[string]string
memory uint64
cpus uint64
fsSize uint64
tmpFsSize uint64
iofs string
timeout time.Duration // cold only (superfluous, but in case)
logCfg drivers.LoggerConfig
close func()
stdin io.Reader
stdout io.Writer
stderr io.Writer
// swapMu protects the stats swapping
swapMu sync.Mutex
stats *drivers.Stats
}
// newHotContainer creates a container that can be used for multiple sequential events
func newHotContainer(ctx context.Context, call *call, cfg *Config) (*container, error) {
// if freezer is enabled, be consistent with freezer behavior and
// block stdout and stderr between calls.
isBlockIdleIO := MaxMsDisabled != cfg.FreezeIdle
id := id.New().String()
stdin := common.NewGhostReader()
stderr := common.NewGhostWriter()
stdout := common.NewGhostWriter()
// for use if no freezer (or we ever make up our minds)
var bufs []*bytes.Buffer
// when not processing a request, do we block IO?
if !isBlockIdleIO {
// IMPORTANT: we are not operating on a TTY allocated container. This means, stderr and stdout are multiplexed
// from the same stream internally via docker using a multiplexing protocol. Therefore, stderr/stdout *BOTH*
// have to be read or *BOTH* blocked consistently. In other words, we cannot block one and continue
// reading from the other one without risking head-of-line blocking.
// wrap the syslog and debug loggers in the same (respective) line writer
// syslog complete chain for this (from top):
// stderr -> line writer
// TODO(reed): I guess this is worth it
// TODO(reed): there's a bug here where the between writers could have
// bytes in there, get swapped for real stdout/stderr, come back and write
// bytes in and the bytes are [really] stale. I played with fixing this
// and mostly came to the conclusion that life is meaningless.
buf1 := bufPool.Get().(*bytes.Buffer)
buf2 := bufPool.Get().(*bytes.Buffer)
bufs = []*bytes.Buffer{buf1, buf2}
soc := &nopCloser{&logWriter{
logrus.WithFields(logrus.Fields{"tag": "stdout", "app_id": call.AppID, "path": call.Path, "image": call.Image, "container_id": id}),
}}
sec := &nopCloser{&logWriter{
logrus.WithFields(logrus.Fields{"tag": "stderr", "app_id": call.AppID, "path": call.Path, "image": call.Image, "container_id": id}),
}}
stdout.Swap(newLineWriterWithBuffer(buf1, soc))
stderr.Swap(newLineWriterWithBuffer(buf2, sec))
}
var iofs string
var err error
closer := func() {} // XXX(reed):
if call.Format == models.FormatHTTPStream {
// XXX(reed): we should also point stdout to stderr, and not have stdin
iofs, err = createIOFS(cfg)
if err != nil {
return nil, err
}
// XXX(reed): futz with this, we have to make sure shit gets cleaned up properly
closer = func() {
//err := syscall.Unmount(iofs, 0)
//if err != nil {
//common.Logger(ctx).WithError(err).Error("error unmounting iofs")
//}
err = os.RemoveAll(iofs)
if err != nil {
common.Logger(ctx).WithError(err).Error("error removing iofs")
}
}
}
return &container{
id: id, // XXX we could just let docker generate ids...
image: call.Image,
env: map[string]string(call.Config),
extensions: call.extensions,
memory: call.Memory,
cpus: uint64(call.CPUs),
fsSize: cfg.MaxFsSize,
tmpFsSize: uint64(call.TmpFsSize),
iofs: iofs,
logCfg: drivers.LoggerConfig{
URL: strings.TrimSpace(call.SyslogURL),
Tags: []drivers.LoggerTag{
{Name: "app_name", Value: call.AppName},
{Name: "func_name", Value: call.Path},
},
},
stdin: stdin,
stdout: stdout,
stderr: stderr,
close: func() {
stdin.Close()
stderr.Close()
stdout.Close()
for _, b := range bufs {
bufPool.Put(b)
}
closer() // XXX(reed): clean up
},
}, nil
}
func (c *container) swap(stdin io.Reader, stdout, stderr io.Writer, cs *drivers.Stats) func() {
// if tests don't catch this, then fuck me
ostdin := c.stdin.(common.GhostReader).Swap(stdin)
ostdout := c.stdout.(common.GhostWriter).Swap(stdout)
ostderr := c.stderr.(common.GhostWriter).Swap(stderr)
c.swapMu.Lock()
ocs := c.stats
c.stats = cs
c.swapMu.Unlock()
return func() {
c.stdin.(common.GhostReader).Swap(ostdin)
c.stdout.(common.GhostWriter).Swap(ostdout)
c.stderr.(common.GhostWriter).Swap(ostderr)
c.swapMu.Lock()
c.stats = ocs
c.swapMu.Unlock()
}
}
func (c *container) Id() string { return c.id }
func (c *container) Command() string { return "" }
func (c *container) Input() io.Reader { return c.stdin }
func (c *container) Logger() (io.Writer, io.Writer) { return c.stdout, c.stderr }
func (c *container) Volumes() [][2]string { return nil }
func (c *container) WorkDir() string { return "" }
func (c *container) Close() { c.close() }
func (c *container) Image() string { return c.image }
func (c *container) Timeout() time.Duration { return c.timeout }
func (c *container) EnvVars() map[string]string { return c.env }
func (c *container) Memory() uint64 { return c.memory * 1024 * 1024 } // convert MB
func (c *container) CPUs() uint64 { return c.cpus }
func (c *container) FsSize() uint64 { return c.fsSize }
func (c *container) TmpFsSize() uint64 { return c.tmpFsSize }
func (c *container) Extensions() map[string]string { return c.extensions }
func (c *container) LoggerConfig() drivers.LoggerConfig { return c.logCfg }
func (c *container) UDSPath() string { return c.iofs }
// WriteStat publishes each metric in the specified Stats structure as a histogram metric
func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) {
for key, value := range stat.Metrics {
if m, ok := dockerMeasures[key]; ok {
stats.Record(ctx, m.M(int64(value)))
}
}
c.swapMu.Lock()
if c.stats != nil {
*(c.stats) = append(*(c.stats), stat)
}
c.swapMu.Unlock()
}
//func (c *container) DockerAuth() (docker.AuthConfiguration, error) {
// Implementing the docker.AuthConfiguration interface.
// TODO per call could implement this stored somewhere (vs. configured on host)
//}