Files
fn-serverless/test/fn-system-tests/system_test.go
Tolga Ceylan f24172aa9d fn: introducing lb placer basic metrics (#1058)
* fn: introducing lb placer basic metrics

This change adds basic metrics to naive and consistent
hash LB placers. The stats show how many times we scanned
the full runner list, if runner pool failed to return a
runner list or if runner pool returned an empty list.

Placed and not placed status are also tracked along with
if TryExec returned an error or not. Most common error
code, Too-Busy is specifically tracked.

If client cancels/times out, this is also tracked as
a client cancel metric.

For placer latency, we would like to know how much time
the placer spent on searching for a runner until it
successfully places a call. This includes round-trip
times for NACK responses from the runners until a successful
TryExec() call. By excluding last successful TryExec() latency,
we try to exclude function execution & runner container
startup time from this metric in an attempt to isolate
Placer only latency.

* fn: latency and attempt tracker

Removing full scan metric. Tracking number of
runners attempted is a better metric for this
purpose.

Also, if rp.Runners() fail, this is an unrecoverable
error and we should bail out instead of retrying.

* fn: typo fix, ch placer finalize err return

* fn: enable LB placer metrics in WithAgentFromEnv if prometheus is enabled
2018-06-12 13:36:05 -07:00

323 lines
8.2 KiB
Go

package tests
import (
"bytes"
"context"
"fmt"
"github.com/fnproject/fn/api/agent"
"github.com/fnproject/fn/api/agent/hybrid"
pool "github.com/fnproject/fn/api/runnerpool"
"github.com/fnproject/fn/api/server"
_ "github.com/fnproject/fn/api/server/defaultexts"
"github.com/sirupsen/logrus"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"testing"
"time"
)
const (
LBAddress = "http://127.0.0.1:8081"
)
type SystemTestNodePool struct {
runners []pool.Runner
}
func LB() (string, error) {
u, err := url.Parse(LBAddress)
if err != nil {
return "", err
}
return u.Host, nil
}
func NewSystemTestNodePool() (pool.RunnerPool, error) {
myAddr := whoAmI()
runners := []string{
fmt.Sprintf("%s:9190", myAddr),
fmt.Sprintf("%s:9191", myAddr),
fmt.Sprintf("%s:9192", myAddr),
}
return agent.DefaultStaticRunnerPool(runners), nil
}
type state struct {
memory string
cancel func()
}
func SetUpSystem() (*state, error) {
ctx, cancel := context.WithCancel(context.Background())
state := &state{
cancel: cancel,
}
api, err := SetUpAPINode(ctx)
if err != nil {
return state, err
}
logrus.Info("Created API node")
lb, err := SetUpLBNode(ctx)
if err != nil {
return state, err
}
logrus.Info("Created LB node")
state.memory = os.Getenv(agent.EnvMaxTotalMemory)
os.Setenv(agent.EnvMaxTotalMemory, strconv.FormatUint(256*1024*1024, 10))
pr0, err := SetUpPureRunnerNode(ctx, 0)
if err != nil {
return state, err
}
pr1, err := SetUpPureRunnerNode(ctx, 1)
if err != nil {
return state, err
}
pr2, err := SetUpPureRunnerNode(ctx, 2)
if err != nil {
return state, err
}
logrus.Info("Created Pure Runner nodes")
go func() { api.Start(ctx) }()
logrus.Info("Started API node")
go func() { lb.Start(ctx) }()
logrus.Info("Started LB node")
go func() { pr0.Start(ctx) }()
go func() { pr1.Start(ctx) }()
go func() { pr2.Start(ctx) }()
logrus.Info("Started Pure Runner nodes")
// Wait for init - not great
time.Sleep(5 * time.Second)
return state, nil
}
func downloadMetrics() {
fileName, ok := os.LookupEnv("SYSTEM_TEST_PROMETHEUS_FILE")
if !ok || fileName == "" {
return
}
resp, err := http.Get(LBAddress + "/metrics")
if err != nil {
logrus.WithError(err).Fatal("Fetching metrics, got unexpected error")
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
logrus.WithError(err).Fatal("Reading metrics body, got unexpected error")
}
err = ioutil.WriteFile(fileName, body, 0644)
if err != nil {
logrus.WithError(err).Fatalf("Writing metrics body to %v, got unexpected error", fileName)
}
}
func CleanUpSystem(st *state) error {
downloadMetrics()
_, err := http.Get("http://127.0.0.1:8081/shutdown")
if err != nil {
return err
}
_, err = http.Get("http://127.0.0.1:8082/shutdown")
if err != nil {
return err
}
_, err = http.Get("http://127.0.0.1:8083/shutdown")
if err != nil {
return err
}
_, err = http.Get("http://127.0.0.1:8084/shutdown")
if err != nil {
return err
}
_, err = http.Get("http://127.0.0.1:8085/shutdown")
if err != nil {
return err
}
if st.cancel != nil {
st.cancel()
}
// Wait for shutdown - not great
time.Sleep(5 * time.Second)
if st.memory != "" {
os.Setenv(agent.EnvMaxTotalMemory, st.memory)
} else {
os.Unsetenv(agent.EnvMaxTotalMemory)
}
return nil
}
func SetUpAPINode(ctx context.Context) (*server.Server, error) {
curDir := pwd()
var defaultDB, defaultMQ string
defaultDB = fmt.Sprintf("sqlite3://%s/data/fn.db", curDir)
defaultMQ = fmt.Sprintf("bolt://%s/data/fn.mq", curDir)
nodeType := server.ServerTypeAPI
opts := make([]server.ServerOption, 0)
opts = append(opts, server.WithWebPort(8085))
opts = append(opts, server.WithType(nodeType))
opts = append(opts, server.WithLogLevel(getEnv(server.EnvLogLevel, server.DefaultLogLevel)))
opts = append(opts, server.WithLogDest(getEnv(server.EnvLogDest, server.DefaultLogDest), "API"))
opts = append(opts, server.WithDBURL(getEnv(server.EnvDBURL, defaultDB)))
opts = append(opts, server.WithMQURL(getEnv(server.EnvMQURL, defaultMQ)))
opts = append(opts, server.WithLogURL(""))
opts = append(opts, server.WithLogstoreFromDatastore())
opts = append(opts, server.EnableShutdownEndpoint(ctx, func() {})) // TODO: do it properly
return server.New(ctx, opts...), nil
}
func SetUpLBNode(ctx context.Context) (*server.Server, error) {
nodeType := server.ServerTypeLB
opts := make([]server.ServerOption, 0)
opts = append(opts, server.WithWebPort(8081))
opts = append(opts, server.WithType(nodeType))
opts = append(opts, server.WithLogLevel(getEnv(server.EnvLogLevel, server.DefaultLogLevel)))
opts = append(opts, server.WithLogDest(getEnv(server.EnvLogDest, server.DefaultLogDest), "LB"))
opts = append(opts, server.WithDBURL(""))
opts = append(opts, server.WithMQURL(""))
opts = append(opts, server.WithLogURL(""))
opts = append(opts, server.EnableShutdownEndpoint(ctx, func() {})) // TODO: do it properly
opts = append(opts, server.WithPrometheus())
apiURL := "http://127.0.0.1:8085"
cl, err := hybrid.NewClient(apiURL)
if err != nil {
return nil, err
}
nodePool, err := NewSystemTestNodePool()
if err != nil {
return nil, err
}
placer := pool.NewNaivePlacer()
keys := []string{"fn_appname", "fn_path"}
pool.RegisterPlacerViews(keys)
agent, err := agent.NewLBAgent(agent.NewCachedDataAccess(cl), nodePool, placer)
if err != nil {
return nil, err
}
opts = append(opts, server.WithAgent(agent))
return server.New(ctx, opts...), nil
}
func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, error) {
nodeType := server.ServerTypePureRunner
opts := make([]server.ServerOption, 0)
opts = append(opts, server.WithWebPort(8082+nodeNum))
opts = append(opts, server.WithGRPCPort(9190+nodeNum))
opts = append(opts, server.WithType(nodeType))
opts = append(opts, server.WithLogLevel(getEnv(server.EnvLogLevel, server.DefaultLogLevel)))
opts = append(opts, server.WithLogDest(getEnv(server.EnvLogDest, server.DefaultLogDest), "PURE-RUNNER"))
opts = append(opts, server.WithDBURL(""))
opts = append(opts, server.WithMQURL(""))
opts = append(opts, server.WithLogURL(""))
opts = append(opts, server.EnableShutdownEndpoint(ctx, func() {})) // TODO: do it properly
ds, err := hybrid.NewNopDataStore()
if err != nil {
return nil, err
}
grpcAddr := fmt.Sprintf(":%d", 9190+nodeNum)
cancelCtx, cancel := context.WithCancel(ctx)
prAgent, err := agent.NewPureRunner(cancel, grpcAddr, ds, "", "", "", nil)
if err != nil {
return nil, err
}
opts = append(opts, server.WithAgent(prAgent), server.WithExtraCtx(cancelCtx))
return server.New(ctx, opts...), nil
}
func pwd() string {
cwd, err := os.Getwd()
if err != nil {
logrus.WithError(err).Fatalln("couldn't get working directory, possibly unsupported platform?")
}
// Replace forward slashes in case this is windows, URL parser errors
return strings.Replace(cwd, "\\", "/", -1)
}
func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}
func getEnvInt(key string, fallback int) int {
if value, ok := os.LookupEnv(key); ok {
// linter liked this better than if/else
var err error
var i int
if i, err = strconv.Atoi(value); err != nil {
panic(err) // not sure how to handle this
}
return i
}
return fallback
}
// whoAmI searches for a non-local address on any network interface, returning
// the first one it finds. it could be expanded to search eth0 or en0 only but
// to date this has been unnecessary.
func whoAmI() net.IP {
ints, _ := net.Interfaces()
for _, i := range ints {
if i.Name == "docker0" || i.Name == "vboxnet0" || i.Name == "lo" {
// not perfect
continue
}
addrs, _ := i.Addrs()
for _, a := range addrs {
ip, _, err := net.ParseCIDR(a.String())
if a.Network() == "ip+net" && err == nil && ip.To4() != nil {
if !bytes.Equal(ip, net.ParseIP("127.0.0.1")) {
return ip
}
}
}
}
return nil
}
func TestMain(m *testing.M) {
state, err := SetUpSystem()
if err != nil {
logrus.WithError(err).Fatal("Could not initialize system")
os.Exit(1)
}
// call flag.Parse() here if TestMain uses flags
result := m.Run()
err = CleanUpSystem(state)
if err != nil {
logrus.WithError(err).Warn("Could not clean up system")
}
if result == 0 {
fmt.Fprintln(os.Stdout, "😀 👍 🎗")
}
os.Exit(result)
}