mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
currently: * container ran out of memory (code 137) * container exited with other code != 0 * unable to pull image (auth/404) there may be others but this is a good start (the most common). notably, for both hot and cold these should bubble up (if deterministic, which hub isn't always), and these are useful for users to use in debugging why things aren't working. added tests to make sure that these behaviors are working. also changed the behavior such that when the container exits we return a 502 instead of a 503, just to be able to distinguish the fact that fn is working as expected but the container is acting funky (400 is weird here, so idk). removed references to old IsUserVisible crap and slightly changed the interface for RunResult for plumbing reasons (to get the error type, specifically). fixed an issue where if ~/.docker/config.json exists sometimes pulling images wouldn't work deterministically (should be more inline w/ expectations now) closes #275
466 lines
16 KiB
Go
466 lines
16 KiB
Go
package docker
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"path"
|
|
"strings"
|
|
|
|
"github.com/fnproject/fn/api/agent/drivers"
|
|
"github.com/fnproject/fn/api/common"
|
|
"github.com/fnproject/fn/api/models"
|
|
"github.com/fsouza/go-dockerclient"
|
|
"github.com/opentracing/opentracing-go"
|
|
"github.com/opentracing/opentracing-go/log"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// A drivers.ContainerTask should implement the Auther interface if it would
|
|
// like to use not-necessarily-public docker images for any or all task
|
|
// invocations.
|
|
type Auther interface {
|
|
// DockerAuth should return docker auth credentials that will authenticate
|
|
// against a docker registry for a given drivers.ContainerTask.Image(). An
|
|
// error may be returned which will cause the task not to be run, this can be
|
|
// useful for an implementer to do things like testing auth configurations
|
|
// before returning them; e.g. if the implementer would like to impose
|
|
// certain restrictions on images or if credentials must be acquired right
|
|
// before runtime and there's an error doing so. If these credentials don't
|
|
// work, the docker pull will fail and the task will be set to error status.
|
|
DockerAuth() (docker.AuthConfiguration, error)
|
|
}
|
|
|
|
type runResult struct {
|
|
err error
|
|
status string
|
|
}
|
|
|
|
func (r *runResult) Error() error { return r.err }
|
|
func (r *runResult) Status() string { return r.status }
|
|
|
|
type DockerDriver struct {
|
|
conf drivers.Config
|
|
docker dockerClient // retries on *docker.Client, restricts ad hoc *docker.Client usage / retries
|
|
hostname string
|
|
auths map[string]docker.AuthConfiguration
|
|
}
|
|
|
|
// implements drivers.Driver
|
|
func NewDocker(conf drivers.Config) *DockerDriver {
|
|
hostname, err := os.Hostname()
|
|
if err != nil {
|
|
logrus.WithError(err).Fatal("couldn't resolve hostname")
|
|
}
|
|
|
|
return &DockerDriver{
|
|
conf: conf,
|
|
docker: newClient(),
|
|
hostname: hostname,
|
|
auths: registryFromEnv(),
|
|
}
|
|
}
|
|
|
|
func registryFromEnv() map[string]docker.AuthConfiguration {
|
|
var auths *docker.AuthConfigurations
|
|
var err error
|
|
if reg := os.Getenv("DOCKER_AUTH"); reg != "" {
|
|
// TODO docker does not use this itself, we should get rid of env docker config (nor is this documented..)
|
|
auths, err = docker.NewAuthConfigurations(strings.NewReader(reg))
|
|
} else {
|
|
auths, err = docker.NewAuthConfigurationsFromDockerCfg()
|
|
}
|
|
|
|
if err != nil {
|
|
logrus.WithError(err).Info("no docker auths from config files found (this is fine)")
|
|
return nil
|
|
}
|
|
return auths.Configs
|
|
}
|
|
|
|
func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) {
|
|
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Prepare"})
|
|
var cmd []string
|
|
if task.Command() != "" {
|
|
// NOTE: this is hyper-sensitive and may not be correct like this even, but it passes old tests
|
|
cmd = strings.Fields(task.Command())
|
|
log.WithFields(logrus.Fields{"call_id": task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command")
|
|
}
|
|
|
|
envvars := make([]string, 0, len(task.EnvVars()))
|
|
for name, val := range task.EnvVars() {
|
|
envvars = append(envvars, name+"="+val)
|
|
}
|
|
|
|
container := docker.CreateContainerOptions{
|
|
Name: task.Id(),
|
|
Config: &docker.Config{
|
|
Env: envvars,
|
|
Cmd: cmd,
|
|
Memory: int64(task.Memory()),
|
|
CPUShares: drv.conf.CPUShares,
|
|
Hostname: drv.hostname,
|
|
Image: task.Image(),
|
|
Volumes: map[string]struct{}{},
|
|
OpenStdin: true,
|
|
AttachStdin: true,
|
|
StdinOnce: true,
|
|
},
|
|
HostConfig: &docker.HostConfig{},
|
|
Context: ctx,
|
|
}
|
|
|
|
volumes := task.Volumes()
|
|
for _, mapping := range volumes {
|
|
hostDir := mapping[0]
|
|
containerDir := mapping[1]
|
|
container.Config.Volumes[containerDir] = struct{}{}
|
|
mapn := fmt.Sprintf("%s:%s", hostDir, containerDir)
|
|
container.HostConfig.Binds = append(container.HostConfig.Binds, mapn)
|
|
log.WithFields(logrus.Fields{"volumes": mapn, "call_id": task.Id()}).Debug("setting volumes")
|
|
}
|
|
|
|
if wd := task.WorkDir(); wd != "" {
|
|
log.WithFields(logrus.Fields{"wd": wd, "call_id": task.Id()}).Debug("setting work dir")
|
|
container.Config.WorkingDir = wd
|
|
}
|
|
|
|
err := drv.ensureImage(ctx, task)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, err = drv.docker.CreateContainer(container)
|
|
if err != nil {
|
|
// since we retry under the hood, if the container gets created and retry fails, we can just ignore error
|
|
if err != docker.ErrContainerAlreadyExists {
|
|
log.WithFields(logrus.Fields{"call_id": task.Id(), "command": container.Config.Cmd, "memory": container.Config.Memory,
|
|
"cpu_shares": container.Config.CPUShares, "hostname": container.Config.Hostname, "name": container.Name,
|
|
"image": container.Config.Image, "volumes": container.Config.Volumes, "binds": container.HostConfig.Binds, "container": container.Name,
|
|
}).WithError(err).Error("Could not create container")
|
|
|
|
// NOTE: if the container fails to create we don't really want to show to user since they aren't directly configuring the container
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// discard removal error
|
|
return &cookie{id: task.Id(), task: task, drv: drv}, nil
|
|
}
|
|
|
|
type cookie struct {
|
|
id string
|
|
task drivers.ContainerTask
|
|
drv *DockerDriver
|
|
}
|
|
|
|
func (c *cookie) Close(ctx context.Context) error {
|
|
return c.drv.removeContainer(ctx, c.id)
|
|
}
|
|
|
|
func (c *cookie) Run(ctx context.Context) (drivers.WaitResult, error) {
|
|
return c.drv.run(ctx, c.id, c.task)
|
|
}
|
|
|
|
func (drv *DockerDriver) removeContainer(ctx context.Context, container string) error {
|
|
err := drv.docker.RemoveContainer(docker.RemoveContainerOptions{
|
|
ID: container, Force: true, RemoveVolumes: true, Context: ctx})
|
|
|
|
if err != nil {
|
|
logrus.WithError(err).WithFields(logrus.Fields{"container": container}).Error("error removing container")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (drv *DockerDriver) ensureImage(ctx context.Context, task drivers.ContainerTask) error {
|
|
reg, _, _ := drivers.ParseImage(task.Image())
|
|
|
|
// ask for docker creds before looking for image, as the tasker may need to
|
|
// validate creds even if the image is downloaded.
|
|
|
|
var config docker.AuthConfiguration // default, tries docker hub w/o user/pass
|
|
|
|
// if any configured host auths match task registry, try them (task docker auth can override)
|
|
// TODO this is still a little hairy using suffix, we should probably try to parse it as a
|
|
// url and extract the host (from both the config file & image)
|
|
for _, v := range drv.auths {
|
|
if reg != "" && strings.HasSuffix(v.ServerAddress, reg) {
|
|
config = v
|
|
break
|
|
}
|
|
}
|
|
|
|
if task, ok := task.(Auther); ok {
|
|
var err error
|
|
span, _ := opentracing.StartSpanFromContext(ctx, "docker_auth")
|
|
config, err = task.DockerAuth()
|
|
span.Finish()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if reg != "" {
|
|
config.ServerAddress = reg
|
|
}
|
|
|
|
// see if we already have it, if not, pull it
|
|
_, err := drv.docker.InspectImage(ctx, task.Image())
|
|
if err == docker.ErrNoSuchImage {
|
|
err = drv.pullImage(ctx, task, config)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (drv *DockerDriver) pullImage(ctx context.Context, task drivers.ContainerTask, config docker.AuthConfiguration) error {
|
|
log := common.Logger(ctx)
|
|
reg, repo, tag := drivers.ParseImage(task.Image())
|
|
globalRepo := path.Join(reg, repo)
|
|
if reg != "" {
|
|
config.ServerAddress = reg
|
|
}
|
|
|
|
var err error
|
|
config.ServerAddress, err = registryURL(config.ServerAddress)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.WithFields(logrus.Fields{"registry": config.ServerAddress, "username": config.Username, "image": task.Image()}).Info("Pulling image")
|
|
|
|
err = drv.docker.PullImage(docker.PullImageOptions{Repository: globalRepo, Tag: tag, Context: ctx}, config)
|
|
if err != nil {
|
|
log.WithFields(logrus.Fields{"registry": config.ServerAddress, "username": config.Username, "image": task.Image()}).WithError(err).Error("Failed to pull image")
|
|
|
|
// TODO need to inspect for hub or network errors and pick; for now, assume
|
|
// 500 if not a docker error
|
|
msg := err.Error()
|
|
code := http.StatusInternalServerError
|
|
if dErr, ok := err.(*docker.Error); ok {
|
|
msg = dockerMsg(dErr)
|
|
code = dErr.Status // 401/404
|
|
}
|
|
|
|
return models.NewAPIError(code, fmt.Errorf("Failed to pull image '%s': %s", task.Image(), msg))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// removes docker err formatting: 'API Error (code) {"message":"..."}'
|
|
func dockerMsg(derr *docker.Error) string {
|
|
// derr.Message is a JSON response from docker, which has a "message" field we want to extract if possible.
|
|
// this is pretty lame, but it is what it is
|
|
var v struct {
|
|
Msg string `json:"message"`
|
|
}
|
|
|
|
err := json.Unmarshal([]byte(derr.Message), &v)
|
|
if err != nil {
|
|
// If message was not valid JSON, the raw body is still better than nothing.
|
|
return derr.Message
|
|
}
|
|
return v.Msg
|
|
}
|
|
|
|
// Run executes the docker container. If task runs, drivers.RunResult will be returned. If something fails outside the task (ie: Docker), it will return error.
|
|
// The docker driver will attempt to cast the task to a Auther. If that succeeds, private image support is available. See the Auther interface for how to implement this.
|
|
func (drv *DockerDriver) run(ctx context.Context, container string, task drivers.ContainerTask) (drivers.WaitResult, error) {
|
|
timeout := task.Timeout()
|
|
|
|
var cancel context.CancelFunc
|
|
if timeout <= 0 {
|
|
ctx, cancel = context.WithCancel(ctx)
|
|
} else {
|
|
ctx, cancel = context.WithTimeout(ctx, timeout)
|
|
}
|
|
defer cancel() // do this so that after Run exits, collect stops
|
|
go drv.collectStats(ctx, container, task)
|
|
|
|
mwOut, mwErr := task.Logger()
|
|
|
|
waiter, err := drv.docker.AttachToContainerNonBlocking(ctx, docker.AttachToContainerOptions{
|
|
Container: container, OutputStream: mwOut, ErrorStream: mwErr,
|
|
Stream: true, Logs: true, Stdout: true, Stderr: true,
|
|
Stdin: true, InputStream: task.Input()})
|
|
if err != nil && ctx.Err() == nil {
|
|
// ignore if ctx has errored, rewrite status lay below
|
|
return nil, err
|
|
}
|
|
|
|
err = drv.startTask(ctx, container)
|
|
if err != nil && ctx.Err() == nil {
|
|
// if there's just a timeout making the docker calls, drv.wait below will rewrite it to timeout
|
|
return nil, err
|
|
}
|
|
|
|
return &waitResult{
|
|
container: container,
|
|
waiter: waiter,
|
|
drv: drv,
|
|
}, nil
|
|
}
|
|
|
|
// implements drivers.WaitResult
|
|
type waitResult struct {
|
|
container string
|
|
waiter docker.CloseWaiter
|
|
drv *DockerDriver
|
|
}
|
|
|
|
func (w *waitResult) Wait(ctx context.Context) (drivers.RunResult, error) {
|
|
defer func() {
|
|
w.waiter.Close()
|
|
w.waiter.Wait() // make sure we gather all logs
|
|
}()
|
|
|
|
status, err := w.drv.wait(ctx, w.container)
|
|
return &runResult{
|
|
status: status,
|
|
err: err,
|
|
}, nil
|
|
}
|
|
|
|
func (drv *DockerDriver) collectStats(ctx context.Context, container string, task drivers.ContainerTask) {
|
|
log := common.Logger(ctx)
|
|
done := make(chan bool)
|
|
defer close(done)
|
|
dstats := make(chan *docker.Stats, 1)
|
|
go func() {
|
|
// NOTE: docker automatically streams every 1s. we can skip or avg samples if we'd like but
|
|
// the memory overhead is < 1MB for 3600 stat points so this seems fine, seems better to stream
|
|
// (internal docker api streams) than open/close stream for 1 sample over and over.
|
|
// must be called in goroutine, docker.Stats() blocks
|
|
err := drv.docker.Stats(docker.StatsOptions{
|
|
ID: container,
|
|
Stats: dstats,
|
|
Stream: true,
|
|
Done: done, // A flag that enables stopping the stats operation
|
|
})
|
|
|
|
if err != nil && err != io.ErrClosedPipe {
|
|
log.WithError(err).WithFields(logrus.Fields{"container": container, "call_id": task.Id()}).Error("error streaming docker stats for task")
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case ds, ok := <-dstats:
|
|
if !ok {
|
|
return
|
|
}
|
|
task.WriteStat(cherryPick(ds))
|
|
}
|
|
}
|
|
}
|
|
|
|
func cherryPick(ds *docker.Stats) drivers.Stat {
|
|
// TODO cpu % is as a % of the whole system... cpu is weird since we're sharing it
|
|
// across a bunch of containers and it scales based on how many we're sharing with,
|
|
// do we want users to see as a % of system?
|
|
systemDelta := float64(ds.CPUStats.SystemCPUUsage - ds.PreCPUStats.SystemCPUUsage)
|
|
cores := float64(len(ds.CPUStats.CPUUsage.PercpuUsage))
|
|
var cpuUser, cpuKernel, cpuTotal float64
|
|
if systemDelta > 0 {
|
|
// TODO we could leave these in docker format and let hud/viz tools do this instead of us... like net is, could do same for mem, too. thoughts?
|
|
cpuUser = (float64(ds.CPUStats.CPUUsage.UsageInUsermode-ds.PreCPUStats.CPUUsage.UsageInUsermode) / systemDelta) * cores * 100.0
|
|
cpuKernel = (float64(ds.CPUStats.CPUUsage.UsageInKernelmode-ds.PreCPUStats.CPUUsage.UsageInKernelmode) / systemDelta) * cores * 100.0
|
|
cpuTotal = (float64(ds.CPUStats.CPUUsage.TotalUsage-ds.PreCPUStats.CPUUsage.TotalUsage) / systemDelta) * cores * 100.0
|
|
}
|
|
|
|
var rx, tx float64
|
|
for _, v := range ds.Networks {
|
|
rx += float64(v.RxBytes)
|
|
tx += float64(v.TxBytes)
|
|
}
|
|
|
|
var blkRead, blkWrite uint64
|
|
for _, bioEntry := range ds.BlkioStats.IOServiceBytesRecursive {
|
|
switch strings.ToLower(bioEntry.Op) {
|
|
case "read":
|
|
blkRead = blkRead + bioEntry.Value
|
|
case "write":
|
|
blkWrite = blkWrite + bioEntry.Value
|
|
}
|
|
}
|
|
|
|
return drivers.Stat{
|
|
Timestamp: ds.Read,
|
|
Metrics: map[string]uint64{
|
|
// source: https://godoc.org/github.com/fsouza/go-dockerclient#Stats
|
|
// ex (for future expansion): {"read":"2016-08-03T18:08:05Z","pids_stats":{},"network":{},"networks":{"eth0":{"rx_bytes":508,"tx_packets":6,"rx_packets":6,"tx_bytes":508}},"memory_stats":{"stats":{"cache":16384,"pgpgout":281,"rss":8826880,"pgpgin":2440,"total_rss":8826880,"hierarchical_memory_limit":536870912,"total_pgfault":3809,"active_anon":8843264,"total_active_anon":8843264,"total_pgpgout":281,"total_cache":16384,"pgfault":3809,"total_pgpgin":2440},"max_usage":8953856,"usage":8953856,"limit":536870912},"blkio_stats":{"io_service_bytes_recursive":[{"major":202,"op":"Read"},{"major":202,"op":"Write"},{"major":202,"op":"Sync"},{"major":202,"op":"Async"},{"major":202,"op":"Total"}],"io_serviced_recursive":[{"major":202,"op":"Read"},{"major":202,"op":"Write"},{"major":202,"op":"Sync"},{"major":202,"op":"Async"},{"major":202,"op":"Total"}]},"cpu_stats":{"cpu_usage":{"percpu_usage":[47641874],"usage_in_usermode":30000000,"total_usage":47641874},"system_cpu_usage":8880800500000000,"throttling_data":{}},"precpu_stats":{"cpu_usage":{"percpu_usage":[44946186],"usage_in_usermode":30000000,"total_usage":44946186},"system_cpu_usage":8880799510000000,"throttling_data":{}}}
|
|
// mostly stolen values from docker stats cli api...
|
|
|
|
// net
|
|
"net_rx": uint64(rx),
|
|
"net_tx": uint64(tx),
|
|
// mem
|
|
"mem_limit": ds.MemoryStats.Limit,
|
|
"mem_usage": ds.MemoryStats.Usage,
|
|
// i/o
|
|
"disk_read": blkRead,
|
|
"disk_write": blkWrite,
|
|
// cpu
|
|
"cpu_user": uint64(cpuUser),
|
|
"cpu_total": uint64(cpuTotal),
|
|
"cpu_kernel": uint64(cpuKernel),
|
|
},
|
|
}
|
|
}
|
|
|
|
func (drv *DockerDriver) startTask(ctx context.Context, container string) error {
|
|
log := common.Logger(ctx)
|
|
log.WithFields(logrus.Fields{"container": container}).Debug("Starting container execution")
|
|
err := drv.docker.StartContainerWithContext(container, nil, ctx)
|
|
if err != nil {
|
|
dockerErr, ok := err.(*docker.Error)
|
|
_, containerAlreadyRunning := err.(*docker.ContainerAlreadyRunning)
|
|
if containerAlreadyRunning || (ok && dockerErr.Status == 304) {
|
|
// 304=container already started -- so we can ignore error
|
|
} else {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (drv *DockerDriver) wait(ctx context.Context, container string) (status string, err error) {
|
|
// wait retries internally until ctx is up, so we can ignore the error and
|
|
// just say it was a timeout if we have [fatal] errors talking to docker, etc.
|
|
// a more prevalent case is calling wait & container already finished, so again ignore err.
|
|
exitCode, _ := drv.docker.WaitContainerWithContext(container, ctx)
|
|
|
|
// check the context first, if it's done then exitCode is invalid iff zero
|
|
// (can't know 100% without inspecting, but that's expensive and this is a good guess)
|
|
// if exitCode is non-zero, we prefer that since it proves termination.
|
|
if exitCode == 0 {
|
|
select {
|
|
case <-ctx.Done(): // check if task was canceled or timed out
|
|
switch ctx.Err() {
|
|
case context.DeadlineExceeded:
|
|
return drivers.StatusTimeout, context.DeadlineExceeded
|
|
case context.Canceled:
|
|
return drivers.StatusCancelled, context.Canceled
|
|
}
|
|
default:
|
|
}
|
|
}
|
|
|
|
switch exitCode {
|
|
default:
|
|
return drivers.StatusError, models.NewAPIError(http.StatusBadGateway, fmt.Errorf("container exit code %d", exitCode))
|
|
case 0:
|
|
return drivers.StatusSuccess, nil
|
|
case 137: // OOM
|
|
opentracing.SpanFromContext(ctx).LogFields(log.String("docker", "oom"))
|
|
return drivers.StatusKilled, models.NewAPIError(http.StatusBadGateway, errors.New("container out of memory"))
|
|
}
|
|
}
|