Merge branch 'use-ctx-logger' into 'master'

Using ctx logger in more places to get more context in the logs - ie: call_id

See merge request !102
This commit is contained in:
Reed Allman
2017-07-10 16:13:51 -07:00
3 changed files with 34 additions and 26 deletions

View File

@@ -107,17 +107,18 @@ func NewDocker(env *common.Environment, conf drivers.Config) *DockerDriver {
// rkt w/o a docker driver configured; also, we don't have to tote around a
// driver in any tasker that may be interested in registry information (2/2
// cases thus far).
func CheckRegistry(image string, config docker.AuthConfiguration) (Sizer, error) {
func CheckRegistry(ctx context.Context, image string, config docker.AuthConfiguration) (Sizer, error) {
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "CheckRegistry"})
registry, repo, tag := drivers.ParseImage(image)
reg, err := registryForConfig(config, registry)
reg, err := registryForConfig(ctx, config, registry)
if err != nil {
return nil, err
}
mani, err := reg.Manifest(repo, tag)
if err != nil {
logrus.WithFields(logrus.Fields{"username": config.Username, "server": config.ServerAddress, "image": image}).WithError(err).Error("Credentials not authorized, trying next.")
log.WithFields(logrus.Fields{"username": config.Username, "server": config.ServerAddress, "image": image}).WithError(err).Error("Credentials not authorized, trying next.")
//if !isAuthError(err) {
// // TODO we might retry this, since if this was the registry that was supposed to
// // auth the task will erroneously be set to 'error'
@@ -153,7 +154,8 @@ func (s *sizer) Size() (int64, error) {
return sum, nil
}
func registryURL(addr string) (string, error) {
func registryURL(ctx context.Context, addr string) (string, error) {
log := common.Logger(ctx)
if addr == "" || strings.Contains(addr, "hub.docker.com") || strings.Contains(addr, "index.docker.io") {
return hubURL, nil
}
@@ -163,7 +165,7 @@ func registryURL(addr string) (string, error) {
// TODO we could error the task out from this with a user error but since
// we have a list of auths to check, just return the error so as to be
// skipped... horrible api as it is
logrus.WithFields(logrus.Fields{"auth_addr": addr}).WithError(err).Error("error parsing server address url, skipping")
log.WithFields(logrus.Fields{"auth_addr": addr}).WithError(err).Error("error parsing server address url, skipping")
return "", err
}
@@ -189,13 +191,13 @@ func isAuthError(err error) bool {
return false
}
func registryForConfig(config docker.AuthConfiguration, reg string) (*registry.Registry, error) {
func registryForConfig(ctx context.Context, config docker.AuthConfiguration, reg string) (*registry.Registry, error) {
if reg == "" {
reg = config.ServerAddress
}
var err error
config.ServerAddress, err = registryURL(reg)
config.ServerAddress, err = registryURL(ctx, reg)
if err != nil {
return nil, err
}
@@ -213,12 +215,13 @@ func registryForConfig(config docker.AuthConfiguration, reg string) (*registry.R
}
func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) {
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Prepare"})
var cmd []string
if task.Command() != "" {
// NOTE: this is hyper-sensitive and may not be correct like this even, but it passes old tests
// task.Command() in swapi is always "sh /mnt/task/.runtask" so fields is safe
cmd = strings.Fields(task.Command())
logrus.WithFields(logrus.Fields{"call_id": task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command")
log.WithFields(logrus.Fields{"call_id": task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command")
}
envvars := make([]string, 0, len(task.EnvVars()))
@@ -253,11 +256,11 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask
container.Config.Volumes[containerDir] = struct{}{}
mapn := fmt.Sprintf("%s:%s", hostDir, containerDir)
container.HostConfig.Binds = append(container.HostConfig.Binds, mapn)
logrus.WithFields(logrus.Fields{"volumes": mapn, "call_id": task.Id()}).Debug("setting volumes")
log.WithFields(logrus.Fields{"volumes": mapn, "call_id": task.Id()}).Debug("setting volumes")
}
if wd := task.WorkDir(); wd != "" {
logrus.WithFields(logrus.Fields{"wd": wd, "call_id": task.Id()}).Debug("setting work dir")
log.WithFields(logrus.Fields{"wd": wd, "call_id": task.Id()}).Debug("setting work dir")
container.Config.WorkingDir = wd
}
@@ -272,7 +275,7 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask
if err != nil {
// since we retry under the hood, if the container gets created and retry fails, we can just ignore error
if err != docker.ErrContainerAlreadyExists {
logrus.WithFields(logrus.Fields{"call_id": task.Id(), "command": container.Config.Cmd, "memory": container.Config.Memory,
log.WithFields(logrus.Fields{"call_id": task.Id(), "command": container.Config.Cmd, "memory": container.Config.Memory,
"cpu_shares": container.Config.CPUShares, "hostname": container.Config.Hostname, "name": container.Name,
"image": container.Config.Image, "volumes": container.Config.Volumes, "binds": container.HostConfig.Binds, "container": containerName,
}).WithError(err).Error("Could not create container")
@@ -356,7 +359,7 @@ func (drv *DockerDriver) pullImage(ctx context.Context, task drivers.ContainerTa
}
var err error
config.ServerAddress, err = registryURL(config.ServerAddress)
config.ServerAddress, err = registryURL(ctx, config.ServerAddress)
if err != nil {
return err
}
@@ -462,6 +465,7 @@ func (drv *DockerDriver) cancel(container string) {
}
func (drv *DockerDriver) collectStats(ctx context.Context, container string, task drivers.ContainerTask) {
log := common.Logger(ctx)
done := make(chan bool)
defer close(done)
dstats := make(chan *docker.Stats, 1)
@@ -478,7 +482,7 @@ func (drv *DockerDriver) collectStats(ctx context.Context, container string, tas
})
if err != nil && err != io.ErrClosedPipe {
logrus.WithError(err).WithFields(logrus.Fields{"container": container, "call_id": task.Id()}).Error("error streaming docker stats for task")
log.WithError(err).WithFields(logrus.Fields{"container": container, "call_id": task.Id()}).Error("error streaming docker stats for task")
}
}()

View File

@@ -97,19 +97,20 @@ type dockerWrap struct {
}
func (d *dockerWrap) retry(ctx context.Context, f func() error) error {
log := common.Logger(ctx)
var b common.Backoff
for {
select {
case <-ctx.Done():
d.Inc("task", "fail.docker", 1, 1)
logrus.WithError(ctx.Err()).Warnf("retrying on docker errors timed out, restart docker or rotate this instance?")
log.WithError(ctx.Err()).Warnf("retrying on docker errors timed out, restart docker or rotate this instance?")
return ctx.Err()
default:
}
err := filter(f())
err := filter(ctx, f())
if common.IsTemporary(err) || isDocker50x(err) {
logrus.WithError(err).Warn("docker temporary error, retrying")
log.WithError(err).Warn("docker temporary error, retrying")
b.Sleep()
d.Inc("task", "error.docker", 1, 1)
continue
@@ -156,7 +157,8 @@ func temp(err error) error {
}
// some 500s are totally cool
func filter(err error) error {
func filter(ctx context.Context, err error) error {
log := common.Logger(ctx)
// "API error (500): {\"message\":\"service endpoint with name task-57d722ecdecb9e7be16aff17 already exists\"}\n" -> ok since container exists
switch {
default:
@@ -165,24 +167,26 @@ func filter(err error) error {
return err
case strings.Contains(err.Error(), "service endpoint with name"):
}
logrus.WithError(err).Warn("filtering error")
log.WithError(err).Warn("filtering error")
return nil
}
func filterNoSuchContainer(err error) error {
func filterNoSuchContainer(ctx context.Context, err error) error {
log := common.Logger(ctx)
if err == nil {
return nil
}
_, containerNotFound := err.(*docker.NoSuchContainer)
dockerErr, ok := err.(*docker.Error)
if containerNotFound || (ok && dockerErr.Status == 404) {
logrus.WithError(err).Error("filtering error")
log.WithError(err).Error("filtering error")
return nil
}
return err
}
func filterNotRunning(err error) error {
func filterNotRunning(ctx context.Context, err error) error {
log := common.Logger(ctx)
if err == nil {
return nil
}
@@ -190,7 +194,7 @@ func filterNotRunning(err error) error {
_, containerNotRunning := err.(*docker.ContainerNotRunning)
dockerErr, ok := err.(*docker.Error)
if containerNotRunning || (ok && dockerErr.Status == 304) {
logrus.WithError(err).Error("filtering error")
log.WithError(err).Error("filtering error")
return nil
}
@@ -216,7 +220,7 @@ func (d *dockerWrap) WaitContainerWithContext(id string, ctx context.Context) (c
code, err = d.dockerNoTimeout.WaitContainerWithContext(id, ctx)
return err
})
return code, filterNoSuchContainer(err)
return code, filterNoSuchContainer(ctx, err)
}
func (d *dockerWrap) StartContainerWithContext(id string, hostConfig *docker.HostConfig, ctx context.Context) (err error) {
@@ -254,7 +258,7 @@ func (d *dockerWrap) RemoveContainer(opts docker.RemoveContainerOptions) (err er
err = d.docker.RemoveContainer(opts)
return err
})
return filterNoSuchContainer(err)
return filterNoSuchContainer(ctx, err)
}
func (d *dockerWrap) InspectImage(name string) (i *docker.Image, err error) {
@@ -284,7 +288,7 @@ func (d *dockerWrap) StopContainer(id string, timeout uint) (err error) {
err = d.docker.StopContainer(id, timeout)
return err
})
return filterNotRunning(filterNoSuchContainer(err))
return filterNotRunning(ctx, filterNoSuchContainer(ctx, err))
}
func (d *dockerWrap) Stats(opts docker.StatsOptions) (err error) {

View File

@@ -250,7 +250,7 @@ func (r Runner) EnsureImageExists(ctx context.Context, cfg *task.Config) error {
return err
}
_, err = docker.CheckRegistry(ctask.Image(), auth)
_, err = docker.CheckRegistry(ctx, ctask.Image(), auth)
return err
}