fn: cookie and driver api changes (#1312)

The now-obsolete driver.PrepareCookie() call handled both image and
container creation. Going forward, the agent will need finer-grained
control over the timeouts implied by the contexts passed to these
operations. For this reason, this change splits PrepareCookie() into
separate Validate/Pull/Create calls on the Cookie interface.
Author:    Tolga Ceylan
Date:      2018-11-14 16:51:05 -08:00
Committer: GitHub
Parent:    65f3f915be
Commit:    fe2b9fb53d

7 changed files with 239 additions and 149 deletions
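Taken together, the change replaces the single PrepareCookie() step with three explicit phases on the cookie. A minimal sketch of a caller, mirroring the runHot() hunk below (prepareSlot is a hypothetical helper name, not part of this commit; error handling is simplified):

    package agent // illustrative only

    import (
        "context"

        "github.com/fnproject/fn/api/agent/drivers"
    )

    // prepareSlot (hypothetical) drives the new three-phase container setup.
    // Each phase receives its own ctx, which is the point of the split:
    // callers can attach a separate timeout to each step.
    func prepareSlot(ctx context.Context, cookie drivers.Cookie) error {
        shouldPull, err := cookie.ValidateImage(ctx) // inspect image, resolve auth
        if err != nil {
            return err
        }
        if shouldPull {
            if err := cookie.PullImage(ctx); err != nil { // network-bound pull
                return err
            }
        }
        return cookie.CreateContainer(ctx) // local docker create
    }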


@@ -782,7 +782,21 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
// WARNING: we wait forever.
defer cookie.Close(common.BackgroundContext(ctx))
-err = a.driver.PrepareCookie(ctx, cookie)
+shouldPull, err := cookie.ValidateImage(ctx)
if err != nil {
call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err})
return
}
+if shouldPull {
+err = cookie.PullImage(ctx)
+if err != nil {
+call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err})
+return
+}
+}
+err = cookie.CreateContainer(ctx)
+if err != nil {
+call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err})
+return


@@ -2,13 +2,19 @@ package docker
import (
"context"
+"encoding/json"
+"fmt"
+"net/http"
+"path"
"strings"
"github.com/fnproject/fn/api/agent/drivers"
"github.com/fnproject/fn/api/common"
+"github.com/fnproject/fn/api/models"
"github.com/fsouza/go-dockerclient"
"github.com/sirupsen/logrus"
+"go.opencensus.io/trace"
)
// A cookie identifies a unique request to run a task.
@@ -27,6 +33,11 @@ type cookie struct {
// do we need to remove container at exit?
isCreated bool
+imgReg string
+imgRepo string
+imgTag string
+imgAuthConf *docker.AuthConfiguration
}
func (c *cookie) configureLogger(log logrus.FieldLogger) {
@@ -228,7 +239,7 @@ func (c *cookie) Freeze(ctx context.Context) error {
err := c.drv.docker.PauseContainer(c.task.Id(), ctx)
if err != nil {
-logrus.WithError(err).WithFields(logrus.Fields{"call_id": c.task.Id()}).Error("error pausing container")
+log.WithError(err).WithFields(logrus.Fields{"call_id": c.task.Id()}).Error("error pausing container")
}
return err
}
@@ -240,9 +251,110 @@ func (c *cookie) Unfreeze(ctx context.Context) error {
err := c.drv.docker.UnpauseContainer(c.task.Id(), ctx)
if err != nil {
-logrus.WithError(err).WithFields(logrus.Fields{"call_id": c.task.Id()}).Error("error unpausing container")
+log.WithError(err).WithFields(logrus.Fields{"call_id": c.task.Id()}).Error("error unpausing container")
}
return err
}
+func (c *cookie) ValidateImage(ctx context.Context) (bool, error) {
+ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "ValidateImage"})
+log.WithFields(logrus.Fields{"call_id": c.task.Id()}).Debug("docker auth and inspect image")
+
+// ask for docker creds before looking for image, as the tasker may need to
+// validate creds even if the image is downloaded.
+config := findRegistryConfig(c.imgReg, c.drv.auths)
+
+if task, ok := c.task.(Auther); ok {
+_, span := trace.StartSpan(ctx, "docker_auth")
+authConfig, err := task.DockerAuth()
+span.End()
+if err != nil {
+return false, err
+}
+if authConfig != nil {
+config = authConfig
+}
+}
+
+c.imgAuthConf = config
+
+// see if we already have it
+_, err := c.drv.docker.InspectImage(ctx, c.task.Image())
+if err == docker.ErrNoSuchImage {
+return true, nil
+}
+return false, err
+}
+
+func (c *cookie) PullImage(ctx context.Context) error {
+ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "PullImage"})
+
+cfg := c.imgAuthConf
+if cfg == nil {
+log.Fatal("invalid usage: call ValidateImage first")
+}
+
+repo := path.Join(c.imgReg, c.imgRepo)
+
+log = common.Logger(ctx).WithFields(logrus.Fields{"registry": cfg.ServerAddress, "username": cfg.Username, "image": c.task.Image()})
+log.WithFields(logrus.Fields{"call_id": c.task.Id()}).Debug("docker pull")
+
+err := c.drv.docker.PullImage(docker.PullImageOptions{Repository: repo, Tag: c.imgTag, Context: ctx}, *cfg)
+if err != nil {
+log.WithError(err).Error("Failed to pull image")
+
+// TODO need to inspect for hub or network errors and pick; for now, assume
+// 500 if not a docker error
+msg := err.Error()
+code := http.StatusInternalServerError
+if dErr, ok := err.(*docker.Error); ok {
+msg = dockerMsg(dErr)
+code = dErr.Status // 401/404
+}
+return models.NewAPIError(code, fmt.Errorf("Failed to pull image '%s': %s", c.task.Image(), msg))
+}
+return nil
+}
+
+func (c *cookie) CreateContainer(ctx context.Context) error {
+ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "CreateContainer"})
+log.WithFields(logrus.Fields{"call_id": c.task.Id()}).Debug("docker create container")
+
+// here let's assume we have created container, logically this should be after 'CreateContainer', but we
+// are not 100% sure that *any* failure to CreateContainer does not ever leave a container around especially
+// going through fsouza+docker-api.
+c.isCreated = true
+c.opts.Context = ctx
+
+_, err := c.drv.docker.CreateContainer(c.opts)
+if err != nil {
+// since we retry under the hood, if the container gets created and retry fails, we can just ignore error
+if err != docker.ErrContainerAlreadyExists {
+log.WithError(err).Error("Could not create container")
+// NOTE: if the container fails to create we don't really want to show to user since they aren't directly configuring the container
+return err
+}
+}
+return nil
+}
+
+// removes docker err formatting: 'API Error (code) {"message":"..."}'
+func dockerMsg(derr *docker.Error) string {
+// derr.Message is a JSON response from docker, which has a "message" field we want to extract if possible.
+// this is pretty lame, but it is what it is
+var v struct {
+Msg string `json:"message"`
+}
+err := json.Unmarshal([]byte(derr.Message), &v)
+if err != nil {
+// If message was not valid JSON, the raw body is still better than nothing.
+return derr.Message
+}
+return v.Msg
+}
var _ drivers.Cookie = &cookie{}
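dockerMsg() moves here verbatim from docker.go. For reference, all it does is unwrap docker's JSON error body; a self-contained sketch of the same extraction, with a fabricated sample body:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // msgOf mirrors dockerMsg's logic: pull the "message" field out of a
    // docker error body, falling back to the raw body if it is not JSON.
    func msgOf(body string) string {
        var v struct {
            Msg string `json:"message"`
        }
        if err := json.Unmarshal([]byte(body), &v); err != nil {
            return body
        }
        return v.Msg
    }

    func main() {
        fmt.Println(msgOf(`{"message":"pull access denied"}`)) // pull access denied
        fmt.Println(msgOf("plain text error"))                 // plain text error
    }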


@@ -2,14 +2,12 @@ package docker
import (
"context"
-"encoding/json"
"errors"
"fmt"
"io"
"math"
-"net/http"
"os"
"path"
"strings"
"sync"
"time"
@@ -154,6 +152,11 @@ func (drv *DockerDriver) Close() error {
return err
}
+// Obsoleted.
+func (drv *DockerDriver) PrepareCookie(ctx context.Context, cookie drivers.Cookie) error {
+return nil
+}
func (drv *DockerDriver) pickPool(ctx context.Context, c *cookie) {
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "tryUsePool"})
@@ -259,45 +262,11 @@ func (drv *DockerDriver) CreateCookie(ctx context.Context, task drivers.Containe
// Order is important, Hostname doesn't play well with Network config
cookie.configureHostname(log)
+cookie.imgReg, cookie.imgRepo, cookie.imgTag = drivers.ParseImage(task.Image())
return cookie, nil
}
-func (drv *DockerDriver) PrepareCookie(ctx context.Context, c drivers.Cookie) error {
-ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "PrepareCookie"})
-
-cookie, ok := c.(*cookie)
-if !ok {
-return errors.New("unknown cookie implementation")
-}
-
-err := drv.ensureImage(ctx, cookie.task)
-if err != nil {
-return err
-}
-
-// here let's assume we have created container, logically this should be after 'CreateContainer', but we
-// are not 100% sure that *any* failure to CreateContainer does not ever leave a container around especially
-// going through fsouza+docker-api.
-cookie.isCreated = true
-cookie.opts.Context = ctx
-
-_, err = drv.docker.CreateContainer(cookie.opts)
-if err != nil {
-// since we retry under the hood, if the container gets created and retry fails, we can just ignore error
-if err != docker.ErrContainerAlreadyExists {
-log.WithFields(logrus.Fields{"call_id": cookie.task.Id(), "command": cookie.opts.Config.Cmd, "memory": cookie.opts.Config.Memory,
-"cpu_quota": cookie.task.CPUs(), "hostname": cookie.opts.Config.Hostname, "name": cookie.opts.Name,
-"image": cookie.opts.Config.Image, "volumes": cookie.opts.Config.Volumes, "binds": cookie.opts.HostConfig.Binds, "container": cookie.opts.Name,
-}).WithError(err).Error("Could not create container")
-// NOTE: if the container fails to create we don't really want to show to user since they aren't directly configuring the container
-return err
-}
-}
-
-// discard removal error
-return nil
-}
func (drv *DockerDriver) removeContainer(ctx context.Context, container string) error {
err := drv.docker.RemoveContainer(docker.RemoveContainerOptions{
ID: container, Force: true, RemoveVolumes: true, Context: ctx})
@@ -308,78 +277,6 @@ func (drv *DockerDriver) removeContainer(ctx context.Context, container string)
return nil
}
-func (drv *DockerDriver) ensureImage(ctx context.Context, task drivers.ContainerTask) error {
-reg, repo, tag := drivers.ParseImage(task.Image())
-
-// ask for docker creds before looking for image, as the tasker may need to
-// validate creds even if the image is downloaded.
-config := findRegistryConfig(reg, drv.auths)
-
-if task, ok := task.(Auther); ok {
-var err error
-_, span := trace.StartSpan(ctx, "docker_auth")
-authConfig, err := task.DockerAuth()
-span.End()
-if err != nil {
-return err
-}
-if authConfig != nil {
-config = authConfig
-}
-}
-
-globalRepo := path.Join(reg, repo)
-
-// see if we already have it, if not, pull it
-_, err := drv.docker.InspectImage(ctx, task.Image())
-if err == docker.ErrNoSuchImage {
-err = drv.pullImage(ctx, task, *config, globalRepo, tag)
-}
-
-return err
-}
-
-func (drv *DockerDriver) pullImage(ctx context.Context, task drivers.ContainerTask, config docker.AuthConfiguration, globalRepo, tag string) error {
-log := common.Logger(ctx)
-
-log.WithFields(logrus.Fields{"registry": config.ServerAddress, "username": config.Username, "image": task.Image()}).Info("Pulling image")
-
-err := drv.docker.PullImage(docker.PullImageOptions{Repository: globalRepo, Tag: tag, Context: ctx}, config)
-if err != nil {
-log.WithFields(logrus.Fields{"registry": config.ServerAddress, "username": config.Username, "image": task.Image()}).WithError(err).Error("Failed to pull image")
-
-// TODO need to inspect for hub or network errors and pick; for now, assume
-// 500 if not a docker error
-msg := err.Error()
-code := http.StatusInternalServerError
-if dErr, ok := err.(*docker.Error); ok {
-msg = dockerMsg(dErr)
-code = dErr.Status // 401/404
-}
-
-return models.NewAPIError(code, fmt.Errorf("Failed to pull image '%s': %s", task.Image(), msg))
-}
-return nil
-}
-
-// removes docker err formatting: 'API Error (code) {"message":"..."}'
-func dockerMsg(derr *docker.Error) string {
-// derr.Message is a JSON response from docker, which has a "message" field we want to extract if possible.
-// this is pretty lame, but it is what it is
-var v struct {
-Msg string `json:"message"`
-}
-err := json.Unmarshal([]byte(derr.Message), &v)
-if err != nil {
-// If message was not valid JSON, the raw body is still better than nothing.
-return derr.Message
-}
-return v.Msg
-}
// Run executes the docker container. If task runs, drivers.RunResult will be returned. If something fails outside the task (ie: Docker), it will return error.
// The docker driver will attempt to cast the task to a Auther. If that succeeds, private image support is available. See the Auther interface for how to implement this.
func (drv *DockerDriver) run(ctx context.Context, container string, task drivers.ContainerTask) (drivers.WaitResult, error) {
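With ensureImage()/pullImage()/dockerMsg() gone from the driver, the registry-error mapping survives unchanged inside cookie.PullImage(): a *docker.Error keeps its upstream HTTP status (e.g. 401 for bad credentials, 404 for a missing image), and anything else becomes a 500. A self-contained sketch of that mapping (the sample errors are fabricated):

    package main

    import (
        "fmt"
        "net/http"

        docker "github.com/fsouza/go-dockerclient"
    )

    // statusFor mirrors the pull-error mapping: docker API errors keep their
    // upstream status, everything else is treated as an internal error.
    func statusFor(err error) int {
        if dErr, ok := err.(*docker.Error); ok {
            return dErr.Status // e.g. 401/404 from the registry
        }
        return http.StatusInternalServerError
    }

    func main() {
        fmt.Println(statusFor(&docker.Error{Status: 404, Message: `{"message":"no such image"}`})) // 404
        fmt.Println(statusFor(fmt.Errorf("network down")))                                         // 500
    }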


@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"path"
"runtime"
"strings"
"sync"
@@ -138,7 +139,9 @@ func NewDockerPool(conf drivers.Config, driver *DockerDriver) DockerPool {
networks = append(networks, "")
}
-pool.wg.Add(int(conf.PreForkPoolSize))
+pool.wg.Add(1 + int(conf.PreForkPoolSize))
+pullGate := make(chan struct{}, 1)
for i := 0; i < int(conf.PreForkPoolSize); i++ {
task := &poolTask{
@@ -148,9 +151,10 @@ func NewDockerPool(conf drivers.Config, driver *DockerDriver) DockerPool {
netMode: networks[i%len(networks)],
}
-go pool.nannyContainer(ctx, driver, task)
+go pool.nannyContainer(ctx, driver, task, pullGate)
}
+go pool.prepareImage(ctx, driver, conf.PreForkImage, pullGate)
return pool
}
@@ -164,12 +168,6 @@ func (pool *dockerPool) performInitState(ctx context.Context, driver *DockerDriv
log := common.Logger(ctx).WithFields(logrus.Fields{"id": task.Id(), "net": task.netMode})
-err := driver.ensureImage(ctx, task)
-if err != nil {
-log.WithError(err).Info("prefork pool image pull failed")
-return
-}
containerOpts := docker.CreateContainerOptions{
Name: task.Id(),
Config: &docker.Config{
@@ -202,7 +200,7 @@ func (pool *dockerPool) performInitState(ctx context.Context, driver *DockerDriv
// ignore failure here
driver.docker.RemoveContainer(removeOpts)
-_, err = driver.docker.CreateContainer(containerOpts)
+_, err := driver.docker.CreateContainer(containerOpts)
if err != nil {
log.WithError(err).Info("prefork pool container create failed")
return
@@ -262,10 +260,50 @@ func (pool *dockerPool) performTeardown(ctx context.Context, driver *DockerDrive
driver.docker.RemoveContainer(removeOpts)
}
-func (pool *dockerPool) nannyContainer(ctx context.Context, driver *DockerDriver, task *poolTask) {
+func (pool *dockerPool) prepareImage(ctx context.Context, driver *DockerDriver, img string, pullGate chan struct{}) {
+defer pool.wg.Done()
+defer close(pullGate)
+
+log := common.Logger(ctx)
+imgReg, imgRepo, imgTag := drivers.ParseImage(img)
+opts := docker.PullImageOptions{Repository: path.Join(imgReg, imgRepo), Tag: imgTag, Context: ctx}
+config := findRegistryConfig(imgReg, driver.auths)
+for ctx.Err() == nil {
+err := pool.limiter.Wait(ctx)
+if err != nil {
+// should not really happen unless ctx has a deadline or burst is 0.
+log.WithError(err).Fatal("prefork pool rate limiter failed")
+}
+
+_, err = driver.docker.InspectImage(ctx, img)
+if err == nil {
+return
+}
+if err != docker.ErrNoSuchImage {
+log.WithError(err).Fatal("prefork pool image inspect failed")
+}
+
+err = driver.docker.PullImage(opts, *config)
+if err == nil {
+return
+}
+log.WithError(err).Error("Failed to pull image")
+}
+}
+func (pool *dockerPool) nannyContainer(ctx context.Context, driver *DockerDriver, task *poolTask, pullGate chan struct{}) {
defer pool.performTeardown(ctx, driver, task)
defer pool.wg.Done()
+// wait for image pull
+select {
+case <-ctx.Done():
+case <-pullGate:
+}
log := common.Logger(ctx).WithFields(logrus.Fields{"id": task.Id(), "net": task.netMode})
// We spin forever, keeping the pool resident and running at all times.
@@ -275,7 +313,7 @@ func (pool *dockerPool) nannyContainer(ctx context.Context, driver *DockerDriver
err := pool.limiter.Wait(ctx)
if err != nil {
// should not really happen unless ctx has a deadline or burst is 0.
-log.WithError(err).Info("prefork pool rate limiter failed")
+log.WithError(err).Fatal("prefork pool rate limiter failed")
break
}
}
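The pullGate here is a close-only channel: prepareImage() closes it exactly once (via defer) whether or not the pull succeeded, and every nanny goroutine blocks on it before creating containers, so the image is fetched once instead of racing one pull per pool container. The same gate shape in isolation (names and timings are illustrative):

    package main

    import (
        "context"
        "fmt"
        "sync"
        "time"
    )

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        gate := make(chan struct{}) // closed exactly once by the puller
        var wg sync.WaitGroup

        wg.Add(1)
        go func() { // stands in for prepareImage
            defer wg.Done()
            defer close(gate)                 // release all waiters even on failure
            time.Sleep(50 * time.Millisecond) // stands in for the image pull
        }()

        for i := 0; i < 3; i++ { // stand-ins for nannyContainer workers
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                select { // same wait shape as the diff
                case <-ctx.Done():
                    return
                case <-gate:
                }
                fmt.Println("worker", id, "proceeding after pull")
            }(i)
        }
        wg.Wait()
    }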


@@ -55,9 +55,20 @@ func TestRunnerDocker(t *testing.T) {
defer cookie.Close(ctx)
-err = dkr.PrepareCookie(ctx, cookie)
+shouldPull, err := cookie.ValidateImage(ctx)
if err != nil {
t.Fatal("Couldn't prepare task test")
t.Fatal("Couldn't validate image test")
}
+if shouldPull {
+err = cookie.PullImage(ctx)
+if err != nil {
+t.Fatal("Couldn't pull image test")
+}
+}
+
+err = cookie.CreateContainer(ctx)
+if err != nil {
+t.Fatal("Couldn't create container test")
+}
waiter, err := cookie.Run(ctx)
@@ -95,11 +106,6 @@ func TestRunnerDockerNetworks(t *testing.T) {
defer cookie1.Close(ctx)
-err = dkr.PrepareCookie(ctx, cookie1)
-if err != nil {
-t.Fatal("Couldn't prepare task1 test")
-}
cookie2, err := dkr.CreateCookie(ctx, task2)
if err != nil {
t.Fatal("Couldn't create task2 cookie")
@@ -107,12 +113,6 @@ func TestRunnerDockerNetworks(t *testing.T) {
defer cookie2.Close(ctx)
-err = dkr.PrepareCookie(ctx, cookie2)
-if err != nil {
-t.Fatal("Couldn't prepare task2 test")
-}
-defer cookie2.Close(ctx)
c1 := cookie1.(*cookie)
c2 := cookie2.(*cookie)
@@ -167,9 +167,19 @@ func TestRunnerDockerStdout(t *testing.T) {
defer cookie.Close(ctx)
-err = dkr.PrepareCookie(ctx, cookie)
+shouldPull, err := cookie.ValidateImage(ctx)
if err != nil {
t.Fatal("Couldn't prepare task test")
t.Fatal("Couldn't validate image test")
}
+if shouldPull {
+err = cookie.PullImage(ctx)
+if err != nil {
+t.Fatal("Couldn't pull image test")
+}
+}
+
+err = cookie.CreateContainer(ctx)
+if err != nil {
+t.Fatal("Couldn't create container test")
+}
waiter, err := cookie.Run(ctx)


@@ -41,6 +41,16 @@ type Cookie interface {
// Unfreeze a frozen container to unpause frozen processes
Unfreeze(ctx context.Context) error
+// Validate/Inspect and Authenticate image. Returns true if the image needs
+// to be pulled and a non-nil error if validation/auth/inspection fails.
+ValidateImage(ctx context.Context) (bool, error)
+
+// Pull the image.
+PullImage(ctx context.Context) error
+
+// Create a container which can be Run() later.
+CreateContainer(ctx context.Context) error
// Fetch driver specific container configuration. Use this to
// access the container create options. If Driver.Prepare() is not
// yet called with the cookie, then this can be used to modify container
@@ -61,14 +71,7 @@ type Driver interface {
// Callers should Close the cookie regardless of whether they prepare or run it.
CreateCookie(ctx context.Context, task ContainerTask) (Cookie, error)
-// PrepareCookie can be used in order to do any preparation that a specific driver
-// may need to do before running the task, and can be useful to put
-// preparation that the task can recover from into (i.e. if pulling an image
-// fails because a registry is down, the task doesn't need to be failed). It
-// returns a cookie that can be used to execute the task.
-// Callers should Close the cookie regardless of whether they run it.
-//
-// The returned cookie should respect the task's timeout when it is run.
+// Obsoleted. No-Op
PrepareCookie(ctx context.Context, cookie Cookie) error
// close & shutdown the driver
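The shape of the new Cookie methods is what delivers the commit's stated goal: each phase takes its own context, so a caller can bound auth/inspect, pull, and create independently instead of sharing one deadline. A hedged sketch of what that enables (the helper name and durations are assumptions, not from this commit):

    package agent // illustrative only

    import (
        "context"
        "time"

        "github.com/fnproject/fn/api/agent/drivers"
    )

    // setupWithBudgets (hypothetical) gives each phase its own deadline.
    func setupWithBudgets(ctx context.Context, c drivers.Cookie) error {
        vctx, cancel := context.WithTimeout(ctx, 10*time.Second) // cheap inspect/auth
        shouldPull, err := c.ValidateImage(vctx)
        cancel()
        if err != nil {
            return err
        }
        if shouldPull {
            pctx, cancel := context.WithTimeout(ctx, 5*time.Minute) // slow, network-bound
            err = c.PullImage(pctx)
            cancel()
            if err != nil {
                return err
            }
        }
        cctx, cancel := context.WithTimeout(ctx, 30*time.Second) // local create
        defer cancel()
        return c.CreateContainer(cctx)
    }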


@@ -43,9 +43,25 @@ func (c *cookie) Unfreeze(context.Context) error {
return nil
}
-func (c *cookie) Close(context.Context) error { return nil }
+func (c *cookie) ValidateImage(context.Context) (bool, error) {
+return false, nil
+}
-func (c *cookie) ContainerOptions() interface{} { return nil }
+func (c *cookie) PullImage(context.Context) error {
+return nil
+}
+func (c *cookie) CreateContainer(context.Context) error {
+return nil
+}
+func (c *cookie) Close(context.Context) error {
+return nil
+}
+func (c *cookie) ContainerOptions() interface{} {
+return nil
+}
func (c *cookie) Run(ctx context.Context) (drivers.WaitResult, error) {
c.m.count++
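The mock must keep compiling against the widened interface; the usual way to catch a missed method at build time is the blank-identifier assertion that the docker cookie file already ends with (var _ drivers.Cookie = &cookie{}). The idiom in isolation (package placement here is an assumption):

    package mock // illustrative placement

    import "github.com/fnproject/fn/api/agent/drivers"

    // Compile-time proof that *cookie implements drivers.Cookie, including
    // the newly added ValidateImage/PullImage/CreateContainer methods.
    var _ drivers.Cookie = &cookie{}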