diff --git a/Makefile b/Makefile index bda858ebf..a93aa9f55 100644 --- a/Makefile +++ b/Makefile @@ -1,74 +1,104 @@ # Just builds -.PHONY: all test dep build test-log-datastore checkfmt pull-images fn-test-utils test-middleware test-extensions test-basic - +.PHONY: dep dep: dep ensure --vendor-only +.PHONY: dep-up dep-up: dep ensure +.PHONY: build build: api/agent/grpc/runner.pb.go go build -o fnserver ./cmd/fnserver +.PHONY: install install: go build -o ${GOPATH}/bin/fnserver ./cmd/fnserver +.PHONY: checkfmt checkfmt: ./go-fmt.sh +.PHONY: clear-images clear-images: -docker images -q -f dangling=true | xargs docker rmi -f - for i in fnproject/fn-test-utils fnproject/hello fnproject/dind fnproject/fnserver ; do \ + for i in fnproject/fn-test-utils fnproject/fn-status-checker fnproject/hello fnproject/dind fnproject/fnserver ; do \ docker images "$$i" --format '{{ .ID }}\t{{ .Repository }}\t{{ .Tag}}' | while read id repo tag; do \ if [ "$$tag" = "" ]; then docker rmi "$$id"; else docker rmi "$$repo:$$tag"; fi; done; done +.PHONY: release-fnserver release-fnserver: ./release.sh +.PHONY: build-dind build-dind: (cd images/dind && ./build.sh) +.PHONY: release-dind release-dind: (cd images/dind && ./release.sh) +.PHONY: fn-status-checker +fn-status-checker: checkfmt + cd images/fn-status-checker && ./build.sh + +.PHONY: fn-test-utils fn-test-utils: checkfmt cd images/fn-test-utils && ./build.sh +.PHONY: test-middleware test-middleware: test-basic cd examples/middleware && go build +.PHONY: test-extensions test-extensions: test-basic cd examples/extensions && go build -test-basic: checkfmt pull-images fn-test-utils +.PHONY: test-basic +test-basic: checkfmt pull-images fn-test-utils fn-status-checker ./test.sh +.PHONY: test test: checkfmt pull-images test-basic test-middleware test-extensions test-system +.PHONY: test-system test-system: test-basic ./system_test.sh sqlite3 ./system_test.sh mysql ./system_test.sh postgres +.PHONY: img-busybox img-busybox: docker pull busybox + +.PHONY: img-hello img-hello: docker pull fnproject/hello + +.PHONY: img-mysql img-mysql: /bin/bash -c "source ./helpers.sh && docker_pull_mysql" + +.PHONY: img-postgres img-postgres: /bin/bash -c "source ./helpers.sh && docker_pull_postgres" + +.PHONY: img-minio img-minio: /bin/bash -c "source ./helpers.sh && docker_pull_minio" +.PHONY: pull-images pull-images: img-hello img-mysql img-postgres img-minio img-busybox +.PHONY: test-datastore test-datastore: cd api/datastore && go test -v ./... +.PHONY: test-log-datastore test-log-datastore: cd api/logs && go test -v ./... +.PHONY: test-build-arm test-build-arm: GOARCH=arm GOARM=5 $(MAKE) build GOARCH=arm GOARM=6 $(MAKE) build @@ -78,15 +108,19 @@ test-build-arm: %.pb.go: %.proto protoc --proto_path=$(@D) --proto_path=./vendor --go_out=plugins=grpc:$(@D) $< +.PHONY: run run: build GIN_MODE=debug ./fnserver +.PHONY: docker-build docker-build: docker build --build-arg HTTPS_PROXY --build-arg HTTP_PROXY -t fnproject/fnserver:latest . +.PHONY: docker-run docker-run: docker-build docker run --rm --privileged -it -e NO_PROXY -e HTTP_PROXY -e FN_LOG_LEVEL=debug -e "FN_DB_URL=sqlite3:///app/data/fn.db" -v ${CURDIR}/data:/app/data -p 8080:8080 fnproject/fnserver +.PHONY: docker-test docker-test: docker run -ti --privileged --rm -e FN_LOG_LEVEL=debug \ -v /var/run/docker.sock:/var/run/docker.sock \ @@ -95,4 +129,5 @@ docker-test: fnproject/go:dev go test \ -v $(shell docker run --rm -ti -v ${CURDIR}:/go/src/github.com/fnproject/fn -w /go/src/github.com/fnproject/fn -e GOPATH=/go golang:alpine sh -c 'go list ./... | grep -v vendor | grep -v examples | grep -v tool | grep -v fn') +.PHONY: all all: dep build diff --git a/api/agent/agent.go b/api/agent/agent.go index bd4b1ef66..1e857ad35 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -207,6 +207,7 @@ func WithCallOverrider(fn CallOverrider) AgentOption { func NewDockerDriver(cfg *AgentConfig) (drivers.Driver, error) { return drivers.New("docker", drivers.Config{ DockerNetworks: cfg.DockerNetworks, + DockerLoadFile: cfg.DockerLoadFile, ServerVersion: cfg.MinDockerVersion, PreForkPoolSize: cfg.PreForkPoolSize, PreForkImage: cfg.PreForkImage, diff --git a/api/agent/config.go b/api/agent/config.go index 71fb1b306..36c9a6539 100644 --- a/api/agent/config.go +++ b/api/agent/config.go @@ -11,6 +11,7 @@ import ( type AgentConfig struct { MinDockerVersion string `json:"min_docker_version"` DockerNetworks string `json:"docker_networks"` + DockerLoadFile string `json:"docker_load_file"` FreezeIdle time.Duration `json:"freeze_idle_msecs"` EjectIdle time.Duration `json:"eject_idle_msecs"` HotPoll time.Duration `json:"hot_poll_msecs"` @@ -36,6 +37,7 @@ type AgentConfig struct { const ( EnvDockerNetworks = "FN_DOCKER_NETWORKS" + EnvDockerLoadFile = "FN_DOCKER_LOAD_FILE" EnvFreezeIdle = "FN_FREEZE_IDLE_MSECS" EnvEjectIdle = "FN_EJECT_IDLE_MSECS" EnvHotPoll = "FN_HOT_POLL_MSECS" @@ -96,6 +98,7 @@ func NewAgentConfig() (*AgentConfig, error) { err = setEnvUint(err, EnvPreForkUseOnce, &cfg.PreForkUseOnce) err = setEnvStr(err, EnvPreForkNetworks, &cfg.PreForkNetworks) err = setEnvStr(err, EnvDockerNetworks, &cfg.DockerNetworks) + err = setEnvStr(err, EnvDockerLoadFile, &cfg.DockerLoadFile) err = setEnvUint(err, EnvMaxTmpFsInodes, &cfg.MaxTmpFsInodes) if err != nil { diff --git a/api/agent/drivers/docker/cookie.go b/api/agent/drivers/docker/cookie.go index 03f015ca3..b7397839e 100644 --- a/api/agent/drivers/docker/cookie.go +++ b/api/agent/drivers/docker/cookie.go @@ -25,6 +25,18 @@ type cookie struct { drv *DockerDriver } +func (c *cookie) configureMem(log logrus.FieldLogger) { + if c.task.Memory() == 0 { + return + } + + mem := int64(c.task.Memory()) + + c.opts.Config.Memory = mem + c.opts.Config.MemorySwap = mem // disables swap + c.opts.Config.KernelMemory = mem +} + func (c *cookie) configureFsSize(log logrus.FieldLogger) { if c.task.FsSize() == 0 { return diff --git a/api/agent/drivers/docker/docker.go b/api/agent/drivers/docker/docker.go index 0b3d8a929..f9ee3ec92 100644 --- a/api/agent/drivers/docker/docker.go +++ b/api/agent/drivers/docker/docker.go @@ -92,6 +92,13 @@ func NewDocker(conf drivers.Config) *DockerDriver { } } + if conf.DockerLoadFile != "" { + err = loadDockerImages(driver, conf.DockerLoadFile) + if err != nil { + logrus.WithError(err).Fatalf("cannot load docker images in %s", conf.DockerLoadFile) + } + } + return driver } @@ -118,6 +125,12 @@ func checkDockerVersion(driver *DockerDriver, expected string) error { return nil } +func loadDockerImages(driver *DockerDriver, filePath string) error { + ctx, log := common.LoggerWithFields(context.Background(), logrus.Fields{"stack": "loadDockerImages"}) + log.Infof("Loading docker images from %v", filePath) + return driver.docker.LoadImages(ctx, filePath) +} + func registryFromEnv() map[string]docker.AuthConfiguration { var auths *docker.AuthConfigurations var err error @@ -208,9 +221,6 @@ func (drv *DockerDriver) CreateCookie(ctx context.Context, task drivers.Containe opts := docker.CreateContainerOptions{ Name: task.Id(), Config: &docker.Config{ - Memory: int64(task.Memory()), - MemorySwap: int64(task.Memory()), // disables swap - KernelMemory: int64(task.Memory()), Image: task.Image(), OpenStdin: true, AttachStdout: true, @@ -234,6 +244,7 @@ func (drv *DockerDriver) CreateCookie(ctx context.Context, task drivers.Containe drv: drv, } + cookie.configureMem(log) cookie.configureCmd(log) cookie.configureEnv(log) cookie.configureCPU(log) diff --git a/api/agent/drivers/docker/docker_client.go b/api/agent/drivers/docker/docker_client.go index e889554be..ddc575f1b 100644 --- a/api/agent/drivers/docker/docker_client.go +++ b/api/agent/drivers/docker/docker_client.go @@ -7,6 +7,7 @@ import ( "crypto/tls" "net" "net/http" + "os" "strings" "time" @@ -46,6 +47,7 @@ type dockerClient interface { InspectContainerWithContext(container string, ctx context.Context) (*docker.Container, error) Stats(opts docker.StatsOptions) error Info(ctx context.Context) (*docker.DockerInfo, error) + LoadImages(ctx context.Context, filePath string) error } // TODO: switch to github.com/docker/engine-api @@ -227,6 +229,24 @@ func filterNoSuchContainer(ctx context.Context, err error) error { return err } +func (d *dockerWrap) LoadImages(ctx context.Context, filePath string) error { + ctx, span := trace.StartSpan(ctx, "docker_load_images") + defer span.End() + + file, err := os.Open(filePath) + if err != nil { + return err + } + defer file.Close() + + // No retries here. LoadImage is typically called at startup and we fail/timeout + // at first attempt. + return d.docker.LoadImage(docker.LoadImageOptions{ + InputStream: file, + Context: ctx, + }) +} + func (d *dockerWrap) Info(ctx context.Context) (info *docker.DockerInfo, err error) { // NOTE: we're not very responsible and prometheus wasn't loved as a child, this // threads through directly down to the docker call, skipping retires, so that we diff --git a/api/agent/drivers/driver.go b/api/agent/drivers/driver.go index 0b51b87da..52b9bec3b 100644 --- a/api/agent/drivers/driver.go +++ b/api/agent/drivers/driver.go @@ -208,6 +208,7 @@ const ( type Config struct { Docker string `json:"docker"` DockerNetworks string `json:"docker_networks"` + DockerLoadFile string `json:"docker_load_file"` ServerVersion string `json:"server_version"` PreForkPoolSize uint64 `json:"pre_fork_pool_size"` PreForkImage string `json:"pre_fork_image"` diff --git a/api/agent/grpc/runner.pb.go b/api/agent/grpc/runner.pb.go index c72d0cc7b..fba58b216 100644 --- a/api/agent/grpc/runner.pb.go +++ b/api/agent/grpc/runner.pb.go @@ -575,7 +575,15 @@ func _RunnerMsg_OneofSizer(msg proto.Message) (n int) { } type RunnerStatus struct { - Active int32 `protobuf:"varint,2,opt,name=active" json:"active,omitempty"` + Active int32 `protobuf:"varint,2,opt,name=active" json:"active,omitempty"` + Failed bool `protobuf:"varint,3,opt,name=failed" json:"failed,omitempty"` + Id string `protobuf:"bytes,4,opt,name=id" json:"id,omitempty"` + Details string `protobuf:"bytes,5,opt,name=details" json:"details,omitempty"` + ErrorCode int32 `protobuf:"varint,6,opt,name=errorCode" json:"errorCode,omitempty"` + ErrorStr string `protobuf:"bytes,7,opt,name=errorStr" json:"errorStr,omitempty"` + CreatedAt string `protobuf:"bytes,8,opt,name=createdAt" json:"createdAt,omitempty"` + StartedAt string `protobuf:"bytes,9,opt,name=startedAt" json:"startedAt,omitempty"` + CompletedAt string `protobuf:"bytes,10,opt,name=completedAt" json:"completedAt,omitempty"` } func (m *RunnerStatus) Reset() { *m = RunnerStatus{} } @@ -590,6 +598,62 @@ func (m *RunnerStatus) GetActive() int32 { return 0 } +func (m *RunnerStatus) GetFailed() bool { + if m != nil { + return m.Failed + } + return false +} + +func (m *RunnerStatus) GetId() string { + if m != nil { + return m.Id + } + return "" +} + +func (m *RunnerStatus) GetDetails() string { + if m != nil { + return m.Details + } + return "" +} + +func (m *RunnerStatus) GetErrorCode() int32 { + if m != nil { + return m.ErrorCode + } + return 0 +} + +func (m *RunnerStatus) GetErrorStr() string { + if m != nil { + return m.ErrorStr + } + return "" +} + +func (m *RunnerStatus) GetCreatedAt() string { + if m != nil { + return m.CreatedAt + } + return "" +} + +func (m *RunnerStatus) GetStartedAt() string { + if m != nil { + return m.StartedAt + } + return "" +} + +func (m *RunnerStatus) GetCompletedAt() string { + if m != nil { + return m.CompletedAt + } + return "" +} + func init() { proto.RegisterType((*TryCall)(nil), "TryCall") proto.RegisterType((*DataFrame)(nil), "DataFrame") @@ -745,44 +809,46 @@ var _RunnerProtocol_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("runner.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 611 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x94, 0xfd, 0x8a, 0xd3, 0x40, - 0x10, 0xc0, 0x9b, 0x7e, 0x77, 0x92, 0xfb, 0x60, 0x11, 0x09, 0xf5, 0xc0, 0x12, 0x3f, 0x28, 0x08, - 0x39, 0xad, 0x0a, 0x87, 0xa0, 0xa0, 0x67, 0x8f, 0x28, 0x1c, 0xc8, 0x56, 0xfc, 0xb7, 0xec, 0x25, - 0x73, 0x6d, 0x74, 0x9b, 0x2d, 0xbb, 0xd3, 0xc3, 0x3e, 0x8a, 0x2f, 0xe5, 0x03, 0xf8, 0x34, 0xb2, - 0x9b, 0x34, 0x57, 0xee, 0x1f, 0xfd, 0x6f, 0x67, 0x7e, 0xf3, 0xb5, 0x33, 0xb3, 0x0b, 0x81, 0xde, - 0x14, 0x05, 0xea, 0x78, 0xad, 0x15, 0xa9, 0xe1, 0x83, 0x85, 0x52, 0x0b, 0x89, 0xa7, 0x4e, 0xba, - 0xda, 0x5c, 0x9f, 0xe2, 0x6a, 0x4d, 0xdb, 0x12, 0x46, 0xbf, 0x3d, 0xe8, 0x7d, 0xd5, 0xdb, 0x73, - 0x21, 0x25, 0x1b, 0xc3, 0xf1, 0x4a, 0x65, 0x28, 0xcd, 0x3c, 0x15, 0x52, 0xce, 0xbf, 0x1b, 0x55, - 0x84, 0xde, 0xc8, 0x1b, 0x0f, 0xf8, 0x61, 0xa9, 0xb7, 0x56, 0x9f, 0x8d, 0x2a, 0xd8, 0x08, 0x02, - 0x23, 0x15, 0xcd, 0x97, 0xc2, 0x2c, 0xe7, 0x79, 0x16, 0x36, 0x9d, 0x15, 0x58, 0x5d, 0x22, 0xcc, - 0xf2, 0x53, 0xc6, 0xce, 0x00, 0xf0, 0x27, 0x61, 0x61, 0x72, 0x55, 0x98, 0xb0, 0x35, 0x6a, 0x8d, - 0xfd, 0x49, 0x18, 0x57, 0x99, 0xe2, 0x69, 0x8d, 0xa6, 0x05, 0xe9, 0x2d, 0xdf, 0xb3, 0x1d, 0xbe, - 0x85, 0xa3, 0x3b, 0x98, 0x1d, 0x43, 0xeb, 0x07, 0x6e, 0xab, 0x5a, 0xec, 0x91, 0xdd, 0x83, 0xce, - 0x8d, 0x90, 0x1b, 0xac, 0x32, 0x97, 0xc2, 0x9b, 0xe6, 0x99, 0x17, 0xbd, 0x80, 0xc1, 0x47, 0x41, - 0xe2, 0x42, 0x8b, 0x15, 0x32, 0x06, 0xed, 0x4c, 0x90, 0x70, 0x9e, 0x01, 0x77, 0x67, 0x1b, 0x0c, - 0xd5, 0xb5, 0x73, 0xec, 0x73, 0x7b, 0x8c, 0x5e, 0x01, 0x24, 0x44, 0xeb, 0x04, 0x45, 0x86, 0xfa, - 0x7f, 0x93, 0x45, 0xdf, 0x20, 0xb0, 0x5e, 0x1c, 0xcd, 0xfa, 0x12, 0x49, 0xb0, 0x87, 0xe0, 0x1b, - 0x12, 0xb4, 0x31, 0xf3, 0x54, 0x65, 0xe8, 0xfc, 0x3b, 0x1c, 0x4a, 0xd5, 0xb9, 0xca, 0x90, 0x3d, - 0x81, 0xde, 0xd2, 0xa5, 0x30, 0x61, 0xd3, 0xf5, 0xc3, 0x8f, 0x6f, 0xd3, 0xf2, 0x1d, 0x8b, 0xde, - 0xc1, 0x91, 0xed, 0x11, 0x47, 0xb3, 0x91, 0x34, 0x23, 0xa1, 0x89, 0x3d, 0x82, 0xf6, 0x92, 0x68, - 0x1d, 0x66, 0x23, 0x6f, 0xec, 0x4f, 0x0e, 0xe2, 0xfd, 0xbc, 0x49, 0x83, 0x3b, 0xf8, 0xa1, 0x0b, - 0xed, 0x15, 0x92, 0x88, 0xfe, 0x78, 0x10, 0xd8, 0x00, 0x17, 0x79, 0x91, 0x9b, 0x25, 0x66, 0x2c, - 0x84, 0x9e, 0xd9, 0xa4, 0x29, 0x1a, 0xe3, 0x8a, 0xea, 0xf3, 0x9d, 0x68, 0x49, 0x86, 0x24, 0x72, - 0x69, 0xaa, 0xab, 0xed, 0x44, 0x76, 0x02, 0x03, 0xd4, 0x5a, 0x69, 0x5b, 0x78, 0xd8, 0x72, 0x57, - 0xb9, 0x55, 0xb0, 0x21, 0xf4, 0x9d, 0x30, 0x23, 0x1d, 0xb6, 0x9d, 0x63, 0x2d, 0x5b, 0xcf, 0x54, - 0xa3, 0x20, 0xcc, 0xde, 0x53, 0xd8, 0x71, 0xf0, 0x56, 0x61, 0xa9, 0xb1, 0x57, 0x72, 0xb4, 0x5b, - 0xd2, 0x5a, 0xc1, 0x46, 0xe0, 0xa7, 0x6a, 0xb5, 0x96, 0x58, 0xf2, 0x9e, 0xe3, 0xfb, 0xaa, 0x68, - 0x06, 0x83, 0x73, 0x99, 0x63, 0x41, 0x97, 0x66, 0xc1, 0x4e, 0xa0, 0x45, 0xba, 0x9c, 0x94, 0x3f, - 0xe9, 0xef, 0x96, 0x2b, 0x69, 0x70, 0xab, 0x66, 0xa3, 0x6a, 0xf6, 0x4d, 0x87, 0x21, 0xae, 0xb7, - 0xc2, 0x76, 0xcc, 0x12, 0xdb, 0xb1, 0x2b, 0x95, 0x6d, 0xa3, 0x5f, 0x1e, 0x0c, 0xb8, 0x7b, 0x31, - 0x36, 0xea, 0x6b, 0x08, 0xb4, 0xeb, 0xfd, 0xdc, 0x15, 0x56, 0x85, 0x3f, 0x8e, 0xef, 0x0c, 0x25, - 0x69, 0x70, 0x5f, 0xef, 0xcd, 0xe8, 0x9f, 0xe9, 0xd8, 0x33, 0xe8, 0x5f, 0x57, 0x33, 0x71, 0x2d, - 0xb5, 0x93, 0xdc, 0x1f, 0x54, 0xd2, 0xe0, 0xb5, 0x41, 0x5d, 0xdb, 0x53, 0x08, 0xca, 0xd2, 0x66, - 0x6e, 0x91, 0xd8, 0x7d, 0xe8, 0x8a, 0x94, 0xf2, 0x9b, 0x72, 0x19, 0x3b, 0xbc, 0x92, 0x26, 0x0b, - 0x38, 0x2c, 0xed, 0xbe, 0xd8, 0x67, 0x9d, 0x2a, 0xc9, 0x1e, 0x43, 0x77, 0x5a, 0x2c, 0xc4, 0x02, - 0x19, 0xc4, 0x75, 0xcf, 0x86, 0x10, 0xd7, 0x37, 0x1d, 0x7b, 0xcf, 0x3d, 0x76, 0x0a, 0xdd, 0x5d, - 0xe4, 0xb8, 0xfc, 0x27, 0xe2, 0xdd, 0x3f, 0x11, 0x4f, 0xed, 0x3f, 0x31, 0x3c, 0x88, 0xf7, 0x0b, - 0xb8, 0xea, 0x3a, 0xfc, 0xf2, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x6e, 0x67, 0x4d, 0xd3, 0x64, - 0x04, 0x00, 0x00, + // 655 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0xed, 0x8a, 0xd3, 0x40, + 0x14, 0x6d, 0xd2, 0x36, 0x4d, 0x6f, 0xb2, 0x1f, 0x0c, 0x22, 0xa1, 0x2e, 0x58, 0xa2, 0x42, 0x41, + 0xc8, 0x6a, 0x55, 0x58, 0x04, 0x05, 0x5d, 0xbb, 0x44, 0x61, 0x41, 0xa6, 0xe2, 0xdf, 0x32, 0x9b, + 0x99, 0xb6, 0xd1, 0x69, 0xa6, 0xcc, 0x4c, 0x17, 0xfb, 0xcf, 0xd7, 0xf0, 0xa5, 0x7c, 0x00, 0x9f, + 0x46, 0x66, 0x92, 0x66, 0xc3, 0x0a, 0x5d, 0xff, 0xe5, 0x9e, 0x33, 0xf7, 0x63, 0xce, 0xb9, 0x19, + 0x08, 0xe5, 0xa6, 0x28, 0x98, 0x4c, 0xd6, 0x52, 0x68, 0x31, 0x78, 0xb0, 0x10, 0x62, 0xc1, 0xd9, + 0xa9, 0x8d, 0xae, 0x36, 0xf3, 0x53, 0xb6, 0x5a, 0xeb, 0x6d, 0x49, 0xc6, 0xbf, 0x1d, 0xe8, 0x7d, + 0x91, 0xdb, 0x73, 0xc2, 0x39, 0x1a, 0xc1, 0xf1, 0x4a, 0x50, 0xc6, 0xd5, 0x2c, 0x23, 0x9c, 0xcf, + 0xbe, 0x29, 0x51, 0x44, 0xce, 0xd0, 0x19, 0xf5, 0xf1, 0x61, 0x89, 0x9b, 0x53, 0x9f, 0x94, 0x28, + 0xd0, 0x10, 0x42, 0xc5, 0x85, 0x9e, 0x2d, 0x89, 0x5a, 0xce, 0x72, 0x1a, 0xb9, 0xf6, 0x14, 0x18, + 0x2c, 0x25, 0x6a, 0xf9, 0x91, 0xa2, 0x33, 0x00, 0xf6, 0x43, 0xb3, 0x42, 0xe5, 0xa2, 0x50, 0x51, + 0x7b, 0xd8, 0x1e, 0x05, 0xe3, 0x28, 0xa9, 0x3a, 0x25, 0x93, 0x9a, 0x9a, 0x14, 0x5a, 0x6e, 0x71, + 0xe3, 0xec, 0xe0, 0x0d, 0x1c, 0xdd, 0xa2, 0xd1, 0x31, 0xb4, 0xbf, 0xb3, 0x6d, 0x35, 0x8b, 0xf9, + 0x44, 0xf7, 0xa0, 0x7b, 0x4d, 0xf8, 0x86, 0x55, 0x9d, 0xcb, 0xe0, 0xb5, 0x7b, 0xe6, 0xc4, 0xcf, + 0xa1, 0xff, 0x81, 0x68, 0x72, 0x21, 0xc9, 0x8a, 0x21, 0x04, 0x1d, 0x4a, 0x34, 0xb1, 0x99, 0x21, + 0xb6, 0xdf, 0xa6, 0x18, 0x13, 0x73, 0x9b, 0xe8, 0x63, 0xf3, 0x19, 0xbf, 0x04, 0x48, 0xb5, 0x5e, + 0xa7, 0x8c, 0x50, 0x26, 0xff, 0xb7, 0x59, 0xfc, 0x15, 0x42, 0x93, 0x85, 0x99, 0x5a, 0x5f, 0x32, + 0x4d, 0xd0, 0x43, 0x08, 0x94, 0x26, 0x7a, 0xa3, 0x66, 0x99, 0xa0, 0xcc, 0xe6, 0x77, 0x31, 0x94, + 0xd0, 0xb9, 0xa0, 0x0c, 0x3d, 0x81, 0xde, 0xd2, 0xb6, 0x50, 0x91, 0x6b, 0xf5, 0x08, 0x92, 0x9b, + 0xb6, 0x78, 0xc7, 0xc5, 0x6f, 0xe1, 0xc8, 0x68, 0x84, 0x99, 0xda, 0x70, 0x3d, 0xd5, 0x44, 0x6a, + 0xf4, 0x08, 0x3a, 0x4b, 0xad, 0xd7, 0x11, 0x1d, 0x3a, 0xa3, 0x60, 0x7c, 0x90, 0x34, 0xfb, 0xa6, + 0x2d, 0x6c, 0xc9, 0xf7, 0x1e, 0x74, 0x56, 0x4c, 0x93, 0xf8, 0x8f, 0x03, 0xa1, 0x29, 0x70, 0x91, + 0x17, 0xb9, 0x5a, 0x32, 0x8a, 0x22, 0xe8, 0xa9, 0x4d, 0x96, 0x31, 0xa5, 0xec, 0x50, 0x3e, 0xde, + 0x85, 0x86, 0xa1, 0x4c, 0x93, 0x9c, 0xab, 0xea, 0x6a, 0xbb, 0x10, 0x9d, 0x40, 0x9f, 0x49, 0x29, + 0xa4, 0x19, 0x3c, 0x6a, 0xdb, 0xab, 0xdc, 0x00, 0x68, 0x00, 0xbe, 0x0d, 0xa6, 0x5a, 0x46, 0x1d, + 0x9b, 0x58, 0xc7, 0x26, 0x33, 0x93, 0x8c, 0x68, 0x46, 0xdf, 0xe9, 0xa8, 0x6b, 0xc9, 0x1b, 0xc0, + 0xb0, 0xca, 0x5c, 0xc9, 0xb2, 0x5e, 0xc9, 0xd6, 0x00, 0x1a, 0x42, 0x90, 0x89, 0xd5, 0x9a, 0xb3, + 0x92, 0xef, 0x59, 0xbe, 0x09, 0xc5, 0x53, 0xe8, 0x9f, 0xf3, 0x9c, 0x15, 0xfa, 0x52, 0x2d, 0xd0, + 0x09, 0xb4, 0xb5, 0x2c, 0x9d, 0x0a, 0xc6, 0xfe, 0x6e, 0xb9, 0xd2, 0x16, 0x36, 0x30, 0x1a, 0x56, + 0xde, 0xbb, 0x96, 0x86, 0xa4, 0xde, 0x0a, 0xa3, 0x98, 0x61, 0x8c, 0x62, 0x57, 0x82, 0x6e, 0xe3, + 0x5f, 0x0e, 0xf4, 0xb1, 0xfd, 0x63, 0x4c, 0xd5, 0x57, 0x10, 0x4a, 0xab, 0xfd, 0xcc, 0x0e, 0x56, + 0x95, 0x3f, 0x4e, 0x6e, 0x99, 0x92, 0xb6, 0x70, 0x20, 0x1b, 0x1e, 0xdd, 0xd9, 0x0e, 0x3d, 0x05, + 0x7f, 0x5e, 0x79, 0x62, 0x25, 0x35, 0x4e, 0x36, 0x8d, 0x4a, 0x5b, 0xb8, 0x3e, 0x50, 0xcf, 0xf6, + 0xd3, 0x85, 0xb0, 0x9c, 0x6d, 0x6a, 0x37, 0x09, 0xdd, 0x07, 0x8f, 0x64, 0x3a, 0xbf, 0x2e, 0xb7, + 0xb1, 0x8b, 0xab, 0xc8, 0xe0, 0x73, 0x92, 0xf3, 0xaa, 0xb6, 0x8f, 0xab, 0x08, 0x1d, 0x82, 0x9b, + 0xd3, 0xca, 0x25, 0x37, 0xa7, 0x4d, 0xcf, 0xbb, 0x7b, 0x3c, 0xf7, 0xf6, 0x79, 0xde, 0xdb, 0xe7, + 0xb9, 0xbf, 0xd7, 0xf3, 0xfe, 0x1d, 0x9e, 0xc3, 0x3f, 0x9e, 0x8f, 0x17, 0x70, 0x58, 0x2a, 0xf0, + 0xd9, 0xbc, 0x58, 0x99, 0xe0, 0xe8, 0x31, 0x78, 0x93, 0x62, 0x41, 0x16, 0x0c, 0x41, 0x52, 0xaf, + 0xc3, 0x00, 0x92, 0xda, 0xc4, 0x91, 0xf3, 0xcc, 0x41, 0xa7, 0xe0, 0xed, 0x34, 0x4b, 0xca, 0x27, + 0x30, 0xd9, 0x3d, 0x81, 0xc9, 0xc4, 0x3c, 0x81, 0x83, 0x83, 0xa4, 0x29, 0xed, 0x95, 0x67, 0xe9, + 0x17, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xaa, 0xe9, 0xb8, 0x88, 0x3f, 0x05, 0x00, 0x00, } diff --git a/api/agent/grpc/runner.proto b/api/agent/grpc/runner.proto index feb0c017b..027213b07 100644 --- a/api/agent/grpc/runner.proto +++ b/api/agent/grpc/runner.proto @@ -64,6 +64,14 @@ message RunnerMsg { message RunnerStatus { int32 active = 2; // Number of currently inflight responses + bool failed = 3; // if status was successful or not + string id = 4; // call id if status image was used + string details = 5; // details for logging/debug + int32 errorCode = 6; // error code if not successful + string errorStr = 7; // error description if not successful + string createdAt = 8; // call latency details: initialization time + string startedAt = 9; // call latency details: start time in container + string completedAt = 10; // call latency details: end time } service RunnerProtocol { diff --git a/api/agent/lb_agent_test.go b/api/agent/lb_agent_test.go index 4c11b5d17..fc0319c99 100644 --- a/api/agent/lb_agent_test.go +++ b/api/agent/lb_agent_test.go @@ -89,6 +89,10 @@ func (r *mockRunner) decrCalls() { r.curCalls-- } +func (r *mockRunner) Status(ctx context.Context) (*pool.RunnerStatus, error) { + return nil, nil +} + func (r *mockRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, error) { err := r.checkAndIncrCalls() if err != nil { diff --git a/api/agent/pure_runner.go b/api/agent/pure_runner.go index a867c07e4..0be58c47d 100644 --- a/api/agent/pure_runner.go +++ b/api/agent/pure_runner.go @@ -13,12 +13,15 @@ import ( "io/ioutil" "net" "net/http" + "net/http/httptest" + "strings" "sync" "sync/atomic" "time" "github.com/fnproject/fn/api/agent/grpc" "github.com/fnproject/fn/api/common" + "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/models" "github.com/fnproject/fn/fnext" "github.com/fnproject/fn/grpcutil" @@ -485,13 +488,43 @@ func (ch *callHandle) getDataMsg() *runner.DataFrame { return msg } +const ( + // Here we give 5 seconds of timeout inside the container. We hardcode these numbers here to + // ensure we control idle timeout & timeout as well as how long should cache be valid. + // A cache duration of idleTimeout + 500 msecs allows us to reuse the cache, for about 1.5 secs, + // and during this time, since we allow no queries to go through, the hot container times out. + // + // For now, status tests a single case: a new hot container is spawned when cache is expired + // and when a query is allowed to run. + // TODO: we might want to mix this up and perhaps allow that hot container to handle + // more than one query to test both 'new hot container' and 'old hot container' cases. + StatusCallTimeout = int32(5) + StatusCallIdleTimeout = int32(1) + StatusCallCacheDuration = time.Duration(500)*time.Millisecond + time.Duration(StatusCallIdleTimeout)*time.Second +) + +// statusTracker maintains cache data/state/locks for Status Call invocations. +type statusTracker struct { + inflight int32 + imageName string + + // lock protects expiry/cache/wait fields below. RunnerStatus ptr itself + // stored every time status image is executed. Cache fetches use a shallow + // copy of RunnerStatus to ensure consistency. Shallow copy is sufficient + // since we set/save contents of RunnerStatus once. + lock sync.Mutex + expiry time.Time + cache *runner.RunnerStatus + wait chan struct{} +} + // pureRunner implements Agent and delegates execution of functions to an internal Agent; basically it wraps around it // and provides the gRPC server that implements the LB <-> Runner protocol. type pureRunner struct { gRPCServer *grpc.Server creds credentials.TransportCredentials a Agent - inflight int32 + status statusTracker } // implements Agent @@ -540,6 +573,14 @@ func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error return err } + // Status image is reserved for internal Status checks. + // We need to make sure normal functions calls cannot call it. + if pr.status.imageName != "" && c.Image == pr.status.imageName { + err = models.ErrRoutesInvalidImage + state.enqueueCallResponse(err) + return err + } + // IMPORTANT: We clear/initialize these dates as start/created/completed dates from // unmarshalled Model from LB-agent represent unrelated time-line events. // From this point, CreatedAt/StartedAt/CompletedAt are based on our local clock. @@ -578,8 +619,8 @@ func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) erro ctx := engagement.Context() log := common.Logger(ctx) // Keep lightweight tabs on what this runner is doing: for draindown tests - atomic.AddInt32(&pr.inflight, 1) - defer atomic.AddInt32(&pr.inflight, -1) + atomic.AddInt32(&pr.status.inflight, 1) + defer atomic.AddInt32(&pr.status.inflight, -1) pv, ok := peer.FromContext(ctx) log.Debug("Starting engagement") @@ -626,11 +667,176 @@ DataLoop: return state.waitError() } +// Runs a status call using status image with baked in parameters. +func (pr *pureRunner) runStatusCall(ctx context.Context, timeout, idleTimeout int32) *runner.RunnerStatus { + + result := &runner.RunnerStatus{} + log := common.Logger(ctx) + start := time.Now() + + // construct call + var c models.Call + + // Most of these arguments are baked in. We might want to make this + // more configurable. + c.ID = id.New().String() + c.Path = "/" + c.Image = pr.status.imageName + c.Type = "sync" + c.Format = "json" + c.TmpFsSize = 0 + c.Memory = 0 + c.CPUs = models.MilliCPUs(0) + c.URL = "/" + c.Method = "GET" + c.CreatedAt = common.DateTime(start) + c.Config = make(models.Config) + c.Config["FN_FORMAT"] = c.Format + c.Payload = "{}" + c.Timeout = timeout + c.IdleTimeout = idleTimeout + + // TODO: reliably shutdown this container after executing one request. + + log.Debugf("Running status call with id=%v timeout=%v image=%v", c.ID, c.Timeout, c.Image) + + recorder := httptest.NewRecorder() + player := ioutil.NopCloser(strings.NewReader(c.Payload)) + + agent_call, err := pr.a.GetCall(FromModelAndInput(&c, player), + WithWriter(recorder), + WithContext(ctx), + ) + + if err == nil { + var mcall *call + mcall = agent_call.(*call) + err = pr.a.Submit(mcall) + } + + resp := recorder.Result() + + if err != nil { + result.ErrorCode = int32(models.GetAPIErrorCode(err)) + result.ErrorStr = err.Error() + result.Failed = true + } else if resp.StatusCode >= http.StatusBadRequest { + result.ErrorCode = int32(resp.StatusCode) + result.Failed = true + } + + // These timestamps are related. To avoid confusion + // and for robustness, nested if stmts below. + if !time.Time(c.CreatedAt).IsZero() { + result.CreatedAt = c.CreatedAt.String() + + if !time.Time(c.StartedAt).IsZero() { + result.StartedAt = c.StartedAt.String() + + if !time.Time(c.CompletedAt).IsZero() { + result.CompletedAt = c.CompletedAt.String() + } else { + // IMPORTANT: We punch this in ourselves. + // This is because call.End() is executed asynchronously. + result.CompletedAt = common.DateTime(time.Now()).String() + } + } + } + + // Status images should not output excessive data since we echo the + // data back to caller. + body, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + + // Clamp the log output to 256 bytes if output is too large for logging. + dLen := len(body) + if dLen > 256 { + dLen = 256 + } + log.Debugf("Status call with id=%v result=%+v body[0:%v]=%v", c.ID, result, dLen, string(body[:dLen])) + + result.Details = string(body) + result.Id = c.ID + return result +} + +// Handles a status call concurrency and caching. +func (pr *pureRunner) handleStatusCall(ctx context.Context) (*runner.RunnerStatus, error) { + var myChan chan struct{} + + isWaiter := false + isCached := false + now := time.Now() + + pr.status.lock.Lock() + + if now.Before(pr.status.expiry) { + // cache is still valid. + isCached = true + } else if pr.status.wait != nil { + // A wait channel is already installed, we must wait + isWaiter = true + myChan = pr.status.wait + } else { + // Wait channel is not present, we install a new one. + myChan = make(chan struct{}) + pr.status.wait = myChan + } + + pr.status.lock.Unlock() + + // We either need to wait and/or serve the request from cache + if isWaiter || isCached { + if isWaiter { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-myChan: + } + } + + var cacheObj runner.RunnerStatus + + // A shallow copy is sufficient here, as we do not modify nested data in + // RunnerStatus in any way. + pr.status.lock.Lock() + + cacheObj = *pr.status.cache + + pr.status.lock.Unlock() + + cacheObj.Active = atomic.LoadInt32(&pr.status.inflight) + return &cacheObj, nil + } + + cachePtr := pr.runStatusCall(ctx, StatusCallTimeout, StatusCallIdleTimeout) + cachePtr.Active = atomic.LoadInt32(&pr.status.inflight) + now = time.Now() + + // Pointer store of 'cachePtr' is sufficient here as isWaiter/isCached above perform a shallow + // copy of 'cache' + pr.status.lock.Lock() + + pr.status.cache = cachePtr + pr.status.expiry = now.Add(StatusCallCacheDuration) + pr.status.wait = nil + + pr.status.lock.Unlock() + + // signal waiters + close(myChan) + return cachePtr, nil +} + // implements RunnerProtocolServer func (pr *pureRunner) Status(ctx context.Context, _ *empty.Empty) (*runner.RunnerStatus, error) { - return &runner.RunnerStatus{ - Active: atomic.LoadInt32(&pr.inflight), - }, nil + // Status using image name is disabled. We return inflight request count only + if pr.status.imageName == "" { + return &runner.RunnerStatus{ + Active: atomic.LoadInt32(&pr.status.inflight), + }, nil + } + return pr.handleStatusCall(ctx) } func DefaultPureRunner(cancel context.CancelFunc, addr string, da CallHandler, cert string, key string, ca string) (Agent, error) { @@ -668,6 +874,20 @@ func PureRunnerWithAgent(a Agent) PureRunnerOption { } } +// PureRunnerWithStatusImage returns a PureRunnerOption that annotates a PureRunner with a +// statusImageName attribute. This attribute names an image name to use for the status checks. +// Optionally, the status image can be pre-loaded into docker using FN_DOCKER_LOAD_FILE to avoid +// docker pull during status checks. +func PureRunnerWithStatusImage(imgName string) PureRunnerOption { + return func(pr *pureRunner) error { + if pr.status.imageName != "" { + return fmt.Errorf("Duplicate status image configuration old=%s new=%s", pr.status.imageName, imgName) + } + pr.status.imageName = imgName + return nil + } +} + func NewPureRunner(cancel context.CancelFunc, addr string, options ...PureRunnerOption) (Agent, error) { pr := &pureRunner{} diff --git a/api/agent/runner_client.go b/api/agent/runner_client.go index 1d8d9804e..801f9340b 100644 --- a/api/agent/runner_client.go +++ b/api/agent/runner_client.go @@ -18,6 +18,8 @@ import ( "github.com/fnproject/fn/api/models" pool "github.com/fnproject/fn/api/runnerpool" "github.com/fnproject/fn/grpcutil" + + pb_empty "github.com/golang/protobuf/ptypes/empty" "github.com/sirupsen/logrus" ) @@ -103,6 +105,44 @@ func isTooBusy(err error) bool { return false } +// Translate runner.RunnerStatus to runnerpool.RunnerStatus +func TranslateGRPCStatusToRunnerStatus(status *pb.RunnerStatus) *pool.RunnerStatus { + if status == nil { + return nil + } + + creat, _ := common.ParseDateTime(status.CreatedAt) + start, _ := common.ParseDateTime(status.StartedAt) + compl, _ := common.ParseDateTime(status.CompletedAt) + + return &pool.RunnerStatus{ + ActiveRequestCount: status.Active, + StatusFailed: status.Failed, + StatusId: status.Id, + Details: status.Details, + ErrorCode: status.ErrorCode, + ErrorStr: status.ErrorStr, + CreatedAt: creat, + StartedAt: start, + CompletedAt: compl, + } +} + +// implements Runner +func (r *gRPCRunner) Status(ctx context.Context) (*pool.RunnerStatus, error) { + log := common.Logger(ctx).WithField("runner_addr", r.address) + rid := common.RequestIDFromContext(ctx) + if rid != "" { + // Create a new gRPC metadata where we store the request ID + mp := metadata.Pairs(common.RequestIDContextKey, rid) + ctx = metadata.NewOutgoingContext(ctx, mp) + } + + status, err := r.client.Status(ctx, &pb_empty.Empty{}) + log.WithError(err).Debugf("Status Call %+v", status) + return TranslateGRPCStatusToRunnerStatus(status), err +} + // implements Runner func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, error) { log := common.Logger(ctx).WithField("runner_addr", r.address) diff --git a/api/agent/static_pool_test.go b/api/agent/static_pool_test.go index 07c179424..bb6f1e01c 100644 --- a/api/agent/static_pool_test.go +++ b/api/agent/static_pool_test.go @@ -24,6 +24,10 @@ func (r *mockStaticRunner) TryExec(ctx context.Context, call pool.RunnerCall) (b return true, nil } +func (r *mockStaticRunner) Status(ctx context.Context) (*pool.RunnerStatus, error) { + return nil, nil +} + func (r *mockStaticRunner) Close(context.Context) error { return ErrorGarbanzoBeans } diff --git a/api/models/error.go b/api/models/error.go index e6d1ea765..8036c6607 100644 --- a/api/models/error.go +++ b/api/models/error.go @@ -109,6 +109,10 @@ var ( code: http.StatusBadRequest, error: errors.New("Missing route Image"), } + ErrRoutesInvalidImage = err{ + code: http.StatusBadRequest, + error: errors.New("Invalid route Image"), + } ErrRoutesMissingName = err{ code: http.StatusBadRequest, error: errors.New("Missing route Name"), diff --git a/api/runnerpool/runner_pool.go b/api/runnerpool/runner_pool.go index e09000493..c697315da 100644 --- a/api/runnerpool/runner_pool.go +++ b/api/runnerpool/runner_pool.go @@ -5,6 +5,7 @@ import ( "io" "net/http" + "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/models" ) @@ -31,9 +32,23 @@ type PKIData struct { // MTLSRunnerFactory represents a factory method for constructing runners using mTLS type MTLSRunnerFactory func(addr, certCommonName string, pki *PKIData) (Runner, error) +// RunnerStatus is general information on Runner health as returned by Runner::Status() call +type RunnerStatus struct { + ActiveRequestCount int32 // Number of active running requests on Runner + StatusFailed bool // True if Status execution failed + StatusId string // Call ID for Status + Details string // General/Debug Log information + ErrorCode int32 // If StatusFailed, then error code is set + ErrorStr string // Error details if StatusFailed and ErrorCode is set + CreatedAt common.DateTime // Status creation date at Runner + StartedAt common.DateTime // Status execution date at Runner + CompletedAt common.DateTime // Status completion date at Runner +} + // Runner is the interface to invoke the execution of a function call on a specific runner type Runner interface { TryExec(ctx context.Context, call RunnerCall) (bool, error) + Status(ctx context.Context) (*RunnerStatus, error) Close(ctx context.Context) error Address() string } diff --git a/docs/operating/options.md b/docs/operating/options.md index 651691ab2..9a049e07b 100644 --- a/docs/operating/options.md +++ b/docs/operating/options.md @@ -40,6 +40,7 @@ docker run -e VAR_NAME=VALUE ... | `FN_MAX_FS_SIZE_MB` | Set this option in MB to pass a `size` option to Docker storage driver. This limits the file system size for all containers on the system. See [Docker storage driver options per container](https://docs.docker.com/engine/reference/commandline/run/#set-storage-driver-options-per-container) documentation for details. | None | | `FN_DOCKER_NETWORKS` | Set this option with a list of docker networks for function containers to use. If unset, default docker network is used. | None | | `FN_DISABLE_READONLY_ROOTFS` | Set this option to enable writable root filesystem. By default root filesystem is mounted read-only. | None | +| `FN_DOCKER_LOAD_FILE` | Set this option with an absolute path to a tarball to load a set of docker images during fn server startup. The tarball can be generated using [docker save](https://docs.docker.com/engine/reference/commandline/save/). | None | ## Starting without Docker in Docker diff --git a/images/fn-status-checker/.gitignore b/images/fn-status-checker/.gitignore new file mode 100644 index 000000000..22d0d82f8 --- /dev/null +++ b/images/fn-status-checker/.gitignore @@ -0,0 +1 @@ +vendor diff --git a/images/fn-status-checker/Dockerfile b/images/fn-status-checker/Dockerfile new file mode 100644 index 000000000..eb272fd77 --- /dev/null +++ b/images/fn-status-checker/Dockerfile @@ -0,0 +1,15 @@ +# build stage +FROM golang:1.10-alpine AS build-env +RUN apk --no-cache add git +ENV D=/go/src/github.com/fnproject/fn/images/fn-status-checker +RUN go get -u github.com/golang/dep/cmd/dep +ADD Gopkg.* $D/ +RUN cd $D && dep ensure --vendor-only +ADD . $D +RUN cd $D && go build -ldflags="-s -w" -o fn-status-checker && cp fn-status-checker /tmp/ + +# final stage +FROM alpine +WORKDIR /function +COPY --from=build-env /tmp/fn-status-checker /function +ENTRYPOINT ["./fn-status-checker"] diff --git a/images/fn-status-checker/Gopkg.lock b/images/fn-status-checker/Gopkg.lock new file mode 100644 index 000000000..1e5cae97b --- /dev/null +++ b/images/fn-status-checker/Gopkg.lock @@ -0,0 +1,18 @@ +# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. + + +[[projects]] + branch = "master" + name = "github.com/fnproject/fdk-go" + packages = [ + ".", + "utils" + ] + revision = "5d768b2006f11737b6a69a758ddd6d2fac04923e" + +[solve-meta] + analyzer-name = "dep" + analyzer-version = 1 + inputs-digest = "c55f0d3da5ec2e9e5c9a7c563702e4cf28513fa1aaea1c18664ca2cb7d726f89" + solver-name = "gps-cdcl" + solver-version = 1 diff --git a/images/fn-status-checker/Gopkg.toml b/images/fn-status-checker/Gopkg.toml new file mode 100644 index 000000000..bd8f167c5 --- /dev/null +++ b/images/fn-status-checker/Gopkg.toml @@ -0,0 +1,3 @@ +[[constraint]] + branch = "master" + name = "github.com/fnproject/fdk-go" diff --git a/images/fn-status-checker/build.sh b/images/fn-status-checker/build.sh new file mode 100755 index 000000000..d6ed0eb89 --- /dev/null +++ b/images/fn-status-checker/build.sh @@ -0,0 +1,2 @@ +set -e +docker build --build-arg HTTPS_PROXY --build-arg HTTP_PROXY -t fnproject/fn-status-checker:latest . diff --git a/images/fn-status-checker/release.sh b/images/fn-status-checker/release.sh new file mode 100755 index 000000000..d334fd016 --- /dev/null +++ b/images/fn-status-checker/release.sh @@ -0,0 +1,2 @@ +set -e +docker push fnproject/fn-status-checker:latest diff --git a/images/fn-status-checker/status.go b/images/fn-status-checker/status.go new file mode 100644 index 000000000..b1c940a15 --- /dev/null +++ b/images/fn-status-checker/status.go @@ -0,0 +1,54 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "io" + "io/ioutil" + "log" + + fdk "github.com/fnproject/fdk-go" +) + +func main() { + fdk.Handle(fdk.HandlerFunc(myHandler)) +} + +func myHandler(ctx context.Context, in io.Reader, out io.Writer) { + var input map[string]interface{} + + body, err := ioutil.ReadAll(in) + if err != nil { + log.Print("could not read input") + fdk.WriteStatus(out, 530) + return + } + + err = json.Unmarshal(body, &input) + if err != nil { + log.Print("could not unmarshal json input") + fdk.WriteStatus(out, 531) + return + } + + output, err := json.Marshal(&input) + if err != nil { + log.Print("could not marshal json output") + fdk.WriteStatus(out, 532) + return + } + + written, err := io.Copy(out, bytes.NewReader(output)) + if err != nil { + log.Print("could not write output") + fdk.WriteStatus(out, 533) + return + } + + if written != int64(len(output)) { + log.Print("partial write of output") + fdk.WriteStatus(out, 534) + return + } +} diff --git a/release.sh b/release.sh index 4cca4edbb..6b15a821c 100755 --- a/release.sh +++ b/release.sh @@ -48,6 +48,6 @@ docker tag $user/$image:latest $user/$image_deprecated:latest docker push $user/$image_deprecated:$version docker push $user/$image_deprecated:latest -# release test utils docker image (cd images/fn-test-utils && ./release.sh) +(cd images/fn-status-checker && ./release.sh) diff --git a/test/fn-system-tests/exec_runner_status_test.go b/test/fn-system-tests/exec_runner_status_test.go new file mode 100644 index 000000000..2e6ee61c7 --- /dev/null +++ b/test/fn-system-tests/exec_runner_status_test.go @@ -0,0 +1,144 @@ +package tests + +import ( + "bytes" + "context" + "io" + "net/http" + "net/url" + "path" + "testing" + "time" + + "github.com/fnproject/fn/api/models" + "github.com/fnproject/fn/api/runnerpool" +) + +// We should not be able to invoke a StatusImage +func TestCannotExecuteStatusImage(t *testing.T) { + if StatusImage == "" { + t.Skip("no status image defined") + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + rt := &models.Route{ + Path: routeName + "yogurt", + Image: StatusImage, + Format: format, + Memory: memory, + Type: typ, + } + + rt = ensureRoute(t, rt) + + lb, err := LB() + if err != nil { + t.Fatalf("Got unexpected error: %v", err) + } + u := url.URL{ + Scheme: "http", + Host: lb, + } + u.Path = path.Join(u.Path, "r", appName, rt.Path) + + content := bytes.NewBuffer([]byte(`status`)) + output := &bytes.Buffer{} + + resp, err := callFN(ctx, u.String(), content, output, "POST") + if err != nil { + t.Fatalf("Got unexpected error: %v", err) + } + + if resp.StatusCode != http.StatusBadRequest { + t.Fatalf("StatusCode check failed on %v", resp.StatusCode) + } +} + +// Some dummy RunnerCall implementation +type myCall struct{} + +// implements RunnerCall +func (c *myCall) SlotHashId() string { return "" } +func (c *myCall) Extensions() map[string]string { return nil } +func (c *myCall) RequestBody() io.ReadCloser { return nil } +func (c *myCall) ResponseWriter() http.ResponseWriter { return nil } +func (c *myCall) StdErr() io.ReadWriteCloser { return nil } +func (c *myCall) Model() *models.Call { return nil } + +func TestExecuteRunnerStatus(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + var zoo myCall + + pool, err := NewSystemTestNodePool() + if err != nil { + t.Fatalf("Creating Node Pool failed %v", err) + } + + runners, err := pool.Runners(&zoo) + if err != nil { + t.Fatalf("Getting Runners from Pool failed %v", err) + } + if len(runners) == 0 { + t.Fatalf("Getting Runners from Pool failed no-runners") + } + + concurrency := 10 + res := make(chan *runnerpool.RunnerStatus, concurrency*len(runners)) + + for _, runner := range runners { + for i := 0; i < concurrency; i++ { + go func(dest runnerpool.Runner) { + status, err := dest.Status(ctx) + if err != nil { + t.Fatalf("Runners Status failed for %v err=%v", dest.Address(), err) + } + if status == nil || status.StatusFailed { + t.Fatalf("Runners Status not OK for %v %v", dest.Address(), status) + } + t.Logf("Runner %v got Status=%+v", dest.Address(), status) + res <- status + }(runner) + } + } + + lookup := make(map[string][]*runnerpool.RunnerStatus) + + for i := 0; i < concurrency*len(runners); i++ { + status := <-res + lookup[status.StatusId] = append(lookup[status.StatusId], status) + } + + // WARNING: Possibly flappy test below. Might need to relax the numbers below. + // Why 3? We have a idleTimeout + gracePeriod = 1.5 secs (for cache timeout) for status calls. + // This normally should easily serve all the queries above. (We have 3 runners, each should + // easily take on 10 status calls for that period. + if len(lookup) > 3 { + for key, arr := range lookup { + t.Fatalf("key=%v count=%v", key, len(arr)) + } + } + + // delay + time.Sleep(time.Duration(2 * time.Second)) + + // now we should get fresh data + for _, dest := range runners { + status, err := dest.Status(ctx) + if err != nil { + t.Fatalf("Runners Status failed for %v err=%v", dest.Address(), err) + } + if status == nil || status.StatusFailed { + t.Fatalf("Runners Status not OK for %v %v", dest.Address(), status) + } + t.Logf("Runner %v got Status=%+v", dest.Address(), status) + _, ok := lookup[status.StatusId] + if ok { + t.Fatalf("Runners Status did not return fresh status id %v %v", dest.Address(), status) + } + } + +} diff --git a/test/fn-system-tests/system_test.go b/test/fn-system-tests/system_test.go index f811bc2a8..5538c421a 100644 --- a/test/fn-system-tests/system_test.go +++ b/test/fn-system-tests/system_test.go @@ -31,7 +31,8 @@ import ( ) const ( - LBAddress = "http://127.0.0.1:8081" + LBAddress = "http://127.0.0.1:8081" + StatusImage = "fnproject/fn-status-checker:latest" ) func LB() (string, error) { @@ -276,7 +277,10 @@ func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, erro cancelCtx, cancel := context.WithCancel(ctx) // now create pure-runner that wraps agent. - pureRunner, err := agent.NewPureRunner(cancel, grpcAddr, agent.PureRunnerWithAgent(innerAgent)) + pureRunner, err := agent.NewPureRunner(cancel, grpcAddr, + agent.PureRunnerWithAgent(innerAgent), + agent.PureRunnerWithStatusImage(StatusImage), + ) if err != nil { return nil, err }