mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Using ctx logger in more places to get more context in the logs - ie: call_id
This commit is contained in:
committed by
Reed Allman
parent
6217ef7d35
commit
e56ac42bc2
@@ -107,17 +107,18 @@ func NewDocker(env *common.Environment, conf drivers.Config) *DockerDriver {
|
||||
// rkt w/o a docker driver configured; also, we don't have to tote around a
|
||||
// driver in any tasker that may be interested in registry information (2/2
|
||||
// cases thus far).
|
||||
func CheckRegistry(image string, config docker.AuthConfiguration) (Sizer, error) {
|
||||
func CheckRegistry(ctx context.Context, image string, config docker.AuthConfiguration) (Sizer, error) {
|
||||
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "CheckRegistry"})
|
||||
registry, repo, tag := drivers.ParseImage(image)
|
||||
|
||||
reg, err := registryForConfig(config, registry)
|
||||
reg, err := registryForConfig(ctx, config, registry)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mani, err := reg.Manifest(repo, tag)
|
||||
if err != nil {
|
||||
logrus.WithFields(logrus.Fields{"username": config.Username, "server": config.ServerAddress, "image": image}).WithError(err).Error("Credentials not authorized, trying next.")
|
||||
log.WithFields(logrus.Fields{"username": config.Username, "server": config.ServerAddress, "image": image}).WithError(err).Error("Credentials not authorized, trying next.")
|
||||
//if !isAuthError(err) {
|
||||
// // TODO we might retry this, since if this was the registry that was supposed to
|
||||
// // auth the task will erroneously be set to 'error'
|
||||
@@ -153,7 +154,8 @@ func (s *sizer) Size() (int64, error) {
|
||||
return sum, nil
|
||||
}
|
||||
|
||||
func registryURL(addr string) (string, error) {
|
||||
func registryURL(ctx context.Context, addr string) (string, error) {
|
||||
log := common.Logger(ctx)
|
||||
if addr == "" || strings.Contains(addr, "hub.docker.com") || strings.Contains(addr, "index.docker.io") {
|
||||
return hubURL, nil
|
||||
}
|
||||
@@ -163,7 +165,7 @@ func registryURL(addr string) (string, error) {
|
||||
// TODO we could error the task out from this with a user error but since
|
||||
// we have a list of auths to check, just return the error so as to be
|
||||
// skipped... horrible api as it is
|
||||
logrus.WithFields(logrus.Fields{"auth_addr": addr}).WithError(err).Error("error parsing server address url, skipping")
|
||||
log.WithFields(logrus.Fields{"auth_addr": addr}).WithError(err).Error("error parsing server address url, skipping")
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -189,13 +191,13 @@ func isAuthError(err error) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func registryForConfig(config docker.AuthConfiguration, reg string) (*registry.Registry, error) {
|
||||
func registryForConfig(ctx context.Context, config docker.AuthConfiguration, reg string) (*registry.Registry, error) {
|
||||
if reg == "" {
|
||||
reg = config.ServerAddress
|
||||
}
|
||||
|
||||
var err error
|
||||
config.ServerAddress, err = registryURL(reg)
|
||||
config.ServerAddress, err = registryURL(ctx, reg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -213,12 +215,13 @@ func registryForConfig(config docker.AuthConfiguration, reg string) (*registry.R
|
||||
}
|
||||
|
||||
func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) {
|
||||
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Prepare"})
|
||||
var cmd []string
|
||||
if task.Command() != "" {
|
||||
// NOTE: this is hyper-sensitive and may not be correct like this even, but it passes old tests
|
||||
// task.Command() in swapi is always "sh /mnt/task/.runtask" so fields is safe
|
||||
cmd = strings.Fields(task.Command())
|
||||
logrus.WithFields(logrus.Fields{"call_id": task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command")
|
||||
log.WithFields(logrus.Fields{"call_id": task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command")
|
||||
}
|
||||
|
||||
envvars := make([]string, 0, len(task.EnvVars()))
|
||||
@@ -253,11 +256,11 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask
|
||||
container.Config.Volumes[containerDir] = struct{}{}
|
||||
mapn := fmt.Sprintf("%s:%s", hostDir, containerDir)
|
||||
container.HostConfig.Binds = append(container.HostConfig.Binds, mapn)
|
||||
logrus.WithFields(logrus.Fields{"volumes": mapn, "call_id": task.Id()}).Debug("setting volumes")
|
||||
log.WithFields(logrus.Fields{"volumes": mapn, "call_id": task.Id()}).Debug("setting volumes")
|
||||
}
|
||||
|
||||
if wd := task.WorkDir(); wd != "" {
|
||||
logrus.WithFields(logrus.Fields{"wd": wd, "call_id": task.Id()}).Debug("setting work dir")
|
||||
log.WithFields(logrus.Fields{"wd": wd, "call_id": task.Id()}).Debug("setting work dir")
|
||||
container.Config.WorkingDir = wd
|
||||
}
|
||||
|
||||
@@ -272,7 +275,7 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask
|
||||
if err != nil {
|
||||
// since we retry under the hood, if the container gets created and retry fails, we can just ignore error
|
||||
if err != docker.ErrContainerAlreadyExists {
|
||||
logrus.WithFields(logrus.Fields{"call_id": task.Id(), "command": container.Config.Cmd, "memory": container.Config.Memory,
|
||||
log.WithFields(logrus.Fields{"call_id": task.Id(), "command": container.Config.Cmd, "memory": container.Config.Memory,
|
||||
"cpu_shares": container.Config.CPUShares, "hostname": container.Config.Hostname, "name": container.Name,
|
||||
"image": container.Config.Image, "volumes": container.Config.Volumes, "binds": container.HostConfig.Binds, "container": containerName,
|
||||
}).WithError(err).Error("Could not create container")
|
||||
@@ -356,7 +359,7 @@ func (drv *DockerDriver) pullImage(ctx context.Context, task drivers.ContainerTa
|
||||
}
|
||||
|
||||
var err error
|
||||
config.ServerAddress, err = registryURL(config.ServerAddress)
|
||||
config.ServerAddress, err = registryURL(ctx, config.ServerAddress)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -462,6 +465,7 @@ func (drv *DockerDriver) cancel(container string) {
|
||||
}
|
||||
|
||||
func (drv *DockerDriver) collectStats(ctx context.Context, container string, task drivers.ContainerTask) {
|
||||
log := common.Logger(ctx)
|
||||
done := make(chan bool)
|
||||
defer close(done)
|
||||
dstats := make(chan *docker.Stats, 1)
|
||||
@@ -478,7 +482,7 @@ func (drv *DockerDriver) collectStats(ctx context.Context, container string, tas
|
||||
})
|
||||
|
||||
if err != nil && err != io.ErrClosedPipe {
|
||||
logrus.WithError(err).WithFields(logrus.Fields{"container": container, "call_id": task.Id()}).Error("error streaming docker stats for task")
|
||||
log.WithError(err).WithFields(logrus.Fields{"container": container, "call_id": task.Id()}).Error("error streaming docker stats for task")
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
@@ -97,19 +97,20 @@ type dockerWrap struct {
|
||||
}
|
||||
|
||||
func (d *dockerWrap) retry(ctx context.Context, f func() error) error {
|
||||
log := common.Logger(ctx)
|
||||
var b common.Backoff
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
d.Inc("task", "fail.docker", 1, 1)
|
||||
logrus.WithError(ctx.Err()).Warnf("retrying on docker errors timed out, restart docker or rotate this instance?")
|
||||
log.WithError(ctx.Err()).Warnf("retrying on docker errors timed out, restart docker or rotate this instance?")
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
err := filter(f())
|
||||
err := filter(ctx, f())
|
||||
if common.IsTemporary(err) || isDocker50x(err) {
|
||||
logrus.WithError(err).Warn("docker temporary error, retrying")
|
||||
log.WithError(err).Warn("docker temporary error, retrying")
|
||||
b.Sleep()
|
||||
d.Inc("task", "error.docker", 1, 1)
|
||||
continue
|
||||
@@ -156,7 +157,8 @@ func temp(err error) error {
|
||||
}
|
||||
|
||||
// some 500s are totally cool
|
||||
func filter(err error) error {
|
||||
func filter(ctx context.Context, err error) error {
|
||||
log := common.Logger(ctx)
|
||||
// "API error (500): {\"message\":\"service endpoint with name task-57d722ecdecb9e7be16aff17 already exists\"}\n" -> ok since container exists
|
||||
switch {
|
||||
default:
|
||||
@@ -165,24 +167,26 @@ func filter(err error) error {
|
||||
return err
|
||||
case strings.Contains(err.Error(), "service endpoint with name"):
|
||||
}
|
||||
logrus.WithError(err).Warn("filtering error")
|
||||
log.WithError(err).Warn("filtering error")
|
||||
return nil
|
||||
}
|
||||
|
||||
func filterNoSuchContainer(err error) error {
|
||||
func filterNoSuchContainer(ctx context.Context, err error) error {
|
||||
log := common.Logger(ctx)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
_, containerNotFound := err.(*docker.NoSuchContainer)
|
||||
dockerErr, ok := err.(*docker.Error)
|
||||
if containerNotFound || (ok && dockerErr.Status == 404) {
|
||||
logrus.WithError(err).Error("filtering error")
|
||||
log.WithError(err).Error("filtering error")
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func filterNotRunning(err error) error {
|
||||
func filterNotRunning(ctx context.Context, err error) error {
|
||||
log := common.Logger(ctx)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
@@ -190,7 +194,7 @@ func filterNotRunning(err error) error {
|
||||
_, containerNotRunning := err.(*docker.ContainerNotRunning)
|
||||
dockerErr, ok := err.(*docker.Error)
|
||||
if containerNotRunning || (ok && dockerErr.Status == 304) {
|
||||
logrus.WithError(err).Error("filtering error")
|
||||
log.WithError(err).Error("filtering error")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -216,7 +220,7 @@ func (d *dockerWrap) WaitContainerWithContext(id string, ctx context.Context) (c
|
||||
code, err = d.dockerNoTimeout.WaitContainerWithContext(id, ctx)
|
||||
return err
|
||||
})
|
||||
return code, filterNoSuchContainer(err)
|
||||
return code, filterNoSuchContainer(ctx, err)
|
||||
}
|
||||
|
||||
func (d *dockerWrap) StartContainerWithContext(id string, hostConfig *docker.HostConfig, ctx context.Context) (err error) {
|
||||
@@ -254,7 +258,7 @@ func (d *dockerWrap) RemoveContainer(opts docker.RemoveContainerOptions) (err er
|
||||
err = d.docker.RemoveContainer(opts)
|
||||
return err
|
||||
})
|
||||
return filterNoSuchContainer(err)
|
||||
return filterNoSuchContainer(ctx, err)
|
||||
}
|
||||
|
||||
func (d *dockerWrap) InspectImage(name string) (i *docker.Image, err error) {
|
||||
@@ -284,7 +288,7 @@ func (d *dockerWrap) StopContainer(id string, timeout uint) (err error) {
|
||||
err = d.docker.StopContainer(id, timeout)
|
||||
return err
|
||||
})
|
||||
return filterNotRunning(filterNoSuchContainer(err))
|
||||
return filterNotRunning(ctx, filterNoSuchContainer(ctx, err))
|
||||
}
|
||||
|
||||
func (d *dockerWrap) Stats(opts docker.StatsOptions) (err error) {
|
||||
|
||||
@@ -250,7 +250,7 @@ func (r Runner) EnsureImageExists(ctx context.Context, cfg *task.Config) error {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = docker.CheckRegistry(ctask.Image(), auth)
|
||||
_, err = docker.CheckRegistry(ctx, ctask.Image(), auth)
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user