mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
fn: wait for async attach with success channel (#810)
* fn: wait for async attach with success channel * fn: debug logs in test.sh * fn: circleci test output as artifact * fn: docker attach non-blocking adjustments * fn: remove retry from risky NB attach
This commit is contained in:
committed by
Reed Allman
parent
e24865f704
commit
0ef0118150
@@ -9,6 +9,7 @@ jobs:
|
|||||||
- GOVERSION=1.9.1
|
- GOVERSION=1.9.1
|
||||||
- OS=linux
|
- OS=linux
|
||||||
- ARCH=amd64
|
- ARCH=amd64
|
||||||
|
- FN_LOG_LEVEL=debug
|
||||||
steps:
|
steps:
|
||||||
- checkout
|
- checkout
|
||||||
# install Go
|
# install Go
|
||||||
|
|||||||
@@ -345,17 +345,36 @@ func dockerMsg(derr *docker.Error) string {
|
|||||||
// Run executes the docker container. If task runs, drivers.RunResult will be returned. If something fails outside the task (ie: Docker), it will return error.
|
// Run executes the docker container. If task runs, drivers.RunResult will be returned. If something fails outside the task (ie: Docker), it will return error.
|
||||||
// The docker driver will attempt to cast the task to a Auther. If that succeeds, private image support is available. See the Auther interface for how to implement this.
|
// The docker driver will attempt to cast the task to a Auther. If that succeeds, private image support is available. See the Auther interface for how to implement this.
|
||||||
func (drv *DockerDriver) run(ctx context.Context, container string, task drivers.ContainerTask) (drivers.WaitResult, error) {
|
func (drv *DockerDriver) run(ctx context.Context, container string, task drivers.ContainerTask) (drivers.WaitResult, error) {
|
||||||
|
|
||||||
|
attachSuccess := make(chan struct{})
|
||||||
mwOut, mwErr := task.Logger()
|
mwOut, mwErr := task.Logger()
|
||||||
|
|
||||||
waiter, err := drv.docker.AttachToContainerNonBlocking(ctx, docker.AttachToContainerOptions{
|
waiter, err := drv.docker.AttachToContainerNonBlocking(ctx, docker.AttachToContainerOptions{
|
||||||
Container: container, OutputStream: mwOut, ErrorStream: mwErr,
|
Success: attachSuccess,
|
||||||
Stream: true, Stdout: true, Stderr: true,
|
Container: container,
|
||||||
Stdin: true, InputStream: task.Input()})
|
OutputStream: mwOut,
|
||||||
|
ErrorStream: mwErr,
|
||||||
|
Stream: true,
|
||||||
|
Stdout: true,
|
||||||
|
Stderr: true,
|
||||||
|
Stdin: true,
|
||||||
|
InputStream: task.Input()})
|
||||||
|
|
||||||
if err != nil && ctx.Err() == nil {
|
if err != nil && ctx.Err() == nil {
|
||||||
// ignore if ctx has errored, rewrite status lay below
|
// ignore if ctx has errored, rewrite status lay below
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sync up with NB Attacher above before starting the task
|
||||||
|
if err == nil {
|
||||||
|
// WARNING: the I/O below requires docker hijack function to honor
|
||||||
|
// the contract below, specifically if an error is not returned
|
||||||
|
// from AttachToContainerNonBlocking, then max blocking time
|
||||||
|
// here should be what drv.docker dialer/client config was set to.
|
||||||
|
<-attachSuccess
|
||||||
|
attachSuccess <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
// we want to stop trying to collect stats when the container exits
|
// we want to stop trying to collect stats when the container exits
|
||||||
// collectStats will stop when stopSignal is closed or ctx is cancelled
|
// collectStats will stop when stopSignal is closed or ctx is cancelled
|
||||||
stopSignal := make(chan struct{})
|
stopSignal := make(chan struct{})
|
||||||
|
|||||||
@@ -290,22 +290,10 @@ func (d *dockerWrap) Info(ctx context.Context) (info *docker.DockerInfo, err err
|
|||||||
return d.docker.Info()
|
return d.docker.Info()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *dockerWrap) AttachToContainerNonBlocking(ctx context.Context, opts docker.AttachToContainerOptions) (w docker.CloseWaiter, err error) {
|
func (d *dockerWrap) AttachToContainerNonBlocking(ctx context.Context, opts docker.AttachToContainerOptions) (docker.CloseWaiter, error) {
|
||||||
ctx, span := trace.StartSpan(ctx, "docker_attach_container")
|
ctx, span := trace.StartSpan(ctx, "docker_attach_container")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
return d.docker.AttachToContainerNonBlocking(opts)
|
||||||
logger := common.Logger(ctx).WithField("docker_cmd", "AttachContainer")
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, retryTimeout)
|
|
||||||
defer cancel()
|
|
||||||
err = d.retry(ctx, logger, func() error {
|
|
||||||
w, err = d.docker.AttachToContainerNonBlocking(opts)
|
|
||||||
if err != nil {
|
|
||||||
// always retry if attach errors, task is running, we want logs!
|
|
||||||
err = temp(err)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
return w, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *dockerWrap) WaitContainerWithContext(id string, ctx context.Context) (code int, err error) {
|
func (d *dockerWrap) WaitContainerWithContext(id string, ctx context.Context) (code int, err error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user