mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
fn: enforce container/FDK contract in dispatch (#1314)
1) FDK returned 200/502/504 codes now handled. 2) Container init timeout is now default to 5 seconds.
This commit is contained in:
@@ -605,21 +605,7 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
|
|||||||
})
|
})
|
||||||
|
|
||||||
call.req = call.req.WithContext(ctx) // TODO this is funny biz reed is bad
|
call.req = call.req.WithContext(ctx) // TODO this is funny biz reed is bad
|
||||||
|
return s.dispatch(ctx, call)
|
||||||
errApp := s.dispatch(ctx, call)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case err := <-errApp: // from dispatch
|
|
||||||
if err != nil {
|
|
||||||
if models.IsAPIError(err) {
|
|
||||||
s.trySetError(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
case <-ctx.Done(): // call timeout
|
|
||||||
s.trySetError(ctx.Err())
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var removeHeaders = map[string]bool{
|
var removeHeaders = map[string]bool{
|
||||||
@@ -632,7 +618,7 @@ var removeHeaders = map[string]bool{
|
|||||||
"authorization": true,
|
"authorization": true,
|
||||||
}
|
}
|
||||||
|
|
||||||
func callToHTTPRequest(ctx context.Context, call *call) *http.Request {
|
func createUDSRequest(ctx context.Context, call *call) *http.Request {
|
||||||
req, err := http.NewRequest("POST", "http://localhost/call", call.req.Body)
|
req, err := http.NewRequest("POST", "http://localhost/call", call.req.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
common.Logger(ctx).WithError(err).Error("somebody put a bad url in the call http request. 10 lashes.")
|
common.Logger(ctx).WithError(err).Error("somebody put a bad url in the call http request. 10 lashes.")
|
||||||
@@ -661,70 +647,85 @@ func callToHTTPRequest(ctx context.Context, call *call) *http.Request {
|
|||||||
return req
|
return req
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *hotSlot) dispatch(ctx context.Context, call *call) chan error {
|
func (s *hotSlot) dispatch(ctx context.Context, call *call) error {
|
||||||
// TODO we can't trust that resp.Write doesn't timeout, even if the http
|
ctx, span := trace.StartSpan(ctx, "agent_dispatch_httpstream")
|
||||||
// client should respect the request context (right?) so we still need this (right?)
|
defer span.End()
|
||||||
errApp := make(chan error, 1)
|
|
||||||
|
|
||||||
|
// TODO it's possible we can get rid of this (after getting rid of logs API) - may need for call id/debug mode still
|
||||||
|
swapBack := s.container.swap(call.stderr, call.stderr, &call.Stats)
|
||||||
|
defer swapBack()
|
||||||
|
|
||||||
|
resp, err := s.container.udsClient.Do(createUDSRequest(ctx, call))
|
||||||
|
if err != nil {
|
||||||
|
// IMPORTANT: Container contract: If http-uds errors/timeout, container cannot continue
|
||||||
|
s.trySetError(err)
|
||||||
|
// first filter out timeouts
|
||||||
|
if ctx.Err() == context.DeadlineExceeded {
|
||||||
|
return context.DeadlineExceeded
|
||||||
|
}
|
||||||
|
return models.ErrFunctionResponse
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
common.Logger(ctx).WithField("resp", resp).Debug("Got resp from UDS socket")
|
||||||
|
|
||||||
|
ioErrChan := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
ctx, span := trace.StartSpan(ctx, "agent_dispatch_httpstream")
|
ioErrChan <- s.writeResp(ctx, s.cfg.MaxResponseSize, resp, call.respWriter)
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
// TODO it's possible we can get rid of this (after getting rid of logs API) - may need for call id/debug mode still
|
|
||||||
// TODO there's a timeout race for swapping this back if the container doesn't get killed for timing out, and don't you forget it
|
|
||||||
swapBack := s.container.swap(call.stderr, call.stderr, &call.Stats)
|
|
||||||
defer swapBack()
|
|
||||||
|
|
||||||
req := callToHTTPRequest(ctx, call)
|
|
||||||
resp, err := s.container.udsClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
common.Logger(ctx).WithError(err).Error("Got error from UDS socket")
|
|
||||||
errApp <- models.ErrFunctionResponse
|
|
||||||
return
|
|
||||||
}
|
|
||||||
common.Logger(ctx).WithField("resp", resp).Debug("Got resp from UDS socket")
|
|
||||||
|
|
||||||
// if ctx is canceled/timedout, then we close the body to unlock writeResp() below
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
ioErrChan := make(chan error, 1)
|
|
||||||
go func() {
|
|
||||||
ioErrChan <- writeResp(s.cfg.MaxResponseSize, resp, call.w)
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case ioErr := <-ioErrChan:
|
|
||||||
errApp <- ioErr
|
|
||||||
case <-ctx.Done():
|
|
||||||
errApp <- ctx.Err()
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
return errApp
|
|
||||||
|
select {
|
||||||
|
case ioErr := <-ioErrChan:
|
||||||
|
return ioErr
|
||||||
|
case <-ctx.Done():
|
||||||
|
if ctx.Err() == context.DeadlineExceeded {
|
||||||
|
// IMPORTANT: Container contract: If http-uds timeout, container cannot continue
|
||||||
|
s.trySetError(ctx.Err())
|
||||||
|
}
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeResp(max uint64, resp *http.Response, w io.Writer) error {
|
func (s *hotSlot) writeResp(ctx context.Context, max uint64, resp *http.Response, w io.Writer) error {
|
||||||
rw, ok := w.(http.ResponseWriter)
|
rw, ok := w.(http.ResponseWriter)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
// WARNING: this bypasses container contract translation. Assuming this is
|
||||||
|
// async mode, where we are storing response in call.stderr.
|
||||||
w = common.NewClampWriter(w, max, models.ErrFunctionResponseTooBig)
|
w = common.NewClampWriter(w, max, models.ErrFunctionResponseTooBig)
|
||||||
return resp.Write(w)
|
return resp.Write(w)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IMPORTANT: Container contract: Enforce 200/502/504 expections
|
||||||
|
switch resp.StatusCode {
|
||||||
|
case http.StatusOK:
|
||||||
|
// FDK processed the request OK
|
||||||
|
case http.StatusBadGateway:
|
||||||
|
// FDK detected failure, container can continue
|
||||||
|
return models.ErrFunctionFailed
|
||||||
|
case http.StatusGatewayTimeout:
|
||||||
|
// FDK detected timeout, respond as if ctx expired, this gets translated & handled in handleCallEnd()
|
||||||
|
return context.DeadlineExceeded
|
||||||
|
default:
|
||||||
|
// Any other code. Possible FDK failure. We shutdown the container
|
||||||
|
s.trySetError(fmt.Errorf("FDK Error, invalid status code %d", resp.StatusCode))
|
||||||
|
return models.ErrFunctionInvalidResponse
|
||||||
|
}
|
||||||
|
|
||||||
rw = newSizerRespWriter(max, rw)
|
rw = newSizerRespWriter(max, rw)
|
||||||
|
rw.WriteHeader(http.StatusOK)
|
||||||
|
|
||||||
|
// WARNING: is the following header copy safe?
|
||||||
// if we're writing directly to the response writer, we need to set headers
|
// if we're writing directly to the response writer, we need to set headers
|
||||||
// and status code, and only copy the body. resp.Write would copy a full
|
// and only copy the body. resp.Write would copy a full
|
||||||
// http request into the response body (not what we want).
|
// http request into the response body (not what we want).
|
||||||
|
|
||||||
for k, vs := range resp.Header {
|
for k, vs := range resp.Header {
|
||||||
for _, v := range vs {
|
for _, v := range vs {
|
||||||
rw.Header().Add(k, v)
|
rw.Header().Add(k, v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if resp.StatusCode > 0 {
|
|
||||||
rw.WriteHeader(resp.StatusCode)
|
_, ioErr := io.Copy(rw, resp.Body)
|
||||||
}
|
return ioErr
|
||||||
_, err := io.Copy(rw, resp.Body)
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// XXX(reed): this is a remnant of old io.pipe plumbing, we need to get rid of
|
// XXX(reed): this is a remnant of old io.pipe plumbing, we need to get rid of
|
||||||
|
|||||||
@@ -167,7 +167,7 @@ func WithTrigger(t *models.Trigger) CallOpt {
|
|||||||
// TODO this should be required
|
// TODO this should be required
|
||||||
func WithWriter(w io.Writer) CallOpt {
|
func WithWriter(w io.Writer) CallOpt {
|
||||||
return func(c *call) error {
|
return func(c *call) error {
|
||||||
c.w = w
|
c.respWriter = w
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -252,10 +252,10 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
|
|||||||
// XXX(reed): forcing this as default is not great / configuring it isn't great either. reconsider.
|
// XXX(reed): forcing this as default is not great / configuring it isn't great either. reconsider.
|
||||||
c.stderr = setupLogger(c.req.Context(), a.cfg.MaxLogSize, !a.cfg.DisableDebugUserLogs, c.Call)
|
c.stderr = setupLogger(c.req.Context(), a.cfg.MaxLogSize, !a.cfg.DisableDebugUserLogs, c.Call)
|
||||||
}
|
}
|
||||||
if c.w == nil {
|
if c.respWriter == nil {
|
||||||
// send STDOUT to logs if no writer given (async...)
|
// send STDOUT to logs if no writer given (async...)
|
||||||
// TODO we could/should probably make this explicit to GetCall, ala 'WithLogger', but it's dupe code (who cares?)
|
// TODO we could/should probably make this explicit to GetCall, ala 'WithLogger', but it's dupe code (who cares?)
|
||||||
c.w = c.stderr
|
c.respWriter = c.stderr
|
||||||
}
|
}
|
||||||
|
|
||||||
return &c, nil
|
return &c, nil
|
||||||
@@ -271,7 +271,7 @@ type call struct {
|
|||||||
*models.Call
|
*models.Call
|
||||||
|
|
||||||
handler CallHandler
|
handler CallHandler
|
||||||
w io.Writer
|
respWriter io.Writer
|
||||||
req *http.Request
|
req *http.Request
|
||||||
stderr io.ReadWriteCloser
|
stderr io.ReadWriteCloser
|
||||||
ct callTrigger
|
ct callTrigger
|
||||||
@@ -304,7 +304,7 @@ func (c *call) RequestBody() io.ReadCloser {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *call) ResponseWriter() http.ResponseWriter {
|
func (c *call) ResponseWriter() http.ResponseWriter {
|
||||||
return c.w.(http.ResponseWriter)
|
return c.respWriter.(http.ResponseWriter)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *call) StdErr() io.ReadWriteCloser {
|
func (c *call) StdErr() io.ReadWriteCloser {
|
||||||
|
|||||||
@@ -134,7 +134,7 @@ func NewConfig() (*Config, error) {
|
|||||||
err = setEnvMsecs(err, EnvHotPoll, &cfg.HotPoll, DefaultHotPoll)
|
err = setEnvMsecs(err, EnvHotPoll, &cfg.HotPoll, DefaultHotPoll)
|
||||||
err = setEnvMsecs(err, EnvHotLauncherTimeout, &cfg.HotLauncherTimeout, time.Duration(60)*time.Minute)
|
err = setEnvMsecs(err, EnvHotLauncherTimeout, &cfg.HotLauncherTimeout, time.Duration(60)*time.Minute)
|
||||||
err = setEnvMsecs(err, EnvHotPullTimeout, &cfg.HotPullTimeout, time.Duration(10)*time.Minute)
|
err = setEnvMsecs(err, EnvHotPullTimeout, &cfg.HotPullTimeout, time.Duration(10)*time.Minute)
|
||||||
err = setEnvMsecs(err, EnvHotStartTimeout, &cfg.HotStartTimeout, time.Duration(10)*time.Second)
|
err = setEnvMsecs(err, EnvHotStartTimeout, &cfg.HotStartTimeout, time.Duration(5)*time.Second)
|
||||||
err = setEnvMsecs(err, EnvAsyncChewPoll, &cfg.AsyncChewPoll, time.Duration(60)*time.Second)
|
err = setEnvMsecs(err, EnvAsyncChewPoll, &cfg.AsyncChewPoll, time.Duration(60)*time.Second)
|
||||||
err = setEnvMsecs(err, EnvDetachedHeadroom, &cfg.DetachedHeadRoom, time.Duration(360)*time.Second)
|
err = setEnvMsecs(err, EnvDetachedHeadroom, &cfg.DetachedHeadRoom, time.Duration(360)*time.Second)
|
||||||
err = setEnvUint(err, EnvMaxResponseSize, &cfg.MaxResponseSize)
|
err = setEnvUint(err, EnvMaxResponseSize, &cfg.MaxResponseSize)
|
||||||
|
|||||||
@@ -220,7 +220,7 @@ func (a *lbAgent) Submit(callI Call) error {
|
|||||||
|
|
||||||
func (a *lbAgent) placeDetachCall(ctx context.Context, call *call) error {
|
func (a *lbAgent) placeDetachCall(ctx context.Context, call *call) error {
|
||||||
errPlace := make(chan error, 1)
|
errPlace := make(chan error, 1)
|
||||||
rw := call.w.(*DetachedResponseWriter)
|
rw := call.respWriter.(*DetachedResponseWriter)
|
||||||
go a.spawnPlaceCall(ctx, call, errPlace)
|
go a.spawnPlaceCall(ctx, call, errPlace)
|
||||||
select {
|
select {
|
||||||
case err := <-errPlace:
|
case err := <-errPlace:
|
||||||
|
|||||||
@@ -138,6 +138,14 @@ var (
|
|||||||
code: http.StatusBadGateway,
|
code: http.StatusBadGateway,
|
||||||
error: fmt.Errorf("error receiving function response"),
|
error: fmt.Errorf("error receiving function response"),
|
||||||
}
|
}
|
||||||
|
ErrFunctionFailed = err{
|
||||||
|
code: http.StatusBadGateway,
|
||||||
|
error: fmt.Errorf("function failed"),
|
||||||
|
}
|
||||||
|
ErrFunctionInvalidResponse = err{
|
||||||
|
code: http.StatusBadGateway,
|
||||||
|
error: fmt.Errorf("invalid function response"),
|
||||||
|
}
|
||||||
ErrRequestContentTooBig = err{
|
ErrRequestContentTooBig = err{
|
||||||
code: http.StatusRequestEntityTooLarge,
|
code: http.StatusRequestEntityTooLarge,
|
||||||
error: fmt.Errorf("Request content too large"),
|
error: fmt.Errorf("Request content too large"),
|
||||||
|
|||||||
Reference in New Issue
Block a user