death to format (#1281)

* get rid of old format stuff, utils usage, fix up for fdk2.0 interface

* pure agent format removal, TODO remove format field, fix up all tests

* shitter's clogged

* fix agent tests

* start rolling through server tests

* tests compile, some failures

* remove json / content type detection on invoke/httptrigger, fix up tests

* remove hello, fixup system tests

the status checker test just hangs: it's testing that something doesn't work,
so in spirit it passes, but the test itself never finishes, so it doesn't
pass. not worth chasing right now

* fix migration

* meh

* make dbhelper shut up about dbhelpers not being used

* move fail status at least into main thread, jfc

* fix status call to have FN_LISTENER

also turns off the stdout/stderr blocking between calls, because it's
impossible to debug without them (without syslog). now that stdout and stderr
go to the same place (either to host stderr or nowhere) and aren't used for
function output, this shouldn't be a big fuss really
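
For context, the function side of this contract is tiny: the FDK reads
FN_LISTENER (a "unix:"-prefixed path to the socket the agent mounts into the
container) and serves HTTP on it. A minimal sketch, assuming only the standard
library and ignoring details real FDKs handle (socket permissions, readiness,
graceful shutdown):

package main

import (
	"net"
	"net/http"
	"os"
	"strings"
)

func main() {
	// FN_LISTENER is "unix:" + the socket path the agent mounts into the container
	addr := strings.TrimPrefix(os.Getenv("FN_LISTENER"), "unix:")
	l, err := net.Listen("unix", addr)
	if err != nil {
		panic(err)
	}
	// every invocation arrives as a plain HTTP request over this socket
	http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte(`{"hello":"world"}`))
	}))
}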

* remove stdin

* cleanup/remind: fixed bug where watcher would leak if container dies first

* silence system-test logs until fail, fix datastore tests

postgres does weird things with constraints when renaming tables, so I took
the easy way out

system-tests were extremely loud and made you download a circleci text file of
the logs, so I changed them to only yell when they goof

* fix fdk-go dep for test image. fun

* fix swagger and remove test about format

* update all the gopkg files

* add back FN_FORMAT for fdks that assert things. pfft

* add useful error for functions that exit

this error is really confounding because containers can exit for all manner of
reasons; we're just guessing that this is the most likely cause for now, and
this error message should very likely change or be removed from the client
path anyway (context.Canceled wasn't all that useful either, but anyway, I'd
been hunting for this... so found it). added a test to avoid being publicly
shamed for 1-line commits (beware...).
Author: Reed Allman
Date: 2018-10-26 10:43:04 -07:00
Committed by: GitHub
Parent: 7fd61054b0
Commit: e13a6fd029
54 changed files with 875 additions and 2706 deletions

View File

@@ -15,7 +15,6 @@ import (
"path/filepath"
"github.com/fnproject/fn/api/agent/drivers"
"github.com/fnproject/fn/api/agent/protocol"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/id"
"github.com/fnproject/fn/api/models"
@@ -37,8 +36,6 @@ import (
// to be much more robust. now we're at least running it if we delete the msg,
// but we may never store info about that execution so still broked (if fn
// dies). need coordination w/ db.
// TODO if a cold call times out but container is created but hasn't replied, could
// end up that the client doesn't get a reply until long after the timeout (b/c of container removal, async it?)
// TODO if async would store requests (or interchange format) it would be slick, but
// if we're going to store full calls in db maybe we should only queue pointers to ids?
// TODO examine cases where hot can't start a container and the user would never see an error
@@ -53,7 +50,7 @@ import (
// those calls, it also exposes a 'safe' shutdown mechanism via its Close method.
// Agent has a few roles:
// * manage the memory pool for a given server
// * manage the container lifecycle for calls (hot+cold)
// * manage the container lifecycle for calls
// * execute calls against containers
// * invoke Start and End for each call appropriately
// * check the mq for any async calls, and submit them
@@ -61,19 +58,16 @@ import (
// Overview:
// Upon submission of a call, Agent will start the call's timeout timer
// immediately. If the call is hot, Agent will attempt to find an active hot
// container for that route, and if necessary launch another container. Cold
// calls will launch one container each. Cold calls will get container input
// and output directly, whereas hot calls will be able to read/write directly
// from/to a pipe in a container via Dispatch. If it's necessary to launch a
// container, first an attempt will be made to try to reserve the ram required
// while waiting for any hot 'slot' to become available [if applicable]. If
// there is an error launching the container, an error will be returned
// provided the call has not yet timed out or found another hot 'slot' to
// execute in [if applicable]. call.Start will be called immediately before
// starting a container, if cold (i.e. after pulling), or immediately before
// sending any input, if hot. call.End will be called regardless of the
// timeout timer's status if the call was executed, and that error returned may
// be returned from Submit.
// container for that route, and if necessary launch another container. calls
// will be able to read/write directly from/to a socket in the container. If
// it's necessary to launch a container, first an attempt will be made to try
// to reserve the ram required while waiting for any hot 'slot' to become
// available [if applicable]. If there is an error launching the container, an
// error will be returned provided the call has not yet timed out or found
// another hot 'slot' to execute in [if applicable]. call.Start will be called
// immediately before sending any input to a container. call.End will be called
// regardless of the timeout timer's status if the call was executed, and that
// error returned may be returned from Submit.
type Agent interface {
// GetCall will return a Call that is executable by the Agent, which
// can be built via various CallOpt's provided to the method.
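
For the overall shape this comment describes, here is a hedged sketch of
driving the agent from a request handler: build a call via CallOpts (like
FromHTTPFnRequest below), then submit it. agent.WithWriter is an assumed name
for the CallOpt that attaches the response writer; the real server wiring
differs.

import (
	"net/http"

	"github.com/fnproject/fn/api/agent"
	"github.com/fnproject/fn/api/models"
)

// invoke is a sketch only: app/fn lookup and error rendering are elided.
func invoke(a agent.Agent, app *models.App, fn *models.Fn, w http.ResponseWriter, req *http.Request) error {
	call, err := a.GetCall(agent.FromHTTPFnRequest(app, fn, req), agent.WithWriter(w))
	if err != nil {
		return err
	}
	// Submit waits for (or launches) a hot slot, runs the call against the
	// container over its unix socket, and invokes the call's Start/End hooks.
	return a.Submit(call)
}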
@@ -266,23 +260,11 @@ func (a *agent) Submit(callI Call) error {
}
func (a *agent) startStateTrackers(ctx context.Context, call *call) {
if !protocol.IsStreamable(protocol.Protocol(call.Format)) {
// For cold containers, we track the container state in call
call.containerState = NewContainerState()
}
call.requestState = NewRequestState()
}
func (a *agent) endStateTrackers(ctx context.Context, call *call) {
call.requestState.UpdateState(ctx, RequestStateDone, call.slots)
// For cold containers, we are done with the container.
if call.containerState != nil {
call.containerState.UpdateState(ctx, ContainerStateDone, call.slots)
}
}
func (a *agent) submit(ctx context.Context, call *call) error {
@@ -345,9 +327,9 @@ func (a *agent) handleCallEnd(ctx context.Context, call *call, slot Slot, err er
return err
}
// getSlot returns a Slot (or error) for the request to run. Depending on hot/cold
// request type, this may launch a new container or wait for other containers to become idle
// or it may wait for resources to become available to launch a new container.
// getSlot returns a Slot (or error) for the request to run. This will wait
// for other containers to become idle or it may wait for resources to become
// available to launch a new container.
func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
if call.Type == models.TypeAsync {
// *) for async, slot deadline is also call.Timeout. This is because we would like to
@@ -364,25 +346,20 @@ func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
ctx, span := trace.StartSpan(ctx, "agent_get_slot")
defer span.End()
if protocol.IsStreamable(protocol.Protocol(call.Format)) {
// For hot requests, we use a long lived slot queue, which we use to manage hot containers
var isNew bool
// For hot requests, we use a long lived slot queue, which we use to manage hot containers
var isNew bool
if call.slotHashId == "" {
call.slotHashId = getSlotQueueKey(call)
}
call.slots, isNew = a.slotMgr.getSlotQueue(call.slotHashId)
call.requestState.UpdateState(ctx, RequestStateWait, call.slots)
if isNew {
go a.hotLauncher(ctx, call)
}
s, err := a.waitHot(ctx, call)
return s, err
if call.slotHashId == "" {
call.slotHashId = getSlotQueueKey(call)
}
call.slots, isNew = a.slotMgr.getSlotQueue(call.slotHashId)
call.requestState.UpdateState(ctx, RequestStateWait, call.slots)
return a.launchCold(ctx, call)
if isNew {
go a.hotLauncher(ctx, call)
}
s, err := a.waitHot(ctx, call)
return s, err
}
// hotLauncher is spawned in a go routine for each slot queue to monitor stats and launch hot
@@ -575,86 +552,6 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
}
}
// launchCold waits for necessary resources to launch a new container, then
// returns the slot for that new container to run the request on.
func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
isNB := a.cfg.EnableNBResourceTracker
ch := make(chan Slot)
ctx, span := trace.StartSpan(ctx, "agent_launch_cold")
defer span.End()
call.containerState.UpdateState(ctx, ContainerStateWait, call.slots)
mem := call.Memory + uint64(call.TmpFsSize)
select {
case tok := <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isNB):
if tok.Error() != nil {
return nil, tok.Error()
}
go a.prepCold(ctx, call, tok, ch)
case <-ctx.Done():
return nil, ctx.Err()
}
// wait for launch err or a slot to open up
select {
case s := <-ch:
if s.Error() != nil {
s.Close()
return nil, s.Error()
}
return s, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
// implements Slot
type coldSlot struct {
cookie drivers.Cookie
tok ResourceToken
closer func()
fatalErr error
}
func (s *coldSlot) Error() error {
return s.fatalErr
}
func (s *coldSlot) exec(ctx context.Context, call *call) error {
ctx, span := trace.StartSpan(ctx, "agent_cold_exec")
defer span.End()
call.requestState.UpdateState(ctx, RequestStateExec, call.slots)
call.containerState.UpdateState(ctx, ContainerStateBusy, call.slots)
waiter, err := s.cookie.Run(ctx)
if err != nil {
return err
}
res := waiter.Wait(ctx)
if res.Error() != nil {
// check for call error (oom/exit) and beam it up
return res.Error()
}
// nil or timed out
return ctx.Err()
}
func (s *coldSlot) Close() error {
if s.closer != nil {
s.closer()
s.closer = nil
}
return nil
}
// implements Slot
type hotSlot struct {
done chan struct{} // signal we are done with slot
@@ -699,12 +596,7 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
call.req = call.req.WithContext(ctx) // TODO this is funny biz reed is bad
var errApp chan error
if call.Format == models.FormatHTTPStream {
errApp = s.dispatch(ctx, call)
} else { // TODO remove this block one glorious day
errApp = s.dispatchOldFormats(ctx, call)
}
errApp := s.dispatch(ctx, call)
select {
case err := <-s.errC: // error from container
@@ -714,10 +606,6 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
if err != nil {
if models.IsAPIError(err) {
s.trySetError(err)
} else if err == protocol.ErrExcessData {
s.trySetError(err)
// suppress excess data error, but do shutdown the container
return nil
}
}
return err
@@ -756,15 +644,11 @@ func callToHTTPRequest(ctx context.Context, call *call) *http.Request {
}
}
//req.Header.Set("FN_DEADLINE", ci.Deadline().String())
// TODO(occ) : fix compatidupes when FDKs are updated
req.Header.Set("Fn-Call-Id", call.ID)
req.Header.Set("FN_CALL_ID", call.ID)
deadline, ok := ctx.Deadline()
if ok {
deadlineStr := deadline.Format(time.RFC3339)
req.Header.Set("Fn-Deadline", deadlineStr)
req.Header.Set("FN_DEADLINE", deadlineStr)
}
return req
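
On the function side both header spellings show up on the request for now; a
minimal sketch of a handler reading them (the duplicate FN_* forms exist only
until FDKs stop asserting on them):

import (
	"fmt"
	"net/http"
	"time"
)

// handler sketches the function side of the headers set above.
func handler(w http.ResponseWriter, r *http.Request) {
	callID := r.Header.Get("Fn-Call-Id") // also sent as FN_CALL_ID for older FDKs
	if d := r.Header.Get("Fn-Deadline"); d != "" { // also sent as FN_DEADLINE, RFC3339
		if t, err := time.Parse(time.RFC3339, d); err == nil {
			_ = t // budget any remaining work against t
		}
	}
	fmt.Fprintf(w, "handled call %s", callID)
}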
@@ -781,7 +665,7 @@ func (s *hotSlot) dispatch(ctx context.Context, call *call) chan error {
// TODO it's possible we can get rid of this (after getting rid of logs API) - may need for call id/debug mode still
// TODO there's a timeout race for swapping this back if the container doesn't get killed for timing out, and don't you forget it
swapBack := s.container.swap(nil, call.stderr, call.stderr, &call.Stats)
swapBack := s.container.swap(call.stderr, call.stderr, &call.Stats)
defer swapBack()
req := callToHTTPRequest(ctx, call)
@@ -811,11 +695,10 @@ func (s *hotSlot) dispatch(ctx context.Context, call *call) chan error {
return errApp
}
// XXX(reed): dupe code in http proto (which will die...)
func writeResp(max uint64, resp *http.Response, w io.Writer) error {
rw, ok := w.(http.ResponseWriter)
if !ok {
w = common.NewClampWriter(rw, max, models.ErrFunctionResponseTooBig)
w = common.NewClampWriter(w, max, models.ErrFunctionResponseTooBig)
return resp.Write(w)
}
@@ -855,110 +738,6 @@ func newSizerRespWriter(max uint64, rw http.ResponseWriter) http.ResponseWriter
func (s *sizerRespWriter) Write(b []byte) (int, error) { return s.w.Write(b) }
// TODO remove
func (s *hotSlot) dispatchOldFormats(ctx context.Context, call *call) chan error {
errApp := make(chan error, 1)
go func() {
// XXX(reed): this may be liable to leave the pipes fucked up if dispatch times out, eg
// we may need ye ole close() func to put the Close()/swapBack() in from the caller
// swap in fresh pipes & stat accumulator to not interlace with other calls that used this slot [and timed out]
stdinRead, stdinWrite := io.Pipe()
stdoutRead, stdoutWritePipe := io.Pipe()
defer stdinRead.Close()
defer stdoutWritePipe.Close()
// NOTE: stderr is limited separately (though line writer is vulnerable to attack?)
// limit the bytes allowed to be written to the stdout pipe, which handles any
// buffering overflows (json to a string, http to a buffer, etc)
stdoutWrite := common.NewClampWriter(stdoutWritePipe, s.cfg.MaxResponseSize, models.ErrFunctionResponseTooBig)
swapBack := s.container.swap(stdinRead, stdoutWrite, call.stderr, &call.Stats)
defer swapBack() // NOTE: it's important this runs before the pipes are closed.
// TODO this should get killed completely
// TODO we could alternatively dial in and use the conn as stdin/stdout for an interim solution
// XXX(reed): ^^^ do we need that for the cloud event dance ????
proto := protocol.New(protocol.Protocol(call.Format), stdinWrite, stdoutRead)
ci := protocol.NewCallInfo(call.IsCloudEvent, call.Call, call.req)
errApp <- proto.Dispatch(ctx, ci, call.w)
}()
return errApp
}
func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch chan Slot) {
ctx, span := trace.StartSpan(ctx, "agent_prep_cold")
defer span.End()
statsUtilization(ctx, a.resources.GetUtilization())
call.containerState.UpdateState(ctx, ContainerStateStart, call.slots)
deadline := time.Now().Add(time.Duration(call.Timeout) * time.Second)
// add Fn-specific information to the config to shove everything into env vars for cold
call.Config["FN_DEADLINE"] = common.DateTime(deadline).String()
call.Config["FN_METHOD"] = call.Model().Method
call.Config["FN_REQUEST_URL"] = call.Model().URL
call.Config["FN_CALL_ID"] = call.Model().ID
// User headers are prefixed with FN_HEADER and shoved in the env vars too
for k, v := range call.Headers {
k = "FN_HEADER_" + k
call.Config[k] = strings.Join(v, ", ")
}
container := &container{
id: id.New().String(), // XXX we could just let docker generate ids...
image: call.Image,
env: map[string]string(call.Config),
extensions: call.extensions,
memory: call.Memory,
cpus: uint64(call.CPUs),
fsSize: a.cfg.MaxFsSize,
iofs: &noopIOFS{},
timeout: time.Duration(call.Timeout) * time.Second, // this is unnecessary, but in case removal fails...
logCfg: drivers.LoggerConfig{
URL: strings.TrimSpace(call.SyslogURL),
Tags: []drivers.LoggerTag{
{Name: "app_id", Value: call.AppID},
{Name: "fn_id", Value: call.FnID},
},
},
stdin: call.req.Body,
stdout: common.NewClampWriter(call.w, a.cfg.MaxResponseSize, models.ErrFunctionResponseTooBig),
stderr: call.stderr,
stats: &call.Stats,
}
cookie, err := a.driver.CreateCookie(ctx, container)
if err == nil {
// pull & create container before we return a slot, so as to be friendly
// about timing out if this takes a while...
err = a.driver.PrepareCookie(ctx, cookie)
}
call.containerState.UpdateState(ctx, ContainerStateIdle, call.slots)
closer := func() {
if cookie != nil {
cookie.Close(ctx)
}
if tok != nil {
tok.Close()
}
statsUtilization(ctx, a.resources.GetUtilization())
}
slot := &coldSlot{cookie: cookie, tok: tok, closer: closer, fatalErr: err}
select {
case ch <- slot:
case <-ctx.Done():
slot.Close()
}
}
func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state ContainerState) {
// IMPORTANT: get a context that has a child span / logger but NO timeout
// TODO this is a 'FollowsFrom'
@@ -989,30 +768,9 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
}
defer container.Close()
// NOTE: soon this isn't assigned in a branch...
var udsClient http.Client
udsAwait := make(chan error)
if call.Format == models.FormatHTTPStream {
// start our listener before starting the container, so we don't miss the pretty things whispered in our ears
go inotifyUDS(ctx, container.UDSAgentPath(), udsAwait)
udsClient = http.Client{
Transport: &http.Transport{
MaxIdleConns: 1,
MaxIdleConnsPerHost: 1,
// XXX(reed): other settings ?
IdleConnTimeout: 1 * time.Second,
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
var d net.Dialer
return d.DialContext(ctx, "unix", filepath.Join(container.UDSAgentPath(), udsFilename))
},
},
}
} else {
close(udsAwait) // XXX(reed): short case first / kill this
}
logger := logrus.WithFields(logrus.Fields{"id": container.id, "app_id": call.AppID, "fn_id": call.FnID, "image": call.Image, "memory": call.Memory, "cpus": call.CPUs, "format": call.Format, "idle_timeout": call.IdleTimeout})
// XXX(reed): we need to timeout the cookie create / prepare since docker client doesn't have timeout anymore,
// and handle cookie close having a timed out context it still needs to delete the thing. fun stuff
logger := logrus.WithFields(logrus.Fields{"id": container.id, "app_id": call.AppID, "fn_id": call.FnID, "image": call.Image, "memory": call.Memory, "cpus": call.CPUs, "idle_timeout": call.IdleTimeout})
ctx = common.WithLogger(ctx, logger)
cookie, err := a.driver.CreateCookie(ctx, container)
@@ -1029,6 +787,27 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
return
}
ctx, shutdownContainer := context.WithCancel(ctx)
defer shutdownContainer() // close this if our waiter returns, to call off slots, needs to follow cookie.Close so the cookie crumbles
udsAwait := make(chan error)
// start our listener before starting the container, so we don't miss the pretty things whispered in our ears
// make sure this thread has the shutdownContainer context in case the container exits
go inotifyUDS(ctx, container.UDSAgentPath(), udsAwait)
udsClient := http.Client{
Transport: &http.Transport{
MaxIdleConns: 1,
MaxIdleConnsPerHost: 1,
// XXX(reed): other settings ?
IdleConnTimeout: 1 * time.Second,
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
var d net.Dialer
return d.DialContext(ctx, "unix", filepath.Join(container.UDSAgentPath(), udsFilename))
},
},
}
waiter, err := cookie.Run(ctx)
if err != nil {
call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err})
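
Because the DialContext above ignores the address it is handed and always
dials the container's unix socket, any placeholder host works in requests sent
through udsClient. The real dispatch path builds its request via
callToHTTPRequest; this is just a hedged sketch of the shape:

import (
	"io"
	"net/http"
)

// callOverUDS is a sketch only: the URL's host/path are placeholders because
// the transport above always dials the container's unix socket.
func callOverUDS(client *http.Client, body io.Reader) (*http.Response, error) {
	req, err := http.NewRequest("POST", "http://localhost/call", body)
	if err != nil {
		return nil, err
	}
	return client.Do(req)
}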
@@ -1038,8 +817,6 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
// buffered, in case someone has slot when waiter returns but isn't yet listening
errC := make(chan error, 1)
ctx, shutdownContainer := context.WithCancel(ctx)
defer shutdownContainer() // close this if our waiter returns, to call off slots
go func() {
defer shutdownContainer() // also close if we get an agent shutdown / idle timeout
@@ -1054,7 +831,10 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
}
case <-ctx.Done():
call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: ctx.Err()})
// XXX(reed): this seems like a bad idea? why are we even handing out a
// bad slot? shouldn't we make the client wait for a valid one and maybe
// timeout? not in this PR, NOT TONIGHT!
call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: models.ErrContainerExitedEarly})
return
}
@@ -1090,8 +870,16 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
}()
res := waiter.Wait(ctx)
if res.Error() != nil {
errC <- res.Error() // TODO: race condition, no guaranteed delivery fix this...
if err := res.Error(); err != nil {
if err == context.Canceled {
// TODO consider removal after format biz mellows out (t-minus 30 days)
// we can turn this into a user visible error (for now) if the container
// exits because they are using the wrong format, say, and try to guess/
// help them out for now. context.Canceled only comes from container exit.
errC <- models.ErrContainerExitedEarly
} else {
errC <- err // TODO: race condition, no guaranteed delivery fix this... we can't really, if nobody is there it's working as intended for idle timeout?
}
}
if res.Error() != context.Canceled {
logger.WithError(res.Error()).Info("hot function terminated")
@@ -1167,7 +955,6 @@ func inotifyAwait(ctx context.Context, iofsDir string) error {
case event := <-fsWatcher.Events:
common.Logger(ctx).WithField("event", event).Debug("fsnotify event")
if event.Op&fsnotify.Create == fsnotify.Create && event.Name == filepath.Join(iofsDir, udsFilename) {
// wait until the socket file is created by the container
return nil
}
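
The hunk above only shows the event check inside inotifyAwait; a
self-contained sketch of the same wait-for-socket pattern, with the directory
and socket name as parameters and error handling abbreviated:

import (
	"context"
	"path/filepath"

	"github.com/fsnotify/fsnotify"
)

// awaitSocket blocks until the container creates its unix socket inside
// iofsDir, or the context ends.
func awaitSocket(ctx context.Context, iofsDir, sockName string) error {
	w, err := fsnotify.NewWatcher()
	if err != nil {
		return err
	}
	defer w.Close()
	if err := w.Add(iofsDir); err != nil {
		return err
	}
	want := filepath.Join(iofsDir, sockName)
	for {
		select {
		case ev := <-w.Events:
			if ev.Op&fsnotify.Create == fsnotify.Create && ev.Name == want {
				return nil
			}
		case err := <-w.Errors:
			return err
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}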
@@ -1266,11 +1053,9 @@ type container struct {
fsSize uint64
tmpFsSize uint64
iofs iofs
timeout time.Duration // cold only (superfluous, but in case)
logCfg drivers.LoggerConfig
close func()
stdin io.Reader
stdout io.Writer
stderr io.Writer
@@ -1281,65 +1066,52 @@ type container struct {
//newHotContainer creates a container that can be used for multiple sequential events
func newHotContainer(ctx context.Context, call *call, cfg *Config) (*container, error) {
// if freezer is enabled, be consistent with freezer behavior and
// block stdout and stderr between calls.
isBlockIdleIO := MaxMsDisabled != cfg.FreezeIdle
id := id.New().String()
stdin := common.NewGhostReader()
stderr := common.NewGhostWriter()
stdout := common.NewGhostWriter()
// for use if no freezer (or we ever make up our minds)
var bufs []*bytes.Buffer
// when not processing a request, do we block IO?
if !isBlockIdleIO {
// IMPORTANT: we are not operating on a TTY allocated container. This means, stderr and stdout are multiplexed
// from the same stream internally via docker using a multiplexing protocol. Therefore, stderr/stdout *BOTH*
// have to be read or *BOTH* blocked consistently. In other words, we cannot block one and continue
// reading from the other one without risking head-of-line blocking.
// IMPORTANT: we are not operating on a TTY allocated container. This means, stderr and stdout are multiplexed
// from the same stream internally via docker using a multiplexing protocol. Therefore, stderr/stdout *BOTH*
// have to be read or *BOTH* blocked consistently. In other words, we cannot block one and continue
// reading from the other one without risking head-of-line blocking.
// wrap the syslog and debug loggers in the same (respective) line writer
// syslog complete chain for this (from top):
// stderr -> line writer
// wrap the syslog and debug loggers in the same (respective) line writer
// stderr -> line writer
// TODO(reed): I guess this is worth it
// TODO(reed): there's a bug here where the between writers could have
// bytes in there, get swapped for real stdout/stderr, come back and write
// bytes in and the bytes are [really] stale. I played with fixing this
// and mostly came to the conclusion that life is meaningless.
buf1 := bufPool.Get().(*bytes.Buffer)
buf2 := bufPool.Get().(*bytes.Buffer)
bufs = []*bytes.Buffer{buf1, buf2}
// TODO(reed): there's a bug here where the between writers could have
// bytes in there, get swapped for real stdout/stderr, come back and write
// bytes in and the bytes are [really] stale. I played with fixing this
// and mostly came to the conclusion that life is meaningless.
// TODO(reed): we should let the syslog driver pick this up really but our
// default story sucks there
buf1 := bufPool.Get().(*bytes.Buffer)
buf2 := bufPool.Get().(*bytes.Buffer)
bufs = []*bytes.Buffer{buf1, buf2}
soc := &nopCloser{&logWriter{
logrus.WithFields(logrus.Fields{"tag": "stdout", "app_id": call.AppID, "fn_id": call.FnID, "image": call.Image, "container_id": id}),
}}
sec := &nopCloser{&logWriter{
logrus.WithFields(logrus.Fields{"tag": "stderr", "app_id": call.AppID, "fn_id": call.FnID, "image": call.Image, "container_id": id}),
}}
soc := &nopCloser{&logWriter{
logrus.WithFields(logrus.Fields{"tag": "stdout", "app_id": call.AppID, "fn_id": call.FnID, "image": call.Image, "container_id": id}),
}}
sec := &nopCloser{&logWriter{
logrus.WithFields(logrus.Fields{"tag": "stderr", "app_id": call.AppID, "fn_id": call.FnID, "image": call.Image, "container_id": id}),
}}
stdout.Swap(newLineWriterWithBuffer(buf1, soc))
stderr.Swap(newLineWriterWithBuffer(buf2, sec))
}
stdout.Swap(newLineWriterWithBuffer(buf1, soc))
stderr.Swap(newLineWriterWithBuffer(buf2, sec))
var iofs iofs
var err error
if call.Format == models.FormatHTTPStream {
// XXX(reed): we should also point stdout to stderr, and not have stdin
if cfg.IOFSEnableTmpfs {
iofs, err = newTmpfsIOFS(ctx, cfg)
} else {
iofs, err = newDirectoryIOFS(ctx, cfg)
}
if err != nil {
return nil, err
}
if cfg.IOFSEnableTmpfs {
iofs, err = newTmpfsIOFS(ctx, cfg)
} else {
iofs = &noopIOFS{}
iofs, err = newDirectoryIOFS(ctx, cfg)
}
if err != nil {
return nil, err
}
return &container{
@@ -1359,11 +1131,9 @@ func newHotContainer(ctx context.Context, call *call, cfg *Config) (*container,
{Name: "fn_id", Value: call.FnID},
},
},
stdin: stdin,
stdout: stdout,
stderr: stderr,
close: func() {
stdin.Close()
stderr.Close()
stdout.Close()
for _, b := range bufs {
@@ -1378,9 +1148,8 @@ func newHotContainer(ctx context.Context, call *call, cfg *Config) (*container,
}, nil
}
func (c *container) swap(stdin io.Reader, stdout, stderr io.Writer, cs *drivers.Stats) func() {
func (c *container) swap(stdout, stderr io.Writer, cs *drivers.Stats) func() {
// if tests don't catch this, then fuck me
ostdin := c.stdin.(common.GhostReader).Swap(stdin)
ostdout := c.stdout.(common.GhostWriter).Swap(stdout)
ostderr := c.stderr.(common.GhostWriter).Swap(stderr)
@@ -1390,7 +1159,6 @@ func (c *container) swap(stdin io.Reader, stdout, stderr io.Writer, cs *drivers.
c.swapMu.Unlock()
return func() {
c.stdin.(common.GhostReader).Swap(ostdin)
c.stdout.(common.GhostWriter).Swap(ostdout)
c.stderr.(common.GhostWriter).Swap(ostderr)
c.swapMu.Lock()
@@ -1401,13 +1169,13 @@ func (c *container) swap(stdin io.Reader, stdout, stderr io.Writer, cs *drivers.
func (c *container) Id() string { return c.id }
func (c *container) Command() string { return "" }
func (c *container) Input() io.Reader { return c.stdin }
func (c *container) Input() io.Reader { return nil }
func (c *container) Logger() (io.Writer, io.Writer) { return c.stdout, c.stderr }
func (c *container) Volumes() [][2]string { return nil }
func (c *container) WorkDir() string { return "" }
func (c *container) Close() { c.close() }
func (c *container) Image() string { return c.image }
func (c *container) Timeout() time.Duration { return c.timeout }
func (c *container) Timeout() time.Duration { return 0 } // context handles this
func (c *container) EnvVars() map[string]string { return c.env }
func (c *container) Memory() uint64 { return c.memory * 1024 * 1024 } // convert MB
func (c *container) CPUs() uint64 { return c.cpus }

View File

@@ -78,7 +78,6 @@ func TestCallConfigurationRequest(t *testing.T) {
const idleTimeout = 20
const memory = 256
typ := "sync"
format := "default"
cfg := models.Config{"APP_VAR": "FOO"}
rCfg := models.Config{"FN_VAR": "BAR"}
@@ -89,7 +88,6 @@ func TestCallConfigurationRequest(t *testing.T) {
AppID: app.ID,
Config: rCfg,
Image: image,
Format: format,
ResourceConfig: models.ResourceConfig{Timeout: timeout,
IdleTimeout: idleTimeout,
Memory: memory,
@@ -163,7 +161,6 @@ func TestCallConfigurationRequest(t *testing.T) {
}
expectedConfig := map[string]string{
"FN_FORMAT": format,
"FN_MEMORY": strconv.Itoa(memory),
"FN_TYPE": typ,
"APP_VAR": "FOO",
@@ -203,9 +200,7 @@ func TestCallConfigurationModel(t *testing.T) {
url := "http://127.0.0.1:8080/invoke/" + fn.ID
payload := "payload"
typ := "sync"
format := "default"
cfg := models.Config{
"FN_FORMAT": format,
"FN_MEMORY": strconv.Itoa(memory),
"FN_TYPE": typ,
"APP_VAR": "FOO",
@@ -217,7 +212,6 @@ func TestCallConfigurationModel(t *testing.T) {
Config: cfg,
Image: image,
Type: typ,
Format: format,
Timeout: timeout,
IdleTimeout: idleTimeout,
Memory: memory,
@@ -374,10 +368,10 @@ func TestSubmitError(t *testing.T) {
url := "http://127.0.0.1:8080/invoke/" + fn.ID
payload := `{"sleepTime": 0, "isDebug": true, "isCrash": true}`
typ := "sync"
format := "default"
config := map[string]string{
"FN_FORMAT": format,
"FN_APP_NAME": app.Name,
"FN_LISTENER": "unix:" + filepath.Join(iofsDockerMountDest, udsFilename),
"FN_APP_ID": app.ID,
"FN_FN_ID": fn.ID,
"FN_MEMORY": strconv.Itoa(memory),
"FN_TYPE": typ,
"APP_VAR": "FOO",
@@ -391,7 +385,6 @@ func TestSubmitError(t *testing.T) {
Config: config,
Image: image,
Type: typ,
Format: format,
Timeout: timeout,
IdleTimeout: idleTimeout,
Memory: memory,
@@ -448,9 +441,8 @@ func TestHTTPWithoutContentLengthWorks(t *testing.T) {
// response writer with sync, and also test that this works with async + log
app := &models.App{ID: "app_id"}
fn := &models.Fn{
ID: "fn_id",
Image: "fnproject/fn-test-utils",
Format: "http", // this _is_ the test
ID: "fn_id",
Image: "fnproject/fn-test-utils",
ResourceConfig: models.ResourceConfig{
Timeout: 5,
IdleTimeout: 10,
@@ -515,7 +507,6 @@ func TestGetCallReturnsResourceImpossibility(t *testing.T) {
FnID: id.New().String(),
Image: "fnproject/fn-test-utils",
Type: "sync",
Format: "http",
Timeout: 1,
IdleTimeout: 2,
Memory: math.MaxUint64,
@@ -539,10 +530,9 @@ func TestTmpFsRW(t *testing.T) {
app := &models.App{ID: "app_id"}
fn := &models.Fn{
ID: "fn_id",
AppID: app.ID,
Image: "fnproject/fn-test-utils",
Format: "http", // this _is_ the test
ID: "fn_id",
AppID: app.ID,
Image: "fnproject/fn-test-utils",
ResourceConfig: models.ResourceConfig{Timeout: 5,
IdleTimeout: 10,
Memory: 128,
@@ -629,10 +619,9 @@ func TestTmpFsSize(t *testing.T) {
app := &models.App{ID: "app_id", Name: appName}
fn := &models.Fn{
ID: "fn_id",
AppID: app.ID,
Image: "fnproject/fn-test-utils",
Format: "http", // this _is_ the test
ID: "fn_id",
AppID: app.ID,
Image: "fnproject/fn-test-utils",
ResourceConfig: models.ResourceConfig{Timeout: 5,
IdleTimeout: 10,
Memory: 64,
@@ -734,11 +723,9 @@ func testCall() *models.Call {
url := "http://127.0.0.1:8080/invoke/" + fn.ID
payload := "payload"
typ := "sync"
format := "http"
contentType := "suberb_type"
contentLength := strconv.FormatInt(int64(len(payload)), 10)
config := map[string]string{
"FN_FORMAT": format,
"FN_MEMORY": strconv.Itoa(memory),
"FN_TYPE": typ,
"APP_VAR": "FOO",
@@ -757,7 +744,6 @@ func testCall() *models.Call {
Headers: headers,
Image: image,
Type: typ,
Format: format,
Timeout: timeout,
IdleTimeout: idleTimeout,
Memory: memory,
@@ -790,16 +776,14 @@ func TestPipesAreClear(t *testing.T) {
ca := testCall()
ca.Type = "sync"
ca.Format = "http"
ca.IdleTimeout = 60 // keep this bad boy alive
ca.Timeout = 4 // short
app := &models.App{ID: ca.AppID}
fn := &models.Fn{
AppID: ca.AppID,
ID: ca.FnID,
Image: ca.Image,
Format: ca.Format,
AppID: ca.AppID,
ID: ca.FnID,
Image: ca.Image,
ResourceConfig: models.ResourceConfig{
Timeout: ca.Timeout,
IdleTimeout: ca.IdleTimeout,
@@ -925,17 +909,15 @@ func (r *delayReader) Read(b []byte) (int, error) {
return r.Reader.Read(b)
}
func TestPipesDontMakeSpuriousCalls(t *testing.T) {
// if we swap out the pipes between tasks really fast, we need to ensure that
// there are no spurious reads on the container's input that give us a bad
// task output (i.e. 2nd task should succeed). if this test is fussing up,
// make sure input swapping out is not racing, it is very likely not the test
// that is finicky since this is a totally normal happy path (run 2 hot tasks
// in the same container in a row).
func TestCallsDontInterlace(t *testing.T) {
// this runs a task that times out and then writes bytes after the timeout,
// and then runs another task before those bytes are written. the 2nd task
// should be successful using the same container, and the 1st task should
// time out and its bytes shouldn't interfere with the 2nd task (this should
// be a totally normal happy path).
call := testCall()
call.Type = "sync"
call.Format = "http"
call.IdleTimeout = 60 // keep this bad boy alive
call.Timeout = 4 // short
app := &models.App{Name: "myapp"}
@@ -943,10 +925,9 @@ func TestPipesDontMakeSpuriousCalls(t *testing.T) {
app.ID = call.AppID
fn := &models.Fn{
ID: "fn_id",
AppID: call.AppID,
Image: call.Image,
Format: call.Format,
ID: "fn_id",
AppID: call.AppID,
Image: call.Image,
ResourceConfig: models.ResourceConfig{
Timeout: call.Timeout,
IdleTimeout: call.IdleTimeout,
@@ -1028,7 +1009,6 @@ func TestNBIOResourceTracker(t *testing.T) {
call := testCall()
call.Type = "sync"
call.Format = "http"
call.IdleTimeout = 60
call.Timeout = 30
call.Memory = 50
@@ -1037,10 +1017,9 @@ func TestNBIOResourceTracker(t *testing.T) {
app.ID = call.AppID
fn := &models.Fn{
ID: call.FnID,
AppID: call.AppID,
Image: call.Image,
Format: call.Format,
ID: call.FnID,
AppID: call.AppID,
Image: call.Image,
ResourceConfig: models.ResourceConfig{
Timeout: call.Timeout,
IdleTimeout: call.IdleTimeout,
@@ -1116,7 +1095,6 @@ func TestDockerAuthExtn(t *testing.T) {
FnID: id.New().String(),
Image: "fnproject/fn-test-utils",
Type: "sync",
Format: "http",
Timeout: 1,
IdleTimeout: 2,
}

View File

@@ -112,11 +112,6 @@ func (a *agent) asyncRun(ctx context.Context, model *models.Call) {
return
}
// TODO if the task is cold and doesn't require reading STDIN, it could
// run but we may not listen for output since the task timed out. these
// are at least once semantics, which is really preferable to at most
// once, so let's do it for now
err = a.Submit(call)
if err != nil {
// NOTE: these could be errors / timeouts from the call that we're

View File

@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"mime"
"net/http"
"path/filepath"
"strings"
@@ -46,46 +45,11 @@ type CallOverrider func(*models.Call, map[string]string) (map[string]string, err
// TODO build w/o closures... lazy
type CallOpt func(c *call) error
const (
ceMimeType = "application/cloudevents+json"
// static path for all fn invocations
invokePath = "/invoke"
)
// Sets up a call from an http trigger request
func FromHTTPFnRequest(app *models.App, fn *models.Fn, req *http.Request) CallOpt {
return func(c *call) error {
ctx := req.Context()
log := common.Logger(ctx)
// Check whether this is a CloudEvent, if coming in via HTTP router (only way currently), then we'll look for a special header
// Content-Type header: https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode
// Expected Content-Type for a CloudEvent: application/cloudevents+json; charset=UTF-8
contentType := req.Header.Get("Content-Type")
t, _, err := mime.ParseMediaType(contentType)
if err != nil && contentType != "" {
// won't fail here, but log
log.Debugf("Could not parse Content-Type header: %v %v", contentType, err)
} else {
if t == ceMimeType {
c.IsCloudEvent = true
fn.Format = models.FormatCloudEvent
}
}
if fn.Format == "" {
fn.Format = models.FormatDefault
}
id := id.New().String()
// TODO this relies on ordering of opts, but tests make sure it works, probably re-plumb/destroy headers
// TODO async should probably supply an http.ResponseWriter that records the logs, to attach response headers to
if rw, ok := c.w.(http.ResponseWriter); ok {
rw.Header().Add("FN_CALL_ID", id)
rw.Header().Add("Fn-Call-Id", id)
}
var syslogURL string
if app.SyslogURL != nil {
syslogURL = *app.SyslogURL
@@ -95,8 +59,7 @@ func FromHTTPFnRequest(app *models.App, fn *models.Fn, req *http.Request) CallOp
ID: id,
Image: fn.Image,
// Delay: 0,
Type: "sync",
Format: fn.Format,
Type: "sync",
// Payload: TODO,
Priority: new(int32), // TODO this is crucial, apparently
Timeout: fn.Timeout,
@@ -104,7 +67,7 @@ func FromHTTPFnRequest(app *models.App, fn *models.Fn, req *http.Request) CallOp
TmpFsSize: 0, // TODO clean up this
Memory: fn.Memory,
CPUs: 0, // TODO clean up this
Config: buildConfig(app, fn, invokePath),
Config: buildConfig(app, fn),
// TODO - this wasn't really the intention here (that annotations would naturally cascade
// but seems to be necessary for some runner behaviour
Annotations: app.Annotations.MergeChange(fn.Annotations),
@@ -123,7 +86,7 @@ func FromHTTPFnRequest(app *models.App, fn *models.Fn, req *http.Request) CallOp
}
}
func buildConfig(app *models.App, fn *models.Fn, path string) models.Config {
func buildConfig(app *models.App, fn *models.Fn) models.Config {
conf := make(models.Config, 8+len(app.Config)+len(fn.Config))
for k, v := range app.Config {
conf[k] = v
@@ -132,16 +95,12 @@ func buildConfig(app *models.App, fn *models.Fn, path string) models.Config {
conf[k] = v
}
conf["FN_FORMAT"] = fn.Format
if fn.Format == models.FormatHTTPStream { // TODO should be always soon...
conf["FN_LISTENER"] = "unix:" + filepath.Join(iofsDockerMountDest, udsFilename)
}
conf["FN_APP_NAME"] = app.Name
conf["FN_PATH"] = path
// TODO: might be a good idea to pass in: "FN_BASE_PATH" = fmt.Sprintf("/r/%s", appName) || "/" if using DNS entries per app
// XXX(reed): add trigger id to request headers on call?
conf["FN_MEMORY"] = fmt.Sprintf("%d", fn.Memory)
conf["FN_TYPE"] = "sync"
conf["FN_FN_ID"] = fn.ID
conf["FN_APP_ID"] = app.ID
return conf
}
@@ -263,6 +222,13 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
return nil, models.ErrCallResourceTooBig
}
if c.Call.Config == nil {
c.Call.Config = make(models.Config)
}
c.Call.Config["FN_LISTENER"] = "unix:" + filepath.Join(iofsDockerMountDest, udsFilename)
c.Call.Config["FN_FORMAT"] = "http-stream" // TODO: remove this after fdk's forget what it means
// TODO we could set type here too, for now, or anything else not based in fn/app/trigger config
setupCtx(&c)
c.handler = a.da
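
Putting buildConfig together with the GetCall additions above, the config/env
a function container ends up with looks roughly like this (values are
illustrative, borrowed from the tests in this diff; app/fn user config merges
in first):

models.Config{
	"APP_VAR":     "FOO", // from app.Config
	"FN_VAR":      "BAR", // from fn.Config
	"FN_APP_ID":   "app_id",
	"FN_APP_NAME": "myapp",
	"FN_FN_ID":    "fn_id",
	"FN_TYPE":     "sync",
	"FN_MEMORY":   "128",
	"FN_LISTENER": "unix:" + filepath.Join(iofsDockerMountDest, udsFilename),
	"FN_FORMAT":   "http-stream", // kept only until FDKs forget what it means
}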
@@ -287,18 +253,14 @@ func setupCtx(c *call) {
type call struct {
*models.Call
// IsCloudEvent flag whether this was ingested as a cloud event. This may become the default or only way.
IsCloudEvent bool `json:"is_cloud_event"`
handler CallHandler
w io.Writer
req *http.Request
stderr io.ReadWriteCloser
ct callTrigger
slots *slotQueue
requestState RequestState
containerState ContainerState
slotHashId string
handler CallHandler
w io.Writer
req *http.Request
stderr io.ReadWriteCloser
ct callTrigger
slots *slotQueue
requestState RequestState
slotHashId string
// LB & Pure Runner Extra Config
extensions map[string]string

View File

@@ -7,7 +7,7 @@
// create calls from various parameters and then execute those calls. An Agent
// has a few roles:
// * manage the memory pool for a given server
// * manage the container lifecycle for calls (hot+cold)
// * manage the container lifecycle for calls
// * execute calls against containers
// * invoke Start and End for each call appropriately
// * check the mq for any async calls, and submit them

View File

@@ -216,15 +216,18 @@ func (drv *DockerDriver) CreateCookie(ctx context.Context, task drivers.Containe
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "CreateCookie"})
stdinOn := task.Input() != nil
// XXX(reed): we can do same with stderr/stdout
opts := docker.CreateContainerOptions{
Name: task.Id(),
Config: &docker.Config{
Image: task.Image(),
OpenStdin: true,
OpenStdin: stdinOn,
StdinOnce: stdinOn,
AttachStdin: stdinOn,
AttachStdout: true,
AttachStdin: true,
AttachStderr: true,
StdinOnce: true,
},
HostConfig: &docker.HostConfig{
ReadonlyRootfs: drv.conf.EnableReadOnlyRootFs,

View File

@@ -18,13 +18,11 @@ type taskDockerTest struct {
errors io.Writer
}
func (f *taskDockerTest) Command() string { return "" }
func (f *taskDockerTest) EnvVars() map[string]string {
return map[string]string{"FN_FORMAT": "default"}
}
func (f *taskDockerTest) Command() string { return "" }
func (f *taskDockerTest) EnvVars() map[string]string { return map[string]string{} }
func (f *taskDockerTest) Id() string { return f.id }
func (f *taskDockerTest) Group() string { return "" }
func (f *taskDockerTest) Image() string { return "fnproject/fn-test-utils" }
func (f *taskDockerTest) Image() string { return "hello-world" }
func (f *taskDockerTest) Timeout() time.Duration { return 30 * time.Second }
func (f *taskDockerTest) Logger() (stdout, stderr io.Writer) { return f.output, f.errors }
func (f *taskDockerTest) WriteStat(context.Context, drivers.Stat) { /* TODO */ }
@@ -153,16 +151,14 @@ func TestRunnerDockerVersion(t *testing.T) {
}
}
func TestRunnerDockerStdin(t *testing.T) {
func TestRunnerDockerStdout(t *testing.T) {
dkr := NewDocker(drivers.Config{})
ctx := context.Background()
input := `{"echoContent": "italian parsley", "isDebug": true}`
var output bytes.Buffer
var errors bytes.Buffer
task := &taskDockerTest{"test-docker-stdin", bytes.NewBufferString(input), &output, &errors}
task := &taskDockerTest{"test-docker-stdin", bytes.NewBufferString(""), &output, &errors}
cookie, err := dkr.CreateCookie(ctx, task)
if err != nil {
@@ -191,7 +187,8 @@ func TestRunnerDockerStdin(t *testing.T) {
result.Error(), output.String(), errors.String())
}
expect := "italian parsley"
// if hello world image changes, change dis
expect := "Hello from Docker!"
got := output.String()
if !strings.Contains(got, expect) {
t.Errorf("Test expected output to contain '%s', got '%s'", expect, got)

View File

@@ -1,204 +0,0 @@
package protocol
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"go.opencensus.io/trace"
"github.com/fnproject/fn/api/models"
)
// CloudEvent is the official JSON representation of a CloudEvent: https://github.com/cloudevents/spec/blob/master/serialization.md
type CloudEvent struct {
CloudEventsVersion string `json:"cloudEventsVersion"`
EventID string `json:"eventID"`
Source string `json:"source"`
EventType string `json:"eventType"`
EventTypeVersion string `json:"eventTypeVersion"`
EventTime time.Time `json:"eventTime"`
SchemaURL string `json:"schemaURL"`
ContentType string `json:"contentType"`
Extensions map[string]interface{} `json:"extensions"`
Data interface{} `json:"data,omitempty"` // docs: the payload is encoded into a media format which is specified by the contentType attribute (e.g. application/json)
}
type cloudEventIn struct {
CloudEvent
// Deadline string `json:"deadline"`
// Protocol CallRequestHTTP `json:"protocol"`
}
// cloudEventOut the expected response from the function container
type cloudEventOut struct {
CloudEvent
// Protocol *CallResponseHTTP `json:"protocol,omitempty"`
}
// CloudEventProtocol converts stdin/stdout streams from HTTP into JSON format.
type CloudEventProtocol struct {
// These are the container input streams, not the input from the request or the output for the response
in io.Writer
out io.Reader
}
func (p *CloudEventProtocol) IsStreamable() bool {
return true
}
func (h *CloudEventProtocol) writeJSONToContainer(ci CallInfo) error {
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
defer bufPool.Put(buf)
_, err := io.Copy(buf, ci.Input())
if err != nil {
return err
}
// TODO: handle binary
var in cloudEventIn
if ci.IsCloudEvent() {
// then it's already in the right format, let's parse it, then modify
err = json.Unmarshal(buf.Bytes(), &in)
if err != nil {
return fmt.Errorf("Invalid CloudEvent input. %v", err)
}
} else {
in = cloudEventIn{
CloudEvent: CloudEvent{
ContentType: ci.ContentType(),
EventID: ci.CallID(),
EventType: "http",
CloudEventsVersion: "0.1",
Source: ci.RequestURL(),
},
}
// NOTE: data is an optional field, we can leave it as nil
if buf.Len() > 0 {
// NOTE: if it's not contentType=application/json, then a string is a valid json value, so this will work.
err := json.NewDecoder(buf).Decode(&in.Data)
if err != nil {
return fmt.Errorf("Invalid json body with contentType 'application/json'. %v", err)
}
}
}
// todo: deal with the dual ID's, one from outside, one from inside
if in.Extensions == nil {
in.Extensions = map[string]interface{}{}
}
// note: protocol stuff should be set on first ingestion of the event in fn2.0, the http router for example, not here
in.Extensions["protocol"] = CallRequestHTTP{
Type: ci.ProtocolType(),
Method: ci.Method(),
RequestURL: ci.RequestURL(),
Headers: ci.Headers(),
}
in.Extensions["deadline"] = ci.Deadline().String()
return json.NewEncoder(h.in).Encode(in)
}
func (h *CloudEventProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error {
ctx, span := trace.StartSpan(ctx, "dispatch_cloudevent")
defer span.End()
_, span = trace.StartSpan(ctx, "dispatch_cloudevent_write_request")
err := h.writeJSONToContainer(ci)
span.End()
if err != nil {
return err
}
_, span = trace.StartSpan(ctx, "dispatch_cloudevent_read_response")
var jout cloudEventOut
decoder := json.NewDecoder(h.out)
err = decoder.Decode(&jout)
span.End()
if err != nil {
return models.NewAPIError(http.StatusBadGateway, fmt.Errorf("invalid json response from function err: %v", err))
}
_, span = trace.StartSpan(ctx, "dispatch_cloudevent_write_response")
defer span.End()
rw, ok := w.(http.ResponseWriter)
if !ok {
// logs can just copy the full thing in there, headers and all.
err := json.NewEncoder(w).Encode(jout)
return isExcessData(err, decoder)
}
// this has to be done for pulling out:
// - status code
// - body
// - headers
pp := jout.Extensions["protocol"]
var p map[string]interface{}
if pp != nil {
p = pp.(map[string]interface{})
hh := p["headers"]
if hh != nil {
h, ok := hh.(map[string]interface{})
if !ok {
return fmt.Errorf("Invalid JSON for protocol headers, not a map")
}
for k, v := range h {
// fmt.Printf("HEADER: %v: %v\n", k, v)
// fmt.Printf("%v", reflect.TypeOf(v))
harray, ok := v.([]interface{})
if !ok {
return fmt.Errorf("Invalid JSON for protocol headers, not an array of strings for header value")
}
for _, vv := range harray {
rw.Header().Add(k, vv.(string)) // on top of any specified on the route
}
}
}
}
// after other header setting, top level content_type takes precedence and is
// absolute (if set). it is expected that if users want to set multiple
// values they put it in the string, e.g. `"content-type:"application/json; charset=utf-8"`
// TODO this value should not exist since it's redundant in proto headers?
if jout.ContentType != "" {
rw.Header().Set("Content-Type", jout.ContentType)
}
// we must set all headers before writing the status, see http.ResponseWriter contract
if p != nil && p["status_code"] != nil {
sc, ok := p["status_code"].(float64)
if !ok {
return fmt.Errorf("Invalid status_code type in protocol extension, must be an integer: %v\n", p["status_code"])
}
rw.WriteHeader(int(sc))
}
if ci.IsCloudEvent() {
// then it's already in the right format so just return it as is
err = json.NewEncoder(rw).Encode(jout)
if err != nil {
return fmt.Errorf("Error marshalling CloudEvent response to json. %v\n", err)
}
} else {
if jout.ContentType == "application/json" {
d, err := json.Marshal(jout.Data)
if err != nil {
return fmt.Errorf("Error marshalling function response 'data' to json. %v\n", err)
}
_, err = rw.Write(d)
} else if jout.ContentType == "text/plain" {
_, err = io.WriteString(rw, jout.Data.(string))
} else {
return fmt.Errorf("Error: Unknown content type: %v\n", jout.ContentType)
}
}
return isExcessData(err, decoder)
}

View File

@@ -1,100 +0,0 @@
package protocol
import (
"bytes"
"encoding/json"
"io"
"net/http"
"strings"
"testing"
"time"
"github.com/fnproject/fn/api/common"
)
// implements CallInfo, modify as needed
type testCall struct {
cloud bool
contentType string
input io.Reader
}
func (t *testCall) IsCloudEvent() bool { return t.cloud }
func (t *testCall) CallID() string { return "foo" }
func (t *testCall) ContentType() string { return t.contentType }
func (t *testCall) Input() io.Reader { return t.input }
func (t *testCall) Deadline() common.DateTime {
return common.DateTime(time.Now().Add(30 * time.Second))
}
func (t *testCall) CallType() string { return "sync" }
func (t *testCall) ProtocolType() string { return "http" }
func (t *testCall) Request() *http.Request { return nil } // unused here
func (t *testCall) Method() string { return "GET" }
func (t *testCall) RequestURL() string { return "http://example.com/r/yo/dawg" }
func (t *testCall) Headers() map[string][]string { return map[string][]string{} }
func TestJSONMap(t *testing.T) {
in := strings.NewReader(`{"yo":"dawg"}`)
var ib, ob bytes.Buffer
cep := &CloudEventProtocol{
in: &ib,
out: &ob,
}
tc := &testCall{false, "application/json; charset=utf-8", in}
err := cep.writeJSONToContainer(tc)
if err != nil {
t.Fatal(err)
}
var oce CloudEvent
err = json.NewDecoder(&ib).Decode(&oce)
if err != nil {
t.Fatal(err)
}
mappo, ok := oce.Data.(map[string]interface{})
if !ok {
t.Fatalf("data field should be map[string]interface{}: %T", oce.Data)
}
v, ok := mappo["yo"].(string)
if v != "dawg" {
t.Fatal("value in map is wrong", v)
}
}
func TestJSONNotMap(t *testing.T) {
// we accept all json values here https://tools.ietf.org/html/rfc7159#section-3
in := strings.NewReader(`true`)
var ib, ob bytes.Buffer
cep := &CloudEventProtocol{
in: &ib,
out: &ob,
}
tc := &testCall{false, "application/json", in}
err := cep.writeJSONToContainer(tc)
if err != nil {
t.Fatal(err)
}
var oce CloudEvent
err = json.NewDecoder(&ib).Decode(&oce)
if err != nil {
t.Fatal(err)
}
boolo, ok := oce.Data.(bool)
if !ok {
t.Fatalf("data field should be bool: %T", oce.Data)
}
if !boolo {
t.Fatal("bool should be true", boolo)
}
}

View File

@@ -1,14 +0,0 @@
package protocol
import (
"context"
"io"
)
// DefaultProtocol is the protocol used by cold-containers
type DefaultProtocol struct{}
func (p *DefaultProtocol) IsStreamable() bool { return false }
func (d *DefaultProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error {
return nil
}

View File

@@ -1,12 +0,0 @@
// Package protocol defines the protocol between the Fn Agent and the code
// running inside of a container. When an Fn Agent wants to perform a function
// call it needs to pass that call to a container over stdin. The call is
// encoded in one of the following protocols.
//
// * Default I/O Format
// * JSON I/O Format
// * HTTP I/O Format
//
// For more information on the function formats see
// https://github.com/fnproject/fn/blob/master/docs/developers/function-format.md.
package protocol

View File

@@ -1,178 +0,0 @@
package protocol
import (
"context"
"errors"
"io"
"net/http"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
)
var errInvalidProtocol = errors.New("Invalid Protocol")
var ErrExcessData = errors.New("Excess data in stream")
type errorProto struct {
error
}
func (e errorProto) IsStreamable() bool { return false }
func (e errorProto) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error { return e }
// ContainerIO defines the interface used to talk to a hot function.
// Internally, a protocol must know when to alternate between stdin and stdout.
// It returns any protocol error, if present.
type ContainerIO interface {
IsStreamable() bool
// Dispatch will handle sending stdin and stdout to a container. Implementers
// of Dispatch may format the input and output differently. Dispatch must respect
// the req.Context() timeout / cancellation.
Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error
}
// CallInfo is passed into dispatch with only the required data the protocols require
type CallInfo interface {
IsCloudEvent() bool
CallID() string
ContentType() string
Input() io.Reader
Deadline() common.DateTime
CallType() string
// ProtocolType let's function/fdk's know what type original request is. Only 'http' for now.
// This could be abstracted into separate Protocol objects for each type and all the following information could go in there.
// This is a bit confusing because we also have the protocol's for getting information in and out of the function containers.
ProtocolType() string
Request() *http.Request
Method() string
RequestURL() string
Headers() map[string][]string
}
type callInfoImpl struct {
call *models.Call
req *http.Request
isCloudEvent bool
}
func (ci callInfoImpl) IsCloudEvent() bool {
return ci.isCloudEvent
}
func (ci callInfoImpl) CallID() string {
return ci.call.ID
}
func (ci callInfoImpl) ContentType() string {
return ci.req.Header.Get("Content-Type")
}
// Input returns the call's input/body
func (ci callInfoImpl) Input() io.Reader {
return ci.req.Body
}
func (ci callInfoImpl) Deadline() common.DateTime {
deadline, ok := ci.req.Context().Deadline()
if !ok {
// In theory deadline must have been set here
panic("No context deadline is set in protocol, should never happen")
}
return common.DateTime(deadline)
}
// CallType returns whether the function call was "sync" or "async".
func (ci callInfoImpl) CallType() string {
return ci.call.Type
}
// ProtocolType at the moment can only be "http". Once we have Kafka or other
// possible origins for calls this will track what the origin was.
func (ci callInfoImpl) ProtocolType() string {
return "http"
}
// Request basically just for the http format, since that's the only that makes sense to have the full request as is
func (ci callInfoImpl) Request() *http.Request {
return ci.req
}
func (ci callInfoImpl) Method() string {
return ci.call.Method
}
func (ci callInfoImpl) RequestURL() string {
return ci.call.URL
}
func (ci callInfoImpl) Headers() map[string][]string {
return ci.req.Header
}
func NewCallInfo(isCloudEvent bool, call *models.Call, req *http.Request) CallInfo {
ci := &callInfoImpl{
isCloudEvent: isCloudEvent,
call: call,
req: req,
}
return ci
}
// Protocol defines all protocols that operates a ContainerIO.
type Protocol string
// hot function protocols
const (
Default Protocol = models.FormatDefault
HTTP Protocol = models.FormatHTTP
HTTPStream Protocol = models.FormatHTTPStream
JSON Protocol = models.FormatJSON
CloudEventP Protocol = models.FormatCloudEvent
Empty Protocol = ""
)
func (p *Protocol) UnmarshalJSON(b []byte) error {
switch Protocol(b) {
case Empty, Default:
*p = Default
case HTTP:
*p = HTTP
case JSON:
*p = JSON
default:
return errInvalidProtocol
}
return nil
}
func (p Protocol) MarshalJSON() ([]byte, error) {
switch p {
case Empty, Default:
return []byte(Default), nil
case HTTP:
return []byte(HTTP), nil
case JSON:
return []byte(JSON), nil
}
return nil, errInvalidProtocol
}
// New creates a valid protocol handler from a I/O pipe representing containers
// stdin/stdout.
func New(p Protocol, in io.Writer, out io.Reader) ContainerIO {
switch p {
case HTTP, HTTPStream:
return &HTTPProtocol{in, out}
case JSON:
return &JSONProtocol{in, out}
case CloudEventP:
return &CloudEventProtocol{in, out}
case Default, Empty:
return &DefaultProtocol{}
}
return &errorProto{errInvalidProtocol}
}
// IsStreamable says whether the given protocol can be used for streaming into
// hot functions.
func IsStreamable(p Protocol) bool { return New(p, nil, nil).IsStreamable() }

View File

@@ -1,80 +0,0 @@
package protocol
import (
"bufio"
"context"
"fmt"
"io"
"net/http"
"go.opencensus.io/trace"
"github.com/fnproject/fn/api/models"
)
// HTTPProtocol converts stdin/stdout streams into HTTP/1.1 compliant
// communication. It relies on Content-Length to know when to stop reading from
// containers stdout. It also mandates valid HTTP headers back and forth, thus
// returning errors in case of parsing problems.
type HTTPProtocol struct {
in io.Writer
out io.Reader
}
func (p *HTTPProtocol) IsStreamable() bool { return true }
func (h *HTTPProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error {
ctx, span := trace.StartSpan(ctx, "dispatch_http")
defer span.End()
req := ci.Request()
req.RequestURI = ci.RequestURL() // force set to this, for req.Write to use (TODO? still?)
// Add Fn-specific headers for this protocol
req.Header.Set("FN_DEADLINE", ci.Deadline().String())
req.Header.Set("FN_METHOD", ci.Method())
req.Header.Set("FN_REQUEST_URL", ci.RequestURL())
req.Header.Set("FN_CALL_ID", ci.CallID())
_, span = trace.StartSpan(ctx, "dispatch_http_write_request")
// req.Write handles if the user does not specify content length
err := req.Write(h.in)
span.End()
if err != nil {
return err
}
_, span = trace.StartSpan(ctx, "dispatch_http_read_response")
resp, err := http.ReadResponse(bufio.NewReader(h.out), ci.Request())
span.End()
if err != nil {
return models.NewAPIError(http.StatusBadGateway, fmt.Errorf("invalid http response from function err: %v", err))
}
_, span = trace.StartSpan(ctx, "dispatch_http_write_response")
defer span.End()
rw, ok := w.(http.ResponseWriter)
if !ok {
// async / [some] tests go through here. write a full http request to the writer
resp.Write(w)
return nil
}
// if we're writing directly to the response writer, we need to set headers
// and status code, and only copy the body. resp.Write would copy a full
// http request into the response body (not what we want).
// add resp's on top of any specified on the route [on rw]
for k, vs := range resp.Header {
for _, v := range vs {
rw.Header().Add(k, v)
}
}
if resp.StatusCode > 0 {
rw.WriteHeader(resp.StatusCode)
}
io.Copy(rw, resp.Body)
return nil
}

View File

@@ -1,168 +0,0 @@
package protocol
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"unicode"
"go.opencensus.io/trace"
"github.com/fnproject/fn/api/models"
)
var (
bufPool = &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}
)
// CallRequestHTTP describes the protocol the end user used to call this function. Only HTTP is supported right now.
type CallRequestHTTP struct {
Type string `json:"type"`
Method string `json:"method"`
RequestURL string `json:"request_url"`
Headers http.Header `json:"headers"`
}
// CallResponseHTTP is the protocol-level portion of the function's response (status code and headers). Only HTTP is supported right now.
type CallResponseHTTP struct {
StatusCode int `json:"status_code,omitempty"`
Headers http.Header `json:"headers,omitempty"`
}
// jsonIn is the JSON object written to the container's stdin for each call (see writeJSONToContainer below).
type jsonIn struct {
CallID string `json:"call_id"`
Deadline string `json:"deadline"`
Body string `json:"body"`
ContentType string `json:"content_type"`
Protocol CallRequestHTTP `json:"protocol"`
}
// jsonOut is the response expected back from the function container for each call.
type jsonOut struct {
Body string `json:"body"`
ContentType string `json:"content_type"`
Protocol *CallResponseHTTP `json:"protocol,omitempty"`
}
// JSONProtocol converts an HTTP request/response into the JSON format exchanged over the container's stdin/stdout.
type JSONProtocol struct {
// in is the container's stdin and out is its stdout; these are not the HTTP request body or the response writer
in io.Writer
out io.Reader
}
func (p *JSONProtocol) IsStreamable() bool {
return true
}
func (h *JSONProtocol) writeJSONToContainer(ci CallInfo) error {
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
defer bufPool.Put(buf)
_, err := io.Copy(buf, ci.Input())
if err != nil {
return err
}
body := buf.String()
in := jsonIn{
Body: body,
ContentType: ci.ContentType(),
CallID: ci.CallID(),
Deadline: ci.Deadline().String(),
Protocol: CallRequestHTTP{
Type: ci.ProtocolType(),
Method: ci.Method(),
RequestURL: ci.RequestURL(),
Headers: ci.Headers(),
},
}
return json.NewEncoder(h.in).Encode(in)
}
func (h *JSONProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error {
ctx, span := trace.StartSpan(ctx, "dispatch_json")
defer span.End()
_, span = trace.StartSpan(ctx, "dispatch_json_write_request")
err := h.writeJSONToContainer(ci)
span.End()
if err != nil {
return err
}
_, span = trace.StartSpan(ctx, "dispatch_json_read_response")
var jout jsonOut
decoder := json.NewDecoder(h.out)
err = decoder.Decode(&jout)
span.End()
if err != nil {
return models.NewAPIError(http.StatusBadGateway, fmt.Errorf("invalid json response from function err: %v", err))
}
_, span = trace.StartSpan(ctx, "dispatch_json_write_response")
defer span.End()
rw, ok := w.(http.ResponseWriter)
if !ok {
// non-http writers (async / logs) can just copy the full json object in there, headers and all.
err := json.NewEncoder(w).Encode(jout)
return isExcessData(err, decoder)
}
// this has to be done for pulling out:
// - status code
// - body
// - headers
if jout.Protocol != nil {
p := jout.Protocol
for k, v := range p.Headers {
for _, vv := range v {
rw.Header().Add(k, vv) // on top of any specified on the fn
}
}
}
// after other header setting, the top-level content_type takes precedence and
// is absolute (if set). users wanting multiple values are expected to put them
// in the string itself, e.g. "application/json; charset=utf-8"
// TODO this value should not exist since it's redundant in proto headers?
if jout.ContentType != "" {
rw.Header().Set("Content-Type", jout.ContentType)
}
// we must set all headers before writing the status, see http.ResponseWriter contract
if p := jout.Protocol; p != nil && p.StatusCode != 0 {
rw.WriteHeader(p.StatusCode)
}
_, err = io.WriteString(rw, jout.Body)
return isExcessData(err, decoder)
}
func isExcessData(err error, decoder *json.Decoder) error {
if err == nil {
// Now check for excess output; if any is present, the next request on this container is certain to fail.
reader, ok := decoder.Buffered().(*bytes.Reader)
if ok && reader.Len() > 0 {
// Let's check if extra data is whitespace, which is valid/ignored in json
for {
r, _, err := reader.ReadRune()
if err == io.EOF {
break
}
if !unicode.IsSpace(r) {
return ErrExcessData
}
}
}
}
return err
}
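
And the corresponding container side of the json format, again as a sketch rather than an FDK: one jsonIn-shaped object is read per call and exactly one jsonOut-shaped object is written back, since anything beyond that is what isExcessData above flags. The echo behaviour is invented; the field names are trimmed-down mirrors of the wire types defined above.

package main

import (
	"encoding/json"
	"io"
	"os"
)

// callIn mirrors the subset of jsonIn a typical function cares about.
type callIn struct {
	CallID      string `json:"call_id"`
	Deadline    string `json:"deadline"`
	Body        string `json:"body"`
	ContentType string `json:"content_type"`
}

// callOutProto mirrors CallResponseHTTP's status_code field.
type callOutProto struct {
	StatusCode int `json:"status_code,omitempty"`
}

// callOut mirrors jsonOut.
type callOut struct {
	Body        string        `json:"body"`
	ContentType string        `json:"content_type"`
	Protocol    *callOutProto `json:"protocol,omitempty"`
}

func main() {
	dec := json.NewDecoder(os.Stdin)
	enc := json.NewEncoder(os.Stdout)
	for {
		var in callIn
		if err := dec.Decode(&in); err == io.EOF {
			return // agent closed stdin, the container is going away
		} else if err != nil {
			panic(err)
		}
		// exactly one JSON object out per object in; any trailing output would
		// trip the agent's isExcessData check above
		out := callOut{
			Body:        "echo: " + in.Body,
			ContentType: "text/plain",
			Protocol:    &callOutProto{StatusCode: 200},
		}
		if err := enc.Encode(&out); err != nil {
			panic(err)
		}
	}
}
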

View File

@@ -1,212 +0,0 @@
package protocol
import (
"bytes"
"context"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"net/url"
"testing"
"time"
"github.com/fnproject/fn/api/models"
)
type RequestData struct {
A string `json:"a"`
}
func setupRequest(data interface{}) (*callInfoImpl, context.CancelFunc) {
req := &http.Request{
Method: http.MethodPost,
URL: &url.URL{
Scheme: "http",
Host: "localhost:8080",
Path: "/v1/apps",
RawQuery: "something=something&etc=etc",
},
ProtoMajor: 1,
ProtoMinor: 1,
Header: http.Header{
"Host": []string{"localhost:8080"},
"User-Agent": []string{"curl/7.51.0"},
"Content-Type": []string{"application/json"},
},
Host: "localhost:8080",
}
var buf bytes.Buffer
if data != nil {
_ = json.NewEncoder(&buf).Encode(data)
}
req.Body = ioutil.NopCloser(&buf)
call := &models.Call{Type: "sync"}
// fixup URL in models.Call
call.URL = req.URL.String()
ctx, cancel := context.WithTimeout(req.Context(), 1*time.Second)
ci := &callInfoImpl{call: call, req: req.WithContext(ctx)}
return ci, cancel
}
func TestJSONProtocolwriteJSONInputRequestBasicFields(t *testing.T) {
ci, cancel := setupRequest(nil)
defer cancel()
r, w := io.Pipe()
proto := JSONProtocol{w, r}
go func() {
err := proto.writeJSONToContainer(ci)
if err != nil {
t.Error(err.Error())
}
w.Close()
}()
incomingReq := &jsonIn{}
bb := new(bytes.Buffer)
_, err := bb.ReadFrom(r)
if err != nil {
t.Error(err.Error())
}
err = json.Unmarshal(bb.Bytes(), incomingReq)
if err != nil {
t.Error(err.Error())
}
if incomingReq.CallID != ci.CallID() {
t.Errorf("Request CallID assertion mismatch: expected: %s, got %s",
ci.CallID(), incomingReq.CallID)
}
if incomingReq.ContentType != ci.ContentType() {
t.Errorf("Request ContentType assertion mismatch: expected: %s, got %s",
ci.ContentType(), incomingReq.ContentType)
}
if incomingReq.Deadline != ci.Deadline().String() {
t.Errorf("Request Deadline assertion mismatch: expected: %s, got %s",
ci.Deadline(), incomingReq.Deadline)
}
}
func TestJSONProtocolwriteJSONInputRequestWithData(t *testing.T) {
rDataBefore := RequestData{A: "a"}
ci, cancel := setupRequest(rDataBefore)
defer cancel()
r, w := io.Pipe()
proto := JSONProtocol{w, r}
go func() {
err := proto.writeJSONToContainer(ci)
if err != nil {
t.Error(err.Error())
}
w.Close()
}()
incomingReq := &jsonIn{}
bb := new(bytes.Buffer)
_, err := bb.ReadFrom(r)
if err != nil {
t.Error(err.Error())
}
err = json.Unmarshal(bb.Bytes(), incomingReq)
if err != nil {
t.Error(err.Error())
}
rDataAfter := new(RequestData)
err = json.Unmarshal([]byte(incomingReq.Body), &rDataAfter)
if err != nil {
t.Error(err.Error())
}
if rDataBefore.A != rDataAfter.A {
t.Errorf("Request data assertion mismatch: expected: %s, got %s",
rDataBefore.A, rDataAfter.A)
}
if incomingReq.Protocol.Type != ci.ProtocolType() {
t.Errorf("Call protocol type assertion mismatch: expected: %s, got %s",
ci.ProtocolType(), incomingReq.Protocol.Type)
}
if incomingReq.Protocol.Method != ci.Method() {
t.Errorf("Call protocol method assertion mismatch: expected: %s, got %s",
ci.Method(), incomingReq.Protocol.Method)
}
if incomingReq.Protocol.RequestURL != ci.RequestURL() {
t.Errorf("Call protocol request URL assertion mismatch: expected: %s, got %s",
ci.RequestURL(), incomingReq.Protocol.RequestURL)
}
}
func TestJSONProtocolwriteJSONInputRequestWithoutData(t *testing.T) {
ci, cancel := setupRequest(nil)
defer cancel()
r, w := io.Pipe()
proto := JSONProtocol{w, r}
go func() {
err := proto.writeJSONToContainer(ci)
if err != nil {
t.Error(err.Error())
}
w.Close()
}()
incomingReq := &jsonIn{}
bb := new(bytes.Buffer)
_, err := bb.ReadFrom(r)
if err != nil {
t.Error(err.Error())
}
err = json.Unmarshal(bb.Bytes(), incomingReq)
if err != nil {
t.Error(err.Error())
}
if incomingReq.Body != "" {
t.Errorf("Request body assertion mismatch: expected: %s, got %s",
"<empty-string>", incomingReq.Body)
}
if !models.Headers(ci.req.Header).Equals(models.Headers(incomingReq.Protocol.Headers)) {
t.Errorf("Request headers assertion mismatch: expected: %s, got %s",
ci.req.Header, incomingReq.Protocol.Headers)
}
if incomingReq.Protocol.Type != ci.ProtocolType() {
t.Errorf("Call protocol type assertion mismatch: expected: %s, got %s",
ci.ProtocolType(), incomingReq.Protocol.Type)
}
if incomingReq.Protocol.Method != ci.Method() {
t.Errorf("Call protocol method assertion mismatch: expected: %s, got %s",
ci.Method(), incomingReq.Protocol.Method)
}
if incomingReq.Protocol.RequestURL != ci.RequestURL() {
t.Errorf("Call protocol request URL assertion mismatch: expected: %s, got %s",
ci.RequestURL(), incomingReq.Protocol.RequestURL)
}
}
func TestJSONProtocolwriteJSONInputRequestWithQuery(t *testing.T) {
ci, cancel := setupRequest(nil)
defer cancel()
r, w := io.Pipe()
proto := JSONProtocol{w, r}
go func() {
err := proto.writeJSONToContainer(ci)
if err != nil {
t.Error(err.Error())
}
w.Close()
}()
incomingReq := &jsonIn{}
bb := new(bytes.Buffer)
_, err := bb.ReadFrom(r)
if err != nil {
t.Error(err.Error())
}
err = json.Unmarshal(bb.Bytes(), incomingReq)
if err != nil {
t.Error(err.Error())
}
if incomingReq.Protocol.RequestURL != ci.call.URL {
t.Errorf("Request URL does not match protocol URL: expected: %s, got %s",
ci.call.URL, incomingReq.Protocol.RequestURL)
}
}

View File

@@ -701,7 +701,6 @@ func (pr *pureRunner) runStatusCall(ctx context.Context) *runner.RunnerStatus {
c.ID = id.New().String()
c.Image = pr.status.imageName
c.Type = "sync"
c.Format = "json"
c.TmpFsSize = 0
c.Memory = 0
c.CPUs = models.MilliCPUs(0)
@@ -709,7 +708,6 @@ func (pr *pureRunner) runStatusCall(ctx context.Context) *runner.RunnerStatus {
c.Method = "GET"
c.CreatedAt = common.DateTime(start)
c.Config = make(models.Config)
c.Config["FN_FORMAT"] = c.Format
c.Payload = "{}"
c.Timeout = StatusCallTimeout
c.IdleTimeout = StatusCallIdleTimeout

View File

@@ -292,8 +292,6 @@ func getSlotQueueKey(call *call) string {
hash.Write(unsafeBytes("\x00"))
hash.Write(unsafeBytes(call.Image))
hash.Write(unsafeBytes("\x00"))
hash.Write(unsafeBytes(call.Format))
hash.Write(unsafeBytes("\x00"))
// these are all static in size we only need to delimit the whole block of them
var byt [8]byte

View File

@@ -278,9 +278,7 @@ func BenchmarkSlotKey(b *testing.B) {
url := "http://127.0.0.1:8080/invoke/" + fnID
payload := "payload"
typ := "sync"
format := "default"
cfg := models.Config{
"FN_FORMAT": format,
"FN_APP_NAME": appName,
"FN_MEMORY": strconv.Itoa(memory),
"FN_CPUS": CPUs.String(),
@@ -294,7 +292,6 @@ func BenchmarkSlotKey(b *testing.B) {
FnID: fnID,
Image: image,
Type: typ,
Format: format,
Timeout: timeout,
IdleTimeout: idleTimeout,
Memory: memory,