mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Vast commit, includes: * Introduces the Trigger domain entity. * Introduces the Fns domain entity. * V2 of the API for interacting with the new entities in swaggerv2.yml * Adds v2 end points for Apps to support PUT updates. * Rewrites the datastore level tests into a new pattern. * V2 routes use entity ID over name as the path parameter.
432 lines
12 KiB
Go
432 lines
12 KiB
Go
package tests
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/fnproject/fn/api/agent"
|
|
"github.com/fnproject/fn/api/agent/drivers"
|
|
"github.com/fnproject/fn/api/agent/hybrid"
|
|
"github.com/fnproject/fn/api/common"
|
|
"github.com/fnproject/fn/api/models"
|
|
pool "github.com/fnproject/fn/api/runnerpool"
|
|
"github.com/fnproject/fn/api/server"
|
|
_ "github.com/fnproject/fn/api/server/defaultexts"
|
|
|
|
// We need docker client here, since we have a custom driver that wraps generic
|
|
// docker driver.
|
|
"github.com/fsouza/go-dockerclient"
|
|
"github.com/sirupsen/logrus"
|
|
|
|
"io/ioutil"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
LBAddress = "http://127.0.0.1:8081"
|
|
)
|
|
|
|
func LB() (string, error) {
|
|
u, err := url.Parse(LBAddress)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return u.Host, nil
|
|
}
|
|
|
|
func NewSystemTestNodePool() (pool.RunnerPool, error) {
|
|
myAddr := whoAmI()
|
|
runners := []string{
|
|
fmt.Sprintf("%s:9190", myAddr),
|
|
fmt.Sprintf("%s:9191", myAddr),
|
|
fmt.Sprintf("%s:9192", myAddr),
|
|
}
|
|
return agent.DefaultStaticRunnerPool(runners), nil
|
|
}
|
|
|
|
type state struct {
|
|
memory string
|
|
cancel func()
|
|
}
|
|
|
|
func SetUpSystem() (*state, error) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
state := &state{
|
|
cancel: cancel,
|
|
}
|
|
|
|
api, err := SetUpAPINode(ctx)
|
|
if err != nil {
|
|
return state, err
|
|
}
|
|
logrus.Info("Created API node")
|
|
|
|
lb, err := SetUpLBNode(ctx)
|
|
if err != nil {
|
|
return state, err
|
|
}
|
|
logrus.Info("Created LB node")
|
|
|
|
state.memory = os.Getenv(agent.EnvMaxTotalMemory)
|
|
os.Setenv(agent.EnvMaxTotalMemory, strconv.FormatUint(256*1024*1024, 10))
|
|
|
|
pr0, err := SetUpPureRunnerNode(ctx, 0)
|
|
if err != nil {
|
|
return state, err
|
|
}
|
|
pr1, err := SetUpPureRunnerNode(ctx, 1)
|
|
if err != nil {
|
|
return state, err
|
|
}
|
|
pr2, err := SetUpPureRunnerNode(ctx, 2)
|
|
if err != nil {
|
|
return state, err
|
|
}
|
|
logrus.Info("Created Pure Runner nodes")
|
|
|
|
go func() { api.Start(ctx) }()
|
|
logrus.Info("Started API node")
|
|
go func() { lb.Start(ctx) }()
|
|
logrus.Info("Started LB node")
|
|
go func() { pr0.Start(ctx) }()
|
|
go func() { pr1.Start(ctx) }()
|
|
go func() { pr2.Start(ctx) }()
|
|
logrus.Info("Started Pure Runner nodes")
|
|
// Wait for init - not great
|
|
time.Sleep(5 * time.Second)
|
|
return state, nil
|
|
}
|
|
|
|
func downloadMetrics() {
|
|
|
|
fileName, ok := os.LookupEnv("SYSTEM_TEST_PROMETHEUS_FILE")
|
|
if !ok || fileName == "" {
|
|
return
|
|
}
|
|
|
|
resp, err := http.Get(LBAddress + "/metrics")
|
|
if err != nil {
|
|
logrus.WithError(err).Fatal("Fetching metrics, got unexpected error")
|
|
}
|
|
defer resp.Body.Close()
|
|
body, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
logrus.WithError(err).Fatal("Reading metrics body, got unexpected error")
|
|
}
|
|
|
|
err = ioutil.WriteFile(fileName, body, 0644)
|
|
if err != nil {
|
|
logrus.WithError(err).Fatalf("Writing metrics body to %v, got unexpected error", fileName)
|
|
}
|
|
}
|
|
|
|
func CleanUpSystem(st *state) error {
|
|
|
|
downloadMetrics()
|
|
|
|
_, err := http.Get("http://127.0.0.1:8081/shutdown")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = http.Get("http://127.0.0.1:8082/shutdown")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = http.Get("http://127.0.0.1:8083/shutdown")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = http.Get("http://127.0.0.1:8084/shutdown")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = http.Get("http://127.0.0.1:8085/shutdown")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if st.cancel != nil {
|
|
st.cancel()
|
|
}
|
|
|
|
// Wait for shutdown - not great
|
|
time.Sleep(5 * time.Second)
|
|
|
|
if st.memory != "" {
|
|
os.Setenv(agent.EnvMaxTotalMemory, st.memory)
|
|
} else {
|
|
os.Unsetenv(agent.EnvMaxTotalMemory)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func SetUpAPINode(ctx context.Context) (*server.Server, error) {
|
|
curDir := pwd()
|
|
var defaultDB, defaultMQ string
|
|
defaultDB = fmt.Sprintf("sqlite3://%s/data/fn.db", curDir)
|
|
defaultMQ = fmt.Sprintf("bolt://%s/data/fn.mq", curDir)
|
|
nodeType := server.ServerTypeAPI
|
|
opts := make([]server.ServerOption, 0)
|
|
opts = append(opts, server.WithWebPort(8085))
|
|
opts = append(opts, server.WithType(nodeType))
|
|
opts = append(opts, server.WithLogLevel(getEnv(server.EnvLogLevel, server.DefaultLogLevel)))
|
|
opts = append(opts, server.WithLogDest(getEnv(server.EnvLogDest, server.DefaultLogDest), "API"))
|
|
opts = append(opts, server.WithDBURL(getEnv(server.EnvDBURL, defaultDB)))
|
|
opts = append(opts, server.WithMQURL(getEnv(server.EnvMQURL, defaultMQ)))
|
|
opts = append(opts, server.WithLogURL(""))
|
|
opts = append(opts, server.WithLogstoreFromDatastore())
|
|
opts = append(opts, server.EnableShutdownEndpoint(ctx, func() {})) // TODO: do it properly
|
|
return server.New(ctx, opts...), nil
|
|
}
|
|
|
|
func SetUpLBNode(ctx context.Context) (*server.Server, error) {
|
|
nodeType := server.ServerTypeLB
|
|
opts := make([]server.ServerOption, 0)
|
|
opts = append(opts, server.WithWebPort(8081))
|
|
opts = append(opts, server.WithType(nodeType))
|
|
opts = append(opts, server.WithLogLevel(getEnv(server.EnvLogLevel, server.DefaultLogLevel)))
|
|
opts = append(opts, server.WithLogDest(getEnv(server.EnvLogDest, server.DefaultLogDest), "LB"))
|
|
opts = append(opts, server.WithDBURL(""))
|
|
opts = append(opts, server.WithMQURL(""))
|
|
opts = append(opts, server.WithLogURL(""))
|
|
opts = append(opts, server.EnableShutdownEndpoint(ctx, func() {})) // TODO: do it properly
|
|
ridProvider := &server.RIDProvider{
|
|
HeaderName: "fn_request_id",
|
|
RIDGenerator: common.FnRequestID,
|
|
}
|
|
opts = append(opts, server.WithRIDProvider(ridProvider))
|
|
opts = append(opts, server.WithPrometheus())
|
|
|
|
apiURL := "http://127.0.0.1:8085"
|
|
cl, err := hybrid.NewClient(apiURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
nodePool, err := NewSystemTestNodePool()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
placer := pool.NewNaivePlacer()
|
|
|
|
keys := []string{"fn_appname", "fn_path"}
|
|
pool.RegisterPlacerViews(keys)
|
|
|
|
// Create an LB Agent with a Call Overrider to intercept calls in GetCall(). Overrider in this example
|
|
// scrubs CPU/TmpFsSize and adds FN_CHEESE key/value into extensions.
|
|
lbAgent, err := agent.NewLBAgent(agent.NewCachedDataAccess(cl), nodePool, placer, agent.WithLBCallOverrider(LBCallOverrider))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
opts = append(opts, server.WithAgent(lbAgent))
|
|
return server.New(ctx, opts...), nil
|
|
}
|
|
|
|
func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, error) {
|
|
nodeType := server.ServerTypePureRunner
|
|
opts := make([]server.ServerOption, 0)
|
|
opts = append(opts, server.WithWebPort(8082+nodeNum))
|
|
opts = append(opts, server.WithGRPCPort(9190+nodeNum))
|
|
opts = append(opts, server.WithType(nodeType))
|
|
opts = append(opts, server.WithLogLevel(getEnv(server.EnvLogLevel, server.DefaultLogLevel)))
|
|
opts = append(opts, server.WithLogDest(getEnv(server.EnvLogDest, server.DefaultLogDest), "PURE-RUNNER"))
|
|
opts = append(opts, server.WithDBURL(""))
|
|
opts = append(opts, server.WithMQURL(""))
|
|
opts = append(opts, server.WithLogURL(""))
|
|
opts = append(opts, server.EnableShutdownEndpoint(ctx, func() {})) // TODO: do it properly
|
|
|
|
ds, err := hybrid.NewNopDataStore()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
grpcAddr := fmt.Sprintf(":%d", 9190+nodeNum)
|
|
|
|
// This is our Agent config, which we will use for both inner agent and docker.
|
|
cfg, err := agent.NewAgentConfig()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// customer driver that overrides generic docker driver
|
|
drv := &customDriver{
|
|
drv: agent.NewDockerDriver(cfg),
|
|
}
|
|
|
|
// inner agent for pure-runners
|
|
innerAgent := agent.New(ds,
|
|
agent.WithConfig(cfg),
|
|
agent.WithDockerDriver(drv),
|
|
agent.WithoutAsyncDequeue(),
|
|
agent.WithCallOverrider(PureRunnerCallOverrider))
|
|
|
|
cancelCtx, cancel := context.WithCancel(ctx)
|
|
|
|
// now create pure-runner that wraps agent.
|
|
pureRunner, err := agent.NewPureRunner(cancel, grpcAddr, agent.PureRunnerWithAgent(innerAgent))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
opts = append(opts, server.WithAgent(pureRunner), server.WithExtraCtx(cancelCtx))
|
|
return server.New(ctx, opts...), nil
|
|
}
|
|
|
|
func pwd() string {
|
|
cwd, err := os.Getwd()
|
|
if err != nil {
|
|
logrus.WithError(err).Fatalln("couldn't get working directory, possibly unsupported platform?")
|
|
}
|
|
// Replace forward slashes in case this is windows, URL parser errors
|
|
return strings.Replace(cwd, "\\", "/", -1)
|
|
}
|
|
|
|
func getEnv(key, fallback string) string {
|
|
if value, ok := os.LookupEnv(key); ok {
|
|
return value
|
|
}
|
|
return fallback
|
|
}
|
|
|
|
func getEnvInt(key string, fallback int) int {
|
|
if value, ok := os.LookupEnv(key); ok {
|
|
// linter liked this better than if/else
|
|
var err error
|
|
var i int
|
|
if i, err = strconv.Atoi(value); err != nil {
|
|
panic(err) // not sure how to handle this
|
|
}
|
|
return i
|
|
}
|
|
return fallback
|
|
}
|
|
|
|
// whoAmI searches for a non-local address on any network interface, returning
|
|
// the first one it finds. it could be expanded to search eth0 or en0 only but
|
|
// to date this has been unnecessary.
|
|
func whoAmI() net.IP {
|
|
ints, _ := net.Interfaces()
|
|
for _, i := range ints {
|
|
if i.Name == "docker0" || i.Name == "vboxnet0" || i.Name == "lo" {
|
|
// not perfect
|
|
continue
|
|
}
|
|
addrs, _ := i.Addrs()
|
|
for _, a := range addrs {
|
|
ip, _, err := net.ParseCIDR(a.String())
|
|
if a.Network() == "ip+net" && err == nil && ip.To4() != nil {
|
|
if !bytes.Equal(ip, net.ParseIP("127.0.0.1")) {
|
|
return ip
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func TestMain(m *testing.M) {
|
|
state, err := SetUpSystem()
|
|
if err != nil {
|
|
logrus.WithError(err).Fatal("Could not initialize system")
|
|
os.Exit(1)
|
|
}
|
|
// call flag.Parse() here if TestMain uses flags
|
|
result := m.Run()
|
|
err = CleanUpSystem(state)
|
|
if err != nil {
|
|
logrus.WithError(err).Warn("Could not clean up system")
|
|
}
|
|
if result == 0 {
|
|
fmt.Fprintln(os.Stdout, "😀 👍 🎗")
|
|
}
|
|
os.Exit(result)
|
|
}
|
|
|
|
// Memory Only LB Agent Call Option
|
|
func LBCallOverrider(c *models.Call, exts map[string]string) (map[string]string, error) {
|
|
|
|
// Set TmpFsSize and CPU to unlimited. This means LB operates on Memory
|
|
// only. Operators/Service providers are expected to override this
|
|
// and apply their own filter to set/override CPU/TmpFsSize/Memory
|
|
// and Extension variables.
|
|
c.TmpFsSize = 0
|
|
c.CPUs = models.MilliCPUs(0)
|
|
delete(c.Config, "FN_CPUS")
|
|
|
|
if exts == nil {
|
|
exts = make(map[string]string)
|
|
}
|
|
|
|
// Add an FN_CHEESE extension to be intercepted and specially handled by Pure Runner customDriver below
|
|
exts["FN_CHEESE"] = "Tete de Moine"
|
|
return exts, nil
|
|
}
|
|
|
|
// Pure Runner Agent Call Option
|
|
func PureRunnerCallOverrider(c *models.Call, exts map[string]string) (map[string]string, error) {
|
|
|
|
if exts == nil {
|
|
exts = make(map[string]string)
|
|
}
|
|
|
|
// Add an FN_WINE extension, just an example...
|
|
exts["FN_WINE"] = "1982 Margaux"
|
|
return exts, nil
|
|
}
|
|
|
|
// An example Pure Runner docker driver. Using CreateCookie, it intercepts a generated cookie to
|
|
// add an environment variable FN_CHEESE if it finds a FN_CHEESE extension.
|
|
type customDriver struct {
|
|
drv drivers.Driver
|
|
}
|
|
|
|
// implements Driver
|
|
func (d *customDriver) CreateCookie(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) {
|
|
cookie, err := d.drv.CreateCookie(ctx, task)
|
|
if err != nil {
|
|
return cookie, err
|
|
}
|
|
|
|
// docker driver specific data
|
|
obj := cookie.ContainerOptions()
|
|
opts, ok := obj.(docker.CreateContainerOptions)
|
|
if !ok {
|
|
logrus.Fatal("Unexpected driver, should be docker")
|
|
}
|
|
|
|
// if call extensions include 'foo', then let's add FN_CHEESE env vars, which should
|
|
// end up in Env/Config.
|
|
ext := task.Extensions()
|
|
cheese, ok := ext["FN_CHEESE"]
|
|
if ok {
|
|
opts.Config.Env = append(opts.Config.Env, "FN_CHEESE="+cheese)
|
|
}
|
|
|
|
wine, ok := ext["FN_WINE"]
|
|
if ok {
|
|
opts.Config.Env = append(opts.Config.Env, "FN_WINE="+wine)
|
|
}
|
|
|
|
return cookie, nil
|
|
}
|
|
|
|
// implements Driver
|
|
func (d *customDriver) PrepareCookie(ctx context.Context, cookie drivers.Cookie) error {
|
|
return d.drv.PrepareCookie(ctx, cookie)
|
|
}
|
|
|
|
// implements Driver
|
|
func (d *customDriver) Close() error {
|
|
return d.drv.Close()
|
|
}
|
|
|
|
var _ drivers.Driver = &customDriver{}
|