mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
* fn: latency metrics for various call states
This complements the API latency metrics available
on LB agent. In this case, we would like to measure
calls that have finished with the following statuses:
"completed"
"canceled"
"timeouts"
"errors"
"server_busy"
and while measuring this latency, we subtract the
amount of time actual function execution took. This
is not precise, but an approximation mostly suitable
for trending.
Going forward, we could also subtract UDS wait time and/or
docker pull latency from this latency as an enhancement
to this PR.
291 lines · 7.0 KiB · Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/fnproject/fn/api/models"
|
|
pool "github.com/fnproject/fn/api/runnerpool"
|
|
)
|
|
|
|
type mockRunner struct {
|
|
wg sync.WaitGroup
|
|
sleep time.Duration
|
|
mtx sync.Mutex
|
|
maxCalls int32 // Max concurrent calls
|
|
curCalls int32 // Current calls
|
|
procCalls int32 // Processed calls
|
|
addr string
|
|
}
|
|
|
|
type mockRunnerPool struct {
|
|
runners []pool.Runner
|
|
generator pool.MTLSRunnerFactory
|
|
}
|
|
|
|
func newMockRunnerPool(rf pool.MTLSRunnerFactory, runnerAddrs []string) *mockRunnerPool {
|
|
var runners []pool.Runner
|
|
for _, addr := range runnerAddrs {
|
|
r, err := rf(addr, nil)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
runners = append(runners, r)
|
|
}
|
|
|
|
return &mockRunnerPool{
|
|
runners: runners,
|
|
generator: rf,
|
|
}
|
|
}
|
|
|
|
func (rp *mockRunnerPool) Runners(ctx context.Context, call pool.RunnerCall) ([]pool.Runner, error) {
|
|
return rp.runners, nil
|
|
}
|
|
|
|
func (rp *mockRunnerPool) Shutdown(context.Context) error {
|
|
return nil
|
|
}
|
|
|
|
func NewMockRunnerFactory(sleep time.Duration, maxCalls int32) pool.MTLSRunnerFactory {
|
|
return func(addr string, tlsConf *tls.Config) (pool.Runner, error) {
|
|
return &mockRunner{
|
|
sleep: sleep,
|
|
maxCalls: maxCalls,
|
|
addr: addr,
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
func FaultyRunnerFactory() pool.MTLSRunnerFactory {
|
|
return func(addr string, tlsConf *tls.Config) (pool.Runner, error) {
|
|
return &mockRunner{
|
|
addr: addr,
|
|
}, errors.New("Creation of new runner failed")
|
|
}
|
|
}
|
|
|
|
func (r *mockRunner) checkAndIncrCalls() error {
|
|
r.mtx.Lock()
|
|
defer r.mtx.Unlock()
|
|
if r.curCalls >= r.maxCalls {
|
|
return models.ErrCallTimeoutServerBusy //TODO is that the correct error?
|
|
}
|
|
r.curCalls++
|
|
return nil
|
|
}
|
|
|
|
func (r *mockRunner) decrCalls() {
|
|
r.mtx.Lock()
|
|
defer r.mtx.Unlock()
|
|
r.curCalls--
|
|
}
|
|
|
|
func (r *mockRunner) Status(ctx context.Context) (*pool.RunnerStatus, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func (r *mockRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, error) {
|
|
err := r.checkAndIncrCalls()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
defer r.decrCalls()
|
|
|
|
r.wg.Add(1)
|
|
defer r.wg.Done()
|
|
|
|
time.Sleep(r.sleep)
|
|
|
|
r.procCalls++
|
|
return true, nil
|
|
}
|
|
|
|
func (r *mockRunner) Close(context.Context) error {
|
|
go func() {
|
|
r.wg.Wait()
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (r *mockRunner) Address() string {
|
|
return r.addr
|
|
}
|
|
|
|
type mockRunnerCall struct {
|
|
r *http.Request
|
|
rw http.ResponseWriter
|
|
stdErr io.ReadWriteCloser
|
|
model *models.Call
|
|
slotHashId string
|
|
|
|
// amount of time user execution inside container
|
|
userExecTime *time.Duration
|
|
}
|
|
|
|
func (c *mockRunnerCall) SlotHashId() string {
|
|
return c.slotHashId
|
|
}
|
|
|
|
func (c *mockRunnerCall) Extensions() map[string]string {
|
|
return nil
|
|
}
|
|
|
|
func (c *mockRunnerCall) RequestBody() io.ReadCloser {
|
|
return c.r.Body
|
|
}
|
|
|
|
func (c *mockRunnerCall) ResponseWriter() http.ResponseWriter {
|
|
return c.rw
|
|
}
|
|
|
|
func (c *mockRunnerCall) StdErr() io.ReadWriteCloser {
|
|
return c.stdErr
|
|
}
|
|
|
|
func (c *mockRunnerCall) Model() *models.Call {
|
|
return c.model
|
|
}
|
|
|
|
func (c *mockRunnerCall) AddUserExecutionTime(dur time.Duration) {
|
|
if c.userExecTime == nil {
|
|
c.userExecTime = new(time.Duration)
|
|
}
|
|
*c.userExecTime += dur
|
|
}
|
|
|
|
func (c *mockRunnerCall) GetUserExecutionTime() *time.Duration {
|
|
return c.userExecTime
|
|
}
|
|
|
|
func setupMockRunnerPool(expectedRunners []string, execSleep time.Duration, maxCalls int32) *mockRunnerPool {
|
|
rf := NewMockRunnerFactory(execSleep, maxCalls)
|
|
return newMockRunnerPool(rf, expectedRunners)
|
|
}
|
|
|
|
func TestOneRunner(t *testing.T) {
|
|
cfg := pool.NewPlacerConfig()
|
|
placer := pool.NewNaivePlacer(&cfg)
|
|
rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5)
|
|
modelCall := &models.Call{Type: models.TypeSync}
|
|
call := &mockRunnerCall{model: modelCall}
|
|
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
|
|
defer cancel()
|
|
err := placer.PlaceCall(ctx, rp, call)
|
|
if err != nil {
|
|
t.Fatalf("Failed to place call on runner %v", err)
|
|
}
|
|
}
|
|
|
|
func TestEnforceTimeoutFromContext(t *testing.T) {
|
|
cfg := pool.NewPlacerConfig()
|
|
placer := pool.NewNaivePlacer(&cfg)
|
|
rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5)
|
|
|
|
modelCall := &models.Call{Type: models.TypeSync}
|
|
call := &mockRunnerCall{model: modelCall}
|
|
|
|
ctx, cancel := context.WithDeadline(context.Background(), time.Now())
|
|
defer cancel()
|
|
err := placer.PlaceCall(ctx, rp, call)
|
|
if err == nil {
|
|
t.Fatal("Call should have timed out")
|
|
}
|
|
}
|
|
|
|
func TestDetachedPlacerTimeout(t *testing.T) {
|
|
// In this test we set the detached placer timeout to a value lower than the request timeout (call.Timeout)
|
|
// the fake placer will just sleep for a time greater of the detached placement timeout and it will return
|
|
// the right error only if the detached timeout exceeds but the request timeout is still valid
|
|
cfg := pool.NewPlacerConfig()
|
|
cfg.DetachedPlacerTimeout = 300 * time.Millisecond
|
|
placer := pool.NewFakeDetachedPlacer(&cfg, 400*time.Millisecond)
|
|
|
|
modelCall := &models.Call{Type: models.TypeDetached}
|
|
call := &mockRunnerCall{model: modelCall}
|
|
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
|
|
defer cancel()
|
|
err := placer.PlaceCall(ctx, nil, call)
|
|
if err == nil {
|
|
t.Fatal("Detached call should have time out because of the expiration of the placement timeout")
|
|
}
|
|
|
|
}
|
|
|
|
func TestRRRunner(t *testing.T) {
|
|
cfg := pool.NewPlacerConfig()
|
|
placer := pool.NewNaivePlacer(&cfg)
|
|
rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 2)
|
|
|
|
parallelCalls := 2
|
|
var wg sync.WaitGroup
|
|
failures := make(chan error, parallelCalls)
|
|
for i := 0; i < parallelCalls; i++ {
|
|
wg.Add(1)
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Millisecond))
|
|
defer cancel()
|
|
modelCall := &models.Call{Type: models.TypeSync}
|
|
call := &mockRunnerCall{model: modelCall}
|
|
|
|
err := placer.PlaceCall(ctx, rp, call)
|
|
if err != nil {
|
|
failures <- fmt.Errorf("Timed out call %d", i)
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
close(failures)
|
|
|
|
err := <-failures
|
|
if err != nil {
|
|
t.Fatalf("Expected no error %s", err.Error())
|
|
}
|
|
if rp.runners[1].(*mockRunner).procCalls != 1 && rp.runners[0].(*mockRunner).procCalls != 1 {
|
|
t.Fatal("Expected rr runner")
|
|
}
|
|
}
|
|
|
|
func TestEnforceLbTimeout(t *testing.T) {
|
|
cfg := pool.NewPlacerConfig()
|
|
placer := pool.NewNaivePlacer(&cfg)
|
|
rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 1)
|
|
|
|
parallelCalls := 5
|
|
var wg sync.WaitGroup
|
|
failures := make(chan error, parallelCalls)
|
|
for i := 0; i < parallelCalls; i++ {
|
|
wg.Add(1)
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Millisecond))
|
|
defer cancel()
|
|
|
|
modelCall := &models.Call{Type: models.TypeSync}
|
|
call := &mockRunnerCall{model: modelCall}
|
|
|
|
err := placer.PlaceCall(ctx, rp, call)
|
|
if err != nil {
|
|
failures <- fmt.Errorf("Timed out call %d", i)
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
close(failures)
|
|
|
|
err := <-failures
|
|
if err == nil {
|
|
t.Fatal("Expected a call failure")
|
|
}
|
|
}
|