mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
fn: introducing 503 responses for out of capacity case (#518)
* fn: introducing 503 responses for out of capacity case *) Adding 503 with Retry-After header case if request failed during waiting for slots. *) TODO: return 503 without Retry-After if the request can never be met by this fn server. *) fn: runner test docker pull fixup *) fn: MaxMemory for routes is now a variable to allow testing and adjusting it according to fleet memory sizes.
This commit is contained in:
@@ -170,7 +170,18 @@ func (a *agent) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func transformTimeout(e error, isRetriable bool) error {
|
||||||
|
if e == context.DeadlineExceeded {
|
||||||
|
if isRetriable {
|
||||||
|
return models.ErrCallTimeoutServerBusy
|
||||||
|
}
|
||||||
|
return models.ErrCallTimeout
|
||||||
|
}
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
|
||||||
func (a *agent) Submit(callI Call) error {
|
func (a *agent) Submit(callI Call) error {
|
||||||
|
|
||||||
a.wg.Add(1)
|
a.wg.Add(1)
|
||||||
defer a.wg.Done()
|
defer a.wg.Done()
|
||||||
|
|
||||||
@@ -199,7 +210,7 @@ func (a *agent) Submit(callI Call) error {
|
|||||||
slot, err := a.getSlot(ctx, call) // find ram available / running
|
slot, err := a.getSlot(ctx, call) // find ram available / running
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.stats.Dequeue(callI.Model().Path)
|
a.stats.Dequeue(callI.Model().Path)
|
||||||
return err
|
return transformTimeout(err, true)
|
||||||
}
|
}
|
||||||
// TODO if the call times out & container is created, we need
|
// TODO if the call times out & container is created, we need
|
||||||
// to make this remove the container asynchronously?
|
// to make this remove the container asynchronously?
|
||||||
@@ -209,7 +220,7 @@ func (a *agent) Submit(callI Call) error {
|
|||||||
err = call.Start(ctx, a)
|
err = call.Start(ctx, a)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.stats.Dequeue(callI.Model().Path)
|
a.stats.Dequeue(callI.Model().Path)
|
||||||
return err
|
return transformTimeout(err, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// decrement queued count, increment running count
|
// decrement queued count, increment running count
|
||||||
@@ -231,7 +242,7 @@ func (a *agent) Submit(callI Call) error {
|
|||||||
// but this could put us over the timeout if the call did not reply yet (need better policy).
|
// but this could put us over the timeout if the call did not reply yet (need better policy).
|
||||||
ctx = opentracing.ContextWithSpan(context.Background(), span)
|
ctx = opentracing.ContextWithSpan(context.Background(), span)
|
||||||
err = call.End(ctx, err, a)
|
err = call.End(ctx, err, a)
|
||||||
return err
|
return transformTimeout(err, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getSlot must ensure that if it receives a slot, it will be returned, otherwise
|
// getSlot must ensure that if it receives a slot, it will be returned, otherwise
|
||||||
|
|||||||
@@ -20,6 +20,10 @@ var (
|
|||||||
code: http.StatusGatewayTimeout,
|
code: http.StatusGatewayTimeout,
|
||||||
error: errors.New("Timed out"),
|
error: errors.New("Timed out"),
|
||||||
}
|
}
|
||||||
|
ErrCallTimeoutServerBusy = err{
|
||||||
|
code: http.StatusServiceUnavailable,
|
||||||
|
error: errors.New("Timed out - server too busy"),
|
||||||
|
}
|
||||||
ErrAppsMissingName = err{
|
ErrAppsMissingName = err{
|
||||||
code: http.StatusBadRequest,
|
code: http.StatusBadRequest,
|
||||||
error: errors.New("Missing app name"),
|
error: errors.New("Missing app name"),
|
||||||
@@ -154,7 +158,7 @@ var (
|
|||||||
}
|
}
|
||||||
ErrRoutesInvalidMemory = err{
|
ErrRoutesInvalidMemory = err{
|
||||||
code: http.StatusBadRequest,
|
code: http.StatusBadRequest,
|
||||||
error: fmt.Errorf("memory value is invalid. 0 < memory < %d", MaxMemory),
|
error: fmt.Errorf("memory value is invalid. 0 < memory < %d", RouteMaxMemory),
|
||||||
}
|
}
|
||||||
ErrCallNotFound = err{
|
ErrCallNotFound = err{
|
||||||
code: http.StatusNotFound,
|
code: http.StatusNotFound,
|
||||||
|
|||||||
@@ -17,9 +17,10 @@ const (
|
|||||||
MaxSyncTimeout = 120 // 2 minutes
|
MaxSyncTimeout = 120 // 2 minutes
|
||||||
MaxAsyncTimeout = 3600 // 1 hour
|
MaxAsyncTimeout = 3600 // 1 hour
|
||||||
MaxIdleTimeout = MaxAsyncTimeout
|
MaxIdleTimeout = MaxAsyncTimeout
|
||||||
MaxMemory = 1024 * 8 // 8GB TODO should probably be a var of machine max?
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var RouteMaxMemory = uint64(8 * 1024) // 8GB TODO should probably be a var of machine max?
|
||||||
|
|
||||||
type Routes []*Route
|
type Routes []*Route
|
||||||
|
|
||||||
type Route struct {
|
type Route struct {
|
||||||
@@ -112,7 +113,7 @@ func (r *Route) Validate() error {
|
|||||||
return ErrRoutesInvalidIdleTimeout
|
return ErrRoutesInvalidIdleTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.Memory < 1 || r.Memory > MaxMemory {
|
if r.Memory < 1 || r.Memory > RouteMaxMemory {
|
||||||
return ErrRoutesInvalidMemory
|
return ErrRoutesInvalidMemory
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -32,6 +32,11 @@ func HandleErrorResponse(ctx context.Context, w http.ResponseWriter, err error)
|
|||||||
if e.Code() >= 500 {
|
if e.Code() >= 500 {
|
||||||
log.WithFields(logrus.Fields{"code": e.Code()}).WithError(e).Error("api error")
|
log.WithFields(logrus.Fields{"code": e.Code()}).WithError(e).Error("api error")
|
||||||
}
|
}
|
||||||
|
if err == models.ErrCallTimeoutServerBusy {
|
||||||
|
// TODO: Determine a better delay value here (perhaps ask Agent). For now 15 secs with
|
||||||
|
// the hopes that fnlb will land this on a better server immediately.
|
||||||
|
w.Header().Set("Retry-After", "15")
|
||||||
|
}
|
||||||
statuscode = e.Code()
|
statuscode = e.Code()
|
||||||
} else {
|
} else {
|
||||||
log.WithError(err).WithFields(logrus.Fields{"stack": string(debug.Stack())}).Error("internal server error")
|
log.WithError(err).WithFields(logrus.Fields{"stack": string(debug.Stack())}).Error("internal server error")
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -90,12 +89,10 @@ func (s *Server) serve(c *gin.Context, appName, path string) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
// NOTE if they cancel the request then it will stop the call (kind of cool),
|
// NOTE if they cancel the request then it will stop the call (kind of cool),
|
||||||
// we could filter that error out here too as right now it yells a little
|
// we could filter that error out here too as right now it yells a little
|
||||||
if err == context.DeadlineExceeded {
|
if err == models.ErrCallTimeoutServerBusy || err == models.ErrCallTimeout {
|
||||||
// TODO maneuver
|
// TODO maneuver
|
||||||
// add this, since it means that start may not have been called [and it's relevant]
|
// add this, since it means that start may not have been called [and it's relevant]
|
||||||
c.Writer.Header().Add("XXX-FXLB-WAIT", time.Now().Sub(time.Time(model.CreatedAt)).String())
|
c.Writer.Header().Add("XXX-FXLB-WAIT", time.Now().Sub(time.Time(model.CreatedAt)).String())
|
||||||
|
|
||||||
err = models.ErrCallTimeout // 504 w/ friendly note
|
|
||||||
}
|
}
|
||||||
// NOTE: if the task wrote the headers already then this will fail to write
|
// NOTE: if the task wrote the headers already then this will fail to write
|
||||||
// a 5xx (and log about it to us) -- that's fine (nice, even!)
|
// a 5xx (and log about it to us) -- that's fine (nice, even!)
|
||||||
|
|||||||
@@ -232,15 +232,19 @@ func TestFailedEnqueue(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestRouteRunnerTimeout(t *testing.T) {
|
func TestRouteRunnerTimeout(t *testing.T) {
|
||||||
t.Skip("doesn't work on old Ubuntu")
|
|
||||||
buf := setLogBuffer()
|
buf := setLogBuffer()
|
||||||
|
|
||||||
|
models.RouteMaxMemory = uint64(1024 * 1024 * 1024) // 1024 TB
|
||||||
|
hugeMem := uint64(models.RouteMaxMemory - 1)
|
||||||
|
|
||||||
ds := datastore.NewMockInit(
|
ds := datastore.NewMockInit(
|
||||||
[]*models.App{
|
[]*models.App{
|
||||||
{Name: "myapp", Config: models.Config{}},
|
{Name: "myapp", Config: models.Config{}},
|
||||||
},
|
},
|
||||||
[]*models.Route{
|
[]*models.Route{
|
||||||
{Path: "/sleeper", AppName: "myapp", Image: "fnproject/sleeper", Timeout: 1},
|
{Path: "/pull", AppName: "myapp", Image: "fnproject/sleeper", Type: "sync", Memory: 128, Timeout: 30, IdleTimeout: 30},
|
||||||
|
{Path: "/sleeper", AppName: "myapp", Image: "fnproject/sleeper", Type: "sync", Memory: 128, Timeout: 1, IdleTimeout: 30},
|
||||||
|
{Path: "/waitmemory", AppName: "myapp", Image: "fnproject/sleeper", Type: "sync", Memory: hugeMem, Timeout: 1, IdleTimeout: 30},
|
||||||
}, nil,
|
}, nil,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -257,8 +261,11 @@ func TestRouteRunnerTimeout(t *testing.T) {
|
|||||||
expectedCode int
|
expectedCode int
|
||||||
expectedHeaders map[string][]string
|
expectedHeaders map[string][]string
|
||||||
}{
|
}{
|
||||||
|
// first request with large timeout, we let the docker pull go through...
|
||||||
|
{"/r/myapp/pull", `{"sleep": 0}`, "POST", http.StatusOK, nil},
|
||||||
{"/r/myapp/sleeper", `{"sleep": 0}`, "POST", http.StatusOK, nil},
|
{"/r/myapp/sleeper", `{"sleep": 0}`, "POST", http.StatusOK, nil},
|
||||||
{"/r/myapp/sleeper", `{"sleep": 2}`, "POST", http.StatusGatewayTimeout, nil},
|
{"/r/myapp/sleeper", `{"sleep": 4}`, "POST", http.StatusGatewayTimeout, nil},
|
||||||
|
{"/r/myapp/waitmemory", `{"sleep": 0}`, "POST", http.StatusServiceUnavailable, map[string][]string{"Retry-After": {"15"}}},
|
||||||
} {
|
} {
|
||||||
body := strings.NewReader(test.body)
|
body := strings.NewReader(test.body)
|
||||||
_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
|
_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
|
||||||
|
|||||||
Reference in New Issue
Block a user