fn: user friendly timeout handling changes (#1021)

* fn: user friendly timeout handling changes

Timeout setting in routes now means "maximum amount
of time a function can run in a container".

Total wait time for a given http request is now expected
to be handled by the client. As long as the client waits,
the LB, runner or agents will search for resources to
schedule it.
This commit is contained in:
Tolga Ceylan
2018-06-01 13:18:13 -07:00
committed by GitHub
parent ffefcf5773
commit a57907eed0
15 changed files with 105 additions and 138 deletions

View File

@@ -228,10 +228,7 @@ func (a *agent) Submit(callI Call) error {
call := callI.(*call) call := callI.(*call)
ctx, cancel := context.WithDeadline(call.req.Context(), call.execDeadline) ctx := call.req.Context()
call.req = call.req.WithContext(ctx)
defer cancel()
ctx, span := trace.StartSpan(ctx, "agent_submit") ctx, span := trace.StartSpan(ctx, "agent_submit")
defer span.End() defer span.End()
@@ -277,7 +274,11 @@ func (a *agent) submit(ctx context.Context, call *call) error {
statsDequeueAndStart(ctx) statsDequeueAndStart(ctx)
// pass this error (nil or otherwise) to end directly, to store status, etc // We are about to execute the function, set container Exec Deadline (call.Timeout)
ctx, cancel := context.WithTimeout(ctx, time.Duration(call.Timeout)*time.Second)
defer cancel()
// Pass this error (nil or otherwise) to end directly, to store status, etc.
err = slot.exec(ctx, call) err = slot.exec(ctx, call)
return a.handleCallEnd(ctx, call, slot, err, true) return a.handleCallEnd(ctx, call, slot, err, true)
} }
@@ -383,9 +384,17 @@ func handleStatsEnd(ctx context.Context, err error) {
// request type, this may launch a new container or wait for other containers to become idle // request type, this may launch a new container or wait for other containers to become idle
// or it may wait for resources to become available to launch a new container. // or it may wait for resources to become available to launch a new container.
func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) { func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
// start the deadline context for waiting for slots if call.Type == models.TypeAsync {
ctx, cancel := context.WithDeadline(ctx, call.slotDeadline) // *) for async, slot deadline is also call.Timeout. This is because we would like to
defer cancel() // allocate enough time for docker-pull, slot-wait, docker-start, etc.
// and also make sure we have call.Timeout inside the container. Total time
// to run an async becomes 2 * call.Timeout.
// *) for sync, there's no slot deadline, the timeout is controlled by http-client
// context (or runner gRPC context)
tmp, cancel := context.WithTimeout(ctx, time.Duration(call.Timeout)*time.Second)
ctx = tmp
defer cancel()
}
ctx, span := trace.StartSpan(ctx, "agent_get_slot") ctx, span := trace.StartSpan(ctx, "agent_get_slot")
defer span.End() defer span.End()
@@ -721,7 +730,7 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
errApp := make(chan error, 1) errApp := make(chan error, 1)
go func() { go func() {
ci := protocol.NewCallInfo(call.IsCloudEvent, call.Call, call.req) ci := protocol.NewCallInfo(call.IsCloudEvent, call.Call, call.req.WithContext(ctx))
errApp <- proto.Dispatch(ctx, ci, call.w) errApp <- proto.Dispatch(ctx, ci, call.w)
}() }()
@@ -752,8 +761,10 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch
call.containerState.UpdateState(ctx, ContainerStateStart, call.slots) call.containerState.UpdateState(ctx, ContainerStateStart, call.slots)
deadline := time.Now().Add(time.Duration(call.Timeout) * time.Second)
// add Fn-specific information to the config to shove everything into env vars for cold // add Fn-specific information to the config to shove everything into env vars for cold
call.Config["FN_DEADLINE"] = strfmt.DateTime(call.execDeadline).String() call.Config["FN_DEADLINE"] = strfmt.DateTime(deadline).String()
call.Config["FN_METHOD"] = call.Model().Method call.Config["FN_METHOD"] = call.Model().Method
call.Config["FN_REQUEST_URL"] = call.Model().URL call.Config["FN_REQUEST_URL"] = call.Model().URL
call.Config["FN_CALL_ID"] = call.Model().ID call.Config["FN_CALL_ID"] = call.Model().ID

View File

@@ -254,40 +254,36 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
} }
mem := c.Memory + uint64(c.TmpFsSize) mem := c.Memory + uint64(c.TmpFsSize)
if !a.resources.IsResourcePossible(mem, uint64(c.CPUs), c.Type == models.TypeAsync) { if !a.resources.IsResourcePossible(mem, uint64(c.CPUs), c.Type == models.TypeAsync) {
// if we're not going to be able to run this call on this machine, bail here. // if we're not going to be able to run this call on this machine, bail here.
return nil, models.ErrCallTimeoutServerBusy return nil, models.ErrCallTimeoutServerBusy
} }
err := setMaxBodyLimit(&a.cfg, &c) err := setMaxBodyLimit(&a.cfg, &c)
if err != nil { if err != nil {
return nil, err return nil, err
} }
setupCtx(&c)
c.da = a.da c.da = a.da
c.ct = a c.ct = a
c.stderr = setupLogger(c.req.Context(), a.cfg.MaxLogSize, c.Call)
ctx, _ := common.LoggerWithFields(c.req.Context(),
logrus.Fields{"id": c.ID, "app_id": c.AppID, "route": c.Path})
c.req = c.req.WithContext(ctx)
c.stderr = setupLogger(ctx, a.cfg.MaxLogSize, c.Call)
if c.w == nil { if c.w == nil {
// send STDOUT to logs if no writer given (async...) // send STDOUT to logs if no writer given (async...)
// TODO we could/should probably make this explicit to GetCall, ala 'WithLogger', but it's dupe code (who cares?) // TODO we could/should probably make this explicit to GetCall, ala 'WithLogger', but it's dupe code (who cares?)
c.w = c.stderr c.w = c.stderr
} }
now := time.Now()
slotDeadline := now.Add(time.Duration(c.Call.Timeout) * time.Second / 2)
execDeadline := now.Add(time.Duration(c.Call.Timeout) * time.Second)
c.slotDeadline = slotDeadline
c.execDeadline = execDeadline
return &c, nil return &c, nil
} }
func setupCtx(c *call) {
ctx, _ := common.LoggerWithFields(c.req.Context(),
logrus.Fields{"id": c.ID, "app_id": c.AppID, "route": c.Path})
c.req = c.req.WithContext(ctx)
}
func setMaxBodyLimit(cfg *AgentConfig, c *call) error { func setMaxBodyLimit(cfg *AgentConfig, c *call) error {
if cfg.MaxRequestSize > 0 && c.req.ContentLength > 0 && uint64(c.req.ContentLength) > cfg.MaxRequestSize { if cfg.MaxRequestSize > 0 && c.req.ContentLength > 0 && uint64(c.req.ContentLength) > cfg.MaxRequestSize {
return models.ErrRequestContentTooBig return models.ErrRequestContentTooBig
@@ -310,16 +306,10 @@ type call struct {
stderr io.ReadWriteCloser stderr io.ReadWriteCloser
ct callTrigger ct callTrigger
slots *slotQueue slots *slotQueue
slotDeadline time.Time
lbDeadline time.Time
execDeadline time.Time
requestState RequestState requestState RequestState
containerState ContainerState containerState ContainerState
slotHashId string slotHashId string
} isLB bool
func (c *call) LbDeadline() time.Time {
return c.lbDeadline
} }
func (c *call) SlotHashId() string { func (c *call) SlotHashId() string {
@@ -358,8 +348,7 @@ func (c *call) Start(ctx context.Context) error {
c.StartedAt = strfmt.DateTime(time.Now()) c.StartedAt = strfmt.DateTime(time.Now())
c.Status = "running" c.Status = "running"
// Do not write this header if lb-agent if !c.isLB {
if c.lbDeadline.IsZero() {
if rw, ok := c.w.(http.ResponseWriter); ok { // TODO need to figure out better way to wire response headers in if rw, ok := c.w.(http.ResponseWriter); ok { // TODO need to figure out better way to wire response headers in
rw.Header().Set("XXX-FXLB-WAIT", time.Time(c.StartedAt).Sub(time.Time(c.CreatedAt)).String()) rw.Header().Set("XXX-FXLB-WAIT", time.Time(c.StartedAt).Sub(time.Time(c.CreatedAt)).String())
} }

View File

@@ -7,7 +7,6 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"sync/atomic" "sync/atomic"
"time"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"go.opencensus.io/trace" "go.opencensus.io/trace"
@@ -104,20 +103,18 @@ func (a *lbAgent) GetCall(opts ...CallOpt) (Call, error) {
if c.req == nil || c.Call == nil { if c.req == nil || c.Call == nil {
return nil, errors.New("no model or request provided for call") return nil, errors.New("no model or request provided for call")
} }
err := setMaxBodyLimit(&a.cfg, &c) err := setMaxBodyLimit(&a.cfg, &c)
if err != nil { if err != nil {
return nil, err return nil, err
} }
setupCtx(&c)
c.isLB = true
c.da = a.da c.da = a.da
c.ct = a c.ct = a
c.stderr = &nullReadWriter{} c.stderr = &nullReadWriter{}
ctx, _ := common.LoggerWithFields(c.req.Context(),
logrus.Fields{"id": c.ID, "app_id": c.AppID, "route": c.Path})
c.req = c.req.WithContext(ctx)
c.lbDeadline = time.Now().Add(time.Duration(c.Call.Timeout) * time.Second)
c.slotHashId = getSlotQueueKey(&c) c.slotHashId = getSlotQueueKey(&c)
return &c, nil return &c, nil
@@ -157,12 +154,7 @@ func (a *lbAgent) Submit(callI Call) error {
} }
call := callI.(*call) call := callI.(*call)
ctx, span := trace.StartSpan(call.req.Context(), "agent_submit")
ctx, cancel := context.WithDeadline(call.req.Context(), call.lbDeadline)
call.req = call.req.WithContext(ctx)
defer cancel()
ctx, span := trace.StartSpan(ctx, "agent_submit")
defer span.End() defer span.End()
statsEnqueue(ctx) statsEnqueue(ctx)

View File

@@ -117,7 +117,6 @@ func (r *mockRunner) Address() string {
} }
type mockRunnerCall struct { type mockRunnerCall struct {
lbDeadline time.Time
r *http.Request r *http.Request
rw http.ResponseWriter rw http.ResponseWriter
stdErr io.ReadWriteCloser stdErr io.ReadWriteCloser
@@ -125,10 +124,6 @@ type mockRunnerCall struct {
slotHashId string slotHashId string
} }
func (c *mockRunnerCall) LbDeadline() time.Time {
return c.lbDeadline
}
func (c *mockRunnerCall) SlotHashId() string { func (c *mockRunnerCall) SlotHashId() string {
return c.slotHashId return c.slotHashId
} }
@@ -157,8 +152,10 @@ func setupMockRunnerPool(expectedRunners []string, execSleep time.Duration, maxC
func TestOneRunner(t *testing.T) { func TestOneRunner(t *testing.T) {
placer := pool.NewNaivePlacer() placer := pool.NewNaivePlacer()
rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5) rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5)
call := &mockRunnerCall{lbDeadline: time.Now().Add(1 * time.Second)} call := &mockRunnerCall{}
err := placer.PlaceCall(rp, context.Background(), call) ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
defer cancel()
err := placer.PlaceCall(rp, ctx, call)
if err != nil { if err != nil {
t.Fatalf("Failed to place call on runner %v", err) t.Fatalf("Failed to place call on runner %v", err)
} }
@@ -167,7 +164,7 @@ func TestOneRunner(t *testing.T) {
func TestEnforceTimeoutFromContext(t *testing.T) { func TestEnforceTimeoutFromContext(t *testing.T) {
placer := pool.NewNaivePlacer() placer := pool.NewNaivePlacer()
rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5) rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5)
call := &mockRunnerCall{lbDeadline: time.Now().Add(1 * time.Second)} call := &mockRunnerCall{}
ctx, cancel := context.WithDeadline(context.Background(), time.Now()) ctx, cancel := context.WithDeadline(context.Background(), time.Now())
defer cancel() defer cancel()
err := placer.PlaceCall(rp, ctx, call) err := placer.PlaceCall(rp, ctx, call)
@@ -187,8 +184,10 @@ func TestRRRunner(t *testing.T) {
wg.Add(1) wg.Add(1)
go func(i int) { go func(i int) {
defer wg.Done() defer wg.Done()
call := &mockRunnerCall{lbDeadline: time.Now().Add(10 * time.Millisecond)} ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Millisecond))
err := placer.PlaceCall(rp, context.Background(), call) defer cancel()
call := &mockRunnerCall{}
err := placer.PlaceCall(rp, ctx, call)
if err != nil { if err != nil {
failures <- fmt.Errorf("Timed out call %d", i) failures <- fmt.Errorf("Timed out call %d", i)
} }
@@ -218,8 +217,11 @@ func TestEnforceLbTimeout(t *testing.T) {
wg.Add(1) wg.Add(1)
go func(i int) { go func(i int) {
defer wg.Done() defer wg.Done()
call := &mockRunnerCall{lbDeadline: time.Now().Add(10 * time.Millisecond)} ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Millisecond))
err := placer.PlaceCall(rp, context.Background(), call) defer cancel()
call := &mockRunnerCall{}
err := placer.PlaceCall(rp, ctx, call)
if err != nil { if err != nil {
failures <- fmt.Errorf("Timed out call %d", i) failures <- fmt.Errorf("Timed out call %d", i)
} }

View File

@@ -5,7 +5,6 @@ import (
"errors" "errors"
"io" "io"
"net/http" "net/http"
"time"
"github.com/fnproject/fn/api/models" "github.com/fnproject/fn/api/models"
"github.com/go-openapi/strfmt" "github.com/go-openapi/strfmt"
@@ -79,16 +78,8 @@ func (ci callInfoImpl) Input() io.Reader {
func (ci callInfoImpl) Deadline() strfmt.DateTime { func (ci callInfoImpl) Deadline() strfmt.DateTime {
deadline, ok := ci.req.Context().Deadline() deadline, ok := ci.req.Context().Deadline()
if !ok { if !ok {
// In theory deadline must have been set here, but if it wasn't then // In theory deadline must have been set here
// at this point it is already too late to raise an error. Set it to panic("No context deadline is set in protocol, should never happen")
// something meaningful.
// This assumes StartedAt was set to something other than the default.
// If that isn't set either, then how many things have gone wrong?
if ci.call.StartedAt == strfmt.NewDateTime() {
// We just panic if StartedAt is the default (i.e. not set)
panic("No context deadline and zero-value StartedAt - this should never happen")
}
deadline = ((time.Time)(ci.call.StartedAt)).Add(time.Duration(ci.call.Timeout) * time.Second)
} }
return strfmt.DateTime(deadline) return strfmt.DateTime(deadline)
} }

View File

@@ -2,12 +2,14 @@ package protocol
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"testing" "testing"
"time"
"github.com/fnproject/fn/api/models" "github.com/fnproject/fn/api/models"
) )
@@ -16,7 +18,7 @@ type RequestData struct {
A string `json:"a"` A string `json:"a"`
} }
func setupRequest(data interface{}) *callInfoImpl { func setupRequest(data interface{}) (*callInfoImpl, context.CancelFunc) {
req := &http.Request{ req := &http.Request{
Method: http.MethodPost, Method: http.MethodPost,
URL: &url.URL{ URL: &url.URL{
@@ -46,12 +48,14 @@ func setupRequest(data interface{}) *callInfoImpl {
// fixup URL in models.Call // fixup URL in models.Call
call.URL = req.URL.String() call.URL = req.URL.String()
ci := &callInfoImpl{call: call, req: req} ctx, cancel := context.WithTimeout(req.Context(), 1*time.Second)
return ci ci := &callInfoImpl{call: call, req: req.WithContext(ctx)}
return ci, cancel
} }
func TestJSONProtocolwriteJSONInputRequestBasicFields(t *testing.T) { func TestJSONProtocolwriteJSONInputRequestBasicFields(t *testing.T) {
ci := setupRequest(nil) ci, cancel := setupRequest(nil)
defer cancel()
r, w := io.Pipe() r, w := io.Pipe()
proto := JSONProtocol{w, r} proto := JSONProtocol{w, r}
go func() { go func() {
@@ -88,7 +92,8 @@ func TestJSONProtocolwriteJSONInputRequestBasicFields(t *testing.T) {
func TestJSONProtocolwriteJSONInputRequestWithData(t *testing.T) { func TestJSONProtocolwriteJSONInputRequestWithData(t *testing.T) {
rDataBefore := RequestData{A: "a"} rDataBefore := RequestData{A: "a"}
ci := setupRequest(rDataBefore) ci, cancel := setupRequest(rDataBefore)
defer cancel()
r, w := io.Pipe() r, w := io.Pipe()
proto := JSONProtocol{w, r} proto := JSONProtocol{w, r}
go func() { go func() {
@@ -133,7 +138,8 @@ func TestJSONProtocolwriteJSONInputRequestWithData(t *testing.T) {
} }
func TestJSONProtocolwriteJSONInputRequestWithoutData(t *testing.T) { func TestJSONProtocolwriteJSONInputRequestWithoutData(t *testing.T) {
ci := setupRequest(nil) ci, cancel := setupRequest(nil)
defer cancel()
r, w := io.Pipe() r, w := io.Pipe()
proto := JSONProtocol{w, r} proto := JSONProtocol{w, r}
go func() { go func() {
@@ -177,7 +183,8 @@ func TestJSONProtocolwriteJSONInputRequestWithoutData(t *testing.T) {
} }
func TestJSONProtocolwriteJSONInputRequestWithQuery(t *testing.T) { func TestJSONProtocolwriteJSONInputRequestWithQuery(t *testing.T) {
ci := setupRequest(nil) ci, cancel := setupRequest(nil)
defer cancel()
r, w := io.Pipe() r, w := io.Pipe()
proto := JSONProtocol{w, r} proto := JSONProtocol{w, r}
go func() { go func() {

View File

@@ -67,8 +67,7 @@ type callHandle struct {
c *call // the agent's version of call c *call // the agent's version of call
// Timings, for metrics: // Timings, for metrics:
receivedTime strfmt.DateTime // When was the call received? receivedTime strfmt.DateTime // When was the call received?
allocatedTime strfmt.DateTime // When did we finish allocating capacity?
// For implementing http.ResponseWriter: // For implementing http.ResponseWriter:
headers http.Header headers http.Header
@@ -530,7 +529,7 @@ func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error
return err return err
} }
agent_call, err := pr.a.GetCall(FromModelAndInput(&c, state.pipeToFnR), WithWriter(state)) agent_call, err := pr.a.GetCall(FromModelAndInput(&c, state.pipeToFnR), WithWriter(state), WithContext(state.ctx))
if err != nil { if err != nil {
state.enqueueCallResponse(err) state.enqueueCallResponse(err)
return err return err
@@ -545,7 +544,6 @@ func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error
} }
state.c.slotHashId = string(hashId[:]) state.c.slotHashId = string(hashId[:])
} }
state.allocatedTime = strfmt.DateTime(time.Now())
pr.spawnSubmit(state) pr.spawnSubmit(state)
return nil return nil

View File

@@ -31,7 +31,7 @@ func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall
// The key is just the path in this case // The key is just the path in this case
key := call.Model().Path key := call.Model().Path
sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key)) sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
timeout := time.After(call.LbDeadline().Sub(time.Now()))
for { for {
runners, err := rp.Runners(call) runners, err := rp.Runners(call)
if err != nil { if err != nil {
@@ -43,8 +43,6 @@ func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall
select { select {
case <-ctx.Done(): case <-ctx.Done():
return models.ErrCallTimeoutServerBusy return models.ErrCallTimeoutServerBusy
case <-timeout:
return models.ErrCallTimeoutServerBusy
default: default:
} }
@@ -69,8 +67,6 @@ func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall
select { select {
case <-ctx.Done(): case <-ctx.Done():
return models.ErrCallTimeoutServerBusy return models.ErrCallTimeoutServerBusy
case <-timeout:
return models.ErrCallTimeoutServerBusy
case <-time.After(p.rrInterval): case <-time.After(p.rrInterval):
} }
} }

View File

@@ -25,7 +25,6 @@ func NewNaivePlacer() Placer {
} }
func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error { func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error {
timeout := time.After(call.LbDeadline().Sub(time.Now()))
for { for {
runners, err := rp.Runners(call) runners, err := rp.Runners(call)
@@ -37,8 +36,6 @@ func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call Runner
select { select {
case <-ctx.Done(): case <-ctx.Done():
return models.ErrCallTimeoutServerBusy return models.ErrCallTimeoutServerBusy
case <-timeout:
return models.ErrCallTimeoutServerBusy
default: default:
} }
@@ -62,8 +59,6 @@ func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call Runner
select { select {
case <-ctx.Done(): case <-ctx.Done():
return models.ErrCallTimeoutServerBusy return models.ErrCallTimeoutServerBusy
case <-timeout:
return models.ErrCallTimeoutServerBusy
case <-time.After(sp.rrInterval): case <-time.After(sp.rrInterval):
} }
} }

View File

@@ -4,7 +4,6 @@ import (
"context" "context"
"io" "io"
"net/http" "net/http"
"time"
"github.com/fnproject/fn/api/models" "github.com/fnproject/fn/api/models"
) )
@@ -42,7 +41,6 @@ type Runner interface {
// processed by a RunnerPool // processed by a RunnerPool
type RunnerCall interface { type RunnerCall interface {
SlotHashId() string SlotHashId() string
LbDeadline() time.Time
RequestBody() io.ReadCloser RequestBody() io.ReadCloser
ResponseWriter() http.ResponseWriter ResponseWriter() http.ResponseWriter
StdErr() io.ReadWriteCloser StdErr() io.ReadWriteCloser

View File

@@ -60,7 +60,7 @@ func TestGetExactCall(t *testing.T) {
} }
u.Path = path.Join(u.Path, "r", s.AppName, s.RoutePath) u.Path = path.Join(u.Path, "r", s.AppName, s.RoutePath)
callID := CallAsync(t, u, &bytes.Buffer{}) callID := CallAsync(t, s.Context, u, &bytes.Buffer{})
cfg := &call.GetAppsAppCallsCallParams{ cfg := &call.GetAppsAppCallsCallParams{
Call: callID, Call: callID,

View File

@@ -2,6 +2,7 @@ package tests
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"io" "io"
"net/url" "net/url"
@@ -15,9 +16,9 @@ import (
"github.com/fnproject/fn_go/models" "github.com/fnproject/fn_go/models"
) )
func CallAsync(t *testing.T, u url.URL, content io.Reader) string { func CallAsync(t *testing.T, ctx context.Context, u url.URL, content io.Reader) string {
output := &bytes.Buffer{} output := &bytes.Buffer{}
_, err := CallFN(u.String(), content, output, "POST", []string{}) _, err := CallFN(ctx, u.String(), content, output, "POST", []string{})
if err != nil { if err != nil {
t.Errorf("Got unexpected error: %v", err) t.Errorf("Got unexpected error: %v", err)
} }
@@ -41,9 +42,9 @@ func CallAsync(t *testing.T, u url.URL, content io.Reader) string {
return callID.CallID return callID.CallID
} }
func CallSync(t *testing.T, u url.URL, content io.Reader) string { func CallSync(t *testing.T, ctx context.Context, u url.URL, content io.Reader) string {
output := &bytes.Buffer{} output := &bytes.Buffer{}
resp, err := CallFN(u.String(), content, output, "POST", []string{}) resp, err := CallFN(ctx, u.String(), content, output, "POST", []string{})
if err != nil { if err != nil {
t.Errorf("Got unexpected error: %v", err) t.Errorf("Got unexpected error: %v", err)
} }
@@ -75,7 +76,7 @@ func TestCanCallfunction(t *testing.T) {
content := &bytes.Buffer{} content := &bytes.Buffer{}
output := &bytes.Buffer{} output := &bytes.Buffer{}
_, err := CallFN(u.String(), content, output, "POST", []string{}) _, err := CallFN(s.Context, u.String(), content, output, "POST", []string{})
if err != nil { if err != nil {
t.Errorf("Got unexpected error: %v", err) t.Errorf("Got unexpected error: %v", err)
} }
@@ -104,7 +105,7 @@ func TestCallOutputMatch(t *testing.T) {
Name string Name string
}{Name: "John"}) }{Name: "John"})
output := &bytes.Buffer{} output := &bytes.Buffer{}
_, err := CallFN(u.String(), content, output, "POST", []string{}) _, err := CallFN(s.Context, u.String(), content, output, "POST", []string{})
if err != nil { if err != nil {
t.Errorf("Got unexpected error: %v", err) t.Errorf("Got unexpected error: %v", err)
} }
@@ -134,7 +135,7 @@ func TestCanCallAsync(t *testing.T) {
Type: newRouteType, Type: newRouteType,
}) })
CallAsync(t, u, &bytes.Buffer{}) CallAsync(t, s.Context, u, &bytes.Buffer{})
} }
func TestCanGetAsyncState(t *testing.T) { func TestCanGetAsyncState(t *testing.T) {
@@ -157,7 +158,7 @@ func TestCanGetAsyncState(t *testing.T) {
Type: newRouteType, Type: newRouteType,
}) })
callID := CallAsync(t, u, &bytes.Buffer{}) callID := CallAsync(t, s.Context, u, &bytes.Buffer{})
cfg := &call.GetAppsAppCallsCallParams{ cfg := &call.GetAppsAppCallsCallParams{
Call: callID, Call: callID,
App: s.AppName, App: s.AppName,
@@ -221,7 +222,7 @@ func TestCanCauseTimeout(t *testing.T) {
}{Seconds: 11}) }{Seconds: 11})
output := &bytes.Buffer{} output := &bytes.Buffer{}
resp, _ := CallFN(u.String(), content, output, "POST", []string{}) resp, _ := CallFN(s.Context, u.String(), content, output, "POST", []string{})
if !strings.Contains(output.String(), "Timed out") { if !strings.Contains(output.String(), "Timed out") {
t.Errorf("Must fail because of timeout, but got error message: %v", output.String()) t.Errorf("Must fail because of timeout, but got error message: %v", output.String())
@@ -270,7 +271,7 @@ func TestCallResponseHeadersMatch(t *testing.T) {
u.Path = path.Join(u.Path, "r", s.AppName, rt.Path) u.Path = path.Join(u.Path, "r", s.AppName, rt.Path)
content := &bytes.Buffer{} content := &bytes.Buffer{}
output := &bytes.Buffer{} output := &bytes.Buffer{}
CallFN(u.String(), content, output, "POST", CallFN(s.Context, u.String(), content, output, "POST",
[]string{ []string{
"ACCEPT: application/xml", "ACCEPT: application/xml",
"ACCEPT: application/json; q=0.2", "ACCEPT: application/json; q=0.2",
@@ -305,7 +306,7 @@ func TestCanWriteLogs(t *testing.T) {
Size int Size int
}{Size: 20}) }{Size: 20})
callID := CallSync(t, u, content) callID := CallSync(t, s.Context, u, content)
cfg := &operations.GetAppsAppCallsCallLogParams{ cfg := &operations.GetAppsAppCallsCallLogParams{
Call: callID, Call: callID,
@@ -353,7 +354,7 @@ func TestOversizedLog(t *testing.T) {
Size int Size int
}{Size: size}) //exceeding log by 1 symbol }{Size: size}) //exceeding log by 1 symbol
callID := CallSync(t, u, content) callID := CallSync(t, s.Context, u, content)
cfg := &operations.GetAppsAppCallsCallLogParams{ cfg := &operations.GetAppsAppCallsCallLogParams{
Call: callID, Call: callID,

View File

@@ -41,7 +41,7 @@ func TestFnJSONFormats(t *testing.T) {
}) })
content := bytes.NewBuffer(b) content := bytes.NewBuffer(b)
output := &bytes.Buffer{} output := &bytes.Buffer{}
resp, err := CallFN(u.String(), content, output, "POST", []string{}) resp, err := CallFN(s.Context, u.String(), content, output, "POST", []string{})
if err != nil { if err != nil {
t.Errorf("Got unexpected error: %v", err) t.Errorf("Got unexpected error: %v", err)
} }

View File

@@ -136,6 +136,8 @@ func (s *TestHarness) Cleanup() {
for app, _ := range s.createdApps { for app, _ := range s.createdApps {
safeDeleteApp(ctx, s.Client, app) safeDeleteApp(ctx, s.Client, app)
} }
s.Cancel()
} }
func EnvAsHeader(req *http.Request, selectedEnv []string) { func EnvAsHeader(req *http.Request, selectedEnv []string) {
@@ -151,7 +153,7 @@ func EnvAsHeader(req *http.Request, selectedEnv []string) {
} }
} }
func CallFN(u string, content io.Reader, output io.Writer, method string, env []string) (*http.Response, error) { func CallFN(ctx context.Context, u string, content io.Reader, output io.Writer, method string, env []string) (*http.Response, error) {
if method == "" { if method == "" {
if content == nil { if content == nil {
method = "GET" method = "GET"
@@ -164,8 +166,8 @@ func CallFN(u string, content io.Reader, output io.Writer, method string, env []
if err != nil { if err != nil {
return nil, fmt.Errorf("error running route: %s", err) return nil, fmt.Errorf("error running route: %s", err)
} }
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
req = req.WithContext(ctx)
if len(env) > 0 { if len(env) > 0 {
EnvAsHeader(req, env) EnvAsHeader(req, env)

View File

@@ -2,6 +2,7 @@ package tests
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@@ -10,8 +11,8 @@ import (
"path" "path"
"strings" "strings"
"testing" "testing"
"time"
apimodels "github.com/fnproject/fn/api/models"
apiutils "github.com/fnproject/fn/test/fn-api-tests" apiutils "github.com/fnproject/fn/test/fn-api-tests"
sdkmodels "github.com/fnproject/fn_go/models" sdkmodels "github.com/fnproject/fn_go/models"
) )
@@ -75,7 +76,7 @@ func TestCanExecuteFunction(t *testing.T) {
content := bytes.NewBuffer([]byte(body)) content := bytes.NewBuffer([]byte(body))
output := &bytes.Buffer{} output := &bytes.Buffer{}
resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) resp, err := apiutils.CallFN(s.Context, u.String(), content, output, "POST", []string{})
if err != nil { if err != nil {
t.Errorf("Got unexpected error: %v", err) t.Errorf("Got unexpected error: %v", err)
} }
@@ -118,7 +119,7 @@ func TestCanExecuteBigOutput(t *testing.T) {
content := bytes.NewBuffer([]byte(body)) content := bytes.NewBuffer([]byte(body))
output := &bytes.Buffer{} output := &bytes.Buffer{}
resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) resp, err := apiutils.CallFN(s.Context, u.String(), content, output, "POST", []string{})
if err != nil { if err != nil {
t.Errorf("Got unexpected error: %v", err) t.Errorf("Got unexpected error: %v", err)
} }
@@ -163,7 +164,7 @@ func TestCanExecuteTooBigOutput(t *testing.T) {
content := bytes.NewBuffer([]byte(body)) content := bytes.NewBuffer([]byte(body))
output := &bytes.Buffer{} output := &bytes.Buffer{}
resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) resp, err := apiutils.CallFN(s.Context, u.String(), content, output, "POST", []string{})
if err != nil { if err != nil {
t.Errorf("Got unexpected error: %v", err) t.Errorf("Got unexpected error: %v", err)
} }
@@ -208,7 +209,7 @@ func TestCanExecuteEmptyOutput(t *testing.T) {
content := bytes.NewBuffer([]byte(body)) content := bytes.NewBuffer([]byte(body))
output := &bytes.Buffer{} output := &bytes.Buffer{}
resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) resp, err := apiutils.CallFN(s.Context, u.String(), content, output, "POST", []string{})
if err != nil { if err != nil {
t.Errorf("Got unexpected error: %v", err) t.Errorf("Got unexpected error: %v", err)
} }
@@ -256,7 +257,7 @@ func TestBasicConcurrentExecution(t *testing.T) {
body := `{"echoContent": "HelloWorld", "sleepTime": 0, "isDebug": true}` body := `{"echoContent": "HelloWorld", "sleepTime": 0, "isDebug": true}`
content := bytes.NewBuffer([]byte(body)) content := bytes.NewBuffer([]byte(body))
output := &bytes.Buffer{} output := &bytes.Buffer{}
resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) resp, err := apiutils.CallFN(s.Context, u.String(), content, output, "POST", []string{})
if err != nil { if err != nil {
results <- fmt.Errorf("Got unexpected error: %v", err) results <- fmt.Errorf("Got unexpected error: %v", err)
return return
@@ -288,10 +289,14 @@ func TestSaturatedSystem(t *testing.T) {
s := apiutils.SetupHarness() s := apiutils.SetupHarness()
// override default 60 secs with shorter.
s.Cancel()
s.Context, s.Cancel = context.WithTimeout(context.Background(), 4*time.Second)
s.GivenAppExists(t, &sdkmodels.App{Name: s.AppName}) s.GivenAppExists(t, &sdkmodels.App{Name: s.AppName})
defer s.Cleanup() defer s.Cleanup()
timeout := int32(5) timeout := int32(1)
rt := s.BasicRoute() rt := s.BasicRoute()
rt.Image = "fnproject/fn-test-utils" rt.Image = "fnproject/fn-test-utils"
@@ -316,28 +321,8 @@ func TestSaturatedSystem(t *testing.T) {
content := bytes.NewBuffer([]byte(body)) content := bytes.NewBuffer([]byte(body))
output := &bytes.Buffer{} output := &bytes.Buffer{}
resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) resp, err := apiutils.CallFN(s.Context, u.String(), content, output, "POST", []string{})
if err != nil { if resp != nil || err == nil || s.Context.Err() == nil {
if err != apimodels.ErrCallTimeoutServerBusy { t.Fatalf("Expected response: %v err:%v", resp, err)
t.Errorf("Got unexpected error: %v", err)
}
}
// LB may respond either with:
// timeout: a timeout during a call to a runner
// too busy: a timeout during LB retry loop
exp1 := "{\"error\":{\"message\":\"Timed out - server too busy\"}}\n"
exp2 := "{\"error\":{\"message\":\"Timed out\"}}\n"
actual := output.String()
if strings.Contains(exp1, actual) && len(exp1) == len(actual) {
} else if strings.Contains(exp2, actual) && len(exp2) == len(actual) {
} else {
t.Errorf("Assertion error.\n\tExpected: %v or %v\n\tActual: %v", exp1, exp2, output.String())
}
if resp.StatusCode != http.StatusServiceUnavailable && resp.StatusCode != http.StatusGatewayTimeout {
t.Fatalf("StatusCode check failed on %v", resp.StatusCode)
} }
} }