diff --git a/api/agent/agent.go b/api/agent/agent.go index f4457521f..24486b2e7 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -605,21 +605,7 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error { }) call.req = call.req.WithContext(ctx) // TODO this is funny biz reed is bad - - errApp := s.dispatch(ctx, call) - - select { - case err := <-errApp: // from dispatch - if err != nil { - if models.IsAPIError(err) { - s.trySetError(err) - } - } - return err - case <-ctx.Done(): // call timeout - s.trySetError(ctx.Err()) - return ctx.Err() - } + return s.dispatch(ctx, call) } var removeHeaders = map[string]bool{ @@ -632,7 +618,7 @@ var removeHeaders = map[string]bool{ "authorization": true, } -func callToHTTPRequest(ctx context.Context, call *call) *http.Request { +func createUDSRequest(ctx context.Context, call *call) *http.Request { req, err := http.NewRequest("POST", "http://localhost/call", call.req.Body) if err != nil { common.Logger(ctx).WithError(err).Error("somebody put a bad url in the call http request. 10 lashes.") @@ -661,70 +647,85 @@ func callToHTTPRequest(ctx context.Context, call *call) *http.Request { return req } -func (s *hotSlot) dispatch(ctx context.Context, call *call) chan error { - // TODO we can't trust that resp.Write doesn't timeout, even if the http - // client should respect the request context (right?) so we still need this (right?) 
- errApp := make(chan error, 1) +func (s *hotSlot) dispatch(ctx context.Context, call *call) error { + ctx, span := trace.StartSpan(ctx, "agent_dispatch_httpstream") + defer span.End() + // TODO it's possible we can get rid of this (after getting rid of logs API) - may need for call id/debug mode still + swapBack := s.container.swap(call.stderr, call.stderr, &call.Stats) + defer swapBack() + + resp, err := s.container.udsClient.Do(createUDSRequest(ctx, call)) + if err != nil { + // IMPORTANT: Container contract: If http-uds errors/timeout, container cannot continue + s.trySetError(err) + // first filter out timeouts + if ctx.Err() == context.DeadlineExceeded { + return context.DeadlineExceeded + } + return models.ErrFunctionResponse + } + defer resp.Body.Close() + + common.Logger(ctx).WithField("resp", resp).Debug("Got resp from UDS socket") + + ioErrChan := make(chan error, 1) go func() { - ctx, span := trace.StartSpan(ctx, "agent_dispatch_httpstream") - defer span.End() - - // TODO it's possible we can get rid of this (after getting rid of logs API) - may need for call id/debug mode still - // TODO there's a timeout race for swapping this back if the container doesn't get killed for timing out, and don't you forget it - swapBack := s.container.swap(call.stderr, call.stderr, &call.Stats) - defer swapBack() - - req := callToHTTPRequest(ctx, call) - resp, err := s.container.udsClient.Do(req) - if err != nil { - common.Logger(ctx).WithError(err).Error("Got error from UDS socket") - errApp <- models.ErrFunctionResponse - return - } - common.Logger(ctx).WithField("resp", resp).Debug("Got resp from UDS socket") - - // if ctx is canceled/timedout, then we close the body to unlock writeResp() below - defer resp.Body.Close() - - ioErrChan := make(chan error, 1) - go func() { - ioErrChan <- writeResp(s.cfg.MaxResponseSize, resp, call.w) - }() - - select { - case ioErr := <-ioErrChan: - errApp <- ioErr - case <-ctx.Done(): - errApp <- ctx.Err() - } + ioErrChan <- 
s.writeResp(ctx, s.cfg.MaxResponseSize, resp, call.respWriter) }() - return errApp + + select { + case ioErr := <-ioErrChan: + return ioErr + case <-ctx.Done(): + if ctx.Err() == context.DeadlineExceeded { + // IMPORTANT: Container contract: If http-uds timeout, container cannot continue + s.trySetError(ctx.Err()) + } + return ctx.Err() + } } -func writeResp(max uint64, resp *http.Response, w io.Writer) error { +func (s *hotSlot) writeResp(ctx context.Context, max uint64, resp *http.Response, w io.Writer) error { rw, ok := w.(http.ResponseWriter) if !ok { + // WARNING: this bypasses container contract translation. Assuming this is + // async mode, where we are storing response in call.stderr. w = common.NewClampWriter(w, max, models.ErrFunctionResponseTooBig) return resp.Write(w) } + // IMPORTANT: Container contract: Enforce 200/502/504 expectations + switch resp.StatusCode { + case http.StatusOK: + // FDK processed the request OK + case http.StatusBadGateway: + // FDK detected failure, container can continue + return models.ErrFunctionFailed + case http.StatusGatewayTimeout: + // FDK detected timeout, respond as if ctx expired, this gets translated & handled in handleCallEnd() + return context.DeadlineExceeded + default: + // Any other code. Possible FDK failure. We shut down the container + s.trySetError(fmt.Errorf("FDK Error, invalid status code %d", resp.StatusCode)) + return models.ErrFunctionInvalidResponse + } + rw = newSizerRespWriter(max, rw) + rw.WriteHeader(http.StatusOK) + // WARNING: is the following header copy safe? // if we're writing directly to the response writer, we need to set headers - // and status code, and only copy the body. resp.Write would copy a full + // and only copy the body. resp.Write would copy a full // http request into the response body (not what we want). 
- for k, vs := range resp.Header { for _, v := range vs { rw.Header().Add(k, v) } } - if resp.StatusCode > 0 { - rw.WriteHeader(resp.StatusCode) - } - _, err := io.Copy(rw, resp.Body) - return err + + _, ioErr := io.Copy(rw, resp.Body) + return ioErr } // XXX(reed): this is a remnant of old io.pipe plumbing, we need to get rid of diff --git a/api/agent/call.go b/api/agent/call.go index 7af811520..b6ce16f68 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -167,7 +167,7 @@ func WithTrigger(t *models.Trigger) CallOpt { // TODO this should be required func WithWriter(w io.Writer) CallOpt { return func(c *call) error { - c.w = w + c.respWriter = w return nil } } @@ -252,10 +252,10 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { // XXX(reed): forcing this as default is not great / configuring it isn't great either. reconsider. c.stderr = setupLogger(c.req.Context(), a.cfg.MaxLogSize, !a.cfg.DisableDebugUserLogs, c.Call) } - if c.w == nil { + if c.respWriter == nil { // send STDOUT to logs if no writer given (async...) // TODO we could/should probably make this explicit to GetCall, ala 'WithLogger', but it's dupe code (who cares?) 
- c.w = c.stderr + c.respWriter = c.stderr } return &c, nil @@ -271,7 +271,7 @@ type call struct { *models.Call handler CallHandler - w io.Writer + respWriter io.Writer req *http.Request stderr io.ReadWriteCloser ct callTrigger @@ -304,7 +304,7 @@ func (c *call) RequestBody() io.ReadCloser { } func (c *call) ResponseWriter() http.ResponseWriter { - return c.w.(http.ResponseWriter) + return c.respWriter.(http.ResponseWriter) } func (c *call) StdErr() io.ReadWriteCloser { diff --git a/api/agent/config.go b/api/agent/config.go index 32188770c..08f298a2a 100644 --- a/api/agent/config.go +++ b/api/agent/config.go @@ -134,7 +134,7 @@ func NewConfig() (*Config, error) { err = setEnvMsecs(err, EnvHotPoll, &cfg.HotPoll, DefaultHotPoll) err = setEnvMsecs(err, EnvHotLauncherTimeout, &cfg.HotLauncherTimeout, time.Duration(60)*time.Minute) err = setEnvMsecs(err, EnvHotPullTimeout, &cfg.HotPullTimeout, time.Duration(10)*time.Minute) - err = setEnvMsecs(err, EnvHotStartTimeout, &cfg.HotStartTimeout, time.Duration(10)*time.Second) + err = setEnvMsecs(err, EnvHotStartTimeout, &cfg.HotStartTimeout, time.Duration(5)*time.Second) err = setEnvMsecs(err, EnvAsyncChewPoll, &cfg.AsyncChewPoll, time.Duration(60)*time.Second) err = setEnvMsecs(err, EnvDetachedHeadroom, &cfg.DetachedHeadRoom, time.Duration(360)*time.Second) err = setEnvUint(err, EnvMaxResponseSize, &cfg.MaxResponseSize) diff --git a/api/agent/lb_agent.go b/api/agent/lb_agent.go index 84b18c4ea..3773fd9e1 100644 --- a/api/agent/lb_agent.go +++ b/api/agent/lb_agent.go @@ -220,7 +220,7 @@ func (a *lbAgent) Submit(callI Call) error { func (a *lbAgent) placeDetachCall(ctx context.Context, call *call) error { errPlace := make(chan error, 1) - rw := call.w.(*DetachedResponseWriter) + rw := call.respWriter.(*DetachedResponseWriter) go a.spawnPlaceCall(ctx, call, errPlace) select { case err := <-errPlace: diff --git a/api/models/error.go b/api/models/error.go index 15b6d1059..7d66689eb 100644 --- a/api/models/error.go +++ 
b/api/models/error.go @@ -138,6 +138,14 @@ var ( code: http.StatusBadGateway, error: fmt.Errorf("error receiving function response"), } + ErrFunctionFailed = err{ + code: http.StatusBadGateway, + error: fmt.Errorf("function failed"), + } + ErrFunctionInvalidResponse = err{ + code: http.StatusBadGateway, + error: fmt.Errorf("invalid function response"), + } ErrRequestContentTooBig = err{ code: http.StatusRequestEntityTooLarge, error: fmt.Errorf("Request content too large"),